diff options
author | Michał Górny <mgorny@gentoo.org> | 2020-08-31 18:42:06 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2020-09-05 11:15:11 +0200 |
commit | c401d641658697a6d3fddb5b08854a316e20258c (patch) | |
tree | 539e780417f468f81d17397c57bf5ae2954aa2b8 /tests/test_recursiveloader.py | |
parent | 6ad33d2d0abe1cec2c19d1e1d14a8b81c3a14c43 (diff) | |
download | gemato-c401d641658697a6d3fddb5b08854a316e20258c.tar.gz |
tests: Port test_recursiveloader to pytest
Signed-off-by: Michał Górny <mgorny@gentoo.org>
Diffstat (limited to 'tests/test_recursiveloader.py')
-rw-r--r-- | tests/test_recursiveloader.py | 4621 |
1 files changed, 2108 insertions, 2513 deletions
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py index be32fc4..ee11e19 100644 --- a/tests/test_recursiveloader.py +++ b/tests/test_recursiveloader.py @@ -1,2771 +1,2366 @@ # gemato: Recursive loader tests # vim:fileencoding=utf-8 -# (c) 2017 Michał Górny +# (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license import base64 import datetime import gzip -import io +import itertools import os -import unittest + +import pytest import gemato.cli -import gemato.exceptions -import gemato.recursiveloader +from gemato.compression import open_potentially_compressed_path +from gemato.exceptions import ( + ManifestMismatch, + ManifestInvalidPath, + ManifestIncompatibleEntry, + ManifestCrossDevice, + ManifestSymlinkLoop, + ) +from gemato.manifest import ManifestPathEntry +from gemato.recursiveloader import ManifestRecursiveLoader + +from tests.test_compression import COMPRESSION_ALGOS +from tests.testutil import disallow_writes + -from tests.testutil import TempDirTestCase +class LayoutFactory: + """Factory to install layouts in temporary directory with cleanup""" + def __init__(self, tmp_path): + self.tmp_path = tmp_path + self.layouts = [] -def callback_return_true(e): - return True + def create(self, layout, readonly=False): + layout.create(self.tmp_path) + self.layouts.append(layout) + if readonly: + disallow_writes(self.tmp_path) + return self.tmp_path + def cleanup(self): + for layout in self.layouts: + layout.cleanup(self.tmp_path) -def callback_return_false(e): - return False +@pytest.fixture +def layout_factory(tmp_path): + factory = LayoutFactory(tmp_path) + yield factory + factory.cleanup() + + +class BaseLayout: + TOP_MANIFEST = 'Manifest' + DIRS = [] + MANIFESTS = {} + FILES = {} + + @classmethod + def create(cls, tmp_path): + """Create layout's files in the specified directory""" + cls.FILES = dict(cls.FILES) + cls.FILES.update(cls.MANIFESTS) + for d in cls.DIRS: + os.mkdir(tmp_path / d) + for f, contents in cls.FILES.items(): + bincontents = contents.encode('utf8') + if f.endswith('.gz'): + fclass = gzip.GzipFile + else: + fclass = open + with fclass(tmp_path / f, 'wb') as of: + of.write(bincontents) + + @classmethod + def cleanup(cls, tmp_path): + """Perform any necessary pre-cleanup tasks""" + pass + + +class BasicTestLayout(BaseLayout): + """Commonplace Manifest tree layout""" -class BasicNestingTest(TempDirTestCase): DIRS = ['sub', 'sub/deeper', 'other'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' TIMESTAMP 2017-01-01T01:01:01Z MANIFEST sub/Manifest 128 MD5 30fd28b98a23031c72793908dd35c530 MANIFEST other/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e DIST topdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/Manifest': u''' + 'sub/Manifest': ''' MANIFEST deeper/Manifest 50 MD5 0f7cd9ed779a4844f98d28315dd9176a DIST subdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/stray': u'', - 'sub/deeper/Manifest': u''' + 'sub/deeper/Manifest': ''' DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/deeper/test': u'', - 'other/Manifest': u'', - } - - def test_init(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertIn('Manifest', m.loaded_manifests) - - def test_load_sub_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub/test') - self.assertIn('sub/Manifest', m.loaded_manifests) - self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub/deeper/test') - self.assertIn('sub/deeper/Manifest', m.loaded_manifests) - self.assertNotIn('other/Manifest', m.loaded_manifests) - - def test_recursive_load_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub/deeper/test') - self.assertIn('sub/Manifest', m.loaded_manifests) - self.assertIn('sub/deeper/Manifest', m.loaded_manifests) - self.assertNotIn('other/Manifest', m.loaded_manifests) - - def test_load_manifests_recursively(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests) - self.assertNotIn('other/Manifest', m.loaded_manifests) - m.load_manifests_for_path('', recursive=True) - self.assertIn('sub/Manifest', m.loaded_manifests) - self.assertIn('sub/deeper/Manifest', m.loaded_manifests) - self.assertIn('other/Manifest', m.loaded_manifests) + 'other/Manifest': '', + } + FILES = { + 'sub/stray': '', + 'sub/deeper/test': '', + } - def test__iter_manifests_for_path_order(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.load_manifests_for_path('', recursive=True) - self.assertListEqual([d for mpath, d, k - in m._iter_manifests_for_path('sub/deeper')], - ['sub/deeper', 'sub', '']) - self.assertListEqual([d for mpath, d, k - in m._iter_manifests_for_path('other')], - ['other', '']) - - def test__iter_manifests_for_path_recursively_order(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.load_manifests_for_path('', recursive=True) - self.assertListEqual([d for mpath, d, k - in m._iter_manifests_for_path('sub', - recursive=True)], - ['sub/deeper', 'sub', '']) - - def test_load_sub_manifest_recursively(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub', recursive=True) - self.assertIn('sub/Manifest', m.loaded_manifests) - self.assertIn('sub/deeper/Manifest', m.loaded_manifests) - self.assertNotIn('other/Manifest', m.loaded_manifests) - - def test_find_timestamp(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_timestamp().ts, - datetime.datetime(2017, 1, 1, 1, 1, 1)) - - def test_set_timestamp(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.set_timestamp(datetime.datetime(2010, 7, 7, 7, 7, 7)) - self.assertEqual(m.find_timestamp().ts, - datetime.datetime(2010, 7, 7, 7, 7, 7)) - self.assertEqual( - len([x for x in m.loaded_manifests['Manifest'].entries - if x.tag == 'TIMESTAMP']), - 1) - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertIsNone(m.find_path_entry('test')) - self.assertIsNone(m.find_path_entry('sub/test')) - self.assertEqual(m.find_path_entry('sub/deeper/test').path, 'test') - - def test_find_top_dist_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_dist_entry('topdistfile-1.txt').path, 'topdistfile-1.txt') - self.assertIsNone(m.find_dist_entry('subdistfile-1.txt')) - - def test_find_sub_dist_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_dist_entry('topdistfile-1.txt', 'sub').path, 'topdistfile-1.txt') - self.assertEqual(m.find_dist_entry('subdistfile-1.txt', 'sub').path, 'subdistfile-1.txt') - - def test_find_sub_dist_entry_with_slash_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_dist_entry('topdistfile-1.txt', 'sub/').path, 'topdistfile-1.txt') - self.assertEqual(m.find_dist_entry('subdistfile-1.txt', 'sub/').path, 'subdistfile-1.txt') - - def test_find_sub_dist_entry_with_file_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_dist_entry('topdistfile-1.txt', 'sub/file').path, 'topdistfile-1.txt') - self.assertEqual(m.find_dist_entry('subdistfile-1.txt', 'sub/file').path, 'subdistfile-1.txt') - - def test_verify_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.verify_path('sub/deeper/test'), (True, [])) - - def test_verify_nonexistent_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.verify_path('sub/deeper/nonexist'), (True, [])) - - def test_verify_stray_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.verify_path('sub/stray'), - (False, [('__exists__', False, True)])) - - def test_assert_path_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_path_verifies('sub/deeper/test') - - def test_assert_path_verifies_nonexistent_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_path_verifies('sub/deeper/nonexist') - - def test_assert_path_verifies_stray_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.assert_path_verifies, 'sub/stray') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('') - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - { - 'other': ['Manifest'], - 'sub': ['Manifest'], - 'sub/deeper': ['Manifest', 'test'], - }) - self.assertEqual(entries['other']['Manifest'].path, 'other/Manifest') - self.assertEqual(entries['sub']['Manifest'].path, 'sub/Manifest') - self.assertEqual(entries['sub/deeper']['Manifest'].path, 'deeper/Manifest') - self.assertEqual(entries['sub/deeper']['test'].path, 'test') - - def test_get_file_entry_dict_only_types(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('', only_types=['MANIFEST']) - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - { - 'other': ['Manifest'], - 'sub': ['Manifest'], - 'sub/deeper': ['Manifest'], - }) - self.assertEqual(entries['other']['Manifest'].path, 'other/Manifest') - self.assertEqual(entries['sub']['Manifest'].path, 'sub/Manifest') - self.assertEqual(entries['sub/deeper']['Manifest'].path, 'deeper/Manifest') - - def test_get_file_entry_dict_only_types_DIST(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('', only_types=['DIST']) - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - { - '': ['subdistfile-1.txt', 'topdistfile-1.txt'], - }) - self.assertEqual(entries['']['subdistfile-1.txt'].path, 'subdistfile-1.txt') - self.assertEqual(entries['']['topdistfile-1.txt'].path, 'topdistfile-1.txt') - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(m.updated_manifests, set()) - self.assertSetEqual(frozenset(entries), - frozenset(( - 'other/Manifest', - 'sub/Manifest', - 'sub/deeper/Manifest', - 'sub/deeper/test', - ))) - self.assertEqual(entries['other/Manifest'][0], 'Manifest') - self.assertEqual(entries['sub/Manifest'][0], 'Manifest') - self.assertEqual(entries['sub/deeper/Manifest'][0], 'sub/Manifest') - self.assertEqual(entries['sub/deeper/test'][0], 'sub/deeper/Manifest') - self.assertEqual(entries['other/Manifest'][1].path, 'other/Manifest') - self.assertEqual(entries['sub/Manifest'][1].path, 'sub/Manifest') - self.assertEqual(entries['sub/deeper/Manifest'][1].path, 'deeper/Manifest') - self.assertEqual(entries['sub/deeper/test'][1].path, 'test') - - def test_get_file_entry_dict_for_sub(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('sub') - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - { - 'sub': ['Manifest'], - 'sub/deeper': ['Manifest', 'test'], - }) - self.assertEqual(entries['sub']['Manifest'].path, 'sub/Manifest') - self.assertEqual(entries['sub/deeper']['Manifest'].path, 'deeper/Manifest') - self.assertEqual(entries['sub/deeper']['test'].path, 'test') - - def test_get_deduplicated_file_entry_dict_for_update_for_sub(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('sub') - self.assertSetEqual(m.updated_manifests, set()) - self.assertSetEqual(frozenset(entries), - frozenset(( - 'sub/Manifest', - 'sub/deeper/Manifest', - 'sub/deeper/test', - ))) - self.assertEqual(entries['sub/Manifest'][0], 'Manifest') - self.assertEqual(entries['sub/deeper/Manifest'][0], 'sub/Manifest') - self.assertEqual(entries['sub/deeper/test'][0], 'sub/deeper/Manifest') - self.assertEqual(entries['sub/Manifest'][1].path, 'sub/Manifest') - self.assertEqual(entries['sub/deeper/Manifest'][1].path, 'deeper/Manifest') - self.assertEqual(entries['sub/deeper/test'][1].path, 'test') - - def test_get_file_entry_dict_for_invalid(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertDictEqual(m.get_file_entry_dict('nonexist'), {}) - - def test_get_deduplicated_file_entry_dict_for_update_for_invalid(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertDictEqual( - m.get_deduplicated_file_entry_dict_for_update('nonexist'), - {}) - self.assertSetEqual(m.updated_manifests, set()) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies('other') - - def test_assert_directory_verifies_stray_file(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.assert_directory_verifies, 'sub') - - def test_assert_directory_verifies_stray_file_nofail(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertTrue(m.assert_directory_verifies( - 'sub', fail_handler=callback_return_true)) - - def test_assert_directory_verifies_stray_file_nofail_false(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertFalse(m.assert_directory_verifies( - 'sub', fail_handler=callback_return_false)) - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - os.path.join(self.dir, 'other')]), - 0) - - def test_cli_verifies_stray_file(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - os.path.join(self.dir, 'sub')]), - 1) - - def test_cli_verifies_stray_file_keep_going(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', '--keep-going', - os.path.join(self.dir, 'sub')]), - 1) - - def test_cli_fails_without_signed_manifest(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - '--require-signed-manifest', - os.path.join(self.dir, 'other')]), - 1) - - def test_save_manifest(self): - """ - Test if saving the (unmodified) Manifest works. - """ - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.save_manifest('Manifest') - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['Manifest'].lstrip()) - - def test_save_manifests_unmodified(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['Manifest']) - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'other/Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['other/Manifest']) - - def test_save_manifests_force(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.save_manifests(force=True) - # Manifest checksums change - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - - def test_save_manifests_force_sort(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.save_manifests(force=True, sort=True) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), u''' + # rewriting implies stripping leading whitespace + MANIFESTS_REWRITTEN = { + 'Manifest': ''' +TIMESTAMP 2017-01-01T01:01:01Z +MANIFEST sub/Manifest 127 MD5 51d05790f4208f3bdf1087ab31b6c228 +MANIFEST other/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e +DIST topdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e +'''.lstrip(), + 'sub/Manifest': ''' +MANIFEST deeper/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c +DIST subdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e +'''.lstrip(), + 'sub/deeper/Manifest': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +'''.lstrip(), + 'other/Manifest': '', + } + MANIFESTS_SORTED = { + 'Manifest': ''' DIST topdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e MANIFEST other/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e MANIFEST sub/Manifest 127 MD5 de990fbccb1261da02c7513dfec56045 TIMESTAMP 2017-01-01T01:01:01Z -'''.lstrip()) - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), u''' +'''.lstrip(), + 'sub/Manifest': ''' DIST subdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e MANIFEST deeper/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c -'''.lstrip()) - - def test_update_entry_for_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512']) - self.assertIsInstance(m.find_path_entry('sub/stray'), - gemato.manifest.ManifestEntryDATA) - m.save_manifests() - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() +'''.lstrip(), + 'sub/deeper/Manifest': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +'''.lstrip(), + 'other/Manifest': '', + } + MANIFESTS_SHA1 = { + 'Manifest': ''' +TIMESTAMP 2017-01-01T01:01:01Z +MANIFEST sub/Manifest 195 SHA1 bae1428bfbb4ea08a736975217819be285df4474 +MANIFEST other/Manifest 0 SHA1 da39a3ee5e6b4b0d3255bfef95601890afd80709 +DIST topdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e +'''.lstrip(), + 'sub/Manifest': ''' +MANIFEST deeper/Manifest 58 SHA1 4b40f4102dd71fb2083ce9a8d8af6d7e49c281c4 +DIST subdistfile-1.txt 0 MD5 d41d8cd98f00b204e9800998ecf8427e +DATA stray 0 SHA1 da39a3ee5e6b4b0d3255bfef95601890afd80709 +'''.lstrip(), + 'sub/deeper/Manifest': ''' +DATA test 0 SHA1 da39a3ee5e6b4b0d3255bfef95601890afd80709 +'''.lstrip(), + 'other/Manifest': '', + } - def test_update_entry_for_path_MANIFEST(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'], - new_entry_type='MANIFEST') - self.assertIsInstance(m.find_path_entry('sub/stray'), - gemato.manifest.ManifestEntryMANIFEST) - m.save_manifests() - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - self.assertIn('sub/stray', m.loaded_manifests) - - def test_update_entry_for_path_MISC(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'], - new_entry_type='MISC') - self.assertIsInstance(m.find_path_entry('sub/stray'), - gemato.manifest.ManifestEntryMISC) - m.save_manifests() - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - def test_update_entry_for_path_EBUILD(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512'], - new_entry_type='EBUILD') - self.assertIsInstance(m.find_path_entry('sub/stray'), - gemato.manifest.ManifestEntryEBUILD) - m.save_manifests() - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() +class SubTimestampLayout(BaseLayout): + """Layout that places TIMESTAMP in a sub-Manifest""" - def test_update_entry_for_path_AUX_invalid(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(AssertionError, - m.update_entry_for_path, 'sub/stray', - hashes=['SHA256', 'SHA512'], - new_entry_type='AUX') - - def test_update_entry_for_path_nohash(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(AssertionError, - m.update_entry_for_path, 'sub/stray') - - def test_update_entry_for_path_hash_via_ctor(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entry_for_path('sub/stray') - self.assertListEqual( - sorted(m.find_path_entry('sub/stray').checksums), - ['SHA256', 'SHA512']) - m.save_manifests() - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() + DIRS = ['sub'] + MANIFESTS = { + 'Manifest': ''' +MANIFEST sub/Manifest 32 MD5 95737355786df5760d6369a80935cf8a +''', + 'sub/Manifest': ''' +TIMESTAMP 2017-01-01T01:01:01Z +''', + } - def test_update_entry_for_path_hash_via_ctor_and_override(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entry_for_path('sub/stray', hashes=['MD5']) - self.assertListEqual( - sorted(m.find_path_entry('sub/stray').checksums), - ['MD5']) - m.save_manifests() - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - def test_update_entry_for_path_discard(self): - """ - Test that files are not modified if save_manifests() - is not called. - """ - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/stray', hashes=['SHA256', 'SHA512']) - self.assertIsInstance(m.find_path_entry('sub/stray'), - gemato.manifest.ManifestEntryDATA) - del m - # relevant Manifests should not have been touched - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['Manifest']) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - self.assertIsInstance(m.find_path_entry('sub/stray'), - gemato.manifest.ManifestEntryDATA) - m.save_manifests() - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() +class MultiManifestLayout(BaseLayout): + """Layout with multiple Manifest files in a subdirectory""" - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', - self.dir]), - 0) - # relevant Manifests should have been updated - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) - m = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - m.load(f) - self.assertNotEqual(m.find_timestamp().ts, - datetime.datetime(2017, 1, 1, 1, 1, 1)) - - def test_compress_manifests_low_watermark(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=0) - # top-level Manifest should not be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - # but sub/Manifest should definitely be compressed - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.gz'))) - - def test_compress_manifests_low_watermark_bz2(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - try: - m.save_manifests(force=True, compress_watermark=0, - compress_format='bz2') - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('bz2 compression unsupported') - else: - # top-level Manifest should not be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.bz2'))) - # but sub/Manifest should definitely be compressed - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.bz2'))) - - def test_compress_manifests_low_watermark_lzma(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - try: - m.save_manifests(force=True, compress_watermark=0, - compress_format='lzma') - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('lzma compression unsupported') - else: - # top-level Manifest should not be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.lzma'))) - # but sub/Manifest should definitely be compressed - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.lzma'))) - - def test_compress_manifests_low_watermark_xz(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - try: - m.save_manifests(force=True, compress_watermark=0, - compress_format='xz') - except gemato.exceptions.UnsupportedCompression: - raise unittest.SkipTest('xz compression unsupported') - else: - # top-level Manifest should not be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.xz'))) - # but sub/Manifest should definitely be compressed - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.xz'))) - - -class MultipleManifestTest(TempDirTestCase): DIRS = ['sub'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' MANIFEST sub/Manifest.a 50 MD5 33fd9df6d410a93ff859d75e088bde7e MANIFEST sub/Manifest.b 32 MD5 95737355786df5760d6369a80935cf8a ''', - 'sub/Manifest.a': u''' + 'sub/Manifest.a': ''' DATA foo 32 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/Manifest.b': u''' + 'sub/Manifest.b': ''' TIMESTAMP 2017-01-01T01:01:01Z ''', - 'sub/foo': u'1234567890123456', - } - - def test_load_sub_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest.a', m.loaded_manifests) - self.assertNotIn('sub/Manifest.b', m.loaded_manifests) - m.load_manifests_for_path('sub/test') - self.assertIn('sub/Manifest.a', m.loaded_manifests) - self.assertIn('sub/Manifest.b', m.loaded_manifests) - - def test_load_manifests_recursively(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest.a', m.loaded_manifests) - self.assertNotIn('sub/Manifest.b', m.loaded_manifests) - m.load_manifests_for_path('', recursive=True) - self.assertIn('sub/Manifest.a', m.loaded_manifests) - self.assertIn('sub/Manifest.b', m.loaded_manifests) - - def test_find_timestamp(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - # here it is expected to fail since TIMESTAMP is supposed - # to be top-level - self.assertIsNone(m.find_timestamp()) - - def test_verify_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.verify_path('sub/foo'), - (False, [('__size__', 32, 16)])) - - def test_update_entry_for_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/foo') - m.save_manifests() - # relevant Manifests should have been updated - # but sub/Manifest.b should be left intact - with io.open(os.path.join(self.dir, 'sub/Manifest.a'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) - with io.open(os.path.join(self.dir, 'sub/Manifest.b'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - def test_update_entry_for_path_hashes(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/foo', hashes=['SHA256', 'SHA512']) - # check for checksums - self.assertListEqual( - sorted(m.find_path_entry('sub/foo').checksums), - ['SHA256', 'SHA512']) - m.save_manifests() - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.a').checksums), - ['MD5']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.b').checksums), - ['MD5']) - # relevant Manifests should have been updated - # but sub/Manifest.b should be left intact - with io.open(os.path.join(self.dir, 'sub/Manifest.a'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) - with io.open(os.path.join(self.dir, 'sub/Manifest.b'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - def test_update_entry_for_path_hashes_plus_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/foo', hashes=['SHA256', 'SHA512']) - # check for checksums - self.assertListEqual( - sorted(m.find_path_entry('sub/foo').checksums), - ['SHA256', 'SHA512']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.a').checksums), - ['MD5']) - m.save_manifests(hashes=['SHA1']) - self.assertListEqual( - sorted(m.find_path_entry('sub/foo').checksums), - ['SHA256', 'SHA512']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.a').checksums), - ['SHA1']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.b').checksums), - ['MD5']) - # relevant Manifests should have been updated - # but sub/Manifest.b should be left intact - with io.open(os.path.join(self.dir, 'sub/Manifest.a'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) - with io.open(os.path.join(self.dir, 'sub/Manifest.b'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - def test_update_entry_for_path_hashes_via_ctor(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entry_for_path('sub/foo') - # check for checksums - self.assertListEqual( - sorted(m.find_path_entry('sub/foo').checksums), - ['SHA256', 'SHA512']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.a').checksums), - ['MD5']) - m.save_manifests() - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.a').checksums), - ['SHA256', 'SHA512']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.b').checksums), - ['MD5']) - # relevant Manifests should have been updated - # but sub/Manifest.b should be left intact - with io.open(os.path.join(self.dir, 'sub/Manifest.a'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) - with io.open(os.path.join(self.dir, 'sub/Manifest.b'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - def test_update_entry_for_path_hashes_via_ctor_and_override(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entry_for_path('sub/foo', hashes=['SHA1']) - # check for checksums - self.assertListEqual( - sorted(m.find_path_entry('sub/foo').checksums), - ['SHA1']) - m.save_manifests() - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.a').checksums), - ['SHA256', 'SHA512']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.b').checksums), - ['MD5']) - # relevant Manifests should have been updated - # but sub/Manifest.b should be left intact - with io.open(os.path.join(self.dir, 'sub/Manifest.a'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) - with io.open(os.path.join(self.dir, 'sub/Manifest.b'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() + } + FILES = { + 'sub/foo': '1234567890123456', + } - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - # check for checksums - self.assertListEqual( - sorted(m.find_path_entry('sub/foo').checksums), - ['SHA256', 'SHA512']) - m.save_manifests() - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.a').checksums), - ['SHA256', 'SHA512']) - self.assertListEqual( - sorted(m.find_path_entry('sub/Manifest.b').checksums), - ['SHA256', 'SHA512']) - # relevant Manifests should have been updated - # but sub/Manifest.b should be left intact - with io.open(os.path.join(self.dir, 'sub/Manifest.a'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['sub/Manifest.a']) - with io.open(os.path.join(self.dir, 'sub/Manifest.b'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest.b']) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() +class MultiTopManifestLayout(BaseLayout): + """Layout with multiple Manifest files in the top directory""" -class MultipleTopLevelManifestTest(TempDirTestCase): + DIRS = ['sub'] FILES = { - 'Manifest': u''' -MANIFEST Manifest.a 0 MD5 d41d8cd98f00b204e9800998ecf8427e + 'Manifest': ''' +MANIFEST Manifest.a 62 MD5 ae43485cc7bd080800a64b09bbfa53a8 MANIFEST Manifest.b 32 MD5 95737355786df5760d6369a80935cf8a ''', - 'Manifest.a': u'', - 'Manifest.b': u''' + 'Manifest.a': ''' +MANIFEST sub/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + 'Manifest.b': ''' TIMESTAMP 2017-01-01T01:01:01Z ''', + 'sub/Manifest': '', } - def test_load_extra_manifests(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.load_manifests_for_path('') - self.assertIn('Manifest.a', m.loaded_manifests) - self.assertIn('Manifest.b', m.loaded_manifests) - - def test_find_timestamp(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_timestamp().ts, - datetime.datetime(2017, 1, 1, 1, 1, 1)) +class DuplicateEntryLayout(BaseLayout): + """Layout with duplicate (matching) entry for a file""" -class DuplicateFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice. - """ - - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'test': u'', - } - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('test').path, 'test') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('') - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - {'': ['test']}) - self.assertEqual(entries['']['test'].path, 'test') - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(frozenset(entries), frozenset(('test',))) - self.assertEqual(entries['test'][0], 'Manifest') - self.assertEqual(entries['test'][1].path, 'test') - self.assertSetEqual(frozenset(entries['test'][1].checksums), - frozenset(('MD5',))) + } + FILES = { + 'test': '', + } - m.save_manifests() - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), 'r', - encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 1) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies('') - - def test_set_timestamp(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertIsNone(m.find_timestamp()) - m.set_timestamp(datetime.datetime(2010, 7, 7, 7, 7, 7)) - self.assertEqual(m.find_timestamp().ts, - datetime.datetime(2010, 7, 7, 7, 7, 7)) - - def test_cli_update_with_timestamp(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', - '--hashes=SHA256 SHA512', - '--timestamp', - self.dir]), - 0) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertIsNotNone(m.find_timestamp()) - - -class DuplicateManifestFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same Manifest twice. - """ + +class DuplicateManifestEntryLayout(BaseLayout): + """Layout with duplicate (matching) entry for a Manifest""" DIRS = ['sub'] - FILES = { - 'Manifest': u''' -MANIFEST sub/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e -MANIFEST sub/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e + MANIFESTS = { + 'Manifest': ''' +MANIFEST sub/Manifest 50 MD5 0f7cd9ed779a4844f98d28315dd9176a +MANIFEST sub/Manifest 50 MD5 0f7cd9ed779a4844f98d28315dd9176a +''', + 'sub/Manifest': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/Manifest': u'' } - - def test_load_sub_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub/test') - self.assertIn('sub/Manifest', m.loaded_manifests) + FILES = { + 'sub/test': '', + } -class DuplicateManifestDATAFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same Manifest as MANIFEST - and DATA. - """ +class DuplicateManifestAsDataEntryLayout(BaseLayout): + """Layout with duplicate DATA entry for a Manifest""" DIRS = ['sub'] - FILES = { - 'Manifest': u''' -DATA sub/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e -MANIFEST sub/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e + MANIFESTS = { + 'Manifest': ''' +MANIFEST sub/Manifest 50 MD5 0f7cd9ed779a4844f98d28315dd9176a +DATA sub/Manifest 50 MD5 0f7cd9ed779a4844f98d28315dd9176a +''', + 'sub/Manifest': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/Manifest': u'' } - - def test_load_sub_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub/test') - self.assertIn('sub/Manifest', m.loaded_manifests) + FILES = { + 'sub/test': '', + } -class DuplicateFileEntryInSubManifestTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice in different - Manifest files. - """ +class DuplicateEntryInSubManifestLayout(BaseLayout): + """Layout with duplicate entry in sub-Manifest""" DIRS = ['sub'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' MANIFEST sub/Manifest 50 MD5 0f7cd9ed779a4844f98d28315dd9176a DATA sub/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/Manifest': u''' + 'sub/Manifest': ''' DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', } + FILES = { + 'sub/test': '', + } - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('sub/test').size, 0) - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('') - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - {'sub': ['Manifest', 'test']}) - self.assertEqual(entries['sub']['test'].size, 0) - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(frozenset(entries), - frozenset(('sub/test', 'sub/Manifest'))) - self.assertEqual(entries['sub/test'][0], 'sub/Manifest') - self.assertEqual(entries['sub/test'][1].path, 'test') - self.assertSetEqual(frozenset(entries['sub/test'][1].checksums), - frozenset(('MD5',))) - - m.save_manifests() - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), 'r', - encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 1) - - -class DuplicateCompatibleTypeFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice, with - compatible types. - """ - FILES = { - 'Manifest': u''' +class DuplicateEbuildEntryLayout(BaseLayout): + MANIFESTS = { + 'Manifest': ''' DATA test.ebuild 0 MD5 d41d8cd98f00b204e9800998ecf8427e EBUILD test.ebuild 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'test.ebuild': u'', - } - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('test.ebuild').path, 'test.ebuild') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('') - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - {'': ['test.ebuild']}) - self.assertEqual(entries['']['test.ebuild'].path, 'test.ebuild') - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(frozenset(entries), frozenset(('test.ebuild',))) - self.assertEqual(entries['test.ebuild'][0], 'Manifest') - self.assertEqual(entries['test.ebuild'][1].path, 'test.ebuild') - - m.save_manifests() - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), 'r', - encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 1) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies('') - - -class DuplicateAUXTypeFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice, using AUX - type (because of path weirdness). - """ - - DIRS = ['files'] - FILES = { - 'Manifest': u''' -DATA files/test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e -AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', - 'files/test.patch': u'', - } - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('files/test.patch').path, 'files/test.patch') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('') - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - {'files': ['test.patch']}) - self.assertEqual(entries['files']['test.patch'].path, 'files/test.patch') - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(frozenset(entries), frozenset(('files/test.patch',))) - self.assertEqual(entries['files/test.patch'][0], 'Manifest') - self.assertEqual(entries['files/test.patch'][1].path, 'files/test.patch') + } - m.save_manifests() - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), 'r', - encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 1) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies('') - -class DuplicateAUXTypeFileRemovalTest(TempDirTestCase): - DIRS = ['files'] FILES = { - 'Manifest': u''' -DATA files/test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e -AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', + 'test.ebuild': '', } - def test_update_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('files/test.patch') - self.assertIsNone(m.find_path_entry('files/test.patch')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - def test_update_entry_wrong_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestInvalidPath, - m.update_entry_for_path, 'test.patch', hashes=['MD5']) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertIsNone(m.find_path_entry('files/test.patch')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - -class AUXTypeFileAdditionTest(TempDirTestCase): +class PotentialAuxEntryLayout(BaseLayout): DIRS = ['files'] + MANIFESTS = { + 'Manifest': '', + } FILES = { - 'Manifest': u'', - 'files/test.txt': u'test', + 'files/test.patch': '', } - def test_update_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('files/test.txt', - hashes=['MD5'], new_entry_type='AUX') - self.assertIsInstance(m.find_path_entry('files/test.txt'), - gemato.manifest.ManifestEntryAUX) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - def test_update_entry_wrong_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(AssertionError, - m.update_entry_for_path, 'test.txt', - hashes=['MD5'], new_entry_type='AUX') +class DuplicateAuxEntryLayout(PotentialAuxEntryLayout): + MANIFESTS = { + 'Manifest': ''' +DATA files/test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e +AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + } -class DuplicateDifferentHashSetFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice, - with different hash sets (and both of them mismatched). - """ - FILES = { - 'Manifest': u''' +class DisjointHashSetEntryLayout(BaseLayout): + MANIFESTS = { + 'Manifest': ''' DATA test 0 MD5 9e107d9d372bb6826bd81d3542a419d6 DATA test 0 SHA1 2fd4e1c67a2d28fced849ee1bb76e7391b93eb12 ''', - 'test': u'', - } - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('test').path, 'test') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('') - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - {'': ['test']}) - self.assertEqual(entries['']['test'].path, 'test') - self.assertSetEqual(frozenset(entries['']['test'].checksums), - frozenset(('MD5', 'SHA1'))) - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(frozenset(entries), frozenset(('test',))) - self.assertEqual(entries['test'][0], 'Manifest') - self.assertEqual(entries['test'][1].path, 'test') - self.assertSetEqual(frozenset(entries['test'][1].checksums), - frozenset(('MD5', 'SHA1'))) - - m.save_manifests() - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), 'r', - encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 1) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - with self.assertRaises(gemato.exceptions.ManifestMismatch) as cm: - m.assert_directory_verifies('') - self.assertListEqual(cm.exception.diff, - [ - ('MD5', '9e107d9d372bb6826bd81d3542a419d6', 'd41d8cd98f00b204e9800998ecf8427e'), - ('SHA1', '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', 'da39a3ee5e6b4b0d3255bfef95601890afd80709'), - ]) - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 1) - - def test_update_entry_for_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('test') - # either of the entries could have been taken - self.assertIn( - tuple(m.find_path_entry('test').checksums), - (('MD5',), ('SHA1',))) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertEqual(m.find_path_entry('test').checksums, - { - 'SHA256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'SHA512': 'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', - }) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - -class DuplicateIncompatibleDataMiscTypeFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice, with - incompatible types. - """ - + } FILES = { - 'Manifest': u''' -DATA test.ebuild 0 MD5 d41d8cd98f00b204e9800998ecf8427e -MISC test.ebuild 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', + 'test': '', } - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('test.ebuild').path, 'test.ebuild') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestIncompatibleEntry, - m.get_file_entry_dict, '') - - def test_get_file_entry_dict_only_types(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_file_entry_dict('', only_types=['DATA']) - self.assertDictEqual(dict([(k, sorted(v)) for k, v in entries.items()]), - {'': ['test.ebuild']}) - self.assertEqual(entries['']['test.ebuild'].tag, 'DATA') - - def test_deduplicated_get_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestIncompatibleEntry, - m.get_deduplicated_file_entry_dict_for_update, '') - -class DuplicateDifferentSizeFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice, with - different sizes. - """ +class IncompatibleTypeLayout(BaseLayout): + """A layout with two incompatible entries for the same file""" - FILES = { - 'Manifest': u''' -DATA test.ebuild 0 MD5 d41d8cd98f00b204e9800998ecf8427e -DATA test.ebuild 32 MD5 d41d8cd98f00b204e9800998ecf8427e + MANIFESTS = { + 'Manifest': ''' +DATA metadata.xml 0 MD5 d41d8cd98f00b204e9800998ecf8427e +MISC metadata.xml 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', } - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('test.ebuild').path, 'test.ebuild') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestIncompatibleEntry, - m.get_file_entry_dict, '') - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(frozenset(entries), - frozenset(('test.ebuild',))) - self.assertEqual(entries['test.ebuild'][0], 'Manifest') - self.assertIsInstance(entries['test.ebuild'][1], - gemato.manifest.ManifestEntryDATA) - - m.save_manifests() - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), 'r', - encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 1) - -class DuplicateDifferentHashFileEntryTest(TempDirTestCase): - """ - Test for specifying the entry for the same file twice, with - different sizes. - """ +class MismatchedSizeLayout(BaseLayout): + """A layout with two entries with different size for the same file""" - FILES = { - 'Manifest': u''' -DATA test.ebuild 0 MD5 d41d8cd98f00b204e9800998ecf8427e -DATA test.ebuild 0 MD5 9e107d9d372bb6826bd81d3542a419d6 + MANIFESTS = { + 'Manifest': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +DATA test 32 MD5 d41d8cd98f00b204e9800998ecf8427e ''', } - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('test.ebuild').path, 'test.ebuild') - - def test_get_file_entry_dict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestIncompatibleEntry, - m.get_file_entry_dict, '') - - def test_get_deduplicated_file_entry_dict_for_update(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - entries = m.get_deduplicated_file_entry_dict_for_update('') - self.assertSetEqual(frozenset(entries), - frozenset(('test.ebuild',))) - self.assertEqual(entries['test.ebuild'][0], 'Manifest') - self.assertIsInstance(entries['test.ebuild'][1], - gemato.manifest.ManifestEntryDATA) - self.assertSetEqual( - frozenset(entries['test.ebuild'][1].checksums), - frozenset(('MD5',))) - - m.save_manifests() - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(self.dir, 'Manifest'), 'r', - encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 1) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestIncompatibleEntry, - m.assert_directory_verifies, '') - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 1) +class MismatchedChecksumLayout(BaseLayout): + """A layout with two entries with different hash for the same file""" + MANIFESTS = { + 'Manifest': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +DATA test 0 MD5 9e107d9d372bb6826bd81d3542a419d6 +''', + } -class ManifestIgnoreEntryTest(TempDirTestCase): - """ - Test for a Manifest file with IGNOREs. - """ +class IgnoreEntryLayout(BaseLayout): DIRS = ['bar'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' IGNORE foo IGNORE bar ''', - 'foo': u'test', - 'bar/baz': u'test', } - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('foo').path, 'foo') - self.assertEqual(m.find_path_entry('bar').path, 'bar') - self.assertEqual(m.find_path_entry('bar/baz').path, 'bar') - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies('') - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 0) - - -class ManifestMiscEntryTest(TempDirTestCase): - """ - Test for a Manifest file with MISC. - """ - FILES = { - 'Manifest': u''' -MISC foo 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', + 'foo': 'test', + 'bar/baz': 'test', } - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.assert_directory_verifies, '') - - def test_assert_directory_verifies_nonstrict_via_fail_handler(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertTrue(m.assert_directory_verifies('', - fail_handler=callback_return_true)) - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 1) - - def test_update_entry_for_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('foo') - self.assertIsNone(m.find_path_entry('foo')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertIsNone(m.find_path_entry('foo')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() +class MiscEntryLayout(BaseLayout): + MANIFESTS = { + 'Manifest': ''' +MISC metadata.xml 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + } -class CrossDeviceManifestTest(TempDirTestCase): - """ - Test for a Manifest that crosses filesystem boundaries. - """ - FILES = { - 'Manifest': u''' +class CrossDeviceLayout(BaseLayout): + MANIFESTS = { + 'Manifest': ''' DATA sub/version 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', } - def setUp(self): - if not os.path.ismount('/proc'): - raise unittest.SkipTest('/proc is not a mountpoint') - super(CrossDeviceManifestTest, self).setUp() - os.symlink('/proc', os.path.join(self.dir, 'sub')) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False) - self.assertRaises(gemato.exceptions.ManifestCrossDevice, - m.assert_directory_verifies, '') - - def test_assert_directory_verifies_nonstrict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False) - self.assertRaises(gemato.exceptions.ManifestCrossDevice, - m.assert_directory_verifies, '', - fail_handler=callback_return_true) - - def test_assert_directory_verifies_subdir(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False) - self.assertRaises(gemato.exceptions.ManifestCrossDevice, - m.assert_directory_verifies, 'sub') - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', '-x', self.dir]), - 1) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False, - hashes=['SHA256', 'SHA512']) - self.assertRaises(gemato.exceptions.ManifestCrossDevice, - m.update_entries_for_directory, '') - - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '-x', - '--hashes=SHA256 SHA512', self.dir]), - 1) - - -class CrossDeviceEmptyManifestTest(TempDirTestCase): - """ - Test for a Manifest that crosses filesystem boundaries without - explicit entries. - """ + @classmethod + def create(cls, tmp_path): + st1 = os.stat(tmp_path) + try: + st2 = os.stat('/proc/version') + except OSError: + pytest.skip('Unable to stat /proc/version') + if st1.st_dev == st2.st_dev: + pytest.skip('/proc/version is not on a distinct filesystem') + super().create(tmp_path) + os.symlink('/proc', tmp_path / 'sub') + + +class CrossDeviceEmptyLayout(CrossDeviceLayout): + MANIFESTS = { + 'Manifest': '', + } - FILES = { - 'Manifest': u'', - } - - def setUp(self): - if not os.path.ismount('/proc'): - raise unittest.SkipTest('/proc is not a mountpoint') - super(CrossDeviceEmptyManifestTest, self).setUp() - os.symlink('/proc', os.path.join(self.dir, 'sub')) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False) - self.assertRaises(gemato.exceptions.ManifestCrossDevice, - m.assert_directory_verifies, '') - - def test_assert_directory_verifies_nonstrict(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False) - self.assertRaises(gemato.exceptions.ManifestCrossDevice, - m.assert_directory_verifies, '', - fail_handler=callback_return_true) - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', '-x', self.dir]), - 1) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False, - hashes=['SHA256', 'SHA512']) - self.assertRaises(gemato.exceptions.ManifestCrossDevice, - m.update_entries_for_directory, '') - - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '-x', - '--hashes=SHA256 SHA512', self.dir]), - 1) - - -class CrossDeviceIgnoreManifestTest(TempDirTestCase): - """ - Test for a Manifest that crosses filesystem boundaries without - explicit entries. - """ - FILES = { - 'Manifest': u''' +class CrossDeviceIgnoreLayout(CrossDeviceLayout): + MANIFESTS = { + 'Manifest': ''' IGNORE sub ''', } - def setUp(self): - if not os.path.ismount('/proc'): - raise unittest.SkipTest('/proc is not a mountpoint') - super(CrossDeviceIgnoreManifestTest, self).setUp() - os.symlink('/proc', os.path.join(self.dir, 'sub')) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False) - m.assert_directory_verifies('') - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', '-x', self.dir]), - 0) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_xdev=False, - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertEqual(len(m.loaded_manifests['Manifest'].entries), 1) - -class DotfileManifestTest(TempDirTestCase): - """ - Test for implicitly ignoring dotfiles. - """ +class DotFileLayout(BaseLayout): + """Layout for testing ignoring dotfiles""" DIRS = ['.bar'] + MANIFESTS = { + 'Manifest': '', + } FILES = { - 'Manifest': u'', - '.foo': u'', - '.bar/baz': u'', + '.foo': '', + '.bar/baz': '', } - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies() - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 0) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertIsNone(m.find_path_entry('.bar/baz')) - self.assertIsNone(m.find_path_entry('.foo')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - -class DirectoryInPlaceOfFileManifestTest(TempDirTestCase): - """ - Test a tree where an expected file was replaced by a directory. - """ +class DirForFileLayout(BaseLayout): + """A layout where directory replaced a file""" DIRS = ['test'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''' } - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.assert_directory_verifies) - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 1) - - def test_update_entry_for_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestInvalidPath, - m.update_entry_for_path, 'test') - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - self.assertRaises(gemato.exceptions.ManifestInvalidPath, - m.update_entries_for_directory, '') - - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', - self.dir]), - 1) - - -class UnreadableDirectoryTest(TempDirTestCase): - """ - Test a tree where a directory can not be read. - """ +class UnreadableDirLayout(BaseLayout): DIRS = ['test'] - FILES = { - 'Manifest': u'' + MANIFESTS = { + 'Manifest': '', } - def setUp(self): - super(UnreadableDirectoryTest, self).setUp() - os.chmod(os.path.join(self.dir, 'test'), 0) - - def tearDown(self): - os.chmod(os.path.join(self.dir, 'test'), 0o555) - super(UnreadableDirectoryTest, self).tearDown() - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(OSError, m.assert_directory_verifies) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - self.assertRaises(OSError, m.update_entries_for_directory, '') + @classmethod + def create(cls, tmp_path): + super().create(tmp_path) + os.chmod(tmp_path / 'test', 0) + @classmethod + def cleanup(cls, tmp_path): + # restore permissions to allow cleanup + os.chmod(tmp_path / 'test', 0o755) -class CompressedTopManifestTest(TempDirTestCase): - """ - Test a tree with top-level Manifest being compressed. - """ - MANIFEST = b''' +class CompressedTopManifestLayout(BaseLayout): + TOP_MANIFEST = 'Manifest.gz' + MANIFESTS = { + 'Manifest.gz': ''' DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''' +''', + } FILES = { - 'test': u'', - } - - def setUp(self): - super(CompressedTopManifestTest, self).setUp() - self.manifest_gz = os.path.join(self.dir, 'Manifest.gz') - with gzip.GzipFile(self.manifest_gz, 'wb') as f: - f.write(self.MANIFEST) - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.manifest_gz) - self.assertEqual(m.find_path_entry('test').path, 'test') - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.manifest_gz) - m.assert_directory_verifies('') - - def test_save_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest.gz')) - m.save_manifest('Manifest.gz') - with gemato.compression.open_potentially_compressed_path( - os.path.join(self.dir, 'Manifest.gz'), 'rb') as f: - self.assertEqual(f.read(), self.MANIFEST.lstrip()) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest.gz'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertEqual(m.find_path_entry('test').checksums, - { - 'SHA256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'SHA512': 'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', - }) - m.save_manifests() - with gemato.compression.open_potentially_compressed_path( - os.path.join(self.dir, 'Manifest.gz'), 'rb') as f: - self.assertNotEqual(f.read(), self.MANIFEST.lstrip()) - m.assert_directory_verifies() + 'test': '', + } - def test_decompress_manifests_low_watermark(self): - """ - Try decompression with watermark low enough to keep this one - compressed. - """ - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest.gz'), - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=0) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - - def test_decompress_manifests_high_watermark(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest.gz'), - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=4096) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - - -class CompressedSubManifestTest(TempDirTestCase): - """ - Test a tree with top-level Manifest being compressed. - """ +class CompressedSubManifestLayout(BaseLayout): # we can't compress locally here since we need stable result - SUB_MANIFEST = b''' -H4sICHX68FkCA01hbmlmZXN0AHNxDHFUKEktLlEwUPB1MVVIMTFMsUhOsbRIMzBIMjIwSbW0MDCw -tLRITU6zMDEyT+UCAJqyznMxAAAA + SUB_MANIFEST_B64 = b''' +H4sICHX68FkCA01hbmlmZXN0AHNxDHFUKEktLlEwUPB1MVVIMTFMsUhOsbRIMzBIMjIwSbW0 +MDCwtLRITU6zMDEyT+UCAJqyznMxAAAA ''' DIRS = ['sub'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' MANIFEST sub/Manifest.gz 78 MD5 9c158f87b2445279d7c8aac439612fba ''', - 'sub/test': u'', - } - - def setUp(self): - super(CompressedSubManifestTest, self).setUp() - self.manifest_gz = os.path.join(self.dir, 'sub/Manifest.gz') - with io.open(self.manifest_gz, 'wb') as f: - f.write(base64.b64decode(self.SUB_MANIFEST)) - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertEqual(m.find_path_entry('sub/test').path, 'test') - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies('') - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 0) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertEqual(m.find_path_entry('sub/test').checksums, - { - 'SHA256': 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855', - 'SHA512': 'cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e', - }) - m.save_manifests() - with io.open(os.path.join(self.dir, 'sub/Manifest.gz'), - 'rb') as f: - self.assertNotEqual(f.read(), - base64.b64decode(self.SUB_MANIFEST)) - m.assert_directory_verifies() - - def test_recompress_manifests_low_watermark(self): - """ - Try decompression with watermark low enough to keep all - compressed. - """ - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=0) - self.assertEqual(m.find_path_entry('sub/Manifest.gz').path, - 'sub/Manifest.gz') - self.assertIsNone(m.find_path_entry('sub/Manifest')) - # top-level is never compressed - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - # sub can be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.gz'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - - def test_recompress_manifests_high_watermark(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=4096) - self.assertEqual(m.find_path_entry('sub/Manifest').path, - 'sub/Manifest') - self.assertIsNone(m.find_path_entry('sub/Manifest.gz')) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.gz'))) - - def test_cli_recompress_manifests_low_watermark(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', - '--hashes=SHA256 SHA512', - '--compress-watermark=0', - self.dir]), - 0) - # top-level Manifest should not be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - # but sub/Manifest should be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.gz'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - - def test_cli_recompress_manifests_high_watermark(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', - '--hashes=SHA256 SHA512', - '--compress-watermark=4096', - self.dir]), - 0) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'sub/Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'sub/Manifest.gz'))) - - -class CompressedManifestOrderingTest(TempDirTestCase): - """ - Compressed Manifest paths can be shorter than regular, resulting - in wrong sort order. - """ - - MANIFEST = b''' -MANIFEST a/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''' - DIRS = ['a'] + 'sub/Manifest.gz': '', + } FILES = { - 'a/Manifest': u'', - 'a/stray': u'', + 'sub/test': '', } - def setUp(self): - super(CompressedManifestOrderingTest, self).setUp() - self.manifest_gz = os.path.join(self.dir, 'Manifest.gz') - with gzip.GzipFile(self.manifest_gz, 'wb') as f: - f.write(self.MANIFEST) + @classmethod + def create(cls, tmp_path): + super().create(tmp_path) + with open(tmp_path / 'sub/Manifest.gz', 'wb') as f: + f.write(base64.b64decode(cls.SUB_MANIFEST_B64)) - def test__iter_manifests_for_path_order(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.manifest_gz) - m.load_manifests_for_path('', recursive=True) - self.assertListEqual([d for mpath, d, k - in m._iter_manifests_for_path('a')], - ['a', '']) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.manifest_gz) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - self.assertIsInstance(m.find_path_entry('a/stray'), - gemato.manifest.ManifestEntryDATA) - m.save_manifests() - m.assert_directory_verifies() +class CompressedManifestSortLayout(BaseLayout): + """Layout to test ordering of mixed compressed/uncompressed Manifests""" -class MultipleSubdirectoryFilesTest(TempDirTestCase): - """ - Regression test for adding a directory with multiple stray files. - """ + TOP_MANIFEST = 'Manifest.gz' + DIRS = ['a'] + MANIFESTS = { + 'Manifest.gz': ''' +MANIFEST a/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + 'a/Manifest': '', + 'a/stray': '', + } + + +class MultipleStrayFilesLayout(BaseLayout): + """Regression test for adding multiple stray files""" DIRS = ['sub'] + MANIFESTS = { + 'Manifest': '', + } FILES = { - 'Manifest': u'', - 'sub/file.a': u'', - 'sub/file.b': u'', - 'sub/file.c': u'', - } - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - self.assertEqual(m.find_path_entry('sub/file.a').path, - 'sub/file.a') - self.assertEqual(m.find_path_entry('sub/file.b').path, - 'sub/file.b') - self.assertEqual(m.find_path_entry('sub/file.c').path, - 'sub/file.c') - m.save_manifests() - m.assert_directory_verifies() - + 'sub/file.a': '', + 'sub/file.b': '', + 'sub/file.c': '', + } -class UnregisteredManifestTestCase(TempDirTestCase): - """ - Test for finding a sub-Manifest that's not listed as MANIFEST. - """ +class StrayManifestLayout(BaseLayout): DIRS = ['sub'] - FILES = { - 'Manifest': u'', - 'sub/Manifest': u''' + MANIFESTS = { + 'Manifest': '', + 'sub/Manifest': ''' DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'sub/test': u'', - } - - def test_load_manifests(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub/test') - self.assertNotIn('sub/Manifest', m.loaded_manifests) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - self.assertIn('sub/Manifest', m.loaded_manifests) - # entry for sub-Manifest should go into parent dir - # and for test into the sub-Manifest - self.assertEqual(m.find_path_entry('sub/Manifest').path, - 'sub/Manifest') - self.assertEqual(m.find_path_entry('sub/test').path, 'test') - m.save_manifests() - m.assert_directory_verifies() - - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', - self.dir]), - 0) - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - self.dir]), - 0) - + } + FILES = { + 'sub/test': '', + } -class UnregisteredCompressedManifestTestCase(TempDirTestCase): - """ - Test for finding a compressed sub-Manifest that's not listed - as MANIFEST. - """ - SUB_MANIFEST = b''' -DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''' +class StrayCompressedManifestLayout(BaseLayout): DIRS = ['sub'] + MANIFESTS = { + 'Manifest': '', + 'sub/Manifest.gz': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + } FILES = { - 'Manifest': u'', - 'sub/test': u'', - } - - def setUp(self): - super(UnregisteredCompressedManifestTestCase, self).setUp() - self.manifest_gz = os.path.join(self.dir, 'sub/Manifest.gz') - with gemato.compression.open_potentially_compressed_path( - os.path.join(self.dir, 'sub/Manifest.gz'), 'wb') as f: - f.write(self.SUB_MANIFEST) - - def test_load_manifests(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest.gz', m.loaded_manifests) - m.load_manifests_for_path('sub/test') - self.assertNotIn('sub/Manifest.gz', m.loaded_manifests) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - self.assertIn('sub/Manifest.gz', m.loaded_manifests) - # entry for sub-Manifest should go into parent dir - # and for test into the sub-Manifest - self.assertEqual(m.find_path_entry('sub/Manifest.gz').path, - 'sub/Manifest.gz') - self.assertEqual(m.find_path_entry('sub/test').path, 'test') - m.save_manifests() - m.assert_directory_verifies() - - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', - self.dir]), - 0) - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - self.dir]), - 0) - + 'sub/test': '', + } -class InvalidManifestTestCase(TempDirTestCase): - """ - Test for ignoring a stray "Manifest" file that's invalid. - """ +class StrayInvalidManifestLayout(BaseLayout): DIRS = ['sub'] - FILES = { - 'Manifest': u'', - 'sub/Manifest': u''' -INVALID STUFF IN HERE + MANIFESTS = { + 'Manifest': '', + # technically it's not a Manifest but we want to verify that + # it is not clobbered + 'sub/Manifest': ''' +I AM SOOO INVALID ''', - 'sub/test': u'', - } - - def test_load_manifests(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - m.load_manifests_for_path('sub/test') - self.assertNotIn('sub/Manifest', m.loaded_manifests) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - self.assertNotIn('sub/Manifest', m.loaded_manifests) - # entry for sub-Manifest should go into parent dir - # and for test into the sub-Manifest - self.assertIsInstance(m.find_path_entry('sub/Manifest'), - gemato.manifest.ManifestEntryDATA) - self.assertEqual(m.find_path_entry('sub/test').path, - 'sub/test') - m.save_manifests() - # ensure that the file was not modified - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest']) - m.assert_directory_verifies() + } + FILES = { + 'sub/test': '', + } - def test_cli_update(self): - self.assertEqual( - gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', - self.dir]), - 0) - with io.open(os.path.join(self.dir, 'sub/Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['sub/Manifest']) - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - self.dir]), - 0) - - -class CreateNewManifestTest(TempDirTestCase): + +class StrayInvalidCompressedManifestLayout(BaseLayout): DIRS = ['sub'] + MANIFESTS = { + 'Manifest': '', + } FILES = { - 'test': u'', - 'sub/test': u'', + 'sub/test': '', } - def setUp(self): - super(CreateNewManifestTest, self).setUp() - self.path = os.path.join(self.dir, 'Manifest') + @classmethod + def create(cls, tmp_path): + super().create(tmp_path) + with open(tmp_path / 'sub/Manifest.gz', 'w') as f: + # important: this is written uncompressed + f.write('I AM SOOO INVALID\n') - def test_load_without_create(self): - self.assertRaises(IOError, - gemato.recursiveloader.ManifestRecursiveLoader, - self.path) - def test_create_without_save(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.path, allow_create=True) - del m - self.assertFalse(os.path.exists(self.path)) +class FilenameWhitespaceLayout(BaseLayout): + FILENAME = ' foo bar ' + MANIFESTS = { + 'Manifest': ''' +DATA \\x20\\x20foo\\x20bar\\x20\\x20 0 \ +MD5 d41d8cd98f00b204e9800998ecf8427e +''', + } + FILES = { + FILENAME: '', + } - def test_create_empty(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.path, allow_create=True) - m.save_manifests() - self.assertTrue(os.path.exists(self.path)) + MANIFESTS_REWRITTEN = dict((k, v.lstrip()) for k, v in MANIFESTS.items()) - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.path, allow_create=True, hashes=['MD5']) - m.update_entries_for_directory('') - m.save_manifests() - m.assert_directory_verifies('') - - m2 = gemato.manifest.ManifestFile() - with io.open(self.path, 'r', encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 2) - - def test_cli(self): - self.assertEqual( - gemato.cli.main(['gemato', 'create', '--hashes=SHA256 SHA512', - self.dir]), - 0) - - m2 = gemato.manifest.ManifestFile() - with io.open(self.path, 'r', encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 2) - - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - self.dir]), - 0) - - def test_compress_manifests(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_create=True, - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=0) - # top-level Manifest can not be compressed - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - - def test_cli_compress_manifests(self): - self.assertEqual( - gemato.cli.main(['gemato', 'create', - '--hashes=SHA256 SHA512', - '--compress-watermark=0', - self.dir]), - 0) - # top-level Manifest can not be compressed - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - - -class CreateNewCompressedManifestTest(TempDirTestCase): - """ - Check that the tooling can create a compressed Manifest file - when explicitly requested to. Note that this file is not a valid - top-level Manifest since compressing that file is disallowed. - """ - DIRS = ['sub'] +class FilenameBackslashLayout(BaseLayout): + FILENAME = 'foo\\bar' + MANIFESTS = { + 'Manifest': ''' +DATA foo\\x5Cbar 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + } FILES = { - 'test': u'', - 'sub/test': u'', + FILENAME: '', } - def setUp(self): - super(CreateNewCompressedManifestTest, self).setUp() - self.path = os.path.join(self.dir, 'Manifest.gz') + MANIFESTS_REWRITTEN = dict((k, v.lstrip()) for k, v in MANIFESTS.items()) - def test_load_without_create(self): - self.assertRaises(IOError, - gemato.recursiveloader.ManifestRecursiveLoader, - self.path) - def test_create_without_save(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.path, allow_create=True) - del m - self.assertFalse(os.path.exists(self.path)) +class NewManifestLayout(BaseLayout): + DIRS = ['sub'] + FILES = { + 'test': '', + 'sub/test': '', + } + - def test_create_empty(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.path, allow_create=True) - m.save_manifests() - with gemato.compression.open_potentially_compressed_path( - self.path, 'rb') as f: - self.assertEqual(f.read(), b'') - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - self.path, allow_create=True, hashes=['MD5']) - m.update_entries_for_directory('') - m.save_manifests() - m.assert_directory_verifies('') - - m2 = gemato.manifest.ManifestFile() - with gemato.compression.open_potentially_compressed_path( - self.path, 'r', encoding='utf8') as f: - m2.load(f) - self.assertEqual(len(m2.entries), 2) - - def test_decompress_manifests_low_watermark(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest.gz'), - allow_create=True, - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=0) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - - def test_decompress_manifests_high_watermark(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest.gz'), - allow_create=True, - hashes=['SHA256', 'SHA512']) - m.save_manifests(force=True, compress_watermark=4096) - self.assertFalse(os.path.exists( - os.path.join(self.dir, 'Manifest.gz'))) - self.assertTrue(os.path.exists( - os.path.join(self.dir, 'Manifest'))) - - -class MultipleDeepNestedManifestTest(TempDirTestCase): +class NestedManifestLayout(BaseLayout): DIRS = ['a', 'a/x', 'a/y', 'a/z', 'b'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' MANIFEST a/Manifest 119 MD5 6956767cfbb3276adbdce86cca559719 MANIFEST b/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'test': u'', - 'a/Manifest': u''' + 'a/Manifest': ''' MANIFEST x/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e MANIFEST z/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'a/test': u'', - 'a/x/Manifest': u'', - 'a/x/test': u'', - 'a/y/test': u'', - 'a/z/Manifest': u'', - 'a/z/test': u'', - 'b/Manifest': u'', - 'b/test': u'', - } - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - m.save_manifests() - m.assert_directory_verifies() - - def test_load_unregistered_manifests(self): - # remove the top Manifest - os.unlink(os.path.join(self.dir, 'Manifest')) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_create=True) - # we allow extra entries for files that referenced within - # newly added Manifest - self.assertListEqual(sorted(m.load_unregistered_manifests('')), - ['a/Manifest', 'a/x/Manifest', 'a/z/Manifest', - 'b/Manifest']) - self.assertIn('a/Manifest', m.loaded_manifests) - self.assertNotIn('a/Manifest', m.updated_manifests) - self.assertIsNone(m.find_path_entry('a/Manifest')) - self.assertIn('b/Manifest', m.loaded_manifests) - self.assertNotIn('b/Manifest', m.updated_manifests) - self.assertIsNone(m.find_path_entry('b/Manifest')) - - def test_update_entries_for_directory_without_manifests(self): - # remove the top Manifest - os.unlink(os.path.join(self.dir, 'Manifest')) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - allow_create=True) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - m.save_manifests() - m.assert_directory_verifies() - - def test_create_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertIsNotNone(m.create_manifest('a/y/Manifest')) - self.assertFalse(os.path.exists(os.path.join( - self.dir, 'a/y/Manifest'))) - m.loaded_manifests['Manifest'].entries.append( - gemato.manifest.ManifestEntryMANIFEST( - 'a/y/Manifest', 0, {})) - m.save_manifests() - self.assertTrue(os.path.exists(os.path.join( - self.dir, 'a/y/Manifest'))) - + 'a/x/Manifest': '', + 'a/z/Manifest': '', + 'b/Manifest': '', + } + FILES = { + 'test': '', + 'a/test': '', + 'a/x/test': '', + 'a/y/test': '', + 'a/z/test': '', + 'b/test': '', + } -class AddingToMultipleManifestsTest(TempDirTestCase): - """ - Check that we are handling a directory containing multiple Manifests - correctly, and that we can cleanly add an additional 'Manifest' file - in it. - """ +class AddToMultiManifestLayout(BaseLayout): DIRS = ['a', 'b'] - FILES = { - 'Manifest': u''' + MANIFESTS = { + 'Manifest': ''' MANIFEST a/Manifest.a 47 MD5 89b9c1e9e5a063ee60b91b632c84c7c8 MANIFEST a/Manifest.b 47 MD5 1b1504046a2023ed75a2a89aed7c52f4 ''', - 'a/Manifest.a': u''' + 'a/Manifest': ''' +DATA c 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + 'a/Manifest.a': ''' DATA a 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'a/Manifest.b': u''' + 'a/Manifest.b': ''' DATA b 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'a/Manifest': u''' -DATA c 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', - 'a/a': u'', - 'a/b': u'', - 'a/c': u'', - 'b/test': u'', - } - - def test_load_unregistered_manifests(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertListEqual(sorted(m.load_unregistered_manifests('')), - ['a/Manifest']) - self.assertIn('a/Manifest', m.loaded_manifests) - self.assertNotIn('a/Manifest', m.updated_manifests) - self.assertIsNone(m.find_path_entry('a/Manifest')) - - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entries_for_directory('', hashes=['SHA256', 'SHA512']) - m.save_manifests() - self.assertListEqual(sorted(m.loaded_manifests), - ['Manifest', 'a/Manifest', 'a/Manifest.a', - 'a/Manifest.b']) - self.assertListEqual(sorted( - e.path for e in m.loaded_manifests['Manifest'].entries), - ['a/Manifest', 'a/Manifest.a', 'a/Manifest.b', 'b/test']) - self.assertListEqual(sorted( - e.path for e in m.loaded_manifests['a/Manifest.a'].entries), - ['a']) - self.assertListEqual(sorted( - e.path for e in m.loaded_manifests['a/Manifest.b'].entries), - ['b']) - self.assertListEqual(sorted( - e.path for e in m.loaded_manifests['a/Manifest'].entries), - ['c']) - m.assert_directory_verifies() + } + FILES = { + 'a/a': '', + 'a/b': '', + 'a/c': '', + 'b/test': '', + } -class ManifestMTimeTests(TempDirTestCase): - """ - Tests for mtime-limited verification/update. - """ +class SubManifestMismatchLayout(BaseLayout): + """Sub-Manifest whose checksum is mismatched""" - FILES = { - 'Manifest': u''' -DATA test 11 MD5 5f8db599de986fab7a21625b7916589c + DIRS = ['a'] + MANIFESTS = { + 'Manifest': ''' +MANIFEST a/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - 'test': u'test string', - } - - def test_assert_directory_verifies_old_mtime(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.assert_directory_verifies, '', last_mtime=0) - - def test_assert_directory_verifies_new_mtime(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - st = os.stat(os.path.join(self.dir, 'test')) - m.assert_directory_verifies('', last_mtime=st.st_mtime) - - def test_update_entries_for_directory_old_mtime(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['MD5']) - m.update_entries_for_directory('', last_mtime=0) - self.assertEqual(m.find_path_entry('test').checksums['MD5'], - '6f8db599de986fab7a21625b7916589c') - - def test_update_entries_for_directory_new_mtime(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['MD5']) - st = os.stat(os.path.join(self.dir, 'test')) - m.update_entries_for_directory('', last_mtime=st.st_mtime) - self.assertEqual(m.find_path_entry('test').checksums['MD5'], - '5f8db599de986fab7a21625b7916589c') - - -class ManifestWhitespaceInFilenameTest(TempDirTestCase): - """ - Test for a Manifest tree where filename contains whitespace. - """ - - FILENAME = ' foo bar ' - FILES = { - 'Manifest': u''' -DATA \\x20\\x20foo\\x20bar\\x20\\x20 0 MD5 d41d8cd98f00b204e9800998ecf8427e + 'a/Manifest': ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e ''', - FILENAME: u'' - } - - def test_find_path_entry(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertIsNotNone(m.find_path_entry(self.FILENAME)) - - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.assert_directory_verifies('') - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 0) - - def test_rewrite_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.save_manifests(force=True) - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertEqual(f.read(), self.FILES['Manifest'].lstrip()) - - def test_update_entry_for_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA1']) - m.update_entry_for_path(self.FILENAME) - self.assertIsNotNone(m.find_path_entry(self.FILENAME)) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() + } + FILES = { + 'a/test': '', + } - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertIsNotNone(m.find_path_entry(self.FILENAME)) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() +class NonexistingDirectoryLayout(BaseLayout): + MANIFESTS = { + 'Manifest': ''' +DATA sub/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + } -class ManifestBackslashInFilenameTest(ManifestWhitespaceInFilenameTest): - """ - Test for a Manifest tree where filename contains backslash. - """ - FILENAME = 'foo\\bar' - FILES = { - 'Manifest': u''' -DATA foo\\x5Cbar 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', - FILENAME: u'' +class SymlinkLoopLayout(BaseLayout): + """A layout with a directory that contains a symlink to itself""" + + DIRS = ['sub'] + MANIFESTS = { + 'Manifest': '', } + @classmethod + def create(cls, tmp_path): + super().create(tmp_path) + os.symlink('.', tmp_path / 'sub/sub') -class SubManifestMismatchTest(TempDirTestCase): - """ - Test handling sub-Manifest whose checksum is mismatched. - """ - DIRS = ['a'] - FILES = { - 'Manifest': u''' -MANIFEST a/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', - 'a/Manifest': u''' -DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e -''', - 'a/test': u'', - } - - def test_init(self): - """ - Init should not attempt to load sub-Manifest, and therefore - not fail. - """ - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertIn('Manifest', m.loaded_manifests) - - def test_load_sub_manifest(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('a/Manifest', m.loaded_manifests) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.load_manifests_for_path, 'a/test') - self.assertNotIn('a/Manifest', m.loaded_manifests) - - def test_load_manifests_recursively(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertNotIn('a/Manifest', m.loaded_manifests) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.load_manifests_for_path, '', recursive=True) - self.assertNotIn('a/Manifest', m.loaded_manifests) - - def test_update_entries_for_directory(self): - """ - update_entries_for_directory() should ignore Manifest mismatches - since it's rewriting Manifests anyway. - """ - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['MD5']) - m.update_entries_for_directory('') - self.assertIsNotNone(m.find_path_entry('a/test')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest'].lstrip()) - m.assert_directory_verifies() +class SymlinkLoopIgnoreLayout(SymlinkLoopLayout): + """A layout with a directory that contains a symlink to itself""" + DIRS = ['sub'] + MANIFESTS = { + 'Manifest': ''' +IGNORE sub +''', + } -class ManifestMissingDirectoryTest(TempDirTestCase): - """ - Test handling Manifest with file in directory that does not exist. - """ - DIRS = [] +class MismatchedFileLayout(BaseLayout): + MANIFESTS = { + 'Manifest': ''' +DATA test 11 MD5 5f8db599de986fab7a21625b7916589c +''', + } FILES = { - 'Manifest': u''' -DATA sub/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e + 'test': 'test string', + } + + +class MismatchedFileFutureTimestampLayout(BaseLayout): + MANIFESTS = { + 'Manifest': ''' +TIMESTAMP +DATA test 11 SHA1 561295c9cbf9d6b2f6428414504a8deed3020641 ''', } + FILES = { + 'test': 'test string', + } - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestMismatch, - m.assert_directory_verifies, '') - - def test_assert_directory_verifies_nonstrict_via_fail_handler(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertTrue(m.assert_directory_verifies('', - fail_handler=callback_return_true)) - - def test_cli_verifies(self): - self.assertEqual( - gemato.cli.main(['gemato', 'verify', self.dir]), - 1) - - def test_update_entry_for_path(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - m.update_entry_for_path('sub/test') - self.assertIsNone(m.find_path_entry('sub/test')) + @classmethod + def create(cls, tmp_path): + super().create(tmp_path) + future_dt = (datetime.datetime.utcnow() + + datetime.timedelta(days=1)) + with open(tmp_path / 'Manifest', 'w') as f: + f.write(cls.MANIFESTS['Manifest'].replace( + 'TIMESTAMP', + f'TIMESTAMP {future_dt.strftime("%Y-%m-%dT%H:%M:%SZ")}')) + + +FLAT_LAYOUTS = [ + DuplicateEntryLayout, + DuplicateEbuildEntryLayout, + DuplicateAuxEntryLayout, + DisjointHashSetEntryLayout, + IncompatibleTypeLayout, + MismatchedSizeLayout, + MismatchedChecksumLayout, + IgnoreEntryLayout, + MiscEntryLayout, + CrossDeviceLayout, + CrossDeviceEmptyLayout, + CrossDeviceIgnoreLayout, + DotFileLayout, + DirForFileLayout, + UnreadableDirLayout, + MultipleStrayFilesLayout, + StrayManifestLayout, + StrayCompressedManifestLayout, + StrayInvalidManifestLayout, + StrayInvalidCompressedManifestLayout, + FilenameWhitespaceLayout, + FilenameBackslashLayout, + NonexistingDirectoryLayout, + SymlinkLoopLayout, + SymlinkLoopIgnoreLayout, + MismatchedFileLayout, +] +SUB_LAYOUTS = [ + SubTimestampLayout, + DuplicateManifestEntryLayout, + DuplicateManifestAsDataEntryLayout, + DuplicateEntryInSubManifestLayout, +] +ALL_LAYOUTS = FLAT_LAYOUTS + SUB_LAYOUTS + [ + BasicTestLayout, + MultiManifestLayout, + MultiTopManifestLayout, + CompressedSubManifestLayout, + NestedManifestLayout, + AddToMultiManifestLayout, + SubManifestMismatchLayout, +] + + +@pytest.mark.parametrize( + 'layout,path,recursive,expected', + [(layout, None, False, ['Manifest']) for layout in ALL_LAYOUTS] + + [(layout, '', True, ['Manifest']) for layout in FLAT_LAYOUTS] + + list(itertools.chain.from_iterable( + [(layout, '', False, ['Manifest']), + (layout, '', True, ['Manifest', 'sub/Manifest']), + (layout, 'sub', False, ['Manifest', 'sub/Manifest']), + ] for layout in SUB_LAYOUTS)) + + [(BasicTestLayout, 'sub/test', False, ['Manifest', 'sub/Manifest']), + (BasicTestLayout, 'sub/deeper/test', False, + ['Manifest', 'sub/Manifest', 'sub/deeper/Manifest']), + (BasicTestLayout, '', True, + ['Manifest', 'other/Manifest', 'sub/Manifest', + 'sub/deeper/Manifest']), + (BasicTestLayout, 'sub', True, + ['Manifest', 'sub/Manifest', 'sub/deeper/Manifest']), + (BasicTestLayout, 'sub/test', True, ['Manifest', 'sub/Manifest']), + (MultiManifestLayout, 'sub', False, ['Manifest', + 'sub/Manifest.a', + 'sub/Manifest.b']), + (MultiManifestLayout, '', True, ['Manifest', 'sub/Manifest.a', + 'sub/Manifest.b']), + (MultiTopManifestLayout, '', False, ['Manifest', + 'Manifest.a', + 'Manifest.b']), + (MultiTopManifestLayout, 'sub', False, ['Manifest', + 'Manifest.a', + 'Manifest.b', + 'sub/Manifest']), + (MultiTopManifestLayout, '', True, ['Manifest', + 'Manifest.a', + 'Manifest.b', + 'sub/Manifest']), + (CompressedTopManifestLayout, None, False, ['Manifest.gz']), + (CompressedSubManifestLayout, 'sub', False, ['Manifest', + 'sub/Manifest.gz']), + (CompressedSubManifestLayout, '', True, ['Manifest', + 'sub/Manifest.gz']), + (CompressedManifestSortLayout, None, False, ['Manifest.gz']), + (CompressedManifestSortLayout, 'a', False, ['Manifest.gz', + 'a/Manifest']), + (CompressedManifestSortLayout, '', True, ['Manifest.gz', + 'a/Manifest']), + (NestedManifestLayout, '', True, ['Manifest', + 'a/Manifest', + 'a/x/Manifest', + 'a/z/Manifest', + 'b/Manifest', + ]), + (AddToMultiManifestLayout, 'a', False, ['Manifest', + 'a/Manifest.a', + 'a/Manifest.b']), + (AddToMultiManifestLayout, '', True, ['Manifest', + 'a/Manifest.a', + 'a/Manifest.b']), + ]) +def test_load_manifests(layout_factory, layout, path, recursive, + expected): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + assert list(m.loaded_manifests) == [layout.TOP_MANIFEST] + if path is not None: + m.load_manifests_for_path(path, recursive=recursive) + lfunc = sorted if recursive else list + assert lfunc(m.loaded_manifests) == expected + + +@pytest.mark.parametrize( + 'layout,path,recursive,diff', + [(SubManifestMismatchLayout, 'a', False, [('__size__', 0, 50)]), + (SubManifestMismatchLayout, '', True, [('__size__', 0, 50)]), + ]) +def test_load_manifests_raise(layout_factory, layout, path, recursive, + diff): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + assert list(m.loaded_manifests) == [layout.TOP_MANIFEST] + with pytest.raises(ManifestMismatch) as exc: + m.load_manifests_for_path(path, recursive=recursive) + assert exc.value.diff == diff + + +@pytest.mark.parametrize( + 'layout,path,recursive,expected', + [(BasicTestLayout, 'sub/deeper', False, ['sub/deeper', 'sub', '']), + (BasicTestLayout, 'other', True, ['other', '']), + (BasicTestLayout, 'sub', True, ['sub/deeper', 'sub', '']), + (CompressedManifestSortLayout, '', True, ['a', '']), + ]) +def test__iter_manifests_for_path(layout_factory, layout, path, + recursive, expected): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST) + m.load_manifests_for_path('', recursive=True) + assert [d for mpath, d, k + in m._iter_manifests_for_path(path, + recursive=recursive + )] == expected + + +def get_entry(entry): + if entry is not None: + if entry.tag == 'TIMESTAMP': + return (entry.tag, entry.ts) + elif entry.tag == 'IGNORE': + return (entry.tag, entry.path) + return (entry.tag, entry.path, sorted(entry.checksums)) + + +@pytest.mark.parametrize( + 'layout,preload_paths,expected', + [(BasicTestLayout, None, + ('TIMESTAMP', datetime.datetime(2017, 1, 1, 1, 1, 1))), + # TIMESTAMP is valid only in top-level Manifest + (SubTimestampLayout, None, None), + (SubTimestampLayout, 'sub', None), + (MultiTopManifestLayout, None, + ('TIMESTAMP', datetime.datetime(2017, 1, 1, 1, 1, 1))), + ]) +def test_find_timestamp(layout_factory, layout, preload_paths, expected): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + if preload_paths is not None: + m.load_manifests_for_path(preload_paths) + assert get_entry(m.find_timestamp()) == expected + + +@pytest.mark.parametrize( + 'layout', + [BasicTestLayout, + DuplicateEntryLayout, + ]) +def test_set_timestamp(layout_factory, layout): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + m.set_timestamp(datetime.datetime(2010, 7, 7, 7, 7, 7)) + assert (get_entry(m.find_timestamp()) == + ('TIMESTAMP', datetime.datetime(2010, 7, 7, 7, 7, 7))) + assert len([x for x + in m.loaded_manifests['Manifest'].entries + if x.tag == 'TIMESTAMP']) == 1 + + +@pytest.mark.parametrize( + 'layout,path,expected', + [(layout, layout.FILENAME, ('DATA', layout.FILENAME, ['MD5'])) + for layout in (FilenameWhitespaceLayout, + FilenameBackslashLayout, + )] + + [(BasicTestLayout, 'test', None), + (BasicTestLayout, 'sub/test', None), + (BasicTestLayout, 'sub/deeper/test', ('DATA', 'test', ['MD5'])), + (DuplicateEntryLayout, 'test', ('DATA', 'test', ['MD5'])), + (DuplicateEntryInSubManifestLayout, 'sub/test', + ('DATA', 'test', ['MD5'])), + (DuplicateEbuildEntryLayout, 'test.ebuild', + ('DATA', 'test.ebuild', ['MD5'])), + (DuplicateAuxEntryLayout, 'files/test.patch', + ('DATA', 'files/test.patch', ['MD5'])), + (DisjointHashSetEntryLayout, 'test', + ('DATA', 'test', ['MD5'])), + (IncompatibleTypeLayout, 'metadata.xml', + ('DATA', 'metadata.xml', ['MD5'])), + (MismatchedSizeLayout, 'test', + ('DATA', 'test', ['MD5'])), + (MismatchedChecksumLayout, 'test', + ('DATA', 'test', ['MD5'])), + (IgnoreEntryLayout, 'foo', ('IGNORE', 'foo')), + (IgnoreEntryLayout, 'bar', ('IGNORE', 'bar')), + (IgnoreEntryLayout, 'bar/baz', ('IGNORE', 'bar')), + (CompressedTopManifestLayout, 'test', ('DATA', 'test', ['MD5'])), + (CompressedSubManifestLayout, 'sub/test', + ('DATA', 'test', ['MD5'])), + ]) +def test_find_path_entry(layout_factory, layout, path, expected): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + assert get_entry(m.find_path_entry(path)) == expected + + +@pytest.mark.parametrize( + 'filename,relpath,expected', + [('topdistfile-1.txt', '', 'topdistfile-1.txt'), + ('subdistfile-1.txt', '', None), + ('topdistfile-1.txt', 'file', 'topdistfile-1.txt'), + ('subdistfile-1.txt', 'file', None), + ('topdistfile-1.txt', 'sub', 'topdistfile-1.txt'), + ('subdistfile-1.txt', 'sub', 'subdistfile-1.txt'), + ('topdistfile-1.txt', 'sub/', 'topdistfile-1.txt'), + ('subdistfile-1.txt', 'sub/', 'subdistfile-1.txt'), + ('topdistfile-1.txt', 'sub/file', 'topdistfile-1.txt'), + ('subdistfile-1.txt', 'sub/file', 'subdistfile-1.txt'), + ]) +def test_find_dist_entry(layout_factory, filename, relpath, expected): + layout = BasicTestLayout + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / 'Manifest') + if expected is not None: + expected = ('DIST', expected, ['MD5']) + assert get_entry(m.find_dist_entry(filename, relpath)) == expected + + +COMMON_VERIFY_PATH_VARIANTS = [ + # layout, path, expected, diff + (BasicTestLayout, 'sub/Manifest', True, []), + (BasicTestLayout, 'sub/deeper/test', True, []), + (BasicTestLayout, 'sub/deeper/nonexist', True, []), + (BasicTestLayout, 'sub/stray', False, [('__exists__', False, True)]), + (MultiManifestLayout, 'sub/foo', False, [('__size__', 32, 16)]), + (DuplicateEntryLayout, 'test', True, []), + (DuplicateManifestAsDataEntryLayout, 'sub/Manifest', True, []), + (DuplicateEntryInSubManifestLayout, 'sub/test', True, []), + (DuplicateEbuildEntryLayout, 'test.ebuild', True, []), + (DuplicateAuxEntryLayout, 'files/test.patch', True, []), + (DisjointHashSetEntryLayout, 'test', False, + [('MD5', + '9e107d9d372bb6826bd81d3542a419d6', + 'd41d8cd98f00b204e9800998ecf8427e'), + ]), + (DirForFileLayout, 'test', False, + [('__type__', 'regular file', 'directory')]), + (CompressedTopManifestLayout, 'test', True, []), + (CompressedSubManifestLayout, 'sub/test', True, []), + (MismatchedFileLayout, 'test', False, + [('MD5', + '5f8db599de986fab7a21625b7916589c', + '6f8db599de986fab7a21625b7916589c')]), +] + + +@pytest.mark.parametrize('layout,path,expected,diff', + COMMON_VERIFY_PATH_VARIANTS) +def test_verify_path(layout_factory, layout, path, expected, diff): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + assert m.verify_path(path) == (expected, diff) + + +@pytest.mark.parametrize('layout, path,expected,diff', + COMMON_VERIFY_PATH_VARIANTS) +def test_assert_path_verifies(layout_factory, layout, path, expected, diff): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + if expected: + m.assert_path_verifies(path) + else: + with pytest.raises(ManifestMismatch) as exc: + m.assert_path_verifies(path) + assert exc.value.path == path + assert exc.value.diff == diff + + +@pytest.mark.parametrize( + 'layout,path,only_types,expected', + [(BasicTestLayout, '', None, + {'other': {'Manifest': ('MANIFEST', 'other/Manifest', ['MD5'])}, + 'sub': {'Manifest': ('MANIFEST', 'sub/Manifest', ['MD5'])}, + 'sub/deeper': {'Manifest': ('MANIFEST', 'deeper/Manifest', ['MD5']), + 'test': ('DATA', 'test', ['MD5']), + }, + }), + (BasicTestLayout, '', 'MANIFEST', + {'other': {'Manifest': ('MANIFEST', 'other/Manifest', ['MD5'])}, + 'sub': {'Manifest': ('MANIFEST', 'sub/Manifest', ['MD5'])}, + 'sub/deeper': {'Manifest': ('MANIFEST', 'deeper/Manifest', ['MD5'])}, + }), + (BasicTestLayout, '', 'DIST', + {'': {'subdistfile-1.txt': ('DIST', 'subdistfile-1.txt', ['MD5']), + 'topdistfile-1.txt': ('DIST', 'topdistfile-1.txt', ['MD5']), + }, + }), + (BasicTestLayout, 'sub', None, + {'sub': {'Manifest': ('MANIFEST', 'sub/Manifest', ['MD5'])}, + 'sub/deeper': {'Manifest': ('MANIFEST', 'deeper/Manifest', ['MD5']), + 'test': ('DATA', 'test', ['MD5']), + }, + }), + (BasicTestLayout, 'sub', 'MANIFEST', + {'sub': {'Manifest': ('MANIFEST', 'sub/Manifest', ['MD5'])}, + 'sub/deeper': {'Manifest': ('MANIFEST', 'deeper/Manifest', ['MD5'])}, + }), + (BasicTestLayout, 'non-existing', None, {}), + (DuplicateEntryLayout, '', None, + {'': {'test': ('DATA', 'test', ['MD5'])}}), + (DuplicateEntryInSubManifestLayout, '', None, + {'sub': {'Manifest': ('MANIFEST', 'sub/Manifest', ['MD5']), + 'test': ('DATA', 'sub/test', ['MD5']), + }, + }), + (DuplicateManifestAsDataEntryLayout, '', None, + {'sub': {'Manifest': ('DATA', 'sub/Manifest', ['MD5']), + 'test': ('DATA', 'test', ['MD5']), + }}), + (DuplicateEbuildEntryLayout, '', None, + {'': {'test.ebuild': ('EBUILD', 'test.ebuild', ['MD5'])}}), + (DuplicateEbuildEntryLayout, '', 'DATA', + {'': {'test.ebuild': ('DATA', 'test.ebuild', ['MD5'])}}), + (DuplicateAuxEntryLayout, '', None, + {'files': {'test.patch': ('AUX', 'files/test.patch', ['MD5'])}}), + (DuplicateAuxEntryLayout, '', 'DATA', + {'files': {'test.patch': ('DATA', 'files/test.patch', ['MD5'])}}), + (DisjointHashSetEntryLayout, '', None, + {'': {'test': ('DATA', 'test', ['MD5', 'SHA1'])}}), + (IncompatibleTypeLayout, '', 'DATA', + {'': {'metadata.xml': ('DATA', 'metadata.xml', ['MD5'])}}), + (IncompatibleTypeLayout, '', 'MISC', + {'': {'metadata.xml': ('MISC', 'metadata.xml', ['MD5'])}}), + ]) +def test_get_file_entry_dict(layout_factory, layout, path, only_types, + expected): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + if only_types is not None: + only_types = [only_types] + entries = m.get_file_entry_dict(path, only_types=only_types) + assert (dict((subdir, dict((k, get_entry(v)) + for k, v in files.items())) + for subdir, files in entries.items()) == + expected) + + +@pytest.mark.parametrize( + 'layout,path,only_types,diff', + [(IncompatibleTypeLayout, '', None, [('__type__', 'DATA', 'MISC')]), + (MismatchedSizeLayout, '', None, [('__size__', 0, 32)]), + (MismatchedChecksumLayout, '', None, + [('MD5', + 'd41d8cd98f00b204e9800998ecf8427e', + '9e107d9d372bb6826bd81d3542a419d6')]), + ]) +def test_get_file_entry_dict_incompatible(layout_factory, layout, path, + only_types, diff): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + if only_types is not None: + only_types = [only_types] + with pytest.raises(ManifestIncompatibleEntry) as exc: + m.get_file_entry_dict(path, only_types=only_types) + assert exc.value.diff == diff + + +@pytest.mark.parametrize( + 'layout,path,expected', + [(BasicTestLayout, '', + {'other/Manifest': ('Manifest', 'MANIFEST', 'other/Manifest', + ['MD5']), + 'sub/Manifest': ('Manifest', 'MANIFEST', 'sub/Manifest', ['MD5']), + 'sub/deeper/Manifest': ('sub/Manifest', 'MANIFEST', + 'deeper/Manifest', ['MD5']), + 'sub/deeper/test': ('sub/deeper/Manifest', 'DATA', 'test', + ['MD5']), + }), + (BasicTestLayout, 'sub', + {'sub/Manifest': ('Manifest', 'MANIFEST', 'sub/Manifest', ['MD5']), + 'sub/deeper/Manifest': ('sub/Manifest', 'MANIFEST', + 'deeper/Manifest', ['MD5']), + 'sub/deeper/test': ('sub/deeper/Manifest', 'DATA', 'test', + ['MD5']), + }), + (BasicTestLayout, 'non-existing', {}), + (DuplicateEntryLayout, '', + {'test': ('Manifest', 'DATA', 'test', ['MD5'])}), + (DuplicateEntryInSubManifestLayout, '', + {'sub/Manifest': ('Manifest', 'MANIFEST', 'sub/Manifest', ['MD5']), + 'sub/test': ('sub/Manifest', 'DATA', 'test', ['MD5']), + }), + (DisjointHashSetEntryLayout, '', + {'test': ('Manifest', 'DATA', 'test', ['MD5', 'SHA1'])}), + (MismatchedSizeLayout, '', + {'test': ('Manifest', 'DATA', 'test', ['MD5'])}), + (MismatchedChecksumLayout, '', + {'test': ('Manifest', 'DATA', 'test', ['MD5'])}), + ]) +def test_get_deduplicated_file_entry_dict_for_update(layout_factory, + layout, + path, + expected): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + entries = m.get_deduplicated_file_entry_dict_for_update(path) + assert dict((k, (v[0],) + get_entry(v[1])) + for k, v in entries.items()) == expected + + +@pytest.mark.parametrize( + 'layout,path,diff', + [(IncompatibleTypeLayout, '', [('__type__', 'DATA', 'MISC')]), + ]) +def test_get_deduplicated_file_entry_dict_incompatible(layout_factory, + layout, + path, + diff): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + with pytest.raises(ManifestIncompatibleEntry) as exc: + m.get_deduplicated_file_entry_dict_for_update(path) + assert exc.value.diff == diff + + +COMMON_DIRECTORY_VERIFICATION_VARIANTS = [ + # layout, path, fail_path, diff + (BasicTestLayout, 'other', None, []), + (BasicTestLayout, 'sub', 'sub/stray', + [('__exists__', False, True)]), + (DuplicateEntryLayout, '', None, []), + (DuplicateManifestEntryLayout, '', None, []), + (DuplicateManifestAsDataEntryLayout, '', None, []), + (DuplicateEbuildEntryLayout, '', None, []), + (DuplicateAuxEntryLayout, '', None, []), + (DisjointHashSetEntryLayout, '', 'test', + [('MD5', + '9e107d9d372bb6826bd81d3542a419d6', + 'd41d8cd98f00b204e9800998ecf8427e'), + ('SHA1', + '2fd4e1c67a2d28fced849ee1bb76e7391b93eb12', + 'da39a3ee5e6b4b0d3255bfef95601890afd80709'), + ]), + (IgnoreEntryLayout, '', None, []), + (MiscEntryLayout, '', 'metadata.xml', + [('__exists__', True, False)]), + (CrossDeviceIgnoreLayout, '', None, []), + (DotFileLayout, '', None, []), + (DirForFileLayout, '', 'test', + [('__type__', 'regular file', 'directory')]), + (CompressedSubManifestLayout, '', None, []), + (FilenameWhitespaceLayout, '', None, []), + (FilenameBackslashLayout, '', None, []), + (NonexistingDirectoryLayout, '', 'sub/test', + [('__exists__', True, False)]), + (MismatchedFileLayout, '', 'test', + [('MD5', + '5f8db599de986fab7a21625b7916589c', + '6f8db599de986fab7a21625b7916589c')]), +] + + +@pytest.mark.parametrize( + 'layout,path,fail_handler,expected,fail_path,diff', + [(layout, path, None, + True if fail_path is None else ManifestMismatch, + fail_path, diff) + for layout, path, fail_path, diff + in COMMON_DIRECTORY_VERIFICATION_VARIANTS] + + [(BasicTestLayout, 'sub', lambda e: True, True, None, []), + (BasicTestLayout, 'sub', lambda e: False, False, None, []), + (IncompatibleTypeLayout, '', None, ManifestIncompatibleEntry, None, + [('__type__', 'DATA', 'MISC')]), + (MismatchedSizeLayout, '', None, ManifestIncompatibleEntry, None, + [('__size__', 0, 32)]), + (MismatchedChecksumLayout, '', None, ManifestIncompatibleEntry, + None, + [('MD5', + 'd41d8cd98f00b204e9800998ecf8427e', + '9e107d9d372bb6826bd81d3542a419d6')]), + (MiscEntryLayout, '', lambda e: True, True, None, []), + (CrossDeviceLayout, '', None, ManifestCrossDevice, None, []), + (CrossDeviceLayout, '', lambda e: True, ManifestCrossDevice, None, + []), + (CrossDeviceLayout, 'sub', None, ManifestCrossDevice, None, []), + (CrossDeviceEmptyLayout, '', None, ManifestCrossDevice, None, []), + (CrossDeviceEmptyLayout, '', lambda e: True, ManifestCrossDevice, + None, []), + (CrossDeviceEmptyLayout, 'sub', None, ManifestCrossDevice, None, + []), + (CrossDeviceIgnoreLayout, 'sub', None, ManifestCrossDevice, None, + []), + (UnreadableDirLayout, '', None, PermissionError, None, []), + (UnreadableDirLayout, '', lambda e: False, PermissionError, None, + []), + (CompressedTopManifestLayout, '', None, True, None, []), + (NonexistingDirectoryLayout, '', lambda e: False, False, None, []), + (SymlinkLoopLayout, '', None, ManifestSymlinkLoop, None, []), + (SymlinkLoopLayout, '', lambda e: False, ManifestSymlinkLoop, None, + []), + (SymlinkLoopIgnoreLayout, '', None, True, None, []), + ]) +def test_assert_directory_verifies(layout_factory, layout, path, fail_handler, + expected, fail_path, diff): + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + if expected is ManifestMismatch: + with pytest.raises(expected) as exc: + m.assert_directory_verifies(path) + assert (exc.value.path, exc.value.diff) == (fail_path, diff) + elif expected is ManifestIncompatibleEntry: + with pytest.raises(expected) as exc: + m.assert_directory_verifies(path) + assert exc.value.diff == diff + elif type(expected) is type and issubclass(expected, Exception): + with pytest.raises(expected) as exc: + m.assert_directory_verifies(path) + else: + kwargs = {} + if fail_handler is not None: + kwargs['fail_handler'] = fail_handler + assert m.assert_directory_verifies(path, **kwargs) == expected + + +@pytest.mark.parametrize( + 'layout,path,args,expected', + [(layout, path, '', + None if fail_path is None + else str(ManifestMismatch(fail_path, None, diff))) + for layout, path, fail_path, diff + in COMMON_DIRECTORY_VERIFICATION_VARIANTS] + + [(BasicTestLayout, 'sub', '--keep-going', + str(ManifestMismatch('sub/stray', None, []))), + (BasicTestLayout, 'other', '--require-signed-manifest', + 'is not OpenPGP signed'), + (IncompatibleTypeLayout, '', '', + str(ManifestIncompatibleEntry(ManifestPathEntry('metadata.xml'), + None, + [('__type__', 'DATA', 'MISC')]))), + (MismatchedSizeLayout, '', '', + str(ManifestIncompatibleEntry(ManifestPathEntry('test'), + None, + [('__size__', 0, 32)]))), + (MismatchedChecksumLayout, '', '', + str(ManifestIncompatibleEntry(ManifestPathEntry('test'), + None, + [('MD5', + 'd41d8cd98f00b204e9800998ecf8427e', + '9e107d9d372bb6826bd81d3542a419d6' + )]))), + (CrossDeviceLayout, '', '', + str(ManifestCrossDevice('<path>')).split('<path>', 1)[1]), + (CrossDeviceEmptyLayout, '', '', + str(ManifestCrossDevice('<path>')).split('<path>', 1)[1]), + (SymlinkLoopLayout, '', '', + str(ManifestSymlinkLoop('<path>')).split('<path>', 1)[1]), + (SymlinkLoopIgnoreLayout, '', '', None), + ]) +def test_cli_verify(layout_factory, caplog, layout, path, args, expected): + tmp_path = layout_factory.create(layout, readonly=True) + expected_retcode = 0 if expected is None else 1 + assert gemato.cli.main(['gemato', 'verify', '-x'] + args.split() + + [str(tmp_path / path)]) == expected_retcode + if expected is not None: + assert expected in caplog.text + + +@pytest.mark.parametrize( + 'layout,relpath', + [(BasicTestLayout, path) for path in BasicTestLayout.MANIFESTS] + + [(CompressedTopManifestLayout, 'Manifest.gz'), + ]) +def test_save_manifest(layout_factory, layout, relpath): + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + m.load_manifest(relpath) + os.remove(tmp_path / relpath) + m.save_manifest(relpath) + with open_potentially_compressed_path(tmp_path / relpath, 'r') as f: + assert f.read() == layout.MANIFESTS[relpath].lstrip() + + +@pytest.mark.parametrize( + 'layout,force,sort,expected_attr', + [(BasicTestLayout, False, False, 'MANIFESTS'), + (BasicTestLayout, True, False, 'MANIFESTS_REWRITTEN'), + (BasicTestLayout, True, True, 'MANIFESTS_SORTED'), + (FilenameWhitespaceLayout, True, False, 'MANIFESTS_REWRITTEN'), + (FilenameBackslashLayout, True, False, 'MANIFESTS_REWRITTEN'), + ]) +def test_save_manifests_unmodified(layout_factory, layout, force, sort, + expected_attr): + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + if not force: + m.load_manifests_for_path('', recursive=True) + m.save_manifests(force=force, sort=sort) + output = {} + for relpath in layout.MANIFESTS: + with open(tmp_path / relpath) as f: + output[relpath] = f.read() + assert output == getattr(layout, expected_attr) + + +@pytest.mark.parametrize( + 'new_entry_type,manifest_checksum', + [('DATA', '27b043ae4e184ad25aec6e793f3a23f4'), + ('MANIFEST', '3db86d6c89178496902a012ae562f4f4'), + ('MISC', '74f04c5178fc1d27bb83871bff88caf1'), + ('EBUILD', '993f2e85ab23b5fe902b089584ca829e'), + ('AUX', None), + ('DIST', None), + ('IGNORE', None), + ]) +def test_update_entry_for_path_types(layout_factory, + new_entry_type, + manifest_checksum): + layout = BasicTestLayout + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + if manifest_checksum is None: + with pytest.raises(AssertionError): + m.update_entry_for_path('sub/stray', + hashes=['SHA256', 'SHA512'], + new_entry_type=new_entry_type) + else: + m.update_entry_for_path('sub/stray', + hashes=['SHA256', 'SHA512'], + new_entry_type=new_entry_type) + new_entry = m.find_path_entry('sub/stray') + assert get_entry(new_entry) == (new_entry_type, 'stray', + ['SHA256', 'SHA512']) m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) - m.assert_directory_verifies() - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - m.update_entries_for_directory('') - self.assertIsNone(m.find_path_entry('sub/test')) - m.save_manifests() - with io.open(os.path.join(self.dir, 'Manifest'), - 'r', encoding='utf8') as f: - self.assertNotEqual(f.read(), self.FILES['Manifest']) + output = {} + for relpath in layout.MANIFESTS: + with open(tmp_path / relpath) as f: + output[relpath] = f.read() + expected = dict(layout.MANIFESTS) + expected['Manifest'] = ( + expected['Manifest'].lstrip() + .replace('128 MD5 30fd28b98a23031c72793908dd35c530', + f'{344 + len(new_entry_type)} MD5 {manifest_checksum}')) + expected['sub/Manifest'] = ( + expected['sub/Manifest'].lstrip() + + f'{new_entry_type} stray 0 SHA256 ' + f'e3b0c44298fc1c149afbf4c8996fb924' + f'27ae41e4649b934ca495991b7852b855 SHA512 ' + f'cf83e1357eefb8bdf1542850d66d8007' + f'd620e4050b5715dc83f4a921d36ce9ce' + f'47d0d13c5d85f2b0ff8318d2877eec2f' + f'63b931bd47417a81a538327af927da3e\n') + assert output == expected m.assert_directory_verifies() + if new_entry_type == 'MANIFEST': + assert 'sub/stray' in m.loaded_manifests -class SymlinkLoopTest(TempDirTestCase): - """ - Test dealing with a directory that contains a symlink to itself. - """ - DIRS = ['sub'] - FILES = { - 'Manifest': u'', - } - - def setUp(self): - super(SymlinkLoopTest, self).setUp() - os.symlink('.', os.path.join(self.dir, 'sub/sub')) +def test_update_entry_for_path_no_hash_specified(layout_factory): + layout = BasicTestLayout + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + with pytest.raises(AssertionError): + m.update_entry_for_path('sub/stray') - def tearDown(self): - os.remove(os.path.join(self.dir, 'sub/sub')) - super(SymlinkLoopTest, self).tearDown() - def test_assert_directory_verifies(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest')) - self.assertRaises(gemato.exceptions.ManifestSymlinkLoop, - m.assert_directory_verifies, '') +@pytest.mark.parametrize( + 'layout,ctor,func,path,call,save,manifest_update', + [(BasicTestLayout, + ['SHA1'], + ManifestRecursiveLoader.update_entry_for_path, + 'sub/stray', + None, + None, + {'Manifest': BasicTestLayout.MANIFESTS['Manifest'].lstrip() + .replace('128 MD5 30fd28b98a23031c72793908dd35c530', + '186 SHA1 2b89b8bc8db9cec987beeb7f08f574f1766e6b06'), + 'sub/Manifest': BasicTestLayout.MANIFESTS['sub/Manifest'] + .lstrip() + + 'DATA stray 0 SHA1 da39a3ee5e6b4b0d3255bfef95601890afd80709\n', + }), + (BasicTestLayout, + ['SHA1'], + ManifestRecursiveLoader.update_entry_for_path, + 'sub/stray', + ['MD5'], + None, + {'Manifest': BasicTestLayout.MANIFESTS['Manifest'].lstrip() + .replace('128 MD5 30fd28b98a23031c72793908dd35c530', + '177 SHA1 d6ecf169c7c4e951d5c633c8e0debe5df1a8c0aa'), + 'sub/Manifest': BasicTestLayout.MANIFESTS['sub/Manifest'] + .lstrip() + 'DATA stray 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + }), + (BasicTestLayout, + None, + ManifestRecursiveLoader.update_entry_for_path, + 'sub/stray', + ['SHA1'], + None, + {'Manifest': BasicTestLayout.MANIFESTS['Manifest'].lstrip() + .replace('128 MD5 30fd28b98a23031c72793908dd35c530', + '186 MD5 52e5664c2b12561cf296549395c0462a'), + 'sub/Manifest': BasicTestLayout.MANIFESTS['sub/Manifest'] + .lstrip() + + 'DATA stray 0 SHA1 da39a3ee5e6b4b0d3255bfef95601890afd80709\n', + }), + (BasicTestLayout, + ['SHA1'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + # this rehashes all files + BasicTestLayout.MANIFESTS_SHA1), + (BasicTestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + ['SHA1'], + {'Manifest': BasicTestLayout.MANIFESTS['Manifest'].lstrip() + .replace('128 MD5 30fd28b98a23031c72793908dd35c530', + '177 SHA1 d6ecf169c7c4e951d5c633c8e0debe5df1a8c0aa'), + 'sub/Manifest': BasicTestLayout.MANIFESTS['sub/Manifest'] + .lstrip() + 'DATA stray 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + }), + (BasicTestLayout, + ['SHA256'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + ['SHA1'], + None, + # ctor is used for Manifests that are edited, call is used + # for everything else + # TODO: does this behavior really make sense? + {'Manifest': 'TIMESTAMP 2017-01-01T01:01:01Z\n' + 'MANIFEST sub/Manifest 221 SHA256 ' + '0c4f14d1e07eb2762ca9afec0d64d8a9' + 'd65e3d99b5700fd2779f3b2641d2807a\n' + 'MANIFEST other/Manifest 0 SHA1 ' + 'da39a3ee5e6b4b0d3255bfef95601890afd80709\n' + 'DIST topdistfile-1.txt 0 MD5 ' + 'd41d8cd98f00b204e9800998ecf8427e\n', + 'sub/Manifest': 'MANIFEST deeper/Manifest 58 SHA256 ' + '87d10bbc90d9d7838141dd2d50a58760' + '20a182dd950ef551b7f689bc178d6e6c\n' + 'DIST subdistfile-1.txt 0 MD5 ' + 'd41d8cd98f00b204e9800998ecf8427e\n' + 'DATA stray 0 SHA1 ' + 'da39a3ee5e6b4b0d3255bfef95601890afd80709\n', + 'sub/deeper/Manifest': 'DATA test 0 SHA1 ' + 'da39a3ee5e6b4b0d3255' + 'bfef95601890afd80709\n', + }), + (MultiManifestLayout, + ['SHA1'], + ManifestRecursiveLoader.update_entry_for_path, + 'sub/foo', + None, + None, + {'Manifest': MultiManifestLayout.MANIFESTS['Manifest'].lstrip() + .replace('50 MD5 33fd9df6d410a93ff859d75e088bde7e', + '58 SHA1 dc62bbde3db6e82aea65c3643ae0d6be50aa8a53'), + 'sub/Manifest.a': 'DATA foo 16 ' + 'SHA1 deed2a88e73dccaa30a9e6e296f62be238be4ade\n', + }), + (MultiManifestLayout, + None, + ManifestRecursiveLoader.update_entry_for_path, + 'sub/foo', + ['SHA1'], + None, + {'Manifest': MultiManifestLayout.MANIFESTS['Manifest'].lstrip() + .replace('50 MD5 33fd9df6d410a93ff859d75e088bde7e', + '58 MD5 094185d851bf9a700889e37a46700420'), + 'sub/Manifest.a': 'DATA foo 16 ' + 'SHA1 deed2a88e73dccaa30a9e6e296f62be238be4ade\n', + }), + (MultiManifestLayout, + None, + ManifestRecursiveLoader.update_entry_for_path, + 'sub/foo', + ['MD5'], + ['SHA1'], + {'Manifest': MultiManifestLayout.MANIFESTS['Manifest'].lstrip() + .replace('50 MD5 33fd9df6d410a93ff859d75e088bde7e', + '49 SHA1 08a3eac069b8b442513016d60a3da7288c4ea821'), + 'sub/Manifest.a': 'DATA foo 16 ' + 'MD5 abeac07d3c28c1bef9e730002c753ed4\n', + }), + (MultiManifestLayout, + ['SHA1'], + ManifestRecursiveLoader.update_entry_for_path, + 'sub/foo', + None, + None, + {'Manifest': MultiManifestLayout.MANIFESTS['Manifest'].lstrip() + .replace('50 MD5 33fd9df6d410a93ff859d75e088bde7e', + '58 SHA1 dc62bbde3db6e82aea65c3643ae0d6be50aa8a53'), + 'sub/Manifest.a': 'DATA foo 16 ' + 'SHA1 deed2a88e73dccaa30a9e6e296f62be238be4ade\n', + }), + (MultiManifestLayout, + ['SHA1'], + ManifestRecursiveLoader.update_entry_for_path, + 'sub/foo', + ['MD5'], + None, + {'Manifest': MultiManifestLayout.MANIFESTS['Manifest'].lstrip() + .replace('50 MD5 33fd9df6d410a93ff859d75e088bde7e', + '49 SHA1 08a3eac069b8b442513016d60a3da7288c4ea821'), + 'sub/Manifest.a': 'DATA foo 16 ' + 'MD5 abeac07d3c28c1bef9e730002c753ed4\n', + }), + (DuplicateAuxEntryLayout, + None, + ManifestRecursiveLoader.update_entry_for_path, + 'files/test.patch', + None, + None, + {'Manifest': DuplicateAuxEntryLayout.MANIFESTS['Manifest'] + .splitlines()[1] + '\n', + }), + (DuplicateAuxEntryLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': DuplicateAuxEntryLayout.MANIFESTS['Manifest'] + .splitlines()[1] + '\n', + }), + (DisjointHashSetEntryLayout, + None, + ManifestRecursiveLoader.update_entry_for_path, + 'test', + None, + None, + {'Manifest': 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + }), + (DisjointHashSetEntryLayout, + ['SHA256'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': 'DATA test 0 SHA256 ' + 'e3b0c44298fc1c149afbf4c8996fb924' + '27ae41e4649b934ca495991b7852b855\n', + }), + (MiscEntryLayout, + ['MD5'], + ManifestRecursiveLoader.update_entry_for_path, + 'metadata.xml', + None, + None, + {'Manifest': '', + }), + (MiscEntryLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': '', + }), + (CrossDeviceIgnoreLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {}), + (DotFileLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {}), + (CompressedSubManifestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'sub/Manifest.gz': 'DATA test 0 MD5 ' + 'd41d8cd98f00b204e9800998ecf8427e\n', + }), + (CompressedManifestSortLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest.gz': + 'MANIFEST a/Manifest 50 MD5 8ee2fce40e6e6cc2b5de5c91d416e9f3\n', + 'a/Manifest': + 'DATA stray 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + }), + (MultipleStrayFilesLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': ''.join(f'DATA sub/file.{x} 0 MD5 ' + f'd41d8cd98f00b204e9800998ecf8427e\n' + for x in 'cba'), + }), + (StrayManifestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': + 'MANIFEST sub/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c\n', + 'sub/Manifest': + StrayManifestLayout.MANIFESTS['sub/Manifest'].lstrip() + }), + (StrayCompressedManifestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': 'MANIFEST sub/Manifest.gz 75 MD5 ' + 'e6378b64d3577c73c979fdb423937d94\n', + 'sub/Manifest.gz': + StrayCompressedManifestLayout.MANIFESTS['sub/Manifest.gz'].lstrip() + }), + (StrayInvalidManifestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': + 'DATA sub/Manifest 19 MD5 1c0817af3a5def5d5c90b139988727a7\n' + 'DATA sub/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + }), + pytest.param( + StrayInvalidCompressedManifestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': + 'DATA sub/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n' + 'DATA sub/Manifest.gz 18 MD5 f937f0ff743477e4f70ef2b79672c9bc\n', + }, marks=pytest.mark.xfail), + (FilenameWhitespaceLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {}), + (FilenameBackslashLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {}), + (NestedManifestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': + 'MANIFEST a/Manifest 220 MD5 e85fbbce600362ab3378ebd7a2bc06db\n' + 'MANIFEST b/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c\n' + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + 'a/Manifest': + 'MANIFEST x/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c\n' + 'MANIFEST z/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c\n' + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n' + 'DATA y/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + 'a/x/Manifest': + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + 'a/z/Manifest': + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + 'b/Manifest': + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + }), + (AddToMultiManifestLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': + 'MANIFEST a/Manifest.a 47 MD5 89b9c1e9e5a063ee60b91b632c84c7c8\n' + 'MANIFEST a/Manifest.b 47 MD5 1b1504046a2023ed75a2a89aed7c52f4\n' + 'DATA b/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n' + 'MANIFEST a/Manifest 46 MD5 dae3736ed4a6d6a3a74aa0af1b063bdf\n', + 'a/Manifest': 'DATA c 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + }), + (SubManifestMismatchLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': + 'MANIFEST a/Manifest 50 MD5 0f7cd9ed779a4844f98d28315dd9176a\n', + }), + (NonexistingDirectoryLayout, + ['MD5'], + ManifestRecursiveLoader.update_entry_for_path, + 'sub/test', + None, + None, + {'Manifest': '', + }), + (NonexistingDirectoryLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': '', + }), + (SymlinkLoopIgnoreLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {}), + (MismatchedFileLayout, + ['MD5'], + ManifestRecursiveLoader.update_entry_for_path, + 'test', + None, + None, + {'Manifest': 'DATA test 11 MD5 6f8db599de986fab7a21625b7916589c\n', + }), + (MismatchedFileLayout, + ['MD5'], + ManifestRecursiveLoader.update_entries_for_directory, + '', + None, + None, + {'Manifest': 'DATA test 11 MD5 6f8db599de986fab7a21625b7916589c\n', + }), + ]) +def test_update_entry_hash_specs(layout_factory, layout, ctor, func, path, + call, save, manifest_update): + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + hashes=ctor, + allow_xdev=False) + func(m, path, hashes=call) + m.save_manifests(hashes=save) + + output = {} + for relpath in layout.MANIFESTS: + with open_potentially_compressed_path(tmp_path / relpath, + 'r') as f: + output[relpath] = f.read() + expected = dict(layout.MANIFESTS) + expected.update(manifest_update) + assert output == expected + m.assert_directory_verifies() + + +@pytest.mark.parametrize( + 'layout,path,expected,reason', + [(BasicTestLayout, 'nonexist', ManifestInvalidPath, + ('__exists__', False)), + # verify that aux_path does not confuse it + (DuplicateAuxEntryLayout, 'test.patch', ManifestInvalidPath, + ('__exists__', False)), + (PotentialAuxEntryLayout, 'test.patch', ManifestInvalidPath, + ('__exists__', False)), + (CrossDeviceLayout, 'sub/version', ManifestCrossDevice, None), + (DirForFileLayout, 'test', ManifestInvalidPath, + ('__type__', 'directory')), + ]) +def test_update_entry_raise(layout_factory, layout, path, expected, reason): + """Test that update_entry_for_path() raises an exception""" + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + hashes=['MD5'], + allow_xdev=False) + with pytest.raises(expected) as exc: + m.update_entry_for_path(path) + if expected is ManifestInvalidPath: + assert exc.value.detail == reason + + +@pytest.mark.parametrize( + 'layout,path,expected', + [(BasicTestLayout, 'nonexist', FileNotFoundError), + (CrossDeviceLayout, '', ManifestCrossDevice), + (CrossDeviceEmptyLayout, '', ManifestCrossDevice), + (DirForFileLayout, '', ManifestInvalidPath), + (UnreadableDirLayout, '', PermissionError), + (SymlinkLoopLayout, '', ManifestSymlinkLoop), + ]) +def test_update_entries_for_directory_raise(layout_factory, layout, path, + expected): + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + hashes=['MD5'], + allow_xdev=False) + with pytest.raises(expected): + m.update_entries_for_directory(path) + + +def test_update_entry_new_aux(layout_factory): + layout = PotentialAuxEntryLayout + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + hashes=['MD5'], + allow_xdev=False) + m.update_entry_for_path('files/test.patch', new_entry_type='AUX') + assert (get_entry(m.find_path_entry('files/test.patch')) == + ('AUX', 'files/test.patch', ['MD5'])) + m.save_manifests() + with open(tmp_path / layout.TOP_MANIFEST, 'r') as f: + contents = f.read() + assert (contents == + 'AUX test.patch 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n') + m.assert_directory_verifies() + + +def test_update_entry_and_discard(layout_factory): + """Test that Manifests are not changed without .save_manifests()""" + layout = BasicTestLayout + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + hashes=['SHA1'], + allow_xdev=False) + m.update_entry_for_path('sub/stray', hashes=['MD5']) + del m + + output = {} + for relpath in layout.MANIFESTS: + with open(tmp_path / relpath) as f: + output[relpath] = f.read() + assert output == layout.MANIFESTS + + +@pytest.mark.parametrize( + 'layout,args,update,replace_timestamp', + [(BasicTestLayout, + '', + BasicTestLayout.MANIFESTS_SHA1, + 'TIMESTAMP 2017-01-01T01:01:01Z'), + (DuplicateEntryLayout, + '', + {'Manifest': + DuplicateEntryLayout.MANIFESTS['Manifest'].splitlines()[1] + + '\nTIMESTAMP\n' + }, + 'TIMESTAMP'), + (MiscEntryLayout, + '', + {'Manifest': 'TIMESTAMP\n'}, + 'TIMESTAMP'), + (CrossDeviceIgnoreLayout, + '', + {}, + None), + (DotFileLayout, + '', + {}, + None), + (StrayManifestLayout, + '', + {'Manifest': 'MANIFEST sub/Manifest 58 SHA1 ' + '4b40f4102dd71fb2083ce9a8d8af6d7e49c281c4\n' + 'TIMESTAMP\n', + 'sub/Manifest': 'DATA test 0 SHA1 ' + 'da39a3ee5e6b4b0d3255bfef95601890afd80709\n', + }, + 'TIMESTAMP'), + (StrayCompressedManifestLayout, + '', + {'Manifest': 'MANIFEST sub/Manifest.gz 84 SHA1 ' + 'aa62bd16d440d2a118a381df4f9b9c413d993e75\n' + 'TIMESTAMP\n', + 'sub/Manifest.gz': 'DATA test 0 SHA1 ' + 'da39a3ee5e6b4b0d3255bfef95601890afd80709\n', + }, + 'TIMESTAMP'), + (StrayInvalidManifestLayout, + '', + {'Manifest': 'DATA sub/Manifest 19 SHA1 ' + '0edaf6696720e166e43e5eedbde23818a8a4939c\n' + 'DATA sub/test 0 SHA1 da39a3ee5e6b4b0d3255bfef95601890afd80709\n' + 'TIMESTAMP\n', + }, + 'TIMESTAMP'), + pytest.param( + StrayInvalidCompressedManifestLayout, + '', + {'Manifest': + 'DATA sub/test 0 SHA1 da39a3ee5e6b4b0d3255bfef95601890afd80709\n' + 'DATA sub/Manifest.gz 18 SHA1 ' + '6af661c09147db2a2b51ae7c3cf2834d88884596\n' + 'TIMESTAMP\n', + }, + 'TIMESTAMP', marks=pytest.mark.xfail), + (FilenameWhitespaceLayout, + '', + {'Manifest': + 'DATA \\x20\\x20foo\\x20bar\\x20\\x20 0 SHA1 ' + 'da39a3ee5e6b4b0d3255bfef95601890afd80709\n' + 'TIMESTAMP\n', + }, + 'TIMESTAMP'), + (SymlinkLoopIgnoreLayout, + '', + {}, + None), + (MismatchedFileFutureTimestampLayout, + '', + {'Manifest': 'TIMESTAMP\n' + 'DATA test 11 SHA1 661295c9cbf9d6b2f6428414504a8deed3020641\n' + }, + 'TIMESTAMP'), + (MismatchedFileFutureTimestampLayout, + '--incremental', + {}, + 'TIMESTAMP'), + ]) +def test_cli_update(layout_factory, layout, args, update, + replace_timestamp): + tmp_path = layout_factory.create(layout) + assert gemato.cli.main(['gemato', 'update', '-x', '--hashes=SHA1', + '--timestamp'] + args.split() + + [str(tmp_path)]) == 0 + + if replace_timestamp is not None: + m = gemato.manifest.ManifestFile() + with open(tmp_path / layout.TOP_MANIFEST, 'r') as f: + m.load(f) + ts = m.find_timestamp() + assert ts is not None + assert ts.ts != datetime.datetime(2017, 1, 1, 1, 1, 1) + + output = {} + for relpath in layout.MANIFESTS: + with open_potentially_compressed_path(tmp_path / relpath, + 'r') as f: + output[relpath] = f.read() + expected = dict(layout.MANIFESTS) + expected.update(update) + if replace_timestamp is not None: + expected['Manifest'] = expected['Manifest'].replace( + replace_timestamp, ' '.join(ts.to_list())) + assert output == expected + + +@pytest.mark.parametrize( + 'layout,expected', + [(CrossDeviceLayout, + str(ManifestCrossDevice('<path>')).split('<path>', 1)[1]), + (CrossDeviceEmptyLayout, + str(ManifestCrossDevice('<path>')).split('<path>', 1)[1]), + (DirForFileLayout, + str(ManifestInvalidPath('<path>', ('__type__', 'directory'))) + .split('<path>', 1)[1]), + (SymlinkLoopLayout, + str(ManifestSymlinkLoop('<path>')).split('<path>', 1)[1]), + ]) +def test_cli_update_fail(layout_factory, caplog, layout, expected): + tmp_path = layout_factory.create(layout) + assert gemato.cli.main(['gemato', 'update', '-x', '--hashes=SHA1', + '--timestamp', str(tmp_path)]) == 1 + assert expected in caplog.text + + +COMMON_COMPRESS_VARIANTS = ( + # layout, watermark, compress_format, expected_compressed + list(itertools.chain.from_iterable( + [(BasicTestLayout, 0, algo, + [x for x in BasicTestLayout.MANIFESTS if x != 'Manifest']), + pytest.param(BasicTestLayout, 64, algo, ['sub/Manifest'], + marks=pytest.mark.xfail), + ] for algo in COMPRESSION_ALGOS)) + + [(CompressedSubManifestLayout, 0, 'gz', ['sub/Manifest']), + (CompressedSubManifestLayout, 4096, 'gz', []), + ]) + + +@pytest.mark.parametrize( + 'layout,watermark,compress_format,expected_compressed', + COMMON_COMPRESS_VARIANTS + + [(CompressedTopManifestLayout, 0, 'gz', ['Manifest']), + (CompressedTopManifestLayout, 4096, 'gz', []), + ]) +def test_compress_manifests(layout_factory, layout, watermark, + expected_compressed, compress_format): + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + hashes=['MD5'], + allow_xdev=False) + m.save_manifests(force=True, + compress_watermark=watermark, + compress_format=compress_format) + + manifests = [] + for dirpath, dirnames, filenames in os.walk(tmp_path): + for f in filenames: + if f.startswith('Manifest'): + manifests.append( + os.path.relpath(os.path.join(dirpath, f), tmp_path)) + expected_manifest_basenames = (os.path.splitext(x)[0] + for x in layout.MANIFESTS) + expected = [(f'{x}.{compress_format}' if x in expected_compressed + else x) + for x in expected_manifest_basenames] + assert sorted(manifests) == sorted(expected) + + +@pytest.mark.parametrize( + 'layout,watermark,compress_format,expected_compressed', + COMMON_COMPRESS_VARIANTS) +def test_cli_compress(layout_factory, layout, watermark, + expected_compressed, compress_format): + tmp_path = layout_factory.create(layout) + assert gemato.cli.main(['gemato', 'update', '--hashes=MD5', + f'--compress-format={compress_format}', + f'--compress-watermark={watermark}', + '--force-rewrite', str(tmp_path)]) == 0 + + manifests = [] + for dirpath, dirnames, filenames in os.walk(tmp_path): + for f in filenames: + if f.startswith('Manifest'): + manifests.append( + os.path.relpath(os.path.join(dirpath, f), tmp_path)) + expected_manifest_basenames = (os.path.splitext(x)[0] + for x in layout.MANIFESTS) + expected = [(f'{x}.{compress_format}' if x in expected_compressed + else x) + for x in expected_manifest_basenames] + assert sorted(manifests) == sorted(expected) + + +@pytest.mark.parametrize( + 'layout,expected', + [(DuplicateEntryLayout, + DuplicateEntryLayout.MANIFESTS['Manifest'].splitlines()[1] + '\n' + ), + (DuplicateManifestEntryLayout, + DuplicateManifestEntryLayout.MANIFESTS['Manifest'].splitlines()[1] + + '\n'), + (DuplicateManifestAsDataEntryLayout, + DuplicateManifestAsDataEntryLayout.MANIFESTS['Manifest'] + .splitlines()[1] + '\n'), + (DuplicateEbuildEntryLayout, + DuplicateEbuildEntryLayout.MANIFESTS['Manifest'].splitlines()[1] + + '\n'), + (DuplicateAuxEntryLayout, + DuplicateAuxEntryLayout.MANIFESTS['Manifest'].splitlines()[1] + + '\n'), + (DisjointHashSetEntryLayout, + 'DATA test 0 MD5 9e107d9d372bb6826bd81d3542a419d6 ' + 'SHA1 2fd4e1c67a2d28fced849ee1bb76e7391b93eb12\n'), + (MismatchedSizeLayout, + MismatchedSizeLayout.MANIFESTS['Manifest'].splitlines()[1] + '\n'), + (MismatchedChecksumLayout, + MismatchedChecksumLayout.MANIFESTS['Manifest'].splitlines()[2] + + '\n'), + (CompressedTopManifestLayout, + CompressedTopManifestLayout.MANIFESTS['Manifest.gz']), + ]) +def test_write_deduplicated_manifest(layout_factory, layout, expected): + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + m.get_deduplicated_file_entry_dict_for_update() + m.save_manifests() + with open_potentially_compressed_path(tmp_path / layout.TOP_MANIFEST, + 'r') as f: + contents = f.read() + assert contents == expected + + +@pytest.mark.parametrize('filename', ['Manifest', 'Manifest.gz']) +def test_new_manifest_without_create(layout_factory, filename): + tmp_path = layout_factory.create(NewManifestLayout) + with pytest.raises(FileNotFoundError): + ManifestRecursiveLoader(tmp_path / filename) + + +@pytest.mark.parametrize('filename', ['Manifest', 'Manifest.gz']) +def test_new_manifest_create_no_save(layout_factory, filename): + tmp_path = layout_factory.create(NewManifestLayout) + m = ManifestRecursiveLoader(tmp_path / filename, + allow_create=True) + del m + assert sorted(os.listdir(tmp_path)) == ['sub', 'test'] + + +@pytest.mark.parametrize('filename', ['Manifest', 'Manifest.gz']) +def test_new_manifest_create_save(layout_factory, filename): + tmp_path = layout_factory.create(NewManifestLayout) + m = ManifestRecursiveLoader(tmp_path / filename, + allow_create=True) + m.save_manifests() + assert sorted(os.listdir(tmp_path)) == [filename, 'sub', 'test'] + + +@pytest.mark.parametrize( + 'filename,compress_watermark,expected', + [('Manifest', None, 'Manifest'), + ('Manifest', 0, 'Manifest'), + ('Manifest.gz', None, 'Manifest.gz'), + ('Manifest.gz', 0, 'Manifest.gz'), + ('Manifest.gz', 4096, 'Manifest'), + ]) +def test_new_manifest_create_update(layout_factory, + filename, + compress_watermark, + expected): + tmp_path = layout_factory.create(NewManifestLayout) + m = ManifestRecursiveLoader(tmp_path / filename, + allow_create=True, + hashes=['MD5']) + m.update_entries_for_directory('') + m.save_manifests(compress_watermark=compress_watermark) + assert sorted(os.listdir(tmp_path)) == [expected, 'sub', 'test'] + m.assert_directory_verifies('') + + # implicit compression should not affect top Manifest + with open_potentially_compressed_path(tmp_path / expected, 'r') as f: + contents = f.read() + expected = ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +DATA sub/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +'''.lstrip() + assert contents == expected - def test_cli_verifies(self): - self.assertEqual(gemato.cli.main(['gemato', 'verify', self.dir]), - 1) - def test_update_entries_for_directory(self): - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(self.dir, 'Manifest'), - hashes=['SHA256', 'SHA512']) - self.assertRaises(gemato.exceptions.ManifestSymlinkLoop, - m.update_entries_for_directory, '') +@pytest.mark.parametrize('args', ['', '--compress-watermark=0']) +def test_new_manifest_cli(layout_factory, args): + tmp_path = layout_factory.create(NewManifestLayout) + assert gemato.cli.main(['gemato', 'create', '--hashes=MD5'] + + args.split() + [str(tmp_path)]) == 0 - def test_cli_update(self): - self.assertEqual(gemato.cli.main(['gemato', 'update', - '--hashes=SHA256 SHA512', self.dir]), 1) + with open(tmp_path / 'Manifest', 'r') as f: + contents = f.read() + expected = ''' +DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +DATA sub/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e +'''.lstrip() + assert contents == expected + + +@pytest.mark.parametrize( + 'layout,expected,expected_with_entry', + [(NestedManifestLayout, ['a/Manifest', 'b/Manifest'], + ['a/x/Manifest', 'a/z/Manifest']), + (AddToMultiManifestLayout, ['a/Manifest'], []), + ]) +def test_load_unregistered_manifests(layout_factory, + layout, + expected, + expected_with_entry): + tmp_path = layout_factory.create(layout) + # remove the top Manifest + os.unlink(tmp_path / 'Manifest') + m = ManifestRecursiveLoader(tmp_path / 'Manifest', + allow_create=True) + loaded = m.load_unregistered_manifests('') + assert sorted(loaded) == sorted(expected + expected_with_entry) + assert sorted(m.loaded_manifests) == sorted(['Manifest'] + loaded) + assert list(m.updated_manifests) == ['Manifest'] + # new entries are not added to Manifest + for path in expected: + assert get_entry(m.find_path_entry(path)) is None + + +def test_regenerate_update_manifest(layout_factory): + layout = NestedManifestLayout + tmp_path = layout_factory.create(layout) + # remove the top Manifest + os.unlink(tmp_path / 'Manifest') + m = ManifestRecursiveLoader(tmp_path / 'Manifest', + allow_create=True) + m.update_entries_for_directory('', hashes=['MD5']) + m.save_manifests() + + output = {} + for relpath in layout.MANIFESTS: + with open_potentially_compressed_path(tmp_path / relpath, + 'r') as f: + output[relpath] = f.read() + expected = { + 'Manifest': 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n' + 'MANIFEST b/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c\n' + 'MANIFEST a/Manifest 220 MD5 e85fbbce600362ab3378ebd7a2bc06db\n', + 'a/Manifest': + 'MANIFEST x/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c\n' + 'MANIFEST z/Manifest 49 MD5 b86a7748346d54c6455886306f017e6c\n' + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n' + 'DATA y/test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + 'a/x/Manifest': + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + 'a/z/Manifest': + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + 'b/Manifest': + 'DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e\n', + } + assert output == expected + + m.assert_directory_verifies() + + +def test_create_manifest(layout_factory): + layout = NestedManifestLayout + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / 'Manifest') + new_manifest = m.create_manifest('a/y/Manifest') + assert new_manifest is not None + assert not os.path.exists(tmp_path / 'a/y/Manifest') + m.loaded_manifests['Manifest'].entries.append( + gemato.manifest.ManifestEntryMANIFEST('a/y/Manifest', 0, {})) + m.save_manifests() + assert os.path.exists(tmp_path / 'a/y/Manifest') + + +def test_verify_mtime_old(layout_factory): + layout = MismatchedFileLayout + tmp_path = layout_factory.create(layout, readonly=True) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + with pytest.raises(ManifestMismatch): + m.assert_directory_verifies('', last_mtime=0) + + +def test_verify_mtime_new(layout_factory): + layout = MismatchedFileLayout + tmp_path = layout_factory.create(layout, readonly=True) + st = os.stat(tmp_path / 'test') + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + allow_xdev=False) + m.assert_directory_verifies('', last_mtime=st.st_mtime) + + +class FILE_STAT: + pass + + +@pytest.mark.parametrize( + 'last_mtime,manifest_update', + [(0, {'Manifest': + 'DATA test 11 MD5 6f8db599de986fab7a21625b7916589c\n'}), + (FILE_STAT, {}), + ]) +def test_update_mtime(layout_factory, last_mtime, manifest_update): + layout = MismatchedFileLayout + tmp_path = layout_factory.create(layout) + m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST, + hashes=['MD5'], + allow_xdev=False) + + if last_mtime is FILE_STAT: + st = os.stat(tmp_path / 'test') + last_mtime = st.st_mtime + m.update_entries_for_directory('', last_mtime=last_mtime) + m.save_manifests() + + output = {} + for relpath in layout.MANIFESTS: + with open_potentially_compressed_path(tmp_path / relpath, + 'r') as f: + output[relpath] = f.read() + expected = dict(layout.MANIFESTS) + expected.update(manifest_update) + assert output == expected |