summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2020-08-28 15:01:21 +0200
committerMichał Górny <mgorny@gentoo.org>2020-08-28 21:46:16 +0200
commit9021d449d35da8d40999bc0a945b52f9d8bc1640 (patch)
treeee618c688d139230669dc6d29c5e3d735f33f0f4
parent83048b72e52c63df8359f30f8f47b5941da71831 (diff)
downloadgemato-9021d449d35da8d40999bc0a945b52f9d8bc1640.tar.gz
tests: Port test_verify to pytest
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r--tests/test_verify.py1463
1 files changed, 494 insertions, 969 deletions
diff --git a/tests/test_verify.py b/tests/test_verify.py
index c2fbe7d..c613706 100644
--- a/tests/test_verify.py
+++ b/tests/test_verify.py
@@ -1,978 +1,503 @@
# gemato: Verification tests
# vim:fileencoding=utf-8
-# (c) 2017-2018 Michał Górny
+# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
-import io
+import itertools
import os
import os.path
import socket
import stat
-import tempfile
-import unittest
-
-import gemato.exceptions
-import gemato.manifest
-import gemato.verify
-
-
-class NonExistingFileVerificationTest(unittest.TestCase):
- def setUp(self):
- self.dir = tempfile.mkdtemp()
-
- def tearDown(self):
- os.rmdir(self.dir)
-
- def test_get_file_metadata(self):
- self.assertEqual(list(gemato.verify.get_file_metadata(
- os.path.join(self.dir, 'test'), hashes=[])),
- [False])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0'))
- self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, e.path), e),
- (False, [('__exists__', True, False)]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', 'test'))
- self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, e.path), e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(os.path.join(self.dir, 'test'), None),
- (True, []))
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA('test', 0, {})
- self.assertRaises(gemato.exceptions.ManifestInvalidPath,
- gemato.verify.update_entry_for_path,
- os.path.join(self.dir, 'test'), e)
-
-
-class DirectoryVerificationTest(unittest.TestCase):
- def setUp(self):
- self.dir = tempfile.mkdtemp()
-
- def tearDown(self):
- os.rmdir(self.dir)
-
- def test_get_file_metadata(self):
- st = os.stat(self.dir)
- self.assertEqual(list(gemato.verify.get_file_metadata(
- self.dir, hashes=[])),
- [True, st.st_dev, (stat.S_IFDIR, 'directory')])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.dir), '0'))
- self.assertEqual(gemato.verify.verify_path(self.dir, e),
- (False, [('__type__', 'regular file', 'directory')]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', os.path.basename(self.dir)))
- self.assertEqual(gemato.verify.verify_path(self.dir, e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(self.dir, None),
- (False, [('__exists__', False, True)]))
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.dir), 0, {})
- self.assertRaises(gemato.exceptions.ManifestInvalidPath,
- gemato.verify.update_entry_for_path, self.dir, e)
-
-
-class CharacterDeviceVerificationTest(unittest.TestCase):
- def setUp(self):
- self.path = '/dev/null'
-
- def test_get_file_metadata(self):
- st = os.stat(self.path)
- self.assertEqual(list(gemato.verify.get_file_metadata(
- self.path, hashes=[])),
- [True, st.st_dev, (stat.S_IFCHR, 'character device')])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__type__', 'regular file', 'character device')]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', os.path.basename(self.path)))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(self.path, None),
- (False, [('__exists__', False, True)]))
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertRaises(gemato.exceptions.ManifestInvalidPath,
- gemato.verify.update_entry_for_path, self.path, e)
-
-
-class NamedPipeVerificationTest(unittest.TestCase):
- def setUp(self):
- self.dir = tempfile.mkdtemp()
- self.path = os.path.join(self.dir, 'test')
- os.mkfifo(self.path)
-
- def tearDown(self):
- os.unlink(self.path)
- os.rmdir(self.dir)
-
- def test_get_file_metadata(self):
- st = os.stat(self.path)
- self.assertEqual(list(gemato.verify.get_file_metadata(
- self.path, hashes=[])),
- [True, st.st_dev, (stat.S_IFIFO, 'named pipe')])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__type__', 'regular file', 'named pipe')]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', os.path.basename(self.path)))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(self.path, None),
- (False, [('__exists__', False, True)]))
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertRaises(gemato.exceptions.ManifestInvalidPath,
- gemato.verify.update_entry_for_path, self.path, e)
-
-
-class UNIXSocketVerificationTest(unittest.TestCase):
- def setUp(self):
- self.dir = tempfile.mkdtemp()
- self.path = os.path.join(self.dir, 'test')
- self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.sock.bind(self.path)
- self.sock.listen(1)
-
- def tearDown(self):
- self.sock.close()
- os.unlink(self.path)
- os.rmdir(self.dir)
-
- def test_get_file_metadata(self):
- st = os.stat(self.path)
- self.assertEqual(list(gemato.verify.get_file_metadata(
- self.path, hashes=[])),
- [True, st.st_dev, (stat.S_IFSOCK, 'UNIX socket')])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__type__', 'regular file', 'UNIX socket')]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', os.path.basename(self.path)))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(self.path, None),
- (False, [('__exists__', False, True)]))
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertRaises(gemato.exceptions.ManifestInvalidPath,
- gemato.verify.update_entry_for_path, self.path, e)
-
-
-class EmptyFileVerificationTest(unittest.TestCase):
- def setUp(self):
- self.f = tempfile.NamedTemporaryFile()
- self.path = self.f.name
-
- def tearDown(self):
- self.f.close()
-
- def test_get_file_metadata(self):
- st = os.stat(self.path)
- self.assertEqual(list(gemato.verify.get_file_metadata(
- self.path, hashes=['MD5', 'SHA1'])),
- [True, st.st_dev, (stat.S_IFREG, 'regular file'),
- 0, st.st_mtime, {
- 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- '__size__': 0,
- }])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0',
- 'MD5', 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testWrongSizeDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '5'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__size__', 5, 0)]))
-
- def testWrongSingleChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e')]))
-
- def testWrongAllChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e'),
- ('SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')]))
-
- def testWrongAllChecksumAndSizeDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '39',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__size__', 39, 0),
- ('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e'),
- ('SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', 'da39a3ee5e6b4b0d3255bfef95601890afd80709')]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', os.path.basename(self.path)))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(self.path, None),
- (False, [('__exists__', False, True)]))
-
- def testCrossFilesystem(self):
- if not os.path.ismount('/proc'):
- raise unittest.SkipTest('/proc is not a mountpoint')
-
- try:
- st = os.stat('/proc')
- except OSError:
- raise unittest.SkipTest('Unable to stat /proc')
-
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0'))
- self.assertRaises(gemato.exceptions.ManifestCrossDevice,
- gemato.verify.verify_path, self.path, e,
- expected_dev=st.st_dev)
-
- def testCrossFilesystemAssert(self):
- if not os.path.ismount('/proc'):
- raise unittest.SkipTest('/proc is not a mountpoint')
-
- try:
- st = os.stat('/proc')
- except OSError:
- raise unittest.SkipTest('Unable to stat /proc')
-
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0'))
- self.assertRaises(gemato.exceptions.ManifestCrossDevice,
- gemato.verify.verify_path, self.path, e,
- expected_dev=st.st_dev)
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertFalse(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {})
-
- def test_update_with_hashes(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1']))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {
- 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
-
- def test_update_with_hashes_from_manifest(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {
- 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
-
- def test_update_with_hashes_unchanged(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {
- 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
- self.assertFalse(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1']))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {
- 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
-
- def test_update_cross_filesystem(self):
- if not os.path.ismount('/proc'):
- raise unittest.SkipTest('/proc is not a mountpoint')
-
- try:
- st = os.stat('/proc')
- except OSError:
- raise unittest.SkipTest('Unable to stat /proc')
-
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertRaises(gemato.exceptions.ManifestCrossDevice,
- gemato.verify.update_entry_for_path, self.path, e,
- expected_dev=st.st_dev)
-
- def test_update_MISC(self):
- e = gemato.manifest.ManifestEntryMISC(
- os.path.basename(self.path), 0, {})
- self.assertFalse(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {})
-
- def test_update_IGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE(
- os.path.basename(self.path))
- self.assertRaises(AssertionError,
- gemato.verify.update_entry_for_path, self.path, e)
-
- def test_update_AUX(self):
- e = gemato.manifest.ManifestEntryAUX(
- os.path.basename(self.path), 0, {})
- self.assertFalse(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path,
- os.path.join('files', os.path.basename(self.path)))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {})
-
- def test_wrong_checksum_DATA_with_old_mtime(self):
- """
- Test whether the checksums are verified if last mtime is older
- than the current one.
- """
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(
- gemato.verify.verify_path(self.path, e,
- last_mtime=0),
- (False, [('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e')]))
-
- def test_wrong_checksum_DATA_with_new_mtime(self):
- """
- Test whether the checksums are verified if last mtime indicates
- that the file did not change (with st_size == 0).
- """
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- st = os.stat(self.path)
- self.assertEqual(
- gemato.verify.verify_path(self.path, e,
- last_mtime=st.st_mtime),
- (False, [('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e')]))
-
- def test_update_with_hashes_and_old_mtime(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1'], last_mtime=0))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {
- 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
-
- def test_update_with_hashes_and_new_mtime(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
- st = os.stat(self.path)
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1'], last_mtime=st.st_mtime))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 0)
- self.assertDictEqual(e.checksums, {
- 'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
-
- def test_None_with_mtime(self):
- """
- Test that mtime does not cause stray files to go unnoticed.
- """
- st = os.stat(self.path)
- self.assertEqual(
- gemato.verify.verify_path(self.path, None,
- last_mtime=st.st_mtime),
- (False, [('__exists__', False, True)]))
-
-
-class NonEmptyFileVerificationTest(unittest.TestCase):
- def setUp(self):
- TEST_STRING = b'The quick brown fox jumps over the lazy dog'
- self.f = tempfile.NamedTemporaryFile()
- self.f.write(TEST_STRING)
- self.f.flush()
- self.path = self.f.name
-
- def tearDown(self):
- self.f.close()
-
- def test_get_file_metadata(self):
- st = os.stat(self.path)
- self.assertEqual(list(gemato.verify.get_file_metadata(
- self.path, hashes=['MD5', 'SHA1'])),
- [True, st.st_dev, (stat.S_IFREG, 'regular file'),
- st.st_size, st.st_mtime, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
- '__size__': 43,
- }])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '43'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '43',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testWrongSizeDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__size__', 0, 43)]))
-
- def testWrongSingleChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '43',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')]))
-
- def testWrongAllChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '43',
- 'MD5', 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', '9e107d9d372bb6826bd81d3542a419d6'),
- ('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')]))
-
- def testWrongAllChecksumAndSizeDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '0',
- 'MD5', 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__size__', 0, 43)]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', os.path.basename(self.path)))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(self.path, None),
- (False, [('__exists__', False, True)]))
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 43)
- self.assertDictEqual(e.checksums, {})
-
- def test_update_with_hashes(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1']))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 43)
- self.assertDictEqual(e.checksums, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
- })
-
- def test_update_with_hashes_from_manifest(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 43)
- self.assertDictEqual(e.checksums, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
- })
-
- def test_wrong_checksum_DATA_with_old_mtime(self):
- """
- Test whether the checksums are verified if last mtime is older
- than the current one.
- """
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '43',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(
- gemato.verify.verify_path(self.path, e,
- last_mtime=0),
- (False, [('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12')]))
-
- def test_wrong_checksum_DATA_with_new_mtime(self):
- """
- Test whether the checksums are verified if last mtime indicates
- that the file did not change.
- """
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '43',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- st = os.stat(self.path)
- self.assertEqual(
- gemato.verify.verify_path(self.path, e,
- last_mtime=st.st_mtime),
- (True, []))
-
- def test_wrong_checksum_DATA_with_new_mtime_and_wrong_size(self):
- """
- Test whether the checksums are verified if last mtime indicates
- that the file did not change but size is different.
- """
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '33',
- 'MD5', '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- st = os.stat(self.path)
- self.assertEqual(
- gemato.verify.verify_path(self.path, e,
- last_mtime=st.st_mtime),
- (False, [('__size__', 33, 43)]))
-
- def test_update_with_hashes_and_old_mtime(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1'], last_mtime=0))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 43)
- self.assertDictEqual(e.checksums, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
- })
-
- def test_update_with_hashes_and_new_mtime(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 43, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
- st = os.stat(self.path)
- self.assertFalse(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1'], last_mtime=st.st_mtime))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 43)
- self.assertDictEqual(e.checksums, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
-
- def test_update_with_hashes_and_new_mtime_but_wrong_size(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 33, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
- })
- st = os.stat(self.path)
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1'], last_mtime=st.st_mtime))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, 43)
- self.assertDictEqual(e.checksums, {
- 'MD5': '9e107d9d372bb6826bd81d3542a419d6',
- 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
- })
-
- def test_None_with_mtime(self):
- """
- Test that mtime does not cause stray files to go unnoticed.
- """
- st = os.stat(self.path)
- self.assertEqual(
- gemato.verify.verify_path(self.path, None,
- last_mtime=st.st_mtime),
- (False, [('__exists__', False, True)]))
-
-
-class SymbolicLinkVerificationTest(NonEmptyFileVerificationTest):
- """
- A variant of regular file test using symlink.
- """
-
- def setUp(self):
- TEST_STRING = b'The quick brown fox jumps over the lazy dog'
- self.dir = tempfile.mkdtemp()
- self.real_path = os.path.join(self.dir, 'real')
- self.path = os.path.join(self.dir, 'symlink')
- with io.open(self.real_path, 'wb') as f:
- f.write(TEST_STRING)
- os.symlink('real', self.path)
-
- def tearDown(self):
- os.unlink(self.path)
- os.unlink(self.real_path)
- os.rmdir(self.dir)
-
-
-class SymbolicLinkDirectoryVerificationTest(DirectoryVerificationTest):
- """
- A variant of directory test using symlink.
- """
-
- def setUp(self):
- self.top_dir = tempfile.mkdtemp()
- self.real_dir = os.path.join(self.top_dir, 'real')
- self.dir = os.path.join(self.top_dir, 'symlink')
- os.mkdir(self.real_dir)
- os.symlink('real', self.dir)
-
- def tearDown(self):
- os.unlink(self.dir)
- os.rmdir(self.real_dir)
- os.rmdir(self.top_dir)
-
-
-class BrokenSymbolicLinkVerificationTest(NonExistingFileVerificationTest):
- def setUp(self):
- self.dir = tempfile.mkdtemp()
- self.path = os.path.join(self.dir, 'test')
- os.symlink('broken', self.path)
-
- def tearDown(self):
- os.unlink(self.path)
- os.rmdir(self.dir)
-
-
-class ProcFileVerificationTest(unittest.TestCase):
- """
- Attempt to verify a file from /proc to verify that we can handle
- filesystems that do not report st_size.
- """
-
- def setUp(self):
- self.path = '/proc/version'
- try:
- with io.open(self.path, 'rb') as f:
- data = f.read()
- st = os.fstat(f.fileno())
- except:
- raise unittest.SkipTest('{} not readable'.format(self.path))
-
- if st.st_size != 0:
- raise unittest.SkipTest('{} st_size is not 0'.format(self.path))
-
- self.size = len(data)
- if self.size == 0:
- raise unittest.SkipTest('{} empty'.format(self.path))
- self.md5 = gemato.hash.hash_bytes(data, 'md5')
- self.sha1 = gemato.hash.hash_bytes(data, 'sha1')
-
- def test_get_file_metadata(self):
- st = os.stat(self.path)
- metadata = list(gemato.verify.get_file_metadata(
- self.path, hashes=['MD5', 'SHA1']))
- # mtime is not meaningful on procfs, and changes with every stat
- metadata[4] = 0
- self.assertEqual(metadata,
- [True, st.st_dev, (stat.S_IFREG, 'regular file'),
- st.st_size, 0, {
- 'MD5': self.md5,
- 'SHA1': self.sha1,
- '__size__': self.size,
- }])
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), str(self.size)))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), str(self.size),
- 'MD5', self.md5,
- 'SHA1', self.sha1))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testWrongSizeDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '47474'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__size__', 47474, self.size)]))
-
- def testWrongSingleChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), self.size,
- 'MD5', self.md5,
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', self.sha1)]))
-
- def testWrongAllChecksumDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), self.size,
- 'MD5', 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', self.md5),
- ('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', self.sha1)]))
-
- def testWrongAllChecksumAndSizeDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', os.path.basename(self.path), '47474',
- 'MD5', 'd41d8cd98f00b204e9800998ecf8427e',
- 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (False, [('__size__', 47474, self.size),
- ('MD5', 'd41d8cd98f00b204e9800998ecf8427e', self.md5),
- ('SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709', self.sha1)]))
-
- def testIGNORE(self):
- e = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', os.path.basename(self.path)))
- self.assertEqual(gemato.verify.verify_path(self.path, e),
- (True, []))
-
- def testNone(self):
- self.assertEqual(gemato.verify.verify_path(self.path, None),
- (False, [('__exists__', False, True)]))
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, self.size)
- self.assertDictEqual(e.checksums, {})
-
- def test_update_with_hashes(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e,
- ['MD5', 'SHA1']))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, self.size)
- self.assertDictEqual(e.checksums, {
- 'MD5': self.md5,
- 'SHA1': self.sha1,
- })
-
- def test_update_with_hashes_from_manifest(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {'MD5': '', 'SHA1': ''})
- self.assertTrue(
- gemato.verify.update_entry_for_path(self.path, e))
- self.assertEqual(e.path, os.path.basename(self.path))
- self.assertEqual(e.size, self.size)
- self.assertDictEqual(e.checksums, {
- 'MD5': self.md5,
- 'SHA1': self.sha1,
- })
-
-
-class UnreadableFileVerificationTest(unittest.TestCase):
- def setUp(self):
- self.dir = tempfile.mkdtemp()
- self.path = os.path.join(self.dir, 'test')
- with io.open(self.path, 'w'):
+
+import pytest
+
+from gemato.exceptions import (
+ ManifestInvalidPath,
+ ManifestCrossDevice,
+ )
+from gemato.hash import hash_path
+from gemato.manifest import new_manifest_entry
+from gemato.verify import (
+ get_file_metadata,
+ verify_path,
+ update_entry_for_path,
+ verify_entry_compatibility,
+ )
+
+
+TEST_STRING = b'The quick brown fox jumps over the lazy dog'
+
+
+@pytest.fixture
+def test_tree(tmp_path):
+ """Test tree with different file types needed for tests"""
+ with open(tmp_path / 'empty-file', 'w'):
+ pass
+ with open(tmp_path / 'regular-file', 'wb') as f:
+ f.write(TEST_STRING)
+ with open(tmp_path / 'unreadable-file', 'w') as f:
+ os.chmod(f.fileno(), 0)
+ os.mkdir(tmp_path / 'directory')
+ os.symlink('regular-file', tmp_path / 'symlink-to-file')
+ os.symlink('directory', tmp_path / 'symlink-to-directory')
+ os.symlink('non-existing', tmp_path / 'symlink-broken')
+ os.mkfifo(tmp_path / 'named-pipe')
+ unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ unix_sock.bind(str(tmp_path / 'unix-socket'))
+ unix_sock.listen(1)
+ yield tmp_path
+ unix_sock.close()
+
+
+NONFILE_TEST_PATHS = [
+ 'directory',
+ 'symlink-to-directory',
+ '/dev/null', # character device
+ 'named-pipe',
+ 'unix-socket',
+]
+EMPTY_FILE_TEST_PATHS = [
+ 'empty-file',
+ '/proc/version', # special file
+]
+NONEMPTY_FILE_TEST_PATHS = [
+ 'regular-file',
+ 'symlink-to-file',
+]
+FILE_TEST_PATHS = EMPTY_FILE_TEST_PATHS + NONEMPTY_FILE_TEST_PATHS
+NONEXIST_TEST_PATHS = [
+ 'non-existing',
+ 'symlink-broken',
+]
+ALL_TEST_PATHS = (NONFILE_TEST_PATHS + FILE_TEST_PATHS +
+ NONEXIST_TEST_PATHS)
+
+TEST_PATH_TYPES = {
+ 'directory': (stat.S_IFDIR, 'directory'),
+ 'symlink-to-directory': (stat.S_IFDIR, 'directory'),
+ '/dev/null': (stat.S_IFCHR, 'character device'),
+ 'named-pipe': (stat.S_IFIFO, 'named pipe'),
+ 'unix-socket': (stat.S_IFSOCK, 'UNIX socket'),
+ 'empty-file': (stat.S_IFREG, 'regular file'),
+ 'regular-file': (stat.S_IFREG, 'regular file'),
+ 'symlink-to-file': (stat.S_IFREG, 'regular file'),
+ '/proc/version': (stat.S_IFREG, 'regular file'),
+}
+
+
+def get_checksums(path):
+ """Get checksums for the specified path"""
+ try:
+ hashes = hash_path(path, ['md5', 'sha1', '__size__'])
+ except FileNotFoundError:
+ return None
+
+ return {
+ 'MD5': hashes['md5'],
+ 'SHA1': hashes['sha1'],
+ '__size__': hashes['__size__'],
+ }
+
+
+TEST_PATH_SIZES = {
+ 'empty-file': 0,
+ 'regular-file': 43,
+ 'symlink-to-file': 43,
+ '/proc/version': 0,
+}
+TEST_PATH_CHECKSUMS = {
+ 'empty-file': {'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
+ 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709',
+ '__size__': TEST_PATH_SIZES['empty-file'],
+ },
+ 'regular-file': {'MD5': '9e107d9d372bb6826bd81d3542a419d6',
+ 'SHA1': '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12',
+ '__size__': TEST_PATH_SIZES['regular-file'],
+ },
+ 'symlink-to-file': {'MD5': '9e107d9d372bb6826bd81d3542a419d6',
+ 'SHA1': '2fd4e1c67a2d28fced84'
+ '9ee1bb76e7391b93eb12',
+ '__size__': TEST_PATH_SIZES['regular-file'],
+ },
+ '/proc/version': get_checksums('/proc/version'),
+}
+
+
+def strip_size(checksums):
+ """Strip __size__ from checksum dict"""
+ d = dict(checksums)
+ del d['__size__']
+ return d
+
+
+@pytest.mark.parametrize(
+ # note: None for dynamic components, test will fill the blanks in
+ 'path,expected',
+ [(path, [False]) for path in NONEXIST_TEST_PATHS] +
+ [(path, [True, None, TEST_PATH_TYPES[path]])
+ for path in NONFILE_TEST_PATHS] +
+ [(path, [True, None, TEST_PATH_TYPES[path], TEST_PATH_SIZES[path],
+ None, TEST_PATH_CHECKSUMS[path]])
+ for path in FILE_TEST_PATHS])
+def test_get_file_metadata(test_tree, path, expected):
+ if path.startswith('/') and not os.path.exists(path):
+ pytest.skip(f'{path} does not exist')
+
+ try:
+ st = os.stat(test_tree / path)
+ except FileNotFoundError:
+ pass
+ else:
+ # fill in the blanks
+ if len(expected) > 1:
+ assert expected[1] is None
+ expected[1] = st.st_dev
+ if len(expected) > 4:
+ assert expected[4] is None
+ expected[4] = st.st_mtime
+
+ assert (
+ list(get_file_metadata(test_tree / path,
+ hashes=['MD5', 'SHA1'])) ==
+ expected)
+
+
+EMPTY_FILE_DATA = [0, {}]
+ZERO_MD5 = '00000000000000000000000000000000'
+ZERO_SHA1 = '0000000000000000000000000000000000000000'
+
+
+def mangle_one_checksum(checksums):
+ """Returns checksums with MD5 mangled"""
+ d = strip_size(checksums)
+ d['MD5'] = ZERO_MD5
+ if checksums['MD5'] == d['MD5']:
+ pytest.skip('MD5 already was zeros, how likely is that!?')
+ return d
+
+
+def mangle_both_checksums(checksums):
+ """Returns checksums with MD5 and SHA1 mangled"""
+ d = strip_size(checksums)
+ d['MD5'] = ZERO_MD5
+ d['SHA1'] = ZERO_SHA1
+ if checksums['MD5'] == d['MD5']:
+ pytest.skip('MD5 already was zeros, how likely is that!?')
+ if checksums['SHA1'] == d['SHA1']:
+ pytest.skip('SHA1 already was zeros, how likely is that!?')
+ return d
+
+
+class FILE_MTIME:
+ pass
+
+
+@pytest.mark.parametrize(
+ 'path,entry,args,last_mtime,expected,diff',
+ # IGNORE should pass for everything, even unreadable
+ [(path, 'IGNORE', [], None, True, [])
+ for path in ALL_TEST_PATHS + ['unreadable-file']] +
+ # None means must not exist, so passes for non-existing,
+ # fails for everything existing
+ [(path, None, [], None, True, []) for path in NONEXIST_TEST_PATHS] +
+ [(path, None, [], None, False, [('__exists__', False, True)])
+ for path in ALL_TEST_PATHS if path not in NONEXIST_TEST_PATHS] +
+ # test DATA on non-regular files
+ [('non-existing', 'DATA', EMPTY_FILE_DATA,
+ None, False, [('__exists__', True, False)])] +
+ [(path, 'DATA', EMPTY_FILE_DATA,
+ None, False,
+ [('__type__', 'regular file', TEST_PATH_TYPES[path][1])])
+ for path in NONFILE_TEST_PATHS] +
+ # test DATA on regular files
+ list(itertools.chain.from_iterable(
+ [(path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'], {}],
+ None, True, []),
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ strip_size(TEST_PATH_CHECKSUMS[path])],
+ None, True, []),
+ # wrong size
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'] + 11, {}],
+ None, False, [('__size__',
+ TEST_PATH_CHECKSUMS[path]['__size__'] + 11,
+ TEST_PATH_CHECKSUMS[path]['__size__'])]),
+ # one wrong checksum
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ mangle_one_checksum(TEST_PATH_CHECKSUMS[path])],
+ None, False,
+ [('MD5', ZERO_MD5, TEST_PATH_CHECKSUMS[path]['MD5'])]),
+ # both wrong checksums
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ mangle_both_checksums(
+ TEST_PATH_CHECKSUMS[path])],
+ None, False,
+ [('MD5', ZERO_MD5, TEST_PATH_CHECKSUMS[path]['MD5']),
+ ('SHA1', ZERO_SHA1, TEST_PATH_CHECKSUMS[path]['SHA1'])]),
+ ] for path in FILE_TEST_PATHS)) +
+ # both wrong checksums + size (different for st_size == 0)
+ [(path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'] + 11,
+ mangle_both_checksums(
+ TEST_PATH_CHECKSUMS[path])],
+ None, False, [('__size__',
+ TEST_PATH_CHECKSUMS[path]['__size__'] + 11,
+ TEST_PATH_CHECKSUMS[path]['__size__'])])
+ for path in NONEMPTY_FILE_TEST_PATHS] +
+ # on empty files, mtime does not matter
+ [(path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ mangle_both_checksums(TEST_PATH_CHECKSUMS[path])],
+ mtime, False,
+ [('MD5', ZERO_MD5, TEST_PATH_CHECKSUMS[path]['MD5']),
+ ('SHA1', ZERO_SHA1, TEST_PATH_CHECKSUMS[path]['SHA1'])])
+ for path in EMPTY_FILE_TEST_PATHS
+ for mtime in (0, FILE_MTIME)] +
+ # on non-empty files with correct size, up-to-date mtime skips
+ # checksum
+ list(itertools.chain.from_iterable(
+ [(path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ mangle_both_checksums(TEST_PATH_CHECKSUMS[path])],
+ 0, False,
+ [('MD5', ZERO_MD5, TEST_PATH_CHECKSUMS[path]['MD5']),
+ ('SHA1', ZERO_SHA1, TEST_PATH_CHECKSUMS[path]['SHA1'])]),
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ mangle_both_checksums(TEST_PATH_CHECKSUMS[path])],
+ FILE_MTIME, True, []),
+ # but size change invalidates it
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'] + 11,
+ mangle_both_checksums(TEST_PATH_CHECKSUMS[path])],
+ FILE_MTIME, False,
+ [('__size__',
+ TEST_PATH_CHECKSUMS[path]['__size__'] + 11,
+ TEST_PATH_CHECKSUMS[path]['__size__'])]),
+ ] for path in NONEMPTY_FILE_TEST_PATHS)) +
+ [(path, None, [], FILE_MTIME, False, [('__exists__', False, True)])
+ for path in FILE_TEST_PATHS])
+def test_verify_path(test_tree, path, entry, args, last_mtime, expected,
+ diff):
+ if path.startswith('/') and not os.path.exists(path):
+ pytest.skip(f'{path} does not exist')
+ if entry is not None:
+ entry = new_manifest_entry(entry, path, *args)
+ if last_mtime is FILE_MTIME:
+ st = os.stat(test_tree / path)
+ last_mtime = st.st_mtime
+ assert verify_path(test_tree / path,
+ entry,
+ last_mtime=last_mtime) == (expected, diff)
+
+
+@pytest.mark.parametrize(
+ 'path,key,match',
+ [(path, '__exists__', False)
+ for path in NONEXIST_TEST_PATHS] +
+ [(path, '__type__', TEST_PATH_TYPES[path][1])
+ for path in NONFILE_TEST_PATHS])
+def test_update_fail(test_tree, path, key, match):
+ entry = new_manifest_entry('DATA', path, 0, {})
+ with pytest.raises(ManifestInvalidPath) as exc:
+ update_entry_for_path(test_tree / path, entry)
+ assert exc.value.detail == (key, match)
+
+
+@pytest.mark.parametrize('function', [verify_path,
+ update_entry_for_path])
+def test_cross_filesystem(test_tree, function):
+ filename = 'empty-file'
+ try:
+ st = os.stat('/proc')
+ lst = os.stat(test_tree / filename)
+ except OSError:
+ pytest.skip('unable to stat /proc or empty-file')
+ if st.st_dev == lst.st_dev:
+ pytest.skip('/proc and test tree on the same filesystem!?')
+
+ entry = new_manifest_entry('DATA', filename, 0, {})
+ with pytest.raises(ManifestCrossDevice):
+ function(test_tree / filename, entry, expected_dev=st.st_dev)
+
+
+@pytest.mark.parametrize(
+ 'path,cls,args,new_hashes,last_mtime,retval,new_data',
+ list(itertools.chain.from_iterable(
+ [(path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'], {}],
+ None, None, False,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ]),
+ (path, 'MISC', [TEST_PATH_CHECKSUMS[path]['__size__'], {}],
+ None, None, False,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ]),
+ # unchanged hashes
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ strip_size(TEST_PATH_CHECKSUMS[path])],
+ None, None, False,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', strip_size(TEST_PATH_CHECKSUMS[path])),
+ ]),
+ # new hashes
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'], {}],
+ ['MD5', 'SHA1'], None, True,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', strip_size(TEST_PATH_CHECKSUMS[path])),
+ ]),
+ # fill hashes already in Manifest
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ {'MD5': '', 'SHA1': ''}],
+ None, None, True,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', strip_size(TEST_PATH_CHECKSUMS[path])),
+ ]),
+ # subset of existing hashes
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ strip_size(TEST_PATH_CHECKSUMS[path])],
+ ['MD5'], None, True,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', {'MD5': TEST_PATH_CHECKSUMS[path]['MD5']}),
+ ]),
+ # superset of existing hashes
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ {'MD5': TEST_PATH_CHECKSUMS[path]['MD5']}],
+ ['MD5', 'SHA1'], None, True,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', strip_size(TEST_PATH_CHECKSUMS[path])),
+ ]),
+ ] for path in FILE_TEST_PATHS)) +
+ # mtime does not affect empty files
+ [(path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ {'MD5': ZERO_MD5, 'SHA1': ZERO_SHA1}],
+ ['MD5', 'SHA1'], mtime, True,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', strip_size(TEST_PATH_CHECKSUMS[path])),
+ ])
+ for path in EMPTY_FILE_TEST_PATHS
+ for mtime in (0, FILE_MTIME)] +
+ # but non-empty files with recent mtime do not get rechecked
+ list(itertools.chain.from_iterable(
+ [(path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ {'MD5': ZERO_MD5, 'SHA1': ZERO_SHA1}],
+ ['MD5', 'SHA1'], 0, True,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', strip_size(TEST_PATH_CHECKSUMS[path])),
+ ]),
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'],
+ {'MD5': ZERO_MD5, 'SHA1': ZERO_SHA1}],
+ ['MD5', 'SHA1'], FILE_MTIME, False,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', {'MD5': ZERO_MD5, 'SHA1': ZERO_SHA1}),
+ ]),
+ # size invalidates recent mtime
+ (path, 'DATA', [TEST_PATH_CHECKSUMS[path]['__size__'] + 11,
+ {'MD5': ZERO_MD5, 'SHA1': ZERO_SHA1}],
+ ['MD5', 'SHA1'], FILE_MTIME, True,
+ [('size', TEST_PATH_CHECKSUMS[path]['__size__']),
+ ('checksums', strip_size(TEST_PATH_CHECKSUMS[path])),
+ ]),
+ ] for path in NONEMPTY_FILE_TEST_PATHS)) +
+ [])
+def test_update(test_tree, path, cls, args, new_hashes, last_mtime,
+ retval, new_data):
+ entry = new_manifest_entry(cls, path, *args)
+ if last_mtime is FILE_MTIME:
+ st = os.stat(test_tree / path)
+ last_mtime = st.st_mtime
+ assert update_entry_for_path(test_tree / path,
+ entry,
+ hashes=new_hashes,
+ last_mtime=last_mtime) is retval
+ assert entry.path == path
+ for k, v in new_data:
+ assert getattr(entry, k) == v
+
+
+def test_update_IGNORE(test_tree):
+ path = 'empty-file'
+ entry = new_manifest_entry('IGNORE', path)
+ with pytest.raises(AssertionError):
+ update_entry_for_path(test_tree / path, entry)
+
+
+def test_update_AUX(test_tree):
+ path = 'empty-file'
+ entry = new_manifest_entry('AUX', path, *EMPTY_FILE_DATA)
+ assert not update_entry_for_path(test_tree / path, entry)
+ assert entry.aux_path == path
+ assert entry.path == f'files/{path}'
+ assert entry.size == 0
+ assert entry.checksums == {}
+
+
+@pytest.mark.parametrize(
+ 'function,args',
+ [(get_file_metadata, [[]]),
+ (verify_path,
+ [new_manifest_entry('DATA', 'unreadable-file', 0, {})]),
+ (update_entry_for_path,
+ [new_manifest_entry('DATA', 'unreadable-file', 0, {})]),
+ ])
+def test_unreadable_file(test_tree, function, args):
+ with pytest.raises(PermissionError):
+ for x in function(test_tree / 'unreadable-file', *args):
pass
- os.chmod(self.path, 0)
-
- def tearDown(self):
- os.unlink(self.path)
- os.rmdir(self.dir)
-
- def test_get_file_metadata(self):
- with self.assertRaises(OSError):
- list(gemato.verify.get_file_metadata(
- os.path.join(self.dir, self.path), []))
-
- def testDATA(self):
- e = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0'))
- self.assertRaises(OSError, gemato.verify.verify_path,
- os.path.join(self.dir, e.path), e)
-
- def test_update(self):
- e = gemato.manifest.ManifestEntryDATA(
- os.path.basename(self.path), 0, {})
- self.assertRaises(OSError,
- gemato.verify.update_entry_for_path, self.path, e)
-
-
-class EntryCompatibilityVerificationTest(unittest.TestCase):
- def test_matching_entry(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (True, []))
-
- def test_compatible_types(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryEBUILD.from_list(
- ('EBUILD', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (True, []))
-
- def test_compatible_types_AUX(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'files/test.patch', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryAUX.from_list(
- ('AUX', 'test.patch', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (True, []))
-
- def test_compatible_types_MANIFEST(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryMANIFEST.from_list(
- ('MANIFEST', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (True, []))
-
- def test_incompatible_types_DATA_MISC(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryMISC.from_list(
- ('MISC', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (False, [('__type__', 'DATA', 'MISC')]))
-
- def test_incompatible_types_DATA_IGNORE(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryIGNORE.from_list(
- ('IGNORE', 'test'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (False, [('__type__', 'DATA', 'IGNORE')]))
-
- def test_incompatible_types_DATA_DIST(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryDIST.from_list(
- ('DIST', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (False, [('__type__', 'DATA', 'DIST')]))
-
- def test_mismatched_size(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '32', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (False, [('__size__', 0, 32)]))
-
- def test_mismatched_md5(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', '9e107d9d372bb6826bd81d3542a419d6'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (False, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', '9e107d9d372bb6826bd81d3542a419d6')]))
-
- def test_different_hashsets(self):
- e1 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'MD5', 'd41d8cd98f00b204e9800998ecf8427e'))
- e2 = gemato.manifest.ManifestEntryDATA.from_list(
- ('DATA', 'test', '0', 'SHA1', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'))
- self.assertEqual(gemato.verify.verify_entry_compatibility(e1, e2),
- (True, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', None),
- ('SHA1', None, 'da39a3ee5e6b4b0d3255bfef95601890afd80709')]))
+
+
+@pytest.mark.parametrize(
+ 'a_cls,a_name,a_args,b_cls,b_name,b_args,expected,diff',
+ [('DATA', 'test', [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'DATA', 'test', [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ True, []),
+ ('DATA', 'test-1.ebuild',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'EBUILD', 'test-1.ebuild',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ True, []),
+ ('DATA', 'files/test.patch',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'AUX', 'test.patch',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ True, []),
+ ('DATA', 'Manifest',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'MANIFEST', 'Manifest',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ True, []),
+ ('DATA', 'metadata.xml',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'MISC', 'metadata.xml',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ False, [('__type__', 'DATA', 'MISC')]),
+ ('DATA', 'test',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'IGNORE', 'test', [],
+ False, [('__type__', 'DATA', 'IGNORE')]),
+ ('DATA', 'test-1.tar.gz',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'DIST', 'test-1.tar.gz',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ False, [('__type__', 'DATA', 'DIST')]),
+ ('DATA', 'mismatched-size',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'DATA', 'mismatched-size',
+ [32, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ False, [('__size__', 0, 32)]),
+ ('DATA', 'mismatched-md5',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'DATA', 'mismatched-md5',
+ [0, {'MD5': ZERO_MD5}],
+ False, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', ZERO_MD5)]),
+ ('DATA', 'hash-subset',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'DATA', 'mismatched-md5',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e',
+ 'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'}],
+ True, [('SHA1', None, 'da39a3ee5e6b4b0d3255bfef95601890afd80709')]),
+ ('DATA', 'mismatched-hash-sets',
+ [0, {'MD5': 'd41d8cd98f00b204e9800998ecf8427e'}],
+ 'DATA', 'mismatched-md5',
+ [0, {'SHA1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'}],
+ True, [('MD5', 'd41d8cd98f00b204e9800998ecf8427e', None),
+ ('SHA1', None, 'da39a3ee5e6b4b0d3255bfef95601890afd80709')]),
+ ])
+def test_entry_compatibility(a_cls, a_name, a_args, b_cls, b_name,
+ b_args, expected, diff):
+ e1 = new_manifest_entry(a_cls, a_name, *a_args)
+ e2 = new_manifest_entry(b_cls, b_name, *b_args)
+ assert verify_entry_compatibility(e1, e2) == (expected, diff)