aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter/backend.py
diff options
context:
space:
mode:
authorbozo.kopic <bozo@kopic.xyz>2022-03-22 01:31:27 +0100
committerbozo.kopic <bozo@kopic.xyz>2022-03-22 01:31:27 +0100
commitcc4ba3b063f14943579ffbfe416828590f70ae0a (patch)
treeaf2127920fb57603206ca670beb63b5d58650fb8 /src_py/hatter/backend.py
parentc594b1fca854a7b9fb73d854a9830143cd1032fc (diff)
WIP major rewrite
Diffstat (limited to 'src_py/hatter/backend.py')
-rw-r--r--src_py/hatter/backend.py171
1 files changed, 0 insertions, 171 deletions
diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py
deleted file mode 100644
index 8b78570..0000000
--- a/src_py/hatter/backend.py
+++ /dev/null
@@ -1,171 +0,0 @@
-import sqlite3
-import datetime
-import threading
-import logging
-import concurrent.futures
-import asyncio
-
-from hatter import util
-from hatter import executor
-
-
-util.monkeypatch_sqlite3()
-
-
-LogEntry = util.namedtuple('LogEntry',
- ['timestamp', 'datetime.datetime: timestamp'],
- ['repository', 'str: repository'],
- ['commit', 'str: commit'],
- ['msg', 'str: message'])
-
-Job = util.namedtuple('Job',
- ['id', 'int: id'],
- ['timestamp', 'datetime.datetime: timestamp'],
- ['repository', 'str: repository'],
- ['commit', 'str: commit'])
-
-
-class Backend:
-
- def __init__(self, db_path, repositories):
- self._repositories = repositories
- self._next_job_id = 0
- self._active = None
- self._queue = []
- self._active_change_cbs = util.CallbackRegistry()
- self._queue_change_cbs = util.CallbackRegistry()
- self._log_change_cbs = util.CallbackRegistry()
- self._cv = asyncio.Condition()
- self._db = _DB(db_path)
- self._executor = concurrent.futures.ThreadPoolExecutor()
- self._run_loop_future = asyncio.ensure_future(self._run_loop())
-
- @property
- def repositories(self):
- return self._repositories
-
- @property
- def active(self):
- return self._active
-
- @property
- def queue(self):
- return self._queue
-
- def register_active_change_cb(self, cb):
- return self._active_change_cbs.register(cb)
-
- def register_queue_change_cb(self, cb):
- return self._queue_change_cbs.register(cb)
-
- def register_log_change_cb(self, cb):
- return self._log_change_cbs.register(cb)
-
- async def async_close(self):
- self._run_loop_future.cancel()
- await self._run_loop_future
-
- async def query_log(self, offset, limit):
- return await asyncio.get_event_loop().run_in_executor(
- self._executor, self._db.query, offset, limit)
-
- async def add_job(self, repository, commit):
- job = Job(id=self._next_job_id,
- timestamp=datetime.datetime.now(datetime.timezone.utc),
- repository=repository,
- commit=commit)
- self._next_job_id += 1
- with await self._cv:
- self._queue.append(job)
- self._cv.notify_all()
- self._queue_change_cbs.notify()
-
- async def _run_loop(self):
- log = logging.getLogger('hatter.project')
- while True:
- with await self._cv:
- while not self._queue:
- await self._cv.wait()
- self._active = self._queue_change_cbs.pop(0)
- self._queue_change_cbs.notify()
- self._active_change_cbs.notify()
-
- handler = _LogHandler(
- self._db, self._active.repository, self._active.commit)
- log.addHandler(handler)
- try:
- await asyncio.get_event_loop().run_in_executor(
- self._executor, executor.run,
- log, self._active.repository, self._active.commit)
- except asyncio.CancelledError:
- break
- except Exception as e:
- log.error("%s", e, exc_info=True)
- finally:
- log.removeHandler(handler)
- self._active = None
- self._active_change_cbs.notify()
-
-
-class _LogHandler(logging.Handler):
-
- def __init__(self, db, repository, commit):
- super().__init__()
- self._db = db
- self._repository
- self._commit = commit
-
- def emit(self, record):
- self._db.add(
- timestamp=datetime.datetime.fromtimestamp(
- record.created, datetime.timezone.utc),
- repository=self._repository,
- commit=self._commit,
- msg=record.getMessage())
-
-
-class _DB:
-
- def __init__(self, db_path):
- db_path.parent.mkdir(exist_ok=True)
- self._db = sqlite3.connect('file:{}?nolock=1'.format(db_path),
- uri=True,
- isolation_level=None,
- detect_types=sqlite3.PARSE_DECLTYPES)
- self._db.executescript("CREATE TABLE IF NOT EXISTS log ("
- "timestamp TIMESTAMP, "
- "repository TEXT, "
- "commit TEXT, "
- "msg TEXT)")
- self._db.commit()
- self._lock = threading.Lock()
-
- def close(self):
- with self._lock:
- self._db.close()
-
- def add(self, timestamp, repository, commit, msg):
- with self._lock:
- self._db.execute(
- "INSERT INTO log VALUES "
- "(:timestamp, :repository, :commit, :msg)",
- {'timestamp': timestamp,
- 'repository': repository,
- 'commit': commit,
- 'msg': msg})
-
- def query(self, offset, limit):
- with self._lock:
- c = self._db.execute(
- "SELECT rowid, * FROM log ORDER BY rowid DESC "
- "LIMIT :limit OFFSET :offset",
- {'limit': limit, 'offset': offset})
- try:
- result = c.fetchall()
- except Exception as e:
- result = []
- return [LogEntry(timestamp=i[1],
- repository=i[2],
- commit=i[3],
- msg=i[4])
- for i in result]