aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbozo.kopic <bozo.kopic@gmail.com>2017-08-22 16:01:58 +0200
committerbozo.kopic <bozo.kopic@gmail.com>2017-08-22 16:01:58 +0200
commitb82c0e53536dc100fbe93806dcd0062c197f4f83 (patch)
treec48d9747c2654305b2b28196b238e57496ae5617
parentd269714518d0f31b4774c83e29ac8aee3e416a0e (diff)
backend
-rw-r--r--README.rst1
-rw-r--r--src_py/hatter/backend.py99
-rw-r--r--src_py/hatter/server.py4
-rw-r--r--src_py/hatter/util.py42
4 files changed, 130 insertions, 16 deletions
diff --git a/README.rst b/README.rst
index 9851274..60eb227 100644
--- a/README.rst
+++ b/README.rst
@@ -68,7 +68,6 @@ TODO
* web server - backend
- * orchestrate automation executor
* JSON Schema describing messages used in communication between backend and
frontend
* implement functionality provided by internal communication protocol
diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py
index 155e888..57c5192 100644
--- a/src_py/hatter/backend.py
+++ b/src_py/hatter/backend.py
@@ -2,34 +2,107 @@ import sqlite3
import datetime
import threading
import logging
+import concurrent.futures
+import asyncio
from hatter import util
+from hatter import executor
util.monkeypatch_sqlite3()
-LogEntry = util.namedtuple(
- 'LogEntry',
- ['timestamp', 'datetime.datetime: timestamp'],
- ['repository', 'str: repository'],
- ['commit', 'str: commit'],
- ['msg', 'str: message'])
+LogEntry = util.namedtuple('LogEntry',
+ ['timestamp', 'datetime.datetime: timestamp'],
+ ['repository', 'str: repository'],
+ ['commit', 'str: commit'],
+ ['msg', 'str: message'])
+
+Job = util.namedtuple('Job',
+ ['id', 'int: id'],
+ ['timestamp', 'datetime.datetime: timestamp'],
+ ['repository', 'str: repository'],
+ ['commit', 'str: commit'])
class Backend:
def __init__(self, db_path):
- pass
+ self._next_job_id = 0
+ self._active = None
+ self._queue = []
+ self._active_change_cbs = util.CallbackRegistry()
+ self._queue_change_cbs = util.CallbackRegistry()
+ self._log_change_cbs = util.CallbackRegistry()
+ self._cv = asyncio.Condition()
+ self._db = _DB(db_path)
+ self._executor = concurrent.futures.ThreadPoolExecutor()
+ self._run_loop_future = asyncio.ensure_future(self._run_loop())
+
+ @property
+ def active(self):
+ self._active
+
+ @property
+ def queue(self):
+ return self._queue
+
+ def register_active_change_cb(self, cb):
+ return self._active_change_cbs.register(cb)
+
+ def register_queue_change_cb(self, cb):
+ return self._queue_change_cbs.register(cb)
+
+ def register_log_change_cb(self, cb):
+ return self._log_change_cbs.register(cb)
async def async_close(self):
- pass
-
- def add_job(self, url, commit):
- pass
+ self._run_loop_future.cancel()
+ await self._run_loop_future
+
+ async def query_log(self, offset, limit):
+ return await asyncio.get_event_loop().run_in_executor(
+ self._executor, self._db.query, offset, limit)
+
+ async def add_job(self, repository, commit):
+ job = Job(id=self._next_job_id,
+ timestamp=datetime.datetime.now(datetime.timezone.utc),
+ repository=repository,
+ commit=commit)
+ self._next_job_id += 1
+ with await self._cv:
+ self._queue.append(job)
+ self._cv.notify_all()
+ self._queue_change_cbs.notify()
+
+ async def _run_loop(self):
+ log = logging.getLogger('hatter.project')
+ while True:
+ with await self._cv:
+ while not self._queue:
+ await self._cv.wait()
+ self._active = self._queue_change_cbs.pop(0)
+ self._queue_change_cbs.notify()
+ self._active_change_cbs.notify()
+
+ handler = _LogHandler(
+ self._db, self._active.repository, self._active.commit)
+ log.addHandler(handler)
+ try:
+ await asyncio.get_event_loop().run_in_executor(
+ self._executor, executor.run,
+ log, self._active.repository, self._active.commit)
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ log.error("%s", e, exc_info=True)
+ finally:
+ log.removeHandler(handler)
+ self._active = None
+ self._active_change_cbs.notify()
-class LogHandler(logging.Handler):
+class _LogHandler(logging.Handler):
def __init__(self, db, repository, commit):
super().__init__()
@@ -46,7 +119,7 @@ class LogHandler(logging.Handler):
msg=record.getMessage())
-class DB:
+class _DB:
def __init__(self, db_path):
db_path.parent.mkdir(exist_ok=True)
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
index 1ca9483..0955e98 100644
--- a/src_py/hatter/server.py
+++ b/src_py/hatter/server.py
@@ -60,7 +60,7 @@ class Client:
pass
-WebhookRequest = util.namedtuple('WebhookRequest', 'url', 'commits')
+_WebhookRequest = util.namedtuple('_WebhookRequest', 'url', 'commits')
def _parse_webhook_request(headers, data):
@@ -72,4 +72,4 @@ def _parse_webhook_request(headers, data):
commits = [commit['id'] for commit in data['commits']]
else:
raise Exception('unsupported webhook event')
- return WebhookRequest(url, commits)
+ return _WebhookRequest(url, commits)
diff --git a/src_py/hatter/util.py b/src_py/hatter/util.py
index 5d23751..a4fff55 100644
--- a/src_py/hatter/util.py
+++ b/src_py/hatter/util.py
@@ -103,3 +103,45 @@ def monkeypatch_sqlite3():
return val
sqlite3.register_converter("timestamp", _sqlite_convert_timestamp)
+
+
+class RegisterCallbackHandle(collections.namedtuple(
+ 'RegisterCallbackHandle', ['cancel'])):
+ """Handle used for canceling callback registration
+
+ Attributes:
+ cancel (Callable[[],None]): cancel registered callback
+
+ """
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args):
+ self.cancel()
+
+
+class CallbackRegistry:
+ """Callback registry"""
+
+ def __init__(self):
+ self._cbs = []
+
+ def register(self, cb):
+ """Register callback
+
+ Args:
+ cb (Callable): callback
+
+ Returns:
+ RegisterCallbackHandle
+
+ """
+ self.cbs.append(cb)
+ return RegisterCallbackHandle(lambda: self.cbs.remove(cb))
+
+ def notify(self, *args, **kwargs):
+ """Notify all registered callbacks"""
+
+ for cb in self._cbs:
+ cb(*args, **kwargs)