summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2017-10-25 20:55:55 +0200
committerMichał Górny <mgorny@gentoo.org>2017-10-25 20:55:55 +0200
commit193e60eb056851db7a5b9bfe017c504896250ab3 (patch)
treeded0d278484a22d00c2e6f0067adadb89a4d0ec7 /tests
parent85b27b59a5e9014f838f5ddc6dcb94424f78aaee (diff)
downloadgemato-193e60eb056851db7a5b9bfe017c504896250ab3.tar.gz
compression: Introduce a generic opener for files
Diffstat (limited to 'tests')
-rw-r--r--tests/test_compression.py91
1 files changed, 91 insertions, 0 deletions
diff --git a/tests/test_compression.py b/tests/test_compression.py
index a81e80f..0870a1d 100644
--- a/tests/test_compression.py
+++ b/tests/test_compression.py
@@ -5,6 +5,7 @@
import base64
import io
+import tempfile
import unittest
import gemato.compression
@@ -53,6 +54,24 @@ L0stUijJSFXISayqVEjJTwcAlGd4GBcAAAA=
with gemato.compression.open_compressed_file('gz', f, 'rb') as gz:
self.assertEqual(gz.read(), TEST_STRING)
+ def test_open_potentially_compressed_path(self):
+ with tempfile.NamedTemporaryFile(suffix='.gz') as wf:
+ wf.write(base64.b64decode(self.BASE64))
+ wf.flush()
+
+ with gemato.compression.open_potentially_compressed_path(
+ wf.name, 'rb') as cf:
+ self.assertEqual(cf.read(), TEST_STRING)
+
+ def test_open_potentially_compressed_path_write(self):
+ with tempfile.NamedTemporaryFile(suffix='.gz') as rf:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'wb') as cf:
+ cf.write(TEST_STRING)
+
+ with gemato.compression.open_compressed_file('gz', rf, 'rb') as gz:
+ self.assertEqual(gz.read(), TEST_STRING)
+
class Bzip2CompressionTest(unittest.TestCase):
BASE64 = b'''
@@ -107,6 +126,30 @@ OxleaA==
except gemato.exceptions.UnsupportedCompression:
raise unittest.SkipTest('bz2 compression unsupported')
+ def test_open_potentially_compressed_path(self):
+ with tempfile.NamedTemporaryFile(suffix='.bz2') as wf:
+ wf.write(base64.b64decode(self.BASE64))
+ wf.flush()
+
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ wf.name, 'rb') as cf:
+ self.assertEqual(cf.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('bz2 compression unsupported')
+
+ def test_open_potentially_compressed_path_write(self):
+ with tempfile.NamedTemporaryFile(suffix='.bz2') as rf:
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'wb') as cf:
+ cf.write(TEST_STRING)
+
+ with gemato.compression.open_compressed_file('bz2', rf, 'rb') as bz2:
+ self.assertEqual(bz2.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('bz2 compression unsupported')
+
class LZMALegacyCompressionTest(unittest.TestCase):
BASE64 = b'''
@@ -172,6 +215,30 @@ ADUdSd6zBOkOpekGFH46zix9wE9VT65OVeV479//7uUAAA==
with gemato.compression.open_compressed_file('xz', f, "rb") as xz:
xz.read()
+ def test_open_potentially_compressed_path(self):
+ with tempfile.NamedTemporaryFile(suffix='.lzma') as wf:
+ wf.write(base64.b64decode(self.BASE64))
+ wf.flush()
+
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ wf.name, 'rb') as cf:
+ self.assertEqual(cf.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('lzma compression unsupported')
+
+ def test_open_potentially_compressed_path_write(self):
+ with tempfile.NamedTemporaryFile(suffix='.lzma') as rf:
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'wb') as cf:
+ cf.write(TEST_STRING)
+
+ with gemato.compression.open_compressed_file('lzma', rf, 'rb') as lzma:
+ self.assertEqual(lzma.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('lzma compression unsupported')
+
class XZCompressionTest(unittest.TestCase):
BASE64 = b'''
@@ -237,3 +304,27 @@ dGhlIGxhenkgZG9nAADjZCTmHjHqggABLxeBCEmxH7bzfQEAAAAABFla
with self.assertRaises(gemato.compression.lzma.LZMAError):
with gemato.compression.open_compressed_file('lzma', f, "rb") as lzma:
lzma.read()
+
+ def test_open_potentially_compressed_path(self):
+ with tempfile.NamedTemporaryFile(suffix='.xz') as wf:
+ wf.write(base64.b64decode(self.BASE64))
+ wf.flush()
+
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ wf.name, 'rb') as cf:
+ self.assertEqual(cf.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('xz compression unsupported')
+
+ def test_open_potentially_compressed_path_write(self):
+ with tempfile.NamedTemporaryFile(suffix='.xz') as rf:
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ rf.name, 'wb') as cf:
+ cf.write(TEST_STRING)
+
+ with gemato.compression.open_compressed_file('xz', rf, 'rb') as xz:
+ self.assertEqual(xz.read(), TEST_STRING)
+ except gemato.exceptions.UnsupportedCompression:
+ raise unittest.SkipTest('xz compression unsupported')