summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/exceptions.py2
-rw-r--r--gemato/recursiveloader.py138
-rw-r--r--tests/test_recursiveloader.py227
3 files changed, 365 insertions, 2 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index 901d0b1..cbbcb12 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -107,6 +107,8 @@ class ManifestInvalidPath(Exception):
"""
def __init__(self, path, detail):
+ self.path = path
+ self.detail = detail
super(ManifestInvalidPath, self).__init__(
"Attempting to add invalid path {} to Manifest: {} must not be {}"
.format(path, detail[0], detail[1]))
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index 752ecf5..cb1b6d0 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -356,3 +356,141 @@ class ManifestRecursiveLoader(object):
fail_handler, warn_handler)
return ret
+
+ def update_entry_for_path(self, path, new_entry_type='DATA',
+ hashes=None):
+ """
+ Update the Manifest entries for @path and the appropriate
+ MANIFEST entries. @path must not be covered by IGNORE.
+
+ If the path exists and has a matching Manifest entry, the most
+ specific existing entry will be updated. If the path has more
+ entries, the remaining entries will be removed. This function
+ does not check if they were compatible.
+
+ The type of MANIFEST, DATA and MISC derived entries
+ is preserved. OPTIONAL entries are left as-is.
+
+ If the path exists and has no Manifest entry, a new entry
+ of type @new_entry_type will be created in the Manifest most
+ specific to the location. Note that AUX entries can only
+ be created if they're located in 'files/' directory relative
+ to an existing Manifest.
+
+ If the path does not exist, all Manifest entries for it will
+ be removed except for OPTIONAL entries.
+
+ @hashes specifies the requested hash set. By default,
+ the existing hashes in the entry are updated. @hashes
+ must be specified when creating a new entry.
+ """
+
+ had_entry = False
+ manifests_to_update = set()
+
+ self.load_manifests_for_path(path)
+ for mpath, relpath, m in self._iter_manifests_for_path(path):
+ entries_to_remove = []
+ for e in m.entries:
+ if isinstance(e, gemato.manifest.ManifestEntryIGNORE):
+ # ignore matches recursively, so we process it separately
+ # py<3.5 does not have os.path.commonpath()
+ fullpath = os.path.join(relpath, e.path)
+ assert not gemato.util.path_starts_with(path, fullpath)
+ elif isinstance(e, gemato.manifest.ManifestEntryDIST):
+ # distfiles are not local files, so skip them
+ pass
+ elif isinstance(e, gemato.manifest.ManifestEntryOPTIONAL):
+ # leave OPTIONAL entries as-is
+ fullpath = os.path.join(relpath, e.path)
+ if fullpath == path:
+ had_entry = True
+ elif isinstance(e, gemato.manifest.ManifestPathEntry):
+ # we update either file at the specified path
+ # or any relevant Manifests
+ fullpath = os.path.join(relpath, e.path)
+ if fullpath == path:
+ if had_entry:
+ # duplicate entry!
+ entries_to_remove.append(e)
+ continue
+ # pass through
+ elif fullpath in manifests_to_update:
+ pass
+ else:
+ continue
+
+ try:
+ gemato.verify.update_entry_for_path(
+ os.path.join(self.root_directory,
+ fullpath),
+ e,
+ hashes=hashes,
+ expected_dev=self.manifest_device)
+ except gemato.exceptions.ManifestInvalidPath as err:
+ if err.detail[0] == '__exists__':
+ # file does not exist anymore, so remove
+ # the entry
+ entries_to_remove.append(e)
+ had_entry = True
+ else:
+ raise err
+ else:
+ manifests_to_update.add(mpath)
+ had_entry = True
+
+ if entries_to_remove:
+ for e in entries_to_remove:
+ m.entries.remove(e)
+ manifests_to_update.add(mpath)
+
+ # we've apparently added this Manifest, so store it now
+ if mpath in manifests_to_update:
+ self.save_manifest(mpath)
+
+ if not had_entry:
+ assert hashes is not None
+ for mpath, relpath, m in self._iter_manifests_for_path(path):
+ # add to the first relevant Manifest
+ if not had_entry:
+ assert new_entry_type not in (
+ 'DIST', 'IGNORE', 'OPTIONAL')
+ newpath = os.path.relpath(path, relpath)
+ if new_entry_type == 'AUX':
+ # AUX has implicit files/ prefix
+ assert gemato.util.path_inside_dir(newpath,
+ 'files')
+ # drop files/ prefix
+ newpath = os.path.relpath(newpath, 'files')
+ e = gemato.manifest.new_manifest_entry(
+ new_entry_type, newpath, 0, {})
+ gemato.verify.update_entry_for_path(
+ os.path.join(self.root_directory, path),
+ e,
+ hashes=hashes,
+ expected_dev=self.manifest_device)
+ m.entries.append(e)
+ manifests_to_update.add(mpath)
+ had_entry = True
+ else:
+ for e in m.entries:
+ if not isinstance(e, gemato.manifest.ManifestEntryMANIFEST):
+ continue
+
+ # we update either file at the specified path
+ # or any relevant Manifests
+ fullpath = os.path.join(relpath, e.path)
+ if fullpath not in manifests_to_update:
+ continue
+
+ gemato.verify.update_entry_for_path(
+ os.path.join(self.root_directory,
+ fullpath),
+ e,
+ hashes=hashes,
+ expected_dev=self.manifest_device)
+ manifests_to_update.add(mpath)
+
+ # we've apparently added this Manifest, so store it now
+ if mpath in manifests_to_update:
+ self.save_manifest(mpath)
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py
index e17f144..53d1501 100644
--- a/tests/test_recursiveloader.py
+++ b/tests/test_recursiveloader.py
@@ -295,18 +295,99 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
'r', encoding='utf8') as f:
self.assertEqual(f.read(), self.FILES['Manifest'].lstrip())
+ def test_update_entry_for_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'])
+ self.assertIsInstance(m.find_path_entry('sub/stray'),
+ gemato.manifest.ManifestEntryDATA)
+ # relevant Manifests should have been updated
+ with io.open(os.path.join(self.dir, 'sub/Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['sub/Manifest'])
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
+ def test_update_entry_for_path_MANIFEST(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'],
+ new_entry_type='MANIFEST')
+ self.assertIsInstance(m.find_path_entry('sub/stray'),
+ gemato.manifest.ManifestEntryMANIFEST)
+ # relevant Manifests should have been updated
+ with io.open(os.path.join(self.dir, 'sub/Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['sub/Manifest'])
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+ self.assertIn('sub/stray', m.loaded_manifests)
+
+ def test_update_entry_for_path_MISC(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'],
+ new_entry_type='MISC')
+ self.assertIsInstance(m.find_path_entry('sub/stray'),
+ gemato.manifest.ManifestEntryMISC)
+ # relevant Manifests should have been updated
+ with io.open(os.path.join(self.dir, 'sub/Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['sub/Manifest'])
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
+ def test_update_entry_for_path_EBUILD(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'],
+ new_entry_type='EBUILD')
+ self.assertIsInstance(m.find_path_entry('sub/stray'),
+ gemato.manifest.ManifestEntryEBUILD)
+ # relevant Manifests should have been updated
+ with io.open(os.path.join(self.dir, 'sub/Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['sub/Manifest'])
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
+ def test_update_entry_for_path_AUX_invalid(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertRaises(AssertionError,
+ m.update_entry_for_path, 'sub/stray',
+ hashes=['SHA256', 'SHA512'],
+ new_entry_type='AUX')
+
+ def test_update_entry_for_path_nohash(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertRaises(AssertionError,
+ m.update_entry_for_path, 'sub/stray')
+
class MultipleManifestTest(TempDirTestCase):
DIRS = ['sub']
FILES = {
'Manifest': u'''
-MANIFEST sub/Manifest.a 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+MANIFEST sub/Manifest.a 50 MD5 33fd9df6d410a93ff859d75e088bde7e
MANIFEST sub/Manifest.b 32 MD5 95737355786df5760d6369a80935cf8a
''',
- 'sub/Manifest.a': u'',
+ 'sub/Manifest.a': u'''
+DATA foo 32 MD5 d41d8cd98f00b204e9800998ecf8427e
+''',
'sub/Manifest.b': u'''
TIMESTAMP 2017-01-01T01:01:01Z
''',
+ 'sub/foo': u'1234567890123456',
}
def test_load_sub_manifest(self):
@@ -334,6 +415,56 @@ TIMESTAMP 2017-01-01T01:01:01Z
# to be top-level
self.assertIsNone(m.find_timestamp())
+ def test_verify_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertEqual(m.verify_path('sub/foo'),
+ (False, [('__size__', 32, 16)]))
+
+ def test_update_entry_for_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('sub/foo')
+ # relevant Manifests should have been updated
+ # but sub/Manifest.b should be left intact
+ with io.open(os.path.join(self.dir, 'sub/Manifest.a'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a'])
+ with io.open(os.path.join(self.dir, 'sub/Manifest.b'),
+ 'r', encoding='utf8') as f:
+ self.assertEqual(f.read(), self.FILES['sub/Manifest.b'])
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
+ def test_update_entry_for_path_hashes(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('sub/foo', hashes=['SHA256', 'SHA512'])
+ # check for checksums
+ self.assertListEqual(
+ sorted(m.find_path_entry('sub/foo').checksums),
+ ['SHA256', 'SHA512'])
+ self.assertListEqual(
+ sorted(m.find_path_entry('sub/Manifest.a').checksums),
+ ['SHA256', 'SHA512'])
+ self.assertListEqual(
+ sorted(m.find_path_entry('sub/Manifest.b').checksums),
+ ['MD5'])
+ # relevant Manifests should have been updated
+ # but sub/Manifest.b should be left intact
+ with io.open(os.path.join(self.dir, 'sub/Manifest.a'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a'])
+ with io.open(os.path.join(self.dir, 'sub/Manifest.b'),
+ 'r', encoding='utf8') as f:
+ self.assertEqual(f.read(), self.FILES['sub/Manifest.b'])
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
class MultipleTopLevelManifestTest(TempDirTestCase):
FILES = {
@@ -532,6 +663,58 @@ AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e
os.path.join(self.dir, 'Manifest'))
m.assert_directory_verifies('')
+class DuplicateAUXTypeFileRemovalTest(TempDirTestCase):
+ DIRS = ['files']
+ FILES = {
+ 'Manifest': u'''
+DATA files/test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+''',
+ }
+
+ def test_update_entry(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('files/test.patch')
+ self.assertIsNone(m.find_path_entry('files/test.patch'))
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
+ def test_update_entry_wrong_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertRaises(gemato.exceptions.ManifestInvalidPath,
+ m.update_entry_for_path, 'test.patch', hashes=['MD5'])
+
+
+class AUXTypeFileAdditionTest(TempDirTestCase):
+ DIRS = ['files']
+ FILES = {
+ 'Manifest': u'',
+ 'files/test.txt': u'test',
+ }
+
+ def test_update_entry(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('files/test.txt',
+ hashes=['MD5'], new_entry_type='AUX')
+ self.assertIsInstance(m.find_path_entry('files/test.txt'),
+ gemato.manifest.ManifestEntryAUX)
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
+ def test_update_entry_wrong_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertRaises(AssertionError,
+ m.update_entry_for_path, 'test.txt',
+ hashes=['MD5'], new_entry_type='AUX')
+
class DuplicateDifferentHashSetFileEntryTest(TempDirTestCase):
"""
@@ -577,6 +760,19 @@ DATA test 0 SHA1 2fd4e1c67a2d28fced849ee1bb76e7391b93eb12
gemato.cli.main(['gemato', 'verify', self.dir]),
1)
+ def test_update_entry_for_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('test')
+ # either of the entries could have been taken
+ self.assertIn(
+ tuple(m.find_path_entry('test').checksums),
+ (('MD5',), ('SHA1',)))
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
class DuplicateIncompatibleDataMiscTypeFileEntryTest(TempDirTestCase):
"""
@@ -792,6 +988,16 @@ MISC foo 0 MD5 d41d8cd98f00b204e9800998ecf8427e
gemato.cli.main(['gemato', 'verify', '--no-strict', self.dir]),
0)
+ def test_update_entry_for_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('foo')
+ self.assertIsNone(m.find_path_entry('foo'))
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['Manifest'])
+ m.assert_directory_verifies()
+
class ManifestOptionalEntryTest(TempDirTestCase):
"""
@@ -833,6 +1039,17 @@ OPTIONAL foo
gemato.cli.main(['gemato', 'verify', '--no-strict', self.dir]),
0)
+ def test_update_entry_for_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.update_entry_for_path('foo')
+ self.assertIsNotNone(m.find_path_entry('foo'))
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertEqual(f.read(), self.FILES['Manifest'])
+ self.assertRaises(gemato.exceptions.ManifestMismatch,
+ m.assert_directory_verifies, '')
+
class CrossDeviceManifestTest(TempDirTestCase):
"""
@@ -998,6 +1215,12 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
gemato.cli.main(['gemato', 'verify', self.dir]),
1)
+ def test_update_entry_for_path(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertRaises(gemato.exceptions.ManifestInvalidPath,
+ m.update_entry_for_path, 'test')
+
class UnreadableDirectoryTest(TempDirTestCase):
"""