summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2017-10-25 19:37:19 +0200
committerMichał Górny <mgorny@gentoo.org>2017-10-25 19:37:19 +0200
commitf91a5a4d62de9b1d31c2263fab168165e1765830 (patch)
tree03ae5cc7bdff00c215cae46607c4ddf2bac10b98
parentbd1022e46a1933f634a27b1ed7632054f7f89a4f (diff)
downloadgemato-f91a5a4d62de9b1d31c2263fab168165e1765830.tar.gz
Provide compression abstraction
-rw-r--r--gemato/compression.py46
-rw-r--r--gemato/exceptions.py6
-rw-r--r--tests/test_compression.py166
-rw-r--r--tox.ini4
-rwxr-xr-xutils/gen-compression-tests.bash31
5 files changed, 253 insertions, 0 deletions
diff --git a/gemato/compression.py b/gemato/compression.py
new file mode 100644
index 0000000..26e03be
--- /dev/null
+++ b/gemato/compression.py
@@ -0,0 +1,46 @@
+# gemato: compressed file support
+# vim:fileencoding=utf-8
+# (c) 2017 Michał Górny
+# Licensed under the terms of 2-clause BSD license
+
+import gzip
+import sys
+
+if sys.version_info >= (3, 3):
+ import bz2
+else:
+ # older bz2 module versions do not handle multiple streams correctly
+ # so use the backport instead
+ try:
+ import bz2file as bz2
+ except ImportError:
+ bz2 = None
+
+try:
+ import lzma
+except ImportError:
+ try:
+ import backports.lzma as lzma
+ except ImportError:
+ lzma = None
+
+import gemato.exceptions
+
+
+def open_compressed_file(suffix, f):
+    """
+    Wrap the already-open compressed file @f in a file-like object
+    that yields the decompressed data. @f should be open in binary
+    mode and positioned at the beginning. @suffix selects the format
+    and is the standard file suffix for it without the leading dot:
+    "gz" gives a gzip.GzipFile, "bz2" a bz2.BZ2File, and "lzma" or
+    "xz" an lzma.LZMAFile.
+
+    Raises gemato.exceptions.UnsupportedCompression if @suffix is not
+    one of the above, or if the module backing it could not be
+    imported (the module-level name is None in that case).
+    """
+
+    if suffix == "gz":
+        return gzip.GzipFile(fileobj=f)
+
+    # bz2 and lzma are optional: only dispatch to them when the import
+    # at the top of the module succeeded
+    if bz2 is not None and suffix == "bz2":
+        return bz2.BZ2File(f)
+    if lzma is not None and suffix in ("lzma", "xz"):
+        return lzma.LZMAFile(f)
+
+    raise gemato.exceptions.UnsupportedCompression(suffix)
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index 9edf47d..950d6a5 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -3,6 +3,12 @@
# (c) 2017 Michał Górny
# Licensed under the terms of 2-clause BSD license
+class UnsupportedCompression(Exception):
+    """
+    Error for a compression suffix that cannot be handled. The suffix
+    is embedded in the exception message.
+    """
+
+    def __init__(self, suffix):
+        message = 'Unsupported compression suffix: {}'.format(suffix)
+        # explicit two-argument super() keeps this valid on Python 2
+        super(UnsupportedCompression, self).__init__(message)
+
+
class UnsupportedHash(Exception):
def __init__(self, hash_name):
super(UnsupportedHash, self).__init__(
diff --git a/tests/test_compression.py b/tests/test_compression.py
new file mode 100644
index 0000000..71cd736
--- /dev/null
+++ b/tests/test_compression.py
@@ -0,0 +1,166 @@
+# gemato: compressed file tests
+# vim:fileencoding=utf-8
+# (c) 2017 Michał Górny
+# Licensed under the terms of 2-clause BSD license
+
+import base64
+import io
+import unittest
+
+import gemato.compression
+import gemato.exceptions
+
+
+TEST_STRING = b'The quick brown fox jumps over the lazy dog'
+
+
+class GzipCompressionTest(unittest.TestCase):
+    """
+    Check that open_compressed_file() with the 'gz' suffix decompresses
+    the base64-encoded fixtures below to the expected contents.
+    """
+
+    # decompresses to TEST_STRING
+    BASE64 = b'''
+H4sIACbJ8FkAAwvJSFUoLM1MzlZIKsovz1NIy69QyCrNLShWyC9LLVIoAUrnJFZVKqTkpwMAOaNP
+QSsAAAA=
+'''
+
+    # decompresses to an empty byte string
+    EMPTY_BASE64 = b'''
+H4sIACbJ8FkAAwMAAAAAAAAAAAA=
+'''
+
+    # also decompresses to TEST_STRING; presumably two gzip streams
+    # concatenated, as produced by utils/gen-compression-tests.bash
+    SPLIT_BASE64 = b'''
+H4sIACbJ8FkAAwvJSFUoLM1MzlZIKsovz1NIy69QAADidbCIFAAAAB+LCAAmyfBZAAPLKs0tKFbI
+L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA=
+'''
+
+    def _assert_decompressed(self, encoded, expected):
+        """
+        Decode the base64 data @encoded, open it as a 'gz' file
+        and assert that reading it to the end yields @expected.
+        """
+        with io.BytesIO(base64.b64decode(encoded)) as f:
+            with gemato.compression.open_compressed_file('gz', f) as gz:
+                self.assertEqual(gz.read(), expected)
+
+    def test_gzip(self):
+        self._assert_decompressed(self.BASE64, TEST_STRING)
+
+    def test_gzip_empty(self):
+        self._assert_decompressed(self.EMPTY_BASE64, b'')
+
+    def test_gzip_split(self):
+        self._assert_decompressed(self.SPLIT_BASE64, TEST_STRING)
+
+
+class Bzip2CompressionTest(unittest.TestCase):
+    """
+    Check that open_compressed_file() with the 'bz2' suffix decompresses
+    the base64-encoded fixtures below to the expected contents. Tests
+    are skipped when the suffix is reported as unsupported.
+    """
+
+    # decompresses to TEST_STRING
+    BASE64 = b'''
+QlpoOTFBWSZTWUWd7mEAAAQTgEAABAA////wIAEABTQAAAGigAAAAEBoLtBqVm1CpOmzyfUXAw5P
+HXD0304jMvvfF3JFOFCQRZ3uYQ==
+'''
+
+    # decompresses to an empty byte string
+    EMPTY_BASE64 = b'''
+QlpoORdyRThQkAAAAAA=
+'''
+
+    # also decompresses to TEST_STRING; presumably two bzip2 streams
+    # concatenated, as produced by utils/gen-compression-tests.bash
+    SPLIT_BASE64 = b'''
+QlpoOTFBWSZTWQgcCrAAAAITgEAABAAbabLAIABBEaDR6jT9UoAAAbUXZJ48gnMg3xdyRThQkAgc
+CrBCWmg5MUFZJlNZOxleaAAABRGAQAAm1t8wIACAUaNDRtTaSgAAAcAcViIdSEhzctM/F3JFOFCQ
+OxleaA==
+'''
+
+    def _assert_decompressed(self, encoded, expected):
+        """
+        Decode the base64 data @encoded, open it as a 'bz2' file
+        and assert that reading it to the end yields @expected.
+        UnsupportedCompression is turned into a test skip.
+        """
+        with io.BytesIO(base64.b64decode(encoded)) as f:
+            try:
+                with gemato.compression.open_compressed_file(
+                        'bz2', f) as stream:
+                    self.assertEqual(stream.read(), expected)
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('bz2 compression unsupported')
+
+    def test_bzip2(self):
+        self._assert_decompressed(self.BASE64, TEST_STRING)
+
+    def test_bzip2_empty(self):
+        self._assert_decompressed(self.EMPTY_BASE64, b'')
+
+    def test_bzip2_split(self):
+        self._assert_decompressed(self.SPLIT_BASE64, TEST_STRING)
+
+
+class LZMALegacyCompressionTest(unittest.TestCase):
+    """
+    Check that open_compressed_file() with the 'lzma' suffix
+    decompresses the base64-encoded fixtures below to the expected
+    contents. Tests are skipped when the suffix is reported
+    as unsupported.
+    """
+
+    # decompresses to TEST_STRING
+    BASE64 = b'''
+XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq00+OId6EE2+twGabs85BA0Jwnrs2bs
+Ghcv//zOkAA=
+'''
+
+    # decompresses to an empty byte string
+    EMPTY_BASE64 = b'''
+XQAAAAT//////////wCD//v//8AAAAA=
+'''
+
+    # also decompresses to TEST_STRING; presumably two .lzma streams
+    # concatenated, as produced by utils/gen-compression-tests.bash
+    SPLIT_BASE64 = b'''
+XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq1EvQql//X0QABdAAAABP//////////
+ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA==
+'''
+
+    def _assert_decompressed(self, encoded, expected):
+        """
+        Decode the base64 data @encoded, open it as an 'lzma' file
+        and assert that reading it to the end yields @expected.
+        UnsupportedCompression is turned into a test skip.
+        """
+        with io.BytesIO(base64.b64decode(encoded)) as f:
+            try:
+                with gemato.compression.open_compressed_file(
+                        'lzma', f) as stream:
+                    self.assertEqual(stream.read(), expected)
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('lzma compression unsupported')
+
+    def test_lzma_legacy(self):
+        self._assert_decompressed(self.BASE64, TEST_STRING)
+
+    def test_lzma_legacy_empty(self):
+        self._assert_decompressed(self.EMPTY_BASE64, b'')
+
+    def test_lzma_legacy_split(self):
+        self._assert_decompressed(self.SPLIT_BASE64, TEST_STRING)
+
+
+class XZCompressionTest(unittest.TestCase):
+    """
+    Check that open_compressed_file() with the 'xz' suffix decompresses
+    the base64-encoded fixtures below to the expected contents. Tests
+    are skipped when the suffix is reported as unsupported.
+    """
+
+    # decompresses to TEST_STRING
+    BASE64 = b'''
+/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQAqVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVy
+IHRoZSBsYXp5IGRvZwAAxKFK5cK4XlsAAUMrrVBuVx+2830BAAAAAARZWg==
+'''
+
+    # decompresses to an empty byte string
+    EMPTY_BASE64 = b'''
+/Td6WFoAAATm1rRGAAAAABzfRCEftvN9AQAAAAAEWVo=
+'''
+
+    # also decompresses to TEST_STRING; presumably two .xz streams
+    # concatenated, as produced by utils/gen-compression-tests.bash
+    SPLIT_BASE64 = b'''
+/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQATVGhlIHF1aWNrIGJyb3duIGZveCAAIEFC5acaLXcA
+ASwU+AptAx+2830BAAAAAARZWv03elhaAAAE5ta0RgIAIQEcAAAAEM9YzAEAFmp1bXBzIG92ZXIg
+dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla
+'''
+
+    def _assert_decompressed(self, encoded, expected):
+        """
+        Decode the base64 data @encoded, open it as an 'xz' file
+        and assert that reading it to the end yields @expected.
+        UnsupportedCompression is turned into a test skip.
+        """
+        with io.BytesIO(base64.b64decode(encoded)) as f:
+            try:
+                with gemato.compression.open_compressed_file(
+                        'xz', f) as stream:
+                    self.assertEqual(stream.read(), expected)
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('xz compression unsupported')
+
+    def test_xz(self):
+        self._assert_decompressed(self.BASE64, TEST_STRING)
+
+    def test_xz_empty(self):
+        self._assert_decompressed(self.EMPTY_BASE64, b'')
+
+    def test_xz_split(self):
+        self._assert_decompressed(self.SPLIT_BASE64, TEST_STRING)
diff --git a/tox.ini b/tox.ini
index a9ca8a8..cd8baa5 100644
--- a/tox.ini
+++ b/tox.ini
@@ -13,6 +13,8 @@ commands =
[testenv:py27]
deps =
+ backports.lzma
+ bz2file
coverage
pyblake2
pysha3
@@ -36,6 +38,8 @@ deps =
[testenv:pypy]
deps =
+ backports.lzma
+ bz2file
coverage
pyblake2
pysha3
diff --git a/utils/gen-compression-tests.bash b/utils/gen-compression-tests.bash
new file mode 100755
index 0000000..70fa631
--- /dev/null
+++ b/utils/gen-compression-tests.bash
@@ -0,0 +1,31 @@
+#!/bin/bash
+# Print the BASE64 / EMPTY_BASE64 / SPLIT_BASE64 class attributes for
+# a compression test case, using the compressor given as the first
+# argument. The compressor has to read stdin and write to stdout.
+# The snippet is written to stdout.
+
+# uncompressed payload for the BASE64 and SPLIT_BASE64 fixtures
+test_string='The quick brown fox jumps over the lazy dog'
+
+# the compressor command is mandatory
+if [[ ${#} -lt 1 ]]; then
+ echo "Usage: ${0} <program-to-use>"
+ exit 1
+fi
+
+# NOTE(review): ${program} is expanded unquoted below, so an argument
+# containing spaces is split into command + options -- presumably
+# intended (e.g. to pass a format option); confirm before quoting it
+program=${1}
+
+# printf (rather than echo) keeps a trailing newline out of the input
+# empty input
+empty=$(printf '' | ${program} | base64)
+# the whole test string as a single compressed stream
+str=$(printf '%s' "${test_string}" | ${program} | base64)
+# the first 20 characters and the remainder compressed separately;
+# both outputs are concatenated and base64-encoded together
+split=$(
+ ( printf '%s' "${test_string::20}" | ${program}
+ printf '%s' "${test_string:20}" | ${program} ) | base64)
+
+# everything up to _EOF_ is emitted as is, with the variables expanded
+cat <<_EOF_
+
+ BASE64 = b'''
+${str}
+'''
+
+ EMPTY_BASE64 = b'''
+${empty}
+'''
+
+ SPLIT_BASE64 = b'''
+${split}
+'''
+
+_EOF_