summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Michał Górny <mgorny@gentoo.org> 2017-12-02 13:53:36 +0100
committer: Michał Górny <mgorny@gentoo.org> 2017-12-02 15:15:26 +0100
commit: 8bf76e87fff8f048a5be4d93358393b4fe86115c (patch)
tree: a53ebaa7744279e8649c9896a2dbcf4f61ee8fdb
parent: 1a7a0dae18c619d27e31ad40fb35a9c7b49286da (diff)
download: gemato-8bf76e87fff8f048a5be4d93358393b4fe86115c.tar.gz
recursiveloader: Use multiprocessing to load Manifests in parallel
-rw-r--r-- gemato/recursiveloader.py 39
1 files changed, 35 insertions, 4 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index a8ee8e4..df3743c 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -4,6 +4,7 @@
# Licensed under the terms of 2-clause BSD license
import errno
+import multiprocessing
import os.path
import gemato.compression
@@ -61,6 +62,18 @@ class ManifestLoader(object):
return m, st
+ def __call__(self, args):
+ """
+ Load the Manifest file by passing @args to verify_and_load()
+ method. @args should be an iterable specifying the file relative
+ path and verification entry (or None).
+
+ Returns a tuple of (relpath, ManifestFile instance).
+
+ Intended to be used via multiprocessing.Pool.map().
+ """
+ return (args[0], self.verify_and_load(*args)[0])
+
class ManifestRecursiveLoader(object):
"""
@@ -86,7 +99,8 @@ class ManifestRecursiveLoader(object):
'top_level_manifest_filename',
'loaded_manifests',
'updated_manifests',
- 'manifest_device'
+ 'manifest_device',
+ 'max_jobs',
]
def __init__(self, top_manifest_path,
@@ -94,7 +108,8 @@ class ManifestRecursiveLoader(object):
sign_openpgp=None, openpgp_keyid=None,
hashes=None, allow_create=False, sort=None,
compress_watermark=None, compress_format=None,
- profile=gemato.profile.DefaultProfile()):
+ profile=gemato.profile.DefaultProfile(),
+ max_jobs=None):
"""
Instantiate the loader for a Manifest tree starting at top-level
Manifest @top_manifest_path.
@@ -134,6 +149,11 @@ class ManifestRecursiveLoader(object):
The default @compress_format is 'gz'.
@profile can be used to provide the profile for the repository.
+
+ @max_jobs defines the number of subprocesses that can be spawned
+ to optimize some operations. If None (the default), the number
+ will automatically be determined based on CPU count. Otherwise,
+ the specified number will be used.
"""
self.root_directory = os.path.dirname(top_manifest_path)
@@ -145,6 +165,7 @@ class ManifestRecursiveLoader(object):
self.sort = sort
self.compress_watermark = compress_watermark
self.compress_format = compress_format
+ self.max_jobs = max_jobs
self.profile.set_loader_options(self)
@@ -276,6 +297,9 @@ class ManifestRecursiveLoader(object):
on mismatch. Otherwise, sub-Manifests will be loaded
unconditionally of whether they match parent checksums.
"""
+
+ pool = multiprocessing.Pool(processes=self.max_jobs)
+
# TODO: figure out how to avoid confusing uses of 'recursive'
while True:
to_load = []
@@ -288,14 +312,21 @@ class ManifestRecursiveLoader(object):
if curmpath == mpath or mpath in self.loaded_manifests:
continue
mdir = os.path.dirname(mpath)
+ if not verify:
+ e = None
if gemato.util.path_starts_with(path, mdir):
to_load.append((mpath, e))
elif recursive and gemato.util.path_starts_with(mdir, path):
to_load.append((mpath, e))
if not to_load:
break
- for mpath, e in to_load:
- self.load_manifest(mpath, verify_entry=e if verify else None)
+
+ manifests = pool.map(self.manifest_loader, to_load,
+ chunksize=16)
+ self.loaded_manifests.update(manifests)
+
+ pool.close()
+ pool.join()
def find_timestamp(self):
"""