1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
import collections
import sys
import contextlib
import asyncio
import datetime
import sqlite3
def namedtuple(name, *props):
    """Create documented namedtuple

    Args:
        name (Union[str,Tuple[str,str]]):
            named tuple's name or named tuple's name with documentation
        props (Sequence[Union[str,Tuple[str,str]]]):
            named tuple's properties with optional documentation

    Returns:
        class implementing collections.namedtuple

    """
    # Normalize every property to a (field_name, documentation) pair.
    props = [(i, None) if isinstance(i, str) else i for i in props]

    cls = collections.namedtuple(name if isinstance(name, str) else name[0],
                                 [i[0] for i in props])

    # Class level documentation is applied only when it was provided and is
    # not empty.
    if not isinstance(name, str) and name[1]:
        cls.__doc__ = name[1]

    # Field level documentation replaces the generated field docstring.
    for k, v in props:
        if v:
            getattr(cls, k).__doc__ = v

    # collections.namedtuple was called from this function, so the module it
    # recorded is this one. Point __module__ at the module of our caller
    # instead (frame depth 1 must be evaluated directly in this function).
    try:
        cls.__module__ = sys._getframe(1).f_globals.get('__name__', '__main__')
    except (AttributeError, ValueError):
        # Best effort: frame introspection is unavailable, keep the default.
        pass

    return cls
def run_until_complete_without_interrupt(future):
    """Run event loop until future or coroutine is done

    Args:
        future (Awaitable): future or coroutine

    Returns:
        Any: provided future's result

    KeyboardInterrupt is suppressed (while event loop is running) and is mapped
    to single cancellation of running task. If multiple KeyboardInterrupts
    occur, task is cancelled only once.

    Exceptions other than KeyboardInterrupt raised while running the event
    loop (including the exception of a failed task) propagate to the caller.

    """
    async def ping_loop():
        # Wakes the event loop once per second until cancelled.
        with contextlib.suppress(asyncio.CancelledError):
            while True:
                await asyncio.sleep(1)

    task = asyncio.ensure_future(future)
    loop = asyncio.get_event_loop()

    # NOTE(review): presumably the periodic wake-up exists so that a pending
    # KeyboardInterrupt is delivered on Windows while the loop would otherwise
    # block waiting for events - confirm.
    ping_loop_task = (asyncio.ensure_future(ping_loop())
                      if sys.platform == 'win32' else None)

    try:
        with contextlib.suppress(KeyboardInterrupt):
            loop.run_until_complete(task)

        # Reaching this point with a pending task means the run above was
        # interrupted: request cancellation exactly once and keep running
        # the loop, ignoring further interrupts, until the task finishes.
        if not task.done():
            loop.call_soon(task.cancel)
            while not task.done():
                with contextlib.suppress(KeyboardInterrupt):
                    loop.run_until_complete(task)
    finally:
        # Always stop and drain the helper task, also when the awaited task
        # failed, so it is not left pending on the event loop.
        if ping_loop_task is not None:
            loop.call_soon(ping_loop_task.cancel)
            while not ping_loop_task.done():
                with contextlib.suppress(KeyboardInterrupt):
                    loop.run_until_complete(ping_loop_task)

    return task.result()
def monkeypatch_sqlite3():
    """Monkeypatch sqlite timestamp converter

    Registers a converter for the ``timestamp`` type with the sqlite3 module.
    The converter parses values formatted as
    ``YYYY-MM-DD HH:MM:SS[.ffffff][(+|-)HH:MM[:SS[.ffffff]]]`` into
    datetime.datetime. Values with UTC offset result in timezone aware
    instances, values without it in naive instances.

    """
    def _sqlite_convert_timestamp(val):
        """Convert stored bytes to datetime.datetime

        Args:
            val (bytes): stored timestamp

        Returns:
            datetime.datetime

        Raises:
            ValueError: if value does not match the expected format

        """
        datepart, timetzpart = val.split(b" ")

        # Date part is already split off, so any sign character found here
        # belongs to the UTC offset.
        if b"+" in timetzpart:
            tzsign = 1
            timepart, tzpart = timetzpart.split(b"+")
        elif b"-" in timetzpart:
            tzsign = -1
            timepart, tzpart = timetzpart.split(b"-")
        else:
            tzsign = 0
            timepart, tzpart = timetzpart, None

        year, month, day = map(int, datepart.split(b"-"))

        timepart_full = timepart.split(b".")
        hours, minutes, seconds = map(int, timepart_full[0].split(b":"))
        if len(timepart_full) == 2:
            # Fraction is truncated or right padded with zeros to exactly
            # six digits (microseconds).
            microseconds = int('{:0<6.6}'.format(timepart_full[1].decode()))
        else:
            microseconds = 0

        if tzpart:
            # Offsets are HH:MM with optional seconds (as produced by
            # datetime.isoformat for offsets that are not whole minutes).
            tzfields = tzpart.split(b":")
            if len(tzfields) not in (2, 3):
                raise ValueError(
                    'invalid UTC offset: {!r}'.format(tzpart))
            tzhours, tzminutes = int(tzfields[0]), int(tzfields[1])
            tzseconds = float(tzfields[2].decode()) if len(tzfields) == 3 else 0
            tz = datetime.timezone(
                tzsign * datetime.timedelta(hours=tzhours,
                                            minutes=tzminutes,
                                            seconds=tzseconds))
        else:
            tz = None

        return datetime.datetime(year, month, day, hours, minutes, seconds,
                                 microseconds, tz)

    sqlite3.register_converter("timestamp", _sqlite_convert_timestamp)
|