diff options
Diffstat (limited to 'src_py')
| -rw-r--r-- | src_py/hatter/main.py | 18 | ||||
| -rw-r--r-- | src_py/hatter/server.py | 163 | ||||
| -rw-r--r-- | src_py/hatter/ui.py | 135 |
3 files changed, 187 insertions, 129 deletions
diff --git a/src_py/hatter/main.py b/src_py/hatter/main.py index c0ca225..8ceb892 100644 --- a/src_py/hatter/main.py +++ b/src_py/hatter/main.py @@ -1,6 +1,7 @@ from pathlib import Path import asyncio import contextlib +import itertools import logging.config import subprocess import sys @@ -57,12 +58,16 @@ def main(log_level: str, @main.command() +@click.option('--action', default='.hatter.yaml', + help="action file path inside repository") +@click.option('--env', multiple=True, + help="environment variables") @click.argument('url', required=True) @click.argument('ref', required=False, default='HEAD') -@click.argument('action', required=False, default='.hatter.yaml') -def execute(url: str, - ref: str, - action: str): +def execute(action: str, + env: typing.Tuple[str], + url: str, + ref: str): with contextlib.suppress(Exception): path = Path(url) if path.exists(): @@ -94,6 +99,8 @@ def execute(url: str, command = conf['command'] subprocess.run(['podman', 'run', '-i', '--rm', '-v', f'{repo_dir}:/hatter', + *itertools.chain.from_iterable(('--env', i) + for i in env), image, '/bin/sh'], input=f'set -e\ncd /hatter\n{command}\n', encoding='utf-8', @@ -135,9 +142,6 @@ async def async_server(host: str, server = await hatter.server.create(conf, backend) _bind_resource(async_group, server) - for repo in server.repos: - server.sync_repo(repo) - ui = await hatter.ui.create(host, port, server) _bind_resource(async_group, ui) diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py index 5364f52..bce5880 100644 --- a/src_py/hatter/server.py +++ b/src_py/hatter/server.py @@ -1,4 +1,7 @@ import asyncio +import collections +import contextlib +import itertools import multiprocessing import subprocess import sys @@ -20,14 +23,14 @@ async def create(conf: json.Data, server._backend = backend server._async_group = aio.Group() server._repos = set(conf['repos'].keys()) - server._lock = asyncio.Lock() server._run_queue = aio.Queue() server._sync_events = {} for repo, repo_conf in conf['repos'].items(): sync_event = asyncio.Event() server._sync_events[repo] = sync_event - server.async_group.spawn(server._sync_loop, repo_conf, sync_event) + server.async_group.spawn(server._sync_loop, repo, repo_conf, + sync_event) for _ in range(multiprocessing.cpu_count()): server.async_group.spawn(server._run_loop) @@ -64,81 +67,80 @@ class Server(aio.Resource): async def get_commits(self, repo: typing.Optional[str], - ) -> typing.Iterable[common.Commit]: - if repo and repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') - - commits = await self._backend.get_commits(repo=repo, - statuses=None, - order=common.Order.DESC) - - return commits + ) -> typing.List[common.Commit]: + return await self._backend.get_commits(repo=repo, + statuses=None, + order=common.Order.DESC) async def get_commit(self, repo: str, commit_hash: str - ) -> common.Commit: - if repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') - - async with self._lock: - commit = await self._backend.get_commit(repo, commit_hash) - - if not commit: - commit = common.Commit(repo=repo, - hash=commit_hash, - change=int(time.time()), - status=common.Status.PENDING, - output='') - await self._backend.update_commit(commit) - self._run_queue.put_nowait(commit) + ) -> typing.Optional[common.Commit]: + return await self._backend.get_commit(repo, commit_hash) + async def run_commit(self, + repo: str, + commit_hash: str + ) -> common.Commit: + commit = common.Commit(repo=repo, + hash=commit_hash, + change=int(time.time()), + status=common.Status.PENDING, + output='') + await self._backend.update_commit(commit) + self._run_queue.put_nowait(commit) return commit def sync_repo(self, repo: str): self._sync_events[repo].set() - async def rerun_commit(self, - repo: str, - commit_hash: str): - if repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') + async def remove_commit(self, commit: common.Commit): + await self._backend.remove_commit(commit) + + async def _sync_loop(self, repo, repo_conf, sync_event): + try: + url = repo_conf['url'] + refs = repo_conf.get('refs', ['refs/heads/*']) + min_sync_delay = repo_conf.get('min_sync_delay') or 0 + max_sync_delay = repo_conf.get('max_sync_delay') + last_sync = time.monotonic() - min_sync_delay - async with self._lock: - commit = await self._backend.get_commit(repo, commit_hash) - if not commit: - raise ValueError(f'invalid commit {commit_hash}') + while True: + dt = time.monotonic() - last_sync + if dt < min_sync_delay: + await asyncio.sleep(min_sync_delay - dt) - commit = commit._replace(change=int(time.time()), - status=common.Status.PENDING, - output='') - await self._backend.update_commit(commit) - self._run_queue.put_nowait(commit) + sync_event.clear() + commit_hashes = await _git_ls_remote(url, refs) + last_sync = time.monotonic() - async def remove_commit(self, - repo: str, - commit_hash: str): - if repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') + for commit_hash in commit_hashes: + commit = await self._backend.get_commit(repo, commit_hash) + if commit: + continue + await self.run_commit(repo, commit_hash) - async with self._lock: - commit = await self._backend.get_commit(repo, commit_hash) - if not commit: - raise ValueError(f'invalid commit {commit_hash}') + if max_sync_delay is None: + await sync_event.wait() - await self._backend.remove_commit(commit) + else: + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(sync_event.wait(), + max_sync_delay) - async def _sync_loop(self, repo_conf, sync_event): - pass + finally: + self.close() async def _run_loop(self): try: while True: commit = await self._run_queue.get() repo_conf = self._conf['repos'][commit.repo] + action = repo_conf.get('action', '.hatter.yaml') + env = {**self._conf.get('env', {}), + **repo_conf.get('env', {})} url = repo_conf['url'] ref = commit.hash - action = repo_conf.get('action', '.hatter.yaml') commit = commit._replace(change=int(time.time()), status=common.Status.RUNNING, @@ -146,7 +148,10 @@ class Server(aio.Resource): await self._backend.update_commit(commit) try: - output = await _execute(url, ref, action) + output = await _execute(action=action, + env=env, + url=url, + ref=ref) status = common.Status.SUCCESS except Exception as e: @@ -162,12 +167,17 @@ class Server(aio.Resource): self.close() -async def _execute(url, ref, action): - p = await asyncio.create_subprocess_exec( - sys.executable, '-m', 'hatter', 'execute', url, ref, action, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) +async def _execute(action, env, url, ref): + cmd = [sys.executable, '-m', 'hatter', 'execute', + '--action', action, + *itertools.chain.from_iterable(('--env', i) for i in env), + url, ref] + + p = await asyncio.create_subprocess_exec(cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env) try: output, _ = await p.communicate() @@ -181,3 +191,36 @@ async def _execute(url, ref, action): finally: if p.returncode is None: p.terminate() + + +async def _git_ls_remote(url, refs): + cmd = ['git', 'ls-remote', url, *refs] + + p = await asyncio.create_subprocess_exec(cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + try: + stdout, stderr = await p.communicate() + if p.returncode: + stderr = str(stderr, encoding='utf-8', errors='ignore') + raise Exception(stderr) + + result = collections.deque() + stdout = str(stdout, encoding='utf-8', errors='ignore') + + for line in stdout.split('\n'): + segments = line.split(maxsplit=1) + if not segments: + continue + result.append(segments[0]) + + return result + + except Exception: + return [] + + finally: + if p.returncode is None: + p.terminate() diff --git a/src_py/hatter/ui.py b/src_py/hatter/ui.py index cb827bd..a10eede 100644 --- a/src_py/hatter/ui.py +++ b/src_py/hatter/ui.py @@ -22,18 +22,17 @@ async def create(host: str, app = aiohttp.web.Application() get_routes = ( aiohttp.web.get(path, handler) for path, handler in ( - ('/', ui._get_root_handler), - ('/repo/{repo}', ui._get_repo_handler), - ('/repo/{repo}/commit/{commit}', ui._get_commit_handler))) + ('/', ui._process_get_root), + ('/repo/{repo}', ui._process_get_repo), + ('/repo/{repo}/commit/{commit}', ui._process_get_commit))) post_routes = ( aiohttp.web.post(path, handler) for path, handler in ( - ('/repo/{repo}/webhook', ui._post_webhook_handler), - ('/repo/{repo}/add', ui._post_add_handler), - ('/repo/{repo}/commit/{commit}/rerun', ui._post_rerun_handler), - ('/repo/{repo}/commit/{commit}/remove', ui._post_remove_handler))) - app.add_routes([*get_routes, - *post_routes, - aiohttp.web.static('/', static_dir)]) + ('/repo/{repo}/run', ui._process_post_run), + ('/repo/{repo}/commit/{commit}/remove', ui._process_post_remove))) + webhook_route = aiohttp.web.route('*', '/repo/{repo}/webhook', + ui._process_webhook) + static_route = aiohttp.web.static('/', static_dir) + app.add_routes([*get_routes, *post_routes, webhook_route, static_route]) runner = aiohttp.web.AppRunner(app) await runner.setup() @@ -60,70 +59,68 @@ class UI(aio.Resource): def async_group(self): return self._async_group - async def _get_root_handler(self, request): + async def _process_get_root(self, request): commits = await self._server.get_commits(None) body = (f'{_generate_repos(self._server.repos)}\n' f'{_generate_commits(commits)}') return _create_html_response('hatter', body) - async def _get_repo_handler(self, request): - repo = _parse_repo(request, self._server.repos) - + async def _process_get_repo(self, request): + repo = self._get_repo(request) commits = await self._server.get_commits(repo) + title = f'hatter - {repo}' body = (f'{_generate_commits(commits)}\n' - f'{_generate_add(repo)}') - return _create_html_response(f'hatter - {repo}', body) - - async def _get_commit_handler(self, request): - repo = _parse_repo(request, self._server.repos) + f'{_generate_run(repo)}') + return _create_html_response(title, body) - commit_hash = request.match_info['commit'] - commit = await self._server.get_commit(repo, commit_hash) + async def _process_get_commit(self, request): + commit = await self._get_commit(request) + title = f'hatter - {commit.repo}/{commit.hash}' body = _generate_commit(commit) - return _create_html_response(f'hatter - {repo}/{commit_hash}', body) - - async def _post_webhook_handler(self, request): - repo = _parse_repo(request, self._server.repos) - - self._server.sync_repo(repo) - - return aiohttp.web.Response() + return _create_html_response(title, body) - async def _post_add_handler(self, request): - repo = _parse_repo(request, self._server.repos) + async def _process_post_run(self, request): + repo = self._get_repo(request) body = await request.post() commit_hash = body['hash'] if not commit_hash: raise aiohttp.web.HTTPBadRequest() - raise aiohttp.web.HTTPFound(f'/repo/{repo}/commit/{commit_hash}') + commit = await self._server.run_commit(repo, commit_hash) - async def _post_rerun_handler(self, request): - repo = _parse_repo(request, self._server.repos) + url = f'/repo/{commit.repo}/commit/{commit.hash}' + raise aiohttp.web.HTTPFound(url) - commit_hash = request.match_info['commit'] - await self._server.rerun_commit(repo, commit_hash) + async def _process_post_remove(self, request): + commit = await self._get_commit(request) - raise aiohttp.web.HTTPFound(f'/repo/{repo}/commit/{commit_hash}') + await self._server.remove_commit(commit) - async def _post_remove_handler(self, request): - repo = _parse_repo(request, self._server.repos) + raise aiohttp.web.HTTPFound(f'/repo/{commit.repo}') - commit_hash = request.match_info['commit'] - await self._server.remove_commit(repo, commit_hash) + async def _process_webhook(self, request): + repo = self._get_repo(request) - raise aiohttp.web.HTTPFound(f'/repo/{repo}') + self._server.sync_repo(repo) + return aiohttp.web.Response() + def _get_repo(self, request): + repo = request.match_info['repo'] + if repo not in self._server.repos: + raise aiohttp.web.HTTPBadRequest() + return repo -def _parse_repo(request, repos): - repo = request.match_info['repo'] - if repo not in repos: - raise aiohttp.web.HTTPBadRequest() - return repo + async def _get_commit(self, request): + repo = self._get_repo(request) + commit_hash = request.match_info['commit'] + commit = await self._server.get_commit(repo, commit_hash) + if not commit: + raise aiohttp.web.HTTPBadRequest() + return commit def _create_html_response(title, body): @@ -155,8 +152,8 @@ def _generate_commits(commits): tbody = '\n'.join( (f'<tr>\n' f'<td class="col-change">{_format_time(commit.change)}</td>\n' - f'<td class="col-repo"><a href="/repo/{commit.repo}">{commit.repo}</a></td>\n' # NOQA - f'<td class="col-hash"><a href="/repo/{commit.repo}/commit/{commit.hash}">{commit.hash}</a></td>\n' # NOQA + f'<td class="col-repo">{_generate_repo_link(commit.repo)}</td>\n' + f'<td class="col-hash">{_generate_commit_link(commit)}</td>\n' f'<td class="col-status">{commit.status.name}</td>\n' f'</tr>') for commit in commits) @@ -175,33 +172,47 @@ def _generate_commits(commits): def _generate_commit(commit): - buttons = '\n'.join( - (f'<form method="post" action="{action}">\n' - f'<input type="submit" value="{value}">\n' - f'</form>') - for value, action in ( - ('Rerun', f'/repo/{commit.repo}/commit/{commit.hash}/rerun'), - ('Remove', f'/repo/{commit.repo}/commit/{commit.hash}/remove'))) + run_action = f'/repo/{commit.repo}/run' + run_button = (f'<form method="post" action="{run_action}">\n' + f'<input type="hidden" name="hash" value="{commit.hash}">\n' + f'<input type="submit" value="Run commit">\n' + f'</form>') + + remove_action = f'/repo/{commit.repo}/commit/{commit.hash}/remove' + remove_button = (f'<form method="post" action="{remove_action}">\n' + f'<input type="submit" value="Remove commit">\n' + f'</form>') + + repo_link = _generate_repo_link(commit.repo) return (f'<div class="commit">\n' - f'<label>Repo:</label><div><a href="/repo/{commit.repo}">{commit.repo}</a></div>\n' # NOQA + f'<label>Repo:</label><div>{repo_link}</div>\n' f'<label>Commit:</label><div>{commit.hash}</div>\n' f'<label>Change:</label><div>{_format_time(commit.change)}</div>\n' f'<label>Status:</label><div>{commit.status.name}</div>\n' f'<label>Output:</label><pre>{commit.output}</pre>\n' - f'<label></label><div>{buttons}</div>\n' + f'<label></label><div>{run_button}{remove_button}</div>\n' f'</div>') -def _generate_add(repo): - return (f'<div class="add">\n' - f'<form method="post" action="/repo/{repo}/add">\n' +def _generate_run(repo): + return (f'<div class="run">\n' + f'<form method="post" action="/repo/{repo}/run">\n' f'<input type="text" name="hash">\n' - f'<input type="submit" value="Add commit">\n' + f'<input type="submit" value="Run commit">\n' f'</form>\n' f'</div>') +def _generate_repo_link(repo): + return f'<a href="/repo/{repo}">{repo}</a>' + + +def _generate_commit_link(commit): + url = f'/repo/{commit.repo}/commit/{commit.hash}' + return f'<a href="{url}">{commit.hash}</a>' + + def _format_time(t): return datetime.datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S") |
