Diffstat (limited to 'src_py/hatter/server.py')
-rw-r--r--  src_py/hatter/server.py  227
1 file changed, 0 insertions, 227 deletions
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
deleted file mode 100644
index 5f00707..0000000
--- a/src_py/hatter/server.py
+++ /dev/null
@@ -1,227 +0,0 @@
-import asyncio
-import collections
-import contextlib
-import itertools
-import multiprocessing
-import os
-import subprocess
-import sys
-import time
-import typing
-
-from hat import aio
-from hat import json
-
-from hatter import common
-import hatter.backend
-
-
-async def create(conf: json.Data,
- backend: hatter.backend.Backend
- ) -> 'Server':
- server = Server()
- server._conf = conf
- server._backend = backend
- server._async_group = aio.Group()
- server._repos = set(conf['repos'].keys())
- server._run_queue = aio.Queue()
- server._sync_events = {}
-
- for repo, repo_conf in conf['repos'].items():
- sync_event = asyncio.Event()
- server._sync_events[repo] = sync_event
- server.async_group.spawn(server._sync_loop, repo, repo_conf,
- sync_event)
-
- for _ in range(multiprocessing.cpu_count()):
- server.async_group.spawn(server._run_loop)
-
- try:
- commits = await backend.get_commits(repo=None,
- statuses={common.Status.PENDING,
- common.Status.RUNNING},
- order=common.Order.ASC)
-
- for commit in commits:
- commit = commit._replace(change=int(time.time()),
- status=common.Status.PENDING,
- output='')
- await backend.update_commit(commit)
- server._run_queue.put_nowait(commit)
-
- except BaseException:
- await aio.uncancellable(server.async_close())
- raise
-
- return server
-
-
-class Server(aio.Resource):
-
- @property
- def async_group(self):
- return self._async_group
-
- @property
- def repos(self) -> typing.Set[str]:
- return self._repos
-
- async def get_commits(self,
- repo: typing.Optional[str],
- ) -> typing.List[common.Commit]:
- return await self._backend.get_commits(repo=repo,
- statuses=None,
- order=common.Order.DESC)
-
- async def get_commit(self,
- repo: str,
- commit_hash: str
- ) -> typing.Optional[common.Commit]:
- return await self._backend.get_commit(repo, commit_hash)
-
- async def run_commit(self,
- repo: str,
- commit_hash: str
- ) -> common.Commit:
- commit = common.Commit(repo=repo,
- hash=commit_hash,
- change=int(time.time()),
- status=common.Status.PENDING,
- output='')
- await self._backend.update_commit(commit)
- self._run_queue.put_nowait(commit)
- return commit
-
- def sync_repo(self, repo: str):
- self._sync_events[repo].set()
-
- async def remove_commit(self, commit: common.Commit):
- await self._backend.remove_commit(commit)
-
- async def _sync_loop(self, repo, repo_conf, sync_event):
- try:
- url = repo_conf['url']
- refs = repo_conf.get('refs', ['refs/heads/*'])
- min_sync_delay = repo_conf.get('min_sync_delay', 60) or 0
- max_sync_delay = repo_conf.get('max_sync_delay')
- last_sync = time.monotonic() - min_sync_delay
-
- while True:
- dt = time.monotonic() - last_sync
- if dt < min_sync_delay:
- await asyncio.sleep(min_sync_delay - dt)
-
- sync_event.clear()
- commit_hashes = await _git_ls_remote(url, refs)
- last_sync = time.monotonic()
-
- for commit_hash in commit_hashes:
- commit = await self._backend.get_commit(repo, commit_hash)
- if commit:
- continue
- await self.run_commit(repo, commit_hash)
-
- if max_sync_delay is None:
- await sync_event.wait()
-
- else:
- with contextlib.suppress(asyncio.TimeoutError):
- await asyncio.wait_for(sync_event.wait(),
- max_sync_delay)
-
- finally:
- self.close()
-
- async def _run_loop(self):
- try:
- while True:
- commit = await self._run_queue.get()
- repo_conf = self._conf['repos'][commit.repo]
- action = repo_conf.get('action', '.hatter.yaml')
- env = {**self._conf.get('env', {}),
- **repo_conf.get('env', {})}
- url = repo_conf['url']
- ref = commit.hash
-
- commit = commit._replace(change=int(time.time()),
- status=common.Status.RUNNING,
- output='')
- await self._backend.update_commit(commit)
-
- try:
- output = await _execute(action=action,
- env=env,
- url=url,
- ref=ref)
- status = common.Status.SUCCESS
-
- except Exception as e:
- output = str(e)
- status = common.Status.FAILURE
-
- commit = commit._replace(change=int(time.time()),
- status=status,
- output=output)
- await self._backend.update_commit(commit)
-
- finally:
- self.close()
-
-
-async def _execute(action, env, url, ref):
- cmd = [sys.executable, '-m', 'hatter', 'execute',
- '--action', action,
- *itertools.chain.from_iterable(('--env', i) for i in env),
- url, ref]
-
- p = await asyncio.create_subprocess_exec(*cmd,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- env={**os.environ, **env})
-
- try:
- output, _ = await p.communicate()
- output = str(output, encoding='utf-8', errors='ignore')
-
- if p.returncode:
- raise Exception(output)
-
- return output
-
- finally:
- if p.returncode is None:
- p.terminate()
-
-
-async def _git_ls_remote(url, refs):
- cmd = ['git', 'ls-remote', url, *refs]
-
- p = await asyncio.create_subprocess_exec(*cmd,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- try:
- stdout, stderr = await p.communicate()
- if p.returncode:
- stderr = str(stderr, encoding='utf-8', errors='ignore')
- raise Exception(stderr)
-
- result = collections.deque()
- stdout = str(stdout, encoding='utf-8', errors='ignore')
-
- for line in stdout.split('\n'):
- segments = line.split(maxsplit=1)
- if not segments:
- continue
- result.append(segments[0])
-
- return result
-
- except Exception:
- return []
-
- finally:
- if p.returncode is None:
- p.terminate()
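
For reference, the configuration shape the deleted module consumed can be reconstructed from the code above: conf['repos'] maps a repository name to its settings (url is required; refs, min_sync_delay, max_sync_delay, action, and env are optional with the defaults read in _sync_loop and _run_loop), and a top-level env is merged with each repository's env before an action is executed. A minimal sketch follows; the repository name, URL, and values are illustrative, not taken from any real deployment:

    conf = {
        'env': {'CI': '1'},                       # merged into every action's environment
        'repos': {
            'example': {                          # repository name is illustrative
                'url': 'https://example.com/example.git',
                'refs': ['refs/heads/*'],         # default used by _sync_loop
                'min_sync_delay': 60,             # seconds; default 60
                'max_sync_delay': 300,            # optional; when omitted, waits for sync_repo()
                'action': '.hatter.yaml',         # default used by _run_loop
                'env': {}                         # per-repository overrides of the top-level env
            }
        }
    }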