summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/exceptions.py11
-rw-r--r--gemato/openpgp.py48
-rw-r--r--tests/test_openpgp.py9
3 files changed, 46 insertions, 22 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index c42bd12..9edf47d 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -71,3 +71,14 @@ class OpenPGPVerificationFailure(Exception):
def __init__(self, output):
super(OpenPGPVerificationFailure, self).__init__(
"OpenPGP verification failed:\n{}".format(output))
+
+
+class OpenPGPNoImplementation(Exception):
+ """
+ An exception raised when no supported OpenPGP implementation
+ is available.
+ """
+
+ def __init__(self):
+ super(OpenPGPNoImplementation, self).__init__(
+ "No supported OpenPGP implementation found (install gnupg)")
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index 4ba1ad3..adb4a5b 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -3,6 +3,7 @@
# (c) 2017 Michał Górny
# Licensed under the terms of 2-clause BSD license
+import errno
import shutil
import subprocess
import tempfile
@@ -10,6 +11,27 @@ import tempfile
import gemato.exceptions
+def _spawn_gpg(options, home, stdin):
+ env = None
+ if home is not None:
+ env={'HOME': home}
+
+ try:
+ p = subprocess.Popen(['gpg', '--batch'] + options,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env)
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ raise gemato.exceptions.OpenPGPNoImplementation()
+ else:
+ raise
+
+ out, err = p.communicate(stdin.read())
+ return (p.wait(), out, err)
+
+
class OpenPGPEnvironment(object):
"""
An isolated environment for OpenPGP routines. Used to get reliable
@@ -40,14 +62,8 @@ class OpenPGPEnvironment(object):
at the beginning.
"""
- p = subprocess.Popen(['gpg', '--import', '--batch'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env={'HOME': self.home})
- out, err = p.communicate(keyfile.read())
-
- if p.wait() != 0:
+ exitst, out, err = _spawn_gpg(['--import'], self.home, keyfile)
+ if exitst != 0:
raise RuntimeError('Unable to import key: {}'.format(err.decode('utf8')))
def verify_file(self, f):
@@ -77,16 +93,8 @@ def verify_file(f, env=None):
results, prepare a dedicated OpenPGPEnvironment and pass it as @env.
"""
- penv = None
- if env is not None:
- penv = {'HOME': env.home}
-
- p = subprocess.Popen(['gpg', '--verify', '--batch'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=penv)
- out, err = p.communicate(f.read())
-
- if p.wait() != 0:
+ exitst, out, err = _spawn_gpg(['--verify'],
+ env.home if env is not None else None,
+ f)
+ if exitst != 0:
raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8'))
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index dd2cef1..dc2eafc 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -162,6 +162,8 @@ class OpenPGPCorrectKeyTest(unittest.TestCase):
try:
self.env.import_key(
io.BytesIO(PUBLIC_KEY.encode('utf8')))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ raise unittest.SkipTest(str(e))
except RuntimeError:
raise unittest.SkipTest('Unable to import OpenPGP key')
@@ -195,5 +197,8 @@ class OpenPGPNoKeyTest(unittest.TestCase):
def test_verify_manifest(self):
with io.BytesIO(SIGNED_MANIFEST.encode('utf8')) as f:
- self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
- self.env.verify_file, f)
+ try:
+ self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
+ self.env.verify_file, f)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ raise unittest.SkipTest(str(e))