summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- gemato/compression.py 14
-rw-r--r-- tests/test_compression.py 53
2 files changed, 62 insertions, 5 deletions
diff --git a/gemato/compression.py b/gemato/compression.py
index f909ea5..be1471a 100644
--- a/gemato/compression.py
+++ b/gemato/compression.py
@@ -29,13 +29,17 @@ except ImportError:
import gemato.exceptions
-def open_compressed_file(suffix, f, mode='r'):
+def open_compressed_file(suffix, f, mode='rb'):
"""
Get a file-like object for an open compressed file @fileobj
of format @suffix. The file should be open in binary mode
and positioned at the beginning. @suffix should specify a standard
suffix for the compression format without the leading dot,
- e.g. "gz", "bz2".
+ e.g. "gz", "bz2". @mode specifies the mode to pass to
+ the compressor.
+
+ Note that independently of @mode, the returned file objects
+ are always open in binary mode (i.e. expect bytestrings).
"""
if suffix == "gz":
@@ -107,9 +111,9 @@ def open_potentially_compressed_path(path, mode, **kwargs):
cf = open_compressed_file(ext[1:], f, bmode if kwargs else mode)
fs.files.append(cf)
- # special args are not supported by compressor backends
- # so add a TextIOWrapper on top
- if kwargs:
+ # add a TextIOWrapper on top whenever we do not want
+ # the standard compressor file binary mode
+ if 'b' not in mode:
iow = io.TextIOWrapper(cf, **kwargs)
fs.files.append(iow)
except:
diff --git a/tests/test_compression.py b/tests/test_compression.py
index cdb8c59..5f614d3 100644
--- a/tests/test_compression.py
+++ b/tests/test_compression.py
@@ -84,6 +84,15 @@ L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA=
wf.name, 'r', encoding='utf_16_be') as cf:
self.assertEqual(cf.read(), TEST_STRING.decode('utf8'))
+ def test_open_potentially_compressed_path_write_with_unicode(self):
+ with tempfile.NamedTemporaryFile(suffix='.gz') as rf:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'w') as cf:
+ cf.write(TEST_STRING.decode('utf8'))
+
+ with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz:
+ self.assertEqual(gz.read(), TEST_STRING)
+
def test_open_potentially_compressed_path_write_with_encoding(self):
with tempfile.NamedTemporaryFile(suffix='.gz') as rf:
with gemato.compression.open_potentially_compressed_path(
@@ -209,6 +218,18 @@ OxleaA==
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('bz2 compression unsupported')
+ def test_open_potentially_compressed_path_write_with_unicode(self):
+ with tempfile.NamedTemporaryFile(suffix='.bz2') as rf:
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'w') as cf:
+ cf.write(TEST_STRING.decode('utf8'))
+
+ with gemato.compression.open_compressed_file('bz2', rf, 'rb') as bz2:
+ self.assertEqual(bz2.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('bz2 compression unsupported')
+
def test_open_potentially_compressed_path_write_with_encoding(self):
with tempfile.NamedTemporaryFile(suffix='.bz2') as rf:
try:
@@ -354,6 +375,18 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA==
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('lzma compression unsupported')
+ def test_open_potentially_compressed_path_write_with_unicode(self):
+ with tempfile.NamedTemporaryFile(suffix='.lzma') as rf:
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'w') as cf:
+ cf.write(TEST_STRING.decode('utf8'))
+
+ with gemato.compression.open_compressed_file('lzma', rf, 'rb') as lzma:
+ self.assertEqual(lzma.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('lzma compression unsupported')
+
def test_open_potentially_compressed_path_write_with_encoding(self):
with tempfile.NamedTemporaryFile(suffix='.lzma') as rf:
try:
@@ -500,6 +533,18 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('xz compression unsupported')
+ def test_open_potentially_compressed_path_write_with_unicode(self):
+ with tempfile.NamedTemporaryFile(suffix='.xz') as rf:
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'w') as cf:
+ cf.write(TEST_STRING.decode('utf8'))
+
+ with gemato.compression.open_compressed_file('xz', rf, 'rb') as xz:
+ self.assertEqual(xz.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('xz compression unsupported')
+
def test_open_potentially_compressed_path_write_with_encoding(self):
with tempfile.NamedTemporaryFile(suffix='.xz') as rf:
try:
@@ -575,6 +620,14 @@ class NoCompressionTest(unittest.TestCase):
wf.name, 'r', encoding='utf_16_be') as cf:
self.assertEqual(cf.read(), TEST_STRING.decode('utf8'))
+ def test_open_potentially_compressed_path_write_with_unicode(self):
+ with tempfile.NamedTemporaryFile() as rf:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'w') as cf:
+ cf.write(TEST_STRING.decode('utf8'))
+
+ self.assertEqual(rf.read(), TEST_STRING)
+
def test_open_potentially_compressed_path_write_with_encoding(self):
with tempfile.NamedTemporaryFile() as rf:
with gemato.compression.open_potentially_compressed_path(