diff options
author | Michał Górny <mgorny@gentoo.org> | 2017-10-27 18:52:56 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2017-10-27 18:57:22 +0200 |
commit | 9375abd7adf21b63d48eb8be59684e4924c3bea7 (patch) | |
tree | 99a1ce0a08e6947dd1718ddfe8742bd790e93b28 | |
parent | d6f3fe016e1d2ab7809afd928d63127379494163 (diff) | |
download | gemato-9375abd7adf21b63d48eb8be59684e4924c3bea7.tar.gz |
verify: Support updating entry data from path
-rw-r--r-- | gemato/exceptions.py | 12 | ||||
-rw-r--r-- | gemato/verify.py | 55 | ||||
-rw-r--r-- | tests/test_verify.py | 168 |
3 files changed, 235 insertions, 0 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py index 950d6a5..7c75f46 100644 --- a/gemato/exceptions.py +++ b/gemato/exceptions.py @@ -88,3 +88,15 @@ class OpenPGPNoImplementation(Exception): def __init__(self): super(OpenPGPNoImplementation, self).__init__( "No supported OpenPGP implementation found (install gnupg)") + + +class ManifestInvalidPath(Exception): + """ + An exception raised when an invalid path tries to be added to + Manifest. + """ + + def __init__(self, path, detail): + super(ManifestInvalidPath, self).__init__( + "Attempting to add invalid path {} to Manifest: {} must not be {}" + .format(path, detail[0], detail[1])) diff --git a/gemato/verify.py b/gemato/verify.py index 046c575..04edc91 100644 --- a/gemato/verify.py +++ b/gemato/verify.py @@ -198,6 +198,61 @@ def verify_path(path, e, expected_dev=None): return (True, []) +def update_entry_for_path(path, e, hashes=None, expected_dev=None): + """ + Update the data in entry @e to match the current state of file + at path @path. Uses hashes listed in @hashes (using Manifest names), + or the current set of hashes in @e if @hashes is None. + + The file must exist and be a regular file, and the entry must be + of MISC, DATA, MANIFEST or a derived type. The path/filename + is not updated nor checked. + + If @expected_dev is not None, verifies that the file resides + on specified device. If the device does not match, raises + ManifestCrossDevice exception. It can be used to verify that + the files do not cross filesystem boundaries. + """ + + assert isinstance(e, gemato.manifest.ManifestPathEntry) + assert not isinstance(e, gemato.manifest.ManifestEntryIGNORE) + assert not isinstance(e, gemato.manifest.ManifestEntryOPTIONAL) + + if hashes is None: + hashes = list(e.checksums) + + with contextlib.closing(get_file_metadata(path, hashes)) as g: + # 1. verify whether the file existed in the first place + exists = next(g) + if not exists: + raise gemato.exceptions.ManifestInvalidPath(path, + ('__exists__', exists)) + + # 2. check for xdev condition + st_dev = next(g) + if expected_dev is not None and st_dev != expected_dev: + raise gemato.exceptions.ManifestCrossDevice(path) + + # 3. verify whether the file is a regular file + ifmt, ftype = next(g) + if not stat.S_ISREG(ifmt): + raise gemato.exceptions.ManifestInvalidPath(path, + ('__type__', ftype)) + + # 4. get the apparent file size + st_size = next(g) + + # 5. get the checksums and real size + checksums = next(g) + size = checksums.pop('__size__') + if st_size != 0: + assert st_size == size, ('Apparent size (st_size = {}) and real size ({}) are different!' + .format(st_size, size)) + + e.size = size + e.checksums = checksums + + def verify_entry_compatibility(e1, e2): """ Verify that the two entries @e1 and @e2 are compatible. diff --git a/tests/test_verify.py b/tests/test_verify.py index 5393b4c..a9e7a5e 100644 --- a/tests/test_verify.py +++ b/tests/test_verify.py @@ -49,6 +49,12 @@ class NonExistingFileVerificationTest(unittest.TestCase): self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, 'test'), None), (True, [])) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA('test', 0, {}) + self.assertRaises(gemato.exceptions.ManifestInvalidPath, + gemato.verify.update_entry_for_path, + os.path.join(self.dir, 'test'), e) + class DirectoryVerificationTest(unittest.TestCase): def setUp(self): @@ -85,6 +91,12 @@ class DirectoryVerificationTest(unittest.TestCase): self.assertEqual(gemato.verify.verify_path(self.dir, None), (False, [('__exists__', False, True)])) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.dir), 0, {}) + self.assertRaises(gemato.exceptions.ManifestInvalidPath, + gemato.verify.update_entry_for_path, self.dir, e) + class CharacterDeviceVerificationTest(unittest.TestCase): def setUp(self): @@ -118,6 +130,12 @@ class CharacterDeviceVerificationTest(unittest.TestCase): self.assertEqual(gemato.verify.verify_path(self.path, None), (False, [('__exists__', False, True)])) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + self.assertRaises(gemato.exceptions.ManifestInvalidPath, + gemato.verify.update_entry_for_path, self.path, e) + class NamedPipeVerificationTest(unittest.TestCase): def setUp(self): @@ -157,6 +175,12 @@ class NamedPipeVerificationTest(unittest.TestCase): self.assertEqual(gemato.verify.verify_path(self.path, None), (False, [('__exists__', False, True)])) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + self.assertRaises(gemato.exceptions.ManifestInvalidPath, + gemato.verify.update_entry_for_path, self.path, e) + class UNIXSocketVerificationTest(unittest.TestCase): def setUp(self): @@ -199,6 +223,12 @@ class UNIXSocketVerificationTest(unittest.TestCase): self.assertEqual(gemato.verify.verify_path(self.path, None), (False, [('__exists__', False, True)])) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + self.assertRaises(gemato.exceptions.ManifestInvalidPath, + gemato.verify.update_entry_for_path, self.path, e) + class EmptyFileVerificationTest(unittest.TestCase): def setUp(self): @@ -306,6 +336,78 @@ class EmptyFileVerificationTest(unittest.TestCase): gemato.verify.verify_path, self.path, e, expected_dev=st.st_dev) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, 0) + self.assertDictEqual(e.checksums, {}) + + def test_update_with_hashes(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e, ['MD5', 'SHA1']) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, 0) + self.assertDictEqual(e.checksums, { + 'MD5': 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709', + }) + + def test_update_with_hashes_from_manifest(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, 0) + self.assertDictEqual(e.checksums, { + 'MD5': 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709', + }) + + def test_update_cross_filesystem(self): + try: + st = os.stat('/proc') + except OSError: + raise unittest.SkipTest('Unable to stat /proc') + + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + self.assertRaises(gemato.exceptions.ManifestCrossDevice, + gemato.verify.update_entry_for_path, self.path, e, + expected_dev=st.st_dev) + + def test_update_MISC(self): + e = gemato.manifest.ManifestEntryMISC( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, 0) + self.assertDictEqual(e.checksums, {}) + + def test_update_OPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL( + os.path.basename(self.path)) + self.assertRaises(AssertionError, + gemato.verify.update_entry_for_path, self.path, e) + + def test_update_IGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE( + os.path.basename(self.path)) + self.assertRaises(AssertionError, + gemato.verify.update_entry_for_path, self.path, e) + + def test_update_AUX(self): + e = gemato.manifest.ManifestEntryAUX( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, + os.path.join('files', os.path.basename(self.path))) + self.assertEqual(e.size, 0) + self.assertDictEqual(e.checksums, {}) + + class NonEmptyFileVerificationTest(unittest.TestCase): def setUp(self): @@ -390,6 +492,36 @@ class NonEmptyFileVerificationTest(unittest.TestCase): self.assertEqual(gemato.verify.verify_path(self.path, None), (False, [('__exists__', False, True)])) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, 43) + self.assertDictEqual(e.checksums, {}) + + def test_update_with_hashes(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e, ['MD5', 'SHA1']) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, 43) + self.assertDictEqual(e.checksums, { + 'MD5': '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', + }) + + def test_update_with_hashes_from_manifest(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, 43) + self.assertDictEqual(e.checksums, { + 'MD5': '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', + }) + class SymbolicLinkVerificationTest(NonEmptyFileVerificationTest): """ @@ -538,6 +670,36 @@ class ProcFileVerificationTest(unittest.TestCase): self.assertEqual(gemato.verify.verify_path(self.path, None), (False, [('__exists__', False, True)])) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, self.size) + self.assertDictEqual(e.checksums, {}) + + def test_update_with_hashes(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + gemato.verify.update_entry_for_path(self.path, e, ['MD5', 'SHA1']) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, self.size) + self.assertDictEqual(e.checksums, { + 'MD5': self.md5, + 'SHA1': self.sha1, + }) + + def test_update_with_hashes_from_manifest(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''}) + gemato.verify.update_entry_for_path(self.path, e) + self.assertEqual(e.path, os.path.basename(self.path)) + self.assertEqual(e.size, self.size) + self.assertDictEqual(e.checksums, { + 'MD5': self.md5, + 'SHA1': self.sha1, + }) + class UnreadableFileVerificationTest(unittest.TestCase): def setUp(self): @@ -562,6 +724,12 @@ class UnreadableFileVerificationTest(unittest.TestCase): self.assertRaises(OSError, gemato.verify.verify_path, os.path.join(self.dir, e.path), e) + def test_update(self): + e = gemato.manifest.ManifestEntryDATA( + os.path.basename(self.path), 0, {}) + self.assertRaises(OSError, + gemato.verify.update_entry_for_path, self.path, e) + class EntryCompatibilityVerificationTest(unittest.TestCase): def test_matching_entry(self): |