summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- gemato/compression.py 9
-rw-r--r-- tests/test_compression.py 88
2 files changed, 97 insertions, 0 deletions
diff --git a/gemato/compression.py b/gemato/compression.py
index 2c810a2..f909ea5 100644
--- a/gemato/compression.py
+++ b/gemato/compression.py
@@ -39,6 +39,15 @@ def open_compressed_file(suffix, f, mode='r'):
"""
if suffix == "gz":
+ # work around GzipFile lacking read1() in py<3.3, which causes
+ # it to break when wrapped in TextIOWrapper
+ if sys.version_info < (3, 3):
+ class FixedGzipFile(gzip.GzipFile):
+ def read1(self, *args, **kwargs):
+ return self.read(*args, **kwargs)
+
+ return FixedGzipFile(fileobj=f, mode=mode)
+
return gzip.GzipFile(fileobj=f, mode=mode)
elif suffix == "bz2" and bz2 is not None:
return bz2.BZ2File(f, mode=mode)
diff --git a/tests/test_compression.py b/tests/test_compression.py
index 820f23d..1c1ff68 100644
--- a/tests/test_compression.py
+++ b/tests/test_compression.py
@@ -93,6 +93,17 @@ L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA=
with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz:
self.assertEqual(gz.read(), UTF16_TEST_STRING)
+    def test_open_potentially_compressed_path_with_encoding_line_api(self):
+        # line iteration over a gzip file opened with an explicit encoding
+        with tempfile.NamedTemporaryFile(suffix='.gz') as tf:
+            with gemato.compression.open_compressed_file('gz', tf, 'wb') as zf:
+                zf.write(UTF16_TEST_STRING)
+            tf.flush()
+            with gemato.compression.open_potentially_compressed_path(
+                    tf.name, 'r', encoding='utf_16_be') as txt:
+                self.assertListEqual(
+                    list(txt), [TEST_STRING.decode('utf8')])
+
def test_open_potentially_compressed_path_fileno_passthrough(self):
with tempfile.NamedTemporaryFile(suffix='.gz') as rf:
fs1 = gemato.compression.open_potentially_compressed_path(
@@ -210,6 +221,20 @@ OxleaA==
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('bz2 compression unsupported')
+    def test_open_potentially_compressed_path_with_encoding_line_api(self):
+        # line iteration over a bz2 file opened with an explicit encoding
+        with tempfile.NamedTemporaryFile(suffix='.bz2') as tf:
+            try:
+                with gemato.compression.open_compressed_file('bz2', tf, 'wb') as zf:
+                    zf.write(UTF16_TEST_STRING)
+                tf.flush()
+                with gemato.compression.open_potentially_compressed_path(
+                        tf.name, 'r', encoding='utf_16_be') as txt:
+                    self.assertListEqual(
+                        list(txt), [TEST_STRING.decode('utf8')])
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('bz2 compression unsupported')
+
def test_open_potentially_compressed_path_fileno_passthrough(self):
with tempfile.NamedTemporaryFile(suffix='.gz') as rf:
fs1 = gemato.compression.open_potentially_compressed_path(
@@ -338,6 +363,20 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA==
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('lzma compression unsupported')
+    def test_open_potentially_compressed_path_with_encoding_line_api(self):
+        # line iteration over an lzma file opened with an explicit encoding
+        with tempfile.NamedTemporaryFile(suffix='.lzma') as tf:
+            try:
+                with gemato.compression.open_compressed_file('lzma', tf, 'wb') as zf:
+                    zf.write(UTF16_TEST_STRING)
+                tf.flush()
+                with gemato.compression.open_potentially_compressed_path(
+                        tf.name, 'r', encoding='utf_16_be') as txt:
+                    self.assertListEqual(
+                        list(txt), [TEST_STRING.decode('utf8')])
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('lzma compression unsupported')
+
def test_open_potentially_compressed_path_fileno_passthrough(self):
with tempfile.NamedTemporaryFile(suffix='.gz') as rf:
fs1 = gemato.compression.open_potentially_compressed_path(
@@ -442,6 +481,45 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('xz compression unsupported')
+    def test_open_potentially_compressed_path_with_encoding(self):
+        # whole-file read of an xz file opened with an explicit encoding
+        with tempfile.NamedTemporaryFile(suffix='.xz') as tf:
+            try:
+                with gemato.compression.open_compressed_file('xz', tf, 'wb') as zf:
+                    zf.write(UTF16_TEST_STRING)
+                tf.flush()
+                with gemato.compression.open_potentially_compressed_path(
+                        tf.name, 'r', encoding='utf_16_be') as txt:
+                    self.assertEqual(txt.read(), TEST_STRING.decode('utf8'))
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('xz compression unsupported')
+
+    def test_open_potentially_compressed_path_write_with_encoding(self):
+        # text written with an encoding must come back as the UTF-16 bytes
+        with tempfile.NamedTemporaryFile(suffix='.xz') as tf:
+            try:
+                with gemato.compression.open_potentially_compressed_path(
+                        tf.name, 'w', encoding='utf_16_be') as txt:
+                    txt.write(TEST_STRING.decode('utf8'))
+                with gemato.compression.open_compressed_file('xz', tf, 'rb') as zf:
+                    self.assertEqual(zf.read(), UTF16_TEST_STRING)
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('xz compression unsupported')
+
+    def test_open_potentially_compressed_path_with_encoding_line_api(self):
+        # line iteration over an xz file opened with an explicit encoding
+        with tempfile.NamedTemporaryFile(suffix='.xz') as tf:
+            try:
+                with gemato.compression.open_compressed_file('xz', tf, 'wb') as zf:
+                    zf.write(UTF16_TEST_STRING)
+                tf.flush()
+                with gemato.compression.open_potentially_compressed_path(
+                        tf.name, 'r', encoding='utf_16_be') as txt:
+                    self.assertListEqual(
+                        list(txt), [TEST_STRING.decode('utf8')])
+            except gemato.exceptions.UnsupportedCompression:
+                raise unittest.SkipTest('xz compression unsupported')
+
def test_open_potentially_compressed_path_fileno_passthrough(self):
with tempfile.NamedTemporaryFile(suffix='.gz') as rf:
fs1 = gemato.compression.open_potentially_compressed_path(
@@ -496,6 +574,16 @@ class NoCompressionTest(unittest.TestCase):
self.assertEqual(rf.read(), UTF16_TEST_STRING)
+    def test_open_potentially_compressed_path_with_encoding_line_api(self):
+        # line iteration over a plain file opened with an explicit encoding
+        with tempfile.NamedTemporaryFile() as tf:
+            tf.write(UTF16_TEST_STRING)
+            tf.flush()
+            with gemato.compression.open_potentially_compressed_path(
+                    tf.name, 'r', encoding='utf_16_be') as txt:
+                self.assertListEqual(
+                    list(txt), [TEST_STRING.decode('utf8')])
+
class OtherUtilityTests(unittest.TestCase):
def test_get_potential_compressed_names(self):