diff options
author | Michał Górny <mgorny@gentoo.org> | 2020-08-25 17:54:33 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2020-08-25 17:54:33 +0200 |
commit | 6bc1e7a70741f073194b88998efe7f0519074222 (patch) | |
tree | 7c97a681fb4c10b92975ca5058de828f999efe5a | |
parent | f1c904feb96dac663fd3e93152c9febf8a8a640a (diff) | |
download | gemato-6bc1e7a70741f073194b88998efe7f0519074222.tar.gz |
openpgp: Reimplement WKD support via requests
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r-- | README.rst | 11 | ||||
-rw-r--r-- | gemato/openpgp.py | 78 | ||||
-rw-r--r-- | tests/test_openpgp.py | 85 | ||||
-rw-r--r-- | tests/testutil.py | 34 | ||||
-rw-r--r-- | tox.ini | 2 |
5 files changed, 116 insertions, 94 deletions
@@ -74,14 +74,17 @@ Requirements ============ gemato is written in Python and compatible with implementations of Python 3.6+. gemato is currently tested against CPython 3.6 -through 3.7 and PyPy3. gemato depends only on standard Python library -modules. +through 3.7 and PyPy3. gemato core depends only on standard Python +library modules. -Additionally, gemato calls the GnuPG executable to work with OpenPGP -signatures. Both GnuPG 1.4.21 and 2.2+ are tested. +Additionally, OpenPGP requires system install of GnuPG 2.2+ +and requests_ Python module. Tests require responses_ for mocking. References and footnotes ======================== .. [#GLEP74] GLEP 74: Full-tree verification using Manifest files (https://www.gentoo.org/glep/glep-0074.html) + +.. _requests: https://2.python-requests.org/en/master/ +.. _responses: https://github.com/getsentry/responses diff --git a/gemato/openpgp.py b/gemato/openpgp.py index 6b7ec3f..14b676e 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -18,6 +18,11 @@ import urllib.parse import gemato.exceptions +try: + import requests +except ImportError: + requests = None + GNUPG = os.environ.get('GNUPG', 'gpg') GNUPGCONF = os.environ.get('GNUPGCONF', 'gpgconf') @@ -283,6 +288,10 @@ debug-level guru Attempt to fetch updated keys using WKD. Returns true if *all* keys were successfully found. Otherwise, returns false. """ + if requests is None: + raise gemato.exceptions.OpenPGPKeyRefreshError( + 'WKD updates require requests Python module') + # list all keys in the keyring exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--with-colons', '--list-keys']) @@ -339,42 +348,47 @@ debug-level guru return False addrs.update(addrs_key) - # create another isolated environment to fetch keys cleanly - with self.clone() as subenv: - # use --locate-keys to fetch keys via WKD - exitst, out, err = subenv._spawn_gpg( - [GNUPG, '--batch', '--locate-keys'] + list(addrs)) - # if at least one fetch failed, gpg returns unsuccessfully - if exitst != 0: - logging.debug('refresh_keys_wkd(): {} --locate-keys failed: {}' - .format(GNUPG, err.decode('utf8'))) + data = b'' + proxies = {} + if self.proxy is not None: + proxies = { + 'http': self.proxy, + 'https': self.proxy, + } + for a in addrs: + url = self.get_wkd_url(a) + resp = requests.get(url, proxies=proxies) + if resp.status_code != 200: + logging.debug(f'refresh_keys_wkd(): failing due to failed' + f'request for {url}: {resp}') return False + data += resp.content - # otherwise, xfer the keys - exitst, out, err = subenv._spawn_gpg( - [GNUPG, '--batch', '--export'] + list(keys)) - if exitst != 0: - logging.debug('refresh_keys_wkd(): {} --export failed: {}' - .format(GNUPG, err.decode('utf8'))) - return False + exitst, out, err = self._spawn_gpg( + [GNUPG, '--batch', '--import', '--status-fd', '1'], data) + if exitst != 0: + # there's no valid reason for import to fail here + raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) - exitst, out, err = self._spawn_gpg( - [GNUPG, '--batch', '--import', '--status-fd', '1'], out) - if exitst != 0: - # there's no valid reason for import to fail here - raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) - - # we need to explicitly ensure all keys were fetched - for l in out.splitlines(): - if l.startswith(b'[GNUPG:] IMPORT_OK'): - fpr = l.split(b' ')[3].decode('ASCII') - logging.debug('refresh_keys_wkd(): import successful for key: {}' - .format(fpr)) + # we need to explicitly ensure all keys were fetched + for l in out.splitlines(): + if l.startswith(b'[GNUPG:] IMPORT_OK'): + fpr = l.split(b' ')[3].decode('ASCII') + logging.debug('refresh_keys_wkd(): import successful for key: {}' + .format(fpr)) + if fpr in keys: keys.remove(fpr) - if keys: - logging.debug('refresh_keys_wkd(): failing due to non-updated keys: {}' - .format(keys)) - return False + else: + # we need to delete unexpected keys + exitst, out, err = self._spawn_gpg( + [GNUPG, '--batch', '--delete-keys', fpr]) + if exitst != 0: + raise gemato.exceptions.OpenPGPKeyRefreshError( + err.decode('utf8')) + if keys: + logging.debug('refresh_keys_wkd(): failing due to non-updated keys: {}' + .format(keys)) + return False return True diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 815de7e..2fd5365 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -23,7 +23,12 @@ from tests.keydata import ( OTHER_PUBLIC_KEY, OTHER_PUBLIC_KEY_UID, OTHER_PUBLIC_KEY_SIG, UNEXPIRE_SIG, ) -from tests.testutil import HKPServerTestCase, MockedWKDOpenPGPEnvironment +from tests.testutil import HKPServerTestCase + +try: + import responses +except ImportError: + responses = None VALID_PUBLIC_KEY = PUBLIC_KEY + UID + PUBLIC_KEY_SIG @@ -147,7 +152,6 @@ t5pTRGhLWgdLUrs7vRB7wf7F8h4sci/YBKJRFA== ''' KEY_FINGERPRINT = '81E12C16BD8DCD60BE180845136880E72A7B1384' -KEY_UID = 'gemato@example.com' SIG_TIMESTAMP = datetime.datetime(2017, 11, 8, 9, 1, 26) OTHER_VALID_PUBLIC_KEY = (OTHER_PUBLIC_KEY + OTHER_PUBLIC_KEY_UID + @@ -199,6 +203,14 @@ def strip_openpgp(text): return '\n'.join(lines[start+1:stop-start+2]) + '\n' +def need_responses(func): + def skipper(*args, **kwargs): + raise unittest.SkipTest('responses module is needed for WKD tests') + if responses is None: + return skipper + return responses.activate(func) + + class SignedManifestTest(unittest.TestCase): """ Test whether signed Manifest is read correctly. @@ -1129,12 +1141,8 @@ class OpenPGPWKDRefreshTest(unittest.TestCase): revocation. """ - KEYS = { - KEY_UID: REVOKED_PUBLIC_KEY, - } - def setUp(self): - self.env = MockedWKDOpenPGPEnvironment(self.KEYS) + self.env = gemato.openpgp.OpenPGPEnvironment() try: self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) except gemato.exceptions.OpenPGPRuntimeError as e: @@ -1147,11 +1155,18 @@ class OpenPGPWKDRefreshTest(unittest.TestCase): def tearDown(self): self.env.close() + @need_responses def test_refresh_keys(self): try: with io.StringIO(SIGNED_MANIFEST) as f: self.env.verify_file(f) + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + body=REVOKED_PUBLIC_KEY, + content_type='application/pgp-keys') self.env.refresh_keys(allow_wkd=True) with io.StringIO(SIGNED_MANIFEST) as f: @@ -1172,7 +1187,7 @@ class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase): } def setUp(self): - self.env = MockedWKDOpenPGPEnvironment() + self.env = gemato.openpgp.OpenPGPEnvironment() try: self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) except gemato.exceptions.OpenPGPRuntimeError as e: @@ -1187,11 +1202,17 @@ class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase): self.env.close() super(OpenPGPWKDFallbackRefreshTest, self).tearDown() + @need_responses def test_refresh_keys(self): try: with io.StringIO(SIGNED_MANIFEST) as f: self.env.verify_file(f) + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + status=404) self.env.refresh_keys(allow_wkd=True, keyserver=self.server_addr) @@ -1213,7 +1234,7 @@ class OpenPGPWKDFailRefreshTest(HKPServerTestCase): SERVER_KEYS = {} def setUp(self): - self.env = MockedWKDOpenPGPEnvironment() + self.env = gemato.openpgp.OpenPGPEnvironment() try: self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) except gemato.exceptions.OpenPGPRuntimeError as e: @@ -1228,8 +1249,14 @@ class OpenPGPWKDFailRefreshTest(HKPServerTestCase): self.env.close() super(OpenPGPWKDFailRefreshTest, self).tearDown() + @need_responses def test_refresh_keys(self): try: + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + status=404) self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, self.env.refresh_keys, allow_wkd=True, keyserver=self.server_addr) @@ -1243,12 +1270,8 @@ class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase): keyserver sends outdated (non-revoked) key. """ - KEYS = { - KEY_UID: VALID_PUBLIC_KEY, - } - def setUp(self): - self.env = MockedWKDOpenPGPEnvironment(self.KEYS) + self.env = gemato.openpgp.OpenPGPEnvironment() try: self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY)) except gemato.exceptions.OpenPGPRuntimeError as e: @@ -1261,8 +1284,15 @@ class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase): def tearDown(self): self.env.close() + @need_responses def test_refresh_keys(self): try: + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + body=VALID_PUBLIC_KEY, + content_type='application/pgp-keys') self.env.refresh_keys(allow_wkd=True) with io.StringIO(SIGNED_MANIFEST) as f: @@ -1277,12 +1307,8 @@ class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase): Test that WKD refresh_keys() does not allow injecting another key. """ - KEYS = { - KEY_UID: OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY, - } - def setUp(self): - self.env = MockedWKDOpenPGPEnvironment(self.KEYS) + self.env = gemato.openpgp.OpenPGPEnvironment() try: self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY)) except gemato.exceptions.OpenPGPRuntimeError as e: @@ -1295,8 +1321,15 @@ class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase): def tearDown(self): self.env.close() + @need_responses def test_refresh_keys(self): try: + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + body=OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY, + content_type='application/pgp-keys') self.env.refresh_keys(allow_wkd=True) with io.StringIO(SIGNED_MANIFEST) as f: @@ -1312,13 +1345,10 @@ class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase): another (of the same UID). """ - KEYS = { - KEY_UID: VALID_PUBLIC_KEY, - } SERVER_KEYS = {} def setUp(self): - self.env = MockedWKDOpenPGPEnvironment(self.KEYS) + self.env = gemato.openpgp.OpenPGPEnvironment() try: self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY)) except gemato.exceptions.OpenPGPRuntimeError as e: @@ -1333,8 +1363,15 @@ class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase): self.env.close() super(OpenPGPWKDReplaceKeyRefreshTest, self).tearDown() + @need_responses def test_refresh_keys(self): try: + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + body=VALID_PUBLIC_KEY, + content_type='application/pgp-keys') self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, self.env.refresh_keys, allow_wkd=True, keyserver=self.server_addr) @@ -1422,7 +1459,7 @@ class OpenPGPForgedSubKeyKeyserverTest(HKPServerTestCase): super(OpenPGPForgedSubKeyKeyserverTest, self).tearDown() def test_verify_manifest(self): - self.env.refresh_keys(allow_wkd=True, + self.env.refresh_keys(allow_wkd=False, keyserver=self.server_addr) with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f: diff --git a/tests/testutil.py b/tests/testutil.py index f39533a..45d1cd6 100644 --- a/tests/testutil.py +++ b/tests/testutil.py @@ -18,8 +18,6 @@ import unittest from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs -import gemato.openpgp - class LoggingTestCase(unittest.TestCase): def setUp(self): @@ -119,35 +117,3 @@ class HKPServerTestCase(unittest.TestCase): self.server.shutdown() self.server.server_close() self.server_thread.join() - - -class MockedWKDOpenPGPEnvironment(gemato.openpgp.OpenPGPEnvironment): - """ - A subclass of OpenPGPEnvironment that partially mocks spawning - OpenPGP in order to inject keys without having to implement - full HTTPS server with domain satisfactory to GnuPG. - """ - - def __init__(self, keys={}): - self.keys = keys - super(MockedWKDOpenPGPEnvironment, self).__init__() - - def clone(self): - return MockedWKDOpenPGPEnvironment(self.keys) - - def _spawn_gpg(self, argv, stdin=''): - if '--locate-keys' in argv: - argv.remove('--locate-keys') - assert len(argv) == 3 - assert argv[:2] == ['gpg', '--batch'] - if argv[2] in self.keys: - ret, sout, serr = super(MockedWKDOpenPGPEnvironment, - self)._spawn_gpg( - ['gpg', '--batch', '--import'], - self.keys[argv[2]]) - else: - ret = 2 - return (ret, b'', b'') - - return super(MockedWKDOpenPGPEnvironment, self)._spawn_gpg( - argv, stdin) @@ -16,6 +16,8 @@ deps = pytest pytest-cov pytest-xdist + requests + responses commands = pytest -vv --cov=gemato --cov-config=.coveragerc -n auto {posargs} |