1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
from pathlib import Path
import sqlite3
import typing
from hat import aio
from hatter import common
async def create(db_path: Path
) -> 'Backend':
backend = Backend()
backend._async_group = aio.Group()
backend._executor = aio.create_executor(1)
backend._db = await backend._executor(_ext_create, db_path)
backend.async_group.spawn(aio.call_on_cancel, backend._executor,
_ext_close, backend._db)
return backend
class Backend(aio.Resource):
@property
def async_group(self):
return self._async_group
async def get_commits(self,
repo: typing.Optional[str],
statuses: typing.Optional[typing.Set[common.Status]],
order: common.Order
) -> typing.List[common.Commit]:
return await self.async_group.spawn(
self._executor, _ext_get_commits, self._db, repo, statuses, order)
async def get_commit(self,
repo: str,
commit_hash: str
) -> typing.Optional[common.Commit]:
return await self.async_group.spawn(
self._executor, _ext_get_commit, self._db, repo, commit_hash)
async def update_commit(self, commit: common.Commit):
await self._async_group.spawn(
self._executor, _ext_update_commit, self._db, commit)
async def remove_commit(self, commit: common.Commit):
await self.async_group.spawn(
self._executor, _ext_remove_commit, self._db, commit)
def _ext_create(db_path):
db_path.parent.mkdir(exist_ok=True)
db = sqlite3.connect(f'file:{db_path}?nolock=1',
uri=True,
isolation_level=None,
detect_types=sqlite3.PARSE_DECLTYPES)
try:
db.executescript(r"""
PRAGMA journal_mode = OFF;
CREATE TABLE IF NOT EXISTS commits (
repo TEXT,
hash TEXT,
change INTEGER,
status INTEGER,
output TEXT,
PRIMARY KEY (repo, hash) ON CONFLICT REPLACE
);
CREATE INDEX IF NOT EXISTS commits_change_index ON commits (
change
)""")
except Exception:
db.close()
raise
return db
def _ext_close(db):
db.close()
def _ext_get_commits(db, repo, statuses, order):
cmd = "SELECT * FROM commits"
where = []
if repo:
where.append("repo = :repo")
if statuses:
status_values = (str(status.value) for status in statuses)
where.append(f"status IN ({', '.join(status_values)})")
if where:
cmd += f" WHERE {' AND '.join(where)}"
cmd += f" ORDER BY change {order.value}"
args = {'repo': repo}
cur = db.execute(cmd, args)
return [_commit_from_row(row) for row in cur]
def _ext_get_commit(db, repo, commit_hash):
cmd = "SELECT * FROM commits WHERE repo = :repo AND hash = :hash"
args = {'repo': repo,
'hash': commit_hash}
cur = db.execute(cmd, args)
row = cur.fetchone()
return _commit_from_row(row) if row else None
def _ext_update_commit(db, commit):
cmd = ("INSERT OR REPLACE INTO commits VALUES "
"(:repo, :hash, :change, :status, :output)")
args = {'repo': commit.repo,
'hash': commit.hash,
'change': commit.change,
'status': commit.status.value,
'output': commit.output}
db.execute(cmd, args)
def _ext_remove_commit(db, commit):
cmd = "DELETE FROM commits WHERE repo = :repo AND hash = :hash"
args = {'repo': commit.repo,
'hash': commit.hash}
db.execute(cmd, args)
def _commit_from_row(row):
return common.Commit(repo=row[0],
hash=row[1],
change=row[2],
status=common.Status(row[3]),
output=row[4])
|