diff options
author | Michał Górny <mgorny@gentoo.org> | 2017-10-25 20:31:15 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2017-10-25 20:36:30 +0200 |
commit | f93df723eeec064dd8b5bb92a7926be49ab19bed (patch) | |
tree | 79a6a6e363a9a2e4707feea579e3223a92e67f34 | |
parent | 3722218dc8e8013f4563bd4f2207f50eeec22c57 (diff) | |
download | gemato-f93df723eeec064dd8b5bb92a7926be49ab19bed.tar.gz |
compression: Support passing mode to the compressor
-rw-r--r-- | gemato/compression.py | 8 | ||||
-rw-r--r-- | tests/test_compression.py | 73 |
2 files changed, 65 insertions, 16 deletions
diff --git a/gemato/compression.py b/gemato/compression.py index 26e03be..e420702 100644 --- a/gemato/compression.py +++ b/gemato/compression.py @@ -27,7 +27,7 @@ except ImportError: import gemato.exceptions -def open_compressed_file(suffix, f): +def open_compressed_file(suffix, f, mode='r'): """ Get a file-like object for an open compressed file @fileobj of format @suffix. The file should be open in binary mode @@ -37,10 +37,10 @@ def open_compressed_file(suffix, f): """ if suffix == "gz": - return gzip.GzipFile(fileobj=f) + return gzip.GzipFile(fileobj=f, mode=mode) elif suffix == "bz2" and bz2 is not None: - return bz2.BZ2File(f) + return bz2.BZ2File(f, mode=mode) elif suffix in ("lzma", "xz") and lzma is not None: - return lzma.LZMAFile(f) + return lzma.LZMAFile(f, mode=mode) raise gemato.exceptions.UnsupportedCompression(suffix) diff --git a/tests/test_compression.py b/tests/test_compression.py index 71cd736..6f443cc 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -30,17 +30,27 @@ L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA= def test_gzip(self): with io.BytesIO(base64.b64decode(self.BASE64)) as f: - with gemato.compression.open_compressed_file('gz', f) as gz: + with gemato.compression.open_compressed_file('gz', f, "rb") as gz: self.assertEqual(gz.read(), TEST_STRING) def test_gzip_empty(self): with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: - with gemato.compression.open_compressed_file('gz', f) as gz: + with gemato.compression.open_compressed_file('gz', f, "rb") as gz: self.assertEqual(gz.read(), b'') def test_gzip_split(self): with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: - with gemato.compression.open_compressed_file('gz', f) as gz: + with gemato.compression.open_compressed_file('gz', f, "rb") as gz: + self.assertEqual(gz.read(), TEST_STRING) + + def test_gzip_write(self): + with io.BytesIO() as f: + with gemato.compression.open_compressed_file('gz', f, 'wb') as gz: + gz.write(TEST_STRING) + + f.seek(0) + + with gemato.compression.open_compressed_file('gz', f, 'rb') as gz: self.assertEqual(gz.read(), TEST_STRING) @@ -63,7 +73,7 @@ OxleaA== def test_bzip2(self): with io.BytesIO(base64.b64decode(self.BASE64)) as f: try: - with gemato.compression.open_compressed_file('bz2', f) as bz2: + with gemato.compression.open_compressed_file('bz2', f, "rb") as bz2: self.assertEqual(bz2.read(), TEST_STRING) except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('bz2 compression unsupported') @@ -71,7 +81,7 @@ OxleaA== def test_bzip2_empty(self): with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: try: - with gemato.compression.open_compressed_file('bz2', f) as bz2: + with gemato.compression.open_compressed_file('bz2', f, "rb") as bz2: self.assertEqual(bz2.read(), b'') except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('bz2 compression unsupported') @@ -79,7 +89,20 @@ OxleaA== def test_bzip2_split(self): with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: try: - with gemato.compression.open_compressed_file('bz2', f) as bz2: + with gemato.compression.open_compressed_file('bz2', f, "rb") as bz2: + self.assertEqual(bz2.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + + def test_bzip2_write(self): + with io.BytesIO() as f: + try: + with gemato.compression.open_compressed_file('bz2', f, 'wb') as bz2: + bz2.write(TEST_STRING) + + f.seek(0) + + with gemato.compression.open_compressed_file('bz2', f, 'rb') as bz2: self.assertEqual(bz2.read(), TEST_STRING) except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('bz2 compression unsupported') @@ -103,7 +126,7 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== def test_lzma_legacy(self): with io.BytesIO(base64.b64decode(self.BASE64)) as f: try: - with gemato.compression.open_compressed_file('lzma', f) as lzma: + with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: self.assertEqual(lzma.read(), TEST_STRING) except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('lzma compression unsupported') @@ -111,7 +134,7 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== def test_lzma_legacy_empty(self): with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: try: - with gemato.compression.open_compressed_file('lzma', f) as lzma: + with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: self.assertEqual(lzma.read(), b'') except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('lzma compression unsupported') @@ -119,7 +142,20 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== def test_lzma_legacy_split(self): with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: try: - with gemato.compression.open_compressed_file('lzma', f) as lzma: + with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: + self.assertEqual(lzma.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + + def test_lzma_legacy_write(self): + with io.BytesIO() as f: + try: + with gemato.compression.open_compressed_file('lzma', f, 'wb') as lzma: + lzma.write(TEST_STRING) + + f.seek(0) + + with gemato.compression.open_compressed_file('lzma', f, 'rb') as lzma: self.assertEqual(lzma.read(), TEST_STRING) except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('lzma compression unsupported') @@ -144,7 +180,7 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla def test_xz(self): with io.BytesIO(base64.b64decode(self.BASE64)) as f: try: - with gemato.compression.open_compressed_file('xz', f) as xz: + with gemato.compression.open_compressed_file('xz', f, "rb") as xz: self.assertEqual(xz.read(), TEST_STRING) except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('xz compression unsupported') @@ -152,7 +188,7 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla def test_xz_empty(self): with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: try: - with gemato.compression.open_compressed_file('xz', f) as xz: + with gemato.compression.open_compressed_file('xz', f, "rb") as xz: self.assertEqual(xz.read(), b'') except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('xz compression unsupported') @@ -160,7 +196,20 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla def test_xz_split(self): with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: try: - with gemato.compression.open_compressed_file('xz', f) as xz: + with gemato.compression.open_compressed_file('xz', f, "rb") as xz: + self.assertEqual(xz.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('xz compression unsupported') + + def test_xz_write(self): + with io.BytesIO() as f: + try: + with gemato.compression.open_compressed_file('xz', f, 'wb') as xz: + xz.write(TEST_STRING) + + f.seek(0) + + with gemato.compression.open_compressed_file('xz', f, 'rb') as xz: self.assertEqual(xz.read(), TEST_STRING) except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('xz compression unsupported') |