summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/recursiveloader.py198
1 files changed, 135 insertions, 63 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index baa7c24..c9a6380 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -6,6 +6,7 @@
import errno
import multiprocessing
import os.path
+import sys
import gemato.compression
import gemato.exceptions
@@ -75,6 +76,76 @@ class ManifestLoader(object):
return (args[0], self.verify_and_load(*args)[0])
+class SubprocessVerifier(object):
+ """
+ Helper class used to verify directories in subprocesses.
+ """
+
+ __slots__ = ['top_level_manifest_filename',
+ 'manifest_device', 'fail_handler', 'last_mtime']
+
+ def __init__(self, top_level_manifest_filename,
+ manifest_device, fail_handler, last_mtime):
+ self.top_level_manifest_filename = top_level_manifest_filename
+ self.manifest_device = manifest_device
+ self.fail_handler = fail_handler
+ self.last_mtime = last_mtime
+
+ def _verify_one_file(self, path, relpath, e):
+ ret, diff = gemato.verify.verify_path(path, e,
+ expected_dev=self.manifest_device,
+ last_mtime=self.last_mtime)
+
+ if not ret:
+ err = gemato.exceptions.ManifestMismatch(relpath, e, diff)
+ ret = self.fail_handler(err)
+ if ret is None:
+ ret = True
+
+ return ret
+
+ def __call__(self, vals):
+ """
+ Verify the specified directory and return the boolean value
+ (or raise an exception).
+ """
+
+ ret = True
+ dirpath, relpath, dirnames, filenames, dirdict = vals
+
+ for d in dirnames:
+ # we already stripped ignored directories in walker,
+ # so go straight for verification
+ de = dirdict.pop(d, None)
+ if de is not None:
+ dpath = os.path.join(relpath, d)
+ ret &= self._verify_one_file(os.path.join(dirpath, d),
+ dpath, de)
+
+ for f in filenames:
+ # skip dotfiles
+ if f.startswith('.'):
+ continue
+
+ fpath = os.path.join(relpath, f)
+ # skip top-level Manifest, we obviously can't have
+ # an entry for it
+ if fpath == self.top_level_manifest_filename:
+ continue
+ fe = dirdict.pop(f, None)
+ ret &= self._verify_one_file(os.path.join(dirpath, f),
+ fpath, fe)
+
+
+ # check for missing files
+ for f, e in dirdict.items():
+ fpath = os.path.join(relpath, f)
+ ret &= self._verify_one_file(os.path.join(dirpath, f),
+ fpath, e)
+
+ return ret
+
+
class ManifestRecursiveLoader(object):
"""
A class encapsulating a tree covered by multiple Manifests.
@@ -480,23 +551,9 @@ class ManifestRecursiveLoader(object):
dirout[filename] = e
return out
- def _verify_one_file(self, path, relpath, e, fail_handler,
- last_mtime):
- ret, diff = gemato.verify.verify_path(path, e,
- expected_dev=self.manifest_device,
- last_mtime=last_mtime)
-
- if not ret:
- err = gemato.exceptions.ManifestMismatch(relpath, e, diff)
- ret = fail_handler(err)
- if ret is None:
- ret = True
-
- return ret
-
def assert_directory_verifies(self, path='',
fail_handler=gemato.util.throw_exception,
- last_mtime=None):
+ last_mtime=None, jobs=None):
"""
Verify the complete directory tree starting at @path (relative
to top Manifest directory). Includes testing for stray files.
@@ -518,68 +575,83 @@ class ManifestRecursiveLoader(object):
than that value (in st_mtime format) will be checked. Use this
option *only* if mtimes can not be manipulated (i.e. do not use
it with 'rsync --times')!
+
+ @jobs specifies the number of parallel jobs to use. If set
+ to None (the default), the number of system CPUs will be used.
"""
entry_dict = self.get_file_entry_dict(path)
it = os.walk(os.path.join(self.root_directory, path),
onerror=gemato.util.throw_exception,
followlinks=True)
- ret = True
- for dirpath, dirnames, filenames in it:
- relpath = os.path.relpath(dirpath, self.root_directory)
- # strip dot to avoid matching problems
- if relpath == '.':
- relpath = ''
- dirdict = entry_dict.get(relpath, {})
+ def _walk_directory(it):
+ """
+ Pre-process os.walk() result for verification. Yield objects
+ suitable to passing to subprocesses.
+ """
+ for dirpath, dirnames, filenames in it:
+ relpath = os.path.relpath(dirpath, self.root_directory)
+ # strip dot to avoid matching problems
+ if relpath == '.':
+ relpath = ''
+ dirdict = entry_dict.pop(relpath, {})
+
+ skip_dirs = []
+ for d in dirnames:
+ # skip dotfiles
+ if d.startswith('.'):
+ skip_dirs.append(d)
+ continue
- skip_dirs = []
- for d in dirnames:
- # skip dotfiles
- if d.startswith('.'):
- skip_dirs.append(d)
- continue
+ de = dirdict.get(d)
+ if de is None:
+ syspath = os.path.join(dirpath, d)
+ st = os.stat(syspath)
+ if st.st_dev != self.manifest_device:
+ raise gemato.exceptions.ManifestCrossDevice(syspath)
+ continue
- de = dirdict.pop(d, None)
- if de is None:
- syspath = os.path.join(dirpath, d)
- st = os.stat(syspath)
- if st.st_dev != self.manifest_device:
- raise gemato.exceptions.ManifestCrossDevice(syspath)
- continue
+ if de.tag == 'IGNORE':
+ skip_dirs.append(d)
+ del dirdict[d]
- if de.tag == 'IGNORE':
- skip_dirs.append(d)
- else:
- dpath = os.path.join(relpath, d)
- ret &= self._verify_one_file(os.path.join(dirpath, d),
- dpath, de, fail_handler, last_mtime)
+ # skip scanning ignored directories
+ for d in skip_dirs:
+ dirnames.remove(d)
- # skip scanning ignored directories
- for d in skip_dirs:
- dirnames.remove(d)
+ yield (dirpath, relpath, dirnames, filenames, dirdict)
- for f in filenames:
- # skip dotfiles
- if f.startswith('.'):
- continue
+ verifier = SubprocessVerifier(
+ self.top_level_manifest_filename,
+ self.manifest_device,
+ fail_handler, last_mtime)
- fpath = os.path.join(relpath, f)
- # skip top-level Manifest, we obviously can't have
- # an entry for it
- if fpath == self.top_level_manifest_filename:
- continue
- fe = dirdict.pop(f, None)
- ret &= self._verify_one_file(os.path.join(dirpath, f),
- fpath, fe, fail_handler, last_mtime)
+ pool = multiprocessing.Pool(processes=self.max_jobs)
- # check for missing files
- for relpath, dirdict in entry_dict.items():
- for f, e in dirdict.items():
- fpath = os.path.join(relpath, f)
- syspath = os.path.join(self.root_directory, fpath)
- ret &= self._verify_one_file(syspath, fpath, e,
- fail_handler, last_mtime)
+ try:
+ # verify the directories in parallel
+ if sys.hexversion >= 0x03050400:
+ ret = all(pool.imap_unordered(verifier, _walk_directory(it),
+ chunksize=64))
+ else:
+ # in py<3.5 imap() swallows exceptions, so fall back
+ # to regular map() [it's only a little slower]
+ ret = all(pool.map(verifier, _walk_directory(it),
+ chunksize=64))
+
+ pool.close()
+
+ # check for missing directories
+ for relpath, dirdict in entry_dict.items():
+ for f, e in dirdict.items():
+ fpath = os.path.join(relpath, f)
+ syspath = os.path.join(self.root_directory, fpath)
+ ret &= verifier._verify_one_file(syspath, fpath, e)
+
+ pool.join()
+ finally:
+ pool.terminate()
return ret