aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter
diff options
context:
space:
mode:
Diffstat (limited to 'src_py/hatter')
-rw-r--r--src_py/hatter/backend.py77
-rw-r--r--src_py/hatter/common.py2
-rw-r--r--src_py/hatter/server.py58
3 files changed, 124 insertions, 13 deletions
diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py
index e89a867..dd6a4f7 100644
--- a/src_py/hatter/backend.py
+++ b/src_py/hatter/backend.py
@@ -1,4 +1,5 @@
from pathlib import Path
+import sqlite3
import typing
from hat import aio
@@ -41,33 +42,93 @@ class Backend(aio.Resource):
self._executor, _ext_get_commit, self._db, repo, commit_hash)
async def update_commit(self, commit: common.Commit):
- return await self._async_group.spawn(
+ await self._async_group.spawn(
self._executor, _ext_update_commit, self._db, commit)
async def remove_commit(self, commit: common.Commit):
- return await self.async_group.spawn(
+ await self.async_group.spawn(
self._executor, _ext_remove_commit, self._db, commit)
def _ext_create(db_path):
- pass
+ db_path.parent.mkdir(exist_ok=True)
+ db = sqlite3.connect(f'file:{db_path}?nolock=1',
+ uri=True,
+ isolation_level=None,
+ detect_types=sqlite3.PARSE_DECLTYPES)
+
+ try:
+ db.executescript(r"""
+ PRAGMA journal_mode = OFF;
+ CREATE TABLE IF NOT EXISTS commits (
+ repo TEXT,
+ hash TEXT,
+ change INTEGER,
+ status INTEGER,
+ output TEXT,
+ PRIMARY KEY (repo, hash) ON CONFLICT REPLACE
+ );
+ CREATE INDEX IF NOT EXISTS commits_change_index ON commits (
+ change
+ )""")
+
+ except Exception:
+ db.close()
+ raise
+
+ return db
def _ext_close(db):
- pass
+ db.close()
def _ext_get_commits(db, repo, statuses, order):
- return []
+ cmd = "SELECT * FROM commits"
+ where = []
+ if repo:
+ where.append("repo = :repo")
+ if statuses:
+ status_values = (str(status.value) for status in statuses)
+ where.append(f"status IN ({', '.join(status_values)})")
+ if where:
+ cmd += f" WHERE {' AND '.join(where)}"
+ cmd += f" ORDER BY change {order.value}"
+ args = {'repo': repo}
+ cur = db.execute(cmd, args)
+ return [_commit_from_row(row) for row in cur]
def _ext_get_commit(db, repo, commit_hash):
- pass
+ cmd = "SELECT * FROM commits WHERE repo = :repo AND hash = :hash"
+ args = {'repo': repo,
+ 'hash': commit_hash}
+ cur = db.execute(cmd, args)
+ row = cur.fetchone()
+ return _commit_from_row(row) if row else None
def _ext_update_commit(db, commit):
- pass
+ cmd = ("INSERT OR REPLACE INTO commits VALUES "
+ "(:repo, :hash, :change, :status, :output)")
+ args = {'repo': commit.repo,
+ 'hash': commit.hash,
+ 'change': commit.change,
+ 'status': commit.status.value,
+ 'output': commit.output}
+ db.execute(cmd, args)
def _ext_remove_commit(db, commit):
- pass
+ cmd = "DELETE FROM commits WHERE repo = :repo AND hash = :hash"
+ args = {'repo': commit.repo,
+ 'hash': commit.hash}
+ db.execute(cmd, args)
+
+
+def _commit_from_row(row):
+ return common.Commit(repo=row[0],
+ hash=row[1],
+ change=row[2],
+ status=common.Status(row[3]),
+ output=row[4])
diff --git a/src_py/hatter/common.py b/src_py/hatter/common.py
index 1b1de47..ae314a5 100644
--- a/src_py/hatter/common.py
+++ b/src_py/hatter/common.py
@@ -27,6 +27,6 @@ class Status(enum.Enum):
class Commit(typing.NamedTuple):
repo: str
hash: str
- change: float
+ change: int
status: Status
output: str
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
index ccf100f..6d37244 100644
--- a/src_py/hatter/server.py
+++ b/src_py/hatter/server.py
@@ -1,5 +1,7 @@
import asyncio
import multiprocessing
+import subprocess
+import sys
import time
import typing
@@ -36,7 +38,7 @@ async def create(conf: json.Data,
order=common.Order.ASC)
for commit in commits:
- commit = commit._replace(change=time.time(),
+ commit = commit._replace(change=int(time.time()),
status=common.Status.PENDING,
output='')
await backend.update_commit(commit)
@@ -83,7 +85,7 @@ class Server(aio.Resource):
if not commit:
commit = common.Commit(repo=repo,
hash=commit_hash,
- change=time.time(),
+ change=int(time.time()),
status=common.Status.PENDING,
output='')
await self._backend.update_commit(commit)
@@ -105,7 +107,7 @@ class Server(aio.Resource):
if not commit:
raise ValueError(f'invalid commit {commit_hash}')
- commit = commit._replace(change=time.time(),
+ commit = commit._replace(change=int(time.time()),
status=common.Status.PENDING,
output='')
await self._backend.update_commit(commit)
@@ -128,4 +130,52 @@ class Server(aio.Resource):
pass
async def _run_loop(self):
- pass
+ try:
+ while True:
+ commit = await self._run_queue.get()
+ repo_conf = self._conf['repos'][commit.repo]
+ url = repo_conf['url']
+ ref = commit.hash
+ action = repo_conf.get('action', '.hatter.yaml')
+
+ commit = commit._replace(change=int(time.time()),
+ status=common.Status.RUNNING,
+ output='')
+ await self._backend.update_commit(commit)
+
+ try:
+ output = await _execute(url, ref, action)
+ status = common.Status.SUCCESS
+
+ except Exception as e:
+ output = str(e)
+ status = common.Status.FAILURE
+
+ commit = commit._replace(change=int(time.time()),
+ status=status,
+ output=output)
+ await self._backend.update_commit(commit)
+
+ finally:
+ self.close()
+
+
+async def _execute(url, ref, action):
+ p = await asyncio.create_subprocess_exec(
+ sys.executable, '-m', 'hatter', 'execute', url, ref, action,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+
+ try:
+ output, _ = await p.communicate()
+ output = str(output, encoding='utf-8', errors='ignore')
+
+ if p.returncode:
+ raise Exception(output)
+
+ return output
+
+ finally:
+ if p.returncode is None:
+ p.terminate()