diff options
author | Michał Górny <mgorny@gentoo.org> | 2017-11-25 00:55:44 +0100 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2017-12-03 00:55:50 +0100 |
commit | ff458243cf0f34bec5be99c2b44af58d91fbc60f (patch) | |
tree | 4e91633565171544b4372d4e93e82cb9b23372b3 | |
parent | b26e4261da01d33960f569f8105621ee714e6b5b (diff) | |
download | gemato-ff458243cf0f34bec5be99c2b44af58d91fbc60f.tar.gz |
recursiveloader: Use multiprocessing for assert_directory_verifies()
-rw-r--r-- | gemato/recursiveloader.py | 198 |
1 files changed, 135 insertions, 63 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py index baa7c24..c9a6380 100644 --- a/gemato/recursiveloader.py +++ b/gemato/recursiveloader.py @@ -6,6 +6,7 @@ import errno import multiprocessing import os.path +import sys import gemato.compression import gemato.exceptions @@ -75,6 +76,76 @@ class ManifestLoader(object): return (args[0], self.verify_and_load(*args)[0]) +class SubprocessVerifier(object): + """ + Helper class used to verify directories in subprocesses. + """ + + __slots__ = ['top_level_manifest_filename', + 'manifest_device', 'fail_handler', 'last_mtime'] + + def __init__(self, top_level_manifest_filename, + manifest_device, fail_handler, last_mtime): + self.top_level_manifest_filename = top_level_manifest_filename + self.manifest_device = manifest_device + self.fail_handler = fail_handler + self.last_mtime = last_mtime + + def _verify_one_file(self, path, relpath, e): + ret, diff = gemato.verify.verify_path(path, e, + expected_dev=self.manifest_device, + last_mtime=self.last_mtime) + + if not ret: + err = gemato.exceptions.ManifestMismatch(relpath, e, diff) + ret = self.fail_handler(err) + if ret is None: + ret = True + + return ret + + def __call__(self, vals): + """ + Verify the specified directory and return the boolean value + (or raise an exception). + """ + + ret = True + dirpath, relpath, dirnames, filenames, dirdict = vals + + for d in dirnames: + # we already stripped ignored directories in walker, + # so go straight for verification + de = dirdict.pop(d, None) + if de is not None: + dpath = os.path.join(relpath, d) + ret &= self._verify_one_file(os.path.join(dirpath, d), + dpath, de) + + for f in filenames: + # skip dotfiles + if f.startswith('.'): + continue + + fpath = os.path.join(relpath, f) + # skip top-level Manifest, we obviously can't have + # an entry for it + if fpath == self.top_level_manifest_filename: + continue + fe = dirdict.pop(f, None) + ret &= self._verify_one_file(os.path.join(dirpath, f), + fpath, fe) + + + # check for missing files + for f, e in dirdict.items(): + fpath = os.path.join(relpath, f) + ret &= self._verify_one_file(os.path.join(dirpath, f), + fpath, e) + + return ret + + class ManifestRecursiveLoader(object): """ A class encapsulating a tree covered by multiple Manifests. @@ -480,23 +551,9 @@ class ManifestRecursiveLoader(object): dirout[filename] = e return out - def _verify_one_file(self, path, relpath, e, fail_handler, - last_mtime): - ret, diff = gemato.verify.verify_path(path, e, - expected_dev=self.manifest_device, - last_mtime=last_mtime) - - if not ret: - err = gemato.exceptions.ManifestMismatch(relpath, e, diff) - ret = fail_handler(err) - if ret is None: - ret = True - - return ret - def assert_directory_verifies(self, path='', fail_handler=gemato.util.throw_exception, - last_mtime=None): + last_mtime=None, jobs=None): """ Verify the complete directory tree starting at @path (relative to top Manifest directory). Includes testing for stray files. @@ -518,68 +575,83 @@ class ManifestRecursiveLoader(object): than that value (in st_mtime format) will be checked. Use this option *only* if mtimes can not be manipulated (i.e. do not use it with 'rsync --times')! + + @jobs specifies the number of parallel jobs to use. If set + to None (the default), the number of system CPUs will be used. """ entry_dict = self.get_file_entry_dict(path) it = os.walk(os.path.join(self.root_directory, path), onerror=gemato.util.throw_exception, followlinks=True) - ret = True - for dirpath, dirnames, filenames in it: - relpath = os.path.relpath(dirpath, self.root_directory) - # strip dot to avoid matching problems - if relpath == '.': - relpath = '' - dirdict = entry_dict.get(relpath, {}) + def _walk_directory(it): + """ + Pre-process os.walk() result for verification. Yield objects + suitable to passing to subprocesses. + """ + for dirpath, dirnames, filenames in it: + relpath = os.path.relpath(dirpath, self.root_directory) + # strip dot to avoid matching problems + if relpath == '.': + relpath = '' + dirdict = entry_dict.pop(relpath, {}) + + skip_dirs = [] + for d in dirnames: + # skip dotfiles + if d.startswith('.'): + skip_dirs.append(d) + continue - skip_dirs = [] - for d in dirnames: - # skip dotfiles - if d.startswith('.'): - skip_dirs.append(d) - continue + de = dirdict.get(d) + if de is None: + syspath = os.path.join(dirpath, d) + st = os.stat(syspath) + if st.st_dev != self.manifest_device: + raise gemato.exceptions.ManifestCrossDevice(syspath) + continue - de = dirdict.pop(d, None) - if de is None: - syspath = os.path.join(dirpath, d) - st = os.stat(syspath) - if st.st_dev != self.manifest_device: - raise gemato.exceptions.ManifestCrossDevice(syspath) - continue + if de.tag == 'IGNORE': + skip_dirs.append(d) + del dirdict[d] - if de.tag == 'IGNORE': - skip_dirs.append(d) - else: - dpath = os.path.join(relpath, d) - ret &= self._verify_one_file(os.path.join(dirpath, d), - dpath, de, fail_handler, last_mtime) + # skip scanning ignored directories + for d in skip_dirs: + dirnames.remove(d) - # skip scanning ignored directories - for d in skip_dirs: - dirnames.remove(d) + yield (dirpath, relpath, dirnames, filenames, dirdict) - for f in filenames: - # skip dotfiles - if f.startswith('.'): - continue + verifier = SubprocessVerifier( + self.top_level_manifest_filename, + self.manifest_device, + fail_handler, last_mtime) - fpath = os.path.join(relpath, f) - # skip top-level Manifest, we obviously can't have - # an entry for it - if fpath == self.top_level_manifest_filename: - continue - fe = dirdict.pop(f, None) - ret &= self._verify_one_file(os.path.join(dirpath, f), - fpath, fe, fail_handler, last_mtime) + pool = multiprocessing.Pool(processes=self.max_jobs) - # check for missing files - for relpath, dirdict in entry_dict.items(): - for f, e in dirdict.items(): - fpath = os.path.join(relpath, f) - syspath = os.path.join(self.root_directory, fpath) - ret &= self._verify_one_file(syspath, fpath, e, - fail_handler, last_mtime) + try: + # verify the directories in parallel + if sys.hexversion >= 0x03050400: + ret = all(pool.imap_unordered(verifier, _walk_directory(it), + chunksize=64)) + else: + # in py<3.5 imap() swallows exceptions, so fall back + # to regular map() [it's only a little slower] + ret = all(pool.map(verifier, _walk_directory(it), + chunksize=64)) + + pool.close() + + # check for missing directories + for relpath, dirdict in entry_dict.items(): + for f, e in dirdict.items(): + fpath = os.path.join(relpath, f) + syspath = os.path.join(self.root_directory, fpath) + ret &= verifier._verify_one_file(syspath, fpath, e) + + pool.join() + finally: + pool.terminate() return ret |