diff options
-rw-r--r-- | gemato/verify.py | 123 | ||||
-rw-r--r-- | tests/test_verify.py | 371 |
2 files changed, 494 insertions, 0 deletions
diff --git a/gemato/verify.py b/gemato/verify.py new file mode 100644 index 0000000..397488b --- /dev/null +++ b/gemato/verify.py @@ -0,0 +1,123 @@ +# gemato: File verification routines +# vim:fileencoding=utf-8 +# (c) 2017 Michał Górny +# Licensed under the terms of 2-clause BSD license + +import errno +import fcntl +import os +import stat + +import gemato.hash +import gemato.manifest + + +def verify_path(path, e): + """ + Verify the file at system path @path against the data in entry @e. + The path/filename is not matched against the entry -- the correct + entry must be passed by the caller. + + If the path passes verification, returns (True, []). Otherwise, + returns (False, diff) where diff is a list of differences between + the file at path and the Manifest entry. Each list element is + a tuple of (name, expected, got). + + Each name can be: + - __exists__ (boolean) to indicate whether the file existed, + - __type__ (string) as a human-readable description of file type, + - __size__ (int) as file size, + - any checksum name according to the entry. + """ + + assert isinstance(e, gemato.manifest.ManifestPathEntry) + + # IGNORE entries cause verification to always succeed + if isinstance(e, gemato.manifest.ManifestEntryIGNORE): + return (True, []) + + try: + # we want O_NONBLOCK to avoid blocking when opening pipes + fd = os.open(path, os.O_RDONLY|os.O_NONBLOCK) + except IOError as err: + if err.errno == errno.ENOENT: + exists = False + opened = False + elif err.errno == errno.ENXIO: + # unconnected device or socket + exists = True + opened = False + else: + raise + else: + exists = True + opened = True + + # 1. verify whether the file existed in the first place + expect_exist = not isinstance(e, gemato.manifest.ManifestEntryOPTIONAL) + if exists != expect_exist: + if opened: + os.close(fd) + return (False, [('__exists__', expect_exist, exists)]) + elif not exists: + return (True, []) + + # 2. verify whether the file is a regular file + if opened: + st = os.fstat(fd) + else: + st = os.stat(path) + if not opened or not stat.S_ISREG(st.st_mode): + if opened: + os.close(fd) + if stat.S_ISDIR(st.st_mode): + ftype = 'directory' + elif stat.S_ISCHR(st.st_mode): + ftype = 'character device' + elif stat.S_ISBLK(st.st_mode): + ftype = 'block device' + elif stat.S_ISREG(st.st_mode): # can only happen w/ ENXIO + ftype = 'unconnected regular file (?!)' + elif stat.S_ISFIFO(st.st_mode): + ftype = 'named pipe' + elif stat.S_ISSOCK(st.st_mode): + ftype = 'UNIX socket' + else: + ftype = 'unknown' + return (False, [('__type__', 'regular file', ftype)]) + + # grab the fd + try: + f = os.fdopen(fd, 'rb') + except Exception: + os.close(fd) + raise + + with f: + # open() might have left the file as O_NONBLOCK + # make sure to fix that + fcntl.fcntl(fd, fcntl.F_SETFL, 0) + + # ignore st_size == 0 in case of weird filesystem + if st.st_size != 0 and st.st_size != e.size: + return (False, [('__size__', e.size, st.st_size)]) + + e_hashes = sorted(e.checksums) + hashes = list(gemato.manifest.manifest_hashes_to_hashlib(e_hashes)) + hashes.append('__size__') + checksums = gemato.hash.hash_file(f, hashes) + + diff = [] + size = checksums['__size__'] + if size != e.size: + diff.append(('__size__', e.size, size)) + for ek, k in zip(e_hashes, hashes): + exp = e.checksums[ek] + got = checksums[k] + if got != exp: + diff.append((ek, exp, got)) + + if diff: + return (False, diff) + + return (True, []) diff --git a/tests/test_verify.py b/tests/test_verify.py new file mode 100644 index 0000000..432e0ac --- /dev/null +++ b/tests/test_verify.py @@ -0,0 +1,371 @@ +# gemato: Verification tests +# vim:fileencoding=utf-8 +# (c) 2017 Michał Górny +# Licensed under the terms of 2-clause BSD license + +import multiprocessing +import os +import os.path +import socket +import tempfile +import unittest + +import gemato.manifest +import gemato.verify + + +class NonExistingFileVerificationTest(unittest.TestCase): + def setUp(self): + self.dir = tempfile.mkdtemp() + + def tearDown(self): + os.rmdir(self.dir) + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', 'test', '0')) + self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, e.path), e), + (False, [('__exists__', True, False)])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', 'test')) + self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, e.path), e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', 'test')) + self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, e.path), e), + (True, [])) + + +class DirectoryVerificationTest(unittest.TestCase): + def setUp(self): + self.dir = tempfile.mkdtemp() + + def tearDown(self): + os.rmdir(self.dir) + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.dir), '0')) + self.assertEqual(gemato.verify.verify_path(self.dir, e), + (False, [('__type__', 'regular file', 'directory')])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', os.path.basename(self.dir))) + self.assertEqual(gemato.verify.verify_path(self.dir, e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', os.path.basename(self.dir))) + self.assertEqual(gemato.verify.verify_path(self.dir, e), + (False, [('__exists__', False, True)])) + + +class CharacterDeviceVerificationTest(unittest.TestCase): + def setUp(self): + self.path = '/dev/null' + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__type__', 'regular file', 'character device')])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__exists__', False, True)])) + + +class NamedPipeVerificationTest(unittest.TestCase): + def setUp(self): + self.dir = tempfile.mkdtemp() + self.path = os.path.join(self.dir, 'test') + os.mkfifo(self.path) + + def tearDown(self): + os.unlink(self.path) + os.rmdir(self.dir) + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__type__', 'regular file', 'named pipe')])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__exists__', False, True)])) + + +class UNIXSocketVerificationTest(unittest.TestCase): + def setUp(self): + self.dir = tempfile.mkdtemp() + self.path = os.path.join(self.dir, 'test') + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.bind(self.path) + self.sock.listen(1) + + def tearDown(self): + self.sock.close() + os.unlink(self.path) + os.rmdir(self.dir) + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__type__', 'regular file', 'UNIX socket')])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__exists__', False, True)])) + + +class EmptyFileVerificationTest(unittest.TestCase): + def setUp(self): + self.f = tempfile.NamedTemporaryFile() + self.path = self.f.name + + def tearDown(self): + self.f.close() + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0', + 'MD5', 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testWrongSizeDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '5')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__size__', 5, 0)])) + + def testWrongSingleChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0', + 'MD5', '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e')])) + + def testWrongAllChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0', + 'MD5', '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e'), + ('SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')])) + + def testWrongAllChecksumAndSizeDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '39', + 'MD5', '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__size__', 39, 0), + ('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e'), + ('SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__exists__', False, True)])) + + +class NonEmptyFileVerificationTest(unittest.TestCase): + def setUp(self): + TEST_STRING = b'The quick brown fox jumps over the lazy dog' + self.f = tempfile.NamedTemporaryFile() + self.f.write(TEST_STRING) + self.f.flush() + self.path = self.f.name + + def tearDown(self): + self.f.close() + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '43')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '43', + 'MD5', '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testWrongSizeDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__size__', 0, 43)])) + + def testWrongSingleChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '43', + 'MD5', '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')])) + + def testWrongAllChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '43', + 'MD5', 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', '9e107d9d372bb6826bd81d3542a419d6'), + ('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')])) + + def testWrongAllChecksumAndSizeDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '0', + 'MD5', 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__size__', 0, 43)])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__exists__', False, True)])) + + +class ProcFileVerificationTest(unittest.TestCase): + """ + Attempt to verify a file from /proc to verify that we can handle + filesystems that do not report st_size. + """ + + def setUp(self): + self.path = '/proc/version' + try: + with open(self.path, 'rb') as f: + data = f.read() + st = os.fstat(f.fileno()) + except: + raise unittest.SkipTest('{} not readable'.format(self.path)) + + if st.st_size != 0: + raise unittest.SkipTest('{} st_size is not 0'.format(self.path)) + + self.size = len(data) + if self.size == 0: + raise unittest.SkipTest('{} empty'.format(self.path)) + self.md5 = gemato.hash.hash_bytes(data, 'md5') + self.sha1 = gemato.hash.hash_bytes(data, 'sha1') + + def testDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), str(self.size))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), str(self.size), + 'MD5', self.md5, + 'SHA1', self.sha1)) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testWrongSizeDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '47474')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__size__', 47474, self.size)])) + + def testWrongSingleChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), self.size, + 'MD5', self.md5, + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', self.sha1)])) + + def testWrongAllChecksumDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), self.size, + 'MD5', 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', self.md5), + ('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', self.sha1)])) + + def testWrongAllChecksumAndSizeDATA(self): + e = gemato.manifest.ManifestEntryDATA.from_list( + ('DATA', os.path.basename(self.path), '47474', + 'MD5', 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__size__', 47474, self.size), + ('MD5', 'd41d8cd98f00b204e9800998ecf8427e', self.md5), + ('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', self.sha1)])) + + def testIGNORE(self): + e = gemato.manifest.ManifestEntryIGNORE.from_list( + ('IGNORE', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (True, [])) + + def testOPTIONAL(self): + e = gemato.manifest.ManifestEntryOPTIONAL.from_list( + ('OPTIONAL', os.path.basename(self.path))) + self.assertEqual(gemato.verify.verify_path(self.path, e), + (False, [('__exists__', False, True)])) |