1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
#!/usr/bin/env python3
import sys
import asyncio
import tempfile
import aiosqlite
import string
import random
from pypaste.server import Paste
from pypaste.server.sqlite import Sqlite
from datetime import datetime
from pathlib import Path
def truncate(path: Path) -> None:
    """Leave *path* as an empty file, creating it when it does not exist."""
    # Mode "w" already empties the file on open; the explicit truncate(0)
    # is kept so the zero-length result is spelled out.
    with open(path, mode="w") as handle:
        handle.truncate(0)
def generate_key() -> str:
    """Return a random key made of ten ASCII letters."""
    alphabet = string.ascii_letters
    picked = []
    for _ in range(10):
        picked.append(random.choice(alphabet))
    return "".join(picked)
async def test_exists_but_not_in_our_table(storage: Sqlite) -> None:
    """Check that a row written straight into ``pastes`` is not reported.

    The row is inserted through the raw connection rather than through
    ``storage.insert``; ``storage.exists`` must then return a falsy value
    for that key.
    """
    raw_key = generate_key()
    statement = "insert into pastes values(?, ?, ?, ?)"
    row = (raw_key, datetime.now().isoformat(), None, bytes())
    await storage.connection.execute(statement, row)

    found = await storage.exists(raw_key)
    assert not found
async def test_exists(storage: Sqlite) -> None:
    """Check ``storage.exists`` for an inserted key and for an unused key."""
    created_at = datetime.now()
    stored_key = generate_key()
    paste = Paste(stored_key, created_at, "test", "hello world")
    await storage.insert(paste)

    stored_found = await storage.exists(stored_key)
    assert stored_found

    # A freshly generated key that this test never inserted.
    unused_found = await storage.exists(generate_key())
    assert not unused_found
async def test_delete(storage: Sqlite) -> None:
    """Check that a key stops existing once ``storage.delete`` is called."""
    created_at = datetime.now()
    doomed_key = generate_key()
    paste = Paste(doomed_key, created_at, "test", "hello world")
    await storage.insert(paste)

    present_before = await storage.exists(doomed_key)
    assert present_before

    await storage.delete(doomed_key)

    present_after = await storage.exists(doomed_key)
    assert not present_after
async def test_insert_retrieve(storage: Sqlite) -> None:
    """Check that an inserted paste is retrieved with the same field values."""
    created_at = datetime.now()
    wanted_key = generate_key()
    original = Paste(wanted_key, created_at, "test", "hello world")
    await storage.insert(original)

    fetched = await storage.retrieve(wanted_key)

    # Guard against a missing row before comparing individual fields.
    assert fetched is not None
    assert fetched.key == wanted_key
    assert fetched.dt == created_at
    assert fetched.syntax == "test"
    assert fetched.text == "hello world"
async def main() -> int:
    """Run every storage test against a throwaway SQLite database.

    A temporary directory holds an empty database file. A ``pastes`` table
    (key, datetime, size, syntax) is created on the raw connection before
    the connection is wrapped in ``Sqlite`` and ``storage.setup()`` is
    awaited. All test coroutines are then run concurrently on that one
    storage object.

    Returns:
        0 when every test coroutine finishes without raising.

    Raises:
        AssertionError: propagated from the first failing test, since
            ``asyncio.gather`` is used without ``return_exceptions``.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        f = Path(tmpdir) / "database"
        truncate(f)
        async with aiosqlite.connect(f) as connection:
            await connection.execute(
                (
                    "create table pastes("
                    "key text,"
                    "datetime text,"
                    "size int,"
                    "syntax text"
                    ")"
                )
            )
            storage = Sqlite(connection)
            await storage.setup()
            # test_exists was defined but never scheduled; it is included
            # here so every test in this file actually runs.
            await asyncio.gather(
                test_insert_retrieve(storage),
                test_exists(storage),
                test_delete(storage),
                test_exists_but_not_in_our_table(storage),
            )
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = asyncio.run(main())
    sys.exit(exit_status)
|