diff options
-rw-r--r-- | gemato/recursiveloader.py | 11 | ||||
-rw-r--r-- | gemato/util.py | 18 |
2 files changed, 19 insertions, 10 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py index 99cea81..0b98c3a 100644 --- a/gemato/recursiveloader.py +++ b/gemato/recursiveloader.py @@ -5,7 +5,6 @@ import errno import os.path -import sys import gemato.compression import gemato.exceptions @@ -651,14 +650,8 @@ class ManifestRecursiveLoader(object): with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool: # verify the directories in parallel - if sys.hexversion >= 0x03050400: - ret = all(pool.imap_unordered(verifier, _walk_directory(it), - chunksize=64)) - else: - # in py<3.5.4 imap() swallows exceptions, so fall back - # to regular map() [it's only a little slower] - ret = all(pool.map(verifier, _walk_directory(it), - chunksize=64)) + ret = all(pool.imap_unordered(verifier, _walk_directory(it), + chunksize=64)) # check for missing directories for relpath, dirdict in entry_dict.items(): diff --git a/gemato/util.py b/gemato/util.py index 977cd26..eb052b6 100644 --- a/gemato/util.py +++ b/gemato/util.py @@ -4,6 +4,7 @@ # Licensed under the terms of 2-clause BSD license import multiprocessing +import sys class MultiprocessingPoolWrapper(object): @@ -18,7 +19,7 @@ class MultiprocessingPoolWrapper(object): self.pool = multiprocessing.Pool(processes=processes) def __enter__(self): - return self.pool + return self def __exit__(self, exc_type, exc_value, exc_cb): if exc_type is None: @@ -26,6 +27,21 @@ class MultiprocessingPoolWrapper(object): self.pool.join() self.pool.terminate() + def map(self, *args, **kwargs): + return self.pool.map(*args, **kwargs) + + def imap_unordered(self, *args, **kwargs): + """ + Use imap_unordered() if available and safe to use. Fall back + to regular map() otherwise. + """ + if sys.hexversion >= 0x03050400: + return self.pool.imap_unordered(*args, **kwargs) + else: + # in py<3.5.4 imap() swallows exceptions, so fall back + # to regular map() + return self.pool.map(*args, **kwargs) + def path_starts_with(path, prefix): """ |