diff options
Diffstat (limited to 'src_py')
| -rw-r--r-- | src_py/hatter/backend.py | 99 | ||||
| -rw-r--r-- | src_py/hatter/server.py | 4 | ||||
| -rw-r--r-- | src_py/hatter/util.py | 42 |
3 files changed, 130 insertions, 15 deletions
diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py index 155e888..57c5192 100644 --- a/src_py/hatter/backend.py +++ b/src_py/hatter/backend.py @@ -2,34 +2,107 @@ import sqlite3 import datetime import threading import logging +import concurrent.futures +import asyncio from hatter import util +from hatter import executor util.monkeypatch_sqlite3() -LogEntry = util.namedtuple( - 'LogEntry', - ['timestamp', 'datetime.datetime: timestamp'], - ['repository', 'str: repository'], - ['commit', 'str: commit'], - ['msg', 'str: message']) +LogEntry = util.namedtuple('LogEntry', + ['timestamp', 'datetime.datetime: timestamp'], + ['repository', 'str: repository'], + ['commit', 'str: commit'], + ['msg', 'str: message']) + +Job = util.namedtuple('Job', + ['id', 'int: id'], + ['timestamp', 'datetime.datetime: timestamp'], + ['repository', 'str: repository'], + ['commit', 'str: commit']) class Backend: def __init__(self, db_path): - pass + self._next_job_id = 0 + self._active = None + self._queue = [] + self._active_change_cbs = util.CallbackRegistry() + self._queue_change_cbs = util.CallbackRegistry() + self._log_change_cbs = util.CallbackRegistry() + self._cv = asyncio.Condition() + self._db = _DB(db_path) + self._executor = concurrent.futures.ThreadPoolExecutor() + self._run_loop_future = asyncio.ensure_future(self._run_loop()) + + @property + def active(self): + self._active + + @property + def queue(self): + return self._queue + + def register_active_change_cb(self, cb): + return self._active_change_cbs.register(cb) + + def register_queue_change_cb(self, cb): + return self._queue_change_cbs.register(cb) + + def register_log_change_cb(self, cb): + return self._log_change_cbs.register(cb) async def async_close(self): - pass - - def add_job(self, url, commit): - pass + self._run_loop_future.cancel() + await self._run_loop_future + + async def query_log(self, offset, limit): + return await asyncio.get_event_loop().run_in_executor( + self._executor, self._db.query, offset, limit) + + async def add_job(self, repository, commit): + job = Job(id=self._next_job_id, + timestamp=datetime.datetime.now(datetime.timezone.utc), + repository=repository, + commit=commit) + self._next_job_id += 1 + with await self._cv: + self._queue.append(job) + self._cv.notify_all() + self._queue_change_cbs.notify() + + async def _run_loop(self): + log = logging.getLogger('hatter.project') + while True: + with await self._cv: + while not self._queue: + await self._cv.wait() + self._active = self._queue_change_cbs.pop(0) + self._queue_change_cbs.notify() + self._active_change_cbs.notify() + + handler = _LogHandler( + self._db, self._active.repository, self._active.commit) + log.addHandler(handler) + try: + await asyncio.get_event_loop().run_in_executor( + self._executor, executor.run, + log, self._active.repository, self._active.commit) + except asyncio.CancelledError: + break + except Exception as e: + log.error("%s", e, exc_info=True) + finally: + log.removeHandler(handler) + self._active = None + self._active_change_cbs.notify() -class LogHandler(logging.Handler): +class _LogHandler(logging.Handler): def __init__(self, db, repository, commit): super().__init__() @@ -46,7 +119,7 @@ class LogHandler(logging.Handler): msg=record.getMessage()) -class DB: +class _DB: def __init__(self, db_path): db_path.parent.mkdir(exist_ok=True) diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py index 1ca9483..0955e98 100644 --- a/src_py/hatter/server.py +++ b/src_py/hatter/server.py @@ -60,7 +60,7 @@ class Client: pass -WebhookRequest = util.namedtuple('WebhookRequest', 'url', 'commits') +_WebhookRequest = util.namedtuple('_WebhookRequest', 'url', 'commits') def _parse_webhook_request(headers, data): @@ -72,4 +72,4 @@ def _parse_webhook_request(headers, data): commits = [commit['id'] for commit in data['commits']] else: raise Exception('unsupported webhook event') - return WebhookRequest(url, commits) + return _WebhookRequest(url, commits) diff --git a/src_py/hatter/util.py b/src_py/hatter/util.py index 5d23751..a4fff55 100644 --- a/src_py/hatter/util.py +++ b/src_py/hatter/util.py @@ -103,3 +103,45 @@ def monkeypatch_sqlite3(): return val sqlite3.register_converter("timestamp", _sqlite_convert_timestamp) + + +class RegisterCallbackHandle(collections.namedtuple( + 'RegisterCallbackHandle', ['cancel'])): + """Handle used for canceling callback registration + + Attributes: + cancel (Callable[[],None]): cancel registered callback + + """ + + def __enter__(self): + return self + + def __exit__(self, *args): + self.cancel() + + +class CallbackRegistry: + """Callback registry""" + + def __init__(self): + self._cbs = [] + + def register(self, cb): + """Register callback + + Args: + cb (Callable): callback + + Returns: + RegisterCallbackHandle + + """ + self.cbs.append(cb) + return RegisterCallbackHandle(lambda: self.cbs.remove(cb)) + + def notify(self, *args, **kwargs): + """Notify all registered callbacks""" + + for cb in self._cbs: + cb(*args, **kwargs) |
