diff options
author | Michał Górny <mgorny@gentoo.org> | 2017-10-25 22:24:09 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2017-10-25 22:40:51 +0200 |
commit | bd4491d09b69159c89b53fe930f4b4339a1dc042 (patch) | |
tree | 283dd0cf8f20b803917a3a8244dfe5c34b22c226 /tests/test_compression.py | |
parent | 2c734223401e0cca9947794cc6a2676eada5f962 (diff) | |
download | gemato-bd4491d09b69159c89b53fe930f4b4339a1dc042.tar.gz |
compression: Support passing text-mode kwargs to open
Diffstat (limited to 'tests/test_compression.py')
-rw-r--r-- | tests/test_compression.py | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/tests/test_compression.py b/tests/test_compression.py index e0deaf3..97b319a 100644 --- a/tests/test_compression.py +++ b/tests/test_compression.py @@ -12,6 +12,8 @@ import gemato.compression TEST_STRING = b'The quick brown fox jumps over the lazy dog' +# we need to be specific on endianness to avoid unreliably writing BOM +UTF16_TEST_STRING = TEST_STRING.decode('utf8').encode('utf_16_be') class GzipCompressionTest(unittest.TestCase): @@ -72,6 +74,25 @@ L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA= with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz: self.assertEqual(gz.read(), TEST_STRING) + def test_open_potentially_compressed_path_with_encoding(self): + with tempfile.NamedTemporaryFile(suffix='.gz') as wf: + with gemato.compression.open_compressed_file('gz', wf, 'wb') as gz: + gz.write(UTF16_TEST_STRING) + wf.flush() + + with gemato.compression.open_potentially_compressed_path( + wf.name, 'r', encoding='utf_16_be') as cf: + self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) + + def test_open_potentially_compressed_path_write_with_encoding(self): + with tempfile.NamedTemporaryFile(suffix='.gz') as rf: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'w', encoding='utf_16_be') as cf: + cf.write(TEST_STRING.decode('utf8')) + + with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz: + self.assertEqual(gz.read(), UTF16_TEST_STRING) + class Bzip2CompressionTest(unittest.TestCase): BASE64 = b''' @@ -150,6 +171,31 @@ OxleaA== except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('bz2 compression unsupported') + def test_open_potentially_compressed_path_with_encoding(self): + with tempfile.NamedTemporaryFile(suffix='.bz2') as wf: + try: + with gemato.compression.open_compressed_file('bz2', wf, 'wb') as bz2: + bz2.write(UTF16_TEST_STRING) + wf.flush() + + with gemato.compression.open_potentially_compressed_path( + wf.name, 'r', encoding='utf_16_be') as cf: + self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + + def test_open_potentially_compressed_path_write_with_encoding(self): + with tempfile.NamedTemporaryFile(suffix='.bz2') as rf: + try: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'w', encoding='utf_16_be') as cf: + cf.write(TEST_STRING.decode('utf8')) + + with gemato.compression.open_compressed_file('bz2', rf, 'rb') as bz2: + self.assertEqual(bz2.read(), UTF16_TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('bz2 compression unsupported') + class LZMALegacyCompressionTest(unittest.TestCase): BASE64 = b''' @@ -239,6 +285,31 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA== except gemato.exceptions.UnsupportedCompression: raise unittest.SkipTest('lzma compression unsupported') + def test_open_potentially_compressed_path_with_encoding(self): + with tempfile.NamedTemporaryFile(suffix='.lzma') as wf: + try: + with gemato.compression.open_compressed_file('lzma', wf, 'wb') as lzma: + lzma.write(UTF16_TEST_STRING) + wf.flush() + + with gemato.compression.open_potentially_compressed_path( + wf.name, 'r', encoding='utf_16_be') as cf: + self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + + def test_open_potentially_compressed_path_write_with_encoding(self): + with tempfile.NamedTemporaryFile(suffix='.lzma') as rf: + try: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'w', encoding='utf_16_be') as cf: + cf.write(TEST_STRING.decode('utf8')) + + with gemato.compression.open_compressed_file('lzma', rf, 'rb') as lzma: + self.assertEqual(lzma.read(), UTF16_TEST_STRING) + except gemato.exceptions.UnsupportedCompression: + raise unittest.SkipTest('lzma compression unsupported') + class XZCompressionTest(unittest.TestCase): BASE64 = b''' @@ -351,3 +422,20 @@ class NoCompressionTest(unittest.TestCase): cf.write(TEST_STRING) self.assertEqual(rf.read(), TEST_STRING) + + def test_open_potentially_compressed_path_with_encoding(self): + with tempfile.NamedTemporaryFile() as wf: + wf.write(UTF16_TEST_STRING) + wf.flush() + + with gemato.compression.open_potentially_compressed_path( + wf.name, 'r', encoding='utf_16_be') as cf: + self.assertEqual(cf.read(), TEST_STRING.decode('utf8')) + + def test_open_potentially_compressed_path_write_with_encoding(self): + with tempfile.NamedTemporaryFile() as rf: + with gemato.compression.open_potentially_compressed_path( + rf.name, 'w', encoding='utf_16_be') as cf: + cf.write(TEST_STRING.decode('utf8')) + + self.assertEqual(rf.read(), UTF16_TEST_STRING) |