summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/manifest.py7
-rw-r--r--gemato/recursiveloader.py44
-rw-r--r--tests/test_recursiveloader.py155
3 files changed, 197 insertions, 9 deletions
diff --git a/gemato/manifest.py b/gemato/manifest.py
index b6d6652..8683f95 100644
--- a/gemato/manifest.py
+++ b/gemato/manifest.py
@@ -386,7 +386,12 @@ class ManifestFile(object):
if state == ManifestState.POST_SIGNED_DATA:
raise gemato.exceptions.ManifestUnsignedData()
tag = sl[0]
- self.entries.append(MANIFEST_TAG_MAPPING[tag].from_list(sl))
+ try:
+ self.entries.append(MANIFEST_TAG_MAPPING[tag]
+ .from_list(sl))
+ except KeyError:
+ raise gemato.exceptions.ManifestSyntaxError(
+ "Invalid Manifest line: {}".format(l))
if state == ManifestState.SIGNED_PREAMBLE:
raise gemato.exceptions.ManifestSyntaxError(
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index e91158b..5d07e0b 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -666,6 +666,27 @@ class ManifestRecursiveLoader(object):
for d in skip_dirs:
dirnames.remove(d)
+ # check for unregistered Manifest
+ new_manifests = set()
+ for mname in (gemato.compression
+ .get_potential_compressed_names('Manifest')):
+ if mname in filenames:
+ fpath = os.path.join(relpath, mname)
+ if fpath in self.loaded_manifests:
+ continue
+
+ # we've just found ourselves a new Manifest,
+ # let's load it
+ try:
+ self.load_manifest(fpath)
+ except gemato.exceptions.ManifestSyntaxError:
+ # syntax error? probably not a Manifest then.
+ pass
+ else:
+ new_manifests.add(mname)
+ entry_dict.update(
+ self.get_deduplicated_file_entry_dict_for_update(
+ relpath))
dir_manifest = None
for f in filenames:
@@ -684,18 +705,25 @@ class ManifestRecursiveLoader(object):
if fe.tag in ('IGNORE', 'OPTIONAL'):
continue
else:
- # find appropriate Manifest for this directory
- if dir_manifest is None:
+ if f in new_manifests:
+ cls = gemato.manifest.ManifestEntryMANIFEST
+ # new Manifests go into the parent directory
for mpath, mrpath, m in (self
- ._iter_manifests_for_path(fpath)):
- dir_manifest = (mpath, mrpath, m)
+ ._iter_manifests_for_path(
+ os.path.dirname(relpath))):
break
else:
- mpath, mrpath, m = dir_manifest
+ cls = gemato.manifest.ManifestEntryDATA
+ # find appropriate Manifest for this directory
+ if dir_manifest is None:
+ for mpath, mrpath, m in (self
+ ._iter_manifests_for_path(fpath)):
+ dir_manifest = (mpath, mrpath, m)
+ break
+ else:
+ mpath, mrpath, m = dir_manifest
- fe = gemato.manifest.ManifestEntryDATA(
- os.path.relpath(fpath, mrpath),
- 0, {})
+ fe = cls(os.path.relpath(fpath, mrpath), 0, {})
m.entries.append(fe)
# update the existing entry
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py
index e0067c9..50e2f16 100644
--- a/tests/test_recursiveloader.py
+++ b/tests/test_recursiveloader.py
@@ -1974,3 +1974,158 @@ class MultipleSubdirectoryFilesTest(TempDirTestCase):
'sub/file.c')
m.save_manifests()
m.assert_directory_verifies()
+
+
+class UnregisteredManifestTestCase(TempDirTestCase):
+ """
+ Test for finding a sub-Manifest that's not listed as MANIFEST.
+ """
+
+ DIRS = ['sub']
+ FILES = {
+ 'Manifest': u'',
+ 'sub/Manifest': u'''
+DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+''',
+ 'sub/test': u'',
+ }
+
+ def test_load_manifests(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertNotIn('sub/Manifest', m.loaded_manifests)
+ m.load_manifests_for_path('sub/test')
+ self.assertNotIn('sub/Manifest', m.loaded_manifests)
+
+ def test_update_entries_for_directory(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entries_for_directory('', hashes=['SHA256', 'SHA512'])
+ self.assertIn('sub/Manifest', m.loaded_manifests)
+ # entry for sub-Manifest should go into parent dir
+ # and for test into the sub-Manifest
+ self.assertEqual(m.find_path_entry('sub/Manifest').path,
+ 'sub/Manifest')
+ self.assertEqual(m.find_path_entry('sub/test').path, 'test')
+ m.save_manifests()
+ m.assert_directory_verifies()
+
+ def test_cli_update(self):
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
+ self.dir]),
+ 0)
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'verify',
+ self.dir]),
+ 0)
+
+
+class UnregisteredCompressedManifestTestCase(TempDirTestCase):
+ """
+ Test for finding a compressed sub-Manifest that's not listed
+ as MANIFEST.
+ """
+
+ SUB_MANIFEST = b'''
+DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+'''
+ DIRS = ['sub']
+ FILES = {
+ 'Manifest': u'',
+ 'sub/test': u'',
+ }
+
+ def setUp(self):
+ super(UnregisteredCompressedManifestTestCase, self).setUp()
+ self.manifest_gz = os.path.join(self.dir, 'sub/Manifest.gz')
+ with gemato.compression.open_potentially_compressed_path(
+ os.path.join(self.dir, 'sub/Manifest.gz'), 'wb') as f:
+ f.write(self.SUB_MANIFEST)
+
+ def tearDown(self):
+ os.unlink(self.manifest_gz)
+ super(UnregisteredCompressedManifestTestCase, self).tearDown()
+
+ def test_load_manifests(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertNotIn('sub/Manifest.gz', m.loaded_manifests)
+ m.load_manifests_for_path('sub/test')
+ self.assertNotIn('sub/Manifest.gz', m.loaded_manifests)
+
+ def test_update_entries_for_directory(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entries_for_directory('', hashes=['SHA256', 'SHA512'])
+ self.assertIn('sub/Manifest.gz', m.loaded_manifests)
+ # entry for sub-Manifest should go into parent dir
+ # and for test into the sub-Manifest
+ self.assertEqual(m.find_path_entry('sub/Manifest.gz').path,
+ 'sub/Manifest.gz')
+ self.assertEqual(m.find_path_entry('sub/test').path, 'test')
+ m.save_manifests()
+ m.assert_directory_verifies()
+
+ def test_cli_update(self):
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
+ self.dir]),
+ 0)
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'verify',
+ self.dir]),
+ 0)
+
+
+class InvalidManifestTestCase(TempDirTestCase):
+ """
+ Test for ignoring a stray "Manifest" file that's invalid.
+ """
+
+ DIRS = ['sub']
+ FILES = {
+ 'Manifest': u'',
+ 'sub/Manifest': u'''
+INVALID STUFF IN HERE
+''',
+ 'sub/test': u'',
+ }
+
+ def test_load_manifests(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertNotIn('sub/Manifest', m.loaded_manifests)
+ m.load_manifests_for_path('sub/test')
+ self.assertNotIn('sub/Manifest', m.loaded_manifests)
+
+ def test_update_entries_for_directory(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entries_for_directory('', hashes=['SHA256', 'SHA512'])
+ self.assertNotIn('sub/Manifest', m.loaded_manifests)
+ # entry for sub-Manifest should go into parent dir
+ # and for test into the sub-Manifest
+ self.assertIsInstance(m.find_path_entry('sub/Manifest'),
+ gemato.manifest.ManifestEntryDATA)
+ self.assertEqual(m.find_path_entry('sub/test').path,
+ 'sub/test')
+ m.save_manifests()
+ # ensure that the file was not modified
+ with io.open(os.path.join(self.dir, 'sub/Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertEqual(f.read(), self.FILES['sub/Manifest'])
+ m.assert_directory_verifies()
+
+ def test_cli_update(self):
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
+ self.dir]),
+ 0)
+ with io.open(os.path.join(self.dir, 'sub/Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertEqual(f.read(), self.FILES['sub/Manifest'])
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'verify',
+ self.dir]),
+ 0)