aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter/server.py
diff options
context:
space:
mode:
authorbozo.kopic <bozo@kopic.xyz>2022-03-22 01:31:27 +0100
committerbozo.kopic <bozo@kopic.xyz>2022-03-22 01:31:27 +0100
commitcc4ba3b063f14943579ffbfe416828590f70ae0a (patch)
treeaf2127920fb57603206ca670beb63b5d58650fb8 /src_py/hatter/server.py
parentc594b1fca854a7b9fb73d854a9830143cd1032fc (diff)
WIP major rewrite
Diffstat (limited to 'src_py/hatter/server.py')
-rw-r--r--src_py/hatter/server.py163
1 files changed, 0 insertions, 163 deletions
diff --git a/src_py/hatter/server.py b/src_py/hatter/server.py
deleted file mode 100644
index baeffac..0000000
--- a/src_py/hatter/server.py
+++ /dev/null
@@ -1,163 +0,0 @@
-import asyncio
-import json
-import aiohttp.web
-
-from hatter import util
-import hatter.json_validator
-
-
async def create_web_server(backend, host, port, webhook_path, web_path):
    """Create and start the HTTP/WebSocket front-end.

    Routes: '/' redirects to '/index.html'; '/ws' serves WebSocket
    clients; `webhook_path` accepts GitLab/GitHub push webhooks; every
    other path is served statically from `web_path`.
    """
    server = WebServer()
    server._backend = backend
    server._app = aiohttp.web.Application()
    # registration order matters: the static catch-all must come last
    server._app.router.add_route(
        'GET', '/', lambda request: aiohttp.web.HTTPFound('/index.html'))
    server._app.router.add_route('*', '/ws', server._ws_handler)
    server._app.router.add_route('POST', webhook_path,
                                 server._webhook_handler)
    server._app.router.add_static('/', web_path)
    server._app_handler = server._app.make_handler()
    loop = asyncio.get_event_loop()
    server._srv = await loop.create_server(server._app_handler,
                                           host=host, port=port)
    return server
-
-
class WebServer:
    """aiohttp based server instance created by `create_web_server`."""

    async def async_close(self):
        """Stop listening and tear down the aiohttp application."""
        self._srv.close()
        await self._srv.wait_closed()
        await self._app.shutdown()
        await self._app_handler.finish_connections(0)
        await self._app.cleanup()

    async def _ws_handler(self, request):
        """Upgrade `request` to a WebSocket and run one client session."""
        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(request)
        client = _Client(self._backend, ws)
        await client.run()
        return ws

    async def _webhook_handler(self, request):
        """Handle a GitLab/GitHub push webhook.

        Best effort: malformed or unsupported requests are ignored and
        an empty 200 response is always returned so the remote side
        does not keep retrying.
        """
        try:
            if not ({'X-Gitlab-Event', 'X-GitHub-Event'} &
                    set(request.headers.keys())):
                raise Exception('unsupported webhook request')
            body = await request.read()
            data = json.loads(body)
            req = _parse_webhook_request(request.headers, data)
            for commit in req.commits:
                # add_job is a coroutine (it is awaited in
                # _Client._process_msg) - the original call here was
                # missing the await, so webhook jobs were never queued
                await self._backend.add_job(req.url, commit)
        except Exception:
            # deliberate best-effort: webhook errors are never propagated
            pass
        return aiohttp.web.Response()
-
-
# parsed push-webhook payload: `url` is the repository clone URL,
# `commits` a list of commit id strings (built by _parse_webhook_request)
_WebhookRequest = util.namedtuple('_WebhookRequest', 'url', 'commits')
-
-
def _parse_webhook_request(headers, data):
    """Extract repository URL and commit ids from a push-webhook payload.

    Supports GitLab ('Push Hook') and GitHub ('push') events; any other
    event raises Exception.
    """
    if headers.get('X-Gitlab-Event') == 'Push Hook':
        repo_url = data['repository']['git_http_url']
    elif headers.get('X-GitHub-Event') == 'push':
        repo_url = data['repository']['clone_url']
    else:
        raise Exception('unsupported webhook event')
    commit_ids = [commit['id'] for commit in data['commits']]
    return _WebhookRequest(repo_url, commit_ids)
