diff options
Diffstat (limited to 'src_py/hatter/server.py')
| -rw-r--r-- | src_py/hatter/server.py | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py index f9ee29f..ccf100f 100644 --- a/src_py/hatter/server.py +++ b/src_py/hatter/server.py @@ -1,6 +1,12 @@ +import asyncio +import multiprocessing +import time +import typing + from hat import aio from hat import json +from hatter import common import hatter.backend @@ -8,8 +14,37 @@ async def create(conf: json.Data, backend: hatter.backend.Backend ) -> 'Server': server = Server() + server._conf = conf server._backend = backend server._async_group = aio.Group() + server._lock = asyncio.Lock() + server._run_queue = aio.Queue() + server._sync_events = {} + + for repo, repo_conf in conf['repos'].items(): + sync_event = asyncio.Event() + server._sync_events[repo] = sync_event + server.async_group.spawn(server._sync_loop, repo_conf, sync_event) + + for _ in range(multiprocessing.cpu_count()): + server.async_group.spawn(server._run_loop) + + try: + commits = await backend.get_commits(repo=None, + statuses={common.Status.PENDING, + common.Status.RUNNING}, + order=common.Order.ASC) + + for commit in commits: + commit = commit._replace(change=time.time(), + status=common.Status.PENDING, + output='') + await backend.update_commit(commit) + server._run_queue.put_nowait(commit) + + except BaseException: + await aio.uncancellable(server.async_close()) + raise return server @@ -19,3 +54,78 @@ class Server(aio.Resource): @property def async_group(self): return self._async_group + + def get_repos(self) -> typing.Iterable[str]: + return self._conf['repos'].keys() + + async def get_commits(self, + repo: typing.Optional[str], + ) -> typing.Iterable[common.Commit]: + if repo and repo not in self._conf['repos']: + raise ValueError(f'invalid repo {repo}') + + commits = await self._backend.get_commits(repo=repo, + statuses=None, + order=common.Order.DESC) + + return commits + + async def get_commit(self, + repo: str, + commit_hash: str + ) -> common.Commit: + if repo not in self._conf['repos']: + raise ValueError(f'invalid repo {repo}') + + async with self._lock: + commit = await self._backend.get_commit(repo, commit_hash) + + if not commit: + commit = common.Commit(repo=repo, + hash=commit_hash, + change=time.time(), + status=common.Status.PENDING, + output='') + await self._backend.update_commit(commit) + self._run_queue.put_nowait(commit) + + return commit + + def sync_repo(self, repo: str): + self._sync_events[repo].set() + + async def rerun_commit(self, + repo: str, + commit_hash: str): + if repo not in self._conf['repos']: + raise ValueError(f'invalid repo {repo}') + + async with self._lock: + commit = await self._backend.get_commit(repo, commit_hash) + if not commit: + raise ValueError(f'invalid commit {commit_hash}') + + commit = commit._replace(change=time.time(), + status=common.Status.PENDING, + output='') + await self._backend.update_commit(commit) + self._run_queue.put_nowait(commit) + + async def remove_commit(self, + repo: str, + commit_hash: str): + if repo not in self._conf['repos']: + raise ValueError(f'invalid repo {repo}') + + async with self._lock: + commit = await self._backend.get_commit(repo, commit_hash) + if not commit: + raise ValueError(f'invalid commit {commit_hash}') + + await self._backend.remove_commit(commit) + + async def _sync_loop(self, repo_conf, sync_event): + pass + + async def _run_loop(self): + pass |
