Diffstat (limited to 'pypaste/server/s3/__init__.py')
-rw-r--r--    pypaste/server/s3/__init__.py    126
1 file changed, 126 insertions, 0 deletions
diff --git a/pypaste/server/s3/__init__.py b/pypaste/server/s3/__init__.py
new file mode 100644
index 0000000..07e2580
--- /dev/null
+++ b/pypaste/server/s3/__init__.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2025 John Turner
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import asyncio
+from dataclasses import dataclass
+from typing import Optional
+
+import aiosqlite
+import zstandard
+
+from pypaste.server import Storage, Paste
+from pypaste.server.s3.bucket import Bucket
+
+
+@dataclass
+class S3(Storage):
+ connection: aiosqlite.Connection
+
+ def __init__(
+ self,
+ connection: aiosqlite.Connection,
+ endpoint: str,
+ region: str,
+ bucket: str,
+ access_key: str,
+ secret_key: str,
+ ):
+ self.connection = connection
+ self.bucket = Bucket(endpoint, region, bucket, access_key, secret_key)
+
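+    # Create the bookkeeping table that records which paste keys have been
+    # uploaded to the bucket.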
+ async def setup(self) -> None:
+ await self.connection.execute("create table if not exists s3(key text)")
+ await self.connection.commit()
+
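+    # Compress the paste body off the event loop, stage the metadata row in
+    # sqlite, upload the blob to the bucket, and commit only if the upload
+    # succeeds; otherwise roll the row back.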
+ async def insert(self, paste: Paste) -> None:
+ def compress():
+ return zstandard.compress(paste.text.encode())
+
+ compressed = await asyncio.to_thread(compress)
+
+ await self.connection.execute(
+ "insert into pastes values(?, ?, ?, ?)",
+ (paste.key, paste.dt.isoformat(), len(compressed), paste.syntax),
+ )
+
+ try:
+ await self.bucket.put(paste.key, compressed)
+ await self.connection.commit()
+        except Exception:
+            await self.connection.rollback()
+            raise
+
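+    # Look up the stored metadata, fetch the compressed blob from the bucket,
+    # and decompress it off the event loop.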
+ async def retrieve(self, key: str) -> Optional[Paste]:
+ if not await self.exists(key):
+ return None
+
+ row = await self.read_row(key)
+
+ assert row is not None
+
+ (dt, size, syntax) = row
+
+ data = await self.bucket.get(key)
+
+ assert data is not None
+
+ def decompress() -> str:
+ return zstandard.decompress(data).decode()
+
+ text = await asyncio.to_thread(decompress)
+
+ return Paste(key, dt, syntax, text)
+
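+    # Stage the metadata deletion, remove the object from the bucket, and
+    # commit only if the bucket deletion succeeds.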
+ async def delete(self, key: str) -> None:
+ await self.connection.execute("delete from pastes where key=?", (key,))
+
+ try:
+ await self.bucket.delete(key)
+ await self.connection.commit()
+        except Exception:
+            await self.connection.rollback()
+            raise
+
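+    # Repeatedly delete the oldest paste tracked in the s3 table until the
+    # total compressed size stored no longer exceeds `max` bytes.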
+ async def vacuum(self, max: int) -> None:
+ while True:
+ async with self.connection.execute(
+ (
+ "select sum(pastes.size) from pastes "
+                "inner join s3 on s3.key = pastes.key"
+ )
+ ) as cursor:
+ if (row := await cursor.fetchone()) is None:
+ return
+ else:
+ use = row[0]
+
+ async with self.connection.execute(
+ (
+ "select pastes.key from pastes "
+                "inner join s3 on s3.key = pastes.key "
+ "order by pastes.datetime "
+ "limit 1"
+ )
+ ) as cursor:
+ if (row := await cursor.fetchone()) is None:
+ return
+ else:
+ oldest = row[0]
+
+ if use > max:
+ await self.delete(oldest)
+ else:
+ return
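
A minimal usage sketch (not part of the commit), assuming the surrounding
application creates the pastes table and supplies real credentials; the
endpoint, region, bucket name, keys, and database path below are placeholders:

    import asyncio

    import aiosqlite

    from pypaste.server.s3 import S3


    async def main() -> None:
        async with aiosqlite.connect("pastes.db") as connection:
            storage = S3(
                connection,
                endpoint="https://s3.example.com",  # placeholder endpoint
                region="us-east-1",                 # placeholder region
                bucket="pypaste",                   # placeholder bucket
                access_key="placeholder-access-key",
                secret_key="placeholder-secret-key",
            )
            await storage.setup()
            # Keep the compressed data tracked in sqlite under roughly 64 MiB.
            await storage.vacuum(64 * 1024 * 1024)


    asyncio.run(main())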