diff options
-rw-r--r-- | gemato/exceptions.py | 2 | ||||
-rw-r--r-- | gemato/recursiveloader.py | 138 | ||||
-rw-r--r-- | tests/test_recursiveloader.py | 227 |
3 files changed, 365 insertions, 2 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py index 901d0b1..cbbcb12 100644 --- a/gemato/exceptions.py +++ b/gemato/exceptions.py @@ -107,6 +107,8 @@ class ManifestInvalidPath(Exception): """ def __init__(self, path, detail): + self.path = path + self.detail = detail super(ManifestInvalidPath, self).__init__( "Attempting to add invalid path {} to Manifest: {} must not be {}" .format(path, detail[0], detail[1])) diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py index 752ecf5..cb1b6d0 100644 --- a/gemato/recursiveloader.py +++ b/gemato/recursiveloader.py @@ -356,3 +356,141 @@ class ManifestRecursiveLoader(object): fail_handler, warn_handler) return ret + + def update_entry_for_path(self, path, new_entry_type='DATA', + hashes=None): + """ + Update the Manifest entries for @path and the appropriate + MANIFEST entries. @path must not be covered by IGNORE. + + If the path exists and has a matching Manifest entry, the most + specific existing entry will be updated. If the path has more + entries, the remaining entries will be removed. This function + does not check if they were compatible. + + The type of MANIFEST, DATA and MISC derived entries + is preserved. OPTIONAL entries are left as-is. + + If the path exists and has no Manifest entry, a new entry + of type @new_entry_type will be created in the Manifest most + specific to the location. Note that AUX entries can only + be created if they're located in 'files/' directory relative + to an existing Manifest. + + If the path does not exist, all Manifest entries for it will + be removed except for OPTIONAL entries. + + @hashes specifies the requested hash set. By default, + the existing hashes in the entry are updated. @hashes + must be specified when creating a new entry. + """ + + had_entry = False + manifests_to_update = set() + + self.load_manifests_for_path(path) + for mpath, relpath, m in self._iter_manifests_for_path(path): + entries_to_remove = [] + for e in m.entries: + if isinstance(e, gemato.manifest.ManifestEntryIGNORE): + # ignore matches recursively, so we process it separately + # py<3.5 does not have os.path.commonpath() + fullpath = os.path.join(relpath, e.path) + assert not gemato.util.path_starts_with(path, fullpath) + elif isinstance(e, gemato.manifest.ManifestEntryDIST): + # distfiles are not local files, so skip them + pass + elif isinstance(e, gemato.manifest.ManifestEntryOPTIONAL): + # leave OPTIONAL entries as-is + fullpath = os.path.join(relpath, e.path) + if fullpath == path: + had_entry = True + elif isinstance(e, gemato.manifest.ManifestPathEntry): + # we update either file at the specified path + # or any relevant Manifests + fullpath = os.path.join(relpath, e.path) + if fullpath == path: + if had_entry: + # duplicate entry! + entries_to_remove.append(e) + continue + # pass through + elif fullpath in manifests_to_update: + pass + else: + continue + + try: + gemato.verify.update_entry_for_path( + os.path.join(self.root_directory, + fullpath), + e, + hashes=hashes, + expected_dev=self.manifest_device) + except gemato.exceptions.ManifestInvalidPath as err: + if err.detail[0] == '__exists__': + # file does not exist anymore, so remove + # the entry + entries_to_remove.append(e) + had_entry = True + else: + raise err + else: + manifests_to_update.add(mpath) + had_entry = True + + if entries_to_remove: + for e in entries_to_remove: + m.entries.remove(e) + manifests_to_update.add(mpath) + + # we've apparently added this Manifest, so store it now + if mpath in manifests_to_update: + self.save_manifest(mpath) + + if not had_entry: + assert hashes is not None + for mpath, relpath, m in self._iter_manifests_for_path(path): + # add to the first relevant Manifest + if not had_entry: + assert new_entry_type not in ( + 'DIST', 'IGNORE', 'OPTIONAL') + newpath = os.path.relpath(path, relpath) + if new_entry_type == 'AUX': + # AUX has implicit files/ prefix + assert gemato.util.path_inside_dir(newpath, + 'files') + # drop files/ prefix + newpath = os.path.relpath(newpath, 'files') + e = gemato.manifest.new_manifest_entry( + new_entry_type, newpath, 0, {}) + gemato.verify.update_entry_for_path( + os.path.join(self.root_directory, path), + e, + hashes=hashes, + expected_dev=self.manifest_device) + m.entries.append(e) + manifests_to_update.add(mpath) + had_entry = True + else: + for e in m.entries: + if not isinstance(e, gemato.manifest.ManifestEntryMANIFEST): + continue + + # we update either file at the specified path + # or any relevant Manifests + fullpath = os.path.join(relpath, e.path) + if fullpath not in manifests_to_update: + continue + + gemato.verify.update_entry_for_path( + os.path.join(self.root_directory, + fullpath), + e, + hashes=hashes, + expected_dev=self.manifest_device) + manifests_to_update.add(mpath) + + # we've apparently added this Manifest, so store it now + if mpath in manifests_to_update: + self.save_manifest(mpath) diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py index e17f144..53d1501 100644 --- a/tests/test_recursiveloader.py +++ b/tests/test_recursiveloader.py @@ -295,18 +295,99 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e 'r', encoding='utf8') as f: self.assertEqual(f.read(), self.FILES['Manifest'].lstrip()) + def test_update_entry_for_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512']) + self.assertIsInstance(m.find_path_entry('sub/stray'), + gemato.manifest.ManifestEntryDATA) + # relevant Manifests should have been updated + with io.open(os.path.join(self.dir, 'sub/Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + + def test_update_entry_for_path_MANIFEST(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'], + new_entry_type='MANIFEST') + self.assertIsInstance(m.find_path_entry('sub/stray'), + gemato.manifest.ManifestEntryMANIFEST) + # relevant Manifests should have been updated + with io.open(os.path.join(self.dir, 'sub/Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + self.assertIn('sub/stray', m.loaded_manifests) + + def test_update_entry_for_path_MISC(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'], + new_entry_type='MISC') + self.assertIsInstance(m.find_path_entry('sub/stray'), + gemato.manifest.ManifestEntryMISC) + # relevant Manifests should have been updated + with io.open(os.path.join(self.dir, 'sub/Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + + def test_update_entry_for_path_EBUILD(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'], + new_entry_type='EBUILD') + self.assertIsInstance(m.find_path_entry('sub/stray'), + gemato.manifest.ManifestEntryEBUILD) + # relevant Manifests should have been updated + with io.open(os.path.join(self.dir, 'sub/Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + + def test_update_entry_for_path_AUX_invalid(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertRaises(AssertionError, + m.update_entry_for_path, 'sub/stray', + hashes=['SHA256', 'SHA512'], + new_entry_type='AUX') + + def test_update_entry_for_path_nohash(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertRaises(AssertionError, + m.update_entry_for_path, 'sub/stray') + class MultipleManifestTest(TempDirTestCase): DIRS = ['sub'] FILES = { 'Manifest': u''' -MANIFEST sub/Manifest.a 0 MD5 d41d8cd98f00b204e9800998ecf8427e +MANIFEST sub/Manifest.a 50 MD5 33fd9df6d410a93ff859d75e088bde7e MANIFEST sub/Manifest.b 32 MD5 95737355786df5760d6369a80935cf8a ''', - 'sub/Manifest.a': u'', + 'sub/Manifest.a': u''' +DATA foo 32 MD5 d41d8cd98f00b204e9800998ecf8427e +''', 'sub/Manifest.b': u''' TIMESTAMP 2017-01-01T01:01:01Z ''', + 'sub/foo': u'1234567890123456', } def test_load_sub_manifest(self): @@ -334,6 +415,56 @@ TIMESTAMP 2017-01-01T01:01:01Z # to be top-level self.assertIsNone(m.find_timestamp()) + def test_verify_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertEqual(m.verify_path('sub/foo'), + (False, [('__size__', 32, 16)])) + + def test_update_entry_for_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('sub/foo') + # relevant Manifests should have been updated + # but sub/Manifest.b should be left intact + with io.open(os.path.join(self.dir, 'sub/Manifest.a'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) + with io.open(os.path.join(self.dir, 'sub/Manifest.b'), + 'r', encoding='utf8') as f: + self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + + def test_update_entry_for_path_hashes(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('sub/foo', hashes=['SHA256', 'SHA512']) + # check for checksums + self.assertListEqual( + sorted(m.find_path_entry('sub/foo').checksums), + ['SHA256', 'SHA512']) + self.assertListEqual( + sorted(m.find_path_entry('sub/Manifest.a').checksums), + ['SHA256', 'SHA512']) + self.assertListEqual( + sorted(m.find_path_entry('sub/Manifest.b').checksums), + ['MD5']) + # relevant Manifests should have been updated + # but sub/Manifest.b should be left intact + with io.open(os.path.join(self.dir, 'sub/Manifest.a'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) + with io.open(os.path.join(self.dir, 'sub/Manifest.b'), + 'r', encoding='utf8') as f: + self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + class MultipleTopLevelManifestTest(TempDirTestCase): FILES = { @@ -532,6 +663,58 @@ AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e os.path.join(self.dir, 'Manifest')) m.assert_directory_verifies('') +class DuplicateAUXTypeFileRemovalTest(TempDirTestCase): + DIRS = ['files'] + FILES = { + 'Manifest': u''' +DATA files/test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e +AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + } + + def test_update_entry(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('files/test.patch') + self.assertIsNone(m.find_path_entry('files/test.patch')) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + + def test_update_entry_wrong_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertRaises(gemato.exceptions.ManifestInvalidPath, + m.update_entry_for_path, 'test.patch', hashes=['MD5']) + + +class AUXTypeFileAdditionTest(TempDirTestCase): + DIRS = ['files'] + FILES = { + 'Manifest': u'', + 'files/test.txt': u'test', + } + + def test_update_entry(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('files/test.txt', + hashes=['MD5'], new_entry_type='AUX') + self.assertIsInstance(m.find_path_entry('files/test.txt'), + gemato.manifest.ManifestEntryAUX) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + + def test_update_entry_wrong_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertRaises(AssertionError, + m.update_entry_for_path, 'test.txt', + hashes=['MD5'], new_entry_type='AUX') + class DuplicateDifferentHashSetFileEntryTest(TempDirTestCase): """ @@ -577,6 +760,19 @@ DATA test 0 SHA1 2fd4e1c67a2d28fced849ee1bb76e7391b93eb12 gemato.cli.main(['gemato', 'verify', self.dir]), 1) + def test_update_entry_for_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('test') + # either of the entries could have been taken + self.assertIn( + tuple(m.find_path_entry('test').checksums), + (('MD5',), ('SHA1',))) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + class DuplicateIncompatibleDataMiscTypeFileEntryTest(TempDirTestCase): """ @@ -792,6 +988,16 @@ MISC foo 0 MD5 d41d8cd98f00b204e9800998ecf8427e gemato.cli.main(['gemato', 'verify', '--no-strict', self.dir]), 0) + def test_update_entry_for_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('foo') + self.assertIsNone(m.find_path_entry('foo')) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['Manifest']) + m.assert_directory_verifies() + class ManifestOptionalEntryTest(TempDirTestCase): """ @@ -833,6 +1039,17 @@ OPTIONAL foo gemato.cli.main(['gemato', 'verify', '--no-strict', self.dir]), 0) + def test_update_entry_for_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.update_entry_for_path('foo') + self.assertIsNotNone(m.find_path_entry('foo')) + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + self.assertEqual(f.read(), self.FILES['Manifest']) + self.assertRaises(gemato.exceptions.ManifestMismatch, + m.assert_directory_verifies, '') + class CrossDeviceManifestTest(TempDirTestCase): """ @@ -998,6 +1215,12 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e gemato.cli.main(['gemato', 'verify', self.dir]), 1) + def test_update_entry_for_path(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertRaises(gemato.exceptions.ManifestInvalidPath, + m.update_entry_for_path, 'test') + class UnreadableDirectoryTest(TempDirTestCase): """ |