diff options
-rw-r--r-- | gemato/exceptions.py | 11 | ||||
-rw-r--r-- | gemato/openpgp.py | 48 | ||||
-rw-r--r-- | tests/test_openpgp.py | 9 |
3 files changed, 46 insertions, 22 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py index c42bd12..9edf47d 100644 --- a/gemato/exceptions.py +++ b/gemato/exceptions.py @@ -71,3 +71,14 @@ class OpenPGPVerificationFailure(Exception): def __init__(self, output): super(OpenPGPVerificationFailure, self).__init__( "OpenPGP verification failed:\n{}".format(output)) + + +class OpenPGPNoImplementation(Exception): + """ + An exception raised when no supported OpenPGP implementation + is available. + """ + + def __init__(self): + super(OpenPGPNoImplementation, self).__init__( + "No supported OpenPGP implementation found (install gnupg)") diff --git a/gemato/openpgp.py b/gemato/openpgp.py index 4ba1ad3..adb4a5b 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -3,6 +3,7 @@ # (c) 2017 Michał Górny # Licensed under the terms of 2-clause BSD license +import errno import shutil import subprocess import tempfile @@ -10,6 +11,27 @@ import tempfile import gemato.exceptions +def _spawn_gpg(options, home, stdin): + env = None + if home is not None: + env={'HOME': home} + + try: + p = subprocess.Popen(['gpg', '--batch'] + options, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env) + except OSError as e: + if e.errno == errno.ENOENT: + raise gemato.exceptions.OpenPGPNoImplementation() + else: + raise + + out, err = p.communicate(stdin.read()) + return (p.wait(), out, err) + + class OpenPGPEnvironment(object): """ An isolated environment for OpenPGP routines. Used to get reliable @@ -40,14 +62,8 @@ class OpenPGPEnvironment(object): at the beginning. """ - p = subprocess.Popen(['gpg', '--import', '--batch'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env={'HOME': self.home}) - out, err = p.communicate(keyfile.read()) - - if p.wait() != 0: + exitst, out, err = _spawn_gpg(['--import'], self.home, keyfile) + if exitst != 0: raise RuntimeError('Unable to import key: {}'.format(err.decode('utf8'))) def verify_file(self, f): @@ -77,16 +93,8 @@ def verify_file(f, env=None): results, prepare a dedicated OpenPGPEnvironment and pass it as @env. """ - penv = None - if env is not None: - penv = {'HOME': env.home} - - p = subprocess.Popen(['gpg', '--verify', '--batch'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=penv) - out, err = p.communicate(f.read()) - - if p.wait() != 0: + exitst, out, err = _spawn_gpg(['--verify'], + env.home if env is not None else None, + f) + if exitst != 0: raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8')) diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index dd2cef1..dc2eafc 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -162,6 +162,8 @@ class OpenPGPCorrectKeyTest(unittest.TestCase): try: self.env.import_key( io.BytesIO(PUBLIC_KEY.encode('utf8'))) + except gemato.exceptions.OpenPGPNoImplementation as e: + raise unittest.SkipTest(str(e)) except RuntimeError: raise unittest.SkipTest('Unable to import OpenPGP key') @@ -195,5 +197,8 @@ class OpenPGPNoKeyTest(unittest.TestCase): def test_verify_manifest(self): with io.BytesIO(SIGNED_MANIFEST.encode('utf8')) as f: - self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, - self.env.verify_file, f) + try: + self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, + self.env.verify_file, f) + except gemato.exceptions.OpenPGPNoImplementation as e: + raise unittest.SkipTest(str(e)) |