aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter
diff options
context:
space:
mode:
Diffstat (limited to 'src_py/hatter')
-rw-r--r--src_py/hatter/main.py18
-rw-r--r--src_py/hatter/server.py163
-rw-r--r--src_py/hatter/ui.py135
3 files changed, 187 insertions, 129 deletions
diff --git a/src_py/hatter/main.py b/src_py/hatter/main.py
index c0ca225..8ceb892 100644
--- a/src_py/hatter/main.py
+++ b/src_py/hatter/main.py
@@ -1,6 +1,7 @@
from pathlib import Path
import asyncio
import contextlib
+import itertools
import logging.config
import subprocess
import sys
@@ -57,12 +58,16 @@ def main(log_level: str,
@main.command()
+@click.option('--action', default='.hatter.yaml',
+ help="action file path inside repository")
+@click.option('--env', multiple=True,
+ help="environment variables")
@click.argument('url', required=True)
@click.argument('ref', required=False, default='HEAD')
-@click.argument('action', required=False, default='.hatter.yaml')
-def execute(url: str,
- ref: str,
- action: str):
+def execute(action: str,
+ env: typing.Tuple[str],
+ url: str,
+ ref: str):
with contextlib.suppress(Exception):
path = Path(url)
if path.exists():
@@ -94,6 +99,8 @@ def execute(url: str,
command = conf['command']
subprocess.run(['podman', 'run', '-i', '--rm',
'-v', f'{repo_dir}:/hatter',
+ *itertools.chain.from_iterable(('--env', i)
+ for i in env),
image, '/bin/sh'],
input=f'set -e\ncd /hatter\n{command}\n',
encoding='utf-8',
@@ -135,9 +142,6 @@ async def async_server(host: str,
server = await hatter.server.create(conf, backend)
_bind_resource(async_group, server)
- for repo in server.repos:
- server.sync_repo(repo)
-
ui = await hatter.ui.create(host, port, server)
_bind_resource(async_group, ui)
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
index 5364f52..bce5880 100644
--- a/src_py/hatter/server.py
+++ b/src_py/hatter/server.py
@@ -1,4 +1,7 @@
import asyncio
+import collections
+import contextlib
+import itertools
import multiprocessing
import subprocess
import sys
@@ -20,14 +23,14 @@ async def create(conf: json.Data,
server._backend = backend
server._async_group = aio.Group()
server._repos = set(conf['repos'].keys())
- server._lock = asyncio.Lock()
server._run_queue = aio.Queue()
server._sync_events = {}
for repo, repo_conf in conf['repos'].items():
sync_event = asyncio.Event()
server._sync_events[repo] = sync_event
- server.async_group.spawn(server._sync_loop, repo_conf, sync_event)
+ server.async_group.spawn(server._sync_loop, repo, repo_conf,
+ sync_event)
for _ in range(multiprocessing.cpu_count()):
server.async_group.spawn(server._run_loop)
@@ -64,81 +67,80 @@ class Server(aio.Resource):
async def get_commits(self,
repo: typing.Optional[str],
- ) -> typing.Iterable[common.Commit]:
- if repo and repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
-
- commits = await self._backend.get_commits(repo=repo,
- statuses=None,
- order=common.Order.DESC)
-
- return commits
+ ) -> typing.List[common.Commit]:
+ return await self._backend.get_commits(repo=repo,
+ statuses=None,
+ order=common.Order.DESC)
async def get_commit(self,
repo: str,
commit_hash: str
- ) -> common.Commit:
- if repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
-
- async with self._lock:
- commit = await self._backend.get_commit(repo, commit_hash)
-
- if not commit:
- commit = common.Commit(repo=repo,
- hash=commit_hash,
- change=int(time.time()),
- status=common.Status.PENDING,
- output='')
- await self._backend.update_commit(commit)
- self._run_queue.put_nowait(commit)
+ ) -> typing.Optional[common.Commit]:
+ return await self._backend.get_commit(repo, commit_hash)
+ async def run_commit(self,
+ repo: str,
+ commit_hash: str
+ ) -> common.Commit:
+ commit = common.Commit(repo=repo,
+ hash=commit_hash,
+ change=int(time.time()),
+ status=common.Status.PENDING,
+ output='')
+ await self._backend.update_commit(commit)
+ self._run_queue.put_nowait(commit)
return commit
def sync_repo(self, repo: str):
self._sync_events[repo].set()
- async def rerun_commit(self,
- repo: str,
- commit_hash: str):
- if repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
+ async def remove_commit(self, commit: common.Commit):
+ await self._backend.remove_commit(commit)
+
+ async def _sync_loop(self, repo, repo_conf, sync_event):
+ try:
+ url = repo_conf['url']
+ refs = repo_conf.get('refs', ['refs/heads/*'])
+ min_sync_delay = repo_conf.get('min_sync_delay') or 0
+ max_sync_delay = repo_conf.get('max_sync_delay')
+ last_sync = time.monotonic() - min_sync_delay
- async with self._lock:
- commit = await self._backend.get_commit(repo, commit_hash)
- if not commit:
- raise ValueError(f'invalid commit {commit_hash}')
+ while True:
+ dt = time.monotonic() - last_sync
+ if dt < min_sync_delay:
+ await asyncio.sleep(min_sync_delay - dt)
- commit = commit._replace(change=int(time.time()),
- status=common.Status.PENDING,
- output='')
- await self._backend.update_commit(commit)
- self._run_queue.put_nowait(commit)
+ sync_event.clear()
+ commit_hashes = await _git_ls_remote(url, refs)
+ last_sync = time.monotonic()
- async def remove_commit(self,
- repo: str,
- commit_hash: str):
- if repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
+ for commit_hash in commit_hashes:
+ commit = await self._backend.get_commit(repo, commit_hash)
+ if commit:
+ continue
+ await self.run_commit(repo, commit_hash)
- async with self._lock:
- commit = await self._backend.get_commit(repo, commit_hash)
- if not commit:
- raise ValueError(f'invalid commit {commit_hash}')
+ if max_sync_delay is None:
+ await sync_event.wait()
- await self._backend.remove_commit(commit)
+ else:
+ with contextlib.suppress(asyncio.TimeoutError):
+ await asyncio.wait_for(sync_event.wait(),
+ max_sync_delay)
- async def _sync_loop(self, repo_conf, sync_event):
- pass
+ finally:
+ self.close()
async def _run_loop(self):
try:
while True:
commit = await self._run_queue.get()
repo_conf = self._conf['repos'][commit.repo]
+ action = repo_conf.get('action', '.hatter.yaml')
+ env = {**self._conf.get('env', {}),
+ **repo_conf.get('env', {})}
url = repo_conf['url']
ref = commit.hash
- action = repo_conf.get('action', '.hatter.yaml')
commit = commit._replace(change=int(time.time()),
status=common.Status.RUNNING,
@@ -146,7 +148,10 @@ class Server(aio.Resource):
await self._backend.update_commit(commit)
try:
- output = await _execute(url, ref, action)
+ output = await _execute(action=action,
+ env=env,
+ url=url,
+ ref=ref)
status = common.Status.SUCCESS
except Exception as e:
@@ -162,12 +167,17 @@ class Server(aio.Resource):
self.close()
-async def _execute(url, ref, action):
- p = await asyncio.create_subprocess_exec(
- sys.executable, '-m', 'hatter', 'execute', url, ref, action,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+async def _execute(action, env, url, ref):
+ cmd = [sys.executable, '-m', 'hatter', 'execute',
+ '--action', action,
+ *itertools.chain.from_iterable(('--env', i) for i in env),
+ url, ref]
+
+ p = await asyncio.create_subprocess_exec(cmd,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=env)
try:
output, _ = await p.communicate()
@@ -181,3 +191,36 @@ async def _execute(url, ref, action):
finally:
if p.returncode is None:
p.terminate()
+
+
+async def _git_ls_remote(url, refs):
+ cmd = ['git', 'ls-remote', url, *refs]
+
+ p = await asyncio.create_subprocess_exec(cmd,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ try:
+ stdout, stderr = await p.communicate()
+ if p.returncode:
+ stderr = str(stderr, encoding='utf-8', errors='ignore')
+ raise Exception(stderr)
+
+ result = collections.deque()
+ stdout = str(stdout, encoding='utf-8', errors='ignore')
+
+ for line in stdout.split('\n'):
+ segments = line.split(maxsplit=1)
+ if not segments:
+ continue
+ result.append(segments[0])
+
+ return result
+
+ except Exception:
+ return []
+
+ finally:
+ if p.returncode is None:
+ p.terminate()
diff --git a/src_py/hatter/ui.py b/src_py/hatter/ui.py
index cb827bd..a10eede 100644
--- a/src_py/hatter/ui.py
+++ b/src_py/hatter/ui.py
@@ -22,18 +22,17 @@ async def create(host: str,
app = aiohttp.web.Application()
get_routes = (
aiohttp.web.get(path, handler) for path, handler in (
- ('/', ui._get_root_handler),
- ('/repo/{repo}', ui._get_repo_handler),
- ('/repo/{repo}/commit/{commit}', ui._get_commit_handler)))
+ ('/', ui._process_get_root),
+ ('/repo/{repo}', ui._process_get_repo),
+ ('/repo/{repo}/commit/{commit}', ui._process_get_commit)))
post_routes = (
aiohttp.web.post(path, handler) for path, handler in (
- ('/repo/{repo}/webhook', ui._post_webhook_handler),
- ('/repo/{repo}/add', ui._post_add_handler),
- ('/repo/{repo}/commit/{commit}/rerun', ui._post_rerun_handler),
- ('/repo/{repo}/commit/{commit}/remove', ui._post_remove_handler)))
- app.add_routes([*get_routes,
- *post_routes,
- aiohttp.web.static('/', static_dir)])
+ ('/repo/{repo}/run', ui._process_post_run),
+ ('/repo/{repo}/commit/{commit}/remove', ui._process_post_remove)))
+ webhook_route = aiohttp.web.route('*', '/repo/{repo}/webhook',
+ ui._process_webhook)
+ static_route = aiohttp.web.static('/', static_dir)
+ app.add_routes([*get_routes, *post_routes, webhook_route, static_route])
runner = aiohttp.web.AppRunner(app)
await runner.setup()
@@ -60,70 +59,68 @@ class UI(aio.Resource):
def async_group(self):
return self._async_group
- async def _get_root_handler(self, request):
+ async def _process_get_root(self, request):
commits = await self._server.get_commits(None)
body = (f'{_generate_repos(self._server.repos)}\n'
f'{_generate_commits(commits)}')
return _create_html_response('hatter', body)
- async def _get_repo_handler(self, request):
- repo = _parse_repo(request, self._server.repos)
-
+ async def _process_get_repo(self, request):
+ repo = self._get_repo(request)
commits = await self._server.get_commits(repo)
+ title = f'hatter - {repo}'
body = (f'{_generate_commits(commits)}\n'
- f'{_generate_add(repo)}')
- return _create_html_response(f'hatter - {repo}', body)
-
- async def _get_commit_handler(self, request):
- repo = _parse_repo(request, self._server.repos)
+ f'{_generate_run(repo)}')
+ return _create_html_response(title, body)
- commit_hash = request.match_info['commit']
- commit = await self._server.get_commit(repo, commit_hash)
+ async def _process_get_commit(self, request):
+ commit = await self._get_commit(request)
+ title = f'hatter - {commit.repo}/{commit.hash}'
body = _generate_commit(commit)
- return _create_html_response(f'hatter - {repo}/{commit_hash}', body)
-
- async def _post_webhook_handler(self, request):
- repo = _parse_repo(request, self._server.repos)
-
- self._server.sync_repo(repo)
-
- return aiohttp.web.Response()
+ return _create_html_response(title, body)
- async def _post_add_handler(self, request):
- repo = _parse_repo(request, self._server.repos)
+ async def _process_post_run(self, request):
+ repo = self._get_repo(request)
body = await request.post()
commit_hash = body['hash']
if not commit_hash:
raise aiohttp.web.HTTPBadRequest()
- raise aiohttp.web.HTTPFound(f'/repo/{repo}/commit/{commit_hash}')
+ commit = await self._server.run_commit(repo, commit_hash)
- async def _post_rerun_handler(self, request):
- repo = _parse_repo(request, self._server.repos)
+ url = f'/repo/{commit.repo}/commit/{commit.hash}'
+ raise aiohttp.web.HTTPFound(url)
- commit_hash = request.match_info['commit']
- await self._server.rerun_commit(repo, commit_hash)
+ async def _process_post_remove(self, request):
+ commit = await self._get_commit(request)
- raise aiohttp.web.HTTPFound(f'/repo/{repo}/commit/{commit_hash}')
+ await self._server.remove_commit(commit)
- async def _post_remove_handler(self, request):
- repo = _parse_repo(request, self._server.repos)
+ raise aiohttp.web.HTTPFound(f'/repo/{commit.repo}')
- commit_hash = request.match_info['commit']
- await self._server.remove_commit(repo, commit_hash)
+ async def _process_webhook(self, request):
+ repo = self._get_repo(request)
- raise aiohttp.web.HTTPFound(f'/repo/{repo}')
+ self._server.sync_repo(repo)
+ return aiohttp.web.Response()
+ def _get_repo(self, request):
+ repo = request.match_info['repo']
+ if repo not in self._server.repos:
+ raise aiohttp.web.HTTPBadRequest()
+ return repo
-def _parse_repo(request, repos):
- repo = request.match_info['repo']
- if repo not in repos:
- raise aiohttp.web.HTTPBadRequest()
- return repo
+ async def _get_commit(self, request):
+ repo = self._get_repo(request)
+ commit_hash = request.match_info['commit']
+ commit = await self._server.get_commit(repo, commit_hash)
+ if not commit:
+ raise aiohttp.web.HTTPBadRequest()
+ return commit
def _create_html_response(title, body):
@@ -155,8 +152,8 @@ def _generate_commits(commits):
tbody = '\n'.join(
(f'<tr>\n'
f'<td class="col-change">{_format_time(commit.change)}</td>\n'
- f'<td class="col-repo"><a href="/repo/{commit.repo}">{commit.repo}</a></td>\n' # NOQA
- f'<td class="col-hash"><a href="/repo/{commit.repo}/commit/{commit.hash}">{commit.hash}</a></td>\n' # NOQA
+ f'<td class="col-repo">{_generate_repo_link(commit.repo)}</td>\n'
+ f'<td class="col-hash">{_generate_commit_link(commit)}</td>\n'
f'<td class="col-status">{commit.status.name}</td>\n'
f'</tr>')
for commit in commits)
@@ -175,33 +172,47 @@ def _generate_commits(commits):
def _generate_commit(commit):
- buttons = '\n'.join(
- (f'<form method="post" action="{action}">\n'
- f'<input type="submit" value="{value}">\n'
- f'</form>')
- for value, action in (
- ('Rerun', f'/repo/{commit.repo}/commit/{commit.hash}/rerun'),
- ('Remove', f'/repo/{commit.repo}/commit/{commit.hash}/remove')))
+ run_action = f'/repo/{commit.repo}/run'
+ run_button = (f'<form method="post" action="{run_action}">\n'
+ f'<input type="hidden" name="hash" value="{commit.hash}">\n'
+ f'<input type="submit" value="Run commit">\n'
+ f'</form>')
+
+ remove_action = f'/repo/{commit.repo}/commit/{commit.hash}/remove'
+ remove_button = (f'<form method="post" action="{remove_action}">\n'
+ f'<input type="submit" value="Remove commit">\n'
+ f'</form>')
+
+ repo_link = _generate_repo_link(commit.repo)
return (f'<div class="commit">\n'
- f'<label>Repo:</label><div><a href="/repo/{commit.repo}">{commit.repo}</a></div>\n' # NOQA
+ f'<label>Repo:</label><div>{repo_link}</div>\n'
f'<label>Commit:</label><div>{commit.hash}</div>\n'
f'<label>Change:</label><div>{_format_time(commit.change)}</div>\n'
f'<label>Status:</label><div>{commit.status.name}</div>\n'
f'<label>Output:</label><pre>{commit.output}</pre>\n'
- f'<label></label><div>{buttons}</div>\n'
+ f'<label></label><div>{run_button}{remove_button}</div>\n'
f'</div>')
-def _generate_add(repo):
- return (f'<div class="add">\n'
- f'<form method="post" action="/repo/{repo}/add">\n'
+def _generate_run(repo):
+ return (f'<div class="run">\n'
+ f'<form method="post" action="/repo/{repo}/run">\n'
f'<input type="text" name="hash">\n'
- f'<input type="submit" value="Add commit">\n'
+ f'<input type="submit" value="Run commit">\n'
f'</form>\n'
f'</div>')
+def _generate_repo_link(repo):
+ return f'<a href="/repo/{repo}">{repo}</a>'
+
+
+def _generate_commit_link(commit):
+ url = f'/repo/{commit.repo}/commit/{commit.hash}'
+ return f'<a href="{url}">{commit.hash}</a>'
+
+
def _format_time(t):
return datetime.datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S")