diff options
Diffstat (limited to 'src_py/hatter/backend.py')
| -rw-r--r-- | src_py/hatter/backend.py | 99 |
1 files changed, 86 insertions, 13 deletions
diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py index 155e888..57c5192 100644 --- a/src_py/hatter/backend.py +++ b/src_py/hatter/backend.py @@ -2,34 +2,107 @@ import sqlite3 import datetime import threading import logging +import concurrent.futures +import asyncio from hatter import util +from hatter import executor util.monkeypatch_sqlite3() -LogEntry = util.namedtuple( - 'LogEntry', - ['timestamp', 'datetime.datetime: timestamp'], - ['repository', 'str: repository'], - ['commit', 'str: commit'], - ['msg', 'str: message']) +LogEntry = util.namedtuple('LogEntry', + ['timestamp', 'datetime.datetime: timestamp'], + ['repository', 'str: repository'], + ['commit', 'str: commit'], + ['msg', 'str: message']) + +Job = util.namedtuple('Job', + ['id', 'int: id'], + ['timestamp', 'datetime.datetime: timestamp'], + ['repository', 'str: repository'], + ['commit', 'str: commit']) class Backend: def __init__(self, db_path): - pass + self._next_job_id = 0 + self._active = None + self._queue = [] + self._active_change_cbs = util.CallbackRegistry() + self._queue_change_cbs = util.CallbackRegistry() + self._log_change_cbs = util.CallbackRegistry() + self._cv = asyncio.Condition() + self._db = _DB(db_path) + self._executor = concurrent.futures.ThreadPoolExecutor() + self._run_loop_future = asyncio.ensure_future(self._run_loop()) + + @property + def active(self): + self._active + + @property + def queue(self): + return self._queue + + def register_active_change_cb(self, cb): + return self._active_change_cbs.register(cb) + + def register_queue_change_cb(self, cb): + return self._queue_change_cbs.register(cb) + + def register_log_change_cb(self, cb): + return self._log_change_cbs.register(cb) async def async_close(self): - pass - - def add_job(self, url, commit): - pass + self._run_loop_future.cancel() + await self._run_loop_future + + async def query_log(self, offset, limit): + return await asyncio.get_event_loop().run_in_executor( + self._executor, self._db.query, offset, limit) + + async def add_job(self, repository, commit): + job = Job(id=self._next_job_id, + timestamp=datetime.datetime.now(datetime.timezone.utc), + repository=repository, + commit=commit) + self._next_job_id += 1 + with await self._cv: + self._queue.append(job) + self._cv.notify_all() + self._queue_change_cbs.notify() + + async def _run_loop(self): + log = logging.getLogger('hatter.project') + while True: + with await self._cv: + while not self._queue: + await self._cv.wait() + self._active = self._queue_change_cbs.pop(0) + self._queue_change_cbs.notify() + self._active_change_cbs.notify() + + handler = _LogHandler( + self._db, self._active.repository, self._active.commit) + log.addHandler(handler) + try: + await asyncio.get_event_loop().run_in_executor( + self._executor, executor.run, + log, self._active.repository, self._active.commit) + except asyncio.CancelledError: + break + except Exception as e: + log.error("%s", e, exc_info=True) + finally: + log.removeHandler(handler) + self._active = None + self._active_change_cbs.notify() -class LogHandler(logging.Handler): +class _LogHandler(logging.Handler): def __init__(self, db, repository, commit): super().__init__() @@ -46,7 +119,7 @@ class LogHandler(logging.Handler): msg=record.getMessage()) -class DB: +class _DB: def __init__(self, db_path): db_path.parent.mkdir(exist_ok=True) |
