aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter/util.py
blob: 5d23751a6174cfda48f653cbc34cae5f6e17c07e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import collections
import sys
import contextlib
import asyncio
import datetime
import sqlite3


def namedtuple(name, *props):
    """Create documented namedtuple

    Args:
        name (Union[str,Tuple[str,str]]):
            named tuple's name or named tuple's name with documentation
        props (Sequence[Union[str,Tuple[str,str]]]):
            named tuple' properties with optional documentation

    Returns:
        class implementing collections.namedtuple

    """
    props = [(i, None) if isinstance(i, str) else i for i in props]
    cls = collections.namedtuple(name if isinstance(name, str) else name[0],
                                 [i[0] for i in props])
    if not isinstance(name, str) and name[1]:
        cls.__doc__ = name[1]
    for k, v in props:
        if v:
            getattr(cls, k).__doc__ = v
    try:
        cls.__module__ = sys._getframe(1).f_globals.get('__name__', '__main__')
    except (AttributeError, ValueError):
        pass
    return cls


def run_until_complete_without_interrupt(future):
    """Run event loop until future or coroutine is done

    Args:
        future (Awaitable): future or coroutine

    Returns:
        Any: provided future's result

    KeyboardInterrupt is suppressed (while event loop is running) and is mapped
    to single cancelation of running task. If multipple KeyboardInterrupts
    occur, task is cancelled only once.

    """
    async def ping_loop():
        with contextlib.suppress(asyncio.CancelledError):
            while True:
                await asyncio.sleep(1)

    task = asyncio.ensure_future(future)
    if sys.platform == 'win32':
        ping_loop_task = asyncio.ensure_future(ping_loop())
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.get_event_loop().run_until_complete(task)
    asyncio.get_event_loop().call_soon(task.cancel)
    if sys.platform == 'win32':
        asyncio.get_event_loop().call_soon(ping_loop_task.cancel)
    while not task.done():
        with contextlib.suppress(KeyboardInterrupt):
            asyncio.get_event_loop().run_until_complete(task)
    if sys.platform == 'win32':
        while not ping_loop_task.done():
            with contextlib.suppress(KeyboardInterrupt):
                asyncio.get_event_loop().run_until_complete(ping_loop_task)
    return task.result()


def monkeypatch_sqlite3():
    """Monkeypatch sqlite timestamp converter"""

    def _sqlite_convert_timestamp(val):
        datepart, timetzpart = val.split(b" ")
        if b"+" in timetzpart:
            tzsign = 1
            timepart, tzpart = timetzpart.split(b"+")
        elif b"-" in timetzpart:
            tzsign = -1
            timepart, tzpart = timetzpart.split(b"-")
        else:
            timepart, tzpart = timetzpart, None
        year, month, day = map(int, datepart.split(b"-"))
        timepart_full = timepart.split(b".")
        hours, minutes, seconds = map(int, timepart_full[0].split(b":"))
        if len(timepart_full) == 2:
            microseconds = int('{:0<6.6}'.format(timepart_full[1].decode()))
        else:
            microseconds = 0
        if tzpart:
            tzhours, tzminutes = map(int, tzpart.split(b":"))
            tz = datetime.timezone(
                tzsign * datetime.timedelta(hours=tzhours, minutes=tzminutes))
        else:
            tz = None

        val = datetime.datetime(year, month, day, hours, minutes, seconds,
                                microseconds, tz)
        return val

    sqlite3.register_converter("timestamp", _sqlite_convert_timestamp)