aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter/server.py
diff options
context:
space:
mode:
authorbozo.kopic <bozo@kopic.xyz>2022-03-27 20:18:44 +0200
committerbozo.kopic <bozo@kopic.xyz>2022-03-27 20:18:44 +0200
commit1d06423081b04a54d69376f4ac0cea0b40dc5b2d (patch)
tree50469ae88dee87b421dca468b753778b3666ce8f /src_py/hatter/server.py
parent81e0cbc034e594c73a38202afc0676b3522c6b46 (diff)
WIP server
Diffstat (limited to 'src_py/hatter/server.py')
-rw-r--r--src_py/hatter/server.py163
1 files changed, 103 insertions, 60 deletions
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
index 5364f52..bce5880 100644
--- a/src_py/hatter/server.py
+++ b/src_py/hatter/server.py
@@ -1,4 +1,7 @@
import asyncio
+import collections
+import contextlib
+import itertools
import multiprocessing
import subprocess
import sys
@@ -20,14 +23,14 @@ async def create(conf: json.Data,
server._backend = backend
server._async_group = aio.Group()
server._repos = set(conf['repos'].keys())
- server._lock = asyncio.Lock()
server._run_queue = aio.Queue()
server._sync_events = {}
for repo, repo_conf in conf['repos'].items():
sync_event = asyncio.Event()
server._sync_events[repo] = sync_event
- server.async_group.spawn(server._sync_loop, repo_conf, sync_event)
+ server.async_group.spawn(server._sync_loop, repo, repo_conf,
+ sync_event)
for _ in range(multiprocessing.cpu_count()):
server.async_group.spawn(server._run_loop)
@@ -64,81 +67,80 @@ class Server(aio.Resource):
async def get_commits(self,
repo: typing.Optional[str],
- ) -> typing.Iterable[common.Commit]:
- if repo and repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
-
- commits = await self._backend.get_commits(repo=repo,
- statuses=None,
- order=common.Order.DESC)
-
- return commits
+ ) -> typing.List[common.Commit]:
+ return await self._backend.get_commits(repo=repo,
+ statuses=None,
+ order=common.Order.DESC)
async def get_commit(self,
repo: str,
commit_hash: str
- ) -> common.Commit:
- if repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
-
- async with self._lock:
- commit = await self._backend.get_commit(repo, commit_hash)
-
- if not commit:
- commit = common.Commit(repo=repo,
- hash=commit_hash,
- change=int(time.time()),
- status=common.Status.PENDING,
- output='')
- await self._backend.update_commit(commit)
- self._run_queue.put_nowait(commit)
+ ) -> typing.Optional[common.Commit]:
+ return await self._backend.get_commit(repo, commit_hash)
+ async def run_commit(self,
+ repo: str,
+ commit_hash: str
+ ) -> common.Commit:
+ commit = common.Commit(repo=repo,
+ hash=commit_hash,
+ change=int(time.time()),
+ status=common.Status.PENDING,
+ output='')
+ await self._backend.update_commit(commit)
+ self._run_queue.put_nowait(commit)
return commit
def sync_repo(self, repo: str):
self._sync_events[repo].set()
- async def rerun_commit(self,
- repo: str,
- commit_hash: str):
- if repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
+ async def remove_commit(self, commit: common.Commit):
+ await self._backend.remove_commit(commit)
+
+ async def _sync_loop(self, repo, repo_conf, sync_event):
+ try:
+ url = repo_conf['url']
+ refs = repo_conf.get('refs', ['refs/heads/*'])
+ min_sync_delay = repo_conf.get('min_sync_delay') or 0
+ max_sync_delay = repo_conf.get('max_sync_delay')
+ last_sync = time.monotonic() - min_sync_delay
- async with self._lock:
- commit = await self._backend.get_commit(repo, commit_hash)
- if not commit:
- raise ValueError(f'invalid commit {commit_hash}')
+ while True:
+ dt = time.monotonic() - last_sync
+ if dt < min_sync_delay:
+ await asyncio.sleep(min_sync_delay - dt)
- commit = commit._replace(change=int(time.time()),
- status=common.Status.PENDING,
- output='')
- await self._backend.update_commit(commit)
- self._run_queue.put_nowait(commit)
+ sync_event.clear()
+ commit_hashes = await _git_ls_remote(url, refs)
+ last_sync = time.monotonic()
- async def remove_commit(self,
- repo: str,
- commit_hash: str):
- if repo not in self._conf['repos']:
- raise ValueError(f'invalid repo {repo}')
+ for commit_hash in commit_hashes:
+ commit = await self._backend.get_commit(repo, commit_hash)
+ if commit:
+ continue
+ await self.run_commit(repo, commit_hash)
- async with self._lock:
- commit = await self._backend.get_commit(repo, commit_hash)
- if not commit:
- raise ValueError(f'invalid commit {commit_hash}')
+ if max_sync_delay is None:
+ await sync_event.wait()
- await self._backend.remove_commit(commit)
+ else:
+ with contextlib.suppress(asyncio.TimeoutError):
+ await asyncio.wait_for(sync_event.wait(),
+ max_sync_delay)
- async def _sync_loop(self, repo_conf, sync_event):
- pass
+ finally:
+ self.close()
async def _run_loop(self):
try:
while True:
commit = await self._run_queue.get()
repo_conf = self._conf['repos'][commit.repo]
+ action = repo_conf.get('action', '.hatter.yaml')
+ env = {**self._conf.get('env', {}),
+ **repo_conf.get('env', {})}
url = repo_conf['url']
ref = commit.hash
- action = repo_conf.get('action', '.hatter.yaml')
commit = commit._replace(change=int(time.time()),
status=common.Status.RUNNING,
@@ -146,7 +148,10 @@ class Server(aio.Resource):
await self._backend.update_commit(commit)
try:
- output = await _execute(url, ref, action)
+ output = await _execute(action=action,
+ env=env,
+ url=url,
+ ref=ref)
status = common.Status.SUCCESS
except Exception as e:
@@ -162,12 +167,17 @@ class Server(aio.Resource):
self.close()
-async def _execute(url, ref, action):
- p = await asyncio.create_subprocess_exec(
- sys.executable, '-m', 'hatter', 'execute', url, ref, action,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+async def _execute(action, env, url, ref):
+ cmd = [sys.executable, '-m', 'hatter', 'execute',
+ '--action', action,
+ *itertools.chain.from_iterable(('--env', i) for i in env),
+ url, ref]
+
+ p = await asyncio.create_subprocess_exec(cmd,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=env)
try:
output, _ = await p.communicate()
@@ -181,3 +191,36 @@ async def _execute(url, ref, action):
finally:
if p.returncode is None:
p.terminate()
+
+
+async def _git_ls_remote(url, refs):
+ cmd = ['git', 'ls-remote', url, *refs]
+
+ p = await asyncio.create_subprocess_exec(cmd,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ try:
+ stdout, stderr = await p.communicate()
+ if p.returncode:
+ stderr = str(stderr, encoding='utf-8', errors='ignore')
+ raise Exception(stderr)
+
+ result = collections.deque()
+ stdout = str(stdout, encoding='utf-8', errors='ignore')
+
+ for line in stdout.split('\n'):
+ segments = line.split(maxsplit=1)
+ if not segments:
+ continue
+ result.append(segments[0])
+
+ return result
+
+ except Exception:
+ return []
+
+ finally:
+ if p.returncode is None:
+ p.terminate()