diff options
| author | bozo.kopic <bozo@kopic.xyz> | 2021-07-28 01:43:55 +0200 |
|---|---|---|
| committer | bozo.kopic <bozo@kopic.xyz> | 2021-07-29 00:01:57 +0200 |
| commit | 1e874e790c12839695761a654b44fb427149a353 (patch) | |
| tree | 6942441ac511ec1417b2434b111101fa8d7f7e68 /src_py | |
init
Diffstat (limited to 'src_py')
| -rw-r--r-- | src_py/restlog/__init__.py | 0 | ||||
| -rw-r--r-- | src_py/restlog/__main__.py | 7 | ||||
| -rw-r--r-- | src_py/restlog/backend.py | 138 | ||||
| -rw-r--r-- | src_py/restlog/clients/__init__.py | 0 | ||||
| -rw-r--r-- | src_py/restlog/clients/status/__init__.py | 0 | ||||
| -rw-r--r-- | src_py/restlog/clients/status/linux.py | 73 | ||||
| -rw-r--r-- | src_py/restlog/common.py | 14 | ||||
| -rw-r--r-- | src_py/restlog/main.py | 65 | ||||
| -rw-r--r-- | src_py/restlog/server.py | 97 |
9 files changed, 394 insertions, 0 deletions
diff --git a/src_py/restlog/__init__.py b/src_py/restlog/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src_py/restlog/__init__.py diff --git a/src_py/restlog/__main__.py b/src_py/restlog/__main__.py new file mode 100644 index 0000000..b721830 --- /dev/null +++ b/src_py/restlog/__main__.py @@ -0,0 +1,7 @@ +import sys + +from restlog.main import main + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src_py/restlog/backend.py b/src_py/restlog/backend.py new file mode 100644 index 0000000..cc87718 --- /dev/null +++ b/src_py/restlog/backend.py @@ -0,0 +1,138 @@ +from pathlib import Path +import sqlite3 +import typing + +from hat import aio +from hat import json + + +async def create(path: Path, + max_results: int + ) -> 'Backend': + backend = Backend() + backend._max_results = max_results + backend._async_group = aio.Group() + backend._executor = aio.create_executor(1) + + backend._db = await backend._executor(_ext_sqlite_connect, path) + backend.async_group.spawn(aio.call_on_cancel, backend._executor, + _ext_sqlite_close, backend._db) + + return backend + + +class Backend(aio.Resource): + + @property + def async_group(self): + return self._async_group + + async def register(self, + timestamp: float, + address: str, + source: str, + type: str, + data: json.Data + ) -> json.Data: + params = {'timestamp': timestamp, + 'address': address, + 'source': source, + 'type': type, + 'data': json.encode(data)} + sql = ("INSERT INTO entries (timestamp, address, source, type, data) " + "VALUES (:timestamp, :address, :source, :type, :data)") + rowid = await self._executor(_ext_sqlite_execute, self._db, sql, + params, False) + + params = {'rowid': rowid} + sql = ("SELECT entry_id, timestamp, address, source, type, data " + "FROM entries " + "WHERE rowid = :rowid") + result = await self._executor(_ext_sqlite_execute, self._db, sql, + params) + + return _row_to_entry(result[0]) + + async def get_entries(self, + source: typing.Optional[str] = None, + type: typing.Optional[str] = None, + last_entry_id: typing.Optional[int] = None, + max_results: typing.Optional[int] = None + ) -> json.Data: + params = {} + sql_conditions = [] + + if source is not None: + params['source'] = source + sql_conditions.append('source = :source') + + if type is not None: + params['type'] = type + sql_conditions.append('type = :type') + + if last_entry_id is not None: + params['last_entry_id'] = last_entry_id + sql_conditions.append('entry_id <= :last_entry_id') + + if max_results is None or not (0 < max_results < self._max_results): + max_results = self._max_results + params['max_results'] = max_results + + sql_condition = (f"WHERE {' AND '.join(sql_conditions)}" + if sql_conditions else "") + sql = (f"SELECT entry_id, timestamp, address, source, type, data " + f"FROM entries " + f"{sql_condition} " + f"ORDER BY entry_id DESC " + f"LIMIT :max_results") + result = await self._executor(_ext_sqlite_execute, self._db, sql, + params) + + return {'entries': [_row_to_entry(i) for i in result[:max_results]], + 'more': len(result) > max_results} + + async def get_entry(self, entry_id: int) -> typing.Optional[json.Data]: + params = {'entry_id': entry_id} + sql = ("SELECT entry_id, timestamp, address, source, type, data " + "FROM entries " + "WHERE entry_id = :entry_id") + result = await self._executor(_ext_sqlite_execute, self._db, sql, + params) + + return _row_to_entry(result[0]) if result else None + + +def _row_to_entry(row): + return {'entry_id': row[0], + 'timestamp': row[1], + 'address': row[2], + 'source': row[3], + 'type': row[4], + 'data': json.decode(row[5])} + + +def _ext_sqlite_connect(db_path): + db_path.parent.mkdir(exist_ok=True) + db = sqlite3.connect(f'file:{db_path}?nolock=1', uri=True, + isolation_level=None, + detect_types=sqlite3.PARSE_DECLTYPES) + db.executescript(""" + CREATE TABLE IF NOT EXISTS entries ( + entry_id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL, + address TEXT, + source TEXT, + type TEXT, + data TEXT); + """) + db.commit() + return db + + +def _ext_sqlite_execute(db, sql, parameters, returns=True): + c = db.execute(sql, parameters) + return c.fetchall() if returns else c.lastrowid + + +def _ext_sqlite_close(db): + db.close() diff --git a/src_py/restlog/clients/__init__.py b/src_py/restlog/clients/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src_py/restlog/clients/__init__.py diff --git a/src_py/restlog/clients/status/__init__.py b/src_py/restlog/clients/status/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src_py/restlog/clients/status/__init__.py diff --git a/src_py/restlog/clients/status/linux.py b/src_py/restlog/clients/status/linux.py new file mode 100644 index 0000000..bfce583 --- /dev/null +++ b/src_py/restlog/clients/status/linux.py @@ -0,0 +1,73 @@ +from pathlib import Path +import asyncio +import contextlib +import subprocess +import sys +import time + +import aiohttp +import click + +from hat import aio + + +@click.command() +@click.option('--source', required=True) +@click.argument('addr') +def main(source: str, addr: str): + aio.init_asyncio() + with contextlib.suppress(asyncio.CancelledError): + aio.run_asyncio(async_main(source, addr)) + + +async def async_main(source: str, addr: str): + url = f'{addr}/entry/{source}/builtin.status.linux' + data = _get_data() + + async with aiohttp.ClientSession() as session: + await session.post(url, json=data) + + +def _get_data(): + data = {'type': sys.platform, + 'timestamp': time.time()} + + if sys.platform == 'linux': + data['uptime'] = _get_linux_uptime() + data['thermal'] = list(_get_linux_thermal()) + data['disks'] = list(_get_linux_disks()) + + return data + + +def _get_linux_uptime(): + return float(Path('/proc/uptime').read_text().split(maxsplit=1)[0]) + + +def _get_linux_thermal(): + for i in Path('/sys/devices/virtual/thermal').glob('thermal_zone*'): + yield {'type': (i / 'type').read_text().strip(), + 'temp': int((i / 'temp').read_text()) / 1000} + + +def _get_linux_disks(): + result = _run_with_output(['df', '-H', '-x', 'tmpfs', '-x', 'devtmpfs', + '--output=target,size,used,avail,pcent']) + for line in result.split('\n')[1:]: + if not line: + continue + segments = line.split() + yield {'name': segments[0], + 'size': segments[1], + 'used': segments[2], + 'available': segments[3], + 'percent': segments[4]} + + +def _run_with_output(args): + result = subprocess.run(args, capture_output=True, check=True) + return result.stdout.decode('utf-8', errors='replace') + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src_py/restlog/common.py b/src_py/restlog/common.py new file mode 100644 index 0000000..f439e3b --- /dev/null +++ b/src_py/restlog/common.py @@ -0,0 +1,14 @@ +from pathlib import Path + +import appdirs + +from hat import json + + +package_path: Path = Path(__file__).parent + +user_conf_dir: Path = Path(appdirs.user_config_dir('restlog')) + +json_schema_repo: json.SchemaRepository = json.SchemaRepository( + json.json_schema_repo, + json.SchemaRepository.from_json(package_path / 'json_schema_repo.json')) diff --git a/src_py/restlog/main.py b/src_py/restlog/main.py new file mode 100644 index 0000000..79427b4 --- /dev/null +++ b/src_py/restlog/main.py @@ -0,0 +1,65 @@ +from pathlib import Path +import asyncio +import contextlib +import logging.config +import sys +import typing + +import click + +from hat import aio +from hat import json + +from restlog import common +import restlog.backend +import restlog.server + + +@click.command() +@click.option('--conf', default=None, metavar='PATH', type=Path, + help="configuration defined by restlog://main.yaml# " + "(default $XDG_CONFIG_HOME/restlog/restlog.{yaml|yml|json})") # NOQA +def main(conf: typing.Optional[Path]): + aio.init_asyncio() + + if not conf: + for suffix in ('.yaml', '.yml', '.json'): + conf = (common.user_conf_dir / 'restlog').with_suffix(suffix) + if conf.exists(): + break + conf = json.decode_file(conf) + common.json_schema_repo.validate('restlog://main.yaml#', conf) + + logging.config.dictConfig(conf['log']) + + with contextlib.suppress(asyncio.CancelledError): + aio.run_asyncio(async_main(conf)) + + +async def async_main(conf: json.Data): + async_group = aio.Group() + + try: + backend = await restlog.backend.create(path=Path(conf['db_path']), + max_results=conf['max_results']) + _bind_resource(async_group, backend) + + server = await restlog.server.create(host=conf['host'], + port=conf['port'], + backend=backend) + _bind_resource(async_group, server) + + await async_group.wait_closing() + + finally: + await aio.uncancellable(async_group.async_close()) + + +def _bind_resource(async_group, resource): + async_group.spawn(aio.call_on_cancel, resource.async_close) + async_group.spawn(aio.call_on_done, resource.wait_closing(), + async_group.close) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src_py/restlog/server.py b/src_py/restlog/server.py new file mode 100644 index 0000000..47df4a1 --- /dev/null +++ b/src_py/restlog/server.py @@ -0,0 +1,97 @@ +from pathlib import Path +import time + +import aiohttp.web +import aiohttp_remotes + +from hat import aio + +from restlog import common +import restlog.backend + + +static_dir: Path = common.package_path / 'ui' + + +async def create(host: str, + port: int, + backend: restlog.backend.Backend + ) -> 'Server': + server = Server() + server._backend = backend + server._async_group = aio.Group() + + app = aiohttp.web.Application() + app.add_routes([ + aiohttp.web.get('/', server._get_root_handler), + aiohttp.web.get('/entries', server._get_entries_handler), + aiohttp.web.get('/entry/{entry_id}', server._get_entry_handler), + aiohttp.web.post('/entry/{source}/{type}', server._post_entry_handler), + aiohttp.web.static('/', static_dir)]) + await aiohttp_remotes.setup(app, aiohttp_remotes.XForwardedRelaxed()) + + runner = aiohttp.web.AppRunner(app) + await runner.setup() + server.async_group.spawn(aio.call_on_cancel, runner.cleanup) + + try: + site = aiohttp.web.TCPSite(runner=runner, + host=host, + port=port, + shutdown_timeout=0.1, + reuse_address=True) + await site.start() + + except BaseException: + await aio.uncancellable(server.async_group.async_close()) + raise + + return server + + +class Server(aio.Resource): + + @property + def async_group(self): + return self._async_group + + async def _get_root_handler(self, request): + raise aiohttp.web.HTTPFound('/index.html') + + async def _get_entries_handler(self, request): + source = request.query.get('source') + type = request.query.get('type') + last_entry_id = request.query.get('last_entry_id') + max_results = request.query.get('max_results') + + last_entry_id = int(last_entry_id) if last_entry_id else None + max_results = int(max_results) if max_results else None + + result = await self._backend.get_entries(source=source, + type=type, + last_entry_id=last_entry_id, + max_results=max_results) + + return aiohttp.web.json_response(result) + + async def _get_entry_handler(self, request): + entry_id = int(request.match_info['entry_id']) + entry = await self._backend.get_entry(entry_id) + if entry is None: + return aiohttp.web.Response(status=404) + return aiohttp.web.json_response(entry) + + async def _post_entry_handler(self, request): + timestamp = time.time() + address = request.remote + source = request.match_info['source'] + type = request.match_info['type'] + data = await request.json() + + entry = await self._backend.register(timestamp=timestamp, + address=address, + source=source, + type=type, + data=data) + + return aiohttp.web.json_response(entry) |