-
-
class _Client:
    """Single WebSocket client session.

    Mirrors backend state (repositories, active job, job queue, log
    entries) to the remote side and applies client requests to the
    backend.
    """

    def __init__(self, backend, ws):
        self._backend = backend
        self._ws = ws
        # window into the persisted log requested by the client
        self._log_offset = 0
        self._log_limit = 0
        self._log_entries = []
        self._active_job = None
        self._job_queue = []

    async def run(self):
        """Send the initial state, then serve messages until close."""
        self._active_job = self._backend.active
        self._job_queue = list(self._backend.queue)
        with self._backend.register_active_change_cb(self._on_active_change):
            with self._backend.register_queue_change_cb(self._on_queue_change):
                with self._backend.register_log_change_cb(self._on_log_change):
                    try:
                        self._send_repositories()
                        self._send_active_job()
                        self._send_job_queue()
                        self._send_log_entries()
                        while True:
                            msg = await self._ws.receive()
                            if self._ws.closed:
                                break
                            if msg.type != aiohttp.WSMsgType.TEXT:
                                continue
                            # `encoding` kwarg dropped: it was a no-op and
                            # json.loads rejects it on Python >= 3.9
                            json_msg = json.loads(msg.data)
                            hatter.json_validator.validate(json_msg, 'hatter://message.yaml#/definitions/client_message')  # NOQA
                            await self._process_msg(json_msg)
                    except Exception as e:
                        print('>>>', e)

    async def _process_msg(self, msg):
        """Dispatch one validated client message."""
        if msg['type'] == 'set_log':
            self._log_offset = msg['log_offset']
            self._log_limit = msg['log_limit']
            await self._update_log()
        elif msg['type'] == 'add_job':
            await self._backend.add_job(msg['repository'], msg['commit'])

    def _on_active_change(self):
        if self._active_job != self._backend.active:
            self._active_job = self._backend.active
            self._send_active_job()

    def _on_queue_change(self):
        # was `self._backend.job_queue` - the backend exposes `queue`
        # (see run()), so the old attribute access could never succeed
        if self._job_queue != self._backend.queue:
            self._job_queue = list(self._backend.queue)
            self._send_job_queue()

    def _on_log_change(self):
        asyncio.ensure_future(self._update_log())

    async def _update_log(self):
        """Re-query the client's log window and resend it if it changed.

        Was `_update_log(self, offset, limit)` although both call sites
        pass no arguments (TypeError on every log change) - use the
        stored window set by the 'set_log' message instead.
        """
        log_entries = await self._backend.query_log(self._log_offset,
                                                    self._log_limit)
        if log_entries != self._log_entries:
            self._log_entries = log_entries
            self._send_log_entries()

    def _send_repositories(self):
        self._ws.send_str(json.dumps({
            'type': 'repositories',
            'repositories': self._backend.repositories}))

    def _send_active_job(self):
        self._ws.send_str(json.dumps({
            'type': 'active_job',
            'job': _job_to_json(self._active_job)}))

    def _send_job_queue(self):
        self._ws.send_str(json.dumps({
            'type': 'job_queue',
            'jobs': [_job_to_json(i) for i in self._job_queue]}))

    def _send_log_entries(self):
        self._ws.send_str(json.dumps({
            'type': 'log_entries',
            'entries': [_log_entry_to_json(i) for i in self._log_entries]}))
-
-
-def _job_to_json(job):
- return {'id': job.id,
- 'timestamp': job.timestamp.timestamp(),
- 'repository': job.repository,
- 'commit': job.commit}
-
-
-def _log_entry_to_json(entry):
- return {'timestamp': entry.timestamp.timestamp(),
- 'repository': entry.repository,
- 'commit': entry.commit,
- 'message': entry.msg}