summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2020-08-25 17:54:33 +0200
committerMichał Górny <mgorny@gentoo.org>2020-08-25 17:54:33 +0200
commit6bc1e7a70741f073194b88998efe7f0519074222 (patch)
tree7c97a681fb4c10b92975ca5058de828f999efe5a
parentf1c904feb96dac663fd3e93152c9febf8a8a640a (diff)
downloadgemato-6bc1e7a70741f073194b88998efe7f0519074222.tar.gz
openpgp: Reimplement WKD support via requests
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r--README.rst11
-rw-r--r--gemato/openpgp.py78
-rw-r--r--tests/test_openpgp.py85
-rw-r--r--tests/testutil.py34
-rw-r--r--tox.ini2
5 files changed, 116 insertions, 94 deletions
diff --git a/README.rst b/README.rst
index e15c175..d84fb7f 100644
--- a/README.rst
+++ b/README.rst
@@ -74,14 +74,17 @@ Requirements
============
gemato is written in Python and compatible with implementations
of Python 3.6+. gemato is currently tested against CPython 3.6
-through 3.7 and PyPy3. gemato depends only on standard Python library
-modules.
+through 3.7 and PyPy3. gemato core depends only on standard Python
+library modules.
-Additionally, gemato calls the GnuPG executable to work with OpenPGP
-signatures. Both GnuPG 1.4.21 and 2.2+ are tested.
+Additionally, OpenPGP requires system install of GnuPG 2.2+
+and requests_ Python module. Tests require responses_ for mocking.
References and footnotes
========================
.. [#GLEP74] GLEP 74: Full-tree verification using Manifest files
(https://www.gentoo.org/glep/glep-0074.html)
+
+.. _requests: https://2.python-requests.org/en/master/
+.. _responses: https://github.com/getsentry/responses
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index 6b7ec3f..14b676e 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -18,6 +18,11 @@ import urllib.parse
import gemato.exceptions
+try:
+ import requests
+except ImportError:
+ requests = None
+
GNUPG = os.environ.get('GNUPG', 'gpg')
GNUPGCONF = os.environ.get('GNUPGCONF', 'gpgconf')
@@ -283,6 +288,10 @@ debug-level guru
Attempt to fetch updated keys using WKD. Returns true if *all*
keys were successfully found. Otherwise, returns false.
"""
+ if requests is None:
+ raise gemato.exceptions.OpenPGPKeyRefreshError(
+ 'WKD updates require requests Python module')
+
# list all keys in the keyring
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--with-colons', '--list-keys'])
@@ -339,42 +348,47 @@ debug-level guru
return False
addrs.update(addrs_key)
- # create another isolated environment to fetch keys cleanly
- with self.clone() as subenv:
- # use --locate-keys to fetch keys via WKD
- exitst, out, err = subenv._spawn_gpg(
- [GNUPG, '--batch', '--locate-keys'] + list(addrs))
- # if at least one fetch failed, gpg returns unsuccessfully
- if exitst != 0:
- logging.debug('refresh_keys_wkd(): {} --locate-keys failed: {}'
- .format(GNUPG, err.decode('utf8')))
+ data = b''
+ proxies = {}
+ if self.proxy is not None:
+ proxies = {
+ 'http': self.proxy,
+ 'https': self.proxy,
+ }
+ for a in addrs:
+ url = self.get_wkd_url(a)
+ resp = requests.get(url, proxies=proxies)
+ if resp.status_code != 200:
+ logging.debug(f'refresh_keys_wkd(): failing due to failed'
+ f'request for {url}: {resp}')
return False
+ data += resp.content
- # otherwise, xfer the keys
- exitst, out, err = subenv._spawn_gpg(
- [GNUPG, '--batch', '--export'] + list(keys))
- if exitst != 0:
- logging.debug('refresh_keys_wkd(): {} --export failed: {}'
- .format(GNUPG, err.decode('utf8')))
- return False
+ exitst, out, err = self._spawn_gpg(
+ [GNUPG, '--batch', '--import', '--status-fd', '1'], data)
+ if exitst != 0:
+ # there's no valid reason for import to fail here
+ raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8'))
- exitst, out, err = self._spawn_gpg(
- [GNUPG, '--batch', '--import', '--status-fd', '1'], out)
- if exitst != 0:
- # there's no valid reason for import to fail here
- raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8'))
-
- # we need to explicitly ensure all keys were fetched
- for l in out.splitlines():
- if l.startswith(b'[GNUPG:] IMPORT_OK'):
- fpr = l.split(b' ')[3].decode('ASCII')
- logging.debug('refresh_keys_wkd(): import successful for key: {}'
- .format(fpr))
+ # we need to explicitly ensure all keys were fetched
+ for l in out.splitlines():
+ if l.startswith(b'[GNUPG:] IMPORT_OK'):
+ fpr = l.split(b' ')[3].decode('ASCII')
+ logging.debug('refresh_keys_wkd(): import successful for key: {}'
+ .format(fpr))
+ if fpr in keys:
keys.remove(fpr)
- if keys:
- logging.debug('refresh_keys_wkd(): failing due to non-updated keys: {}'
- .format(keys))
- return False
+ else:
+ # we need to delete unexpected keys
+ exitst, out, err = self._spawn_gpg(
+ [GNUPG, '--batch', '--delete-keys', fpr])
+ if exitst != 0:
+ raise gemato.exceptions.OpenPGPKeyRefreshError(
+ err.decode('utf8'))
+ if keys:
+ logging.debug('refresh_keys_wkd(): failing due to non-updated keys: {}'
+ .format(keys))
+ return False
return True
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index 815de7e..2fd5365 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -23,7 +23,12 @@ from tests.keydata import (
OTHER_PUBLIC_KEY, OTHER_PUBLIC_KEY_UID, OTHER_PUBLIC_KEY_SIG,
UNEXPIRE_SIG,
)
-from tests.testutil import HKPServerTestCase, MockedWKDOpenPGPEnvironment
+from tests.testutil import HKPServerTestCase
+
+try:
+ import responses
+except ImportError:
+ responses = None
VALID_PUBLIC_KEY = PUBLIC_KEY + UID + PUBLIC_KEY_SIG
@@ -147,7 +152,6 @@ t5pTRGhLWgdLUrs7vRB7wf7F8h4sci/YBKJRFA==
'''
KEY_FINGERPRINT = '81E12C16BD8DCD60BE180845136880E72A7B1384'
-KEY_UID = 'gemato@example.com'
SIG_TIMESTAMP = datetime.datetime(2017, 11, 8, 9, 1, 26)
OTHER_VALID_PUBLIC_KEY = (OTHER_PUBLIC_KEY + OTHER_PUBLIC_KEY_UID +
@@ -199,6 +203,14 @@ def strip_openpgp(text):
return '\n'.join(lines[start+1:stop-start+2]) + '\n'
+def need_responses(func):
+ def skipper(*args, **kwargs):
+ raise unittest.SkipTest('responses module is needed for WKD tests')
+ if responses is None:
+ return skipper
+ return responses.activate(func)
+
+
class SignedManifestTest(unittest.TestCase):
"""
Test whether signed Manifest is read correctly.
@@ -1129,12 +1141,8 @@ class OpenPGPWKDRefreshTest(unittest.TestCase):
revocation.
"""
- KEYS = {
- KEY_UID: REVOKED_PUBLIC_KEY,
- }
-
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1147,11 +1155,18 @@ class OpenPGPWKDRefreshTest(unittest.TestCase):
def tearDown(self):
self.env.close()
+ @need_responses
def test_refresh_keys(self):
try:
with io.StringIO(SIGNED_MANIFEST) as f:
self.env.verify_file(f)
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=REVOKED_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.env.refresh_keys(allow_wkd=True)
with io.StringIO(SIGNED_MANIFEST) as f:
@@ -1172,7 +1187,7 @@ class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase):
}
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment()
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1187,11 +1202,17 @@ class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase):
self.env.close()
super(OpenPGPWKDFallbackRefreshTest, self).tearDown()
+ @need_responses
def test_refresh_keys(self):
try:
with io.StringIO(SIGNED_MANIFEST) as f:
self.env.verify_file(f)
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ status=404)
self.env.refresh_keys(allow_wkd=True,
keyserver=self.server_addr)
@@ -1213,7 +1234,7 @@ class OpenPGPWKDFailRefreshTest(HKPServerTestCase):
SERVER_KEYS = {}
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment()
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1228,8 +1249,14 @@ class OpenPGPWKDFailRefreshTest(HKPServerTestCase):
self.env.close()
super(OpenPGPWKDFailRefreshTest, self).tearDown()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ status=404)
self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
self.env.refresh_keys, allow_wkd=True,
keyserver=self.server_addr)
@@ -1243,12 +1270,8 @@ class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase):
keyserver sends outdated (non-revoked) key.
"""
- KEYS = {
- KEY_UID: VALID_PUBLIC_KEY,
- }
-
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1261,8 +1284,15 @@ class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase):
def tearDown(self):
self.env.close()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=VALID_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.env.refresh_keys(allow_wkd=True)
with io.StringIO(SIGNED_MANIFEST) as f:
@@ -1277,12 +1307,8 @@ class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase):
Test that WKD refresh_keys() does not allow injecting another key.
"""
- KEYS = {
- KEY_UID: OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY,
- }
-
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1295,8 +1321,15 @@ class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase):
def tearDown(self):
self.env.close()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.env.refresh_keys(allow_wkd=True)
with io.StringIO(SIGNED_MANIFEST) as f:
@@ -1312,13 +1345,10 @@ class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase):
another (of the same UID).
"""
- KEYS = {
- KEY_UID: VALID_PUBLIC_KEY,
- }
SERVER_KEYS = {}
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1333,8 +1363,15 @@ class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase):
self.env.close()
super(OpenPGPWKDReplaceKeyRefreshTest, self).tearDown()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=VALID_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
self.env.refresh_keys, allow_wkd=True,
keyserver=self.server_addr)
@@ -1422,7 +1459,7 @@ class OpenPGPForgedSubKeyKeyserverTest(HKPServerTestCase):
super(OpenPGPForgedSubKeyKeyserverTest, self).tearDown()
def test_verify_manifest(self):
- self.env.refresh_keys(allow_wkd=True,
+ self.env.refresh_keys(allow_wkd=False,
keyserver=self.server_addr)
with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f:
diff --git a/tests/testutil.py b/tests/testutil.py
index f39533a..45d1cd6 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -18,8 +18,6 @@ import unittest
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
-import gemato.openpgp
-
class LoggingTestCase(unittest.TestCase):
def setUp(self):
@@ -119,35 +117,3 @@ class HKPServerTestCase(unittest.TestCase):
self.server.shutdown()
self.server.server_close()
self.server_thread.join()
-
-
-class MockedWKDOpenPGPEnvironment(gemato.openpgp.OpenPGPEnvironment):
- """
- A subclass of OpenPGPEnvironment that partially mocks spawning
- OpenPGP in order to inject keys without having to implement
- full HTTPS server with domain satisfactory to GnuPG.
- """
-
- def __init__(self, keys={}):
- self.keys = keys
- super(MockedWKDOpenPGPEnvironment, self).__init__()
-
- def clone(self):
- return MockedWKDOpenPGPEnvironment(self.keys)
-
- def _spawn_gpg(self, argv, stdin=''):
- if '--locate-keys' in argv:
- argv.remove('--locate-keys')
- assert len(argv) == 3
- assert argv[:2] == ['gpg', '--batch']
- if argv[2] in self.keys:
- ret, sout, serr = super(MockedWKDOpenPGPEnvironment,
- self)._spawn_gpg(
- ['gpg', '--batch', '--import'],
- self.keys[argv[2]])
- else:
- ret = 2
- return (ret, b'', b'')
-
- return super(MockedWKDOpenPGPEnvironment, self)._spawn_gpg(
- argv, stdin)
diff --git a/tox.ini b/tox.ini
index 253d5d3..9cc1b6d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -16,6 +16,8 @@ deps =
pytest
pytest-cov
pytest-xdist
+ requests
+ responses
commands =
pytest -vv --cov=gemato --cov-config=.coveragerc -n auto {posargs}