# Human-readable names for the S_IFMT() file type values reported by
# get_file_metadata(). Anything not listed is reported as 'unknown'.
_FILE_TYPE_NAMES = {
    stat.S_IFREG: 'regular file',
    stat.S_IFDIR: 'directory',
    stat.S_IFCHR: 'character device',
    stat.S_IFBLK: 'block device',
    stat.S_IFIFO: 'named pipe',
    stat.S_IFSOCK: 'UNIX socket',
}


def get_file_metadata(path, hashes):
    """
    Get a generator for the metadata of the file at system path @path.

    The generator yields, in order:
    1. A boolean indicating whether the file exists.
    2. st_dev, if the file exists.
    3. Tuple of (S_IFMT(st_mode), file type as string), if the file
       exists.
    4. st_size, if the file exists and is a regular file. Note that
       it may be 0 on some filesystems, so treat the value with caution.
    5. A dict of @hashes and their values, if the file exists and is
       a regular file. Special __size__ member is added unconditionally.

    @hashes is an iterable of Manifest hash names; it is not modified.

    Any OSError from opening the file other than ENOENT (reported
    as "does not exist") and ENXIO / EOPNOTSUPP (reported as "exists
    but cannot be opened") is propagated to the caller before
    the first value is yielded.

    Note that the generator acquires resources, and does not release
    them until terminated. Always make sure to pull it until
    StopIteration, or close it explicitly.
    """

    # fd stays None whenever we do not own an open descriptor, so that
    # the single 'finally' below knows whether there is anything to close
    fd = None
    try:
        # we want O_NONBLOCK to avoid blocking when opening pipes
        fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    except OSError as err:
        if err.errno == errno.ENOENT:
            exists = False
        elif err.errno in (errno.ENXIO, errno.EOPNOTSUPP):
            # unconnected device or socket; POSIX specifies EOPNOTSUPP
            # for open() on a socket, while Linux reports ENXIO
            exists = True
        else:
            raise
    else:
        exists = True

    try:
        # 1. does it exist?
        yield exists

        # we can't provide any more data for a file that does not exist
        if not exists:
            return

        # prefer fstat() on the descriptor we hold; fall back to the path
        # for files that exist but could not be opened
        st = os.stat(path) if fd is None else os.fstat(fd)

        # 2. st_dev
        yield st.st_dev

        # 3. file type tuple
        fmt = stat.S_IFMT(st.st_mode)
        yield (fmt, _FILE_TYPE_NAMES.get(fmt, 'unknown'))

        # size and checksums are provided only for regular files
        if fmt != stat.S_IFREG:
            return

        # 4. st_size
        yield st.st_size

        with os.fdopen(fd, 'rb') as f:
            # the file object owns the descriptor from now on and closes
            # it on leaving the 'with' block; make sure that 'finally'
            # does not close it a second time
            fd = None

            # open() might have left the file as O_NONBLOCK
            # make sure to fix that
            fcntl.fcntl(f.fileno(), fcntl.F_SETFL, 0)

            # 5. checksums
            # e_hashes holds the Manifest names used as result keys,
            # hashlib_names the matching names understood by hash_file()
            e_hashes = sorted(hashes)
            hashlib_names = list(
                gemato.manifest.manifest_hashes_to_hashlib(e_hashes))
            e_hashes.append('__size__')
            hashlib_names.append('__size__')
            checksums = gemato.hash.hash_file(f, hashlib_names)

            yield {ek: checksums[k]
                   for ek, k in zip(e_hashes, hashlib_names)}
    finally:
        # reached on normal exhaustion, on errors and on generator close();
        # releases the descriptor unless the file object has taken it over
        if fd is not None:
            os.close(fd)
