summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/compression.py6
-rw-r--r--tests/test_compression.py24
2 files changed, 28 insertions, 2 deletions
diff --git a/gemato/compression.py b/gemato/compression.py
index e420702..c15de0c 100644
--- a/gemato/compression.py
+++ b/gemato/compression.py
@@ -40,7 +40,9 @@ def open_compressed_file(suffix, f, mode='r'):
return gzip.GzipFile(fileobj=f, mode=mode)
elif suffix == "bz2" and bz2 is not None:
return bz2.BZ2File(f, mode=mode)
- elif suffix in ("lzma", "xz") and lzma is not None:
- return lzma.LZMAFile(f, mode=mode)
+ elif suffix == "lzma" and lzma is not None:
+ return lzma.LZMAFile(f, format=lzma.FORMAT_ALONE, mode=mode)
+ elif suffix == "xz" and lzma is not None:
+ return lzma.LZMAFile(f, format=lzma.FORMAT_XZ, mode=mode)
raise gemato.exceptions.UnsupportedCompression(suffix)
diff --git a/tests/test_compression.py b/tests/test_compression.py
index 6f443cc..a81e80f 100644
--- a/tests/test_compression.py
+++ b/tests/test_compression.py
@@ -160,6 +160,18 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA==
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('lzma compression unsupported')
+ def test_lzma_legacy_as_xz(self):
+ """
+ Test that the class rejects mislabel files.
+ """
+ if gemato.compression.lzma is None:
+ raise unittest.SkipTest('xz compression unsupported')
+
+ with io.BytesIO(base64.b64decode(self.BASE64)) as f:
+ with self.assertRaises(gemato.compression.lzma.LZMAError):
+ with gemato.compression.open_compressed_file('xz', f, "rb") as xz:
+ xz.read()
+
class XZCompressionTest(unittest.TestCase):
BASE64 = b'''
@@ -213,3 +225,15 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla
self.assertEqual(xz.read(), TEST_STRING)
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('xz compression unsupported')
+
+ def test_xz_as_lzma_legacy(self):
+ """
+ Test that the class rejects mislabel files.
+ """
+ if gemato.compression.lzma is None:
+ raise unittest.SkipTest('xz compression unsupported')
+
+ with io.BytesIO(base64.b64decode(self.BASE64)) as f:
+ with self.assertRaises(gemato.compression.lzma.LZMAError):
+ with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma:
+ lzma.read()