From b8a61f60a0aa115141987f461658666ccbfd2034 Mon Sep 17 00:00:00 2001 From: "bozo.kopic" Date: Mon, 28 Mar 2022 00:03:31 +0200 Subject: rename hatter to boxhatter --- src_py/boxhatter/__init__.py | 0 src_py/boxhatter/__main__.py | 8 ++ src_py/boxhatter/backend.py | 134 +++++++++++++++++++++++++ src_py/boxhatter/common.py | 32 ++++++ src_py/boxhatter/main.py | 162 ++++++++++++++++++++++++++++++ src_py/boxhatter/server.py | 227 ++++++++++++++++++++++++++++++++++++++++++ src_py/boxhatter/ui.py | 232 +++++++++++++++++++++++++++++++++++++++++++ src_py/hatter/__init__.py | 0 src_py/hatter/__main__.py | 8 -- src_py/hatter/backend.py | 134 ------------------------- src_py/hatter/common.py | 32 ------ src_py/hatter/main.py | 162 ------------------------------ src_py/hatter/server.py | 227 ------------------------------------------ src_py/hatter/ui.py | 232 ------------------------------------------- 14 files changed, 795 insertions(+), 795 deletions(-) create mode 100644 src_py/boxhatter/__init__.py create mode 100644 src_py/boxhatter/__main__.py create mode 100644 src_py/boxhatter/backend.py create mode 100644 src_py/boxhatter/common.py create mode 100644 src_py/boxhatter/main.py create mode 100644 src_py/boxhatter/server.py create mode 100644 src_py/boxhatter/ui.py delete mode 100644 src_py/hatter/__init__.py delete mode 100644 src_py/hatter/__main__.py delete mode 100644 src_py/hatter/backend.py delete mode 100644 src_py/hatter/common.py delete mode 100644 src_py/hatter/main.py delete mode 100644 src_py/hatter/server.py delete mode 100644 src_py/hatter/ui.py (limited to 'src_py') diff --git a/src_py/boxhatter/__init__.py b/src_py/boxhatter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src_py/boxhatter/__main__.py b/src_py/boxhatter/__main__.py new file mode 100644 index 0000000..fdd8f51 --- /dev/null +++ b/src_py/boxhatter/__main__.py @@ -0,0 +1,8 @@ +import sys + +from boxhatter.main import main + + +if __name__ == '__main__': + sys.argv[0] = 'boxhatter' + sys.exit(main()) diff --git a/src_py/boxhatter/backend.py b/src_py/boxhatter/backend.py new file mode 100644 index 0000000..ad83851 --- /dev/null +++ b/src_py/boxhatter/backend.py @@ -0,0 +1,134 @@ +from pathlib import Path +import sqlite3 +import typing + +from hat import aio + +from boxhatter import common + + +async def create(db_path: Path + ) -> 'Backend': + backend = Backend() + backend._async_group = aio.Group() + backend._executor = aio.create_executor(1) + + backend._db = await backend._executor(_ext_create, db_path) + backend.async_group.spawn(aio.call_on_cancel, backend._executor, + _ext_close, backend._db) + + return backend + + +class Backend(aio.Resource): + + @property + def async_group(self): + return self._async_group + + async def get_commits(self, + repo: typing.Optional[str], + statuses: typing.Optional[typing.Set[common.Status]], + order: common.Order + ) -> typing.List[common.Commit]: + return await self.async_group.spawn( + self._executor, _ext_get_commits, self._db, repo, statuses, order) + + async def get_commit(self, + repo: str, + commit_hash: str + ) -> typing.Optional[common.Commit]: + return await self.async_group.spawn( + self._executor, _ext_get_commit, self._db, repo, commit_hash) + + async def update_commit(self, commit: common.Commit): + await self._async_group.spawn( + self._executor, _ext_update_commit, self._db, commit) + + async def remove_commit(self, commit: common.Commit): + await self.async_group.spawn( + self._executor, _ext_remove_commit, self._db, commit) + + +def _ext_create(db_path): + db_path.parent.mkdir(exist_ok=True) + db = sqlite3.connect(f'file:{db_path}?nolock=1', + uri=True, + isolation_level=None, + detect_types=sqlite3.PARSE_DECLTYPES) + + try: + db.executescript(r""" + PRAGMA journal_mode = OFF; + CREATE TABLE IF NOT EXISTS commits ( + repo TEXT, + hash TEXT, + change INTEGER, + status INTEGER, + output TEXT, + PRIMARY KEY (repo, hash) ON CONFLICT REPLACE + ); + CREATE INDEX IF NOT EXISTS commits_change_index ON commits ( + change + )""") + + except Exception: + db.close() + raise + + return db + + +def _ext_close(db): + db.close() + + +def _ext_get_commits(db, repo, statuses, order): + cmd = "SELECT * FROM commits" + where = [] + if repo: + where.append("repo = :repo") + if statuses: + status_values = (str(status.value) for status in statuses) + where.append(f"status IN ({', '.join(status_values)})") + if where: + cmd += f" WHERE {' AND '.join(where)}" + cmd += f" ORDER BY change {order.value}" + args = {'repo': repo} + cur = db.execute(cmd, args) + return [_commit_from_row(row) for row in cur] + + +def _ext_get_commit(db, repo, commit_hash): + cmd = "SELECT * FROM commits WHERE repo = :repo AND hash = :hash" + args = {'repo': repo, + 'hash': commit_hash} + cur = db.execute(cmd, args) + row = cur.fetchone() + return _commit_from_row(row) if row else None + + +def _ext_update_commit(db, commit): + cmd = ("INSERT OR REPLACE INTO commits VALUES " + "(:repo, :hash, :change, :status, :output)") + args = {'repo': commit.repo, + 'hash': commit.hash, + 'change': commit.change, + 'status': commit.status.value, + 'output': commit.output} + db.execute(cmd, args) + + +def _ext_remove_commit(db, commit): + cmd = "DELETE FROM commits WHERE repo = :repo AND hash = :hash" + args = {'repo': commit.repo, + 'hash': commit.hash} + db.execute(cmd, args) + + +def _commit_from_row(row): + return common.Commit(repo=row[0], + hash=row[1], + change=row[2], + status=common.Status(row[3]), + output=row[4]) diff --git a/src_py/boxhatter/common.py b/src_py/boxhatter/common.py new file mode 100644 index 0000000..ae314a5 --- /dev/null +++ b/src_py/boxhatter/common.py @@ -0,0 +1,32 @@ +from pathlib import Path +import enum +import typing + +from hat import json + + +package_path: Path = Path(__file__).parent + +json_schema_repo: json.SchemaRepository = json.SchemaRepository( + json.json_schema_repo, + json.SchemaRepository.from_json(package_path / 'json_schema_repo.json')) + + +class Order(enum.Enum): + ASC = 'ASC' + DESC = 'DESC' + + +class Status(enum.Enum): + PENDING = 0 + RUNNING = 1 + SUCCESS = 2 + FAILURE = 3 + + +class Commit(typing.NamedTuple): + repo: str + hash: str + change: int + status: Status + output: str diff --git a/src_py/boxhatter/main.py b/src_py/boxhatter/main.py new file mode 100644 index 0000000..2cc30cc --- /dev/null +++ b/src_py/boxhatter/main.py @@ -0,0 +1,162 @@ +from pathlib import Path +import asyncio +import contextlib +import itertools +import logging.config +import subprocess +import sys +import tempfile +import typing + +from hat import aio +from hat import json +import appdirs +import click + +from boxhatter import common +import boxhatter.backend +import boxhatter.server +import boxhatter.ui + + +user_config_dir: Path = Path(appdirs.user_config_dir('boxhatter')) +user_data_dir: Path = Path(appdirs.user_data_dir('boxhatter')) + +default_conf_path: Path = user_config_dir / 'server.yaml' +default_db_path: Path = user_data_dir / 'boxhatter.db' + +ssh_key_path: typing.Optional[Path] = None + + +@click.group() +@click.option('--log-level', + default='INFO', + type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', + 'DEBUG', 'NOTSET']), + help="log level") +@click.option('--ssh-key', default=None, metavar='PATH', type=Path, + help="private key used for ssh authentication") +def main(log_level: str, + ssh_key: typing.Optional[Path]): + global ssh_key_path + ssh_key_path = ssh_key + + logging.config.dictConfig({ + 'version': 1, + 'formatters': { + 'console': { + 'format': "[%(asctime)s %(levelname)s %(name)s] %(message)s"}}, + 'handlers': { + 'console': { + 'class': 'logging.StreamHandler', + 'formatter': 'console', + 'level': log_level}}, + 'root': { + 'level': log_level, + 'handlers': ['console']}, + 'disable_existing_loggers': False}) + + +@main.command() +@click.option('--action', default='.boxhatter.yaml', + help="action file path inside repository") +@click.option('--env', multiple=True, + help="environment variables") +@click.argument('url', required=True) +@click.argument('ref', required=False, default='HEAD') +def execute(action: str, + env: typing.Tuple[str], + url: str, + ref: str): + with contextlib.suppress(Exception): + path = Path(url) + if path.exists(): + url = str(path.resolve()) + + with tempfile.TemporaryDirectory() as repo_dir: + repo_dir = Path(repo_dir) + + subprocess.run(['git', 'init', '-q'], + cwd=str(repo_dir), + check=True) + + subprocess.run(['git', 'remote', 'add', 'origin', url], + cwd=str(repo_dir), + check=True) + + subprocess.run(['git', 'fetch', '-q', '--depth=1', 'origin', ref], + cwd=str(repo_dir), + check=True) + + subprocess.run(['git', 'checkout', '-q', 'FETCH_HEAD'], + cwd=str(repo_dir), + check=True) + + conf = json.decode_file(repo_dir / action) + common.json_schema_repo.validate('boxhatter://action.yaml#', conf) + + image = conf['image'] + command = conf['command'] + subprocess.run(['podman', 'run', '-i', '--rm', + '-v', f'{repo_dir}:/boxhatter', + *itertools.chain.from_iterable(('--env', i) + for i in env), + image, '/bin/sh'], + input=f'set -e\ncd /boxhatter\n{command}\n', + encoding='utf-8', + check=True) + + +@main.command() +@click.option('--host', default='0.0.0.0', + help="listening host name (default 0.0.0.0)") +@click.option('--port', default=24000, type=int, + help="listening TCP port (default 24000)") +@click.option('--conf', default=default_conf_path, metavar='PATH', type=Path, + help="configuration defined by boxhatter://server.yaml# " + "(default $XDG_CONFIG_HOME/boxhatter/server.yaml)") +@click.option('--db', default=default_db_path, metavar='PATH', type=Path, + help="sqlite database path " + "(default $XDG_CONFIG_HOME/boxhatter/boxhatter.db") +def server(host: str, + port: int, + conf: Path, + db: Path): + conf = json.decode_file(conf) + common.json_schema_repo.validate('boxhatter://server.yaml#', conf) + + with contextlib.suppress(asyncio.CancelledError): + aio.run_asyncio(async_server(host, port, conf, db)) + + +async def async_server(host: str, + port: int, + conf: json.Data, + db_path: Path): + async_group = aio.Group() + + try: + backend = await boxhatter.backend.create(db_path) + _bind_resource(async_group, backend) + + server = await boxhatter.server.create(conf, backend) + _bind_resource(async_group, server) + + ui = await boxhatter.ui.create(host, port, server) + _bind_resource(async_group, ui) + + await async_group.wait_closing() + + finally: + await aio.uncancellable(async_group.async_close()) + + +def _bind_resource(async_group, resource): + async_group.spawn(aio.call_on_cancel, resource.async_close) + async_group.spawn(aio.call_on_done, resource.wait_closing(), + async_group.close) + + +if __name__ == '__main__': + sys.argv[0] = 'boxhatter' + main() diff --git a/src_py/boxhatter/server.py b/src_py/boxhatter/server.py new file mode 100644 index 0000000..24e4af0 --- /dev/null +++ b/src_py/boxhatter/server.py @@ -0,0 +1,227 @@ +import asyncio +import collections +import contextlib +import itertools +import multiprocessing +import os +import subprocess +import sys +import time +import typing + +from hat import aio +from hat import json + +from boxhatter import common +import boxhatter.backend + + +async def create(conf: json.Data, + backend: boxhatter.backend.Backend + ) -> 'Server': + server = Server() + server._conf = conf + server._backend = backend + server._async_group = aio.Group() + server._repos = set(conf['repos'].keys()) + server._run_queue = aio.Queue() + server._sync_events = {} + + for repo, repo_conf in conf['repos'].items(): + sync_event = asyncio.Event() + server._sync_events[repo] = sync_event + server.async_group.spawn(server._sync_loop, repo, repo_conf, + sync_event) + + for _ in range(multiprocessing.cpu_count()): + server.async_group.spawn(server._run_loop) + + try: + commits = await backend.get_commits(repo=None, + statuses={common.Status.PENDING, + common.Status.RUNNING}, + order=common.Order.ASC) + + for commit in commits: + commit = commit._replace(change=int(time.time()), + status=common.Status.PENDING, + output='') + await backend.update_commit(commit) + server._run_queue.put_nowait(commit) + + except BaseException: + await aio.uncancellable(server.async_close()) + raise + + return server + + +class Server(aio.Resource): + + @property + def async_group(self): + return self._async_group + + @property + def repos(self) -> typing.Set[str]: + return self._repos + + async def get_commits(self, + repo: typing.Optional[str], + ) -> typing.List[common.Commit]: + return await self._backend.get_commits(repo=repo, + statuses=None, + order=common.Order.DESC) + + async def get_commit(self, + repo: str, + commit_hash: str + ) -> typing.Optional[common.Commit]: + return await self._backend.get_commit(repo, commit_hash) + + async def run_commit(self, + repo: str, + commit_hash: str + ) -> common.Commit: + commit = common.Commit(repo=repo, + hash=commit_hash, + change=int(time.time()), + status=common.Status.PENDING, + output='') + await self._backend.update_commit(commit) + self._run_queue.put_nowait(commit) + return commit + + def sync_repo(self, repo: str): + self._sync_events[repo].set() + + async def remove_commit(self, commit: common.Commit): + await self._backend.remove_commit(commit) + + async def _sync_loop(self, repo, repo_conf, sync_event): + try: + url = repo_conf['url'] + refs = repo_conf.get('refs', ['refs/heads/*']) + min_sync_delay = repo_conf.get('min_sync_delay', 60) or 0 + max_sync_delay = repo_conf.get('max_sync_delay') + last_sync = time.monotonic() - min_sync_delay + + while True: + dt = time.monotonic() - last_sync + if dt < min_sync_delay: + await asyncio.sleep(min_sync_delay - dt) + + sync_event.clear() + commit_hashes = await _git_ls_remote(url, refs) + last_sync = time.monotonic() + + for commit_hash in commit_hashes: + commit = await self._backend.get_commit(repo, commit_hash) + if commit: + continue + await self.run_commit(repo, commit_hash) + + if max_sync_delay is None: + await sync_event.wait() + + else: + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(sync_event.wait(), + max_sync_delay) + + finally: + self.close() + + async def _run_loop(self): + try: + while True: + commit = await self._run_queue.get() + repo_conf = self._conf['repos'][commit.repo] + action = repo_conf.get('action', '.boxhatter.yaml') + env = {**self._conf.get('env', {}), + **repo_conf.get('env', {})} + url = repo_conf['url'] + ref = commit.hash + + commit = commit._replace(change=int(time.time()), + status=common.Status.RUNNING, + output='') + await self._backend.update_commit(commit) + + try: + output = await _execute(action=action, + env=env, + url=url, + ref=ref) + status = common.Status.SUCCESS + + except Exception as e: + output = str(e) + status = common.Status.FAILURE + + commit = commit._replace(change=int(time.time()), + status=status, + output=output) + await self._backend.update_commit(commit) + + finally: + self.close() + + +async def _execute(action, env, url, ref): + cmd = [sys.executable, '-m', 'boxhatter', 'execute', + '--action', action, + *itertools.chain.from_iterable(('--env', i) for i in env), + url, ref] + + p = await asyncio.create_subprocess_exec(*cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env={**os.environ, **env}) + + try: + output, _ = await p.communicate() + output = str(output, encoding='utf-8', errors='ignore') + + if p.returncode: + raise Exception(output) + + return output + + finally: + if p.returncode is None: + p.terminate() + + +async def _git_ls_remote(url, refs): + cmd = ['git', 'ls-remote', url, *refs] + + p = await asyncio.create_subprocess_exec(*cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + try: + stdout, stderr = await p.communicate() + if p.returncode: + stderr = str(stderr, encoding='utf-8', errors='ignore') + raise Exception(stderr) + + result = collections.deque() + stdout = str(stdout, encoding='utf-8', errors='ignore') + + for line in stdout.split('\n'): + segments = line.split(maxsplit=1) + if not segments: + continue + result.append(segments[0]) + + return result + + except Exception: + return [] + + finally: + if p.returncode is None: + p.terminate() diff --git a/src_py/boxhatter/ui.py b/src_py/boxhatter/ui.py new file mode 100644 index 0000000..2cf174d --- /dev/null +++ b/src_py/boxhatter/ui.py @@ -0,0 +1,232 @@ +from pathlib import Path +import datetime + +from hat import aio +import aiohttp.web + +from boxhatter import common +import boxhatter.server + + +static_dir: Path = common.package_path / 'ui' + + +async def create(host: str, + port: int, + server: boxhatter.server.Server + ) -> 'UI': + ui = UI() + ui._server = server + ui._async_group = aio.Group() + + app = aiohttp.web.Application() + get_routes = ( + aiohttp.web.get(path, handler) for path, handler in ( + ('/', ui._process_get_root), + ('/repo/{repo}', ui._process_get_repo), + ('/repo/{repo}/commit/{commit}', ui._process_get_commit))) + post_routes = ( + aiohttp.web.post(path, handler) for path, handler in ( + ('/repo/{repo}/run', ui._process_post_run), + ('/repo/{repo}/commit/{commit}/remove', ui._process_post_remove))) + webhook_route = aiohttp.web.route('*', '/repo/{repo}/webhook', + ui._process_webhook) + static_route = aiohttp.web.static('/', static_dir) + app.add_routes([*get_routes, *post_routes, webhook_route, static_route]) + + runner = aiohttp.web.AppRunner(app) + await runner.setup() + ui.async_group.spawn(aio.call_on_cancel, runner.cleanup) + + try: + site = aiohttp.web.TCPSite(runner=runner, + host=host, + port=port, + shutdown_timeout=0.1, + reuse_address=True) + await site.start() + + except BaseException: + await aio.uncancellable(ui.async_group.async_close()) + raise + + return ui + + +class UI(aio.Resource): + + @property + def async_group(self): + return self._async_group + + async def _process_get_root(self, request): + commits = await self._server.get_commits(None) + + body = (f'{_generate_repos(self._server.repos)}\n' + f'{_generate_commits(commits)}') + return _create_html_response('Box Hatter', body) + + async def _process_get_repo(self, request): + repo = self._get_repo(request) + commits = await self._server.get_commits(repo) + + title = f'Box Hatter - {repo}' + body = (f'{_generate_commits(commits)}\n' + f'{_generate_run(repo)}') + return _create_html_response(title, body) + + async def _process_get_commit(self, request): + commit = await self._get_commit(request) + + title = f'Box Hatter - {commit.repo}/{commit.hash}' + body = _generate_commit(commit) + return _create_html_response(title, body) + + async def _process_post_run(self, request): + repo = self._get_repo(request) + + body = await request.post() + commit_hash = body['hash'] + if not commit_hash: + raise aiohttp.web.HTTPBadRequest() + + commit = await self._server.run_commit(repo, commit_hash) + + url = f'/repo/{commit.repo}/commit/{commit.hash}' + raise aiohttp.web.HTTPFound(url) + + async def _process_post_remove(self, request): + commit = await self._get_commit(request) + + await self._server.remove_commit(commit) + + raise aiohttp.web.HTTPFound(f'/repo/{commit.repo}') + + async def _process_webhook(self, request): + repo = self._get_repo(request) + + self._server.sync_repo(repo) + return aiohttp.web.Response() + + def _get_repo(self, request): + repo = request.match_info['repo'] + if repo not in self._server.repos: + raise aiohttp.web.HTTPBadRequest() + return repo + + async def _get_commit(self, request): + repo = self._get_repo(request) + commit_hash = request.match_info['commit'] + commit = await self._server.get_commit(repo, commit_hash) + if not commit: + raise aiohttp.web.HTTPBadRequest() + return commit + + +def _create_html_response(title, body): + text = _html_template.format(title=title, + body=body) + return aiohttp.web.Response(content_type='text/html', + text=text) + + +def _generate_repos(repos): + items = '\n'.join(f'
  • {repo}
  • ' + for repo in repos) + return (f'
    \n' + f'

    Repositories

    \n' + f'\n' + f'
    ') + + +def _generate_commits(commits): + thead = ('\n' + 'Change\n' + 'Repo\n' + 'Commit\n' + 'Status\n' + '') + + tbody = '\n'.join( + (f'\n' + f'{_format_time(commit.change)}\n' + f'{_generate_repo_link(commit.repo)}\n' + f'{_generate_commit_link(commit)}\n' + f'{commit.status.name}\n' + f'') + for commit in commits) + + return (f'
    \n' + f'

    Commits

    \n' + f'\n' + f'\n' + f'{thead}\n' + f'\n' + f'\n' + f'{tbody}\n' + f'\n' + f'
    \n' + f'
    ') + + +def _generate_commit(commit): + run_action = f'/repo/{commit.repo}/run' + run_button = (f'
    \n' + f'\n' + f'\n' + f'
    ') + + remove_action = f'/repo/{commit.repo}/commit/{commit.hash}/remove' + remove_button = (f'
    \n' + f'\n' + f'
    ') + + repo_link = _generate_repo_link(commit.repo) + + return (f'
    \n' + f'
    {repo_link}
    \n' + f'
    {commit.hash}
    \n' + f'
    {_format_time(commit.change)}
    \n' + f'
    {commit.status.name}
    \n' + f'
    {commit.output}
    \n' + f'
    {run_button}{remove_button}
    \n' + f'
    ') + + +def _generate_run(repo): + return (f'
    \n' + f'
    \n' + f'\n' + f'\n' + f'
    \n' + f'
    ') + + +def _generate_repo_link(repo): + return f'{repo}' + + +def _generate_commit_link(commit): + url = f'/repo/{commit.repo}/commit/{commit.hash}' + return f'{commit.hash}' + + +def _format_time(t): + return datetime.datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S") + + +_html_template = r""" + + + + +{title} + + + +{body} + + +""" diff --git a/src_py/hatter/__init__.py b/src_py/hatter/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src_py/hatter/__main__.py b/src_py/hatter/__main__.py deleted file mode 100644 index 065fe88..0000000 --- a/src_py/hatter/__main__.py +++ /dev/null @@ -1,8 +0,0 @@ -import sys - -from hatter.main import main - - -if __name__ == '__main__': - sys.argv[0] = 'hatter' - sys.exit(main()) diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py deleted file mode 100644 index dd6a4f7..0000000 --- a/src_py/hatter/backend.py +++ /dev/null @@ -1,134 +0,0 @@ -from pathlib import Path -import sqlite3 -import typing - -from hat import aio - -from hatter import common - - -async def create(db_path: Path - ) -> 'Backend': - backend = Backend() - backend._async_group = aio.Group() - backend._executor = aio.create_executor(1) - - backend._db = await backend._executor(_ext_create, db_path) - backend.async_group.spawn(aio.call_on_cancel, backend._executor, - _ext_close, backend._db) - - return backend - - -class Backend(aio.Resource): - - @property - def async_group(self): - return self._async_group - - async def get_commits(self, - repo: typing.Optional[str], - statuses: typing.Optional[typing.Set[common.Status]], - order: common.Order - ) -> typing.List[common.Commit]: - return await self.async_group.spawn( - self._executor, _ext_get_commits, self._db, repo, statuses, order) - - async def get_commit(self, - repo: str, - commit_hash: str - ) -> typing.Optional[common.Commit]: - return await self.async_group.spawn( - self._executor, _ext_get_commit, self._db, repo, commit_hash) - - async def update_commit(self, commit: common.Commit): - await self._async_group.spawn( - self._executor, _ext_update_commit, self._db, commit) - - async def remove_commit(self, commit: common.Commit): - await self.async_group.spawn( - self._executor, _ext_remove_commit, self._db, commit) - - -def _ext_create(db_path): - db_path.parent.mkdir(exist_ok=True) - db = sqlite3.connect(f'file:{db_path}?nolock=1', - uri=True, - isolation_level=None, - detect_types=sqlite3.PARSE_DECLTYPES) - - try: - db.executescript(r""" - PRAGMA journal_mode = OFF; - CREATE TABLE IF NOT EXISTS commits ( - repo TEXT, - hash TEXT, - change INTEGER, - status INTEGER, - output TEXT, - PRIMARY KEY (repo, hash) ON CONFLICT REPLACE - ); - CREATE INDEX IF NOT EXISTS commits_change_index ON commits ( - change - )""") - - except Exception: - db.close() - raise - - return db - - -def _ext_close(db): - db.close() - - -def _ext_get_commits(db, repo, statuses, order): - cmd = "SELECT * FROM commits" - where = [] - if repo: - where.append("repo = :repo") - if statuses: - status_values = (str(status.value) for status in statuses) - where.append(f"status IN ({', '.join(status_values)})") - if where: - cmd += f" WHERE {' AND '.join(where)}" - cmd += f" ORDER BY change {order.value}" - args = {'repo': repo} - cur = db.execute(cmd, args) - return [_commit_from_row(row) for row in cur] - - -def _ext_get_commit(db, repo, commit_hash): - cmd = "SELECT * FROM commits WHERE repo = :repo AND hash = :hash" - args = {'repo': repo, - 'hash': commit_hash} - cur = db.execute(cmd, args) - row = cur.fetchone() - return _commit_from_row(row) if row else None - - -def _ext_update_commit(db, commit): - cmd = ("INSERT OR REPLACE INTO commits VALUES " - "(:repo, :hash, :change, :status, :output)") - args = {'repo': commit.repo, - 'hash': commit.hash, - 'change': commit.change, - 'status': commit.status.value, - 'output': commit.output} - db.execute(cmd, args) - - -def _ext_remove_commit(db, commit): - cmd = "DELETE FROM commits WHERE repo = :repo AND hash = :hash" - args = {'repo': commit.repo, - 'hash': commit.hash} - db.execute(cmd, args) - - -def _commit_from_row(row): - return common.Commit(repo=row[0], - hash=row[1], - change=row[2], - status=common.Status(row[3]), - output=row[4]) diff --git a/src_py/hatter/common.py b/src_py/hatter/common.py deleted file mode 100644 index ae314a5..0000000 --- a/src_py/hatter/common.py +++ /dev/null @@ -1,32 +0,0 @@ -from pathlib import Path -import enum -import typing - -from hat import json - - -package_path: Path = Path(__file__).parent - -json_schema_repo: json.SchemaRepository = json.SchemaRepository( - json.json_schema_repo, - json.SchemaRepository.from_json(package_path / 'json_schema_repo.json')) - - -class Order(enum.Enum): - ASC = 'ASC' - DESC = 'DESC' - - -class Status(enum.Enum): - PENDING = 0 - RUNNING = 1 - SUCCESS = 2 - FAILURE = 3 - - -class Commit(typing.NamedTuple): - repo: str - hash: str - change: int - status: Status - output: str diff --git a/src_py/hatter/main.py b/src_py/hatter/main.py deleted file mode 100644 index 8ceb892..0000000 --- a/src_py/hatter/main.py +++ /dev/null @@ -1,162 +0,0 @@ -from pathlib import Path -import asyncio -import contextlib -import itertools -import logging.config -import subprocess -import sys -import tempfile -import typing - -from hat import aio -from hat import json -import appdirs -import click - -from hatter import common -import hatter.backend -import hatter.server -import hatter.ui - - -user_config_dir: Path = Path(appdirs.user_config_dir('hatter')) -user_data_dir: Path = Path(appdirs.user_data_dir('hatter')) - -default_conf_path: Path = user_config_dir / 'server.yaml' -default_db_path: Path = user_data_dir / 'hatter.db' - -ssh_key_path: typing.Optional[Path] = None - - -@click.group() -@click.option('--log-level', - default='INFO', - type=click.Choice(['CRITICAL', 'ERROR', 'WARNING', 'INFO', - 'DEBUG', 'NOTSET']), - help="log level") -@click.option('--ssh-key', default=None, metavar='PATH', type=Path, - help="private key used for ssh authentication") -def main(log_level: str, - ssh_key: typing.Optional[Path]): - global ssh_key_path - ssh_key_path = ssh_key - - logging.config.dictConfig({ - 'version': 1, - 'formatters': { - 'console': { - 'format': "[%(asctime)s %(levelname)s %(name)s] %(message)s"}}, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - 'formatter': 'console', - 'level': log_level}}, - 'root': { - 'level': log_level, - 'handlers': ['console']}, - 'disable_existing_loggers': False}) - - -@main.command() -@click.option('--action', default='.hatter.yaml', - help="action file path inside repository") -@click.option('--env', multiple=True, - help="environment variables") -@click.argument('url', required=True) -@click.argument('ref', required=False, default='HEAD') -def execute(action: str, - env: typing.Tuple[str], - url: str, - ref: str): - with contextlib.suppress(Exception): - path = Path(url) - if path.exists(): - url = str(path.resolve()) - - with tempfile.TemporaryDirectory() as repo_dir: - repo_dir = Path(repo_dir) - - subprocess.run(['git', 'init', '-q'], - cwd=str(repo_dir), - check=True) - - subprocess.run(['git', 'remote', 'add', 'origin', url], - cwd=str(repo_dir), - check=True) - - subprocess.run(['git', 'fetch', '-q', '--depth=1', 'origin', ref], - cwd=str(repo_dir), - check=True) - - subprocess.run(['git', 'checkout', '-q', 'FETCH_HEAD'], - cwd=str(repo_dir), - check=True) - - conf = json.decode_file(repo_dir / action) - common.json_schema_repo.validate('hatter://action.yaml#', conf) - - image = conf['image'] - command = conf['command'] - subprocess.run(['podman', 'run', '-i', '--rm', - '-v', f'{repo_dir}:/hatter', - *itertools.chain.from_iterable(('--env', i) - for i in env), - image, '/bin/sh'], - input=f'set -e\ncd /hatter\n{command}\n', - encoding='utf-8', - check=True) - - -@main.command() -@click.option('--host', default='0.0.0.0', - help="listening host name (default 0.0.0.0)") -@click.option('--port', default=24000, type=int, - help="listening TCP port (default 24000)") -@click.option('--conf', default=default_conf_path, metavar='PATH', type=Path, - help="configuration defined by hatter://server.yaml# " - "(default $XDG_CONFIG_HOME/hatter/server.yaml)") -@click.option('--db', default=default_db_path, metavar='PATH', type=Path, - help="sqlite database path " - "(default $XDG_CONFIG_HOME/hatter/hatter.db") -def server(host: str, - port: int, - conf: Path, - db: Path): - conf = json.decode_file(conf) - common.json_schema_repo.validate('hatter://server.yaml#', conf) - - with contextlib.suppress(asyncio.CancelledError): - aio.run_asyncio(async_server(host, port, conf, db)) - - -async def async_server(host: str, - port: int, - conf: json.Data, - db_path: Path): - async_group = aio.Group() - - try: - backend = await hatter.backend.create(db_path) - _bind_resource(async_group, backend) - - server = await hatter.server.create(conf, backend) - _bind_resource(async_group, server) - - ui = await hatter.ui.create(host, port, server) - _bind_resource(async_group, ui) - - await async_group.wait_closing() - - finally: - await aio.uncancellable(async_group.async_close()) - - -def _bind_resource(async_group, resource): - async_group.spawn(aio.call_on_cancel, resource.async_close) - async_group.spawn(aio.call_on_done, resource.wait_closing(), - async_group.close) - - -if __name__ == '__main__': - sys.argv[0] = 'hatter' - main() diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py deleted file mode 100644 index 5f00707..0000000 --- a/src_py/hatter/server.py +++ /dev/null @@ -1,227 +0,0 @@ -import asyncio -import collections -import contextlib -import itertools -import multiprocessing -import os -import subprocess -import sys -import time -import typing - -from hat import aio -from hat import json - -from hatter import common -import hatter.backend - - -async def create(conf: json.Data, - backend: hatter.backend.Backend - ) -> 'Server': - server = Server() - server._conf = conf - server._backend = backend - server._async_group = aio.Group() - server._repos = set(conf['repos'].keys()) - server._run_queue = aio.Queue() - server._sync_events = {} - - for repo, repo_conf in conf['repos'].items(): - sync_event = asyncio.Event() - server._sync_events[repo] = sync_event - server.async_group.spawn(server._sync_loop, repo, repo_conf, - sync_event) - - for _ in range(multiprocessing.cpu_count()): - server.async_group.spawn(server._run_loop) - - try: - commits = await backend.get_commits(repo=None, - statuses={common.Status.PENDING, - common.Status.RUNNING}, - order=common.Order.ASC) - - for commit in commits: - commit = commit._replace(change=int(time.time()), - status=common.Status.PENDING, - output='') - await backend.update_commit(commit) - server._run_queue.put_nowait(commit) - - except BaseException: - await aio.uncancellable(server.async_close()) - raise - - return server - - -class Server(aio.Resource): - - @property - def async_group(self): - return self._async_group - - @property - def repos(self) -> typing.Set[str]: - return self._repos - - async def get_commits(self, - repo: typing.Optional[str], - ) -> typing.List[common.Commit]: - return await self._backend.get_commits(repo=repo, - statuses=None, - order=common.Order.DESC) - - async def get_commit(self, - repo: str, - commit_hash: str - ) -> typing.Optional[common.Commit]: - return await self._backend.get_commit(repo, commit_hash) - - async def run_commit(self, - repo: str, - commit_hash: str - ) -> common.Commit: - commit = common.Commit(repo=repo, - hash=commit_hash, - change=int(time.time()), - status=common.Status.PENDING, - output='') - await self._backend.update_commit(commit) - self._run_queue.put_nowait(commit) - return commit - - def sync_repo(self, repo: str): - self._sync_events[repo].set() - - async def remove_commit(self, commit: common.Commit): - await self._backend.remove_commit(commit) - - async def _sync_loop(self, repo, repo_conf, sync_event): - try: - url = repo_conf['url'] - refs = repo_conf.get('refs', ['refs/heads/*']) - min_sync_delay = repo_conf.get('min_sync_delay', 60) or 0 - max_sync_delay = repo_conf.get('max_sync_delay') - last_sync = time.monotonic() - min_sync_delay - - while True: - dt = time.monotonic() - last_sync - if dt < min_sync_delay: - await asyncio.sleep(min_sync_delay - dt) - - sync_event.clear() - commit_hashes = await _git_ls_remote(url, refs) - last_sync = time.monotonic() - - for commit_hash in commit_hashes: - commit = await self._backend.get_commit(repo, commit_hash) - if commit: - continue - await self.run_commit(repo, commit_hash) - - if max_sync_delay is None: - await sync_event.wait() - - else: - with contextlib.suppress(asyncio.TimeoutError): - await asyncio.wait_for(sync_event.wait(), - max_sync_delay) - - finally: - self.close() - - async def _run_loop(self): - try: - while True: - commit = await self._run_queue.get() - repo_conf = self._conf['repos'][commit.repo] - action = repo_conf.get('action', '.hatter.yaml') - env = {**self._conf.get('env', {}), - **repo_conf.get('env', {})} - url = repo_conf['url'] - ref = commit.hash - - commit = commit._replace(change=int(time.time()), - status=common.Status.RUNNING, - output='') - await self._backend.update_commit(commit) - - try: - output = await _execute(action=action, - env=env, - url=url, - ref=ref) - status = common.Status.SUCCESS - - except Exception as e: - output = str(e) - status = common.Status.FAILURE - - commit = commit._replace(change=int(time.time()), - status=status, - output=output) - await self._backend.update_commit(commit) - - finally: - self.close() - - -async def _execute(action, env, url, ref): - cmd = [sys.executable, '-m', 'hatter', 'execute', - '--action', action, - *itertools.chain.from_iterable(('--env', i) for i in env), - url, ref] - - p = await asyncio.create_subprocess_exec(*cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - env={**os.environ, **env}) - - try: - output, _ = await p.communicate() - output = str(output, encoding='utf-8', errors='ignore') - - if p.returncode: - raise Exception(output) - - return output - - finally: - if p.returncode is None: - p.terminate() - - -async def _git_ls_remote(url, refs): - cmd = ['git', 'ls-remote', url, *refs] - - p = await asyncio.create_subprocess_exec(*cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - try: - stdout, stderr = await p.communicate() - if p.returncode: - stderr = str(stderr, encoding='utf-8', errors='ignore') - raise Exception(stderr) - - result = collections.deque() - stdout = str(stdout, encoding='utf-8', errors='ignore') - - for line in stdout.split('\n'): - segments = line.split(maxsplit=1) - if not segments: - continue - result.append(segments[0]) - - return result - - except Exception: - return [] - - finally: - if p.returncode is None: - p.terminate() diff --git a/src_py/hatter/ui.py b/src_py/hatter/ui.py deleted file mode 100644 index a10eede..0000000 --- a/src_py/hatter/ui.py +++ /dev/null @@ -1,232 +0,0 @@ -from pathlib import Path -import datetime - -from hat import aio -import aiohttp.web - -from hatter import common -import hatter.server - - -static_dir: Path = common.package_path / 'ui' - - -async def create(host: str, - port: int, - server: hatter.server.Server - ) -> 'UI': - ui = UI() - ui._server = server - ui._async_group = aio.Group() - - app = aiohttp.web.Application() - get_routes = ( - aiohttp.web.get(path, handler) for path, handler in ( - ('/', ui._process_get_root), - ('/repo/{repo}', ui._process_get_repo), - ('/repo/{repo}/commit/{commit}', ui._process_get_commit))) - post_routes = ( - aiohttp.web.post(path, handler) for path, handler in ( - ('/repo/{repo}/run', ui._process_post_run), - ('/repo/{repo}/commit/{commit}/remove', ui._process_post_remove))) - webhook_route = aiohttp.web.route('*', '/repo/{repo}/webhook', - ui._process_webhook) - static_route = aiohttp.web.static('/', static_dir) - app.add_routes([*get_routes, *post_routes, webhook_route, static_route]) - - runner = aiohttp.web.AppRunner(app) - await runner.setup() - ui.async_group.spawn(aio.call_on_cancel, runner.cleanup) - - try: - site = aiohttp.web.TCPSite(runner=runner, - host=host, - port=port, - shutdown_timeout=0.1, - reuse_address=True) - await site.start() - - except BaseException: - await aio.uncancellable(ui.async_group.async_close()) - raise - - return ui - - -class UI(aio.Resource): - - @property - def async_group(self): - return self._async_group - - async def _process_get_root(self, request): - commits = await self._server.get_commits(None) - - body = (f'{_generate_repos(self._server.repos)}\n' - f'{_generate_commits(commits)}') - return _create_html_response('hatter', body) - - async def _process_get_repo(self, request): - repo = self._get_repo(request) - commits = await self._server.get_commits(repo) - - title = f'hatter - {repo}' - body = (f'{_generate_commits(commits)}\n' - f'{_generate_run(repo)}') - return _create_html_response(title, body) - - async def _process_get_commit(self, request): - commit = await self._get_commit(request) - - title = f'hatter - {commit.repo}/{commit.hash}' - body = _generate_commit(commit) - return _create_html_response(title, body) - - async def _process_post_run(self, request): - repo = self._get_repo(request) - - body = await request.post() - commit_hash = body['hash'] - if not commit_hash: - raise aiohttp.web.HTTPBadRequest() - - commit = await self._server.run_commit(repo, commit_hash) - - url = f'/repo/{commit.repo}/commit/{commit.hash}' - raise aiohttp.web.HTTPFound(url) - - async def _process_post_remove(self, request): - commit = await self._get_commit(request) - - await self._server.remove_commit(commit) - - raise aiohttp.web.HTTPFound(f'/repo/{commit.repo}') - - async def _process_webhook(self, request): - repo = self._get_repo(request) - - self._server.sync_repo(repo) - return aiohttp.web.Response() - - def _get_repo(self, request): - repo = request.match_info['repo'] - if repo not in self._server.repos: - raise aiohttp.web.HTTPBadRequest() - return repo - - async def _get_commit(self, request): - repo = self._get_repo(request) - commit_hash = request.match_info['commit'] - commit = await self._server.get_commit(repo, commit_hash) - if not commit: - raise aiohttp.web.HTTPBadRequest() - return commit - - -def _create_html_response(title, body): - text = _html_template.format(title=title, - body=body) - return aiohttp.web.Response(content_type='text/html', - text=text) - - -def _generate_repos(repos): - items = '\n'.join(f'
  • {repo}
  • ' - for repo in repos) - return (f'
    \n' - f'

    Repositories

    \n' - f'\n' - f'
    ') - - -def _generate_commits(commits): - thead = ('\n' - 'Change\n' - 'Repo\n' - 'Commit\n' - 'Status\n' - '') - - tbody = '\n'.join( - (f'\n' - f'{_format_time(commit.change)}\n' - f'{_generate_repo_link(commit.repo)}\n' - f'{_generate_commit_link(commit)}\n' - f'{commit.status.name}\n' - f'') - for commit in commits) - - return (f'
    \n' - f'

    Commits

    \n' - f'\n' - f'\n' - f'{thead}\n' - f'\n' - f'\n' - f'{tbody}\n' - f'\n' - f'
    \n' - f'
    ') - - -def _generate_commit(commit): - run_action = f'/repo/{commit.repo}/run' - run_button = (f'
    \n' - f'\n' - f'\n' - f'
    ') - - remove_action = f'/repo/{commit.repo}/commit/{commit.hash}/remove' - remove_button = (f'
    \n' - f'\n' - f'
    ') - - repo_link = _generate_repo_link(commit.repo) - - return (f'
    \n' - f'
    {repo_link}
    \n' - f'
    {commit.hash}
    \n' - f'
    {_format_time(commit.change)}
    \n' - f'
    {commit.status.name}
    \n' - f'
    {commit.output}
    \n' - f'
    {run_button}{remove_button}
    \n' - f'
    ') - - -def _generate_run(repo): - return (f'
    \n' - f'
    \n' - f'\n' - f'\n' - f'
    \n' - f'
    ') - - -def _generate_repo_link(repo): - return f'{repo}' - - -def _generate_commit_link(commit): - url = f'/repo/{commit.repo}/commit/{commit.hash}' - return f'{commit.hash}' - - -def _format_time(t): - return datetime.datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S") - - -_html_template = r""" - - - - -{title} - - - -{body} - - -""" -- cgit v1.2.3-70-g09d2