diff --git a/tests/test_verify.py b/tests/test_verify.py index 2b656a8..5393b4c 100644 --- a/tests/test_verify.py +++ b/tests/test_verify.py @@ -6,6 +6,7 @@ import os import os.path import socket +import stat import tempfile import unittest @@ -21,6 +22,11 @@ class NonExistingFileVerificationTest(unittest.TestCase): def tearDown(self): os.rmdir(self.dir) + def test_get_file_metadata(self): + self.assertEqual(list(gemato.verify.get_file_metadata( + os.path.join(self.dir, 'test'), hashes=[])), + [False]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', 'test', '0')) @@ -51,6 +57,12 @@ class DirectoryVerificationTest(unittest.TestCase): def tearDown(self): os.rmdir(self.dir) + def test_get_file_metadata(self): + st = os.stat(self.dir) + self.assertEqual(list(gemato.verify.get_file_metadata( + self.dir, hashes=[])), + [True, st.st_dev, (stat.S_IFDIR, 'directory')]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', os.path.basename(self.dir), '0')) @@ -78,6 +90,12 @@ class CharacterDeviceVerificationTest(unittest.TestCase): def setUp(self): self.path = '/dev/null' + def test_get_file_metadata(self): + st = os.stat(self.path) + self.assertEqual(list(gemato.verify.get_file_metadata( + self.path, hashes=[])), + [True, st.st_dev, (stat.S_IFCHR, 'character device')]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', os.path.basename(self.path), '0')) @@ -111,6 +129,12 @@ class NamedPipeVerificationTest(unittest.TestCase): os.unlink(self.path) os.rmdir(self.dir) + def test_get_file_metadata(self): + st = os.stat(self.path) + self.assertEqual(list(gemato.verify.get_file_metadata( + self.path, hashes=[])), + [True, st.st_dev, (stat.S_IFIFO, 'named pipe')]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', os.path.basename(self.path), '0')) @@ -147,6 +171,12 @@ class UNIXSocketVerificationTest(unittest.TestCase): os.unlink(self.path) os.rmdir(self.dir) + def 
test_get_file_metadata(self): + st = os.stat(self.path) + self.assertEqual(list(gemato.verify.get_file_metadata( + self.path, hashes=[])), + [True, st.st_dev, (stat.S_IFSOCK, 'UNIX socket')]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', os.path.basename(self.path), '0')) @@ -178,6 +208,17 @@ class EmptyFileVerificationTest(unittest.TestCase): def tearDown(self): self.f.close() + def test_get_file_metadata(self): + st = os.stat(self.path) + self.assertEqual(list(gemato.verify.get_file_metadata( + self.path, hashes=['MD5', 'SHA1'])), + [True, st.st_dev, (stat.S_IFREG, 'regular file'), + 0, { + 'MD5': 'd41d8cd98f00b204e9800998ecf8427e', + 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709', + '__size__': 0, + }]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', os.path.basename(self.path), '0')) @@ -277,6 +318,17 @@ class NonEmptyFileVerificationTest(unittest.TestCase): def tearDown(self): self.f.close() + def test_get_file_metadata(self): + st = os.stat(self.path) + self.assertEqual(list(gemato.verify.get_file_metadata( + self.path, hashes=['MD5', 'SHA1'])), + [True, st.st_dev, (stat.S_IFREG, 'regular file'), + st.st_size, { + 'MD5': '9e107d9d372bb6826bd81d3542a419d6', + 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', + '__size__': 43, + }]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', os.path.basename(self.path), '43')) @@ -412,6 +464,17 @@ class ProcFileVerificationTest(unittest.TestCase): self.md5 = gemato.hash.hash_bytes(data, 'md5') self.sha1 = gemato.hash.hash_bytes(data, 'sha1') + def test_get_file_metadata(self): + st = os.stat(self.path) + self.assertEqual(list(gemato.verify.get_file_metadata( + self.path, hashes=['MD5', 'SHA1'])), + [True, st.st_dev, (stat.S_IFREG, 'regular file'), + st.st_size, { + 'MD5': self.md5, + 'SHA1': self.sha1, + '__size__': self.size, + }]) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', 
os.path.basename(self.path), str(self.size))) @@ -488,6 +551,11 @@ class UnreadableFileVerificationTest(unittest.TestCase): os.unlink(self.path) os.rmdir(self.dir) + def test_get_file_metadata(self): + with self.assertRaises(OSError): + list(gemato.verify.get_file_metadata( + os.path.join(self.dir, self.path), [])) + def testDATA(self): e = gemato.manifest.ManifestEntryDATA.from_list( ('DATA', 'test', '0')) -- cgit v1.2.3