diff options
-rw-r--r-- | gemato/find_top_level.py | 18 | ||||
-rw-r--r-- | tests/test_find_top_level.py | 28 | ||||
-rw-r--r-- | tests/test_recursiveloader.py | 38 |
3 files changed, 35 insertions, 49 deletions
diff --git a/gemato/find_top_level.py b/gemato/find_top_level.py index b70abdf..23a90d4 100644 --- a/gemato/find_top_level.py +++ b/gemato/find_top_level.py @@ -11,11 +11,16 @@ import gemato.compression import gemato.manifest -def find_top_level_manifest(path='.'): +def find_top_level_manifest(path='.', allow_compressed=False): """ Find top-level Manifest file that covers @path (defaults to the current directory). Returns the path to the Manifest or None. + + If @allow_compressed is true, the function allows the top-level + Manifest to be compressed and opens all compressed files *without* + verifying them first. It is false by default to prevent zip bombs + and other decompression attacks. """ cur_path = path @@ -25,6 +30,11 @@ def find_top_level_manifest(path='.'): root_st = os.stat('/') + manifest_filenames = ('Manifest',) + if allow_compressed: + manifest_filenames = list(gemato.compression + .get_potential_compressed_names('Manifest')) + while True: st = os.stat(cur_path) @@ -35,9 +45,11 @@ def find_top_level_manifest(path='.'): break m_path = os.path.join(cur_path, 'Manifest') - for m_path in (gemato.compression - .get_potential_compressed_names(m_path)): + for m_name in manifest_filenames: + m_path = os.path.join(cur_path, m_name) try: + # note: this is safe for allow_compressed=False + # since it detects compression by filename suffix with (gemato.compression .open_potentially_compressed_path(m_path, 'r', encoding='utf8')) as f: diff --git a/tests/test_find_top_level.py b/tests/test_find_top_level.py index 2218799..5e20453 100644 --- a/tests/test_find_top_level.py +++ b/tests/test_find_top_level.py @@ -207,10 +207,15 @@ class TestCompressedManifest(TempDirTestCase): with gzip.GzipFile(os.path.join(self.dir, 'subc/sub/Manifest.gz'), 'wb'): pass + def test_find_top_level_manifest_no_allow_compressed(self): + self.assertIsNone( + gemato.find_top_level.find_top_level_manifest(self.dir)) + def test_find_top_level_manifest(self): self.assertEqual( os.path.relpath( - gemato.find_top_level.find_top_level_manifest(self.dir), + gemato.find_top_level.find_top_level_manifest( + self.dir, allow_compressed=True), self.dir), 'Manifest.gz') @@ -218,7 +223,8 @@ class TestCompressedManifest(TempDirTestCase): self.assertEqual( os.path.relpath( gemato.find_top_level.find_top_level_manifest( - os.path.join(self.dir, 'suba')), + os.path.join(self.dir, 'suba'), + allow_compressed=True), self.dir), 'Manifest.gz') @@ -226,7 +232,8 @@ class TestCompressedManifest(TempDirTestCase): self.assertEqual( os.path.relpath( gemato.find_top_level.find_top_level_manifest( - os.path.join(self.dir, 'subb')), + os.path.join(self.dir, 'subb'), + allow_compressed=True), self.dir), 'Manifest.gz') @@ -234,7 +241,8 @@ class TestCompressedManifest(TempDirTestCase): self.assertEqual( os.path.relpath( gemato.find_top_level.find_top_level_manifest( - os.path.join(self.dir, 'subc', 'sub')), + os.path.join(self.dir, 'subc', 'sub'), + allow_compressed=True), self.dir), 'Manifest.gz') @@ -256,20 +264,23 @@ class TestCompressedManifestWithIgnore(TempDirTestCase): def test_find_top_level_manifest(self): self.assertEqual( os.path.relpath( - gemato.find_top_level.find_top_level_manifest(self.dir), + gemato.find_top_level.find_top_level_manifest( + self.dir, allow_compressed=True), self.dir), 'Manifest.gz') def test_find_top_level_manifest_from_ignored_empty_subdir(self): self.assertIsNone( gemato.find_top_level.find_top_level_manifest( - os.path.join(self.dir, 'suba'))) + os.path.join(self.dir, 'suba'), + allow_compressed=True)) def test_find_top_level_manifest_from_manifest_subdir(self): self.assertEqual( os.path.relpath( gemato.find_top_level.find_top_level_manifest( - os.path.join(self.dir, 'subb')), + os.path.join(self.dir, 'subb'), + allow_compressed=True), self.dir), 'Manifest.gz') @@ -277,6 +288,7 @@ class TestCompressedManifestWithIgnore(TempDirTestCase): self.assertEqual( os.path.relpath( gemato.find_top_level.find_top_level_manifest( - os.path.join(self.dir, 'subc', 'sub')), + os.path.join(self.dir, 'subc', 'sub'), + allow_compressed=True), self.dir), 'subc/sub/Manifest.gz') diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py index 3f7d41d..5e4df3d 100644 --- a/tests/test_recursiveloader.py +++ b/tests/test_recursiveloader.py @@ -1790,11 +1790,6 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e self.manifest_gz) m.assert_directory_verifies('') - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 0) - def test_save_manifest(self): m = gemato.recursiveloader.ManifestRecursiveLoader( os.path.join(self.dir, 'Manifest.gz')) @@ -1841,30 +1836,6 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e self.assertTrue(os.path.exists( os.path.join(self.dir, 'Manifest'))) - def test_cli_decompress_manifests_low_watermark(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', - '--hashes=SHA256 SHA512', - '--compress-watermark=0', - self.dir]), - 0) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - - def test_cli_decompress_manifests_high_watermark(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', - '--hashes=SHA256 SHA512', - '--compress-watermark=4096', - self.dir]), - 0) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - class CompressedSubManifestTest(TempDirTestCase): """ @@ -2035,15 +2006,6 @@ MANIFEST a/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e m.save_manifests() m.assert_directory_verifies() - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', - self.dir]), - 0) - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 0) - class MultipleSubdirectoryFilesTest(TempDirTestCase): """ |