summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/recursiveloader.py11
-rw-r--r--gemato/util.py18
2 files changed, 19 insertions, 10 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index 99cea81..0b98c3a 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -5,7 +5,6 @@
import errno
import os.path
-import sys
import gemato.compression
import gemato.exceptions
@@ -651,14 +650,8 @@ class ManifestRecursiveLoader(object):
with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool:
# verify the directories in parallel
- if sys.hexversion >= 0x03050400:
- ret = all(pool.imap_unordered(verifier, _walk_directory(it),
- chunksize=64))
- else:
- # in py<3.5.4 imap() swallows exceptions, so fall back
- # to regular map() [it's only a little slower]
- ret = all(pool.map(verifier, _walk_directory(it),
- chunksize=64))
+ ret = all(pool.imap_unordered(verifier, _walk_directory(it),
+ chunksize=64))
# check for missing directories
for relpath, dirdict in entry_dict.items():
diff --git a/gemato/util.py b/gemato/util.py
index 977cd26..eb052b6 100644
--- a/gemato/util.py
+++ b/gemato/util.py
@@ -4,6 +4,7 @@
# Licensed under the terms of 2-clause BSD license
import multiprocessing
+import sys
class MultiprocessingPoolWrapper(object):
@@ -18,7 +19,7 @@ class MultiprocessingPoolWrapper(object):
self.pool = multiprocessing.Pool(processes=processes)
def __enter__(self):
- return self.pool
+ return self
def __exit__(self, exc_type, exc_value, exc_cb):
if exc_type is None:
@@ -26,6 +27,21 @@ class MultiprocessingPoolWrapper(object):
self.pool.join()
self.pool.terminate()
+ def map(self, *args, **kwargs):
+ return self.pool.map(*args, **kwargs)
+
+ def imap_unordered(self, *args, **kwargs):
+ """
+ Use imap_unordered() if available and safe to use. Fall back
+ to regular map() otherwise.
+ """
+ if sys.hexversion >= 0x03050400:
+ return self.pool.imap_unordered(*args, **kwargs)
+ else:
+ # in py<3.5.4 imap() swallows exceptions, so fall back
+ # to regular map()
+ return self.pool.map(*args, **kwargs)
+
def path_starts_with(path, prefix):
"""