aboutsummaryrefslogtreecommitdiff
path: root/src_py
diff options
context:
space:
mode:
Diffstat (limited to 'src_py')
-rw-r--r--src_py/opcut/server.py58
-rw-r--r--src_py/opcut/util.py47
2 files changed, 95 insertions, 10 deletions
diff --git a/src_py/opcut/server.py b/src_py/opcut/server.py
index edd6b91..c67279f 100644
--- a/src_py/opcut/server.py
+++ b/src_py/opcut/server.py
@@ -1,12 +1,22 @@
import ssl
import contextlib
import asyncio
+import functools
+import base64
import aiohttp.web
+from opcut import util
+from opcut import common
+import opcut.json_validator
+import opcut.csp
+import opcut.output
+
async def run(addr, pem_path, ui_path):
+ executor = util.create_async_executor()
+
if addr.scheme == 'https':
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_ctx.load_cert_chain(pem_path)
@@ -16,6 +26,10 @@ async def run(addr, pem_path, ui_path):
app = aiohttp.web.Application()
app.router.add_route('GET', '/',
lambda req: aiohttp.web.HTTPFound('/index.html'))
+ app.router.add_route('POST', '/calculate',
+ functools.partial(_calculate_handler, executor))
+ app.router.add_route('POST', '/generate_output',
+ functools.partial(_generate_output_handler, executor))
app.router.add_static('/', ui_path)
app_handler = app.make_handler()
@@ -28,5 +42,47 @@ async def run(addr, pem_path, ui_path):
srv.close()
await srv.wait_closed()
await app.shutdown()
- await app_handler.finish_connections(0)
+ await app_handler.finish_connections(0.1)
await app.cleanup()
+
+
+async def _calculate_handler(executor, request):
+ try:
+ msg = await request.json()
+ opcut.json_validator.validate(
+ msg, 'opcut://messages.yaml#/definitions/calculate/request')
+ params = common.json_data_to_params(msg['params'])
+ method = common.Method[msg['method']]
+ result = await executor(_ext_calculate, params, method)
+ result_json_data = common.result_to_json_data(result)
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ result_json_data = None
+ return aiohttp.web.json_response({'result': result_json_data})
+
+
+async def _generate_output_handler(executor, request):
+ try:
+ msg = await request.json()
+ opcut.json_validator.validate(
+ msg, 'opcut://messages.yaml#/definitions/generate_output/request')
+ result = common.json_data_to_result(msg['result'])
+ output_type = msg['output_type']
+ output = await executor(_ext_generate_output, result, output_type)
+ output_json_data = base64.b64encode(output).decode('utf-8')
+ except asyncio.CancelledError:
+ raise
+ except Exception:
+ output_json_data = None
+ return aiohttp.web.json_response({'data': output_json_data})
+
+
+def _ext_calculate(params, method):
+ return opcut.csp.calculate(params, method)
+
+
+def _ext_generate_output(result, output_type):
+ if output_type == 'PDF':
+ return opcut.output.generate_pdf(result)
+ raise ValueError('output_type {} not supported'.format(output_type))
diff --git a/src_py/opcut/util.py b/src_py/opcut/util.py
index a13cd14..38aaf41 100644
--- a/src_py/opcut/util.py
+++ b/src_py/opcut/util.py
@@ -2,6 +2,7 @@ import contextlib
import asyncio
import sys
import collections
+import concurrent
def namedtuple(name, *props):
@@ -41,11 +42,12 @@ def namedtuple(name, *props):
return cls
-def run_until_complete_without_interrupt(future):
+def run_until_complete_without_interrupt(future, loop=None):
"""Run event loop until future or coroutine is done
Args:
future (Awaitable): future or coroutine
+ loop (Optional[asyncio.AbstractEventLoop]): asyncio loop
Returns:
Any: provided future's result
@@ -55,24 +57,51 @@ def run_until_complete_without_interrupt(future):
occur, task is cancelled only once.
"""
+ if not loop:
+ loop = asyncio.get_event_loop()
+
async def ping_loop():
with contextlib.suppress(asyncio.CancelledError):
while True:
- await asyncio.sleep(1)
+ await asyncio.sleep(1, loop=loop)
- task = asyncio.ensure_future(future)
+ task = asyncio.ensure_future(future, loop=loop)
if sys.platform == 'win32':
- ping_loop_task = asyncio.ensure_future(ping_loop())
+ ping_loop_task = asyncio.ensure_future(ping_loop(), loop=loop)
with contextlib.suppress(KeyboardInterrupt):
- asyncio.get_event_loop().run_until_complete(task)
- asyncio.get_event_loop().call_soon(task.cancel)
+ loop.run_until_complete(task)
+ loop.call_soon(task.cancel)
if sys.platform == 'win32':
- asyncio.get_event_loop().call_soon(ping_loop_task.cancel)
+ loop.call_soon(ping_loop_task.cancel)
while not task.done():
with contextlib.suppress(KeyboardInterrupt):
- asyncio.get_event_loop().run_until_complete(task)
+ loop.run_until_complete(task)
if sys.platform == 'win32':
while not ping_loop_task.done():
with contextlib.suppress(KeyboardInterrupt):
- asyncio.get_event_loop().run_until_complete(ping_loop_task)
+ loop.run_until_complete(ping_loop_task)
return task.result()
+
+
+def create_async_executor(*args,
+ executor_cls=concurrent.futures.ThreadPoolExecutor,
+ loop=None):
+ """Create run_in_executor wrapper
+
+ Args:
+ args (Any): executor init args
+ executor_cls (Type): executor class
+ loop (Optional[asyncio.AbstractEventLoop]): asyncio loop
+
+ Returns:
+ Callable[[Callable,...],Any]: coroutine accepting function and it's
+ arguments and returning function call result
+
+ """
+ executor = executor_cls(*args)
+
+ async def executor_wrapper(fn, *fn_args):
+ _loop = loop if loop else asyncio.get_event_loop()
+ return await _loop.run_in_executor(executor, fn, *fn_args)
+
+ return executor_wrapper