aboutsummaryrefslogtreecommitdiff
path: root/src_py/hatter/backend.py
blob: dd6a4f7564a198882b698d501082c572f62372d7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from pathlib import Path
import sqlite3
import typing

from hat import aio

from hatter import common


async def create(db_path: Path
                 ) -> 'Backend':
    """Create a backend that stores commits in the database at `db_path`.

    The database is opened through the backend's own executor and the
    resulting connection is kept on the backend instance. A task running
    `aio.call_on_cancel` is spawned in the backend's async group with the
    executor, `_ext_close` and the connection as arguments (presumably so
    that the connection is closed via the executor once the group is
    cancelled - confirm against `hat.aio` documentation).
    """
    instance = Backend()

    group = aio.Group()
    executor = aio.create_executor(1)
    instance._async_group = group
    instance._executor = executor

    # all database access, including opening, goes through the executor
    connection = await executor(_ext_create, db_path)
    instance._db = connection

    instance.async_group.spawn(
        aio.call_on_cancel, executor, _ext_close, connection)
    return instance


class Backend(aio.Resource):
    """Asynchronous access to the commits database.

    Every operation is executed through `self._executor` as a task spawned
    in the backend's async group. The `_async_group`, `_executor` and `_db`
    attributes are not initialized by this class; the module-level `create`
    coroutine assigns them.
    """

    @property
    def async_group(self):
        """Async group in which all backend operations are spawned."""
        return self._async_group

    async def get_commits(self,
                          repo: typing.Optional[str],
                          statuses: typing.Optional[typing.Set[common.Status]],
                          order: common.Order
                          ) -> typing.List[common.Commit]:
        """Return stored commits ordered by their `change` value.

        A falsy `repo` or `statuses` disables the corresponding filter.
        """
        return await self._call_ext(_ext_get_commits, repo, statuses, order)

    async def get_commit(self,
                         repo: str,
                         commit_hash: str
                         ) -> typing.Optional[common.Commit]:
        """Return the commit identified by `repo` and `commit_hash`.

        ``None`` is returned when no such commit is stored.
        """
        return await self._call_ext(_ext_get_commit, repo, commit_hash)

    async def update_commit(self, commit: common.Commit):
        """Insert `commit`, replacing any stored commit with the same key."""
        await self._call_ext(_ext_update_commit, commit)

    async def remove_commit(self, commit: common.Commit):
        """Delete the stored commit matching `commit`'s repo and hash."""
        await self._call_ext(_ext_remove_commit, commit)

    async def _call_ext(self, ext_fn, *args):
        """Spawn `ext_fn(db, *args)` on the executor and await its result."""
        task = self.async_group.spawn(self._executor, ext_fn, self._db, *args)
        return await task


def _ext_create(db_path):
    db_path.parent.mkdir(exist_ok=True)
    db = sqlite3.connect(f'file:{db_path}?nolock=1',
                         uri=True,
                         isolation_level=None,
                         detect_types=sqlite3.PARSE_DECLTYPES)

    try:
        db.executescript(r"""
            PRAGMA journal_mode = OFF;
            CREATE TABLE IF NOT EXISTS commits (
                repo TEXT,
                hash TEXT,
                change INTEGER,
                status INTEGER,
                output TEXT,
                PRIMARY KEY (repo, hash) ON CONFLICT REPLACE
            );
            CREATE INDEX IF NOT EXISTS commits_change_index ON commits (
                change
            )""")

    except Exception:
        db.close()
        raise

    return db


def _ext_close(db):
    db.close()


def _ext_get_commits(db, repo, statuses, order):
    cmd = "SELECT * FROM commits"
    where = []
    if repo:
        where.append("repo = :repo")
    if statuses:
        status_values = (str(status.value) for status in statuses)
        where.append(f"status IN ({', '.join(status_values)})")
    if where:
        cmd += f" WHERE {' AND '.join(where)}"
    cmd += f" ORDER BY change {order.value}"
    args = {'repo': repo}
    cur = db.execute(cmd, args)
    return [_commit_from_row(row) for row in cur]


def _ext_get_commit(db, repo, commit_hash):
    cmd = "SELECT * FROM commits WHERE repo = :repo AND hash = :hash"
    args = {'repo': repo,
            'hash': commit_hash}
    cur = db.execute(cmd, args)
    row = cur.fetchone()
    return _commit_from_row(row) if row else None


def _ext_update_commit(db, commit):
    cmd = ("INSERT OR REPLACE INTO commits VALUES "
           "(:repo, :hash, :change, :status, :output)")
    args = {'repo': commit.repo,
            'hash': commit.hash,
            'change': commit.change,
            'status': commit.status.value,
            'output': commit.output}
    db.execute(cmd, args)


def _ext_remove_commit(db, commit):
    cmd = "DELETE FROM commits WHERE repo = :repo AND hash = :hash"
    args = {'repo': commit.repo,
            'hash': commit.hash}
    db.execute(cmd, args)


def _commit_from_row(row):
    """Build a `common.Commit` from a row of the `commits` table.

    Columns are read by position: repo, hash, change, status, output. The
    stored status value is converted with `common.Status`.
    """
    repo, commit_hash, change = row[0], row[1], row[2]
    status = common.Status(row[3])
    output = row[4]
    return common.Commit(repo=repo,
                         hash=commit_hash,
                         change=change,
                         status=status,
                         output=output)