def open_compressed_file(suffix, f):
    """
    Wrap the open compressed file @f in a decompressing file-like
    object selected by @suffix. @f should be open in binary mode and
    positioned at the beginning of the compressed data. @suffix names
    the compression format by its standard file suffix, without
    the leading dot, e.g. "gz", "bz2", "lzma", "xz".

    Raises gemato.exceptions.UnsupportedCompression if @suffix is not
    one of the recognized suffixes, or if the module needed to handle
    it (bz2 or lzma) is None.
    """

    # gzip is used unconditionally; no availability check is needed
    if suffix == "gz":
        return gzip.GzipFile(fileobj=f)

    # a format whose module is None falls through to the final raise
    if suffix == "bz2" and bz2 is not None:
        return bz2.BZ2File(f)
    if suffix in ("lzma", "xz") and lzma is not None:
        return lzma.LZMAFile(f)

    raise gemato.exceptions.UnsupportedCompression(suffix)
test_gzip_split(self): + with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: + with gemato.compression.open_compressed_file('gz', f) as gz: + self.assertEqual(gz.read(), TEST_STRING) + + +class Bzip2CompressionTest(unittest.TestCase): + BASE64 = b''' +QlpoOTFBWSZTWUWd7mEAAAQTgEAABAA////wIAEABTQAAAGigAAAAEBoLtBqVm1CpOmzyfUXAw5P +HXD0304jMvvfF3JFOFCQRZ3uYQ== +''' + + EMPTY_BASE64 = b''' +QlpoORdyRThQkAAAAAA= +''' + + SPLIT_BASE64 = b''' +QlpoOTFBWSZTWQgcCrAAAAITgEAABAAbabLAIABBEaDR6jT9UoAAAbUXZJ48gnMg3xdyRThQkAgc +CrBCWmg5MUFZJlNZOxleaAAABRGAQAAm1t8wIACAUaNDRtTaSgAAAcAcViIdSEhzctM/F3JFOFCQ +OxleaA== +''' + + def test_bzip2(self): + with io.BytesIO(base64.b64decode(self.BASE64)) as f: + try: + with gemato.compression.open_compressed_file('bz2', f) as bz2: + self.assertEqual(bz2.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + + def test_bzip2_empty(self): + with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: + try: + with gemato.compression.open_compressed_file('bz2', f) as bz2: + self.assertEqual(bz2.read(), b'') + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + + def test_bzip2_split(self): + with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: + try: + with gemato.compression.open_compressed_file('bz2', f) as bz2: + self.assertEqual(bz2.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + + +class LZMALegacyCompressionTest(unittest.TestCase): + BASE64 = b''' +XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq00+OId6EE2+twGabs85BA0Jwnrs2bs +Ghcv//zOkAA= +''' + + EMPTY_BASE64 = b''' +XQAAAAT//////////wCD//v//8AAAAA= +''' + + SPLIT_BASE64 = b''' +XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq1EvQql//X0QABdAAAABP////////// +ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== +''' + + def test_lzma_legacy(self): + with 
io.BytesIO(base64.b64decode(self.BASE64)) as f: + try: + with gemato.compression.open_compressed_file('lzma', f) as lzma: + self.assertEqual(lzma.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + + def test_lzma_legacy_empty(self): + with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: + try: + with gemato.compression.open_compressed_file('lzma', f) as lzma: + self.assertEqual(lzma.read(), b'') + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + + def test_lzma_legacy_split(self): + with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: + try: + with gemato.compression.open_compressed_file('lzma', f) as lzma: + self.assertEqual(lzma.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + + +class XZCompressionTest(unittest.TestCase): + BASE64 = b''' +/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQAqVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVy +IHRoZSBsYXp5IGRvZwAAxKFK5cK4XlsAAUMrrVBuVx+2830BAAAAAARZWg== +''' + + EMPTY_BASE64 = b''' +/Td6WFoAAATm1rRGAAAAABzfRCEftvN9AQAAAAAEWVo= +''' + + SPLIT_BASE64 = b''' +/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQATVGhlIHF1aWNrIGJyb3duIGZveCAAIEFC5acaLXcA +ASwU+AptAx+2830BAAAAAARZWv03elhaAAAE5ta0RgIAIQEcAAAAEM9YzAEAFmp1bXBzIG92ZXIg +dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla +''' + + def test_xz(self): + with io.BytesIO(base64.b64decode(self.BASE64)) as f: + try: + with gemato.compression.open_compressed_file('xz', f) as xz: + self.assertEqual(xz.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('xz compression unsupported') + + def test_xz_empty(self): + with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: + try: + with gemato.compression.open_compressed_file('xz', f) as xz: + self.assertEqual(xz.read(), b'') + except gemato.exceptions.UnsupportedCompression: + 
#!/bin/bash
# Generate base64-encoded compression fixtures for a given compression
# program and print them as BASE64 / EMPTY_BASE64 / SPLIT_BASE64 Python
# bytes literals on stdout.
#
# Usage: gen-compression-tests.bash <program>
#
# <program> must read uncompressed data on stdin and write the
# compressed result to stdout.
test_string='The quick brown fox jumps over the lazy dog'

if [[ ${#} -lt 1 ]]; then
    # usage errors go to stderr so they never end up in pasted output
    echo "Usage: ${0} <program>" >&2
    exit 1
fi

# NOTE: ${program} is expanded unquoted below, so a single argument such
# as 'xz --format=lzma' is word-split into a command plus its options.
# Presumably intentional -- quoting it would change how such arguments
# are run.
program=${1}

# empty input, compressed
empty=$(printf '' | ${program} | base64)
# the whole test string, compressed in one run
str=$(printf '%s' "${test_string}" | ${program} | base64)
# the test string cut after 20 characters; each part is compressed
# separately and the two outputs are concatenated before encoding
split=$(
    ( printf '%s' "${test_string::20}" | ${program}
      printf '%s' "${test_string:20}" | ${program} ) | base64)

cat <<_EOF_

    BASE64 = b'''
${str}
'''

    EMPTY_BASE64 = b'''
${empty}
'''

    SPLIT_BASE64 = b'''
${split}
'''

_EOF_