aboutsummaryrefslogtreecommitdiff
path: root/test_pytest/test_backend.py
blob: 539f5ce6b07eefbe7860155a29976c9995a4b1b5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import pytest

import restlog.backend


# Module-level mark: pytest applies ``pytestmark`` to every test collected
# from this file, so all coroutine tests below carry the ``asyncio`` marker
# (presumably consumed by the pytest-asyncio plugin - confirm in project
# test requirements).
pytestmark = pytest.mark.asyncio


@pytest.fixture
def db_path(tmp_path):
    """Return the location of the test database inside ``tmp_path``.

    Only the path is computed here; nothing is created on disk.
    """
    database_file = tmp_path.joinpath('restlog.db')
    return database_file


async def test_create(db_path):
    """Creating a backend creates the database file, which outlives close."""
    assert not db_path.exists()

    backend = await restlog.backend.create(db_path, 100)
    try:
        assert db_path.exists()
    finally:
        # Close even when the assertion above fails, so a failing run does
        # not leave an open backend behind for the rest of the session.
        await backend.async_close()

    # Closing the backend must not remove the database file.
    assert db_path.exists()


async def test_register(db_path):
    """Register one entry and read it back, also after reopening the backend.

    Checks that a fresh backend reports no entries, that ``register`` echoes
    the supplied fields together with a non-``None`` ``entry_id``, and that
    the same entry is returned by ``get_entry`` / ``get_entries`` - including
    from a second backend instance opened on the same database path.
    """
    backend = await restlog.backend.create(db_path, 100)
    try:
        result = await backend.get_entries()
        assert result == {'entries': [],
                          'more': False}

        entry = await backend.register(timestamp=123,
                                       address='address',
                                       source='source',
                                       type='type',
                                       data={'abc': 123})
        assert entry['entry_id'] is not None
        assert entry['timestamp'] == 123
        assert entry['address'] == 'address'
        assert entry['source'] == 'source'
        assert entry['type'] == 'type'
        assert entry['data'] == {'abc': 123}

        result = await backend.get_entry(entry['entry_id'])
        assert entry == result

        result = await backend.get_entries()
        assert result == {'entries': [entry],
                          'more': False}
    finally:
        # Always release the first backend, even on assertion failure, before
        # the database is reopened below.
        await backend.async_close()

    # Reopen the same database to verify the entry was persisted.
    backend = await restlog.backend.create(db_path, 100)
    try:
        result = await backend.get_entry(entry['entry_id'])
        assert entry == result
    finally:
        await backend.async_close()


@pytest.mark.parametrize('entries_count', [1, 2, 10])
async def test_get_entry(db_path, entries_count):
    """Look up each registered entry by id.

    ``get_entry`` is expected to return ``None`` for an id queried before
    anything is registered, and an entry carrying the requested ``entry_id``
    for every id returned by ``register``.
    """
    backend = await restlog.backend.create(db_path, 100)
    try:
        # Nothing has been registered yet, so this lookup must miss.
        result = await backend.get_entry(123)
        assert result is None

        entry_ids = []
        for _ in range(entries_count):
            entry = await backend.register(timestamp=123,
                                           address='address',
                                           source='source',
                                           type='type',
                                           data={'abc': 123})
            entry_ids.append(entry['entry_id'])

        for entry_id in entry_ids:
            result = await backend.get_entry(entry_id)
            assert result['entry_id'] == entry_id
    finally:
        # Close the backend even if an assertion above fails.
        await backend.async_close()


@pytest.mark.parametrize('entries_count', [1, 2, 10])
async def test_get_entries(db_path, entries_count):
    """Query registered entries with and without filters.

    Each registered entry gets a unique ``source`` and ``type`` so that
    filtering by either one must yield exactly that entry. The test also
    checks that ids are handed out in increasing order, that an unfiltered
    ``get_entries`` lists them in decreasing id order, and that passing the
    i-th id as ``last_entry_id`` yields ``i + 1`` entries.
    """
    backend = await restlog.backend.create(db_path, 100)
    try:
        # Nothing has been registered yet, so this lookup must miss.
        result = await backend.get_entry(123)
        assert result is None

        entry_ids = []
        for i in range(entries_count):
            entry = await backend.register(timestamp=123,
                                           address='address',
                                           source=f'source{i}',
                                           type=f'type{i}',
                                           data={'abc': 123})
            entry_ids.append(entry['entry_id'])

        assert entry_ids == sorted(entry_ids)

        result = await backend.get_entries()
        # Distinct name for the comprehension variable: the original reused
        # ``i``, which reads as if it were the surrounding loop index.
        listed_ids = [item['entry_id'] for item in result['entries']]
        assert listed_ids == sorted(entry_ids, reverse=True)

        for i, entry_id in enumerate(entry_ids):
            result = await backend.get_entries(source=f'source{i}')
            assert len(result['entries']) == 1
            assert result['entries'][0]['entry_id'] == entry_id

            result = await backend.get_entries(type=f'type{i}')
            assert len(result['entries']) == 1
            assert result['entries'][0]['entry_id'] == entry_id

            result = await backend.get_entries(last_entry_id=entry_id)
            assert len(result['entries']) == 1 + i
    finally:
        # Close the backend even if an assertion above fails.
        await backend.async_close()