diff options
| author | bozo.kopic <bozo.kopic@gmail.com> | 2017-08-23 15:02:19 +0200 |
|---|---|---|
| committer | bozo.kopic <bozo.kopic@gmail.com> | 2017-08-23 15:02:19 +0200 |
| commit | d736cd1392a56ad5103867c72761cfcb4ccd4f1b (patch) | |
| tree | 9da36c58191b2d88dcdc84e3b45aa8c855d209b4 /src_py/hatter/server.py | |
| parent | 8fc558c5508a0a052f58f2ca2c8131d86a134ca3 (diff) | |
backend
Diffstat (limited to 'src_py/hatter/server.py')
| -rw-r--r-- | src_py/hatter/server.py | 102 |
1 file changed, 92 insertions, 10 deletions
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py index 0955e98..8263eea 100644 --- a/src_py/hatter/server.py +++ b/src_py/hatter/server.py @@ -3,6 +3,7 @@ import json import aiohttp.web from hatter import util +import hatter.json_validator async def create_web_server(backend, host, port, webhook_path, web_path): @@ -32,7 +33,7 @@ class WebServer: async def _ws_handler(self, request): ws = aiohttp.web.WebSocketResponse() await ws.prepare(request) - client = Client(self._backend, ws) + client = _Client(self._backend, ws) await client.run() return ws @@ -51,15 +52,6 @@ class WebServer: return aiohttp.web.Response() -class Client: - - def __init__(self, backend, ws): - pass - - async def run(self): - pass - - _WebhookRequest = util.namedtuple('_WebhookRequest', 'url', 'commits') @@ -73,3 +65,93 @@ def _parse_webhook_request(headers, data): else: raise Exception('unsupported webhook event') return _WebhookRequest(url, commits) + + +class _Client: + + def __init__(self, backend, ws): + self._backend = backend + self._ws = ws + self._log_offset = 0 + self._log_limit = 0 + self._log_entries = [] + self._active_job = None + self._job_queue = [] + + async def run(self): + self._active_job = self._backend.active + self._job_queue = list(self._backend.queue) + with self._backend.register_active_change_cb(self._on_active_change): + with self._backend.register_queue_change_cb(self._on_queue_change): + with self._backend.register_log_change_cb(self._on_log_change): + try: + self._send_active_job() + self._send_job_queue() + self._send_log_entries() + while True: + msg = await self._ws.receive() + if self._ws.closed: + break + if msg.type != aiohttp.WSMsgType.TEXT: + continue + json_msg = json.loads(msg.data, encoding='utf-8') + hatter.json_validator.validate(json_msg, 'hatter://message.yaml#/definitions/client_message') # NOQA + await self._process_msg(json_msg) + except Exception as e: + print('>>>', e) + + async def _process_msg(self, msg): + if msg['type'] == 
'set_log': + self._log_offset = msg['log_offset'] + self._log_limit = msg['log_limit'] + await self._update_log() + elif msg['type'] == 'add_job': + await self._backend.add_job(msg['repository'], msg['commit']) + + def _on_active_change(self): + if self._active_job != self._backend.active: + self._active_job = self._backend.active + self._send_active_job() + + def _on_queue_change(self): + if self._job_queue != self._backend.job_queue: + self._job_queue = list(self._backend.job_queue) + self._send_job_queue() + + def _on_log_change(self): + asyncio.ensure_future(self._update_log()) + + async def _update_log(self, offset, limit): + log_entries = await self._backend.query_log(offset, limit) + if log_entries != self._log_entries: + self._log_entries = log_entries + self._send_log_entries() + + def _send_active_job(self): + self._ws.send_str(json.dumps({ + 'type': 'active_job', + 'job': _job_to_json(self._active_job)})) + + def _send_job_queue(self): + self._ws.send_str(json.dumps({ + 'type': 'job_queue', + 'jobs': [_job_to_json(i) for i in self._job_queue]})) + + def _send_log_entries(self): + self._ws.send_str(json.dumps({ + 'type': 'log_entries', + 'entries': [_log_entry_to_json(i) for i in self._log_entries]})) + + +def _job_to_json(job): + return {'id': job.id, + 'timestamp': job.timestamp.timestamp(), + 'repository': job.repository, + 'commit': job.commit} + + +def _log_entry_to_json(entry): + return {'timestamp': entry.timestamp.timestamp(), + 'repository': entry.repository, + 'commit': entry.commit, + 'message': entry.msg} |
