Diffstat (limited to 'src_py')
-rw-r--r--  src_py/restlog/__init__.py                    0
-rw-r--r--  src_py/restlog/__main__.py                    7
-rw-r--r--  src_py/restlog/backend.py                   138
-rw-r--r--  src_py/restlog/clients/__init__.py            0
-rw-r--r--  src_py/restlog/clients/status/__init__.py     0
-rw-r--r--  src_py/restlog/clients/status/linux.py       73
-rw-r--r--  src_py/restlog/common.py                     14
-rw-r--r--  src_py/restlog/main.py                       65
-rw-r--r--  src_py/restlog/server.py                     97
9 files changed, 394 insertions, 0 deletions
diff --git a/src_py/restlog/__init__.py b/src_py/restlog/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src_py/restlog/__init__.py
diff --git a/src_py/restlog/__main__.py b/src_py/restlog/__main__.py
new file mode 100644
index 0000000..b721830
--- /dev/null
+++ b/src_py/restlog/__main__.py
@@ -0,0 +1,7 @@
+import sys
+
+from restlog.main import main
+
+
+if __name__ == '__main__':
+    sys.exit(main())
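
With this __main__ module in place, the service can be started as a package; a sketch of the invocation, where the config path is illustrative and the --conf flag comes from main.py below:

    python -m restlog --conf ~/.config/restlog/restlog.yaml
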
diff --git a/src_py/restlog/backend.py b/src_py/restlog/backend.py
new file mode 100644
index 0000000..cc87718
--- /dev/null
+++ b/src_py/restlog/backend.py
@@ -0,0 +1,138 @@
+from pathlib import Path
+import sqlite3
+import typing
+
+from hat import aio
+from hat import json
+
+
+async def create(path: Path,
+                 max_results: int
+                 ) -> 'Backend':
+    backend = Backend()
+    backend._max_results = max_results
+    backend._async_group = aio.Group()
+    backend._executor = aio.create_executor(1)
+
+    backend._db = await backend._executor(_ext_sqlite_connect, path)
+    backend.async_group.spawn(aio.call_on_cancel, backend._executor,
+                              _ext_sqlite_close, backend._db)
+
+    return backend
+
+
+class Backend(aio.Resource):
+
+    @property
+    def async_group(self):
+        return self._async_group
+
+    async def register(self,
+                       timestamp: float,
+                       address: str,
+                       source: str,
+                       type: str,
+                       data: json.Data
+                       ) -> json.Data:
+        params = {'timestamp': timestamp,
+                  'address': address,
+                  'source': source,
+                  'type': type,
+                  'data': json.encode(data)}
+        sql = ("INSERT INTO entries (timestamp, address, source, type, data) "
+               "VALUES (:timestamp, :address, :source, :type, :data)")
+        rowid = await self._executor(_ext_sqlite_execute, self._db, sql,
+                                     params, False)
+
+        params = {'rowid': rowid}
+        sql = ("SELECT entry_id, timestamp, address, source, type, data "
+               "FROM entries "
+               "WHERE rowid = :rowid")
+        result = await self._executor(_ext_sqlite_execute, self._db, sql,
+                                      params)
+
+        return _row_to_entry(result[0])
+
+    async def get_entries(self,
+                          source: typing.Optional[str] = None,
+                          type: typing.Optional[str] = None,
+                          last_entry_id: typing.Optional[int] = None,
+                          max_results: typing.Optional[int] = None
+                          ) -> json.Data:
+        params = {}
+        sql_conditions = []
+
+        if source is not None:
+            params['source'] = source
+            sql_conditions.append('source = :source')
+
+        if type is not None:
+            params['type'] = type
+            sql_conditions.append('type = :type')
+
+        if last_entry_id is not None:
+            params['last_entry_id'] = last_entry_id
+            sql_conditions.append('entry_id <= :last_entry_id')
+
+        if max_results is None or not (0 < max_results < self._max_results):
+            max_results = self._max_results
+        params['max_results'] = max_results + 1  # one extra row to detect 'more'
+
+        sql_condition = (f"WHERE {' AND '.join(sql_conditions)}"
+                         if sql_conditions else "")
+        sql = (f"SELECT entry_id, timestamp, address, source, type, data "
+               f"FROM entries "
+               f"{sql_condition} "
+               f"ORDER BY entry_id DESC "
+               f"LIMIT :max_results")
+        result = await self._executor(_ext_sqlite_execute, self._db, sql,
+                                      params)
+
+        return {'entries': [_row_to_entry(i) for i in result[:max_results]],
+                'more': len(result) > max_results}
+
+    async def get_entry(self, entry_id: int) -> typing.Optional[json.Data]:
+        params = {'entry_id': entry_id}
+        sql = ("SELECT entry_id, timestamp, address, source, type, data "
+               "FROM entries "
+               "WHERE entry_id = :entry_id")
+        result = await self._executor(_ext_sqlite_execute, self._db, sql,
+                                      params)
+
+        return _row_to_entry(result[0]) if result else None
+
+
+def _row_to_entry(row):
+    return {'entry_id': row[0],
+            'timestamp': row[1],
+            'address': row[2],
+            'source': row[3],
+            'type': row[4],
+            'data': json.decode(row[5])}
+
+
+def _ext_sqlite_connect(db_path):
+    db_path.parent.mkdir(parents=True, exist_ok=True)
+    db = sqlite3.connect(f'file:{db_path}?nolock=1', uri=True,
+                         isolation_level=None,
+                         detect_types=sqlite3.PARSE_DECLTYPES)
+    db.executescript("""
+        CREATE TABLE IF NOT EXISTS entries (
+            entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
+            timestamp REAL,
+            address TEXT,
+            source TEXT,
+            type TEXT,
+            data TEXT);
+        """)
+    db.commit()
+    return db
+
+
+def _ext_sqlite_execute(db, sql, parameters, returns=True):
+    c = db.execute(sql, parameters)
+    return c.fetchall() if returns else c.lastrowid
+
+
+def _ext_sqlite_close(db):
+    db.close()
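
A minimal usage sketch of the backend API introduced above; the database path and entry values are illustrative, while the event-loop helpers come from hat-aio exactly as used elsewhere in this diff:

    # illustrative only: create a backend, store an entry, page through results
    from pathlib import Path

    from hat import aio

    import restlog.backend


    async def demo():
        backend = await restlog.backend.create(path=Path('./restlog.db'),
                                               max_results=100)
        try:
            entry = await backend.register(timestamp=1700000000.0,
                                           address='127.0.0.1',
                                           source='demo',
                                           type='example',
                                           data={'ok': True})
            # get_entries returns {'entries': [...], 'more': bool}
            result = await backend.get_entries(source='demo', max_results=10)
        finally:
            await backend.async_close()


    aio.init_asyncio()
    aio.run_asyncio(demo())
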
diff --git a/src_py/restlog/clients/__init__.py b/src_py/restlog/clients/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src_py/restlog/clients/__init__.py
diff --git a/src_py/restlog/clients/status/__init__.py b/src_py/restlog/clients/status/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src_py/restlog/clients/status/__init__.py
diff --git a/src_py/restlog/clients/status/linux.py b/src_py/restlog/clients/status/linux.py
new file mode 100644
index 0000000..bfce583
--- /dev/null
+++ b/src_py/restlog/clients/status/linux.py
@@ -0,0 +1,73 @@
+from pathlib import Path
+import asyncio
+import contextlib
+import subprocess
+import sys
+import time
+
+import aiohttp
+import click
+
+from hat import aio
+
+
+@click.command()
+@click.option('--source', required=True)
+@click.argument('addr')
+def main(source: str, addr: str):
+    aio.init_asyncio()
+    with contextlib.suppress(asyncio.CancelledError):
+        aio.run_asyncio(async_main(source, addr))
+
+
+async def async_main(source: str, addr: str):
+    url = f'{addr}/entry/{source}/builtin.status.linux'
+    data = _get_data()
+
+    async with aiohttp.ClientSession() as session:
+        await session.post(url, json=data)
+
+
+def _get_data():
+    data = {'type': sys.platform,
+            'timestamp': time.time()}
+
+    if sys.platform == 'linux':
+        data['uptime'] = _get_linux_uptime()
+        data['thermal'] = list(_get_linux_thermal())
+        data['disks'] = list(_get_linux_disks())
+
+    return data
+
+
+def _get_linux_uptime():
+    return float(Path('/proc/uptime').read_text().split(maxsplit=1)[0])
+
+
+def _get_linux_thermal():
+    for i in Path('/sys/devices/virtual/thermal').glob('thermal_zone*'):
+        yield {'type': (i / 'type').read_text().strip(),
+               'temp': int((i / 'temp').read_text()) / 1000}
+
+
+def _get_linux_disks():
+    result = _run_with_output(['df', '-H', '-x', 'tmpfs', '-x', 'devtmpfs',
+                               '--output=target,size,used,avail,pcent'])
+    for line in result.split('\n')[1:]:
+        if not line:
+            continue
+        segments = line.split()
+        yield {'name': segments[0],
+               'size': segments[1],
+               'used': segments[2],
+               'available': segments[3],
+               'percent': segments[4]}
+
+
+def _run_with_output(args):
+    result = subprocess.run(args, capture_output=True, check=True)
+    return result.stdout.decode('utf-8', errors='replace')
+
+
+if __name__ == '__main__':
+    sys.exit(main())
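
Assuming the restlog package is importable, the status client runs as a module; the server address below is illustrative and should point at a running restlog server:

    python -m restlog.clients.status.linux --source my-host http://localhost:24000
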
diff --git a/src_py/restlog/common.py b/src_py/restlog/common.py
new file mode 100644
index 0000000..f439e3b
--- /dev/null
+++ b/src_py/restlog/common.py
@@ -0,0 +1,14 @@
+from pathlib import Path
+
+import appdirs
+
+from hat import json
+
+
+package_path: Path = Path(__file__).parent
+
+user_conf_dir: Path = Path(appdirs.user_config_dir('restlog'))
+
+json_schema_repo: json.SchemaRepository = json.SchemaRepository(
+    json.json_schema_repo,
+    json.SchemaRepository.from_json(package_path / 'json_schema_repo.json'))
diff --git a/src_py/restlog/main.py b/src_py/restlog/main.py
new file mode 100644
index 0000000..79427b4
--- /dev/null
+++ b/src_py/restlog/main.py
@@ -0,0 +1,65 @@
+from pathlib import Path
+import asyncio
+import contextlib
+import logging.config
+import sys
+import typing
+
+import click
+
+from hat import aio
+from hat import json
+
+from restlog import common
+import restlog.backend
+import restlog.server
+
+
+@click.command()
+@click.option('--conf', default=None, metavar='PATH', type=Path,
+              help="configuration defined by restlog://main.yaml# "
+                   "(default $XDG_CONFIG_HOME/restlog/restlog.{yaml|yml|json})")  # NOQA
+def main(conf: typing.Optional[Path]):
+    aio.init_asyncio()
+
+    if not conf:
+        for suffix in ('.yaml', '.yml', '.json'):
+            conf = (common.user_conf_dir / 'restlog').with_suffix(suffix)
+            if conf.exists():
+                break
+    conf = json.decode_file(conf)
+    common.json_schema_repo.validate('restlog://main.yaml#', conf)
+
+    logging.config.dictConfig(conf['log'])
+
+    with contextlib.suppress(asyncio.CancelledError):
+        aio.run_asyncio(async_main(conf))
+
+
+async def async_main(conf: json.Data):
+    async_group = aio.Group()
+
+    try:
+        backend = await restlog.backend.create(path=Path(conf['db_path']),
+                                               max_results=conf['max_results'])
+        _bind_resource(async_group, backend)
+
+        server = await restlog.server.create(host=conf['host'],
+                                             port=conf['port'],
+                                             backend=backend)
+        _bind_resource(async_group, server)
+
+        await async_group.wait_closing()
+
+    finally:
+        await aio.uncancellable(async_group.async_close())
+
+
+def _bind_resource(async_group, resource):
+    async_group.spawn(aio.call_on_cancel, resource.async_close)
+    async_group.spawn(aio.call_on_done, resource.wait_closing(),
+                      async_group.close)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
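
The keys read above (conf['log'], conf['db_path'], conf['max_results'], conf['host'], conf['port']) imply a configuration of roughly this shape. This is a sketch with illustrative values: the authoritative schema (restlog://main.yaml#) is not part of this diff, and log is a standard logging.config.dictConfig dictionary:

    log:
        version: 1
    db_path: /var/lib/restlog/restlog.db
    max_results: 100
    host: 0.0.0.0
    port: 24000
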
diff --git a/src_py/restlog/server.py b/src_py/restlog/server.py
new file mode 100644
index 0000000..47df4a1
--- /dev/null
+++ b/src_py/restlog/server.py
@@ -0,0 +1,97 @@
+from pathlib import Path
+import time
+
+import aiohttp.web
+import aiohttp_remotes
+
+from hat import aio
+
+from restlog import common
+import restlog.backend
+
+
+static_dir: Path = common.package_path / 'ui'
+
+
+async def create(host: str,
+                 port: int,
+                 backend: restlog.backend.Backend
+                 ) -> 'Server':
+    server = Server()
+    server._backend = backend
+    server._async_group = aio.Group()
+
+    app = aiohttp.web.Application()
+    app.add_routes([
+        aiohttp.web.get('/', server._get_root_handler),
+        aiohttp.web.get('/entries', server._get_entries_handler),
+        aiohttp.web.get('/entry/{entry_id}', server._get_entry_handler),
+        aiohttp.web.post('/entry/{source}/{type}', server._post_entry_handler),
+        aiohttp.web.static('/', static_dir)])
+    await aiohttp_remotes.setup(app, aiohttp_remotes.XForwardedRelaxed())
+
+    runner = aiohttp.web.AppRunner(app)
+    await runner.setup()
+    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)
+
+    try:
+        site = aiohttp.web.TCPSite(runner=runner,
+                                   host=host,
+                                   port=port,
+                                   shutdown_timeout=0.1,
+                                   reuse_address=True)
+        await site.start()
+
+    except BaseException:
+        await aio.uncancellable(server.async_group.async_close())
+        raise
+
+    return server
+
+
+class Server(aio.Resource):
+
+    @property
+    def async_group(self):
+        return self._async_group
+
+    async def _get_root_handler(self, request):
+        raise aiohttp.web.HTTPFound('/index.html')
+
+    async def _get_entries_handler(self, request):
+        source = request.query.get('source')
+        type = request.query.get('type')
+        last_entry_id = request.query.get('last_entry_id')
+        max_results = request.query.get('max_results')
+
+        last_entry_id = int(last_entry_id) if last_entry_id else None
+        max_results = int(max_results) if max_results else None
+
+        result = await self._backend.get_entries(source=source,
+                                                 type=type,
+                                                 last_entry_id=last_entry_id,
+                                                 max_results=max_results)
+
+        return aiohttp.web.json_response(result)
+
+    async def _get_entry_handler(self, request):
+        entry_id = int(request.match_info['entry_id'])
+        entry = await self._backend.get_entry(entry_id)
+        if entry is None:
+            return aiohttp.web.Response(status=404)
+        return aiohttp.web.json_response(entry)
+
+    async def _post_entry_handler(self, request):
+        timestamp = time.time()
+        address = request.remote
+        source = request.match_info['source']
+        type = request.match_info['type']
+        data = await request.json()
+
+        entry = await self._backend.register(timestamp=timestamp,
+                                             address=address,
+                                             source=source,
+                                             type=type,
+                                             data=data)
+
+        return aiohttp.web.json_response(entry)
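
The routes registered in create above define the whole REST surface. A minimal client sketch follows; host and port are illustrative, and aiohttp is already a dependency of this diff:

    # illustrative only: exercise the three JSON endpoints
    import asyncio

    import aiohttp


    async def demo():
        base = 'http://localhost:24000'
        async with aiohttp.ClientSession() as session:
            # POST /entry/{source}/{type} stores the JSON body and
            # returns the stored entry
            async with session.post(f'{base}/entry/demo/example',
                                    json={'ok': True}) as resp:
                entry = await resp.json()

            # GET /entries accepts source, type, last_entry_id, max_results
            async with session.get(f'{base}/entries',
                                   params={'source': 'demo',
                                           'max_results': '10'}) as resp:
                result = await resp.json()

            # GET /entry/{entry_id} returns one entry or 404
            async with session.get(f'{base}/entry/{entry["entry_id"]}') as resp:
                same_entry = await resp.json()


    asyncio.run(demo())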