diff options
-rw-r--r-- | gemato/exceptions.py | 9 | ||||
-rw-r--r-- | gemato/openpgp.py | 113 | ||||
-rw-r--r-- | tests/test_openpgp.py | 44 | ||||
-rw-r--r-- | tox.ini | 3 |
4 files changed, 153 insertions, 16 deletions
diff --git a/gemato/exceptions.py b/gemato/exceptions.py index aa4e499..6d7e8f3 100644 --- a/gemato/exceptions.py +++ b/gemato/exceptions.py @@ -218,9 +218,14 @@ class OpenPGPNoImplementation(GematoException): is available. """ + __slots__ = ['detail'] + + def __init__(self, detail): + super().__init__(detail) + self.detail = detail + def __str__(self): - return ('No supported OpenPGP implementation found (install ' - 'gnupg)') + return f'Requested OpenPGP provider not found ({self.detail})' class ManifestInvalidPath(GematoException): diff --git a/gemato/openpgp.py b/gemato/openpgp.py index 901bfa5..8bce73b 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -15,6 +15,7 @@ import shutil import subprocess import tempfile import urllib.parse +import warnings from gemato.exceptions import ( OpenPGPNoImplementation, @@ -34,6 +35,11 @@ try: except ImportError: requests = None +try: + import pgpy +except ImportError: + pgpy = None + GNUPG = os.environ.get('GNUPG', 'gpg') GNUPGCONF = os.environ.get('GNUPGCONF', 'gpgconf') @@ -212,7 +218,7 @@ class SystemGPGEnvironment: stderr=subprocess.PIPE, env=env) except FileNotFoundError: - raise OpenPGPNoImplementation() + raise OpenPGPNoImplementation('install gpg') out, err = p.communicate(stdin) return (p.wait(), out, err) @@ -471,5 +477,110 @@ debug-level guru return (super()._spawn_gpg(options, stdin, env_override)) +class PGPyEnvironment: + """Stand-alone environment using pgpy library""" + + __slots__ = ['debug', 'keyring', 'proxy'] + + def __init__(self, debug=False, proxy=None): + if pgpy is None: + raise OpenPGPNoImplementation('install PGPy') + self.debug = debug + self.keyring = pgpy.PGPKeyring() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_cb): + pass + + def close(self): + pass + + def import_key(self, keyfile): + with warnings.catch_warnings(record=True) as warns: + try: + key_res = pgpy.PGPKey.from_blob(keyfile.read()) + except ValueError as e: + raise OpenPGPKeyImportError( + f'OpenPGP key import failed: {e}') + fprs = [] + for k in key_res[1].values(): + fprs.extend(self.keyring.load(k)) + + for w in warns: + if str(w.message) == 'Incorrect crc24': + raise OpenPGPKeyImportError( + f'OpenPGP key import failed: {w.message}') + + for fpr in fprs: + with self.keyring.key(fpr) as k: + if k.parent is not None: + try: + verifies = k.parent.verify(k) + except pgpy.errors.PGPError: + logging.debug( + f'Rejecting subkey {fpr} due to missing sig') + self.keyring.unload(k) + else: + if not verifies: + logging.debug( + f'Rejecting subkey {fpr} since parent ' + f'key signature does not check out') + self.keyring.unload(k) + for uid in k.userids: + if uid.selfsig is None: + raise OpenPGPKeyImportError( + f'Self-signature on {uid} missing') + if not k.verify(uid): + raise OpenPGPKeyImportError( + f'Self-signature on {uid} does not verify') + + def verify_file(self, f): + msg = pgpy.PGPMessage.from_blob(f.read()) + assert msg.is_signed + assert len(msg.signatures) == 1 + assert len(msg.signers) == 1 + + signer, = msg.signers + try: + with self.keyring.key(signer) as k: + pk = k + if k.parent is not None: + pk = k.parent + assert pk.parent is None + + vr = k.verify(msg) + if not vr: + raise OpenPGPVerificationFailure( + f'Bad signature made by key {k.fingerprint}') + now = datetime.datetime.utcnow() + sig_expire = msg.signatures[0].expires_at + if sig_expire is not None and sig_expire < now: + raise OpenPGPVerificationFailure( + f'Signature expired at {msg.signatures[0].expires_at}') + if k.expires_at is not None and k.expires_at < now: + raise OpenPGPExpiredKeyFailure( + f'Key {k.fingerprint} expired at {k.expires_at}') + if pk.expires_at is not None and pk.expires_at < now: + raise OpenPGPExpiredKeyFailure( + f'Primary key {pk.fingerprint} expired ' + f'at {k.expires_at}') + if list(k.revocation_signatures): + raise OpenPGPRevokedKeyFailure( + f'Key {pk.fingerprint} was revoked') + if list(pk.revocation_signatures): + raise OpenPGPRevokedKeyFailure( + f'Primary key {pk.fingerprint} was revoked') + return OpenPGPSignatureData( + k.fingerprint, + msg.signatures[0].created, + msg.signatures[0].expires_at, + pk.fingerprint) + except KeyError: + raise OpenPGPVerificationFailure( + f'Key {signer} not in keyring') + + OpenPGPSystemEnvironment = SystemGPGEnvironment OpenPGPEnvironment = IsolatedGPGEnvironment diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 85ff7f4..434df88 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -30,6 +30,7 @@ from gemato.manifest import ManifestFile from gemato.openpgp import ( SystemGPGEnvironment, IsolatedGPGEnvironment, + PGPyEnvironment, get_wkd_url, ) from gemato.recursiveloader import ManifestRecursiveLoader @@ -348,15 +349,21 @@ class MockedSystemGPGEnvironment(SystemGPGEnvironment): @pytest.fixture(params=[IsolatedGPGEnvironment, - MockedSystemGPGEnvironment]) + MockedSystemGPGEnvironment, + PGPyEnvironment, + ]) def openpgp_env(request): """OpenPGP environment fixture""" - env = request.param() + try: + env = request.param() + except OpenPGPNoImplementation as e: + pytest.skip(str(e)) yield env env.close() -@pytest.fixture(params=[IsolatedGPGEnvironment]) +@pytest.fixture(params=[IsolatedGPGEnvironment, + ]) def openpgp_env_with_refresh(request): """OpenPGP environments that support refreshing keys""" env = request.param() @@ -417,6 +424,10 @@ def assert_signature(sig, manifest_var): MANIFEST_VARIANTS) def test_verify_manifest(openpgp_env, manifest_var, key_var, expected): """Test direct Manifest data verification""" + if (isinstance(openpgp_env, PGPyEnvironment) and + manifest_var == 'DASH_ESCAPED_SIGNED_MANIFEST'): + pytest.xfail('dash escaping is known-broken in pgpy') + try: with io.StringIO(globals()[manifest_var]) as f: if expected is None: @@ -454,6 +465,10 @@ def test_verify_untrusted_key(): MANIFEST_VARIANTS) def test_manifest_load(openpgp_env, manifest_var, key_var, expected): """Test Manifest verification via ManifestFile.load()""" + if (isinstance(openpgp_env, PGPyEnvironment) and + manifest_var == 'DASH_ESCAPED_SIGNED_MANIFEST'): + pytest.xfail('dash escaping is known-broken in pgpy') + try: key_loaded = False m = ManifestFile() @@ -492,6 +507,10 @@ def test_manifest_load(openpgp_env, manifest_var, key_var, expected): def test_recursive_manifest_loader(tmp_path, openpgp_env, filename, manifest_var, key_var, expected): """Test Manifest verification via ManifestRecursiveLoader""" + if (isinstance(openpgp_env, PGPyEnvironment) and + manifest_var == 'DASH_ESCAPED_SIGNED_MANIFEST'): + pytest.xfail('dash escaping is known-broken in pgpy') + try: with open_potentially_compressed_path(tmp_path / filename, 'w') as cf: cf.write(globals()[manifest_var]) @@ -543,7 +562,7 @@ def test_cli(tmp_path, caplog, manifest_var, key_var, expected): '--no-refresh-keys', '--require-signed-manifest', str(tmp_path)]) - if str(OpenPGPNoImplementation('')) in caplog.text: + if str(OpenPGPNoImplementation('install gpg')) in caplog.text: pytest.skip('OpenPGP implementation missing') eexit = 0 if expected is None else 1 @@ -590,14 +609,18 @@ def test_env_home_after_close(): env.home -@pytest.fixture -def privkey_env(openpgp_env): +@pytest.fixture(params=[IsolatedGPGEnvironment, + MockedSystemGPGEnvironment, + ]) +def privkey_env(request): """Environment with private key loaded""" try: - openpgp_env.import_key(io.BytesIO(PRIVATE_KEY)) + env = request.param() + env.import_key(io.BytesIO(PRIVATE_KEY)) except OpenPGPNoImplementation as e: pytest.skip(str(e)) - return openpgp_env + yield env + env.close() TEST_STRING = u'The quick brown fox jumps over the lazy dog' @@ -716,9 +739,6 @@ def hkp_server(global_hkp_server): yield global_hkp_server -COMBINED_PUBLIC_KEYS = OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY - - REFRESH_VARIANTS = [ # manifest, key, server key fpr, server key, expected exception ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT, @@ -910,7 +930,7 @@ def test_cli_gpg_wrap(tmp_path, caplog, command, expected, match): str(tmp_path / '.key.bin'), '--no-refresh-keys', '--'] + command) - if str(OpenPGPNoImplementation('')) in caplog.text: + if str(OpenPGPNoImplementation('install gpg')) in caplog.text: pytest.skip('OpenPGP implementation missing') assert retval == expected @@ -1,5 +1,5 @@ [tox] -envlist = qa,py36,py37,py38,py39,pypy3,py3-nogpg,py3-norequests-noresponses,py3-noresponses +envlist = qa,py36,py37,py38,py39,pypy3,py3-nogpg,py3-nopgpy,py3-norequests-noresponses,py3-noresponses skip_missing_interpreters = True # we operate on sources anyway skipsdist = True @@ -18,6 +18,7 @@ deps = pytest >= 5 pytest-cov pytest-xdist + !nopgpy: pgpy !norequests: requests !noresponses: responses setenv = |