diff options
Diffstat (limited to 'src_py/hatter/util.py')
| -rw-r--r-- | src_py/hatter/util.py | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/src_py/hatter/util.py b/src_py/hatter/util.py new file mode 100644 index 0000000..5d23751 --- /dev/null +++ b/src_py/hatter/util.py @@ -0,0 +1,105 @@ +import collections +import sys +import contextlib +import asyncio +import datetime +import sqlite3 + + +def namedtuple(name, *props): + """Create documented namedtuple + + Args: + name (Union[str,Tuple[str,str]]): + named tuple's name or named tuple's name with documentation + props (Sequence[Union[str,Tuple[str,str]]]): + named tuple' properties with optional documentation + + Returns: + class implementing collections.namedtuple + + """ + props = [(i, None) if isinstance(i, str) else i for i in props] + cls = collections.namedtuple(name if isinstance(name, str) else name[0], + [i[0] for i in props]) + if not isinstance(name, str) and name[1]: + cls.__doc__ = name[1] + for k, v in props: + if v: + getattr(cls, k).__doc__ = v + try: + cls.__module__ = sys._getframe(1).f_globals.get('__name__', '__main__') + except (AttributeError, ValueError): + pass + return cls + + +def run_until_complete_without_interrupt(future): + """Run event loop until future or coroutine is done + + Args: + future (Awaitable): future or coroutine + + Returns: + Any: provided future's result + + KeyboardInterrupt is suppressed (while event loop is running) and is mapped + to single cancelation of running task. If multipple KeyboardInterrupts + occur, task is cancelled only once. + + """ + async def ping_loop(): + with contextlib.suppress(asyncio.CancelledError): + while True: + await asyncio.sleep(1) + + task = asyncio.ensure_future(future) + if sys.platform == 'win32': + ping_loop_task = asyncio.ensure_future(ping_loop()) + with contextlib.suppress(KeyboardInterrupt): + asyncio.get_event_loop().run_until_complete(task) + asyncio.get_event_loop().call_soon(task.cancel) + if sys.platform == 'win32': + asyncio.get_event_loop().call_soon(ping_loop_task.cancel) + while not task.done(): + with contextlib.suppress(KeyboardInterrupt): + asyncio.get_event_loop().run_until_complete(task) + if sys.platform == 'win32': + while not ping_loop_task.done(): + with contextlib.suppress(KeyboardInterrupt): + asyncio.get_event_loop().run_until_complete(ping_loop_task) + return task.result() + + +def monkeypatch_sqlite3(): + """Monkeypatch sqlite timestamp converter""" + + def _sqlite_convert_timestamp(val): + datepart, timetzpart = val.split(b" ") + if b"+" in timetzpart: + tzsign = 1 + timepart, tzpart = timetzpart.split(b"+") + elif b"-" in timetzpart: + tzsign = -1 + timepart, tzpart = timetzpart.split(b"-") + else: + timepart, tzpart = timetzpart, None + year, month, day = map(int, datepart.split(b"-")) + timepart_full = timepart.split(b".") + hours, minutes, seconds = map(int, timepart_full[0].split(b":")) + if len(timepart_full) == 2: + microseconds = int('{:0<6.6}'.format(timepart_full[1].decode())) + else: + microseconds = 0 + if tzpart: + tzhours, tzminutes = map(int, tzpart.split(b":")) + tz = datetime.timezone( + tzsign * datetime.timedelta(hours=tzhours, minutes=tzminutes)) + else: + tz = None + + val = datetime.datetime(year, month, day, hours, minutes, seconds, + microseconds, tz) + return val + + sqlite3.register_converter("timestamp", _sqlite_convert_timestamp) |
