From 1d06423081b04a54d69376f4ac0cea0b40dc5b2d Mon Sep 17 00:00:00 2001 From: "bozo.kopic" Date: Sun, 27 Mar 2022 20:18:44 +0200 Subject: WIP server --- src_py/hatter/server.py | 163 ++++++++++++++++++++++++++++++------------------ 1 file changed, 103 insertions(+), 60 deletions(-) (limited to 'src_py/hatter/server.py') diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py index 5364f52..bce5880 100644 --- a/src_py/hatter/server.py +++ b/src_py/hatter/server.py @@ -1,4 +1,7 @@ import asyncio +import collections +import contextlib +import itertools import multiprocessing import subprocess import sys @@ -20,14 +23,14 @@ async def create(conf: json.Data, server._backend = backend server._async_group = aio.Group() server._repos = set(conf['repos'].keys()) - server._lock = asyncio.Lock() server._run_queue = aio.Queue() server._sync_events = {} for repo, repo_conf in conf['repos'].items(): sync_event = asyncio.Event() server._sync_events[repo] = sync_event - server.async_group.spawn(server._sync_loop, repo_conf, sync_event) + server.async_group.spawn(server._sync_loop, repo, repo_conf, + sync_event) for _ in range(multiprocessing.cpu_count()): server.async_group.spawn(server._run_loop) @@ -64,81 +67,80 @@ class Server(aio.Resource): async def get_commits(self, repo: typing.Optional[str], - ) -> typing.Iterable[common.Commit]: - if repo and repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') - - commits = await self._backend.get_commits(repo=repo, - statuses=None, - order=common.Order.DESC) - - return commits + ) -> typing.List[common.Commit]: + return await self._backend.get_commits(repo=repo, + statuses=None, + order=common.Order.DESC) async def get_commit(self, repo: str, commit_hash: str - ) -> common.Commit: - if repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') - - async with self._lock: - commit = await self._backend.get_commit(repo, commit_hash) - - if not commit: - commit = common.Commit(repo=repo, - hash=commit_hash, - change=int(time.time()), - status=common.Status.PENDING, - output='') - await self._backend.update_commit(commit) - self._run_queue.put_nowait(commit) + ) -> typing.Optional[common.Commit]: + return await self._backend.get_commit(repo, commit_hash) + async def run_commit(self, + repo: str, + commit_hash: str + ) -> common.Commit: + commit = common.Commit(repo=repo, + hash=commit_hash, + change=int(time.time()), + status=common.Status.PENDING, + output='') + await self._backend.update_commit(commit) + self._run_queue.put_nowait(commit) return commit def sync_repo(self, repo: str): self._sync_events[repo].set() - async def rerun_commit(self, - repo: str, - commit_hash: str): - if repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') + async def remove_commit(self, commit: common.Commit): + await self._backend.remove_commit(commit) + + async def _sync_loop(self, repo, repo_conf, sync_event): + try: + url = repo_conf['url'] + refs = repo_conf.get('refs', ['refs/heads/*']) + min_sync_delay = repo_conf.get('min_sync_delay') or 0 + max_sync_delay = repo_conf.get('max_sync_delay') + last_sync = time.monotonic() - min_sync_delay - async with self._lock: - commit = await self._backend.get_commit(repo, commit_hash) - if not commit: - raise ValueError(f'invalid commit {commit_hash}') + while True: + dt = time.monotonic() - last_sync + if dt < min_sync_delay: + await asyncio.sleep(min_sync_delay - dt) - commit = commit._replace(change=int(time.time()), - status=common.Status.PENDING, - output='') - await self._backend.update_commit(commit) - self._run_queue.put_nowait(commit) + sync_event.clear() + commit_hashes = await _git_ls_remote(url, refs) + last_sync = time.monotonic() - async def remove_commit(self, - repo: str, - commit_hash: str): - if repo not in self._conf['repos']: - raise ValueError(f'invalid repo {repo}') + for commit_hash in commit_hashes: + commit = await self._backend.get_commit(repo, commit_hash) + if commit: + continue + await self.run_commit(repo, commit_hash) - async with self._lock: - commit = await self._backend.get_commit(repo, commit_hash) - if not commit: - raise ValueError(f'invalid commit {commit_hash}') + if max_sync_delay is None: + await sync_event.wait() - await self._backend.remove_commit(commit) + else: + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(sync_event.wait(), + max_sync_delay) - async def _sync_loop(self, repo_conf, sync_event): - pass + finally: + self.close() async def _run_loop(self): try: while True: commit = await self._run_queue.get() repo_conf = self._conf['repos'][commit.repo] + action = repo_conf.get('action', '.hatter.yaml') + env = {**self._conf.get('env', {}), + **repo_conf.get('env', {})} url = repo_conf['url'] ref = commit.hash - action = repo_conf.get('action', '.hatter.yaml') commit = commit._replace(change=int(time.time()), status=common.Status.RUNNING, @@ -146,7 +148,10 @@ class Server(aio.Resource): await self._backend.update_commit(commit) try: - output = await _execute(url, ref, action) + output = await _execute(action=action, + env=env, + url=url, + ref=ref) status = common.Status.SUCCESS except Exception as e: @@ -162,12 +167,17 @@ class Server(aio.Resource): self.close() -async def _execute(url, ref, action): - p = await asyncio.create_subprocess_exec( - sys.executable, '-m', 'hatter', 'execute', url, ref, action, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) +async def _execute(action, env, url, ref): + cmd = [sys.executable, '-m', 'hatter', 'execute', + '--action', action, + *itertools.chain.from_iterable(('--env', i) for i in env), + url, ref] + + p = await asyncio.create_subprocess_exec(cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env) try: output, _ = await p.communicate() @@ -181,3 +191,36 @@ async def _execute(url, ref, action): finally: if p.returncode is None: p.terminate() + + +async def _git_ls_remote(url, refs): + cmd = ['git', 'ls-remote', url, *refs] + + p = await asyncio.create_subprocess_exec(cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + try: + stdout, stderr = await p.communicate() + if p.returncode: + stderr = str(stderr, encoding='utf-8', errors='ignore') + raise Exception(stderr) + + result = collections.deque() + stdout = str(stdout, encoding='utf-8', errors='ignore') + + for line in stdout.split('\n'): + segments = line.split(maxsplit=1) + if not segments: + continue + result.append(segments[0]) + + return result + + except Exception: + return [] + + finally: + if p.returncode is None: + p.terminate() -- cgit v1.2.3-70-g09d2