diff options
-rw-r--r-- | gemato/recursiveloader.py | 74 | ||||
-rw-r--r-- | tests/test_recursiveloader.py | 128 |
2 files changed, 202 insertions, 0 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py new file mode 100644 index 0000000..5198ba5 --- /dev/null +++ b/gemato/recursiveloader.py @@ -0,0 +1,74 @@ +# gemato: Recursive loader for Manifests +# vim:fileencoding=utf-8 +# (c) 2017 Michał Górny +# Licensed under the terms of 2-clause BSD license + +import io +import os.path +import weakref + +import gemato.manifest +import gemato.verify + + +class ManifestRecursiveLoader(object): + """ + A class encapsulating a tree covered by multiple Manifests. + Automatically verifies and loads additional sub-Manifests, + and provides methods to access the entries in them. + """ + + def __init__(self, top_manifest_path): + """ + Instantiate the loader for a Manifest tree starting at top-level + Manifest @top_manifest_path. + """ + self.root_directory = os.path.dirname(top_manifest_path) + self.loaded_manifests = {} + self.load_manifest(os.path.basename(top_manifest_path)) + + def load_manifest(self, relpath, verify_entry=None): + """ + Load a single Manifest file whose relative path within Manifest + tree is @relpath. If @verify_entry is not null, the Manifest + file is verified against the entry. + """ + m = gemato.manifest.ManifestFile() + path = os.path.join(self.root_directory, relpath) + if verify_entry is not None: + gemato.verify.assert_path_verifies(path, verify_entry) + with io.open(path, 'r', encoding='utf8') as f: + m.load(f) + self.loaded_manifests[relpath] = m + + def _iter_manifests_for_path(self, path): + """ + Iterate over loaded Manifests that can apply to path. + Yields a tuple of (relative_path, manifest). + """ + for k, v in self.loaded_manifests.items(): + d = os.path.dirname(k) + if not d or (path + '/').startswith(d + '/'): + yield (d, v) + + def load_manifests_for_path(self, path): + """ + Load all Manifests that may apply to the specified path, + recursively. + """ + while True: + to_load = [] + for relpath, m in self._iter_manifests_for_path(path): + for e in m.entries: + if not isinstance(e, gemato.manifest.ManifestEntryMANIFEST): + continue + mpath = os.path.join(relpath, e.path) + if mpath in self.loaded_manifests: + continue + mdir = os.path.dirname(mpath) + if not mdir or path.startswith(mdir + '/'): + to_load.append((mpath, e)) + if not to_load: + break + for mpath, e in to_load: + self.load_manifest(mpath, e) diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py new file mode 100644 index 0000000..6bbc7bd --- /dev/null +++ b/tests/test_recursiveloader.py @@ -0,0 +1,128 @@ +# gemato: Recursive loader tests +# vim:fileencoding=utf-8 +# (c) 2017 Michał Górny +# Licensed under the terms of 2-clause BSD license + +import io +import os +import tempfile +import unittest + +import gemato.recursiveloader + + +class BasicNestingTest(unittest.TestCase): + DIRS = ['sub', 'sub/deeper'] + FILES = { + 'Manifest': u''' +MANIFEST sub/Manifest 65 MD5 6af76e314820a44aba2b4bd3e6280c20 +''', + 'sub/Manifest': u''' +MANIFEST deeper/Manifest 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + 'sub/deeper/Manifest': u'', + } + + def setUp(self): + self.dir = tempfile.mkdtemp() + for k in self.DIRS: + os.mkdir(os.path.join(self.dir, k)) + for k, v in self.FILES.items(): + with io.open(os.path.join(self.dir, k), 'w', encoding='utf8') as f: + f.write(v) + + def tearDown(self): + for k in self.FILES: + os.unlink(os.path.join(self.dir, k)) + for k in reversed(self.DIRS): + os.rmdir(os.path.join(self.dir, k)) + os.rmdir(self.dir) + + def test_init(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertIn('Manifest', m.loaded_manifests) + + def test_load_sub_manifest(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertNotIn('sub/Manifest', m.loaded_manifests) + m.load_manifests_for_path('sub/test') + self.assertIn('sub/Manifest', m.loaded_manifests) + self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests) + m.load_manifests_for_path('sub/deeper/test') + self.assertIn('sub/deeper/Manifest', m.loaded_manifests) + + def test_recursive_load_manifest(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertNotIn('sub/Manifest', m.loaded_manifests) + self.assertNotIn('sub/deeper/Manifest', m.loaded_manifests) + m.load_manifests_for_path('sub/deeper/test') + self.assertIn('sub/Manifest', m.loaded_manifests) + self.assertIn('sub/deeper/Manifest', m.loaded_manifests) + + +class MultipleManifestTest(unittest.TestCase): + DIRS = ['sub'] + FILES = { + 'Manifest': u''' +MANIFEST sub/Manifest.a 0 MD5 d41d8cd98f00b204e9800998ecf8427e +MANIFEST sub/Manifest.b 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + 'sub/Manifest.a': u'', + 'sub/Manifest.b': u'', + } + + def setUp(self): + self.dir = tempfile.mkdtemp() + for k in self.DIRS: + os.mkdir(os.path.join(self.dir, k)) + for k, v in self.FILES.items(): + with io.open(os.path.join(self.dir, k), 'w', encoding='utf8') as f: + f.write(v) + + def tearDown(self): + for k in self.FILES: + os.unlink(os.path.join(self.dir, k)) + for k in reversed(self.DIRS): + os.rmdir(os.path.join(self.dir, k)) + os.rmdir(self.dir) + + def test_load_sub_manifest(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + self.assertNotIn('sub/Manifest.a', m.loaded_manifests) + self.assertNotIn('sub/Manifest.b', m.loaded_manifests) + m.load_manifests_for_path('sub/test') + self.assertIn('sub/Manifest.a', m.loaded_manifests) + self.assertIn('sub/Manifest.b', m.loaded_manifests) + + +class MultipleTopLevelManifestTest(unittest.TestCase): + FILES = { + 'Manifest': u''' +MANIFEST Manifest.a 0 MD5 d41d8cd98f00b204e9800998ecf8427e +MANIFEST Manifest.b 0 MD5 d41d8cd98f00b204e9800998ecf8427e +''', + 'Manifest.a': u'', + 'Manifest.b': u'', + } + + def setUp(self): + self.dir = tempfile.mkdtemp() + for k, v in self.FILES.items(): + with io.open(os.path.join(self.dir, k), 'w', encoding='utf8') as f: + f.write(v) + + def tearDown(self): + for k in self.FILES: + os.unlink(os.path.join(self.dir, k)) + os.rmdir(self.dir) + + def test_load_extra_manifests(self): + m = gemato.recursiveloader.ManifestRecursiveLoader( + os.path.join(self.dir, 'Manifest')) + m.load_manifests_for_path('') + self.assertIn('Manifest.a', m.loaded_manifests) + self.assertIn('Manifest.b', m.loaded_manifests) |