summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/find_top_level.py18
-rw-r--r--tests/test_find_top_level.py28
-rw-r--r--tests/test_recursiveloader.py38
3 files changed, 35 insertions, 49 deletions
diff --git a/gemato/find_top_level.py b/gemato/find_top_level.py
index b70abdf..23a90d4 100644
--- a/gemato/find_top_level.py
+++ b/gemato/find_top_level.py
@@ -11,11 +11,16 @@ import gemato.compression
import gemato.manifest
-def find_top_level_manifest(path='.'):
+def find_top_level_manifest(path='.', allow_compressed=False):
"""
Find top-level Manifest file that covers @path (defaults
to the current directory). Returns the path to the Manifest
or None.
+
+ If @allow_compressed is true, the function allows the top-level
+ Manifest to be compressed and opens all compressed files *without*
+ verifying them first. It is false by default to prevent zip bombs
+ and other decompression attacks.
"""
cur_path = path
@@ -25,6 +30,11 @@ def find_top_level_manifest(path='.'):
root_st = os.stat('/')
+ manifest_filenames = ('Manifest',)
+ if allow_compressed:
+ manifest_filenames = list(gemato.compression
+ .get_potential_compressed_names('Manifest'))
+
while True:
st = os.stat(cur_path)
@@ -35,9 +45,11 @@ def find_top_level_manifest(path='.'):
break
m_path = os.path.join(cur_path, 'Manifest')
- for m_path in (gemato.compression
- .get_potential_compressed_names(m_path)):
+ for m_name in manifest_filenames:
+ m_path = os.path.join(cur_path, m_name)
try:
+ # note: this is safe for allow_compressed=False
+ # since it detects compression by filename suffix
with (gemato.compression
.open_potentially_compressed_path(m_path, 'r',
encoding='utf8')) as f:
diff --git a/tests/test_find_top_level.py b/tests/test_find_top_level.py
index 2218799..5e20453 100644
--- a/tests/test_find_top_level.py
+++ b/tests/test_find_top_level.py
@@ -207,10 +207,15 @@ class TestCompressedManifest(TempDirTestCase):
with gzip.GzipFile(os.path.join(self.dir, 'subc/sub/Manifest.gz'), 'wb'):
pass
+ def test_find_top_level_manifest_no_allow_compressed(self):
+ self.assertIsNone(
+ gemato.find_top_level.find_top_level_manifest(self.dir))
+
def test_find_top_level_manifest(self):
self.assertEqual(
os.path.relpath(
- gemato.find_top_level.find_top_level_manifest(self.dir),
+ gemato.find_top_level.find_top_level_manifest(
+ self.dir, allow_compressed=True),
self.dir),
'Manifest.gz')
@@ -218,7 +223,8 @@ class TestCompressedManifest(TempDirTestCase):
self.assertEqual(
os.path.relpath(
gemato.find_top_level.find_top_level_manifest(
- os.path.join(self.dir, 'suba')),
+ os.path.join(self.dir, 'suba'),
+ allow_compressed=True),
self.dir),
'Manifest.gz')
@@ -226,7 +232,8 @@ class TestCompressedManifest(TempDirTestCase):
self.assertEqual(
os.path.relpath(
gemato.find_top_level.find_top_level_manifest(
- os.path.join(self.dir, 'subb')),
+ os.path.join(self.dir, 'subb'),
+ allow_compressed=True),
self.dir),
'Manifest.gz')
@@ -234,7 +241,8 @@ class TestCompressedManifest(TempDirTestCase):
self.assertEqual(
os.path.relpath(
gemato.find_top_level.find_top_level_manifest(
- os.path.join(self.dir, 'subc', 'sub')),
+ os.path.join(self.dir, 'subc', 'sub'),
+ allow_compressed=True),
self.dir),
'Manifest.gz')
@@ -256,20 +264,23 @@ class TestCompressedManifestWithIgnore(TempDirTestCase):
def test_find_top_level_manifest(self):
self.assertEqual(
os.path.relpath(
- gemato.find_top_level.find_top_level_manifest(self.dir),
+ gemato.find_top_level.find_top_level_manifest(
+ self.dir, allow_compressed=True),
self.dir),
'Manifest.gz')
def test_find_top_level_manifest_from_ignored_empty_subdir(self):
self.assertIsNone(
gemato.find_top_level.find_top_level_manifest(
- os.path.join(self.dir, 'suba')))
+ os.path.join(self.dir, 'suba'),
+ allow_compressed=True))
def test_find_top_level_manifest_from_manifest_subdir(self):
self.assertEqual(
os.path.relpath(
gemato.find_top_level.find_top_level_manifest(
- os.path.join(self.dir, 'subb')),
+ os.path.join(self.dir, 'subb'),
+ allow_compressed=True),
self.dir),
'Manifest.gz')
@@ -277,6 +288,7 @@ class TestCompressedManifestWithIgnore(TempDirTestCase):
self.assertEqual(
os.path.relpath(
gemato.find_top_level.find_top_level_manifest(
- os.path.join(self.dir, 'subc', 'sub')),
+ os.path.join(self.dir, 'subc', 'sub'),
+ allow_compressed=True),
self.dir),
'subc/sub/Manifest.gz')
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py
index 3f7d41d..5e4df3d 100644
--- a/tests/test_recursiveloader.py
+++ b/tests/test_recursiveloader.py
@@ -1790,11 +1790,6 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
self.manifest_gz)
m.assert_directory_verifies('')
- def test_cli_verifies(self):
- self.assertEqual(
- gemato.cli.main(['gemato', 'verify', self.dir]),
- 0)
-
def test_save_manifest(self):
m = gemato.recursiveloader.ManifestRecursiveLoader(
os.path.join(self.dir, 'Manifest.gz'))
@@ -1841,30 +1836,6 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
self.assertTrue(os.path.exists(
os.path.join(self.dir, 'Manifest')))
- def test_cli_decompress_manifests_low_watermark(self):
- self.assertEqual(
- gemato.cli.main(['gemato', 'update',
- '--hashes=SHA256 SHA512',
- '--compress-watermark=0',
- self.dir]),
- 0)
- self.assertFalse(os.path.exists(
- os.path.join(self.dir, 'Manifest')))
- self.assertTrue(os.path.exists(
- os.path.join(self.dir, 'Manifest.gz')))
-
- def test_cli_decompress_manifests_high_watermark(self):
- self.assertEqual(
- gemato.cli.main(['gemato', 'update',
- '--hashes=SHA256 SHA512',
- '--compress-watermark=4096',
- self.dir]),
- 0)
- self.assertFalse(os.path.exists(
- os.path.join(self.dir, 'Manifest.gz')))
- self.assertTrue(os.path.exists(
- os.path.join(self.dir, 'Manifest')))
-
class CompressedSubManifestTest(TempDirTestCase):
"""
@@ -2035,15 +2006,6 @@ MANIFEST a/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e
m.save_manifests()
m.assert_directory_verifies()
- def test_cli_update(self):
- self.assertEqual(
- gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
- self.dir]),
- 0)
- self.assertEqual(
- gemato.cli.main(['gemato', 'verify', self.dir]),
- 0)
-
class MultipleSubdirectoryFilesTest(TempDirTestCase):
"""