diff options
author | Michał Górny <mgorny@gentoo.org> | 2018-03-16 12:16:44 +0100 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2018-03-16 12:16:44 +0100 |
commit | 6dff07c2f93212d50c3bb4471e2ce5d18d63a54a (patch) | |
tree | cd71e6ce4c208e78d5d495daff4b3fa26a0ff55c | |
parent | ac4eb9de6692735cfcba22dddbf95b2c13477a82 (diff) | |
download | gemato-6dff07c2f93212d50c3bb4471e2ce5d18d63a54a.tar.gz |
Introduce a context manager for multiprocessing.Pool
-rw-r--r-- | gemato/recursiveloader.py | 20 | ||||
-rw-r--r-- | gemato/util.py | 25 |
2 files changed, 26 insertions, 19 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py index 0969554..99cea81 100644 --- a/gemato/recursiveloader.py +++ b/gemato/recursiveloader.py @@ -4,7 +4,6 @@ # Licensed under the terms of 2-clause BSD license import errno -import multiprocessing import os.path import sys @@ -382,9 +381,7 @@ class ManifestRecursiveLoader(object): unconditionally of whether they match parent checksums. """ - pool = multiprocessing.Pool(processes=self.max_jobs) - - try: + with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool: # TODO: figure out how to avoid confusing uses of 'recursive' while True: to_load = [] @@ -410,11 +407,6 @@ class ManifestRecursiveLoader(object): chunksize=16) self.loaded_manifests.update(manifests) - pool.close() - pool.join() - finally: - pool.terminate() - def find_timestamp(self): """ Find a timestamp entry and return it. Returns None if there @@ -657,9 +649,7 @@ class ManifestRecursiveLoader(object): self.manifest_device, fail_handler, last_mtime) - pool = multiprocessing.Pool(processes=self.max_jobs) - - try: + with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool: # verify the directories in parallel if sys.hexversion >= 0x03050400: ret = all(pool.imap_unordered(verifier, _walk_directory(it), @@ -670,8 +660,6 @@ class ManifestRecursiveLoader(object): ret = all(pool.map(verifier, _walk_directory(it), chunksize=64)) - pool.close() - # check for missing directories for relpath, dirdict in entry_dict.items(): for f, e in dirdict.items(): @@ -679,10 +667,6 @@ class ManifestRecursiveLoader(object): syspath = os.path.join(self.root_directory, fpath) ret &= verifier._verify_one_file(syspath, fpath, e) - pool.join() - finally: - pool.terminate() - return ret def save_manifests(self, hashes=None, force=False, sort=None, diff --git a/gemato/util.py b/gemato/util.py index 0f790ce..977cd26 100644 --- a/gemato/util.py +++ b/gemato/util.py @@ -1,8 +1,31 @@ # gemato: Utility functions # vim:fileencoding=utf-8 -# (c) 2017 Michał Górny +# (c) 2017-2018 Michał Górny # Licensed under the terms of 2-clause BSD license +import multiprocessing + + +class MultiprocessingPoolWrapper(object): + """ + A portability wrapper for multiprocessing.Pool that supports + context manager API (and any future hacks we might need). + """ + + __slots__ = ['pool'] + + def __init__(self, processes): + self.pool = multiprocessing.Pool(processes=processes) + + def __enter__(self): + return self.pool + + def __exit__(self, exc_type, exc_value, exc_cb): + if exc_type is None: + self.pool.close() + self.pool.join() + self.pool.terminate() + def path_starts_with(path, prefix): """ |