diff options
author | Michał Górny <mgorny@gentoo.org> | 2017-10-29 13:48:37 +0100 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2017-10-29 13:48:37 +0100 |
commit | 566eff1c70f54d5020d860ff449edc605aa828e3 (patch) | |
tree | b803e5bf10e0cf967c069d3ec4a8266e3772845d | |
parent | 919b29928428c5a1f473d78e02e6d8fdcd2acead (diff) | |
download | gemato-566eff1c70f54d5020d860ff449edc605aa828e3.tar.gz |
recursiveloader: Support creating new Manifests
-rw-r--r-- | gemato/recursiveloader.py | 41 | ||||
-rw-r--r-- | tests/test_recursiveloader.py | 99 |
2 files changed, 130 insertions, 10 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py index 5d07e0b..447e2d4 100644 --- a/gemato/recursiveloader.py +++ b/gemato/recursiveloader.py @@ -3,6 +3,7 @@ # (c) 2017 Michał Górny # Licensed under the terms of 2-clause BSD license +import errno import os.path import gemato.compression @@ -26,7 +27,7 @@ class ManifestRecursiveLoader(object): def __init__(self, top_manifest_path, verify_openpgp=True, openpgp_env=None, sign_openpgp=None, openpgp_keyid=None, - hashes=None): + hashes=None, allow_create=False): """ Instantiate the loader for a Manifest tree starting at top-level Manifest @top_manifest_path. @@ -48,27 +49,38 @@ class ManifestRecursiveLoader(object): for the Manifest. If it is specified, they will be used for all subsequent update*() calls that do not specify another set of hashes explicitly. + + If @allow_create is True and @top_manifest_path does not exist, + a new Manifest tree will be initialized. Otherwise, opening + a non-existing file will cause an exception. """ self.root_directory = os.path.dirname(top_manifest_path) - self.loaded_manifests = {} self.verify_openpgp = verify_openpgp self.openpgp_env = openpgp_env self.sign_openpgp = sign_openpgp self.openpgp_keyid = openpgp_keyid self.hashes = hashes + self.loaded_manifests = {} + self.updated_manifests = set() + # TODO: allow catching OpenPGP exceptions somehow? - m = self.load_manifest(os.path.basename(top_manifest_path)) + m = self.load_manifest(os.path.basename(top_manifest_path), + allow_create=allow_create) self.openpgp_signed = m.openpgp_signed - self.updated_manifests = set() - def load_manifest(self, relpath, verify_entry=None): + def load_manifest(self, relpath, verify_entry=None, + allow_create=False): """ Load a single Manifest file whose relative path within Manifest tree is @relpath. If @verify_entry is not null, the Manifest file is verified against the entry. If the file is compressed, it is decompressed transparently. + + If @allow_create is True and the Manifest does not exist, + a new Manifest will be added. Otherwise, opening a non-existing + file will cause an exception. """ m = gemato.manifest.ManifestFile() path = os.path.join(self.root_directory, relpath) @@ -77,11 +89,20 @@ class ManifestRecursiveLoader(object): if not ret: raise gemato.exceptions.ManifestMismatch( relpath, verify_entry, diff) - with gemato.compression.open_potentially_compressed_path( - path, 'r', encoding='utf8') as f: - m.load(f, self.verify_openpgp, self.openpgp_env) - st = os.fstat(f.fileno()) - self.manifest_device = st.st_dev + try: + with gemato.compression.open_potentially_compressed_path( + path, 'r', encoding='utf8') as f: + m.load(f, self.verify_openpgp, self.openpgp_env) + st = os.fstat(f.fileno()) + except IOError as err: + if err.errno == errno.ENOENT and allow_create: + st = os.stat(os.path.dirname(path)) + # trigger saving + self.updated_manifests.add(relpath) + else: + raise err + + self.manifest_device = st.st_dev self.loaded_manifests[relpath] = m return m diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py index 50e2f16..fb4b67d 100644 --- a/tests/test_recursiveloader.py +++ b/tests/test_recursiveloader.py @@ -2129,3 +2129,102 @@ INVALID STUFF IN HERE gemato.cli.main(['gemato', 'verify', self.dir]), 0) + + +class CreateNewManifestTest(TempDirTestCase): + DIRS = ['sub'] + FILES = { + 'test': u'', + 'sub/test': u'', + } + + def setUp(self): + super(CreateNewManifestTest, self).setUp() + self.path = os.path.join(self.dir, 'Manifest') + + def tearDown(self): + try: + os.unlink(self.path) + except OSError: + pass + super(CreateNewManifestTest, self).tearDown() + + def test_load_without_create(self): + self.assertRaises(IOError, + gemato.recursiveloader.ManifestRecursiveLoader, + self.path) + + def test_create_without_save(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + self.path, allow_create=True) + del m + self.assertFalse(os.path.exists(self.path)) + + def test_create_empty(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + self.path, allow_create=True) + m.save_manifests() + self.assertTrue(os.path.exists(self.path)) + + def test_update_entries_for_directory(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + self.path, allow_create=True, hashes=['MD5']) + m.update_entries_for_directory('') + m.save_manifests() + m.assert_directory_verifies('') + + m2 = gemato.manifest.ManifestFile() + with io.open(self.path, 'r', encoding='utf8') as f: + m2.load(f) + self.assertEqual(len(m2.entries), 2) + + +class CreateNewCompressedManifestTest(TempDirTestCase): + DIRS = ['sub'] + FILES = { + 'test': u'', + 'sub/test': u'', + } + + def setUp(self): + super(CreateNewCompressedManifestTest, self).setUp() + self.path = os.path.join(self.dir, 'Manifest.gz') + + def tearDown(self): + try: + os.unlink(self.path) + except OSError: + pass + super(CreateNewCompressedManifestTest, self).tearDown() + + def test_load_without_create(self): + self.assertRaises(IOError, + gemato.recursiveloader.ManifestRecursiveLoader, + self.path) + + def test_create_without_save(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + self.path, allow_create=True) + del m + self.assertFalse(os.path.exists(self.path)) + + def test_create_empty(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + self.path, allow_create=True) + m.save_manifests() + with gemato.compression.open_potentially_compressed_path( + self.path, 'rb') as f: + self.assertEqual(f.read(), b'') + + def test_update_entries_for_directory(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + self.path, allow_create=True, hashes=['MD5']) + m.update_entries_for_directory('') + m.save_manifests() + m.assert_directory_verifies('') + + m2 = gemato.manifest.ManifestFile() + with gemato.compression.open_potentially_compressed_path( + self.path, 'r', encoding='utf8') as f: + m2.load(f) + self.assertEqual(len(m2.entries), 2) |