aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'src_py/hatter/util.py')
-rw-r--r--src_py/hatter/util.py105
1 files changed, 105 insertions, 0 deletions
diff --git a/src_py/hatter/util.py b/src_py/hatter/util.py
new file mode 100644
index 0000000..5d23751
--- /dev/null
+++ b/src_py/hatter/util.py
@@ -0,0 +1,105 @@
+import collections
+import sys
+import contextlib
+import asyncio
+import datetime
+import sqlite3
+
+
+def namedtuple(name, *props):
+ """Create documented namedtuple
+
+ Args:
+ name (Union[str,Tuple[str,str]]):
+ named tuple's name or named tuple's name with documentation
+ props (Sequence[Union[str,Tuple[str,str]]]):
+ named tuple' properties with optional documentation
+
+ Returns:
+ class implementing collections.namedtuple
+
+ """
+ props = [(i, None) if isinstance(i, str) else i for i in props]
+ cls = collections.namedtuple(name if isinstance(name, str) else name[0],
+ [i[0] for i in props])
+ if not isinstance(name, str) and name[1]:
+ cls.__doc__ = name[1]
+ for k, v in props:
+ if v:
+ getattr(cls, k).__doc__ = v
+ try:
+ cls.__module__ = sys._getframe(1).f_globals.get('__name__', '__main__')
+ except (AttributeError, ValueError):
+ pass
+ return cls
+
+
+def run_until_complete_without_interrupt(future):
+ """Run event loop until future or coroutine is done
+
+ Args:
+ future (Awaitable): future or coroutine
+
+ Returns:
+ Any: provided future's result
+
+ KeyboardInterrupt is suppressed (while event loop is running) and is mapped
+ to single cancelation of running task. If multipple KeyboardInterrupts
+ occur, task is cancelled only once.
+
+ """
+ async def ping_loop():
+ with contextlib.suppress(asyncio.CancelledError):
+ while True:
+ await asyncio.sleep(1)
+
+ task = asyncio.ensure_future(future)
+ if sys.platform == 'win32':
+ ping_loop_task = asyncio.ensure_future(ping_loop())
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.get_event_loop().run_until_complete(task)
+ asyncio.get_event_loop().call_soon(task.cancel)
+ if sys.platform == 'win32':
+ asyncio.get_event_loop().call_soon(ping_loop_task.cancel)
+ while not task.done():
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.get_event_loop().run_until_complete(task)
+ if sys.platform == 'win32':
+ while not ping_loop_task.done():
+ with contextlib.suppress(KeyboardInterrupt):
+ asyncio.get_event_loop().run_until_complete(ping_loop_task)
+ return task.result()
+
+
+def monkeypatch_sqlite3():
+ """Monkeypatch sqlite timestamp converter"""
+
+ def _sqlite_convert_timestamp(val):
+ datepart, timetzpart = val.split(b" ")
+ if b"+" in timetzpart:
+ tzsign = 1
+ timepart, tzpart = timetzpart.split(b"+")
+ elif b"-" in timetzpart:
+ tzsign = -1
+ timepart, tzpart = timetzpart.split(b"-")
+ else:
+ timepart, tzpart = timetzpart, None
+ year, month, day = map(int, datepart.split(b"-"))
+ timepart_full = timepart.split(b".")
+ hours, minutes, seconds = map(int, timepart_full[0].split(b":"))
+ if len(timepart_full) == 2:
+ microseconds = int('{:0<6.6}'.format(timepart_full[1].decode()))
+ else:
+ microseconds = 0
+ if tzpart:
+ tzhours, tzminutes = map(int, tzpart.split(b":"))
+ tz = datetime.timezone(
+ tzsign * datetime.timedelta(hours=tzhours, minutes=tzminutes))
+ else:
+ tz = None
+
+ val = datetime.datetime(year, month, day, hours, minutes, seconds,
+ microseconds, tz)
+ return val
+
+ sqlite3.register_converter("timestamp", _sqlite_convert_timestamp)