diff options
author | Michał Górny <mgorny@gentoo.org> | 2017-10-25 20:55:55 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2017-10-25 20:55:55 +0200 |
commit | 193e60eb056851db7a5b9bfe017c504896250ab3 (patch) | |
tree | ded0d278484a22d00c2e6f0067adadb89a4d0ec7 /tests/test_compression.py | |
parent | 85b27b59a5e9014f838f5ddc6dcb94424f78aaee (diff) | |
download | gemato-193e60eb056851db7a5b9bfe017c504896250ab3.tar.gz |
compression: Introduce a generic opener for files
Diffstat (limited to 'tests/test_compression.py')
-rw-r--r-- | tests/test_compression.py | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/tests/test_compression.py b/tests/test_compression.py index a81e80f..0870a1d 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -5,6 +5,7 @@ import base64 import io +import tempfile import unittest import gemato.compression @@ -53,6 +54,24 @@ L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA= with gemato.compression.open_compressed_file('gz', f, 'rb') as gz: self.assertEqual(gz.read(), TEST_STRING) + def test_open_potentially_compressed_path(self): + with tempfile.NamedTemporaryFile(suffix='.gz') as wf: + wf.write(base64.b64decode(self.BASE64)) + wf.flush() + + with gemato.compression.open_potentially_compressed_path( + wf.name, 'rb') as cf: + self.assertEqual(cf.read(), TEST_STRING) + + def test_open_potentially_compressed_path_write(self): + with tempfile.NamedTemporaryFile(suffix='.gz') as rf: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'wb') as cf: + cf.write(TEST_STRING) + + with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz: + self.assertEqual(gz.read(), TEST_STRING) + class Bzip2CompressionTest(unittest.TestCase): BASE64 = b''' @@ -107,6 +126,30 @@ OxleaA== except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('bz2 compression unsupported') + def test_open_potentially_compressed_path(self): + with tempfile.NamedTemporaryFile(suffix='.bz2') as wf: + wf.write(base64.b64decode(self.BASE64)) + wf.flush() + + try: + with gemato.compression.open_potentially_compressed_path( + wf.name, 'rb') as cf: + self.assertEqual(cf.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + + def test_open_potentially_compressed_path_write(self): + with tempfile.NamedTemporaryFile(suffix='.bz2') as rf: + try: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'wb') as cf: + cf.write(TEST_STRING) + + with gemato.compression.open_compressed_file('bz2', rf, 'rb') as bz2: + self.assertEqual(bz2.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + class LZMALegacyCompressionTest(unittest.TestCase): BASE64 = b''' @@ -172,6 +215,30 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== with gemato.compression.open_compressed_file('xz', f, "rb") as xz: xz.read() + def test_open_potentially_compressed_path(self): + with tempfile.NamedTemporaryFile(suffix='.lzma') as wf: + wf.write(base64.b64decode(self.BASE64)) + wf.flush() + + try: + with gemato.compression.open_potentially_compressed_path( + wf.name, 'rb') as cf: + self.assertEqual(cf.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + + def test_open_potentially_compressed_path_write(self): + with tempfile.NamedTemporaryFile(suffix='.lzma') as rf: + try: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'wb') as cf: + cf.write(TEST_STRING) + + with gemato.compression.open_compressed_file('lzma', rf, 'rb') as lzma: + self.assertEqual(lzma.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + class XZCompressionTest(unittest.TestCase): BASE64 = b''' @@ -237,3 +304,27 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla with self.assertRaises(gemato.compression.lzma.LZMAError): with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma: lzma.read() + + def test_open_potentially_compressed_path(self): + with tempfile.NamedTemporaryFile(suffix='.xz') as wf: + wf.write(base64.b64decode(self.BASE64)) + wf.flush() + + try: + with gemato.compression.open_potentially_compressed_path( + wf.name, 'rb') as cf: + self.assertEqual(cf.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('xz compression unsupported') + + def test_open_potentially_compressed_path_write(self): + with tempfile.NamedTemporaryFile(suffix='.xz') as rf: + try: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'wb') as cf: + cf.write(TEST_STRING) + + with gemato.compression.open_compressed_file('xz', rf, 'rb') as xz: + self.assertEqual(xz.read(), TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('xz compression unsupported') |