summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/verify.py101
-rw-r--r--tests/test_verify.py68
2 files changed, 169 insertions, 0 deletions
diff --git a/gemato/verify.py b/gemato/verify.py
index a4d336b..5e315a2 100644
--- a/gemato/verify.py
+++ b/gemato/verify.py
@@ -13,6 +13,107 @@ import gemato.hash
import gemato.manifest
+def get_file_metadata(path, hashes):
+ """
+ Get a generator for the metadata of the file at system path @path.
+
+ The generator yields, in order:
+ 1. A boolean indicating whether the file exists.
+ 2. st_dev, if the file exists.
+ 3. Tuple of (S_IFMT(st_mode), file type as string), if the file
+ exists.
+ 4. st_size, if the file exists and is a regular file. Note that
+ it may be 0 on some filesystems, so treat the value with caution.
+ 5. A dict of @hashes and their values, if the file exists and is
+ a regular file. Special __size__ member is added unconditionally.
+
+ Note that the generator acquires resources, and does not release
+ them until terminated. Always make sure to pull it until
+ StopIteration, or close it explicitly.
+ """
+
+ try:
+ # we want O_NONBLOCK to avoid blocking when opening pipes
+ fd = os.open(path, os.O_RDONLY|os.O_NONBLOCK)
+ except OSError as err:
+ if err.errno == errno.ENOENT:
+ exists = False
+ opened = False
+ elif err.errno == errno.ENXIO:
+ # unconnected device or socket
+ exists = True
+ opened = False
+ else:
+ raise
+ else:
+ exists = True
+ opened = True
+
+ try:
+ # 1. does it exist?
+ yield exists
+
+ # we can't provide any more data for a file that does not exist
+ if not exists:
+ return
+
+ if opened:
+ st = os.fstat(fd)
+ else:
+ st = os.stat(path)
+
+ # 2. st_dev
+ yield st.st_dev
+
+ # 3. file type tuple
+ if stat.S_ISREG(st.st_mode):
+ ftype = 'regular file'
+ elif stat.S_ISDIR(st.st_mode):
+ ftype = 'directory'
+ elif stat.S_ISCHR(st.st_mode):
+ ftype = 'character device'
+ elif stat.S_ISBLK(st.st_mode):
+ ftype = 'block device'
+ elif stat.S_ISFIFO(st.st_mode):
+ ftype = 'named pipe'
+ elif stat.S_ISSOCK(st.st_mode):
+ ftype = 'UNIX socket'
+ else:
+ ftype = 'unknown'
+ yield (stat.S_IFMT(st.st_mode), ftype)
+
+ if not stat.S_ISREG(st.st_mode):
+ if opened:
+ os.close(fd)
+ return
+
+ # 4. st_size
+ yield st.st_size
+
+ f = os.fdopen(fd, 'rb')
+ except:
+ if opened:
+ os.close(fd)
+ raise
+
+ with f:
+ # open() might have left the file as O_NONBLOCK
+ # make sure to fix that
+ fcntl.fcntl(fd, fcntl.F_SETFL, 0)
+
+ # 5. checksums
+ e_hashes = sorted(hashes)
+ hashes = list(gemato.manifest.manifest_hashes_to_hashlib(e_hashes))
+ e_hashes.append('__size__')
+ hashes.append('__size__')
+ checksums = gemato.hash.hash_file(f, hashes)
+
+ ret = {}
+ for ek, k in zip(e_hashes, hashes):
+ ret[ek] = checksums[k]
+ yield ret
+
+
def verify_path(path, e, expected_dev=None):
"""
Verify the file at system path @path against the data in entry @e.
diff --git a/tests/test_verify.py b/tests/test_verify.py
index 2b656a8..5393b4c 100644
--- a/tests/test_verify.py
+++ b/tests/test_verify.py
@@ -6,6 +6,7 @@
import os
import os.path
import socket
+import stat
import tempfile
import unittest
@@ -21,6 +22,11 @@ class NonExistingFileVerificationTest(unittest.TestCase):
def tearDown(self):
os.rmdir(self.dir)
+ def test_get_file_metadata(self):
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ os.path.join(self.dir, 'test'), hashes=[])),
+ [False])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', 'test', '0'))
@@ -51,6 +57,12 @@ class DirectoryVerificationTest(unittest.TestCase):
def tearDown(self):
os.rmdir(self.dir)
+ def test_get_file_metadata(self):
+ st = os.stat(self.dir)
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ self.dir, hashes=[])),
+ [True, st.st_dev, (stat.S_IFDIR, 'directory')])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', os.path.basename(self.dir), '0'))
@@ -78,6 +90,12 @@ class CharacterDeviceVerificationTest(unittest.TestCase):
def setUp(self):
self.path = '/dev/null'
+ def test_get_file_metadata(self):
+ st = os.stat(self.path)
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ self.path, hashes=[])),
+ [True, st.st_dev, (stat.S_IFCHR, 'character device')])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', os.path.basename(self.path), '0'))
@@ -111,6 +129,12 @@ class NamedPipeVerificationTest(unittest.TestCase):
os.unlink(self.path)
os.rmdir(self.dir)
+ def test_get_file_metadata(self):
+ st = os.stat(self.path)
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ self.path, hashes=[])),
+ [True, st.st_dev, (stat.S_IFIFO, 'named pipe')])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', os.path.basename(self.path), '0'))
@@ -147,6 +171,12 @@ class UNIXSocketVerificationTest(unittest.TestCase):
os.unlink(self.path)
os.rmdir(self.dir)
+ def test_get_file_metadata(self):
+ st = os.stat(self.path)
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ self.path, hashes=[])),
+ [True, st.st_dev, (stat.S_IFSOCK, 'UNIX socket')])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', os.path.basename(self.path), '0'))
@@ -178,6 +208,17 @@ class EmptyFileVerificationTest(unittest.TestCase):
def tearDown(self):
self.f.close()
+ def test_get_file_metadata(self):
+ st = os.stat(self.path)
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ self.path, hashes=['MD5', 'SHA1'])),
+ [True, st.st_dev, (stat.S_IFREG, 'regular file'),
+ 0, {
+ 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
+ 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
+ '__size__': 0,
+ }])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', os.path.basename(self.path), '0'))
@@ -277,6 +318,17 @@ class NonEmptyFileVerificationTest(unittest.TestCase):
def tearDown(self):
self.f.close()
+ def test_get_file_metadata(self):
+ st = os.stat(self.path)
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ self.path, hashes=['MD5', 'SHA1'])),
+ [True, st.st_dev, (stat.S_IFREG, 'regular file'),
+ st.st_size, {
+ 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
+ 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
+ '__size__': 43,
+ }])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', os.path.basename(self.path), '43'))
@@ -412,6 +464,17 @@ class ProcFileVerificationTest(unittest.TestCase):
self.md5 = gemato.hash.hash_bytes(data, 'md5')
self.sha1 = gemato.hash.hash_bytes(data, 'sha1')
+ def test_get_file_metadata(self):
+ st = os.stat(self.path)
+ self.assertEqual(list(gemato.verify.get_file_metadata(
+ self.path, hashes=['MD5', 'SHA1'])),
+ [True, st.st_dev, (stat.S_IFREG, 'regular file'),
+ st.st_size, {
+ 'MD5': self.md5,
+ 'SHA1': self.sha1,
+ '__size__': self.size,
+ }])
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', os.path.basename(self.path), str(self.size)))
@@ -488,6 +551,11 @@ class UnreadableFileVerificationTest(unittest.TestCase):
os.unlink(self.path)
os.rmdir(self.dir)
+ def test_get_file_metadata(self):
+ with self.assertRaises(OSError):
+ list(gemato.verify.get_file_metadata(
+ os.path.join(self.dir, self.path), []))
+
def testDATA(self):
e = gemato.manifest.ManifestEntryDATA.from_list(
('DATA', 'test', '0'))