summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/recursiveloader.py41
-rw-r--r--tests/test_recursiveloader.py99
2 files changed, 130 insertions, 10 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index 5d07e0b..447e2d4 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -3,6 +3,7 @@
# (c) 2017 Michał Górny
# Licensed under the terms of 2-clause BSD license
+import errno
import os.path
import gemato.compression
@@ -26,7 +27,7 @@ class ManifestRecursiveLoader(object):
def __init__(self, top_manifest_path,
verify_openpgp=True, openpgp_env=None,
sign_openpgp=None, openpgp_keyid=None,
- hashes=None):
+ hashes=None, allow_create=False):
"""
Instantiate the loader for a Manifest tree starting at top-level
Manifest @top_manifest_path.
@@ -48,27 +49,38 @@ class ManifestRecursiveLoader(object):
for the Manifest. If it is specified, they will be used for all
subsequent update*() calls that do not specify another set
of hashes explicitly.
+
+ If @allow_create is True and @top_manifest_path does not exist,
+ a new Manifest tree will be initialized. Otherwise, opening
+ a non-existing file will cause an exception.
"""
self.root_directory = os.path.dirname(top_manifest_path)
- self.loaded_manifests = {}
self.verify_openpgp = verify_openpgp
self.openpgp_env = openpgp_env
self.sign_openpgp = sign_openpgp
self.openpgp_keyid = openpgp_keyid
self.hashes = hashes
+ self.loaded_manifests = {}
+ self.updated_manifests = set()
+
# TODO: allow catching OpenPGP exceptions somehow?
- m = self.load_manifest(os.path.basename(top_manifest_path))
+ m = self.load_manifest(os.path.basename(top_manifest_path),
+ allow_create=allow_create)
self.openpgp_signed = m.openpgp_signed
- self.updated_manifests = set()
- def load_manifest(self, relpath, verify_entry=None):
+ def load_manifest(self, relpath, verify_entry=None,
+ allow_create=False):
"""
Load a single Manifest file whose relative path within Manifest
tree is @relpath. If @verify_entry is not null, the Manifest
file is verified against the entry. If the file is compressed,
it is decompressed transparently.
+
+ If @allow_create is True and the Manifest does not exist,
+ a new Manifest will be added. Otherwise, opening a non-existing
+ file will cause an exception.
"""
m = gemato.manifest.ManifestFile()
path = os.path.join(self.root_directory, relpath)
@@ -77,11 +89,20 @@ class ManifestRecursiveLoader(object):
if not ret:
raise gemato.exceptions.ManifestMismatch(
relpath, verify_entry, diff)
- with gemato.compression.open_potentially_compressed_path(
- path, 'r', encoding='utf8') as f:
- m.load(f, self.verify_openpgp, self.openpgp_env)
- st = os.fstat(f.fileno())
- self.manifest_device = st.st_dev
+ try:
+ with gemato.compression.open_potentially_compressed_path(
+ path, 'r', encoding='utf8') as f:
+ m.load(f, self.verify_openpgp, self.openpgp_env)
+ st = os.fstat(f.fileno())
+ except IOError as err:
+ if err.errno == errno.ENOENT and allow_create:
+ st = os.stat(os.path.dirname(path))
+ # trigger saving
+ self.updated_manifests.add(relpath)
+ else:
+ raise err
+
+ self.manifest_device = st.st_dev
self.loaded_manifests[relpath] = m
return m
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py
index 50e2f16..fb4b67d 100644
--- a/tests/test_recursiveloader.py
+++ b/tests/test_recursiveloader.py
@@ -2129,3 +2129,102 @@ INVALID STUFF IN HERE
gemato.cli.main(['gemato', 'verify',
self.dir]),
0)
+
+
+class CreateNewManifestTest(TempDirTestCase):
+ DIRS = ['sub']
+ FILES = {
+ 'test': u'',
+ 'sub/test': u'',
+ }
+
+ def setUp(self):
+ super(CreateNewManifestTest, self).setUp()
+ self.path = os.path.join(self.dir, 'Manifest')
+
+ def tearDown(self):
+ try:
+ os.unlink(self.path)
+ except OSError:
+ pass
+ super(CreateNewManifestTest, self).tearDown()
+
+ def test_load_without_create(self):
+ self.assertRaises(IOError,
+ gemato.recursiveloader.ManifestRecursiveLoader,
+ self.path)
+
+ def test_create_without_save(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ self.path, allow_create=True)
+ del m
+ self.assertFalse(os.path.exists(self.path))
+
+ def test_create_empty(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ self.path, allow_create=True)
+ m.save_manifests()
+ self.assertTrue(os.path.exists(self.path))
+
+ def test_update_entries_for_directory(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ self.path, allow_create=True, hashes=['MD5'])
+ m.update_entries_for_directory('')
+ m.save_manifests()
+ m.assert_directory_verifies('')
+
+ m2 = gemato.manifest.ManifestFile()
+ with io.open(self.path, 'r', encoding='utf8') as f:
+ m2.load(f)
+ self.assertEqual(len(m2.entries), 2)
+
+
+class CreateNewCompressedManifestTest(TempDirTestCase):
+ DIRS = ['sub']
+ FILES = {
+ 'test': u'',
+ 'sub/test': u'',
+ }
+
+ def setUp(self):
+ super(CreateNewCompressedManifestTest, self).setUp()
+ self.path = os.path.join(self.dir, 'Manifest.gz')
+
+ def tearDown(self):
+ try:
+ os.unlink(self.path)
+ except OSError:
+ pass
+ super(CreateNewCompressedManifestTest, self).tearDown()
+
+ def test_load_without_create(self):
+ self.assertRaises(IOError,
+ gemato.recursiveloader.ManifestRecursiveLoader,
+ self.path)
+
+ def test_create_without_save(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ self.path, allow_create=True)
+ del m
+ self.assertFalse(os.path.exists(self.path))
+
+ def test_create_empty(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ self.path, allow_create=True)
+ m.save_manifests()
+ with gemato.compression.open_potentially_compressed_path(
+ self.path, 'rb') as f:
+ self.assertEqual(f.read(), b'')
+
+ def test_update_entries_for_directory(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ self.path, allow_create=True, hashes=['MD5'])
+ m.update_entries_for_directory('')
+ m.save_manifests()
+ m.assert_directory_verifies('')
+
+ m2 = gemato.manifest.ManifestFile()
+ with gemato.compression.open_potentially_compressed_path(
+ self.path, 'r', encoding='utf8') as f:
+ m2.load(f)
+ self.assertEqual(len(m2.entries), 2)