aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter
diff options
context:
space:
mode:
Diffstat (limited to 'src_py/hatter')
-rw-r--r--src_py/hatter/backend.py83
-rw-r--r--src_py/hatter/main.py31
-rw-r--r--src_py/hatter/server.py7
-rw-r--r--src_py/hatter/util.py105
4 files changed, 195 insertions, 31 deletions
diff --git a/src_py/hatter/backend.py b/src_py/hatter/backend.py
index afb61a4..155e888 100644
--- a/src_py/hatter/backend.py
+++ b/src_py/hatter/backend.py
@@ -1,3 +1,20 @@
+import sqlite3
+import datetime
+import threading
+import logging
+
+from hatter import util
+
+
+util.monkeypatch_sqlite3()
+
+
+LogEntry = util.namedtuple(
+ 'LogEntry',
+ ['timestamp', 'datetime.datetime: timestamp'],
+ ['repository', 'str: repository'],
+ ['commit', 'str: commit'],
+ ['msg', 'str: message'])
class Backend:
@@ -8,5 +25,69 @@ class Backend:
async def async_close(self):
pass
- def enqueue(self, url, commit):
+ def add_job(self, url, commit):
pass
+
+
+class LogHandler(logging.Handler):
+
+ def __init__(self, db, repository, commit):
+ super().__init__()
+ self._db = db
+ self._repository = repository
+ self._commit = commit
+
+ def emit(self, record):
+ self._db.add(
+ timestamp=datetime.datetime.fromtimestamp(
+ record.created, datetime.timezone.utc),
+ repository=self._repository,
+ commit=self._commit,
+ msg=record.getMessage())
+
+
+class DB:
+
+ def __init__(self, db_path):
+ db_path.parent.mkdir(exist_ok=True)
+ self._db = sqlite3.connect('file:{}?nolock=1'.format(db_path),
+ uri=True,
+ isolation_level=None,
+ detect_types=sqlite3.PARSE_DECLTYPES)
+ self._db.executescript("CREATE TABLE IF NOT EXISTS log ("
+ "timestamp TIMESTAMP, "
+ "repository TEXT, "
+ '"commit" TEXT, '
+ "msg TEXT)")
+ self._db.commit()
+ self._lock = threading.Lock()
+
+ def close(self):
+ with self._lock:
+ self._db.close()
+
+ def add(self, timestamp, repository, commit, msg):
+ with self._lock:
+ self._db.execute(
+ "INSERT INTO log VALUES "
+ "(:timestamp, :repository, :commit, :msg)",
+ {'timestamp': timestamp,
+ 'repository': repository,
+ 'commit': commit,
+ 'msg': msg})
+
+ def query(self, offset, limit):
+ with self._lock:
+ c = self._db.execute(
+ "SELECT rowid, * FROM log ORDER BY rowid DESC "
+ "LIMIT :limit OFFSET :offset",
+ {'limit': limit, 'offset': offset})
+ try:
+ result = c.fetchall()
+ except Exception:
+ result = []
+ return [LogEntry(timestamp=i[1],
+ repository=i[2],
+ commit=i[3],
+ msg=i[4])
+ for i in result]
diff --git a/src_py/hatter/main.py b/src_py/hatter/main.py
index 62ef457..87bc32c 100644
--- a/src_py/hatter/main.py
+++ b/src_py/hatter/main.py
@@ -2,13 +2,14 @@ import sys
import asyncio
import argparse
import pdb
-import contextlib
import yaml
import logging.config
import atexit
import pkg_resources
+import pathlib
import hatter.json_validator
+from hatter import util
from hatter.backend import Backend
from hatter.server import create_web_server
@@ -29,14 +30,14 @@ def main():
atexit.register(pkg_resources.cleanup_resources)
web_path = pkg_resources.resource_filename('hatter', 'web')
- _run_until_complete_without_interrupt(async_main(conf, web_path))
+ util.run_until_complete_without_interrupt(async_main(conf, web_path))
async def async_main(conf, web_path):
backend = None
web_server = None
try:
- backend = Backend(conf.get('db_path', 'hatter.db'))
+ backend = Backend(pathlib.Path(conf.get('db_path', 'hatter.db')))
web_server = await create_web_server(
backend, conf.get('host', '0.0.0.0'), conf.get('port', 24000),
conf.get('webhook_path', '/webhook'), web_path)
@@ -68,29 +69,5 @@ def _create_parser():
return parser
-def _run_until_complete_without_interrupt(future):
- async def ping_loop():
- with contextlib.suppress(asyncio.CancelledError):
- while True:
- await asyncio.sleep(1)
-
- task = asyncio.ensure_future(future)
- if sys.platform == 'win32':
- ping_loop_task = asyncio.ensure_future(ping_loop())
- with contextlib.suppress(KeyboardInterrupt):
- asyncio.get_event_loop().run_until_complete(task)
- asyncio.get_event_loop().call_soon(task.cancel)
- if sys.platform == 'win32':
- asyncio.get_event_loop().call_soon(ping_loop_task.cancel)
- while not task.done():
- with contextlib.suppress(KeyboardInterrupt):
- asyncio.get_event_loop().run_until_complete(task)
- if sys.platform == 'win32':
- while not ping_loop_task.done():
- with contextlib.suppress(KeyboardInterrupt):
- asyncio.get_event_loop().run_until_complete(ping_loop_task)
- return task.result()
-
-
if __name__ == '__main__':
sys.exit(main())
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
index 9e44a0b..1ca9483 100644
--- a/src_py/hatter/server.py
+++ b/src_py/hatter/server.py
@@ -1,8 +1,9 @@
import asyncio
import json
-import collections
import aiohttp.web
+from hatter import util
+
async def create_web_server(backend, host, port, webhook_path, web_path):
srv = WebServer()
@@ -44,7 +45,7 @@ class WebServer:
data = json.loads(body)
req = _parse_webhook_request(request.headers, data)
for commit in req.commits:
- self._backend.enqueue(req.url, commit)
+ self._backend.add_job(req.url, commit)
except Exception:
pass
return aiohttp.web.Response()
@@ -59,7 +60,7 @@ class Client:
pass
-WebhookRequest = collections.namedtuple('WebhookRequest', ['url', 'commits'])
+WebhookRequest = util.namedtuple('WebhookRequest', 'url', 'commits')
def _parse_webhook_request(headers, data):
diff --git a/src_py/hatter/util.py b/src_py/hatter/util.py
new file mode 100644
index 0000000..5d23751
--- /dev/null
+++ b/src_py/hatter/util.py
@@ -0,0 +1,105 @@
+import collections
+import sys
+import contextlib
+import asyncio
+import datetime
+import sqlite3
+
+
+def namedtuple(name, *props):
+ """Create documented namedtuple
+
+ Args:
+ name (Union[str,Tuple[str,str]]):
+ named tuple's name or named tuple's name with documentation
+ props (Sequence[Union[str,Tuple[str,str]]]):
+ named tuple's properties with optional documentation
+
+ Returns:
+ class implementing collections.namedtuple
+
+ """
+ props = [(i, None) if isinstance(i, str) else i for i in props]
+ cls = collections.namedtuple(name if isinstance(name, str) else name[0],
+ [i[0] for i in props])
+ if not isinstance(name, str) and name[1]:
+ cls.__doc__ = name[1]
+ for k, v in props:
+ if v:
+ getattr(cls, k).__doc__ = v
+ try:
+ cls.__module__ = sys._getframe(1).f_globals.get('__name__', '__main__')
+ except (AttributeError, ValueError):
+ pass
+ return cls
+
+
+def run_until_complete_without_interrupt(future):
+ """Run event loop until future or coroutine is done
+
+ Args:
+ future (Awaitable): future or coroutine
+
+ Returns:
+ Any: provided future's result
+
+ KeyboardInterrupt is suppressed (while event loop is running) and is mapped
+ to a single cancellation of the running task. If multiple KeyboardInterrupts
+ occur, the task is cancelled only once.
+
+ """
+ async def ping_loop():
+ with contextlib.suppress(asyncio.CancelledError):
+ while True:
+ await asyncio.sleep(1)
+
+ task = asyncio.ensure_future(future)
+ if sys.platform == 'win32':
+ ping_loop_task = asyncio.ensure_future(ping_loop())
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.get_event_loop().run_until_complete(task)
+ asyncio.get_event_loop().call_soon(task.cancel)
+ if sys.platform == 'win32':
+ asyncio.get_event_loop().call_soon(ping_loop_task.cancel)
+ while not task.done():
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.get_event_loop().run_until_complete(task)
+ if sys.platform == 'win32':
+ while not ping_loop_task.done():
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.get_event_loop().run_until_complete(ping_loop_task)
+ return task.result()
+
+
+def monkeypatch_sqlite3():
+ """Monkeypatch sqlite timestamp converter"""
+
+ def _sqlite_convert_timestamp(val):
+ datepart, timetzpart = val.split(b" ")
+ if b"+" in timetzpart:
+ tzsign = 1
+ timepart, tzpart = timetzpart.split(b"+")
+ elif b"-" in timetzpart:
+ tzsign = -1
+ timepart, tzpart = timetzpart.split(b"-")
+ else:
+ timepart, tzpart = timetzpart, None
+ year, month, day = map(int, datepart.split(b"-"))
+ timepart_full = timepart.split(b".")
+ hours, minutes, seconds = map(int, timepart_full[0].split(b":"))
+ if len(timepart_full) == 2:
+ microseconds = int('{:0<6.6}'.format(timepart_full[1].decode()))
+ else:
+ microseconds = 0
+ if tzpart:
+ tzhours, tzminutes = map(int, tzpart.split(b":"))
+ tz = datetime.timezone(
+ tzsign * datetime.timedelta(hours=tzhours, minutes=tzminutes))
+ else:
+ tz = None
+
+ val = datetime.datetime(year, month, day, hours, minutes, seconds,
+ microseconds, tz)
+ return val
+
+ sqlite3.register_converter("timestamp", _sqlite_convert_timestamp)