diff options
-rw-r--r-- | gemato/compression.py | 6 | ||||
-rw-r--r-- | tests/test_compression.py | 24 |
2 files changed, 28 insertions, 2 deletions
diff --git a/gemato/compression.py b/gemato/compression.py index e420702..c15de0c 100644 --- a/gemato/compression.py +++ b/gemato/compression.py @@ -40,7 +40,9 @@ def open_compressed_file(suffix, f, mode='r'): return gzip.GzipFile(fileobj=f, mode=mode) elif suffix == "bz2" and bz2 is not None: return bz2.BZ2File(f, mode=mode) - elif suffix in ("lzma", "xz") and lzma is not None: - return lzma.LZMAFile(f, mode=mode) + elif suffix == "lzma" and lzma is not None: + return lzma.LZMAFile(f, format=lzma.FORMAT_ALONE, mode=mode) + elif suffix == "xz" and lzma is not None: + return lzma.LZMAFile(f, format=lzma.FORMAT_XZ, mode=mode) raise gemato.exceptions.UnsupportedCompression(suffix) diff --git a/tests/test_compression.py b/tests/test_compression.py index 6f443cc..a81e80f 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -160,6 +160,18 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('lzma compression unsupported') + def test_lzma_legacy_as_xz(self): + """ + Test that the class rejects mislabel files. + """ + if gemato.compression.lzma is None: + raise unittest.SkipTest('xz compression unsupported') + + with io.BytesIO(base64.b64decode(self.BASE64)) as f: + with self.assertRaises(gemato.compression.lzma.LZMAError): + with gemato.compression.open_compressed_file('xz', f, "rb") as xz: + xz.read() + class XZCompressionTest(unittest.TestCase): BASE64 = b''' @@ -213,3 +225,15 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla self.assertEqual(xz.read(), TEST_STRING) except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('xz compression unsupported') + + def test_xz_as_lzma_legacy(self): + """ + Test that the class rejects mislabel files. + """ + if gemato.compression.lzma is None: + raise unittest.SkipTest('xz compression unsupported') + + with io.BytesIO(base64.b64decode(self.BASE64)) as f: + with self.assertRaises(gemato.compression.lzma.LZMAError): + with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: + lzma.read() |