summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2020-09-06 23:36:21 +0200
committerMichał Górny <mgorny@gentoo.org>2020-09-06 23:50:32 +0200
commit4355f02079ed1aaea1867411c9770c90ae7679c6 (patch)
tree0ca8e487b281ebd99a5b4bde0ecda778fe2b357a
parente05553c18968d03d505acdf5b9ba59131268f7a3 (diff)
downloadgemato-4355f02079ed1aaea1867411c9770c90ae7679c6.tar.gz
Add a partial experimental PGPy-based OpenPGP provider
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r--gemato/exceptions.py9
-rw-r--r--gemato/openpgp.py113
-rw-r--r--tests/test_openpgp.py44
-rw-r--r--tox.ini3
4 files changed, 153 insertions, 16 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index aa4e499..6d7e8f3 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -218,9 +218,14 @@ class OpenPGPNoImplementation(GematoException):
is available.
"""
+ __slots__ = ['detail']
+
+ def __init__(self, detail):
+ super().__init__(detail)
+ self.detail = detail
+
def __str__(self):
- return ('No supported OpenPGP implementation found (install '
- 'gnupg)')
+ return f'Requested OpenPGP provider not found ({self.detail})'
class ManifestInvalidPath(GematoException):
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index 901bfa5..8bce73b 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -15,6 +15,7 @@ import shutil
import subprocess
import tempfile
import urllib.parse
+import warnings
from gemato.exceptions import (
OpenPGPNoImplementation,
@@ -34,6 +35,11 @@ try:
except ImportError:
requests = None
+try:
+ import pgpy
+except ImportError:
+ pgpy = None
+
GNUPG = os.environ.get('GNUPG', 'gpg')
GNUPGCONF = os.environ.get('GNUPGCONF', 'gpgconf')
@@ -212,7 +218,7 @@ class SystemGPGEnvironment:
stderr=subprocess.PIPE,
env=env)
except FileNotFoundError:
- raise OpenPGPNoImplementation()
+ raise OpenPGPNoImplementation('install gpg')
out, err = p.communicate(stdin)
return (p.wait(), out, err)
@@ -471,5 +477,110 @@ debug-level guru
return (super()._spawn_gpg(options, stdin, env_override))
+class PGPyEnvironment:
+ """Stand-alone environment using pgpy library"""
+
+ __slots__ = ['debug', 'keyring', 'proxy']
+
+ def __init__(self, debug=False, proxy=None):
+ if pgpy is None:
+ raise OpenPGPNoImplementation('install PGPy')
+ self.debug = debug
+ self.keyring = pgpy.PGPKeyring()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_cb):
+ pass
+
+ def close(self):
+ pass
+
+ def import_key(self, keyfile):
+ with warnings.catch_warnings(record=True) as warns:
+ try:
+ key_res = pgpy.PGPKey.from_blob(keyfile.read())
+ except ValueError as e:
+ raise OpenPGPKeyImportError(
+ f'OpenPGP key import failed: {e}')
+ fprs = []
+ for k in key_res[1].values():
+ fprs.extend(self.keyring.load(k))
+
+ for w in warns:
+ if str(w.message) == 'Incorrect crc24':
+ raise OpenPGPKeyImportError(
+ f'OpenPGP key import failed: {w.message}')
+
+ for fpr in fprs:
+ with self.keyring.key(fpr) as k:
+ if k.parent is not None:
+ try:
+ verifies = k.parent.verify(k)
+ except pgpy.errors.PGPError:
+ logging.debug(
+ f'Rejecting subkey {fpr} due to missing sig')
+ self.keyring.unload(k)
+ else:
+ if not verifies:
+ logging.debug(
+ f'Rejecting subkey {fpr} since parent '
+ f'key signature does not check out')
+ self.keyring.unload(k)
+ for uid in k.userids:
+ if uid.selfsig is None:
+ raise OpenPGPKeyImportError(
+ f'Self-signature on {uid} missing')
+ if not k.verify(uid):
+ raise OpenPGPKeyImportError(
+ f'Self-signature on {uid} does not verify')
+
+ def verify_file(self, f):
+ msg = pgpy.PGPMessage.from_blob(f.read())
+ assert msg.is_signed
+ assert len(msg.signatures) == 1
+ assert len(msg.signers) == 1
+
+ signer, = msg.signers
+ try:
+ with self.keyring.key(signer) as k:
+ pk = k
+ if k.parent is not None:
+ pk = k.parent
+ assert pk.parent is None
+
+ vr = k.verify(msg)
+ if not vr:
+ raise OpenPGPVerificationFailure(
+ f'Bad signature made by key {k.fingerprint}')
+ now = datetime.datetime.utcnow()
+ sig_expire = msg.signatures[0].expires_at
+ if sig_expire is not None and sig_expire < now:
+ raise OpenPGPVerificationFailure(
+ f'Signature expired at {msg.signatures[0].expires_at}')
+ if k.expires_at is not None and k.expires_at < now:
+ raise OpenPGPExpiredKeyFailure(
+ f'Key {k.fingerprint} expired at {k.expires_at}')
+ if pk.expires_at is not None and pk.expires_at < now:
+ raise OpenPGPExpiredKeyFailure(
+ f'Primary key {pk.fingerprint} expired '
+ f'at {k.expires_at}')
+ if list(k.revocation_signatures):
+ raise OpenPGPRevokedKeyFailure(
+ f'Key {pk.fingerprint} was revoked')
+ if list(pk.revocation_signatures):
+ raise OpenPGPRevokedKeyFailure(
+ f'Primary key {pk.fingerprint} was revoked')
+ return OpenPGPSignatureData(
+ k.fingerprint,
+ msg.signatures[0].created,
+ msg.signatures[0].expires_at,
+ pk.fingerprint)
+ except KeyError:
+ raise OpenPGPVerificationFailure(
+ f'Key {signer} not in keyring')
+
+
OpenPGPSystemEnvironment = SystemGPGEnvironment
OpenPGPEnvironment = IsolatedGPGEnvironment
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index 85ff7f4..434df88 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -30,6 +30,7 @@ from gemato.manifest import ManifestFile
from gemato.openpgp import (
SystemGPGEnvironment,
IsolatedGPGEnvironment,
+ PGPyEnvironment,
get_wkd_url,
)
from gemato.recursiveloader import ManifestRecursiveLoader
@@ -348,15 +349,21 @@ class MockedSystemGPGEnvironment(SystemGPGEnvironment):
@pytest.fixture(params=[IsolatedGPGEnvironment,
- MockedSystemGPGEnvironment])
+ MockedSystemGPGEnvironment,
+ PGPyEnvironment,
+ ])
def openpgp_env(request):
"""OpenPGP environment fixture"""
- env = request.param()
+ try:
+ env = request.param()
+ except OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
yield env
env.close()
-@pytest.fixture(params=[IsolatedGPGEnvironment])
+@pytest.fixture(params=[IsolatedGPGEnvironment,
+ ])
def openpgp_env_with_refresh(request):
"""OpenPGP environments that support refreshing keys"""
env = request.param()
@@ -417,6 +424,10 @@ def assert_signature(sig, manifest_var):
MANIFEST_VARIANTS)
def test_verify_manifest(openpgp_env, manifest_var, key_var, expected):
"""Test direct Manifest data verification"""
+ if (isinstance(openpgp_env, PGPyEnvironment) and
+ manifest_var == 'DASH_ESCAPED_SIGNED_MANIFEST'):
+ pytest.xfail('dash escaping is known-broken in pgpy')
+
try:
with io.StringIO(globals()[manifest_var]) as f:
if expected is None:
@@ -454,6 +465,10 @@ def test_verify_untrusted_key():
MANIFEST_VARIANTS)
def test_manifest_load(openpgp_env, manifest_var, key_var, expected):
"""Test Manifest verification via ManifestFile.load()"""
+ if (isinstance(openpgp_env, PGPyEnvironment) and
+ manifest_var == 'DASH_ESCAPED_SIGNED_MANIFEST'):
+ pytest.xfail('dash escaping is known-broken in pgpy')
+
try:
key_loaded = False
m = ManifestFile()
@@ -492,6 +507,10 @@ def test_manifest_load(openpgp_env, manifest_var, key_var, expected):
def test_recursive_manifest_loader(tmp_path, openpgp_env, filename,
manifest_var, key_var, expected):
"""Test Manifest verification via ManifestRecursiveLoader"""
+ if (isinstance(openpgp_env, PGPyEnvironment) and
+ manifest_var == 'DASH_ESCAPED_SIGNED_MANIFEST'):
+ pytest.xfail('dash escaping is known-broken in pgpy')
+
try:
with open_potentially_compressed_path(tmp_path / filename, 'w') as cf:
cf.write(globals()[manifest_var])
@@ -543,7 +562,7 @@ def test_cli(tmp_path, caplog, manifest_var, key_var, expected):
'--no-refresh-keys',
'--require-signed-manifest',
str(tmp_path)])
- if str(OpenPGPNoImplementation('')) in caplog.text:
+ if str(OpenPGPNoImplementation('install gpg')) in caplog.text:
pytest.skip('OpenPGP implementation missing')
eexit = 0 if expected is None else 1
@@ -590,14 +609,18 @@ def test_env_home_after_close():
env.home
-@pytest.fixture
-def privkey_env(openpgp_env):
+@pytest.fixture(params=[IsolatedGPGEnvironment,
+ MockedSystemGPGEnvironment,
+ ])
+def privkey_env(request):
"""Environment with private key loaded"""
try:
- openpgp_env.import_key(io.BytesIO(PRIVATE_KEY))
+ env = request.param()
+ env.import_key(io.BytesIO(PRIVATE_KEY))
except OpenPGPNoImplementation as e:
pytest.skip(str(e))
- return openpgp_env
+ yield env
+ env.close()
TEST_STRING = u'The quick brown fox jumps over the lazy dog'
@@ -716,9 +739,6 @@ def hkp_server(global_hkp_server):
yield global_hkp_server
-COMBINED_PUBLIC_KEYS = OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY
-
-
REFRESH_VARIANTS = [
# manifest, key, server key fpr, server key, expected exception
('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT,
@@ -910,7 +930,7 @@ def test_cli_gpg_wrap(tmp_path, caplog, command, expected, match):
str(tmp_path / '.key.bin'),
'--no-refresh-keys',
'--'] + command)
- if str(OpenPGPNoImplementation('')) in caplog.text:
+ if str(OpenPGPNoImplementation('install gpg')) in caplog.text:
pytest.skip('OpenPGP implementation missing')
assert retval == expected
diff --git a/tox.ini b/tox.ini
index 175327f..a95c708 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = qa,py36,py37,py38,py39,pypy3,py3-nogpg,py3-norequests-noresponses,py3-noresponses
+envlist = qa,py36,py37,py38,py39,pypy3,py3-nogpg,py3-nopgpy,py3-norequests-noresponses,py3-noresponses
skip_missing_interpreters = True
# we operate on sources anyway
skipsdist = True
@@ -18,6 +18,7 @@ deps =
pytest >= 5
pytest-cov
pytest-xdist
+ !nopgpy: pgpy
!norequests: requests
!noresponses: responses
setenv =