aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter/server.py
diff options
context:
space:
mode:
authorbozo.kopic <bozo.kopic@gmail.com>2017-08-23 15:02:19 +0200
committerbozo.kopic <bozo.kopic@gmail.com>2017-08-23 15:02:19 +0200
commitd736cd1392a56ad5103867c72761cfcb4ccd4f1b (patch)
tree9da36c58191b2d88dcdc84e3b45aa8c855d209b4 /src_py/hatter/server.py
parent8fc558c5508a0a052f58f2ca2c8131d86a134ca3 (diff)
backend
Diffstat (limited to 'src_py/hatter/server.py')
-rw-r--r--src_py/hatter/server.py102
1 files changed, 92 insertions, 10 deletions
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
index 0955e98..8263eea 100644
--- a/src_py/hatter/server.py
+++ b/src_py/hatter/server.py
@@ -3,6 +3,7 @@ import json
import aiohttp.web
from hatter import util
+import hatter.json_validator
async def create_web_server(backend, host, port, webhook_path, web_path):
@@ -32,7 +33,7 @@ class WebServer:
async def _ws_handler(self, request):
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
- client = Client(self._backend, ws)
+ client = _Client(self._backend, ws)
await client.run()
return ws
@@ -51,15 +52,6 @@ class WebServer:
return aiohttp.web.Response()
-class Client:
-
- def __init__(self, backend, ws):
- pass
-
- async def run(self):
- pass
-
-
_WebhookRequest = util.namedtuple('_WebhookRequest', 'url', 'commits')
@@ -73,3 +65,93 @@ def _parse_webhook_request(headers, data):
else:
raise Exception('unsupported webhook event')
return _WebhookRequest(url, commits)
+
+
+class _Client:
+
+ def __init__(self, backend, ws):
+ self._backend = backend
+ self._ws = ws
+ self._log_offset = 0
+ self._log_limit = 0
+ self._log_entries = []
+ self._active_job = None
+ self._job_queue = []
+
+ async def run(self):
+ self._active_job = self._backend.active
+ self._job_queue = list(self._backend.queue)
+ with self._backend.register_active_change_cb(self._on_active_change):
+ with self._backend.register_queue_change_cb(self._on_queue_change):
+ with self._backend.register_log_change_cb(self._on_log_change):
+ try:
+ self._send_active_job()
+ self._send_job_queue()
+ self._send_log_entries()
+ while True:
+ msg = await self._ws.receive()
+ if self._ws.closed:
+ break
+ if msg.type != aiohttp.WSMsgType.TEXT:
+ continue
+ json_msg = json.loads(msg.data, encoding='utf-8')
+ hatter.json_validator.validate(json_msg, 'hatter://message.yaml#/definitions/client_message') # NOQA
+ await self._process_msg(json_msg)
+ except Exception as e:
+ print('>>>', e)
+
+ async def _process_msg(self, msg):
+ if msg['type'] == 'set_log':
+ self._log_offset = msg['log_offset']
+ self._log_limit = msg['log_limit']
+ await self._update_log()
+ elif msg['type'] == 'add_job':
+ await self._backend.add_job(msg['repository'], msg['commit'])
+
+ def _on_active_change(self):
+ if self._active_job != self._backend.active:
+ self._active_job = self._backend.active
+ self._send_active_job()
+
+ def _on_queue_change(self):
+ if self._job_queue != self._backend.job_queue:
+ self._job_queue = list(self._backend.job_queue)
+ self._send_job_queue()
+
+ def _on_log_change(self):
+ asyncio.ensure_future(self._update_log())
+
+ async def _update_log(self, offset, limit):
+ log_entries = await self._backend.query_log(offset, limit)
+ if log_entries != self._log_entries:
+ self._log_entries = log_entries
+ self._send_log_entries()
+
+ def _send_active_job(self):
+ self._ws.send_str(json.dumps({
+ 'type': 'active_job',
+ 'job': _job_to_json(self._active_job)}))
+
+ def _send_job_queue(self):
+ self._ws.send_str(json.dumps({
+ 'type': 'job_queue',
+ 'jobs': [_job_to_json(i) for i in self._job_queue]}))
+
+ def _send_log_entries(self):
+ self._ws.send_str(json.dumps({
+ 'type': 'log_entries',
+ 'entries': [_log_entry_to_json(i) for i in self._log_entries]}))
+
+
+def _job_to_json(job):
+ return {'id': job.id,
+ 'timestamp': job.timestamp.timestamp(),
+ 'repository': job.repository,
+ 'commit': job.commit}
+
+
+def _log_entry_to_json(entry):
+ return {'timestamp': entry.timestamp.timestamp(),
+ 'repository': entry.repository,
+ 'commit': entry.commit,
+ 'message': entry.msg}