summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/recursiveloader.py74
-rw-r--r--tests/test_recursiveloader.py128
2 files changed, 202 insertions, 0 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
new file mode 100644
index 0000000..5198ba5
--- /dev/null
+++ b/gemato/recursiveloader.py
@@ -0,0 +1,74 @@
+# gemato: Recursive loader for Manifests
+# vim:fileencoding=utf-8
+# (c) 2017 Michał Górny
+# Licensed under the terms of 2-clause BSD license
+
+import io
+import os.path
+import weakref
+
+import gemato.manifest
+import gemato.verify
+
+
+class ManifestRecursiveLoader(object):
+ """
+ A class encapsulating a tree covered by multiple Manifests.
+ Automatically verifies and loads additional sub-Manifests,
+ and provides methods to access the entries in them.
+ """
+
+ def __init__(self, top_manifest_path):
+ """
+ Instantiate the loader for a Manifest tree starting at top-level
+ Manifest @top_manifest_path.
+ """
+ self.root_directory = os.path.dirname(top_manifest_path)
+ self.loaded_manifests = {}
+ self.load_manifest(os.path.basename(top_manifest_path))
+
+ def load_manifest(self, relpath, verify_entry=None):
+ """
+ Load a single Manifest file whose relative path within Manifest
+ tree is @relpath. If @verify_entry is not null, the Manifest
+ file is verified against the entry.
+ """
+ m = gemato.manifest.ManifestFile()
+ path = os.path.join(self.root_directory, relpath)
+ if verify_entry is not None:
+ gemato.verify.assert_path_verifies(path, verify_entry)
+ with io.open(path, 'r', encoding='utf8') as f:
+ m.load(f)
+ self.loaded_manifests[relpath] = m
+
+ def _iter_manifests_for_path(self, path):
+ """
+ Iterate over loaded Manifests that can apply to path.
+ Yields a tuple of (relative_path, manifest).
+ """
+ for k, v in self.loaded_manifests.items():
+ d = os.path.dirname(k)
+ if not d or (path + '/').startswith(d + '/'):
+ yield (d, v)
+
+ def load_manifests_for_path(self, path):
+ """
+ Load all Manifests that may apply to the specified path,
+ recursively.
+ """
+ while True:
+ to_load = []
+ for relpath, m in self._iter_manifests_for_path(path):
+ for e in m.entries:
+ if not isinstance(e, gemato.manifest.ManifestEntryMANIFEST):
+ continue
+ mpath = os.path.join(relpath, e.path)
+ if mpath in self.loaded_manifests:
+ continue
+ mdir = os.path.dirname(mpath)
+ if not mdir or path.startswith(mdir + '/'):
+ to_load.append((mpath, e))
+ if not to_load:
+ break
+ for mpath, e in to_load:
+ self.load_manifest(mpath, e)
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py
new file mode 100644
index 0000000..6bbc7bd
--- /dev/null
+++ b/tests/test_recursiveloader.py
@@ -0,0 +1,128 @@
+# gemato: Recursive loader tests
+# vim:fileencoding=utf-8
+# (c) 2017 Michał Górny
+# Licensed under the terms of 2-clause BSD license
+
+import io
+import os
+import tempfile
+import unittest
+
+import gemato.recursiveloader
+
+
+class BasicNestingTest(unittest.TestCase):
+ DIRS = ['sub', 'sub/deeper']
+ FILES = {
+ 'Manifest': u'''
+MANIFEST sub/Manifest 65 MD5 6af76e314820a44aba2b4bd3e6280c20
+''',
+ 'sub/Manifest': u'''
+MANIFEST deeper/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+''',
+ 'sub/deeper/Manifest': u'',
+ }
+
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ for k in self.DIRS:
+ os.mkdir(os.path.join(self.dir, k))
+ for k, v in self.FILES.items():
+ with io.open(os.path.join(self.dir, k), 'w', encoding='utf8') as f:
+ f.write(v)
+
+ def tearDown(self):
+ for k in self.FILES:
+ os.unlink(os.path.join(self.dir, k))
+ for k in reversed(self.DIRS):
+ os.rmdir(os.path.join(self.dir, k))
+ os.rmdir(self.dir)
+
+ def test_init(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertIn('Manifest', m.loaded_manifests)
+
+ def test_load_sub_manifest(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertNotIn('sub/Manifest', m.loaded_manifests)
+ m.load_manifests_for_path('sub/test')
+ self.assertIn('sub/Manifest', m.loaded_manifests)
+ self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests)
+ m.load_manifests_for_path('sub/deeper/test')
+ self.assertIn('sub/deeper/Manifest', m.loaded_manifests)
+
+ def test_recursive_load_manifest(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertNotIn('sub/Manifest', m.loaded_manifests)
+ self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests)
+ m.load_manifests_for_path('sub/deeper/test')
+ self.assertIn('sub/Manifest', m.loaded_manifests)
+ self.assertIn('sub/deeper/Manifest', m.loaded_manifests)
+
+
+class MultipleManifestTest(unittest.TestCase):
+ DIRS = ['sub']
+ FILES = {
+ 'Manifest': u'''
+MANIFEST sub/Manifest.a 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+MANIFEST sub/Manifest.b 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+''',
+ 'sub/Manifest.a': u'',
+ 'sub/Manifest.b': u'',
+ }
+
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ for k in self.DIRS:
+ os.mkdir(os.path.join(self.dir, k))
+ for k, v in self.FILES.items():
+ with io.open(os.path.join(self.dir, k), 'w', encoding='utf8') as f:
+ f.write(v)
+
+ def tearDown(self):
+ for k in self.FILES:
+ os.unlink(os.path.join(self.dir, k))
+ for k in reversed(self.DIRS):
+ os.rmdir(os.path.join(self.dir, k))
+ os.rmdir(self.dir)
+
+ def test_load_sub_manifest(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ self.assertNotIn('sub/Manifest.a', m.loaded_manifests)
+ self.assertNotIn('sub/Manifest.b', m.loaded_manifests)
+ m.load_manifests_for_path('sub/test')
+ self.assertIn('sub/Manifest.a', m.loaded_manifests)
+ self.assertIn('sub/Manifest.b', m.loaded_manifests)
+
+
+class MultipleTopLevelManifestTest(unittest.TestCase):
+ FILES = {
+ 'Manifest': u'''
+MANIFEST Manifest.a 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+MANIFEST Manifest.b 0 MD5 d41d8cd98f00b204e9800998ecf8427e
+''',
+ 'Manifest.a': u'',
+ 'Manifest.b': u'',
+ }
+
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ for k, v in self.FILES.items():
+ with io.open(os.path.join(self.dir, k), 'w', encoding='utf8') as f:
+ f.write(v)
+
+ def tearDown(self):
+ for k in self.FILES:
+ os.unlink(os.path.join(self.dir, k))
+ os.rmdir(self.dir)
+
+ def test_load_extra_manifests(self):
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(self.dir, 'Manifest'))
+ m.load_manifests_for_path('')
+ self.assertIn('Manifest.a', m.loaded_manifests)
+ self.assertIn('Manifest.b', m.loaded_manifests)