diff options
author | Michał Górny <mgorny@gentoo.org> | 2017-12-02 13:53:36 +0100 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2017-12-02 15:15:26 +0100 |
commit | 8bf76e87fff8f048a5be4d93358393b4fe86115c (patch) | |
tree | a53ebaa7744279e8649c9896a2dbcf4f61ee8fdb | |
parent | 1a7a0dae18c619d27e31ad40fb35a9c7b49286da (diff) | |
download | gemato-8bf76e87fff8f048a5be4d93358393b4fe86115c.tar.gz |
recursiveloader: Use multiprocessing to load Manifests in parallel
-rw-r--r-- | gemato/recursiveloader.py | 39 |
1 files changed, 35 insertions, 4 deletions
```diff
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index a8ee8e4..df3743c 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -4,6 +4,7 @@
 # Licensed under the terms of 2-clause BSD license
 
 import errno
+import multiprocessing
 import os.path
 
 import gemato.compression
@@ -61,6 +62,18 @@ class ManifestLoader(object):
 
         return m, st
 
+    def __call__(self, args):
+        """
+        Load the Manifest file by passing @args to verify_and_load()
+        method. @args should be an iterable specifying the file relative
+        path and verification entry (or None).
+
+        Returns a tuple of (relpath, ManifestFile instance).
+
+        Intended to be used via multiprocessing.Pool.map().
+        """
+        return (args[0], self.verify_and_load(*args)[0])
+
 
 class ManifestRecursiveLoader(object):
     """
@@ -86,7 +99,8 @@ class ManifestRecursiveLoader(object):
         'top_level_manifest_filename',
         'loaded_manifests',
         'updated_manifests',
-        'manifest_device'
+        'manifest_device',
+        'max_jobs',
     ]
 
     def __init__(self, top_manifest_path,
@@ -94,7 +108,8 @@ class ManifestRecursiveLoader(object):
                  sign_openpgp=None, openpgp_keyid=None,
                  hashes=None, allow_create=False, sort=None,
                  compress_watermark=None, compress_format=None,
-                 profile=gemato.profile.DefaultProfile()):
+                 profile=gemato.profile.DefaultProfile(),
+                 max_jobs=None):
         """
         Instantiate the loader for a Manifest tree starting at top-level
         Manifest @top_manifest_path.
@@ -134,6 +149,11 @@ class ManifestRecursiveLoader(object):
         The default @compress_format is 'gz'.
 
         @profile can be used to provide the profile for the repository.
+
+        @max_jobs defines the number of subprocesses that can be spawned
+        to optimize some operations. If None (the default), the number
+        will automatically be determined based on CPU count. Otherwise,
+        the specified number will be used.
         """
 
         self.root_directory = os.path.dirname(top_manifest_path)
@@ -145,6 +165,7 @@ class ManifestRecursiveLoader(object):
         self.sort = sort
         self.compress_watermark = compress_watermark
         self.compress_format = compress_format
+        self.max_jobs = max_jobs
 
         self.profile.set_loader_options(self)
 
@@ -276,6 +297,9 @@ class ManifestRecursiveLoader(object):
         on mismatch. Otherwise, sub-Manifests will be loaded
         unconditionally of whether they match parent checksums.
         """
+
+        pool = multiprocessing.Pool(processes=self.max_jobs)
+
         # TODO: figure out how to avoid confusing uses of 'recursive'
         while True:
             to_load = []
@@ -288,14 +312,21 @@ class ManifestRecursiveLoader(object):
                     if curmpath == mpath or mpath in self.loaded_manifests:
                         continue
                     mdir = os.path.dirname(mpath)
+                    if not verify:
+                        e = None
                     if gemato.util.path_starts_with(path, mdir):
                         to_load.append((mpath, e))
                     elif recursive and gemato.util.path_starts_with(mdir, path):
                         to_load.append((mpath, e))
             if not to_load:
                 break
-            for mpath, e in to_load:
-                self.load_manifest(mpath, verify_entry=e if verify else None)
+
+            manifests = pool.map(self.manifest_loader, to_load,
+                                 chunksize=16)
+            self.loaded_manifests.update(manifests)
+
+        pool.close()
+        pool.join()
 
     def find_timestamp(self):
         """
```