diff options
author | Michał Górny <mgorny@gentoo.org> | 2020-08-26 15:26:33 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2020-08-26 20:57:30 +0200 |
commit | 0ce6088801ce600abc0d25f0603560b8a7113071 (patch) | |
tree | 499fdb8ec45bd42350b5a09f6f3d249c28d57085 | |
parent | c02c24661520510739e34dcce64ee3e5904a4635 (diff) | |
download | gemato-0ce6088801ce600abc0d25f0603560b8a7113071.tar.gz |
tests: Rewrite test_openpgp in pure pytest
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r-- | README.rst | 4 | ||||
-rw-r--r-- | tests/test_openpgp.py | 1870 | ||||
-rw-r--r-- | tests/testutil.py | 57 |
3 files changed, 545 insertions, 1386 deletions
@@ -78,7 +78,8 @@ through 3.7 and PyPy3. gemato core depends only on standard Python library modules. Additionally, OpenPGP requires system install of GnuPG 2.2+ -and requests_ Python module. Tests require responses_ for mocking. +and requests_ Python module. Tests require pytest_, and responses_ +for mocking. References and footnotes @@ -87,4 +88,5 @@ References and footnotes (https://www.gentoo.org/glep/glep-0074.html) .. _requests: https://2.python-requests.org/en/master/ +.. _pytest: https://docs.pytest.org/en/stable/ .. _responses: https://github.com/getsentry/responses diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 2fd5365..7489a96 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -5,10 +5,9 @@ import datetime import io -import os.path -import shutil -import tempfile -import unittest +import os + +import pytest import gemato.cli import gemato.compression @@ -23,12 +22,11 @@ from tests.keydata import ( OTHER_PUBLIC_KEY, OTHER_PUBLIC_KEY_UID, OTHER_PUBLIC_KEY_SIG, UNEXPIRE_SIG, ) -from tests.testutil import HKPServerTestCase +from tests.testutil import hkp_server + -try: - import responses -except ImportError: - responses = None +# workaround pyflakes +hkp_server = hkp_server VALID_PUBLIC_KEY = PUBLIC_KEY + UID + PUBLIC_KEY_SIG @@ -203,1367 +201,527 @@ def strip_openpgp(text): return '\n'.join(lines[start+1:stop-start+2]) + '\n' -def need_responses(func): - def skipper(*args, **kwargs): - raise unittest.SkipTest('responses module is needed for WKD tests') - if responses is None: - return skipper - return responses.activate(func) - - -class SignedManifestTest(unittest.TestCase): - """ - Test whether signed Manifest is read correctly. - """ - - def test_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - m.load(f, verify_openpgp=False) - self.assertIsNotNone(m.find_timestamp()) - self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - def test_dash_escaped_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f: +MANIFESTS_GOOD_SIG = [ + 'SIGNED_MANIFEST', + 'DASH_ESCAPED_SIGNED_MANIFEST', + 'SUBKEY_SIGNED_MANIFEST', +] +MANIFESTS_BAD_SIG = [ + 'MODIFIED_SIGNED_MANIFEST', + 'EXPIRED_SIGNED_MANIFEST' +] + + +@pytest.mark.parametrize('manifest_var', + MANIFESTS_GOOD_SIG + MANIFESTS_BAD_SIG) +def test_noverify_goodish_manifest_load(manifest_var): + """Test Manifest files that should succeed (OpenPGP disabled)""" + m = gemato.manifest.ManifestFile() + with io.StringIO(globals()[manifest_var]) as f: + m.load(f, verify_openpgp=False) + assert m.find_timestamp() is not None + assert m.find_path_entry('myebuild-0.ebuild') is not None + assert not m.openpgp_signed + assert m.openpgp_signature is None + + +SIGNED_MANIFEST_JUNK_BEFORE = 'IGNORE test\n' + SIGNED_MANIFEST +SIGNED_MANIFEST_JUNK_AFTER = SIGNED_MANIFEST + 'IGNORE test\n' +SIGNED_MANIFEST_CUT_BEFORE_DATA = '\n'.join( + SIGNED_MANIFEST.splitlines()[:3]) +SIGNED_MANIFEST_CUT_BEFORE_SIGNATURE = '\n'.join( + SIGNED_MANIFEST.splitlines()[:7]) +SIGNED_MANIFEST_CUT_BEFORE_END = '\n'.join( + SIGNED_MANIFEST.splitlines()[:15]) + + +@pytest.mark.parametrize('manifest_var,expected', + [('SIGNED_MANIFEST_JUNK_BEFORE', + gemato.exceptions.ManifestUnsignedData), + ('SIGNED_MANIFEST_JUNK_AFTER', + gemato.exceptions.ManifestUnsignedData), + ('SIGNED_MANIFEST_CUT_BEFORE_DATA', + gemato.exceptions.ManifestSyntaxError), + ('SIGNED_MANIFEST_CUT_BEFORE_SIGNATURE', + gemato.exceptions.ManifestSyntaxError), + ('SIGNED_MANIFEST_CUT_BEFORE_END', + gemato.exceptions.ManifestSyntaxError), + ]) +def test_noverify_bad_manifest_load(manifest_var, expected): + """Test Manifest files that should fail""" + m = gemato.manifest.ManifestFile() + with io.StringIO(globals()[manifest_var]) as f: + with pytest.raises(expected): m.load(f, verify_openpgp=False) - self.assertIsNotNone(m.find_timestamp()) - self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - def test_modified_manifest_load(self): - """ - Modified Manifest should load correctly since we do not enforce - implicit verification. - """ - m = gemato.manifest.ManifestFile() - with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f: - m.load(f, verify_openpgp=False) - self.assertIsNotNone(m.find_timestamp()) - self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - def test_junk_before_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO('IGNORE test\n' + SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.ManifestUnsignedData, - m.load, f, verify_openpgp=False) - - def test_junk_after_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST + 'IGNORE test\n') as f: - self.assertRaises(gemato.exceptions.ManifestUnsignedData, - m.load, f, verify_openpgp=False) - - def test_signed_manifest_terminated_before_data(self): - m = gemato.manifest.ManifestFile() - with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:3])) as f: - self.assertRaises(gemato.exceptions.ManifestSyntaxError, - m.load, f, verify_openpgp=False) - - def test_signed_manifest_terminated_before_signature(self): - m = gemato.manifest.ManifestFile() - with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:7])) as f: - self.assertRaises(gemato.exceptions.ManifestSyntaxError, - m.load, f, verify_openpgp=False) - - def test_signed_manifest_terminated_before_end(self): - m = gemato.manifest.ManifestFile() - with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:15])) as f: - self.assertRaises(gemato.exceptions.ManifestSyntaxError, - m.load, f, verify_openpgp=False) - - def test_recursive_manifest_loader(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(MODIFIED_SIGNED_MANIFEST) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest'), - verify_openpgp=False) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - finally: - shutil.rmtree(d) - - def test_cli(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(MODIFIED_SIGNED_MANIFEST) - - os.mkdir(os.path.join(d, 'eclass')) - with io.open(os.path.join(d, 'eclass/Manifest'), 'w'): - pass - with io.open(os.path.join(d, 'myebuild-0.ebuild'), 'wb') as f: - f.write(b'12345678901234567890123456789012') - with io.open(os.path.join(d, 'metadata.xml'), 'w'): - pass - - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - '--no-openpgp-verify', d]), - 0) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_save_manifest(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(MODIFIED_SIGNED_MANIFEST) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest'), - verify_openpgp=False) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - m.save_manifest('Manifest') - - with io.open(os.path.join(d, 'Manifest'), 'r') as f: - self.assertEqual(f.read(), - strip_openpgp(MODIFIED_SIGNED_MANIFEST)) - finally: - shutil.rmtree(d) - - -class OpenPGPCorrectKeyTest(unittest.TestCase): - """ - Tests performed with correct OpenPGP key set. - """ - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - def test_verify_manifest(self): - with io.StringIO(SIGNED_MANIFEST) as f: - sig = self.env.verify_file(f) - self.assertEqual(sig.fingerprint, KEY_FINGERPRINT) - self.assertEqual(sig.timestamp, SIG_TIMESTAMP) - self.assertIsNone(sig.expire_timestamp) - self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT) - - def test_verify_dash_escaped_manifest(self): - with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f: - sig = self.env.verify_file(f) - self.assertEqual(sig.fingerprint, KEY_FINGERPRINT) - self.assertEqual(sig.timestamp, SIG_TIMESTAMP) - self.assertIsNone(sig.expire_timestamp) - self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT) - - def test_verify_modified_manifest(self): - with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, - self.env.verify_file, f) - - def test_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - m.load(f, openpgp_env=self.env) - self.assertIsNotNone(m.find_timestamp()) - self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) - self.assertTrue(m.openpgp_signed) - self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT) - self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP) - self.assertIsNone(m.openpgp_signature.expire_timestamp) - self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT) - - def test_dash_escaped_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f: - m.load(f, openpgp_env=self.env) - self.assertIsNotNone(m.find_timestamp()) - self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) - self.assertTrue(m.openpgp_signed) - self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT) - self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP) - self.assertIsNone(m.openpgp_signature.expire_timestamp) - self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT) - - def test_modified_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, - m.load, f, openpgp_env=self.env) - - def test_recursive_manifest_loader(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(SIGNED_MANIFEST) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest'), - verify_openpgp=True, - openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT) - self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP) - self.assertIsNone(m.openpgp_signature.expire_timestamp) - self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_compressed(self): - d = tempfile.mkdtemp() - try: - with gemato.compression.open_potentially_compressed_path( - os.path.join(d, 'Manifest.gz'), 'w') as cf: - cf.write(SIGNED_MANIFEST) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest.gz'), - verify_openpgp=True, - openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT) - self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP) - self.assertIsNone(m.openpgp_signature.expire_timestamp) - self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT) - finally: - shutil.rmtree(d) - - def test_cli(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, '.key.bin'), 'wb') as f: - f.write(VALID_PUBLIC_KEY) - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(SIGNED_MANIFEST) - - os.mkdir(os.path.join(d, 'eclass')) - with io.open(os.path.join(d, 'eclass/Manifest'), 'w'): - pass - with io.open(os.path.join(d, 'myebuild-0.ebuild'), 'w'): - pass - with io.open(os.path.join(d, 'metadata.xml'), 'w'): - pass - - self.assertEqual( - gemato.cli.main(['gemato', 'verify', - '--openpgp-key', os.path.join(d, '.key.bin'), - '--no-refresh-keys', - '--require-signed-manifest', d]), - 0) - finally: - shutil.rmtree(d) - - -class OpenPGPNoKeyTest(unittest.TestCase): - """ - Tests performed without correct OpenPGP key set. - """ - - expected_exception = gemato.exceptions.OpenPGPVerificationFailure - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - - def tearDown(self): - self.env.close() - - def test_verify_manifest(self): - with io.StringIO(SIGNED_MANIFEST) as f: - try: - self.assertRaises(self.expected_exception, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_manifest_load(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - try: - self.assertRaises(self.expected_exception, - m.load, f, openpgp_env=self.env) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_manifest_load_exception_caught(self): - """ - Test that the Manifest is loaded even if exception is raised. - """ - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - try: - m.load(f, openpgp_env=self.env) - except self.expected_exception: - pass - except gemato.exceptions.OpenPGPNoImplementation: - pass - self.assertIsNotNone(m.find_timestamp()) - self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - def test_recursive_manifest_loader(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(SIGNED_MANIFEST) - - try: - self.assertRaises(self.expected_exception, - gemato.recursiveloader.ManifestRecursiveLoader, - os.path.join(d, 'Manifest'), - verify_openpgp=True, - openpgp_env=self.env) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_compressed(self): - d = tempfile.mkdtemp() - try: - with gemato.compression.open_potentially_compressed_path( - os.path.join(d, 'Manifest.gz'), 'w') as cf: - cf.write(SIGNED_MANIFEST) - - try: - self.assertRaises(self.expected_exception, - gemato.recursiveloader.ManifestRecursiveLoader, - os.path.join(d, 'Manifest.gz'), - verify_openpgp=True, - openpgp_env=self.env) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - finally: - shutil.rmtree(d) - - def test_find_top_level_manifest(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(SIGNED_MANIFEST) - - self.assertEqual( - gemato.find_top_level.find_top_level_manifest(d), - os.path.join(d, 'Manifest')) - finally: - shutil.rmtree(d) - - -class OpenPGPExpiredKeyTest(OpenPGPNoKeyTest): - """ - Tests performed with an expired OpenPGP key. - """ - - expected_exception = gemato.exceptions.OpenPGPExpiredKeyFailure - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(EXPIRED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - -class OpenPGPRevokedKeyTest(OpenPGPNoKeyTest): - """ - Tests performed with a revoked OpenPGP key. - """ - - expected_exception = gemato.exceptions.OpenPGPRevokedKeyFailure - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - -class OpenPGPExpiredSignatureTest(unittest.TestCase): - """ - Tests for handling of expired signature. - """ - expected_exception = gemato.exceptions.OpenPGPVerificationFailure +@pytest.mark.parametrize('write_back', [False, True]) +def test_noverify_recursive_manifest_loader(tmp_path, write_back): + """Test reading signed Manifest""" + with open(tmp_path / 'Manifest', 'w') as f: + f.write(MODIFIED_SIGNED_MANIFEST) + + m = gemato.recursiveloader.ManifestRecursiveLoader( + tmp_path / 'Manifest', + verify_openpgp=False) + assert not m.openpgp_signed + assert m.openpgp_signature is None + + if write_back: + m.save_manifest('Manifest') + with open(tmp_path / 'Manifest', 'r') as f: + assert f.read() == strip_openpgp(MODIFIED_SIGNED_MANIFEST) + + +def test_noverify_load_cli(tmp_path): + """Test reading signed Manifest via CLI""" + with open(tmp_path / 'Manifest', 'w') as f: + f.write(MODIFIED_SIGNED_MANIFEST) + os.mkdir(tmp_path / 'eclass') + with open(tmp_path / 'eclass' / 'Manifest', 'w'): + pass + with open(tmp_path / 'myebuild-0.ebuild', 'wb') as f: + f.write(b'12345678901234567890123456789012') + with open(tmp_path / 'metadata.xml', 'wb'): + pass + + assert 0 == gemato.cli.main(['gemato', 'verify', + '--no-openpgp-verify', str(tmp_path)]) + + +@pytest.fixture +def openpgp_env(): + """OpenPGP environment fixture""" + env = gemato.openpgp.OpenPGPEnvironment() + yield env + env.close() + + +@pytest.fixture +def openpgp_env_valid_key(openpgp_env): + """OpenPGP environment with good key loaded""" + try: + openpgp_env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + yield openpgp_env + + +MANIFEST_VARIANTS = [ + # manifest, key, expected fpr/exception + # == good manifests == + ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', None), + ('DASH_ESCAPED_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', None), + ('SUBKEY_SIGNED_MANIFEST', 'VALID_KEY_SUBKEY', None), + # == using private key == + ('SIGNED_MANIFEST', 'PRIVATE_KEY', None), + # == bad manifests == + ('MODIFIED_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', + gemato.exceptions.OpenPGPVerificationFailure), + ('EXPIRED_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', + gemato.exceptions.OpenPGPVerificationFailure), + # == bad keys == + ('SIGNED_MANIFEST', None, + gemato.exceptions.OpenPGPVerificationFailure), + ('SIGNED_MANIFEST', 'EXPIRED_PUBLIC_KEY', + gemato.exceptions.OpenPGPExpiredKeyFailure), + ('SIGNED_MANIFEST', 'REVOKED_PUBLIC_KEY', + gemato.exceptions.OpenPGPRevokedKeyFailure), +] + + +def assert_signature(sig, manifest_var): + """Make assertions about the signature""" + if manifest_var == 'SUBKEY_SIGNED_MANIFEST': + assert sig.fingerprint == SUBKEY_FINGERPRINT + assert sig.timestamp == SUBKEY_SIG_TIMESTAMP + assert sig.expire_timestamp is None + assert sig.primary_key_fingerprint == KEY_FINGERPRINT + else: + assert sig.fingerprint == KEY_FINGERPRINT + assert sig.timestamp == SIG_TIMESTAMP + assert sig.expire_timestamp is None + assert sig.primary_key_fingerprint == KEY_FINGERPRINT + + +@pytest.mark.parametrize('manifest_var,key_var,expected', + MANIFEST_VARIANTS) +def test_verify_manifest(openpgp_env, manifest_var, key_var, expected): + """Test direct Manifest data verification""" + try: + if key_var is not None: + with io.BytesIO(globals()[key_var]) as f: + openpgp_env.import_key(f) + + with io.StringIO(globals()[manifest_var]) as f: + if expected is None: + sig = openpgp_env.verify_file(f) + assert_signature(sig, manifest_var) + else: + with pytest.raises(expected): + openpgp_env.verify_file(f) + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + + +@pytest.mark.parametrize('manifest_var,key_var,expected', + MANIFEST_VARIANTS) +def test_manifest_load(openpgp_env, manifest_var, key_var, expected): + """Test Manifest verification via ManifestFile.load()""" + try: + if key_var is not None: + with io.BytesIO(globals()[key_var]) as f: + openpgp_env.import_key(f) - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - def test_verify_manifest(self): - with io.StringIO(EXPIRED_SIGNED_MANIFEST) as f: - try: - self.assertRaises(self.expected_exception, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_manifest_load(self): m = gemato.manifest.ManifestFile() - with io.StringIO(EXPIRED_SIGNED_MANIFEST) as f: - try: - self.assertRaises(self.expected_exception, - m.load, f, openpgp_env=self.env) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_manifest_load_exception_caught(self): - """ - Test that the Manifest is loaded even if exception is raised. - """ - m = gemato.manifest.ManifestFile() - with io.StringIO(EXPIRED_SIGNED_MANIFEST) as f: - try: - m.load(f, openpgp_env=self.env) - except self.expected_exception: - pass - except gemato.exceptions.OpenPGPNoImplementation: - pass - self.assertIsNotNone(m.find_timestamp()) - self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - def test_recursive_manifest_loader(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(EXPIRED_SIGNED_MANIFEST) - - try: - self.assertRaises(self.expected_exception, - gemato.recursiveloader.ManifestRecursiveLoader, - os.path.join(d, 'Manifest'), - verify_openpgp=True, - openpgp_env=self.env) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_compressed(self): - d = tempfile.mkdtemp() - try: - with gemato.compression.open_potentially_compressed_path( - os.path.join(d, 'Manifest.gz'), 'w') as cf: - cf.write(EXPIRED_SIGNED_MANIFEST) - - try: - self.assertRaises(self.expected_exception, - gemato.recursiveloader.ManifestRecursiveLoader, - os.path.join(d, 'Manifest.gz'), - verify_openpgp=True, - openpgp_env=self.env) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - finally: - shutil.rmtree(d) - - def test_find_top_level_manifest(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(EXPIRED_SIGNED_MANIFEST) - - self.assertEqual( - gemato.find_top_level.find_top_level_manifest(d), - os.path.join(d, 'Manifest')) - finally: - shutil.rmtree(d) - - -class OpenPGPContextManagerTest(unittest.TestCase): - """ - Test the context manager API for OpenPGPEnvironment. - """ - - def test_import_key(self): - with gemato.openpgp.OpenPGPEnvironment() as env: - try: - env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_import_malformed_key(self): - with gemato.openpgp.OpenPGPEnvironment() as env: - try: - self.assertRaises(gemato.exceptions.OpenPGPKeyImportError, - env.import_key, - io.BytesIO(MALFORMED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_import_no_keys(self): - with gemato.openpgp.OpenPGPEnvironment() as env: - try: - self.assertRaises(gemato.exceptions.OpenPGPKeyImportError, - env.import_key, - io.BytesIO(b'')) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_import_forged_key(self): - with gemato.openpgp.OpenPGPEnvironment() as env: - try: - self.assertRaises(gemato.exceptions.OpenPGPKeyImportError, - env.import_key, - io.BytesIO(FORGED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_verify_manifest(self): - with io.StringIO(SIGNED_MANIFEST) as f: - with gemato.openpgp.OpenPGPEnvironment() as env: - try: - try: - env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - sig = env.verify_file(f) - self.assertEqual(sig.fingerprint, KEY_FINGERPRINT) - self.assertEqual(sig.timestamp, SIG_TIMESTAMP) - self.assertIsNone(sig.expire_timestamp) - self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - def test_double_close(self): - with gemato.openpgp.OpenPGPEnvironment() as env: - env.close() - - def test_home_after_close(self): - with gemato.openpgp.OpenPGPEnvironment() as env: - env.close() - with self.assertRaises(AssertionError): - env.home - - -class OpenPGPPrivateKeyTest(unittest.TestCase): - """ - Tests performed with the private key available. - """ - - TEST_STRING = u'The quick brown fox jumps over the lazy dog' - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(PRIVATE_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - def test_verify_manifest(self): - with io.StringIO(SIGNED_MANIFEST) as f: - sig = self.env.verify_file(f) - self.assertEqual(sig.fingerprint, KEY_FINGERPRINT) - self.assertEqual(sig.timestamp, SIG_TIMESTAMP) - self.assertIsNone(sig.expire_timestamp) - self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT) - - def test_sign_data(self): - with io.StringIO(self.TEST_STRING) as f: - with io.StringIO() as wf: - self.env.clear_sign_file(f, wf) - wf.seek(0) - self.env.verify_file(wf) - - def test_sign_data_keyid(self): - with io.StringIO(self.TEST_STRING) as f: - with io.StringIO() as wf: - self.env.clear_sign_file(f, wf, keyid=PRIVATE_KEY_ID) - wf.seek(0) - self.env.verify_file(wf) - - def test_dump_signed_manifest(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - m.load(f, openpgp_env=self.env) - with io.StringIO() as f: - m.dump(f, openpgp_env=self.env) - f.seek(0) - m.load(f, openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - self.assertIsNotNone(m.openpgp_signature) - - def test_dump_signed_manifest_keyid(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - m.load(f, openpgp_env=self.env) - with io.StringIO() as f: - m.dump(f, openpgp_keyid=PRIVATE_KEY_ID, openpgp_env=self.env) - f.seek(0) - m.load(f, openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - self.assertIsNotNone(m.openpgp_signature) - - def test_dump_force_signed_manifest(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - m.load(f, verify_openpgp=False, openpgp_env=self.env) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - with io.StringIO() as f: - m.dump(f, sign_openpgp=True, openpgp_env=self.env) - f.seek(0) - m.load(f, openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - self.assertIsNotNone(m.openpgp_signature) - - def test_dump_force_unsigned_manifest(self): - m = gemato.manifest.ManifestFile() - with io.StringIO(SIGNED_MANIFEST) as f: - m.load(f, openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - with io.StringIO() as f: - m.dump(f, sign_openpgp=False, openpgp_env=self.env) - f.seek(0) - m.load(f, openpgp_env=self.env) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - def test_recursive_manifest_loader_save_manifest(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(SIGNED_MANIFEST) - + with io.StringIO(globals()[manifest_var]) as f: + if expected is None: + m.load(f, openpgp_env=openpgp_env) + assert m.openpgp_signed + assert_signature(m.openpgp_signature, manifest_var) + else: + with pytest.raises(expected): + m.load(f, openpgp_env=openpgp_env) + assert not m.openpgp_signed + assert m.openpgp_signature is None + + # Manifest entries should be loaded even if verification failed + assert m.find_timestamp() is not None + assert m.find_path_entry('myebuild-0.ebuild') is not None + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + + +@pytest.mark.parametrize('filename', ['Manifest', 'Manifest.gz']) +@pytest.mark.parametrize('manifest_var,key_var,expected', + MANIFEST_VARIANTS) +def test_recursive_manifest_loader(tmp_path, openpgp_env, filename, + manifest_var, key_var, expected): + """Test Manifest verification via ManifestRecursiveLoader""" + try: + if key_var is not None: + with io.BytesIO(globals()[key_var]) as f: + openpgp_env.import_key(f) + + with gemato.compression.open_potentially_compressed_path( + tmp_path / filename, 'w') as cf: + cf.write(globals()[manifest_var]) + + if expected is None: m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest'), + tmp_path / filename, + verify_openpgp=True, + openpgp_env=openpgp_env) + assert m.openpgp_signed + assert_signature(m.openpgp_signature, manifest_var) + else: + with pytest.raises(expected): + gemato.recursiveloader.ManifestRecursiveLoader( + tmp_path / filename, verify_openpgp=True, - openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - - m.save_manifest('Manifest') - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(d, 'Manifest'), 'r') as f: - m2.load(f, openpgp_env=self.env) - self.assertTrue(m2.openpgp_signed) - self.assertIsNotNone(m.openpgp_signature) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_save_manifest_compressed(self): - d = tempfile.mkdtemp() - try: - with gemato.compression.open_potentially_compressed_path( - os.path.join(d, 'Manifest.gz'), 'w') as cf: - cf.write(SIGNED_MANIFEST) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest.gz'), - verify_openpgp=True, - openpgp_env=self.env) - self.assertTrue(m.openpgp_signed) - - m.save_manifest('Manifest.gz') - m2 = gemato.manifest.ManifestFile() - with gemato.compression.open_potentially_compressed_path( - os.path.join(d, 'Manifest.gz'), 'r') as cf: - m2.load(cf, openpgp_env=self.env) - self.assertTrue(m2.openpgp_signed) - self.assertIsNotNone(m.openpgp_signature) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_save_manifest_force_sign(self): - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(SIGNED_MANIFEST) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest'), - verify_openpgp=False, - sign_openpgp=True, - openpgp_env=self.env) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - m.save_manifest('Manifest') - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(d, 'Manifest'), 'r') as f: - m2.load(f, openpgp_env=self.env) - self.assertTrue(m2.openpgp_signed) - self.assertIsNotNone(m2.openpgp_signature) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_save_manifest_compressed_force_sign(self): - d = tempfile.mkdtemp() - try: - with gemato.compression.open_potentially_compressed_path( - os.path.join(d, 'Manifest.gz'), 'w') as cf: - cf.write(SIGNED_MANIFEST) - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest.gz'), - verify_openpgp=False, - sign_openpgp=True, - openpgp_env=self.env) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - m.save_manifest('Manifest.gz') - m2 = gemato.manifest.ManifestFile() - with gemato.compression.open_potentially_compressed_path( - os.path.join(d, 'Manifest.gz'), 'r') as cf: - m2.load(cf, openpgp_env=self.env) - self.assertTrue(m2.openpgp_signed) - self.assertIsNotNone(m2.openpgp_signature) - finally: - shutil.rmtree(d) - - def test_recursive_manifest_loader_save_submanifest_force_sign(self): - """ - Test that sub-Manifests are not signed. - """ - d = tempfile.mkdtemp() - try: - with io.open(os.path.join(d, 'Manifest'), 'w') as f: - f.write(SIGNED_MANIFEST) - os.mkdir(os.path.join(d, 'eclass')) - with io.open(os.path.join(d, 'eclass/Manifest'), 'w'): - pass - - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(d, 'Manifest'), - verify_openpgp=False, - sign_openpgp=True, - openpgp_env=self.env) - self.assertFalse(m.openpgp_signed) - self.assertIsNone(m.openpgp_signature) - - m.load_manifest('eclass/Manifest') - m.save_manifest('eclass/Manifest') - - m2 = gemato.manifest.ManifestFile() - with io.open(os.path.join(d, 'eclass/Manifest'), 'r') as f: - m2.load(f, openpgp_env=self.env) - self.assertFalse(m2.openpgp_signed) - self.assertIsNone(m2.openpgp_signature) - finally: - shutil.rmtree(d) - - -class OpenPGPRefreshTest(HKPServerTestCase): - """ - Test that refresh_keys() correctly handles revocation. - """ - - SERVER_KEYS = { - KEY_FINGERPRINT: REVOKED_PUBLIC_KEY, - } - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPRefreshTest, self).tearDown() - - def test_refresh_keys(self): - try: - with io.StringIO(SIGNED_MANIFEST) as f: - self.env.verify_file(f) - - self.env.refresh_keys(allow_wkd=False, - keyserver=self.server_addr) - - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPFailRefreshTest(HKPServerTestCase): - """ - Test that refresh_keys() correctly handles missing key on server. - """ - - SERVER_KEYS = {} - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPFailRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPFailRefreshTest, self).tearDown() - - def test_refresh_keys(self): - try: - self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, - self.env.refresh_keys, allow_wkd=False, - keyserver=self.server_addr) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPUnrevokeRefreshTest(HKPServerTestCase): - """ - Test that refresh_keys() does not ignore local revocation when - keyserver sends outdated (non-revoked) key. - """ - - SERVER_KEYS = { - KEY_FINGERPRINT: VALID_PUBLIC_KEY, - } - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPUnrevokeRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPUnrevokeRefreshTest, self).tearDown() - - def test_refresh_keys(self): - try: - self.env.refresh_keys(allow_wkd=False, - keyserver=self.server_addr) - - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPFakeKeyRefreshTest(HKPServerTestCase): - """ - Test that refresh_keys() does not allow maliciously replacing key - with another. - """ - - SERVER_KEYS = { - OTHER_KEY_FINGERPRINT: VALID_PUBLIC_KEY, - } - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPFakeKeyRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPFakeKeyRefreshTest, self).tearDown() - - def test_refresh_keys(self): - try: - self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, - self.env.refresh_keys, allow_wkd=False, - keyserver=self.server_addr) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPWKDRefreshTest(unittest.TestCase): - """ - Test that WKD variant of refresh_keys() correctly handles - revocation. - """ - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - @need_responses - def test_refresh_keys(self): - try: - with io.StringIO(SIGNED_MANIFEST) as f: - self.env.verify_file(f) - - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=REVOKED_PUBLIC_KEY, - content_type='application/pgp-keys') - self.env.refresh_keys(allow_wkd=True) - - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase): - """ - Test that WKD variant of refresh_keys() correctly falls back - to keyserver ops. - """ - - SERVER_KEYS = { - KEY_FINGERPRINT: REVOKED_PUBLIC_KEY, - } - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPWKDFallbackRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPWKDFallbackRefreshTest, self).tearDown() - - @need_responses - def test_refresh_keys(self): - try: - with io.StringIO(SIGNED_MANIFEST) as f: - self.env.verify_file(f) - + openpgp_env=openpgp_env) + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + + +@pytest.mark.parametrize('manifest_var,key_var,expected', + [(m, k, e) for m, k, e in MANIFEST_VARIANTS + if k is not None]) +def test_cli(tmp_path, caplog, manifest_var, key_var, expected): + """Test Manifest verification via CLI""" + with open(tmp_path / '.key.bin', 'wb') as f: + f.write(globals()[key_var]) + with open(tmp_path / 'Manifest', 'w') as f: + f.write(globals()[manifest_var]) + os.mkdir(tmp_path / 'eclass') + with open(tmp_path / 'eclass' / 'Manifest', 'w'): + pass + with open(tmp_path / 'myebuild-0.ebuild', 'wb') as f: + if manifest_var == 'MODIFIED_SIGNED_MANIFEST': + f.write(b'12345678901234567890123456789012') + with open(tmp_path / 'metadata.xml', 'wb'): + pass + + eexit = 0 if expected is None else 1 + assert eexit == gemato.cli.main(['gemato', 'verify', + '--openpgp-key', + str(tmp_path / '.key.bin'), + '--no-refresh-keys', + '--require-signed-manifest', + str(tmp_path)]) + + if expected is not None: + assert str(expected('')) in caplog.text + + +EMPTY_DATA = b'' + + +@pytest.mark.parametrize('key_var,success', [('VALID_PUBLIC_KEY', True), + ('MALFORMED_PUBLIC_KEY', False), + ('EMPTY_DATA', False), + ('FORGED_PUBLIC_KEY', False)]) +def test_env_import_key(openpgp_env, key_var, success): + """Test importing valid and invalid keys""" + try: + if success: + openpgp_env.import_key(io.BytesIO(globals()[key_var])) + else: + with pytest.raises(gemato.exceptions.OpenPGPKeyImportError): + openpgp_env.import_key(io.BytesIO(globals()[key_var])) + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + + +def test_env_double_close(): + """Test that env can be closed multiple times""" + with gemato.openpgp.OpenPGPEnvironment() as env: + env.close() + + +def test_env_home_after_close(): + """Test that .home can not be referenced after closing""" + with gemato.openpgp.OpenPGPEnvironment() as env: + env.close() + with pytest.raises(AssertionError): + env.home + + +@pytest.fixture +def privkey_env(openpgp_env): + """Environment with private key loaded""" + try: + openpgp_env.import_key(io.BytesIO(PRIVATE_KEY)) + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + return openpgp_env + + +TEST_STRING = u'The quick brown fox jumps over the lazy dog' + + +@pytest.mark.parametrize('keyid', [None, PRIVATE_KEY_ID]) +def test_sign_data(privkey_env, keyid): + """Test signing data""" + with io.StringIO(TEST_STRING) as f: + with io.StringIO() as wf: + privkey_env.clear_sign_file(f, wf, keyid=keyid) + wf.seek(0) + privkey_env.verify_file(wf) + + +@pytest.mark.parametrize('keyid', [None, PRIVATE_KEY_ID]) +@pytest.mark.parametrize('sign', [None, False, True]) +def test_dump_signed_manifest(privkey_env, keyid, sign): + """Test dumping a signed Manifest""" + m = gemato.manifest.ManifestFile() + verify = True if sign is None else False + with io.StringIO(SIGNED_MANIFEST) as f: + m.load(f, verify_openpgp=verify, openpgp_env=privkey_env) + assert m.openpgp_signed == verify + + with io.StringIO() as f: + m.dump(f, openpgp_keyid=keyid, openpgp_env=privkey_env, + sign_openpgp=sign) + f.seek(0) + m.load(f, openpgp_env=privkey_env) + if sign is not False: + assert m.openpgp_signed + assert m.openpgp_signature is not None + else: + assert not m.openpgp_signed + assert m.openpgp_signature is None + + +@pytest.mark.parametrize('filename', ['Manifest', 'Manifest.gz']) +@pytest.mark.parametrize('sign', [None, True]) +def test_recursive_manifest_loader_save_manifest(tmp_path, privkey_env, + filename, sign): + """Test signing Manifests via ManifestRecursiveLoader""" + with gemato.compression.open_potentially_compressed_path( + tmp_path / filename, 'w') as cf: + cf.write(SIGNED_MANIFEST) + + verify = not sign + m = gemato.recursiveloader.ManifestRecursiveLoader( + tmp_path / filename, + verify_openpgp=verify, + sign_openpgp=sign, + openpgp_env=privkey_env) + assert m.openpgp_signed == verify + + m.save_manifest(filename) + m2 = gemato.manifest.ManifestFile() + with gemato.compression.open_potentially_compressed_path( + tmp_path / filename, 'r') as cf: + m2.load(cf, openpgp_env=privkey_env) + assert m2.openpgp_signed + assert m2.openpgp_signature is not None + + +def test_recursive_manifest_loader_save_submanifest(tmp_path, privkey_env): + """Test that sub-Manifests are not signed""" + with open(tmp_path / 'Manifest', 'w') as f: + f.write(SIGNED_MANIFEST) + os.mkdir(tmp_path / 'eclass') + with open(tmp_path / 'eclass' / 'Manifest', 'w'): + pass + + m = gemato.recursiveloader.ManifestRecursiveLoader( + tmp_path / 'Manifest', + verify_openpgp=False, + sign_openpgp=True, + openpgp_env=privkey_env) + assert not m.openpgp_signed + assert m.openpgp_signature is None + + m.load_manifest('eclass/Manifest') + m.save_manifest('eclass/Manifest') + + m2 = gemato.manifest.ManifestFile() + with open(tmp_path / 'eclass' / 'Manifest', 'r') as f: + m2.load(f, openpgp_env=privkey_env) + assert not m2.openpgp_signed + assert m2.openpgp_signature is None + + +COMBINED_PUBLIC_KEYS = OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY + + +REFRESH_VARIANTS = [ + # manifest, key, server key fpr, server key, expected exception + ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT, + 'VALID_PUBLIC_KEY', None), + ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT, + 'REVOKED_PUBLIC_KEY', gemato.exceptions.OpenPGPRevokedKeyFailure), + # test fetching subkey for primary key + ('SUBKEY_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT, + 'VALID_KEY_SUBKEY', None), + # refresh should fail if key is not on server + ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', None, None, + gemato.exceptions.OpenPGPKeyRefreshError), + # unrevocation should not be possible + ('SIGNED_MANIFEST', 'REVOKED_PUBLIC_KEY', KEY_FINGERPRINT, + 'VALID_PUBLIC_KEY', gemato.exceptions.OpenPGPRevokedKeyFailure), + # unexpiration should be possible + ('SIGNED_MANIFEST', 'EXPIRED_PUBLIC_KEY', KEY_FINGERPRINT, + 'UNEXPIRE_PUBLIC_KEY', None), + # make sure server can't malicously inject or replace key + ('SIGNED_MANIFEST', 'OTHER_VALID_PUBLIC_KEY', OTHER_KEY_FINGERPRINT, + 'VALID_PUBLIC_KEY', gemato.exceptions.OpenPGPKeyRefreshError), + ('SIGNED_MANIFEST', 'OTHER_VALID_PUBLIC_KEY', OTHER_KEY_FINGERPRINT, + 'COMBINED_PUBLIC_KEYS', gemato.exceptions.OpenPGPRuntimeError), + # test that forged keys are rejected + ('SIGNED_MANIFEST', 'EXPIRED_PUBLIC_KEY', KEY_FINGERPRINT, + 'FORGED_UNEXPIRE_KEY', gemato.exceptions.OpenPGPExpiredKeyFailure), + ('SUBKEY_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT, + 'FORGED_SUBKEY', gemato.exceptions.OpenPGPVerificationFailure), +] + + +@pytest.mark.parametrize( + 'manifest_var,key_var,server_key_fpr,server_key_var,expected', + REFRESH_VARIANTS) +def test_refresh_hkp(openpgp_env, hkp_server, manifest_var, key_var, + server_key_fpr, server_key_var, expected): + """Test refreshing against a HKP keyserver""" + try: + if key_var is not None: + with io.BytesIO(globals()[key_var]) as f: + openpgp_env.import_key(f) + + if server_key_var is not None: + hkp_server.keys[server_key_fpr] = globals()[server_key_var] + + if expected is None: + openpgp_env.refresh_keys(allow_wkd=False, + keyserver=hkp_server.addr) + with io.StringIO(globals()[manifest_var]) as f: + openpgp_env.verify_file(f) + else: + with pytest.raises(expected): + openpgp_env.refresh_keys(allow_wkd=False, + keyserver=hkp_server.addr) + with io.StringIO(globals()[manifest_var]) as f: + openpgp_env.verify_file(f) + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + + +@pytest.mark.parametrize( + 'manifest_var,key_var,server_key_fpr,server_key_var,expected', + REFRESH_VARIANTS) +def test_refresh_wkd(openpgp_env, manifest_var, key_var, server_key_fpr, + server_key_var, expected): + """Test refreshing against WKD""" + if key_var == 'EXPIRED_PUBLIC_KEY': + pytest.skip('TODO: expired public key lacks UID with email') + + with pytest.importorskip('responses').RequestsMock() as responses: + try: + if key_var is not None: + with io.BytesIO(globals()[key_var]) as f: + openpgp_env.import_key(f) + + if server_key_var is not None: + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + body=globals()[server_key_var], + content_type='application/pgp-keys') + else: + responses.add( + responses.GET, + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', + status=404) + + if expected is None: + openpgp_env.refresh_keys(allow_wkd=True, + keyserver='hkps://block.invalid/') + with io.StringIO(globals()[manifest_var]) as f: + openpgp_env.verify_file(f) + else: + with pytest.raises(expected): + openpgp_env.refresh_keys(allow_wkd=True, + keyserver='hkps://block.invalid/') + with io.StringIO(globals()[manifest_var]) as f: + openpgp_env.verify_file(f) + except gemato.exceptions.OpenPGPNoImplementation as e: + pytest.skip(str(e)) + + +def test_refresh_wkd_fallback_to_hkp(openpgp_env_valid_key, hkp_server): + """Test whether WKD refresh failure falls back to HKP""" + with pytest.importorskip('responses').RequestsMock() as responses: + try: + hkp_server.keys[KEY_FINGERPRINT] = REVOKED_PUBLIC_KEY responses.add( responses.GET, 'https://example.com/.well-known/openpgpkey/hu/' '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', status=404) - self.env.refresh_keys(allow_wkd=True, - keyserver=self.server_addr) - - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPWKDFailRefreshTest(HKPServerTestCase): - """ - Test that WKD variant of refresh_keys() correctly handles missing - key on server. - Note: we also run HKP server to handle failed WKD fallback. - """ + openpgp_env_valid_key.refresh_keys(allow_wkd=True, + keyserver=hkp_server.addr) - SERVER_KEYS = {} - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPWKDFailRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPWKDFailRefreshTest, self).tearDown() - - @need_responses - def test_refresh_keys(self): - try: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - status=404) - self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, - self.env.refresh_keys, allow_wkd=True, - keyserver=self.server_addr) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase): - """ - Test that WKD refresh_keys() does not ignore local revocation when - keyserver sends outdated (non-revoked) key. - """ - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) + with pytest.raises(gemato.exceptions.OpenPGPRevokedKeyFailure): + with io.StringIO(SIGNED_MANIFEST) as f: + openpgp_env_valid_key.verify_file(f) except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) + pytest.skip(str(e)) - def tearDown(self): - self.env.close() - @need_responses - def test_refresh_keys(self): - try: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=VALID_PUBLIC_KEY, - content_type='application/pgp-keys') - self.env.refresh_keys(allow_wkd=True) - - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase): - """ - Test that WKD refresh_keys() does not allow injecting another key. - """ - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - @need_responses - def test_refresh_keys(self): - try: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY, - content_type='application/pgp-keys') - self.env.refresh_keys(allow_wkd=True) - - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase): - """ - Test that WKD refresh_keys() does not allow replacing the key with - another (of the same UID). - """ - - SERVER_KEYS = {} - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPWKDReplaceKeyRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPWKDReplaceKeyRefreshTest, self).tearDown() - - @need_responses - def test_refresh_keys(self): - try: - responses.add( - responses.GET, - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato', - body=VALID_PUBLIC_KEY, - content_type='application/pgp-keys') - self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, - self.env.refresh_keys, allow_wkd=True, - keyserver=self.server_addr) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPSubKeyTest(unittest.TestCase): - """ - Tests that a signature made using a subkey works. - """ - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_KEY_SUBKEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - def test_verify_manifest(self): - with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f: - sig = self.env.verify_file(f) - self.assertEqual(sig.fingerprint, SUBKEY_FINGERPRINT) - self.assertEqual(sig.timestamp, SUBKEY_SIG_TIMESTAMP) - self.assertIsNone(sig.expire_timestamp) - self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT) - - -class OpenPGPForgedSubKeyTest(unittest.TestCase): - """ - Tests that a subkey is not used if its signature is wrong. - """ - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(FORGED_SUBKEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - - def tearDown(self): - self.env.close() - - def test_verify_manifest(self): - with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f: - self.assertRaises( - gemato.exceptions.OpenPGPVerificationFailure, - self.env.verify_file, f) - - -class OpenPGPForgedSubKeyKeyserverTest(HKPServerTestCase): - """ - Tests that a forged subkey can not be injected via keyserver. - """ - - SERVER_KEYS = { - KEY_FINGERPRINT: FORGED_SUBKEY, - } - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPForgedSubKeyKeyserverTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPForgedSubKeyKeyserverTest, self).tearDown() - - def test_verify_manifest(self): - self.env.refresh_keys(allow_wkd=False, - keyserver=self.server_addr) - - with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f: - self.assertRaises( - gemato.exceptions.OpenPGPVerificationFailure, - self.env.verify_file, f) - - -class OpenPGPUnexpireRefreshTest(HKPServerTestCase): - """ - Test that refresh_keys() correctly unexpires keys. - """ - - SERVER_KEYS = { - KEY_FINGERPRINT: UNEXPIRE_PUBLIC_KEY, - } - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(EXPIRED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPUnexpireRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPUnexpireRefreshTest, self).tearDown() - - def test_refresh_keys(self): - try: - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPExpiredKeyFailure, - self.env.verify_file, f) - - self.env.refresh_keys(allow_wkd=False, - keyserver=self.server_addr) - - with io.StringIO(SIGNED_MANIFEST) as f: - sig = self.env.verify_file(f) - self.assertEqual(sig.fingerprint, KEY_FINGERPRINT) - self.assertEqual(sig.timestamp, SIG_TIMESTAMP) - self.assertIsNone(sig.expire_timestamp) - self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class OpenPGPForgedUnexpireRefreshTest(HKPServerTestCase): - """ - Test that a forged signature can not be used to unexpire key. - """ - - SERVER_KEYS = { - KEY_FINGERPRINT: FORGED_UNEXPIRE_KEY, - } - - def setUp(self): - self.env = gemato.openpgp.OpenPGPEnvironment() - try: - self.env.import_key(io.BytesIO(EXPIRED_PUBLIC_KEY)) - except gemato.exceptions.OpenPGPRuntimeError as e: - self.env.close() - raise unittest.SkipTest(str(e)) - except gemato.exceptions.OpenPGPNoImplementation as e: - self.env.close() - raise unittest.SkipTest(str(e)) - super(OpenPGPForgedUnexpireRefreshTest, self).setUp() - - def tearDown(self): - self.env.close() - super(OpenPGPForgedUnexpireRefreshTest, self).tearDown() - - def test_refresh_keys(self): - try: - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPExpiredKeyFailure, - self.env.verify_file, f) - - self.env.refresh_keys(allow_wkd=False, - keyserver=self.server_addr) - - with io.StringIO(SIGNED_MANIFEST) as f: - self.assertRaises(gemato.exceptions.OpenPGPExpiredKeyFailure, - self.env.verify_file, f) - except gemato.exceptions.OpenPGPNoImplementation as e: - raise unittest.SkipTest(str(e)) - - -class WKDUrlTests(unittest.TestCase): - """Tests for get_wkd_url() helper""" - - def test_get_wkd_url(self): - self.assertEqual( - gemato.openpgp.OpenPGPEnvironment.get_wkd_url( - 'gemato@example.com'), - 'https://example.com/.well-known/openpgpkey/hu/' - '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato') - self.assertEqual( - gemato.openpgp.OpenPGPEnvironment.get_wkd_url( - 'Joe.Doe@Example.ORG'), - 'https://example.org/.well-known/openpgpkey/hu/' - 'iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe') +@pytest.mark.parametrize( + 'email,expected', + [('gemato@example.com', + 'https://example.com/.well-known/openpgpkey/hu/' + '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato'), + ('Joe.Doe@Example.ORG', + 'https://example.org/.well-known/openpgpkey/hu/' + 'iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe'), + ]) +def test_get_wkd_url(email, expected): + assert (gemato.openpgp.OpenPGPEnvironment.get_wkd_url(email) == + expected) diff --git a/tests/testutil.py b/tests/testutil.py index 45d1cd6..c00ef75 100644 --- a/tests/testutil.py +++ b/tests/testutil.py @@ -3,6 +3,7 @@ # (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license +import collections import errno import functools import io @@ -15,6 +16,8 @@ import tempfile import threading import unittest +import pytest + from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs @@ -85,35 +88,31 @@ class HKPServerRequestHandler(BaseHTTPRequestHandler): self.wfile.flush() -class HKPServerTestCase(unittest.TestCase): - """ - A test case deploying HKP server for OpenPGP client to use. - """ - - SERVER_KEYS = {} - - def setUp(self): - # try 10 randomly selected ports before giving up - for port in random.sample(range(1024, 32768), 10): - try: - self.server = HTTPServer(('127.0.0.1', port), - functools.partial(HKPServerRequestHandler, - self.SERVER_KEYS)) - except OSError as e: - if e.errno != errno.EADDRINUSE: - raise unittest.SkipTest('Unable to bind the HKP server: {}' - .format(e)) - else: - break +@pytest.fixture +def hkp_server(): + keys = {} + # try 10 randomly selected ports before giving up + for port in random.sample(range(1024, 32768), 10): + try: + server = HTTPServer( + ('127.0.0.1', port), + functools.partial(HKPServerRequestHandler, keys)) + except OSError as e: + if e.errno != errno.EADDRINUSE: + raise unittest.SkipTest('Unable to bind the HKP server: {}' + .format(e)) else: - raise unittest.SkipTest('Unable to find a free port for HKP server') + break + else: + pytest.skip('Unable to find a free port for HKP server') - self.server_addr = 'hkp://127.0.0.1:{}'.format(port) - self.server_thread = threading.Thread( - target=self.server.serve_forever) - self.server_thread.start() + server_addr = f'hkp://127.0.0.1:{port}' + server_thread = threading.Thread(target=server.serve_forever) + server_thread.start() - def tearDown(self): - self.server.shutdown() - self.server.server_close() - self.server_thread.join() + yield collections.namedtuple('HKPServerTuple', ('addr', 'keys'))( + server_addr, keys) + + server.shutdown() + server.server_close() + server_thread.join() |