summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2017-10-25 13:42:50 +0200
committerMichał Górny <mgorny@gentoo.org>2017-10-25 13:56:24 +0200
commitad5a8f6a7c084df7ce5a31dc2c72d69fc6398799 (patch)
treeb37988379bbb61f8354b2538d4b652227c3efcf9
parent6b9ccd25950b69b089e4b15e364c0c2b740dd9ad (diff)
downloadgemato-ad5a8f6a7c084df7ce5a31dc2c72d69fc6398799.tar.gz
Support verifying OpenPGP signatures
-rw-r--r--gemato/exceptions.py10
-rw-r--r--gemato/openpgp.py92
-rw-r--r--tests/test_openpgp.py70
3 files changed, 171 insertions, 1 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index 790ff53..c42bd12 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -61,3 +61,13 @@ class ManifestUnsignedData(Exception):
def __init__(self):
super(ManifestUnsignedData, self).__init__(
"Unsigned data found in an OpenPGP signed Manifest")
+
+
+class OpenPGPVerificationFailure(Exception):
+ """
+ An exception raised when OpenPGP verification fails.
+ """
+
+ def __init__(self, output):
+ super(OpenPGPVerificationFailure, self).__init__(
+ "OpenPGP verification failed:\n{}".format(output))
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
new file mode 100644
index 0000000..4ba1ad3
--- /dev/null
+++ b/gemato/openpgp.py
@@ -0,0 +1,92 @@
+# gemato: OpenPGP verification support
+# vim:fileencoding=utf-8
+# (c) 2017 Michał Górny
+# Licensed under the terms of 2-clause BSD license
+
+import shutil
+import subprocess
+import tempfile
+
+import gemato.exceptions
+
+
+class OpenPGPEnvironment(object):
+ """
+ An isolated environment for OpenPGP routines. Used to get reliable
+ verification results independently of user configuration.
+
+ Remember to close() in order to clean up the temporary directory,
+ or use as a context manager (via 'with').
+ """
+
+ def __init__(self):
+ self._home = tempfile.mkdtemp()
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self, exc_type, exc_value, exc_cb):
+ if self._home is not None:
+ self.close()
+ self._home = None
+
+ def close(self):
+ shutil.rmtree(self._home)
+
+ def import_key(self, keyfile):
+ """
+ Import a public key from open file @keyfile. The file should
+ be open for reading in binary mode, and oriented
+ at the beginning.
+ """
+
+ p = subprocess.Popen(['gpg', '--import', '--batch'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env={'HOME': self.home})
+ out, err = p.communicate(keyfile.read())
+
+ if p.wait() != 0:
+ raise RuntimeError('Unable to import key: {}'.format(err.decode('utf8')))
+
+ def verify_file(self, f):
+ """
+ A convenience wrapper for verify_file(), using this environment.
+ """
+
+ verify_file(f, env=self)
+
+ @property
+ def home(self):
+ if self._home is None:
+ raise RuntimeError(
+ 'OpenPGPEnvironment must be used via context manager')
+ return self._home
+
+
+def verify_file(f, env=None):
+ """
+ Perform an OpenPGP verification of Manifest data in open file @f.
+ The file should be open in binary mode and set at the beginning
+ (or start of signed part). Raises an exception if the verification
+ fails.
+
+ Note that this function does not distinguish whether the key
+ is trusted, and is subject to user configuration. To get reliable
+ results, prepare a dedicated OpenPGPEnvironment and pass it as @env.
+ """
+
+ penv = None
+ if env is not None:
+ penv = {'HOME': env.home}
+
+ p = subprocess.Popen(['gpg', '--verify', '--batch'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=penv)
+ out, err = p.communicate(f.read())
+
+ if p.wait() != 0:
+ raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8'))
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index b829078..dd2cef1 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -7,7 +7,28 @@ import io
import unittest
import gemato.manifest
-
+import gemato.openpgp
+
+
+PUBLIC_KEY = u'''
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+
+mQENBFnwXJMBCACgaTVz+d10TGL9zR920sb0GBFsitAJ5ZFzO4E0cg3SHhwI+reM
+JQ6LLKmHowY/E1dl5FBbnJoRMxXP7/eScQ7HlhYj1gMPN5XiS2pkPwVkmJKBDV42
+DLwoytC+ot0frRTJvSdEPCX81BNMgFiBSpkeZfXqb9XmU03bh6mFnrdd4CsHpTQG
+csVXHK8QKhaxuqmHTALdpSzKCb/r0N/Z3sQExZhfLcBf/9UUVXj44Nwc6ooqZLRi
+zHydxwQdxNu0aOFGEBn9WTi8Slf7MfR/pF0dI8rs9w6zMzVEq0lhDPpKFGDveoGf
+g/+TpvBNXZ7DWH23GM4kID3pk4LLMc24U1PhABEBAAG0D2dlbWF0byB0ZXN0IGtl
+eYkBRgQTAQoAMBYhBIHhLBa9jc1gvhgIRRNogOcqexOEBQJZ8FyTAhsDBQsJCg0E
+AxUKCAIeAQIXgAAKCRATaIDnKnsThCnkB/0fhTH230idhlfZhFbVgTLxrj4rpsGg
+20K8HkMaWzChsONdKkqYaYuRcm2UQZ0Kg5rm9jQsGYuAnzH/7XwmOleY95ycVfBk
+je9aXF6BEoGick6C/AK5w77vd1kcBtJDrT4I7vwD4wRkyUdCkpVMVT4z4aZ7lHJ4
+ECrrrI/mg0b+sGRyHfXPvIPp7F2959L/dpbhBZDfMOFC0A9LBQBJldKFbQLg3xzX
+4tniz/BBrp7KjTOMKU0sufsedI50xc6cvCYCwJElqo86vv69klZHahE/k9nJaUAM
+jCvJNJ7pU8YnJSRTQDH0PZEupAdzDU/AhGSrBz5+Jr7N0pQIxq4duE/Q
+=r7JK
+-----END PGP PUBLIC KEY BLOCK-----
+'''
SIGNED_MANIFEST = u'''
-----BEGIN PGP SIGNED MESSAGE-----
@@ -129,3 +150,50 @@ class SignedManifestTest(unittest.TestCase):
with io.StringIO(SIGNED_MANIFEST + 'OPTIONAL test\n') as f:
self.assertRaises(gemato.exceptions.ManifestUnsignedData,
m.load, f)
+
+
+class OpenPGPCorrectKeyTest(unittest.TestCase):
+ """
+ Tests performed with correct OpenPGP key set.
+ """
+
+ def setUp(self):
+ self.env = gemato.openpgp.OpenPGPEnvironment()
+ try:
+ self.env.import_key(
+ io.BytesIO(PUBLIC_KEY.encode('utf8')))
+ except RuntimeError:
+ raise unittest.SkipTest('Unable to import OpenPGP key')
+
+ def tearDown(self):
+ self.env.close()
+
+ def test_verify_manifest(self):
+ with io.BytesIO(SIGNED_MANIFEST.encode('utf8')) as f:
+ self.env.verify_file(f)
+
+ def test_verify_dash_escaped_manifest(self):
+ with io.BytesIO(DASH_ESCAPED_SIGNED_MANIFEST.encode('utf8')) as f:
+ self.env.verify_file(f)
+
+ def test_verify_modified_manifest(self):
+ with io.BytesIO(MODIFIED_SIGNED_MANIFEST.encode('utf8')) as f:
+ self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
+ self.env.verify_file, f)
+
+
+class OpenPGPNoKeyTest(unittest.TestCase):
+ """
+ Tests performed without correct OpenPGP key set.
+ """
+
+ def setUp(self):
+ self.env = gemato.openpgp.OpenPGPEnvironment()
+
+ def tearDown(self):
+ self.env.close()
+
+ def test_verify_manifest(self):
+ with io.BytesIO(SIGNED_MANIFEST.encode('utf8')) as f:
+ self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
+ self.env.verify_file, f)