summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/exceptions.py12
-rw-r--r--gemato/verify.py55
-rw-r--r--tests/test_verify.py168
3 files changed, 235 insertions, 0 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index 950d6a5..7c75f46 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -88,3 +88,15 @@ class OpenPGPNoImplementation(Exception):
def __init__(self):
super(OpenPGPNoImplementation, self).__init__(
"No supported OpenPGP implementation found (install gnupg)")
+
+
+class ManifestInvalidPath(Exception):
+ """
+ An exception raised when an invalid path tries to be added to
+ Manifest.
+ """
+
+ def __init__(self, path, detail):
+ super(ManifestInvalidPath, self).__init__(
+ "Attempting to add invalid path {} to Manifest: {} must not be {}"
+ .format(path, detail[0], detail[1]))
diff --git a/gemato/verify.py b/gemato/verify.py
index 046c575..04edc91 100644
--- a/gemato/verify.py
+++ b/gemato/verify.py
@@ -198,6 +198,61 @@ def verify_path(path, e, expected_dev=None):
return (True, [])
+def update_entry_for_path(path, e, hashes=None, expected_dev=None):
+ """
+ Update the data in entry @e to match the current state of file
+ at path @path. Uses hashes listed in @hashes (using Manifest names),
+ or the current set of hashes in @e if @hashes is None.
+
+ The file must exist and be a regular file, and the entry must be
+ of MISC, DATA, MANIFEST or a derived type. The path/filename
+ is not updated nor checked.
+
+ If @expected_dev is not None, verifies that the file resides
+ on specified device. If the device does not match, raises
+ ManifestCrossDevice exception. It can be used to verify that
+ the files do not cross filesystem boundaries.
+ """
+
+ assert isinstance(e, gemato.manifest.ManifestPathEntry)
+ assert not isinstance(e, gemato.manifest.ManifestEntryIGNORE)
+ assert not isinstance(e, gemato.manifest.ManifestEntryOPTIONAL)
+
+ if hashes is None:
+ hashes = list(e.checksums)
+
+ with contextlib.closing(get_file_metadata(path, hashes)) as g:
+ # 1. verify whether the file existed in the first place
+ exists = next(g)
+ if not exists:
+ raise gemato.exceptions.ManifestInvalidPath(path,
+ ('__exists__', exists))
+
+ # 2. check for xdev condition
+ st_dev = next(g)
+ if expected_dev is not None and st_dev != expected_dev:
+ raise gemato.exceptions.ManifestCrossDevice(path)
+
+ # 3. verify whether the file is a regular file
+ ifmt, ftype = next(g)
+ if not stat.S_ISREG(ifmt):
+ raise gemato.exceptions.ManifestInvalidPath(path,
+ ('__type__', ftype))
+
+ # 4. get the apparent file size
+ st_size = next(g)
+
+ # 5. get the checksums and real size
+ checksums = next(g)
+ size = checksums.pop('__size__')
+ if st_size != 0:
+ assert st_size == size, ('Apparent size (st_size = {}) and real size ({}) are different!'
+ .format(st_size, size))
+
+ e.size = size
+ e.checksums = checksums
+
+
def verify_entry_compatibility(e1, e2):
"""
Verify that the two entries @e1 and @e2 are compatible.
diff --git a/tests/test_verify.py b/tests/test_verify.py
index 5393b4c..a9e7a5e 100644
--- a/tests/test_verify.py
+++ b/tests/test_verify.py
@@ -49,6 +49,12 @@ class NonExistingFileVerificationTest(unittest.TestCase):
self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, 'test'), None),
(True, []))
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA('test', 0, {})
+ self.assertRaises(gemato.exceptions.ManifestInvalidPath,
+ gemato.verify.update_entry_for_path,
+ os.path.join(self.dir, 'test'), e)
+
class DirectoryVerificationTest(unittest.TestCase):
def setUp(self):
@@ -85,6 +91,12 @@ class DirectoryVerificationTest(unittest.TestCase):
self.assertEqual(gemato.verify.verify_path(self.dir, None),
(False, [('__exists__', False, True)]))
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.dir), 0, {})
+ self.assertRaises(gemato.exceptions.ManifestInvalidPath,
+ gemato.verify.update_entry_for_path, self.dir, e)
+
class CharacterDeviceVerificationTest(unittest.TestCase):
def setUp(self):
@@ -118,6 +130,12 @@ class CharacterDeviceVerificationTest(unittest.TestCase):
self.assertEqual(gemato.verify.verify_path(self.path, None),
(False, [('__exists__', False, True)]))
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ self.assertRaises(gemato.exceptions.ManifestInvalidPath,
+ gemato.verify.update_entry_for_path, self.path, e)
+
class NamedPipeVerificationTest(unittest.TestCase):
def setUp(self):
@@ -157,6 +175,12 @@ class NamedPipeVerificationTest(unittest.TestCase):
self.assertEqual(gemato.verify.verify_path(self.path, None),
(False, [('__exists__', False, True)]))
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ self.assertRaises(gemato.exceptions.ManifestInvalidPath,
+ gemato.verify.update_entry_for_path, self.path, e)
+
class UNIXSocketVerificationTest(unittest.TestCase):
def setUp(self):
@@ -199,6 +223,12 @@ class UNIXSocketVerificationTest(unittest.TestCase):
self.assertEqual(gemato.verify.verify_path(self.path, None),
(False, [('__exists__', False, True)]))
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ self.assertRaises(gemato.exceptions.ManifestInvalidPath,
+ gemato.verify.update_entry_for_path, self.path, e)
+
class EmptyFileVerificationTest(unittest.TestCase):
def setUp(self):
@@ -306,6 +336,78 @@ class EmptyFileVerificationTest(unittest.TestCase):
gemato.verify.verify_path, self.path, e,
expected_dev=st.st_dev)
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, 0)
+ self.assertDictEqual(e.checksums, {})
+
+ def test_update_with_hashes(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e, ['MD5', 'SHA1'])
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, 0)
+ self.assertDictEqual(e.checksums, {
+ 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
+ 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
+ })
+
+ def test_update_with_hashes_from_manifest(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, 0)
+ self.assertDictEqual(e.checksums, {
+ 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
+ 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
+ })
+
+ def test_update_cross_filesystem(self):
+ try:
+ st = os.stat('/proc')
+ except OSError:
+ raise unittest.SkipTest('Unable to stat /proc')
+
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ self.assertRaises(gemato.exceptions.ManifestCrossDevice,
+ gemato.verify.update_entry_for_path, self.path, e,
+ expected_dev=st.st_dev)
+
+ def test_update_MISC(self):
+ e = gemato.manifest.ManifestEntryMISC(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, 0)
+ self.assertDictEqual(e.checksums, {})
+
+ def test_update_OPTIONAL(self):
+ e = gemato.manifest.ManifestEntryOPTIONAL(
+ os.path.basename(self.path))
+ self.assertRaises(AssertionError,
+ gemato.verify.update_entry_for_path, self.path, e)
+
+ def test_update_IGNORE(self):
+ e = gemato.manifest.ManifestEntryIGNORE(
+ os.path.basename(self.path))
+ self.assertRaises(AssertionError,
+ gemato.verify.update_entry_for_path, self.path, e)
+
+ def test_update_AUX(self):
+ e = gemato.manifest.ManifestEntryAUX(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path,
+ os.path.join('files', os.path.basename(self.path)))
+ self.assertEqual(e.size, 0)
+ self.assertDictEqual(e.checksums, {})
+
+
class NonEmptyFileVerificationTest(unittest.TestCase):
def setUp(self):
@@ -390,6 +492,36 @@ class NonEmptyFileVerificationTest(unittest.TestCase):
self.assertEqual(gemato.verify.verify_path(self.path, None),
(False, [('__exists__', False, True)]))
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, 43)
+ self.assertDictEqual(e.checksums, {})
+
+ def test_update_with_hashes(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e, ['MD5', 'SHA1'])
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, 43)
+ self.assertDictEqual(e.checksums, {
+ 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
+ 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
+ })
+
+ def test_update_with_hashes_from_manifest(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, 43)
+ self.assertDictEqual(e.checksums, {
+ 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
+ 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
+ })
+
class SymbolicLinkVerificationTest(NonEmptyFileVerificationTest):
"""
@@ -538,6 +670,36 @@ class ProcFileVerificationTest(unittest.TestCase):
self.assertEqual(gemato.verify.verify_path(self.path, None),
(False, [('__exists__', False, True)]))
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, self.size)
+ self.assertDictEqual(e.checksums, {})
+
+ def test_update_with_hashes(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ gemato.verify.update_entry_for_path(self.path, e, ['MD5', 'SHA1'])
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, self.size)
+ self.assertDictEqual(e.checksums, {
+ 'MD5': self.md5,
+ 'SHA1': self.sha1,
+ })
+
+ def test_update_with_hashes_from_manifest(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''})
+ gemato.verify.update_entry_for_path(self.path, e)
+ self.assertEqual(e.path, os.path.basename(self.path))
+ self.assertEqual(e.size, self.size)
+ self.assertDictEqual(e.checksums, {
+ 'MD5': self.md5,
+ 'SHA1': self.sha1,
+ })
+
class UnreadableFileVerificationTest(unittest.TestCase):
def setUp(self):
@@ -562,6 +724,12 @@ class UnreadableFileVerificationTest(unittest.TestCase):
self.assertRaises(OSError, gemato.verify.verify_path,
os.path.join(self.dir, e.path), e)
+ def test_update(self):
+ e = gemato.manifest.ManifestEntryDATA(
+ os.path.basename(self.path), 0, {})
+ self.assertRaises(OSError,
+ gemato.verify.update_entry_for_path, self.path, e)
+
class EntryCompatibilityVerificationTest(unittest.TestCase):
def test_matching_entry(self):