diff options
Diffstat (limited to 'pypaste/server/s3/__init__.py')
-rw-r--r-- | pypaste/server/s3/__init__.py | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/pypaste/server/s3/__init__.py b/pypaste/server/s3/__init__.py new file mode 100644 index 0000000..07e2580 --- /dev/null +++ b/pypaste/server/s3/__init__.py @@ -0,0 +1,126 @@ +# Copyright (C) 2025 John Turner + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +import asyncio +import zstandard +import asyncio +import aiosqlite +from pypaste.server import Storage, Paste +from pypaste.server.s3.bucket import Bucket +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class S3(Storage): + connection: aiosqlite.Connection + + def __init__( + self, + connection: aiosqlite.Connection, + endpoint: str, + region: str, + bucket: str, + access_key: str, + secret_key: str, + ): + self.connection = connection + self.bucket = Bucket(endpoint, region, bucket, access_key, secret_key) + + async def setup(self) -> None: + await self.connection.execute("create table if not exists s3(key text)") + await self.connection.commit() + + async def insert(self, paste: Paste) -> None: + def compress(): + return zstandard.compress(paste.text.encode()) + + compressed = await asyncio.to_thread(compress) + + await self.connection.execute( + "insert into pastes values(?, ?, ?, ?)", + (paste.key, paste.dt.isoformat(), len(compressed), paste.syntax), + ) + + try: + await self.bucket.put(paste.key, compressed) + await self.connection.commit() + except Exception as e: + await self.connection.rollback() + raise e + + async def retrieve(self, key: str) -> Optional[Paste]: + if not await self.exists(key): + return None + + row = await self.read_row(key) + + assert row is not None + + (dt, size, syntax) = row + + data = await self.bucket.get(key) + + assert data is not None + + def decompress() -> str: + return zstandard.decompress(data).decode() + + text = await asyncio.to_thread(decompress) + + return Paste(key, dt, syntax, text) + + async def delete(self, key: str) -> None: + await self.connection.execute("delete from pastes where key=?", (key,)) + + try: + await self.bucket.delete(key) + await self.connection.commit() + except Exception as e: + await self.connection.rollback() + raise e + + async def vacuum(self, max: int) -> None: + while True: + async with self.connection.execute( + ( + "select sum(pastes.size) from pastes " + "inner join s3 on s3.key " + "where s3.key=pastes.key" + ) + ) as cursor: + if (row := await cursor.fetchone()) is None: + return + else: + use = row[0] + + async with self.connection.execute( + ( + "select pastes.key from pastes " + "inner join s3 on s3.key " + "where s3.key=pastes.key " + "order by pastes.datetime " + "limit 1" + ) + ) as cursor: + if (row := await cursor.fetchone()) is None: + return + else: + oldest = row[0] + + if use > max: + await self.delete(oldest) + else: + return |