diff options
author | Michał Górny <mgorny@gentoo.org> | 2020-09-05 15:15:32 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2020-09-05 16:15:59 +0200 |
commit | 0a95b33a5918d16450b51c2f046313cfd8771a81 (patch) | |
tree | 082c6682ff24fd06ede1f87d14f9297a9f00666e | |
parent | 35f41a15dea30c2cc00c693de8d0fe40de9b9479 (diff) | |
download | gemato-0a95b33a5918d16450b51c2f046313cfd8771a81.tar.gz |
openpgp: Move listing keys to a separate method
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r-- | gemato/exceptions.py | 9 | ||||
-rw-r--r-- | gemato/openpgp.py | 120 | ||||
-rw-r--r-- | tests/test_openpgp.py | 16 |
3 files changed, 91 insertions, 54 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py index 22766ab..24451dc 100644 --- a/gemato/exceptions.py +++ b/gemato/exceptions.py @@ -137,6 +137,15 @@ class OpenPGPKeyImportError(OpenPGPRuntimeError): return f'OpenPGP key import failed:\n{self.output}' +class OpenPGPKeyListingError(OpenPGPRuntimeError): + """ + An exception raised when key listing fails. + """ + + def __str__(self): + return f'OpenPGP key listing failed:\n{self.output}' + + class OpenPGPKeyRefreshError(OpenPGPRuntimeError): """ An exception raised when keyring refresh (update) fails. diff --git a/gemato/openpgp.py b/gemato/openpgp.py index 654f4f1..83d5d05 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -22,6 +22,7 @@ from gemato.exceptions import ( OpenPGPExpiredKeyFailure, OpenPGPRevokedKeyFailure, OpenPGPKeyImportError, + OpenPGPKeyListingError, OpenPGPKeyRefreshError, OpenPGPUnknownSigFailure, OpenPGPSigningFailure, @@ -278,87 +279,98 @@ debug-level guru if exitst != 0: raise OpenPGPKeyImportError(err.decode('utf8')) - zbase32_translate = bytes.maketrans( - b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567', - b'ybndrfg8ejkmcpqxot1uwisza345h769') - - @classmethod - def get_wkd_url(cls, email): - localname, domain = email.encode('utf8').split(b'@', 1) - b32 = ( - base64.b32encode(hashlib.sha1(localname.lower()).digest()) - .translate(cls.zbase32_translate).decode()) - uenc = urllib.parse.quote(localname) - ldomain = domain.lower().decode('utf8') - return (f'https://{ldomain}/.well-known/openpgpkey/hu/' - f'{b32}?l={uenc}') - - def refresh_keys_wkd(self): + def list_keys(self): """ - Attempt to fetch updated keys using WKD. Returns true if *all* - keys were successfully found. Otherwise, returns false. + List fingerprints and UIDs of all keys in keyring + + Returns a mapping from fingerprint (as a string) to an iterable + of UIDs. """ - if requests is None: - logging.debug('refresh_keys_wkd(): failing because requests' - 'module is missing') - return False - # list all keys in the keyring exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--with-colons', '--list-keys']) if exitst != 0: - raise OpenPGPKeyRefreshError(err.decode('utf8')) + raise OpenPGPKeyListingError(err.decode('utf8')) - # find keys and UIDs - addrs = set() - addrs_key = set() - keys = set() prev_pub = None + fpr = None + ret = {} + for line in out.splitlines(): # were we expecting a fingerprint? if prev_pub is not None: if line.startswith(b'fpr:'): fpr = line.split(b':')[9].decode('ASCII') - assert fpr.endswith(prev_pub) + if not fpr.endswith(prev_pub): + raise OpenPGPKeyListingError( + f'Incorrect fingerprint {fpr} for key ' + f'{prev_pub}') logging.debug( - f'refresh_keys_wkd(): fingerprint: {fpr}') - keys.add(fpr) + f'list_keys(): fingerprint: {fpr}') + ret[fpr] = [] prev_pub = None else: - # old GnuPG doesn't give fingerprints by default - # (but it doesn't support WKD either) - logging.debug( - 'refresh_keys_wkd(): failing due to old gpg') - return False + raise OpenPGPKeyListingError( + f'No fingerprint in GPG output, instead got: ' + f'{line}') elif line.startswith(b'pub:'): - if keys: - # every key must have at least one UID - if not addrs_key: - logging.debug( - 'refresh_keys_wkd(): failing due to no UIDs') - return False - addrs.update(addrs_key) - addrs_key = set() - # wait for the fingerprint prev_pub = line.split(b':')[4].decode('ASCII') - logging.debug(f'refresh_keys_wkd(): keyid: {prev_pub}') + logging.debug(f'list_keys(): keyid: {prev_pub}') elif line.startswith(b'uid:'): + if fpr is None: + raise OpenPGPKeyListingError( + f'UID without key in GPG output: {line}') uid = line.split(b':')[9] name, addr = email.utils.parseaddr(uid.decode('utf8')) if '@' in addr: - logging.debug(f'refresh_keys_wkd(): UID: {addr}') - addrs_key.add(addr) + logging.debug(f'list_keys(): UID: {addr}') + ret[fpr].append(addr) else: logging.debug( - f'refresh_keys_wkd(): ignoring UID without ' - f'mail: {uid.decode("utf8")}') + f'list_keys(): ignoring UID without mail: {uid}') + + return ret - # grab the final set (also aborts when there are no keys) - if not addrs_key: - logging.debug('refresh_keys_wkd(): failing due to no UIDs') + zbase32_translate = bytes.maketrans( + b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567', + b'ybndrfg8ejkmcpqxot1uwisza345h769') + + @classmethod + def get_wkd_url(cls, email): + localname, domain = email.encode('utf8').split(b'@', 1) + b32 = ( + base64.b32encode(hashlib.sha1(localname.lower()).digest()) + .translate(cls.zbase32_translate).decode()) + uenc = urllib.parse.quote(localname) + ldomain = domain.lower().decode('utf8') + return (f'https://{ldomain}/.well-known/openpgpkey/hu/' + f'{b32}?l={uenc}') + + def refresh_keys_wkd(self): + """ + Attempt to fetch updated keys using WKD. Returns true if *all* + keys were successfully found. Otherwise, returns false. + """ + if requests is None: + logging.debug('refresh_keys_wkd(): failing because requests' + 'module is missing') + return False + + # list all keys in the keyring + keys = self.list_keys() + if not keys: + logging.debug('refresh_keys_wkd(): no keys found') return False - addrs.update(addrs_key) + addrs = set() + for key, uids in keys.items(): + if not uids: + logging.debug( + f'refresh_keys_wkd(): failing due to no UIDs on ' + f'key {key}') + return False + addrs.update(uids) + keys = set(keys) data = b'' proxies = {} diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 5ee86aa..edeb0b9 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -642,6 +642,22 @@ def test_recursive_manifest_loader_save_submanifest(tmp_path, privkey_env): assert m2.openpgp_signature is None +@pytest.mark.parametrize( + 'key_var,expected', + [('VALID_PUBLIC_KEY', {KEY_FINGERPRINT: ['gemato@example.com']}), + ('OTHER_VALID_PUBLIC_KEY', + {OTHER_KEY_FINGERPRINT: ['gemato@example.com']}), + ('VALID_KEY_SUBKEY', {KEY_FINGERPRINT: ['gemato@example.com']}), + ('VALID_KEY_NOEMAIL', {KEY_FINGERPRINT: []}), + ]) +def test_list_keys(openpgp_env_with_refresh, key_var, expected): + try: + openpgp_env_with_refresh.import_key(io.BytesIO(globals()[key_var])) + except OpenPGPNoImplementation as e: + pytest.skip(str(e)) + assert openpgp_env_with_refresh.list_keys() == expected + + @pytest.fixture(scope='module') def global_hkp_server(): """A fixture that starts a single HKP server instance for tests""" |