diff options
| author | bozo.kopic <bozo@kopic.xyz> | 2022-03-28 00:03:31 +0200 |
|---|---|---|
| committer | bozo.kopic <bozo@kopic.xyz> | 2022-03-28 00:03:31 +0200 |
| commit | b8a61f60a0aa115141987f461658666ccbfd2034 (patch) | |
| tree | 071d7c265be7e1ef72d1b9057c8bf443ea545e2e /src_py/hatter/server.py | |
| parent | 1d56c732965cad4c8eb06ed5638f7ecdea88f290 (diff) | |
rename hatter to boxhatter
Diffstat (limited to 'src_py/hatter/server.py')
| -rw-r--r-- | src_py/hatter/server.py | 227 |
1 files changed, 0 insertions, 227 deletions
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py deleted file mode 100644 index 5f00707..0000000 --- a/src_py/hatter/server.py +++ /dev/null @@ -1,227 +0,0 @@ -import asyncio -import collections -import contextlib -import itertools -import multiprocessing -import os -import subprocess -import sys -import time -import typing - -from hat import aio -from hat import json - -from hatter import common -import hatter.backend - - -async def create(conf: json.Data, - backend: hatter.backend.Backend - ) -> 'Server': - server = Server() - server._conf = conf - server._backend = backend - server._async_group = aio.Group() - server._repos = set(conf['repos'].keys()) - server._run_queue = aio.Queue() - server._sync_events = {} - - for repo, repo_conf in conf['repos'].items(): - sync_event = asyncio.Event() - server._sync_events[repo] = sync_event - server.async_group.spawn(server._sync_loop, repo, repo_conf, - sync_event) - - for _ in range(multiprocessing.cpu_count()): - server.async_group.spawn(server._run_loop) - - try: - commits = await backend.get_commits(repo=None, - statuses={common.Status.PENDING, - common.Status.RUNNING}, - order=common.Order.ASC) - - for commit in commits: - commit = commit._replace(change=int(time.time()), - status=common.Status.PENDING, - output='') - await backend.update_commit(commit) - server._run_queue.put_nowait(commit) - - except BaseException: - await aio.uncancellable(server.async_close()) - raise - - return server - - -class Server(aio.Resource): - - @property - def async_group(self): - return self._async_group - - @property - def repos(self) -> typing.Set[str]: - return self._repos - - async def get_commits(self, - repo: typing.Optional[str], - ) -> typing.List[common.Commit]: - return await self._backend.get_commits(repo=repo, - statuses=None, - order=common.Order.DESC) - - async def get_commit(self, - repo: str, - commit_hash: str - ) -> typing.Optional[common.Commit]: - return await self._backend.get_commit(repo, commit_hash) - - async def run_commit(self, - repo: str, - commit_hash: str - ) -> common.Commit: - commit = common.Commit(repo=repo, - hash=commit_hash, - change=int(time.time()), - status=common.Status.PENDING, - output='') - await self._backend.update_commit(commit) - self._run_queue.put_nowait(commit) - return commit - - def sync_repo(self, repo: str): - self._sync_events[repo].set() - - async def remove_commit(self, commit: common.Commit): - await self._backend.remove_commit(commit) - - async def _sync_loop(self, repo, repo_conf, sync_event): - try: - url = repo_conf['url'] - refs = repo_conf.get('refs', ['refs/heads/*']) - min_sync_delay = repo_conf.get('min_sync_delay', 60) or 0 - max_sync_delay = repo_conf.get('max_sync_delay') - last_sync = time.monotonic() - min_sync_delay - - while True: - dt = time.monotonic() - last_sync - if dt < min_sync_delay: - await asyncio.sleep(min_sync_delay - dt) - - sync_event.clear() - commit_hashes = await _git_ls_remote(url, refs) - last_sync = time.monotonic() - - for commit_hash in commit_hashes: - commit = await self._backend.get_commit(repo, commit_hash) - if commit: - continue - await self.run_commit(repo, commit_hash) - - if max_sync_delay is None: - await sync_event.wait() - - else: - with contextlib.suppress(asyncio.TimeoutError): - await asyncio.wait_for(sync_event.wait(), - max_sync_delay) - - finally: - self.close() - - async def _run_loop(self): - try: - while True: - commit = await self._run_queue.get() - repo_conf = self._conf['repos'][commit.repo] - action = repo_conf.get('action', '.hatter.yaml') - env = {**self._conf.get('env', {}), - **repo_conf.get('env', {})} - url = repo_conf['url'] - ref = commit.hash - - commit = commit._replace(change=int(time.time()), - status=common.Status.RUNNING, - output='') - await self._backend.update_commit(commit) - - try: - output = await _execute(action=action, - env=env, - url=url, - ref=ref) - status = common.Status.SUCCESS - - except Exception as e: - output = str(e) - status = common.Status.FAILURE - - commit = commit._replace(change=int(time.time()), - status=status, - output=output) - await self._backend.update_commit(commit) - - finally: - self.close() - - -async def _execute(action, env, url, ref): - cmd = [sys.executable, '-m', 'hatter', 'execute', - '--action', action, - *itertools.chain.from_iterable(('--env', i) for i in env), - url, ref] - - p = await asyncio.create_subprocess_exec(*cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - env={**os.environ, **env}) - - try: - output, _ = await p.communicate() - output = str(output, encoding='utf-8', errors='ignore') - - if p.returncode: - raise Exception(output) - - return output - - finally: - if p.returncode is None: - p.terminate() - - -async def _git_ls_remote(url, refs): - cmd = ['git', 'ls-remote', url, *refs] - - p = await asyncio.create_subprocess_exec(*cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - try: - stdout, stderr = await p.communicate() - if p.returncode: - stderr = str(stderr, encoding='utf-8', errors='ignore') - raise Exception(stderr) - - result = collections.deque() - stdout = str(stdout, encoding='utf-8', errors='ignore') - - for line in stdout.split('\n'): - segments = line.split(maxsplit=1) - if not segments: - continue - result.append(segments[0]) - - return result - - except Exception: - return [] - - finally: - if p.returncode is None: - p.terminate() |
