summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/exceptions.py9
-rw-r--r--gemato/openpgp.py120
-rw-r--r--tests/test_openpgp.py16
3 files changed, 91 insertions, 54 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index 22766ab..24451dc 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -137,6 +137,15 @@ class OpenPGPKeyImportError(OpenPGPRuntimeError):
return f'OpenPGP key import failed:\n{self.output}'
+class OpenPGPKeyListingError(OpenPGPRuntimeError):
+ """
+ An exception raised when key listing fails.
+ """
+
+ def __str__(self):
+ return f'OpenPGP key listing failed:\n{self.output}'
+
+
class OpenPGPKeyRefreshError(OpenPGPRuntimeError):
"""
An exception raised when keyring refresh (update) fails.
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index 654f4f1..83d5d05 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -22,6 +22,7 @@ from gemato.exceptions import (
OpenPGPExpiredKeyFailure,
OpenPGPRevokedKeyFailure,
OpenPGPKeyImportError,
+ OpenPGPKeyListingError,
OpenPGPKeyRefreshError,
OpenPGPUnknownSigFailure,
OpenPGPSigningFailure,
@@ -278,87 +279,98 @@ debug-level guru
if exitst != 0:
raise OpenPGPKeyImportError(err.decode('utf8'))
- zbase32_translate = bytes.maketrans(
- b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
- b'ybndrfg8ejkmcpqxot1uwisza345h769')
-
- @classmethod
- def get_wkd_url(cls, email):
- localname, domain = email.encode('utf8').split(b'@', 1)
- b32 = (
- base64.b32encode(hashlib.sha1(localname.lower()).digest())
- .translate(cls.zbase32_translate).decode())
- uenc = urllib.parse.quote(localname)
- ldomain = domain.lower().decode('utf8')
- return (f'https://{ldomain}/.well-known/openpgpkey/hu/'
- f'{b32}?l={uenc}')
-
- def refresh_keys_wkd(self):
+ def list_keys(self):
"""
- Attempt to fetch updated keys using WKD. Returns true if *all*
- keys were successfully found. Otherwise, returns false.
+ List fingerprints and UIDs of all keys in keyring
+
+ Returns a mapping from fingerprint (as a string) to an iterable
+ of UIDs.
"""
- if requests is None:
- logging.debug('refresh_keys_wkd(): failing because requests'
- 'module is missing')
- return False
- # list all keys in the keyring
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--with-colons', '--list-keys'])
if exitst != 0:
- raise OpenPGPKeyRefreshError(err.decode('utf8'))
+ raise OpenPGPKeyListingError(err.decode('utf8'))
- # find keys and UIDs
- addrs = set()
- addrs_key = set()
- keys = set()
prev_pub = None
+ fpr = None
+ ret = {}
+
for line in out.splitlines():
# were we expecting a fingerprint?
if prev_pub is not None:
if line.startswith(b'fpr:'):
fpr = line.split(b':')[9].decode('ASCII')
- assert fpr.endswith(prev_pub)
+ if not fpr.endswith(prev_pub):
+ raise OpenPGPKeyListingError(
+ f'Incorrect fingerprint {fpr} for key '
+ f'{prev_pub}')
logging.debug(
- f'refresh_keys_wkd(): fingerprint: {fpr}')
- keys.add(fpr)
+ f'list_keys(): fingerprint: {fpr}')
+ ret[fpr] = []
prev_pub = None
else:
- # old GnuPG doesn't give fingerprints by default
- # (but it doesn't support WKD either)
- logging.debug(
- 'refresh_keys_wkd(): failing due to old gpg')
- return False
+ raise OpenPGPKeyListingError(
+ f'No fingerprint in GPG output, instead got: '
+ f'{line}')
elif line.startswith(b'pub:'):
- if keys:
- # every key must have at least one UID
- if not addrs_key:
- logging.debug(
- 'refresh_keys_wkd(): failing due to no UIDs')
- return False
- addrs.update(addrs_key)
- addrs_key = set()
-
# wait for the fingerprint
prev_pub = line.split(b':')[4].decode('ASCII')
- logging.debug(f'refresh_keys_wkd(): keyid: {prev_pub}')
+ logging.debug(f'list_keys(): keyid: {prev_pub}')
elif line.startswith(b'uid:'):
+ if fpr is None:
+ raise OpenPGPKeyListingError(
+ f'UID without key in GPG output: {line}')
uid = line.split(b':')[9]
name, addr = email.utils.parseaddr(uid.decode('utf8'))
if '@' in addr:
- logging.debug(f'refresh_keys_wkd(): UID: {addr}')
- addrs_key.add(addr)
+ logging.debug(f'list_keys(): UID: {addr}')
+ ret[fpr].append(addr)
else:
logging.debug(
- f'refresh_keys_wkd(): ignoring UID without '
- f'mail: {uid.decode("utf8")}')
+ f'list_keys(): ignoring UID without mail: {uid}')
+
+ return ret
- # grab the final set (also aborts when there are no keys)
- if not addrs_key:
- logging.debug('refresh_keys_wkd(): failing due to no UIDs')
+ zbase32_translate = bytes.maketrans(
+ b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
+ b'ybndrfg8ejkmcpqxot1uwisza345h769')
+
+ @classmethod
+ def get_wkd_url(cls, email):
+ localname, domain = email.encode('utf8').split(b'@', 1)
+ b32 = (
+ base64.b32encode(hashlib.sha1(localname.lower()).digest())
+ .translate(cls.zbase32_translate).decode())
+ uenc = urllib.parse.quote(localname)
+ ldomain = domain.lower().decode('utf8')
+ return (f'https://{ldomain}/.well-known/openpgpkey/hu/'
+ f'{b32}?l={uenc}')
+
+ def refresh_keys_wkd(self):
+ """
+ Attempt to fetch updated keys using WKD. Returns true if *all*
+ keys were successfully found. Otherwise, returns false.
+ """
+ if requests is None:
+ logging.debug('refresh_keys_wkd(): failing because requests'
+ 'module is missing')
+ return False
+
+ # list all keys in the keyring
+ keys = self.list_keys()
+ if not keys:
+ logging.debug('refresh_keys_wkd(): no keys found')
return False
- addrs.update(addrs_key)
+ addrs = set()
+ for key, uids in keys.items():
+ if not uids:
+ logging.debug(
+ f'refresh_keys_wkd(): failing due to no UIDs on '
+ f'key {key}')
+ return False
+ addrs.update(uids)
+ keys = set(keys)
data = b''
proxies = {}
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index 5ee86aa..edeb0b9 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -642,6 +642,22 @@ def test_recursive_manifest_loader_save_submanifest(tmp_path, privkey_env):
assert m2.openpgp_signature is None
+@pytest.mark.parametrize(
+ 'key_var,expected',
+ [('VALID_PUBLIC_KEY', {KEY_FINGERPRINT: ['gemato@example.com']}),
+ ('OTHER_VALID_PUBLIC_KEY',
+ {OTHER_KEY_FINGERPRINT: ['gemato@example.com']}),
+ ('VALID_KEY_SUBKEY', {KEY_FINGERPRINT: ['gemato@example.com']}),
+ ('VALID_KEY_NOEMAIL', {KEY_FINGERPRINT: []}),
+ ])
+def test_list_keys(openpgp_env_with_refresh, key_var, expected):
+ try:
+ openpgp_env_with_refresh.import_key(io.BytesIO(globals()[key_var]))
+ except OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+ assert openpgp_env_with_refresh.list_keys() == expected
+
+
@pytest.fixture(scope='module')
def global_hkp_server():
"""A fixture that starts a single HKP server instance for tests"""