diff options
Diffstat (limited to 'src_py/hatter')
| -rw-r--r-- | src_py/hatter/backend.py | 77 | ||||
| -rw-r--r-- | src_py/hatter/common.py | 2 | ||||
| -rw-r--r-- | src_py/hatter/server.py | 58 |
3 files changed, 124 insertions, 13 deletions
diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py index e89a867..dd6a4f7 100644 --- a/src_py/hatter/backend.py +++ b/src_py/hatter/backend.py @@ -1,4 +1,5 @@ from pathlib import Path +import sqlite3 import typing from hat import aio @@ -41,33 +42,93 @@ class Backend(aio.Resource): self._executor, _ext_get_commit, self._db, repo, commit_hash) async def update_commit(self, commit: common.Commit): - return await self._async_group.spawn( + await self._async_group.spawn( self._executor, _ext_update_commit, self._db, commit) async def remove_commit(self, commit: common.Commit): - return await self.async_group.spawn( + await self.async_group.spawn( self._executor, _ext_remove_commit, self._db, commit) def _ext_create(db_path): - pass + db_path.parent.mkdir(exist_ok=True) + db = sqlite3.connect(f'file:{db_path}?nolock=1', + uri=True, + isolation_level=None, + detect_types=sqlite3.PARSE_DECLTYPES) + + try: + db.executescript(r""" + PRAGMA journal_mode = OFF; + CREATE TABLE IF NOT EXISTS commits ( + repo TEXT, + hash TEXT, + change INTEGER, + status INTEGER, + output TEXT, + PRIMARY KEY (repo, hash) ON CONFLICT REPLACE + ); + CREATE INDEX IF NOT EXISTS commits_change_index ON commits ( + change + )""") + + except Exception: + db.close() + raise + + return db def _ext_close(db): - pass + db.close() def _ext_get_commits(db, repo, statuses, order): - return [] + cmd = "SELECT * FROM commits" + where = [] + if repo: + where.append("repo = :repo") + if statuses: + status_values = (str(status.value) for status in statuses) + where.append(f"status IN ({', '.join(status_values)})") + if where: + cmd += f" WHERE {' AND '.join(where)}" + cmd += f" ORDER BY change {order.value}" + args = {'repo': repo} + cur = db.execute(cmd, args) + return [_commit_from_row(row) for row in cur] def _ext_get_commit(db, repo, commit_hash): - pass + cmd = "SELECT * FROM commits WHERE repo = :repo AND hash = :hash" + args = {'repo': repo, + 'hash': commit_hash} + cur = db.execute(cmd, args) + row = cur.fetchone() + return _commit_from_row(row) if row else None def _ext_update_commit(db, commit): - pass + cmd = ("INSERT OR REPLACE INTO commits VALUES " + "(:repo, :hash, :change, :status, :output)") + args = {'repo': commit.repo, + 'hash': commit.hash, + 'change': commit.change, + 'status': commit.status.value, + 'output': commit.output} + db.execute(cmd, args) def _ext_remove_commit(db, commit): - pass + cmd = "DELETE FROM commits WHERE repo = :repo AND hash = :hash" + args = {'repo': commit.repo, + 'hash': commit.hash} + db.execute(cmd, args) + + +def _commit_from_row(row): + return common.Commit(repo=row[0], + hash=row[1], + change=row[2], + status=common.Status(row[3]), + output=row[4]) diff --git a/src_py/hatter/common.py b/src_py/hatter/common.py index 1b1de47..ae314a5 100644 --- a/src_py/hatter/common.py +++ b/src_py/hatter/common.py @@ -27,6 +27,6 @@ class Status(enum.Enum): class Commit(typing.NamedTuple): repo: str hash: str - change: float + change: int status: Status output: str diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py index ccf100f..6d37244 100644 --- a/src_py/hatter/server.py +++ b/src_py/hatter/server.py @@ -1,5 +1,7 @@ import asyncio import multiprocessing +import subprocess +import sys import time import typing @@ -36,7 +38,7 @@ async def create(conf: json.Data, order=common.Order.ASC) for commit in commits: - commit = commit._replace(change=time.time(), + commit = commit._replace(change=int(time.time()), status=common.Status.PENDING, output='') await backend.update_commit(commit) @@ -83,7 +85,7 @@ class Server(aio.Resource): if not commit: commit = common.Commit(repo=repo, hash=commit_hash, - change=time.time(), + change=int(time.time()), status=common.Status.PENDING, output='') await self._backend.update_commit(commit) @@ -105,7 +107,7 @@ class Server(aio.Resource): if not commit: raise ValueError(f'invalid commit {commit_hash}') - commit = commit._replace(change=time.time(), + commit = commit._replace(change=int(time.time()), status=common.Status.PENDING, output='') await self._backend.update_commit(commit) @@ -128,4 +130,52 @@ class Server(aio.Resource): pass async def _run_loop(self): - pass + try: + while True: + commit = await self._run_queue.get() + repo_conf = self._conf['repos'][commit.repo] + url = repo_conf['url'] + ref = commit.hash + action = repo_conf.get('action', '.hatter.yaml') + + commit = commit._replace(change=int(time.time()), + status=common.Status.RUNNING, + output='') + await self._backend.update_commit(commit) + + try: + output = await _execute(url, ref, action) + status = common.Status.SUCCESS + + except Exception as e: + output = str(e) + status = common.Status.FAILURE + + commit = commit._replace(change=int(time.time()), + status=status, + output=output) + await self._backend.update_commit(commit) + + finally: + self.close() + + +async def _execute(url, ref, action): + p = await asyncio.create_subprocess_exec( + sys.executable, '-m', 'hatter', 'execute', url, ref, action, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + + try: + output, _ = await p.communicate() + output = str(output, encoding='utf-8', errors='ignore') + + if p.returncode: + raise Exception(output) + + return output + + finally: + if p.returncode is None: + p.terminate() |
