diff options
author | Michał Górny <mgorny@gentoo.org> | 2020-08-26 22:59:04 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2020-08-26 22:59:04 +0200 |
commit | 675af8bd6091ad2aaa53b6ae49bb2a8119d2b18c (patch) | |
tree | 48071094aa5240fd905d6713aa1d3cf9b1e15116 /tests/test_compression.py | |
parent | 5d96305d0b1090d4eb96d37278be44333fdc4c77 (diff) | |
download | gemato-675af8bd6091ad2aaa53b6ae49bb2a8119d2b18c.tar.gz |
tests: Convert test_compression to pytest
Signed-off-by: Michał Górny <mgorny@gentoo.org>
Diffstat (limited to 'tests/test_compression.py')
-rw-r--r-- | tests/test_compression.py | 831 |
1 files changed, 171 insertions, 660 deletions
diff --git a/tests/test_compression.py b/tests/test_compression.py index df78ec8..0cb0ea7 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -1,6 +1,6 @@ # gemato: compressed file tests # vim:fileencoding=utf-8 -# (c) 2017 Michał Górny +# (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license import base64 @@ -8,6 +8,8 @@ import io import tempfile import unittest +import pytest + import gemato.compression @@ -16,662 +18,171 @@ TEST_STRING = b'The quick brown fox jumps over the lazy dog' UTF16_TEST_STRING = TEST_STRING.decode('utf8').encode('utf_16_be') -class GzipCompressionTest(unittest.TestCase): - BASE64 = b''' -H4sIACbJ8FkAAwvJSFUoLM1MzlZIKsovz1NIy69QyCrNLShWyC9LLVIoAUrnJFZVKqTkpwMAOaNP -QSsAAAA= -''' - - EMPTY_BASE64 = b''' -H4sIACbJ8FkAAwMAAAAAAAAAAAA= -''' - - SPLIT_BASE64 = b''' -H4sIACbJ8FkAAwvJSFUoLM1MzlZIKsovz1NIy69QAADidbCIFAAAAB+LCAAmyfBZAAPLKs0tKFbI -L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA= -''' - - def test_gzip(self): - with io.BytesIO(base64.b64decode(self.BASE64)) as f: - with gemato.compression.open_compressed_file('gz', f, "rb") as gz: - self.assertEqual(gz.read(), TEST_STRING) - - def test_gzip_empty(self): - with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: - with gemato.compression.open_compressed_file('gz', f, "rb") as gz: - self.assertEqual(gz.read(), b'') - - def test_gzip_split(self): - with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: - with gemato.compression.open_compressed_file('gz', f, "rb") as gz: - self.assertEqual(gz.read(), TEST_STRING) - - def test_gzip_write(self): - with io.BytesIO() as f: - with gemato.compression.open_compressed_file('gz', f, 'wb') as gz: - gz.write(TEST_STRING) - - f.seek(0) - - with gemato.compression.open_compressed_file('gz', f, 'rb') as gz: - self.assertEqual(gz.read(), TEST_STRING) - - def test_open_potentially_compressed_path(self): - with tempfile.NamedTemporaryFile(suffix='.gz') as wf: - wf.write(base64.b64decode(self.BASE64)) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'rb') as cf: - self.assertEqual(cf.read(), TEST_STRING) - - def test_open_potentially_compressed_path_write(self): - with tempfile.NamedTemporaryFile(suffix='.gz') as rf: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'wb') as cf: - cf.write(TEST_STRING) - - with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz: - self.assertEqual(gz.read(), TEST_STRING) - - def test_open_potentially_compressed_path_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.gz') as wf: - with gemato.compression.open_compressed_file('gz', wf, 'wb') as gz: - gz.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) - - def test_open_potentially_compressed_path_write_with_unicode(self): - with tempfile.NamedTemporaryFile(suffix='.gz') as rf: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz: - self.assertEqual(gz.read(), TEST_STRING) - - def test_open_potentially_compressed_path_write_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.gz') as rf: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz: - self.assertEqual(gz.read(), UTF16_TEST_STRING) - - def test_open_potentially_compressed_path_with_encoding_line_api(self): - with tempfile.NamedTemporaryFile(suffix='.gz') as wf: - with gemato.compression.open_compressed_file('gz', wf, 'wb') as gz: - gz.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertListEqual([x for x in cf], - [TEST_STRING.decode('utf8')]) - - def test_open_potentially_compressed_path_fileno_passthrough(self): - with tempfile.NamedTemporaryFile(suffix='.gz') as rf: - fs1 = gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') - with fs1 as cf: - self.assertListEqual([f.fileno() for f in fs1.files], - [cf.fileno() for f in fs1.files]) - - fs2 = gemato.compression.open_potentially_compressed_path( - rf.name, 'r', encoding='utf_16_be') - with fs2 as cf: - self.assertListEqual([f.fileno() for f in fs2.files], - [cf.fileno() for f in fs2.files]) - - -class Bzip2CompressionTest(unittest.TestCase): - BASE64 = b''' -QlpoOTFBWSZTWUWd7mEAAAQTgEAABAA////wIAEABTQAAAGigAAAAEBoLtBqVm1CpOmzyfUXAw5P -HXD0304jMvvfF3JFOFCQRZ3uYQ== -''' - - EMPTY_BASE64 = b''' -QlpoORdyRThQkAAAAAA= -''' - - SPLIT_BASE64 = b''' -QlpoOTFBWSZTWQgcCrAAAAITgEAABAAbabLAIABBEaDR6jT9UoAAAbUXZJ48gnMg3xdyRThQkAgc -CrBCWmg5MUFZJlNZOxleaAAABRGAQAAm1t8wIACAUaNDRtTaSgAAAcAcViIdSEhzctM/F3JFOFCQ -OxleaA== -''' - - def test_bzip2(self): - with io.BytesIO(base64.b64decode(self.BASE64)) as f: - try: - with gemato.compression.open_compressed_file('bz2', f, "rb") as bz2: - self.assertEqual(bz2.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_bzip2_empty(self): - with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: - try: - with gemato.compression.open_compressed_file('bz2', f, "rb") as bz2: - self.assertEqual(bz2.read(), b'') - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_bzip2_split(self): - with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: - try: - with gemato.compression.open_compressed_file('bz2', f, "rb") as bz2: - self.assertEqual(bz2.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_bzip2_write(self): - with io.BytesIO() as f: - try: - with gemato.compression.open_compressed_file('bz2', f, 'wb') as bz2: - bz2.write(TEST_STRING) - - f.seek(0) - - with gemato.compression.open_compressed_file('bz2', f, 'rb') as bz2: - self.assertEqual(bz2.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_open_potentially_compressed_path(self): - with tempfile.NamedTemporaryFile(suffix='.bz2') as wf: - wf.write(base64.b64decode(self.BASE64)) - wf.flush() - - try: - with gemato.compression.open_potentially_compressed_path( - wf.name, 'rb') as cf: - self.assertEqual(cf.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_open_potentially_compressed_path_write(self): - with tempfile.NamedTemporaryFile(suffix='.bz2') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'wb') as cf: - cf.write(TEST_STRING) - - with gemato.compression.open_compressed_file('bz2', rf, 'rb') as bz2: - self.assertEqual(bz2.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_open_potentially_compressed_path_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.bz2') as wf: - try: - with gemato.compression.open_compressed_file('bz2', wf, 'wb') as bz2: - bz2.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_open_potentially_compressed_path_write_with_unicode(self): - with tempfile.NamedTemporaryFile(suffix='.bz2') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('bz2', rf, 'rb') as bz2: - self.assertEqual(bz2.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_open_potentially_compressed_path_write_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.bz2') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('bz2', rf, 'rb') as bz2: - self.assertEqual(bz2.read(), UTF16_TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_open_potentially_compressed_path_with_encoding_line_api(self): - with tempfile.NamedTemporaryFile(suffix='.bz2') as wf: - try: - with gemato.compression.open_compressed_file('bz2', wf, 'wb') as bz2: - bz2.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertListEqual([x for x in cf], - [TEST_STRING.decode('utf8')]) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - def test_open_potentially_compressed_path_fileno_passthrough(self): - with tempfile.NamedTemporaryFile(suffix='.bz2') as rf: - try: - fs1 = gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') - with fs1 as cf: - self.assertListEqual([f.fileno() for f in fs1.files], - [cf.fileno() for f in fs1.files]) - - fs2 = gemato.compression.open_potentially_compressed_path( - rf.name, 'r', encoding='utf_16_be') - with fs2 as cf: - self.assertListEqual([f.fileno() for f in fs2.files], - [cf.fileno() for f in fs2.files]) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - - -class LZMALegacyCompressionTest(unittest.TestCase): - BASE64 = b''' -XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq00+OId6EE2+twGabs85BA0Jwnrs2bs -Ghcv//zOkAA= -''' - - EMPTY_BASE64 = b''' -XQAAAAT//////////wCD//v//8AAAAA= -''' - - SPLIT_BASE64 = b''' -XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq1EvQql//X0QABdAAAABP////////// -ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== -''' - - def test_lzma_legacy(self): - with io.BytesIO(base64.b64decode(self.BASE64)) as f: - try: - with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: - self.assertEqual(lzma.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_lzma_legacy_empty(self): - with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: - try: - with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: - self.assertEqual(lzma.read(), b'') - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_lzma_legacy_split(self): - with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: - try: - with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: - self.assertEqual(lzma.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_lzma_legacy_write(self): - with io.BytesIO() as f: - try: - with gemato.compression.open_compressed_file('lzma', f, 'wb') as lzma: - lzma.write(TEST_STRING) - - f.seek(0) - - with gemato.compression.open_compressed_file('lzma', f, 'rb') as lzma: - self.assertEqual(lzma.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_lzma_legacy_as_xz(self): - """ - Test that the class rejects mislabel files. - """ - if gemato.compression.lzma is None: - raise unittest.SkipTest('xz compression unsupported') - - with io.BytesIO(base64.b64decode(self.BASE64)) as f: - with self.assertRaises(gemato.compression.lzma.LZMAError): - with gemato.compression.open_compressed_file('xz', f, "rb") as xz: - xz.read() - - def test_open_potentially_compressed_path(self): - with tempfile.NamedTemporaryFile(suffix='.lzma') as wf: - wf.write(base64.b64decode(self.BASE64)) - wf.flush() - - try: - with gemato.compression.open_potentially_compressed_path( - wf.name, 'rb') as cf: - self.assertEqual(cf.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_open_potentially_compressed_path_write(self): - with tempfile.NamedTemporaryFile(suffix='.lzma') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'wb') as cf: - cf.write(TEST_STRING) - - with gemato.compression.open_compressed_file('lzma', rf, 'rb') as lzma: - self.assertEqual(lzma.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_open_potentially_compressed_path_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.lzma') as wf: - try: - with gemato.compression.open_compressed_file('lzma', wf, 'wb') as lzma: - lzma.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_open_potentially_compressed_path_write_with_unicode(self): - with tempfile.NamedTemporaryFile(suffix='.lzma') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('lzma', rf, 'rb') as lzma: - self.assertEqual(lzma.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_open_potentially_compressed_path_write_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.lzma') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('lzma', rf, 'rb') as lzma: - self.assertEqual(lzma.read(), UTF16_TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_open_potentially_compressed_path_with_encoding_line_api(self): - with tempfile.NamedTemporaryFile(suffix='.lzma') as wf: - try: - with gemato.compression.open_compressed_file('lzma', wf, 'wb') as lzma: - lzma.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertListEqual([x for x in cf], - [TEST_STRING.decode('utf8')]) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - def test_open_potentially_compressed_path_fileno_passthrough(self): - with tempfile.NamedTemporaryFile(suffix='.lzma') as rf: - try: - fs1 = gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') - with fs1 as cf: - self.assertListEqual([f.fileno() for f in fs1.files], - [cf.fileno() for f in fs1.files]) - - fs2 = gemato.compression.open_potentially_compressed_path( - rf.name, 'r', encoding='utf_16_be') - with fs2 as cf: - self.assertListEqual([f.fileno() for f in fs2.files], - [cf.fileno() for f in fs2.files]) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - - -class XZCompressionTest(unittest.TestCase): - BASE64 = b''' -/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQAqVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVy -IHRoZSBsYXp5IGRvZwAAxKFK5cK4XlsAAUMrrVBuVx+2830BAAAAAARZWg== -''' - - EMPTY_BASE64 = b''' -/Td6WFoAAATm1rRGAAAAABzfRCEftvN9AQAAAAAEWVo= -''' - - SPLIT_BASE64 = b''' -/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQATVGhlIHF1aWNrIGJyb3duIGZveCAAIEFC5acaLXcA -ASwU+AptAx+2830BAAAAAARZWv03elhaAAAE5ta0RgIAIQEcAAAAEM9YzAEAFmp1bXBzIG92ZXIg -dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla -''' - - def test_xz(self): - with io.BytesIO(base64.b64decode(self.BASE64)) as f: - try: - with gemato.compression.open_compressed_file('xz', f, "rb") as xz: - self.assertEqual(xz.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_xz_empty(self): - with io.BytesIO(base64.b64decode(self.EMPTY_BASE64)) as f: - try: - with gemato.compression.open_compressed_file('xz', f, "rb") as xz: - self.assertEqual(xz.read(), b'') - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_xz_split(self): - with io.BytesIO(base64.b64decode(self.SPLIT_BASE64)) as f: - try: - with gemato.compression.open_compressed_file('xz', f, "rb") as xz: - self.assertEqual(xz.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_xz_write(self): - with io.BytesIO() as f: - try: - with gemato.compression.open_compressed_file('xz', f, 'wb') as xz: - xz.write(TEST_STRING) - - f.seek(0) - - with gemato.compression.open_compressed_file('xz', f, 'rb') as xz: - self.assertEqual(xz.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_xz_as_lzma_legacy(self): - """ - Test that the class rejects mislabel files. - """ - if gemato.compression.lzma is None: - raise unittest.SkipTest('xz compression unsupported') - - with io.BytesIO(base64.b64decode(self.BASE64)) as f: - with self.assertRaises(gemato.compression.lzma.LZMAError): - with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: - lzma.read() - - def test_open_potentially_compressed_path(self): - with tempfile.NamedTemporaryFile(suffix='.xz') as wf: - wf.write(base64.b64decode(self.BASE64)) - wf.flush() - - try: - with gemato.compression.open_potentially_compressed_path( - wf.name, 'rb') as cf: - self.assertEqual(cf.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_open_potentially_compressed_path_write(self): - with tempfile.NamedTemporaryFile(suffix='.xz') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'wb') as cf: - cf.write(TEST_STRING) - - with gemato.compression.open_compressed_file('xz', rf, 'rb') as xz: - self.assertEqual(xz.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_open_potentially_compressed_path_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.xz') as wf: - try: - with gemato.compression.open_compressed_file('xz', wf, 'wb') as xz: - xz.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_open_potentially_compressed_path_write_with_unicode(self): - with tempfile.NamedTemporaryFile(suffix='.xz') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('xz', rf, 'rb') as xz: - self.assertEqual(xz.read(), TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_open_potentially_compressed_path_write_with_encoding(self): - with tempfile.NamedTemporaryFile(suffix='.xz') as rf: - try: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') as cf: - cf.write(TEST_STRING.decode('utf8')) - - with gemato.compression.open_compressed_file('xz', rf, 'rb') as xz: - self.assertEqual(xz.read(), UTF16_TEST_STRING) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_open_potentially_compressed_path_with_encoding_line_api(self): - with tempfile.NamedTemporaryFile(suffix='.xz') as wf: - try: - with gemato.compression.open_compressed_file('xz', wf, 'wb') as xz: - xz.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertListEqual([x for x in cf], - [TEST_STRING.decode('utf8')]) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - def test_open_potentially_compressed_path_fileno_passthrough(self): - with tempfile.NamedTemporaryFile(suffix='.xz') as rf: - try: - fs1 = gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') - with fs1 as cf: - self.assertListEqual([f.fileno() for f in fs1.files], - [cf.fileno() for f in fs1.files]) - - fs2 = gemato.compression.open_potentially_compressed_path( - rf.name, 'r', encoding='utf_16_be') - with fs2 as cf: - self.assertListEqual([f.fileno() for f in fs2.files], - [cf.fileno() for f in fs2.files]) - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - - -class NoCompressionTest(unittest.TestCase): - """ - Tests for non-compressed data. - """ - - def test_open_potentially_compressed_path(self): - with tempfile.NamedTemporaryFile() as wf: - wf.write(TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'rb') as cf: - self.assertEqual(cf.read(), TEST_STRING) - - def test_open_potentially_compressed_path_write(self): - with tempfile.NamedTemporaryFile() as rf: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'wb') as cf: - cf.write(TEST_STRING) - - self.assertEqual(rf.read(), TEST_STRING) - - def test_open_potentially_compressed_path_with_encoding(self): - with tempfile.NamedTemporaryFile() as wf: - wf.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) - - def test_open_potentially_compressed_path_write_with_unicode(self): - with tempfile.NamedTemporaryFile() as rf: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w') as cf: - cf.write(TEST_STRING.decode('utf8')) - - self.assertEqual(rf.read(), TEST_STRING) - - def test_open_potentially_compressed_path_write_with_encoding(self): - with tempfile.NamedTemporaryFile() as rf: - with gemato.compression.open_potentially_compressed_path( - rf.name, 'w', encoding='utf_16_be') as cf: - cf.write(TEST_STRING.decode('utf8')) - - self.assertEqual(rf.read(), UTF16_TEST_STRING) - - def test_open_potentially_compressed_path_with_encoding_line_api(self): - with tempfile.NamedTemporaryFile() as wf: - wf.write(UTF16_TEST_STRING) - wf.flush() - - with gemato.compression.open_potentially_compressed_path( - wf.name, 'r', encoding='utf_16_be') as cf: - self.assertListEqual([x for x in cf], - [TEST_STRING.decode('utf8')]) - - -class OtherUtilityTests(unittest.TestCase): - def test_get_potential_compressed_names(self): - self.assertSetEqual(frozenset(gemato.compression - .get_potential_compressed_names('test')), - frozenset([ - 'test', - 'test.gz', - 'test.bz2', - 'test.lzma', - 'test.xz', - ])) - - def test_get_compressed_suffix_from_filename(self): - self.assertEqual( - gemato.compression.get_compressed_suffix_from_filename( - 'test.gz'), 'gz') - self.assertEqual( - gemato.compression.get_compressed_suffix_from_filename( - 'test.bz2'), 'bz2') - self.assertEqual( - gemato.compression.get_compressed_suffix_from_filename( - 'test.lzma'), 'lzma') - self.assertEqual( - gemato.compression.get_compressed_suffix_from_filename( - 'test.xz'), 'xz') - self.assertIsNone( - gemato.compression.get_compressed_suffix_from_filename( - 'test')) +COMPRESSION_ALGOS = ['gz', 'bz2', 'lzma', 'xz'] + +COMPRESSION_DATA = { + 'baseline': { + None: TEST_STRING, + 'gz': b''' +H4sIACbJ8FkAAwvJSFUoLM1MzlZIKsovz1NIy69QyCrNLShWyC9LLVIoAUrnJFZVKqTkpwMA +OaNPQSsAAAA= +''', + 'bz2': b''' +QlpoOTFBWSZTWUWd7mEAAAQTgEAABAA////wIAEABTQAAAGigAAAAEBoLtBqVm1CpOmzyfUX +Aw5PHXD0304jMvvfF3JFOFCQRZ3uYQ== +''', + 'lzma': b''' +XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq00+OId6EE2+twGabs85BA0Jwnr +s2bsGhcv//zOkAA= +''', + 'xz': b''' +/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQAqVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBv +dmVyIHRoZSBsYXp5IGRvZwAAxKFK5cK4XlsAAUMrrVBuVx+2830BAAAAAARZWg== +''', + }, + 'empty': { + None: b'', + 'gz': b'H4sIACbJ8FkAAwMAAAAAAAAAAAA=', + 'bz2': b'QlpoORdyRThQkAAAAAA=', + 'lzma': b'XQAAAAT//////////wCD//v//8AAAAA=', + 'xz': b'/Td6WFoAAATm1rRGAAAAABzfRCEftvN9AQAAAAAEWVo=', + }, + 'split': { + None: TEST_STRING, + 'gz': b''' +H4sIACbJ8FkAAwvJSFUoLM1MzlZIKsovz1NIy69QAADidbCIFAAAAB+LCAAmyfBZAAPLKs0t +KFbIL0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA= +''', + 'bz2': ''' +QlpoOTFBWSZTWQgcCrAAAAITgEAABAAbabLAIABBEaDR6jT9UoAAAbUXZJ48gnMg3xdyRThQ +kAgcCrBCWmg5MUFZJlNZOxleaAAABRGAQAAm1t8wIACAUaNDRtTaSgAAAcAcViIdSEhzctM/ +F3JFOFCQOxleaA== +''', + 'lzma': ''' +XQAAAAT//////////wAqGgiiAyVm8Ut4xaIF/y7m2dIgGq1EvQql//X0QABdAAAABP////// +////ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== +''', + 'xz': ''' +/Td6WFoAAATm1rRGAgAhARwAAAAQz1jMAQATVGhlIHF1aWNrIGJyb3duIGZveCAAIEFC5aca +LXcAASwU+AptAx+2830BAAAAAARZWv03elhaAAAE5ta0RgIAIQEcAAAAEM9YzAEAFmp1bXBz +IG92ZXIgdGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla +''', + }, +} + + +@pytest.mark.parametrize('suffix', COMPRESSION_ALGOS) +@pytest.mark.parametrize('data_group', COMPRESSION_DATA.keys()) +def test_decompress(suffix, data_group): + data = COMPRESSION_DATA[data_group] + with io.BytesIO(base64.b64decode(data[suffix])) as f: + with gemato.compression.open_compressed_file(suffix, f, "rb") as z: + assert z.read() == data[None] + + +@pytest.mark.parametrize('suffix', COMPRESSION_ALGOS) +def test_round_trip(suffix): + with io.BytesIO() as f: + with gemato.compression.open_compressed_file(suffix, f, 'wb') as z: + z.write(TEST_STRING) + + f.seek(0) + + with gemato.compression.open_compressed_file(suffix, f, 'rb') as z: + assert z.read() == TEST_STRING + + +@pytest.fixture(params=COMPRESSION_ALGOS) +def test_file(tmp_path, request): + yield tmp_path / f'test.{request.param}' + + +@pytest.mark.parametrize('data_group', COMPRESSION_DATA.keys()) +def test_open_potentially_compressed_path(test_file, data_group): + suffix = test_file.suffix.lstrip('.') + with open(test_file, 'wb') as wf: + wf.write(base64.b64decode(COMPRESSION_DATA[data_group][suffix])) + + with gemato.compression.open_potentially_compressed_path( + test_file, 'rb') as cf: + assert cf.read() == COMPRESSION_DATA[data_group][None] + + +def test_open_potentially_compressed_path_write(test_file): + with gemato.compression.open_potentially_compressed_path( + test_file, 'wb') as cf: + cf.write(TEST_STRING) + + suffix = test_file.suffix.lstrip('.') + with open(test_file, 'rb') as rf: + with gemato.compression.open_compressed_file(suffix, rf, 'rb') as z: + assert z.read() == TEST_STRING + + +def test_open_potentially_compressed_path_with_encoding(test_file): + suffix = test_file.suffix.lstrip('.') + with open(test_file, 'wb') as wf: + with gemato.compression.open_compressed_file(suffix, wf, 'wb') as z: + z.write(UTF16_TEST_STRING) + + with gemato.compression.open_potentially_compressed_path( + test_file, 'r', encoding='utf_16_be') as cf: + assert cf.read() == TEST_STRING.decode('utf8') + + +@pytest.mark.parametrize('encoding,out_var', [(None, 'TEST_STRING'), + ('utf_16_be', + 'UTF16_TEST_STRING'), + ]) +def test_open_potentially_compressed_path_write_with_unicode( + test_file, encoding, out_var): + kwargs = {} + if encoding is not None: + kwargs['encoding'] = encoding + with gemato.compression.open_potentially_compressed_path( + test_file, 'w', **kwargs) as cf: + cf.write(TEST_STRING.decode('utf8')) + + suffix = test_file.suffix.lstrip('.') + with open(test_file, 'rb') as rf: + with gemato.compression.open_compressed_file(suffix, rf, 'rb') as z: + assert z.read() == globals()[out_var] + + +def test_open_potentially_compressed_path_with_encoding_line_api(test_file): + suffix = test_file.suffix.lstrip('.') + with open(test_file, 'wb') as wf: + with gemato.compression.open_compressed_file(suffix, wf, 'wb') as z: + z.write(UTF16_TEST_STRING) + + with gemato.compression.open_potentially_compressed_path( + test_file, 'r', encoding='utf_16_be') as cf: + assert [x for x in cf] == [TEST_STRING.decode('utf8')] + + +def test_open_potentially_compressed_path_fileno_passthrough(test_file): + fs1 = gemato.compression.open_potentially_compressed_path( + test_file, 'w', encoding='utf_16_be') + with fs1 as cf: + assert ([f.fileno() for f in fs1.files] == + [cf.fileno() for f in fs1.files]) + + fs2 = gemato.compression.open_potentially_compressed_path( + test_file, 'r', encoding='utf_16_be') + with fs2 as cf: + assert ([f.fileno() for f in fs2.files] == + [cf.fileno() for f in fs2.files]) + + +def test_get_potential_compressed_names(): + assert ( + frozenset( + gemato.compression .get_potential_compressed_names('test')) == + frozenset(['test'] + [f'test.{sfx}' for sfx in COMPRESSION_ALGOS])) + + +@pytest.mark.parametrize('suffix', COMPRESSION_ALGOS) +def test_get_compressed_suffix_from_filename(suffix): + assert ( + gemato.compression.get_compressed_suffix_from_filename( + f'test.{suffix}') == suffix) |