summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2020-08-26 15:26:33 +0200
committerMichał Górny <mgorny@gentoo.org>2020-08-26 20:57:30 +0200
commit0ce6088801ce600abc0d25f0603560b8a7113071 (patch)
tree499fdb8ec45bd42350b5a09f6f3d249c28d57085
parentc02c24661520510739e34dcce64ee3e5904a4635 (diff)
downloadgemato-0ce6088801ce600abc0d25f0603560b8a7113071.tar.gz
tests: Rewrite test_openpgp in pure pytest
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r--README.rst4
-rw-r--r--tests/test_openpgp.py1870
-rw-r--r--tests/testutil.py57
3 files changed, 545 insertions, 1386 deletions
diff --git a/README.rst b/README.rst
index d84fb7f..c58460e 100644
--- a/README.rst
+++ b/README.rst
@@ -78,7 +78,8 @@ through 3.7 and PyPy3. gemato core depends only on standard Python
library modules.
Additionally, OpenPGP requires system install of GnuPG 2.2+
-and requests_ Python module. Tests require responses_ for mocking.
+and requests_ Python module. Tests require pytest_, and responses_
+for mocking.
References and footnotes
@@ -87,4 +88,5 @@ References and footnotes
(https://www.gentoo.org/glep/glep-0074.html)
.. _requests: https://2.python-requests.org/en/master/
+.. _pytest: https://docs.pytest.org/en/stable/
.. _responses: https://github.com/getsentry/responses
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index 2fd5365..7489a96 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -5,10 +5,9 @@
import datetime
import io
-import os.path
-import shutil
-import tempfile
-import unittest
+import os
+
+import pytest
import gemato.cli
import gemato.compression
@@ -23,12 +22,11 @@ from tests.keydata import (
OTHER_PUBLIC_KEY, OTHER_PUBLIC_KEY_UID, OTHER_PUBLIC_KEY_SIG,
UNEXPIRE_SIG,
)
-from tests.testutil import HKPServerTestCase
+from tests.testutil import hkp_server
+
-try:
- import responses
-except ImportError:
- responses = None
+# workaround pyflakes
+hkp_server = hkp_server
VALID_PUBLIC_KEY = PUBLIC_KEY + UID + PUBLIC_KEY_SIG
@@ -203,1367 +201,527 @@ def strip_openpgp(text):
return '\n'.join(lines[start+1:stop-start+2]) + '\n'
-def need_responses(func):
- def skipper(*args, **kwargs):
- raise unittest.SkipTest('responses module is needed for WKD tests')
- if responses is None:
- return skipper
- return responses.activate(func)
-
-
-class SignedManifestTest(unittest.TestCase):
- """
- Test whether signed Manifest is read correctly.
- """
-
- def test_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- m.load(f, verify_openpgp=False)
- self.assertIsNotNone(m.find_timestamp())
- self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- def test_dash_escaped_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f:
+MANIFESTS_GOOD_SIG = [
+ 'SIGNED_MANIFEST',
+ 'DASH_ESCAPED_SIGNED_MANIFEST',
+ 'SUBKEY_SIGNED_MANIFEST',
+]
+MANIFESTS_BAD_SIG = [
+ 'MODIFIED_SIGNED_MANIFEST',
+ 'EXPIRED_SIGNED_MANIFEST'
+]
+
+
+@pytest.mark.parametrize('manifest_var',
+ MANIFESTS_GOOD_SIG + MANIFESTS_BAD_SIG)
+def test_noverify_goodish_manifest_load(manifest_var):
+ """Test Manifest files that should succeed (OpenPGP disabled)"""
+ m = gemato.manifest.ManifestFile()
+ with io.StringIO(globals()[manifest_var]) as f:
+ m.load(f, verify_openpgp=False)
+ assert m.find_timestamp() is not None
+ assert m.find_path_entry('myebuild-0.ebuild') is not None
+ assert not m.openpgp_signed
+ assert m.openpgp_signature is None
+
+
+SIGNED_MANIFEST_JUNK_BEFORE = 'IGNORE test\n' + SIGNED_MANIFEST
+SIGNED_MANIFEST_JUNK_AFTER = SIGNED_MANIFEST + 'IGNORE test\n'
+SIGNED_MANIFEST_CUT_BEFORE_DATA = '\n'.join(
+ SIGNED_MANIFEST.splitlines()[:3])
+SIGNED_MANIFEST_CUT_BEFORE_SIGNATURE = '\n'.join(
+ SIGNED_MANIFEST.splitlines()[:7])
+SIGNED_MANIFEST_CUT_BEFORE_END = '\n'.join(
+ SIGNED_MANIFEST.splitlines()[:15])
+
+
+@pytest.mark.parametrize('manifest_var,expected',
+ [('SIGNED_MANIFEST_JUNK_BEFORE',
+ gemato.exceptions.ManifestUnsignedData),
+ ('SIGNED_MANIFEST_JUNK_AFTER',
+ gemato.exceptions.ManifestUnsignedData),
+ ('SIGNED_MANIFEST_CUT_BEFORE_DATA',
+ gemato.exceptions.ManifestSyntaxError),
+ ('SIGNED_MANIFEST_CUT_BEFORE_SIGNATURE',
+ gemato.exceptions.ManifestSyntaxError),
+ ('SIGNED_MANIFEST_CUT_BEFORE_END',
+ gemato.exceptions.ManifestSyntaxError),
+ ])
+def test_noverify_bad_manifest_load(manifest_var, expected):
+ """Test Manifest files that should fail"""
+ m = gemato.manifest.ManifestFile()
+ with io.StringIO(globals()[manifest_var]) as f:
+ with pytest.raises(expected):
m.load(f, verify_openpgp=False)
- self.assertIsNotNone(m.find_timestamp())
- self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- def test_modified_manifest_load(self):
- """
- Modified Manifest should load correctly since we do not enforce
- implicit verification.
- """
- m = gemato.manifest.ManifestFile()
- with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f:
- m.load(f, verify_openpgp=False)
- self.assertIsNotNone(m.find_timestamp())
- self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- def test_junk_before_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO('IGNORE test\n' + SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.ManifestUnsignedData,
- m.load, f, verify_openpgp=False)
-
- def test_junk_after_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST + 'IGNORE test\n') as f:
- self.assertRaises(gemato.exceptions.ManifestUnsignedData,
- m.load, f, verify_openpgp=False)
-
- def test_signed_manifest_terminated_before_data(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:3])) as f:
- self.assertRaises(gemato.exceptions.ManifestSyntaxError,
- m.load, f, verify_openpgp=False)
-
- def test_signed_manifest_terminated_before_signature(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:7])) as f:
- self.assertRaises(gemato.exceptions.ManifestSyntaxError,
- m.load, f, verify_openpgp=False)
-
- def test_signed_manifest_terminated_before_end(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:15])) as f:
- self.assertRaises(gemato.exceptions.ManifestSyntaxError,
- m.load, f, verify_openpgp=False)
-
- def test_recursive_manifest_loader(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(MODIFIED_SIGNED_MANIFEST)
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest'),
- verify_openpgp=False)
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
- finally:
- shutil.rmtree(d)
-
- def test_cli(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(MODIFIED_SIGNED_MANIFEST)
-
- os.mkdir(os.path.join(d, 'eclass'))
- with io.open(os.path.join(d, 'eclass/Manifest'), 'w'):
- pass
- with io.open(os.path.join(d, 'myebuild-0.ebuild'), 'wb') as f:
- f.write(b'12345678901234567890123456789012')
- with io.open(os.path.join(d, 'metadata.xml'), 'w'):
- pass
-
- self.assertEqual(
- gemato.cli.main(['gemato', 'verify',
- '--no-openpgp-verify', d]),
- 0)
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_save_manifest(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(MODIFIED_SIGNED_MANIFEST)
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest'),
- verify_openpgp=False)
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
- m.save_manifest('Manifest')
-
- with io.open(os.path.join(d, 'Manifest'), 'r') as f:
- self.assertEqual(f.read(),
- strip_openpgp(MODIFIED_SIGNED_MANIFEST))
- finally:
- shutil.rmtree(d)
-
-
-class OpenPGPCorrectKeyTest(unittest.TestCase):
- """
- Tests performed with correct OpenPGP key set.
- """
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
- def test_verify_manifest(self):
- with io.StringIO(SIGNED_MANIFEST) as f:
- sig = self.env.verify_file(f)
- self.assertEqual(sig.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(sig.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(sig.expire_timestamp)
- self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT)
-
- def test_verify_dash_escaped_manifest(self):
- with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f:
- sig = self.env.verify_file(f)
- self.assertEqual(sig.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(sig.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(sig.expire_timestamp)
- self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT)
-
- def test_verify_modified_manifest(self):
- with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
- self.env.verify_file, f)
-
- def test_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- m.load(f, openpgp_env=self.env)
- self.assertIsNotNone(m.find_timestamp())
- self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
- self.assertTrue(m.openpgp_signed)
- self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(m.openpgp_signature.expire_timestamp)
- self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT)
-
- def test_dash_escaped_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f:
- m.load(f, openpgp_env=self.env)
- self.assertIsNotNone(m.find_timestamp())
- self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
- self.assertTrue(m.openpgp_signed)
- self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(m.openpgp_signature.expire_timestamp)
- self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT)
-
- def test_modified_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
- m.load, f, openpgp_env=self.env)
-
- def test_recursive_manifest_loader(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(SIGNED_MANIFEST)
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest'),
- verify_openpgp=True,
- openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
- self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(m.openpgp_signature.expire_timestamp)
- self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT)
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_compressed(self):
- d = tempfile.mkdtemp()
- try:
- with gemato.compression.open_potentially_compressed_path(
- os.path.join(d, 'Manifest.gz'), 'w') as cf:
- cf.write(SIGNED_MANIFEST)
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest.gz'),
- verify_openpgp=True,
- openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
- self.assertEqual(m.openpgp_signature.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(m.openpgp_signature.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(m.openpgp_signature.expire_timestamp)
- self.assertEqual(m.openpgp_signature.primary_key_fingerprint, KEY_FINGERPRINT)
- finally:
- shutil.rmtree(d)
-
- def test_cli(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, '.key.bin'), 'wb') as f:
- f.write(VALID_PUBLIC_KEY)
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(SIGNED_MANIFEST)
-
- os.mkdir(os.path.join(d, 'eclass'))
- with io.open(os.path.join(d, 'eclass/Manifest'), 'w'):
- pass
- with io.open(os.path.join(d, 'myebuild-0.ebuild'), 'w'):
- pass
- with io.open(os.path.join(d, 'metadata.xml'), 'w'):
- pass
-
- self.assertEqual(
- gemato.cli.main(['gemato', 'verify',
- '--openpgp-key', os.path.join(d, '.key.bin'),
- '--no-refresh-keys',
- '--require-signed-manifest', d]),
- 0)
- finally:
- shutil.rmtree(d)
-
-
-class OpenPGPNoKeyTest(unittest.TestCase):
- """
- Tests performed without correct OpenPGP key set.
- """
-
- expected_exception = gemato.exceptions.OpenPGPVerificationFailure
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
-
- def tearDown(self):
- self.env.close()
-
- def test_verify_manifest(self):
- with io.StringIO(SIGNED_MANIFEST) as f:
- try:
- self.assertRaises(self.expected_exception,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_manifest_load(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- try:
- self.assertRaises(self.expected_exception,
- m.load, f, openpgp_env=self.env)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_manifest_load_exception_caught(self):
- """
- Test that the Manifest is loaded even if exception is raised.
- """
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- try:
- m.load(f, openpgp_env=self.env)
- except self.expected_exception:
- pass
- except gemato.exceptions.OpenPGPNoImplementation:
- pass
- self.assertIsNotNone(m.find_timestamp())
- self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- def test_recursive_manifest_loader(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(SIGNED_MANIFEST)
-
- try:
- self.assertRaises(self.expected_exception,
- gemato.recursiveloader.ManifestRecursiveLoader,
- os.path.join(d, 'Manifest'),
- verify_openpgp=True,
- openpgp_env=self.env)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_compressed(self):
- d = tempfile.mkdtemp()
- try:
- with gemato.compression.open_potentially_compressed_path(
- os.path.join(d, 'Manifest.gz'), 'w') as cf:
- cf.write(SIGNED_MANIFEST)
-
- try:
- self.assertRaises(self.expected_exception,
- gemato.recursiveloader.ManifestRecursiveLoader,
- os.path.join(d, 'Manifest.gz'),
- verify_openpgp=True,
- openpgp_env=self.env)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
- finally:
- shutil.rmtree(d)
-
- def test_find_top_level_manifest(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(SIGNED_MANIFEST)
-
- self.assertEqual(
- gemato.find_top_level.find_top_level_manifest(d),
- os.path.join(d, 'Manifest'))
- finally:
- shutil.rmtree(d)
-
-
-class OpenPGPExpiredKeyTest(OpenPGPNoKeyTest):
- """
- Tests performed with an expired OpenPGP key.
- """
-
- expected_exception = gemato.exceptions.OpenPGPExpiredKeyFailure
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(EXPIRED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
-
-class OpenPGPRevokedKeyTest(OpenPGPNoKeyTest):
- """
- Tests performed with a revoked OpenPGP key.
- """
-
- expected_exception = gemato.exceptions.OpenPGPRevokedKeyFailure
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
-class OpenPGPExpiredSignatureTest(unittest.TestCase):
- """
- Tests for handling of expired signature.
- """
- expected_exception = gemato.exceptions.OpenPGPVerificationFailure
+@pytest.mark.parametrize('write_back', [False, True])
+def test_noverify_recursive_manifest_loader(tmp_path, write_back):
+ """Test reading signed Manifest"""
+ with open(tmp_path / 'Manifest', 'w') as f:
+ f.write(MODIFIED_SIGNED_MANIFEST)
+
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ tmp_path / 'Manifest',
+ verify_openpgp=False)
+ assert not m.openpgp_signed
+ assert m.openpgp_signature is None
+
+ if write_back:
+ m.save_manifest('Manifest')
+ with open(tmp_path / 'Manifest', 'r') as f:
+ assert f.read() == strip_openpgp(MODIFIED_SIGNED_MANIFEST)
+
+
+def test_noverify_load_cli(tmp_path):
+ """Test reading signed Manifest via CLI"""
+ with open(tmp_path / 'Manifest', 'w') as f:
+ f.write(MODIFIED_SIGNED_MANIFEST)
+ os.mkdir(tmp_path / 'eclass')
+ with open(tmp_path / 'eclass' / 'Manifest', 'w'):
+ pass
+ with open(tmp_path / 'myebuild-0.ebuild', 'wb') as f:
+ f.write(b'12345678901234567890123456789012')
+ with open(tmp_path / 'metadata.xml', 'wb'):
+ pass
+
+ assert 0 == gemato.cli.main(['gemato', 'verify',
+ '--no-openpgp-verify', str(tmp_path)])
+
+
+@pytest.fixture
+def openpgp_env():
+ """OpenPGP environment fixture"""
+ env = gemato.openpgp.OpenPGPEnvironment()
+ yield env
+ env.close()
+
+
+@pytest.fixture
+def openpgp_env_valid_key(openpgp_env):
+ """OpenPGP environment with good key loaded"""
+ try:
+ openpgp_env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+ yield openpgp_env
+
+
+MANIFEST_VARIANTS = [
+ # manifest, key, expected fpr/exception
+ # == good manifests ==
+ ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', None),
+ ('DASH_ESCAPED_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', None),
+ ('SUBKEY_SIGNED_MANIFEST', 'VALID_KEY_SUBKEY', None),
+ # == using private key ==
+ ('SIGNED_MANIFEST', 'PRIVATE_KEY', None),
+ # == bad manifests ==
+ ('MODIFIED_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY',
+ gemato.exceptions.OpenPGPVerificationFailure),
+ ('EXPIRED_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY',
+ gemato.exceptions.OpenPGPVerificationFailure),
+ # == bad keys ==
+ ('SIGNED_MANIFEST', None,
+ gemato.exceptions.OpenPGPVerificationFailure),
+ ('SIGNED_MANIFEST', 'EXPIRED_PUBLIC_KEY',
+ gemato.exceptions.OpenPGPExpiredKeyFailure),
+ ('SIGNED_MANIFEST', 'REVOKED_PUBLIC_KEY',
+ gemato.exceptions.OpenPGPRevokedKeyFailure),
+]
+
+
+def assert_signature(sig, manifest_var):
+ """Make assertions about the signature"""
+ if manifest_var == 'SUBKEY_SIGNED_MANIFEST':
+ assert sig.fingerprint == SUBKEY_FINGERPRINT
+ assert sig.timestamp == SUBKEY_SIG_TIMESTAMP
+ assert sig.expire_timestamp is None
+ assert sig.primary_key_fingerprint == KEY_FINGERPRINT
+ else:
+ assert sig.fingerprint == KEY_FINGERPRINT
+ assert sig.timestamp == SIG_TIMESTAMP
+ assert sig.expire_timestamp is None
+ assert sig.primary_key_fingerprint == KEY_FINGERPRINT
+
+
+@pytest.mark.parametrize('manifest_var,key_var,expected',
+ MANIFEST_VARIANTS)
+def test_verify_manifest(openpgp_env, manifest_var, key_var, expected):
+ """Test direct Manifest data verification"""
+ try:
+ if key_var is not None:
+ with io.BytesIO(globals()[key_var]) as f:
+ openpgp_env.import_key(f)
+
+ with io.StringIO(globals()[manifest_var]) as f:
+ if expected is None:
+ sig = openpgp_env.verify_file(f)
+ assert_signature(sig, manifest_var)
+ else:
+ with pytest.raises(expected):
+ openpgp_env.verify_file(f)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+
+
+@pytest.mark.parametrize('manifest_var,key_var,expected',
+ MANIFEST_VARIANTS)
+def test_manifest_load(openpgp_env, manifest_var, key_var, expected):
+ """Test Manifest verification via ManifestFile.load()"""
+ try:
+ if key_var is not None:
+ with io.BytesIO(globals()[key_var]) as f:
+ openpgp_env.import_key(f)
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
- def test_verify_manifest(self):
- with io.StringIO(EXPIRED_SIGNED_MANIFEST) as f:
- try:
- self.assertRaises(self.expected_exception,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_manifest_load(self):
m = gemato.manifest.ManifestFile()
- with io.StringIO(EXPIRED_SIGNED_MANIFEST) as f:
- try:
- self.assertRaises(self.expected_exception,
- m.load, f, openpgp_env=self.env)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_manifest_load_exception_caught(self):
- """
- Test that the Manifest is loaded even if exception is raised.
- """
- m = gemato.manifest.ManifestFile()
- with io.StringIO(EXPIRED_SIGNED_MANIFEST) as f:
- try:
- m.load(f, openpgp_env=self.env)
- except self.expected_exception:
- pass
- except gemato.exceptions.OpenPGPNoImplementation:
- pass
- self.assertIsNotNone(m.find_timestamp())
- self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- def test_recursive_manifest_loader(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(EXPIRED_SIGNED_MANIFEST)
-
- try:
- self.assertRaises(self.expected_exception,
- gemato.recursiveloader.ManifestRecursiveLoader,
- os.path.join(d, 'Manifest'),
- verify_openpgp=True,
- openpgp_env=self.env)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_compressed(self):
- d = tempfile.mkdtemp()
- try:
- with gemato.compression.open_potentially_compressed_path(
- os.path.join(d, 'Manifest.gz'), 'w') as cf:
- cf.write(EXPIRED_SIGNED_MANIFEST)
-
- try:
- self.assertRaises(self.expected_exception,
- gemato.recursiveloader.ManifestRecursiveLoader,
- os.path.join(d, 'Manifest.gz'),
- verify_openpgp=True,
- openpgp_env=self.env)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
- finally:
- shutil.rmtree(d)
-
- def test_find_top_level_manifest(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(EXPIRED_SIGNED_MANIFEST)
-
- self.assertEqual(
- gemato.find_top_level.find_top_level_manifest(d),
- os.path.join(d, 'Manifest'))
- finally:
- shutil.rmtree(d)
-
-
-class OpenPGPContextManagerTest(unittest.TestCase):
- """
- Test the context manager API for OpenPGPEnvironment.
- """
-
- def test_import_key(self):
- with gemato.openpgp.OpenPGPEnvironment() as env:
- try:
- env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_import_malformed_key(self):
- with gemato.openpgp.OpenPGPEnvironment() as env:
- try:
- self.assertRaises(gemato.exceptions.OpenPGPKeyImportError,
- env.import_key,
- io.BytesIO(MALFORMED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_import_no_keys(self):
- with gemato.openpgp.OpenPGPEnvironment() as env:
- try:
- self.assertRaises(gemato.exceptions.OpenPGPKeyImportError,
- env.import_key,
- io.BytesIO(b''))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_import_forged_key(self):
- with gemato.openpgp.OpenPGPEnvironment() as env:
- try:
- self.assertRaises(gemato.exceptions.OpenPGPKeyImportError,
- env.import_key,
- io.BytesIO(FORGED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_verify_manifest(self):
- with io.StringIO(SIGNED_MANIFEST) as f:
- with gemato.openpgp.OpenPGPEnvironment() as env:
- try:
- try:
- env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- sig = env.verify_file(f)
- self.assertEqual(sig.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(sig.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(sig.expire_timestamp)
- self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
- def test_double_close(self):
- with gemato.openpgp.OpenPGPEnvironment() as env:
- env.close()
-
- def test_home_after_close(self):
- with gemato.openpgp.OpenPGPEnvironment() as env:
- env.close()
- with self.assertRaises(AssertionError):
- env.home
-
-
-class OpenPGPPrivateKeyTest(unittest.TestCase):
- """
- Tests performed with the private key available.
- """
-
- TEST_STRING = u'The quick brown fox jumps over the lazy dog'
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(PRIVATE_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
- def test_verify_manifest(self):
- with io.StringIO(SIGNED_MANIFEST) as f:
- sig = self.env.verify_file(f)
- self.assertEqual(sig.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(sig.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(sig.expire_timestamp)
- self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT)
-
- def test_sign_data(self):
- with io.StringIO(self.TEST_STRING) as f:
- with io.StringIO() as wf:
- self.env.clear_sign_file(f, wf)
- wf.seek(0)
- self.env.verify_file(wf)
-
- def test_sign_data_keyid(self):
- with io.StringIO(self.TEST_STRING) as f:
- with io.StringIO() as wf:
- self.env.clear_sign_file(f, wf, keyid=PRIVATE_KEY_ID)
- wf.seek(0)
- self.env.verify_file(wf)
-
- def test_dump_signed_manifest(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- m.load(f, openpgp_env=self.env)
- with io.StringIO() as f:
- m.dump(f, openpgp_env=self.env)
- f.seek(0)
- m.load(f, openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
- self.assertIsNotNone(m.openpgp_signature)
-
- def test_dump_signed_manifest_keyid(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- m.load(f, openpgp_env=self.env)
- with io.StringIO() as f:
- m.dump(f, openpgp_keyid=PRIVATE_KEY_ID, openpgp_env=self.env)
- f.seek(0)
- m.load(f, openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
- self.assertIsNotNone(m.openpgp_signature)
-
- def test_dump_force_signed_manifest(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- m.load(f, verify_openpgp=False, openpgp_env=self.env)
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
- with io.StringIO() as f:
- m.dump(f, sign_openpgp=True, openpgp_env=self.env)
- f.seek(0)
- m.load(f, openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
- self.assertIsNotNone(m.openpgp_signature)
-
- def test_dump_force_unsigned_manifest(self):
- m = gemato.manifest.ManifestFile()
- with io.StringIO(SIGNED_MANIFEST) as f:
- m.load(f, openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
- with io.StringIO() as f:
- m.dump(f, sign_openpgp=False, openpgp_env=self.env)
- f.seek(0)
- m.load(f, openpgp_env=self.env)
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- def test_recursive_manifest_loader_save_manifest(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(SIGNED_MANIFEST)
-
+ with io.StringIO(globals()[manifest_var]) as f:
+ if expected is None:
+ m.load(f, openpgp_env=openpgp_env)
+ assert m.openpgp_signed
+ assert_signature(m.openpgp_signature, manifest_var)
+ else:
+ with pytest.raises(expected):
+ m.load(f, openpgp_env=openpgp_env)
+ assert not m.openpgp_signed
+ assert m.openpgp_signature is None
+
+ # Manifest entries should be loaded even if verification failed
+ assert m.find_timestamp() is not None
+ assert m.find_path_entry('myebuild-0.ebuild') is not None
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+
+
+@pytest.mark.parametrize('filename', ['Manifest', 'Manifest.gz'])
+@pytest.mark.parametrize('manifest_var,key_var,expected',
+ MANIFEST_VARIANTS)
+def test_recursive_manifest_loader(tmp_path, openpgp_env, filename,
+ manifest_var, key_var, expected):
+ """Test Manifest verification via ManifestRecursiveLoader"""
+ try:
+ if key_var is not None:
+ with io.BytesIO(globals()[key_var]) as f:
+ openpgp_env.import_key(f)
+
+ with gemato.compression.open_potentially_compressed_path(
+ tmp_path / filename, 'w') as cf:
+ cf.write(globals()[manifest_var])
+
+ if expected is None:
m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest'),
+ tmp_path / filename,
+ verify_openpgp=True,
+ openpgp_env=openpgp_env)
+ assert m.openpgp_signed
+ assert_signature(m.openpgp_signature, manifest_var)
+ else:
+ with pytest.raises(expected):
+ gemato.recursiveloader.ManifestRecursiveLoader(
+ tmp_path / filename,
verify_openpgp=True,
- openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
-
- m.save_manifest('Manifest')
- m2 = gemato.manifest.ManifestFile()
- with io.open(os.path.join(d, 'Manifest'), 'r') as f:
- m2.load(f, openpgp_env=self.env)
- self.assertTrue(m2.openpgp_signed)
- self.assertIsNotNone(m.openpgp_signature)
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_save_manifest_compressed(self):
- d = tempfile.mkdtemp()
- try:
- with gemato.compression.open_potentially_compressed_path(
- os.path.join(d, 'Manifest.gz'), 'w') as cf:
- cf.write(SIGNED_MANIFEST)
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest.gz'),
- verify_openpgp=True,
- openpgp_env=self.env)
- self.assertTrue(m.openpgp_signed)
-
- m.save_manifest('Manifest.gz')
- m2 = gemato.manifest.ManifestFile()
- with gemato.compression.open_potentially_compressed_path(
- os.path.join(d, 'Manifest.gz'), 'r') as cf:
- m2.load(cf, openpgp_env=self.env)
- self.assertTrue(m2.openpgp_signed)
- self.assertIsNotNone(m.openpgp_signature)
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_save_manifest_force_sign(self):
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(SIGNED_MANIFEST)
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest'),
- verify_openpgp=False,
- sign_openpgp=True,
- openpgp_env=self.env)
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- m.save_manifest('Manifest')
- m2 = gemato.manifest.ManifestFile()
- with io.open(os.path.join(d, 'Manifest'), 'r') as f:
- m2.load(f, openpgp_env=self.env)
- self.assertTrue(m2.openpgp_signed)
- self.assertIsNotNone(m2.openpgp_signature)
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_save_manifest_compressed_force_sign(self):
- d = tempfile.mkdtemp()
- try:
- with gemato.compression.open_potentially_compressed_path(
- os.path.join(d, 'Manifest.gz'), 'w') as cf:
- cf.write(SIGNED_MANIFEST)
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest.gz'),
- verify_openpgp=False,
- sign_openpgp=True,
- openpgp_env=self.env)
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- m.save_manifest('Manifest.gz')
- m2 = gemato.manifest.ManifestFile()
- with gemato.compression.open_potentially_compressed_path(
- os.path.join(d, 'Manifest.gz'), 'r') as cf:
- m2.load(cf, openpgp_env=self.env)
- self.assertTrue(m2.openpgp_signed)
- self.assertIsNotNone(m2.openpgp_signature)
- finally:
- shutil.rmtree(d)
-
- def test_recursive_manifest_loader_save_submanifest_force_sign(self):
- """
- Test that sub-Manifests are not signed.
- """
- d = tempfile.mkdtemp()
- try:
- with io.open(os.path.join(d, 'Manifest'), 'w') as f:
- f.write(SIGNED_MANIFEST)
- os.mkdir(os.path.join(d, 'eclass'))
- with io.open(os.path.join(d, 'eclass/Manifest'), 'w'):
- pass
-
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(d, 'Manifest'),
- verify_openpgp=False,
- sign_openpgp=True,
- openpgp_env=self.env)
- self.assertFalse(m.openpgp_signed)
- self.assertIsNone(m.openpgp_signature)
-
- m.load_manifest('eclass/Manifest')
- m.save_manifest('eclass/Manifest')
-
- m2 = gemato.manifest.ManifestFile()
- with io.open(os.path.join(d, 'eclass/Manifest'), 'r') as f:
- m2.load(f, openpgp_env=self.env)
- self.assertFalse(m2.openpgp_signed)
- self.assertIsNone(m2.openpgp_signature)
- finally:
- shutil.rmtree(d)
-
-
-class OpenPGPRefreshTest(HKPServerTestCase):
- """
- Test that refresh_keys() correctly handles revocation.
- """
-
- SERVER_KEYS = {
- KEY_FINGERPRINT: REVOKED_PUBLIC_KEY,
- }
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPRefreshTest, self).tearDown()
-
- def test_refresh_keys(self):
- try:
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.env.verify_file(f)
-
- self.env.refresh_keys(allow_wkd=False,
- keyserver=self.server_addr)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPFailRefreshTest(HKPServerTestCase):
- """
- Test that refresh_keys() correctly handles missing key on server.
- """
-
- SERVER_KEYS = {}
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPFailRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPFailRefreshTest, self).tearDown()
-
- def test_refresh_keys(self):
- try:
- self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
- self.env.refresh_keys, allow_wkd=False,
- keyserver=self.server_addr)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPUnrevokeRefreshTest(HKPServerTestCase):
- """
- Test that refresh_keys() does not ignore local revocation when
- keyserver sends outdated (non-revoked) key.
- """
-
- SERVER_KEYS = {
- KEY_FINGERPRINT: VALID_PUBLIC_KEY,
- }
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPUnrevokeRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPUnrevokeRefreshTest, self).tearDown()
-
- def test_refresh_keys(self):
- try:
- self.env.refresh_keys(allow_wkd=False,
- keyserver=self.server_addr)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPFakeKeyRefreshTest(HKPServerTestCase):
- """
- Test that refresh_keys() does not allow maliciously replacing key
- with another.
- """
-
- SERVER_KEYS = {
- OTHER_KEY_FINGERPRINT: VALID_PUBLIC_KEY,
- }
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPFakeKeyRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPFakeKeyRefreshTest, self).tearDown()
-
- def test_refresh_keys(self):
- try:
- self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
- self.env.refresh_keys, allow_wkd=False,
- keyserver=self.server_addr)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPWKDRefreshTest(unittest.TestCase):
- """
- Test that WKD variant of refresh_keys() correctly handles
- revocation.
- """
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
- @need_responses
- def test_refresh_keys(self):
- try:
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.env.verify_file(f)
-
- responses.add(
- responses.GET,
- 'https://example.com/.well-known/openpgpkey/hu/'
- '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
- body=REVOKED_PUBLIC_KEY,
- content_type='application/pgp-keys')
- self.env.refresh_keys(allow_wkd=True)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase):
- """
- Test that WKD variant of refresh_keys() correctly falls back
- to keyserver ops.
- """
-
- SERVER_KEYS = {
- KEY_FINGERPRINT: REVOKED_PUBLIC_KEY,
- }
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPWKDFallbackRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPWKDFallbackRefreshTest, self).tearDown()
-
- @need_responses
- def test_refresh_keys(self):
- try:
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.env.verify_file(f)
-
+ openpgp_env=openpgp_env)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+
+
+@pytest.mark.parametrize('manifest_var,key_var,expected',
+ [(m, k, e) for m, k, e in MANIFEST_VARIANTS
+ if k is not None])
+def test_cli(tmp_path, caplog, manifest_var, key_var, expected):
+ """Test Manifest verification via CLI"""
+ with open(tmp_path / '.key.bin', 'wb') as f:
+ f.write(globals()[key_var])
+ with open(tmp_path / 'Manifest', 'w') as f:
+ f.write(globals()[manifest_var])
+ os.mkdir(tmp_path / 'eclass')
+ with open(tmp_path / 'eclass' / 'Manifest', 'w'):
+ pass
+ with open(tmp_path / 'myebuild-0.ebuild', 'wb') as f:
+ if manifest_var == 'MODIFIED_SIGNED_MANIFEST':
+ f.write(b'12345678901234567890123456789012')
+ with open(tmp_path / 'metadata.xml', 'wb'):
+ pass
+
+ eexit = 0 if expected is None else 1
+ assert eexit == gemato.cli.main(['gemato', 'verify',
+ '--openpgp-key',
+ str(tmp_path / '.key.bin'),
+ '--no-refresh-keys',
+ '--require-signed-manifest',
+ str(tmp_path)])
+
+ if expected is not None:
+ assert str(expected('')) in caplog.text
+
+
+EMPTY_DATA = b''
+
+
+@pytest.mark.parametrize('key_var,success', [('VALID_PUBLIC_KEY', True),
+ ('MALFORMED_PUBLIC_KEY', False),
+ ('EMPTY_DATA', False),
+ ('FORGED_PUBLIC_KEY', False)])
+def test_env_import_key(openpgp_env, key_var, success):
+ """Test importing valid and invalid keys"""
+ try:
+ if success:
+ openpgp_env.import_key(io.BytesIO(globals()[key_var]))
+ else:
+ with pytest.raises(gemato.exceptions.OpenPGPKeyImportError):
+ openpgp_env.import_key(io.BytesIO(globals()[key_var]))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+
+
+def test_env_double_close():
+ """Test that env can be closed multiple times"""
+ with gemato.openpgp.OpenPGPEnvironment() as env:
+ env.close()
+
+
+def test_env_home_after_close():
+ """Test that .home can not be referenced after closing"""
+ with gemato.openpgp.OpenPGPEnvironment() as env:
+ env.close()
+ with pytest.raises(AssertionError):
+ env.home
+
+
+@pytest.fixture
+def privkey_env(openpgp_env):
+ """Environment with private key loaded"""
+ try:
+ openpgp_env.import_key(io.BytesIO(PRIVATE_KEY))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+ return openpgp_env
+
+
+TEST_STRING = u'The quick brown fox jumps over the lazy dog'
+
+
+@pytest.mark.parametrize('keyid', [None, PRIVATE_KEY_ID])
+def test_sign_data(privkey_env, keyid):
+ """Test signing data"""
+ with io.StringIO(TEST_STRING) as f:
+ with io.StringIO() as wf:
+ privkey_env.clear_sign_file(f, wf, keyid=keyid)
+ wf.seek(0)
+ privkey_env.verify_file(wf)
+
+
+@pytest.mark.parametrize('keyid', [None, PRIVATE_KEY_ID])
+@pytest.mark.parametrize('sign', [None, False, True])
+def test_dump_signed_manifest(privkey_env, keyid, sign):
+ """Test dumping a signed Manifest"""
+ m = gemato.manifest.ManifestFile()
+ verify = True if sign is None else False
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ m.load(f, verify_openpgp=verify, openpgp_env=privkey_env)
+ assert m.openpgp_signed == verify
+
+ with io.StringIO() as f:
+ m.dump(f, openpgp_keyid=keyid, openpgp_env=privkey_env,
+ sign_openpgp=sign)
+ f.seek(0)
+ m.load(f, openpgp_env=privkey_env)
+ if sign is not False:
+ assert m.openpgp_signed
+ assert m.openpgp_signature is not None
+ else:
+ assert not m.openpgp_signed
+ assert m.openpgp_signature is None
+
+
+@pytest.mark.parametrize('filename', ['Manifest', 'Manifest.gz'])
+@pytest.mark.parametrize('sign', [None, True])
+def test_recursive_manifest_loader_save_manifest(tmp_path, privkey_env,
+ filename, sign):
+ """Test signing Manifests via ManifestRecursiveLoader"""
+ with gemato.compression.open_potentially_compressed_path(
+ tmp_path / filename, 'w') as cf:
+ cf.write(SIGNED_MANIFEST)
+
+ verify = not sign
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ tmp_path / filename,
+ verify_openpgp=verify,
+ sign_openpgp=sign,
+ openpgp_env=privkey_env)
+ assert m.openpgp_signed == verify
+
+ m.save_manifest(filename)
+ m2 = gemato.manifest.ManifestFile()
+ with gemato.compression.open_potentially_compressed_path(
+ tmp_path / filename, 'r') as cf:
+ m2.load(cf, openpgp_env=privkey_env)
+ assert m2.openpgp_signed
+ assert m2.openpgp_signature is not None
+
+
+def test_recursive_manifest_loader_save_submanifest(tmp_path, privkey_env):
+ """Test that sub-Manifests are not signed"""
+ with open(tmp_path / 'Manifest', 'w') as f:
+ f.write(SIGNED_MANIFEST)
+ os.mkdir(tmp_path / 'eclass')
+ with open(tmp_path / 'eclass' / 'Manifest', 'w'):
+ pass
+
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ tmp_path / 'Manifest',
+ verify_openpgp=False,
+ sign_openpgp=True,
+ openpgp_env=privkey_env)
+ assert not m.openpgp_signed
+ assert m.openpgp_signature is None
+
+ m.load_manifest('eclass/Manifest')
+ m.save_manifest('eclass/Manifest')
+
+ m2 = gemato.manifest.ManifestFile()
+ with open(tmp_path / 'eclass' / 'Manifest', 'r') as f:
+ m2.load(f, openpgp_env=privkey_env)
+ assert not m2.openpgp_signed
+ assert m2.openpgp_signature is None
+
+
+COMBINED_PUBLIC_KEYS = OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY
+
+
+REFRESH_VARIANTS = [
+ # manifest, key, server key fpr, server key, expected exception
+ ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT,
+ 'VALID_PUBLIC_KEY', None),
+ ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT,
+ 'REVOKED_PUBLIC_KEY', gemato.exceptions.OpenPGPRevokedKeyFailure),
+ # test fetching subkey for primary key
+ ('SUBKEY_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT,
+ 'VALID_KEY_SUBKEY', None),
+ # refresh should fail if key is not on server
+ ('SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', None, None,
+ gemato.exceptions.OpenPGPKeyRefreshError),
+ # unrevocation should not be possible
+ ('SIGNED_MANIFEST', 'REVOKED_PUBLIC_KEY', KEY_FINGERPRINT,
+ 'VALID_PUBLIC_KEY', gemato.exceptions.OpenPGPRevokedKeyFailure),
+ # unexpiration should be possible
+ ('SIGNED_MANIFEST', 'EXPIRED_PUBLIC_KEY', KEY_FINGERPRINT,
+ 'UNEXPIRE_PUBLIC_KEY', None),
+ # make sure server can't malicously inject or replace key
+ ('SIGNED_MANIFEST', 'OTHER_VALID_PUBLIC_KEY', OTHER_KEY_FINGERPRINT,
+ 'VALID_PUBLIC_KEY', gemato.exceptions.OpenPGPKeyRefreshError),
+ ('SIGNED_MANIFEST', 'OTHER_VALID_PUBLIC_KEY', OTHER_KEY_FINGERPRINT,
+ 'COMBINED_PUBLIC_KEYS', gemato.exceptions.OpenPGPRuntimeError),
+ # test that forged keys are rejected
+ ('SIGNED_MANIFEST', 'EXPIRED_PUBLIC_KEY', KEY_FINGERPRINT,
+ 'FORGED_UNEXPIRE_KEY', gemato.exceptions.OpenPGPExpiredKeyFailure),
+ ('SUBKEY_SIGNED_MANIFEST', 'VALID_PUBLIC_KEY', KEY_FINGERPRINT,
+ 'FORGED_SUBKEY', gemato.exceptions.OpenPGPVerificationFailure),
+]
+
+
+@pytest.mark.parametrize(
+ 'manifest_var,key_var,server_key_fpr,server_key_var,expected',
+ REFRESH_VARIANTS)
+def test_refresh_hkp(openpgp_env, hkp_server, manifest_var, key_var,
+ server_key_fpr, server_key_var, expected):
+ """Test refreshing against a HKP keyserver"""
+ try:
+ if key_var is not None:
+ with io.BytesIO(globals()[key_var]) as f:
+ openpgp_env.import_key(f)
+
+ if server_key_var is not None:
+ hkp_server.keys[server_key_fpr] = globals()[server_key_var]
+
+ if expected is None:
+ openpgp_env.refresh_keys(allow_wkd=False,
+ keyserver=hkp_server.addr)
+ with io.StringIO(globals()[manifest_var]) as f:
+ openpgp_env.verify_file(f)
+ else:
+ with pytest.raises(expected):
+ openpgp_env.refresh_keys(allow_wkd=False,
+ keyserver=hkp_server.addr)
+ with io.StringIO(globals()[manifest_var]) as f:
+ openpgp_env.verify_file(f)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+
+
+@pytest.mark.parametrize(
+ 'manifest_var,key_var,server_key_fpr,server_key_var,expected',
+ REFRESH_VARIANTS)
+def test_refresh_wkd(openpgp_env, manifest_var, key_var, server_key_fpr,
+ server_key_var, expected):
+ """Test refreshing against WKD"""
+ if key_var == 'EXPIRED_PUBLIC_KEY':
+ pytest.skip('TODO: expired public key lacks UID with email')
+
+ with pytest.importorskip('responses').RequestsMock() as responses:
+ try:
+ if key_var is not None:
+ with io.BytesIO(globals()[key_var]) as f:
+ openpgp_env.import_key(f)
+
+ if server_key_var is not None:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=globals()[server_key_var],
+ content_type='application/pgp-keys')
+ else:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ status=404)
+
+ if expected is None:
+ openpgp_env.refresh_keys(allow_wkd=True,
+ keyserver='hkps://block.invalid/')
+ with io.StringIO(globals()[manifest_var]) as f:
+ openpgp_env.verify_file(f)
+ else:
+ with pytest.raises(expected):
+ openpgp_env.refresh_keys(allow_wkd=True,
+ keyserver='hkps://block.invalid/')
+ with io.StringIO(globals()[manifest_var]) as f:
+ openpgp_env.verify_file(f)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ pytest.skip(str(e))
+
+
+def test_refresh_wkd_fallback_to_hkp(openpgp_env_valid_key, hkp_server):
+ """Test whether WKD refresh failure falls back to HKP"""
+ with pytest.importorskip('responses').RequestsMock() as responses:
+ try:
+ hkp_server.keys[KEY_FINGERPRINT] = REVOKED_PUBLIC_KEY
responses.add(
responses.GET,
'https://example.com/.well-known/openpgpkey/hu/'
'5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
status=404)
- self.env.refresh_keys(allow_wkd=True,
- keyserver=self.server_addr)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPWKDFailRefreshTest(HKPServerTestCase):
- """
- Test that WKD variant of refresh_keys() correctly handles missing
- key on server.
- Note: we also run HKP server to handle failed WKD fallback.
- """
+ openpgp_env_valid_key.refresh_keys(allow_wkd=True,
+ keyserver=hkp_server.addr)
- SERVER_KEYS = {}
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPWKDFailRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPWKDFailRefreshTest, self).tearDown()
-
- @need_responses
- def test_refresh_keys(self):
- try:
- responses.add(
- responses.GET,
- 'https://example.com/.well-known/openpgpkey/hu/'
- '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
- status=404)
- self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
- self.env.refresh_keys, allow_wkd=True,
- keyserver=self.server_addr)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase):
- """
- Test that WKD refresh_keys() does not ignore local revocation when
- keyserver sends outdated (non-revoked) key.
- """
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
+ with pytest.raises(gemato.exceptions.OpenPGPRevokedKeyFailure):
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ openpgp_env_valid_key.verify_file(f)
except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
+ pytest.skip(str(e))
- def tearDown(self):
- self.env.close()
- @need_responses
- def test_refresh_keys(self):
- try:
- responses.add(
- responses.GET,
- 'https://example.com/.well-known/openpgpkey/hu/'
- '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
- body=VALID_PUBLIC_KEY,
- content_type='application/pgp-keys')
- self.env.refresh_keys(allow_wkd=True)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase):
- """
- Test that WKD refresh_keys() does not allow injecting another key.
- """
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
- @need_responses
- def test_refresh_keys(self):
- try:
- responses.add(
- responses.GET,
- 'https://example.com/.well-known/openpgpkey/hu/'
- '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
- body=OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY,
- content_type='application/pgp-keys')
- self.env.refresh_keys(allow_wkd=True)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase):
- """
- Test that WKD refresh_keys() does not allow replacing the key with
- another (of the same UID).
- """
-
- SERVER_KEYS = {}
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPWKDReplaceKeyRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPWKDReplaceKeyRefreshTest, self).tearDown()
-
- @need_responses
- def test_refresh_keys(self):
- try:
- responses.add(
- responses.GET,
- 'https://example.com/.well-known/openpgpkey/hu/'
- '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
- body=VALID_PUBLIC_KEY,
- content_type='application/pgp-keys')
- self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
- self.env.refresh_keys, allow_wkd=True,
- keyserver=self.server_addr)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPSubKeyTest(unittest.TestCase):
- """
- Tests that a signature made using a subkey works.
- """
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_KEY_SUBKEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
- def test_verify_manifest(self):
- with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f:
- sig = self.env.verify_file(f)
- self.assertEqual(sig.fingerprint, SUBKEY_FINGERPRINT)
- self.assertEqual(sig.timestamp, SUBKEY_SIG_TIMESTAMP)
- self.assertIsNone(sig.expire_timestamp)
- self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT)
-
-
-class OpenPGPForgedSubKeyTest(unittest.TestCase):
- """
- Tests that a subkey is not used if its signature is wrong.
- """
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(FORGED_SUBKEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
-
- def tearDown(self):
- self.env.close()
-
- def test_verify_manifest(self):
- with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f:
- self.assertRaises(
- gemato.exceptions.OpenPGPVerificationFailure,
- self.env.verify_file, f)
-
-
-class OpenPGPForgedSubKeyKeyserverTest(HKPServerTestCase):
- """
- Tests that a forged subkey can not be injected via keyserver.
- """
-
- SERVER_KEYS = {
- KEY_FINGERPRINT: FORGED_SUBKEY,
- }
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPForgedSubKeyKeyserverTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPForgedSubKeyKeyserverTest, self).tearDown()
-
- def test_verify_manifest(self):
- self.env.refresh_keys(allow_wkd=False,
- keyserver=self.server_addr)
-
- with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f:
- self.assertRaises(
- gemato.exceptions.OpenPGPVerificationFailure,
- self.env.verify_file, f)
-
-
-class OpenPGPUnexpireRefreshTest(HKPServerTestCase):
- """
- Test that refresh_keys() correctly unexpires keys.
- """
-
- SERVER_KEYS = {
- KEY_FINGERPRINT: UNEXPIRE_PUBLIC_KEY,
- }
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(EXPIRED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPUnexpireRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPUnexpireRefreshTest, self).tearDown()
-
- def test_refresh_keys(self):
- try:
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPExpiredKeyFailure,
- self.env.verify_file, f)
-
- self.env.refresh_keys(allow_wkd=False,
- keyserver=self.server_addr)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- sig = self.env.verify_file(f)
- self.assertEqual(sig.fingerprint, KEY_FINGERPRINT)
- self.assertEqual(sig.timestamp, SIG_TIMESTAMP)
- self.assertIsNone(sig.expire_timestamp)
- self.assertEqual(sig.primary_key_fingerprint, KEY_FINGERPRINT)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class OpenPGPForgedUnexpireRefreshTest(HKPServerTestCase):
- """
- Test that a forged signature can not be used to unexpire key.
- """
-
- SERVER_KEYS = {
- KEY_FINGERPRINT: FORGED_UNEXPIRE_KEY,
- }
-
- def setUp(self):
- self.env = gemato.openpgp.OpenPGPEnvironment()
- try:
- self.env.import_key(io.BytesIO(EXPIRED_PUBLIC_KEY))
- except gemato.exceptions.OpenPGPRuntimeError as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- except gemato.exceptions.OpenPGPNoImplementation as e:
- self.env.close()
- raise unittest.SkipTest(str(e))
- super(OpenPGPForgedUnexpireRefreshTest, self).setUp()
-
- def tearDown(self):
- self.env.close()
- super(OpenPGPForgedUnexpireRefreshTest, self).tearDown()
-
- def test_refresh_keys(self):
- try:
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPExpiredKeyFailure,
- self.env.verify_file, f)
-
- self.env.refresh_keys(allow_wkd=False,
- keyserver=self.server_addr)
-
- with io.StringIO(SIGNED_MANIFEST) as f:
- self.assertRaises(gemato.exceptions.OpenPGPExpiredKeyFailure,
- self.env.verify_file, f)
- except gemato.exceptions.OpenPGPNoImplementation as e:
- raise unittest.SkipTest(str(e))
-
-
-class WKDUrlTests(unittest.TestCase):
- """Tests for get_wkd_url() helper"""
-
- def test_get_wkd_url(self):
- self.assertEqual(
- gemato.openpgp.OpenPGPEnvironment.get_wkd_url(
- 'gemato@example.com'),
- 'https://example.com/.well-known/openpgpkey/hu/'
- '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato')
- self.assertEqual(
- gemato.openpgp.OpenPGPEnvironment.get_wkd_url(
- 'Joe.Doe@Example.ORG'),
- 'https://example.org/.well-known/openpgpkey/hu/'
- 'iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe')
+@pytest.mark.parametrize(
+ 'email,expected',
+ [('gemato@example.com',
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato'),
+ ('Joe.Doe@Example.ORG',
+ 'https://example.org/.well-known/openpgpkey/hu/'
+ 'iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=Joe.Doe'),
+ ])
+def test_get_wkd_url(email, expected):
+ assert (gemato.openpgp.OpenPGPEnvironment.get_wkd_url(email) ==
+ expected)
diff --git a/tests/testutil.py b/tests/testutil.py
index 45d1cd6..c00ef75 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -3,6 +3,7 @@
# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
+import collections
import errno
import functools
import io
@@ -15,6 +16,8 @@ import tempfile
import threading
import unittest
+import pytest
+
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
@@ -85,35 +88,31 @@ class HKPServerRequestHandler(BaseHTTPRequestHandler):
self.wfile.flush()
-class HKPServerTestCase(unittest.TestCase):
- """
- A test case deploying HKP server for OpenPGP client to use.
- """
-
- SERVER_KEYS = {}
-
- def setUp(self):
- # try 10 randomly selected ports before giving up
- for port in random.sample(range(1024, 32768), 10):
- try:
- self.server = HTTPServer(('127.0.0.1', port),
- functools.partial(HKPServerRequestHandler,
- self.SERVER_KEYS))
- except OSError as e:
- if e.errno != errno.EADDRINUSE:
- raise unittest.SkipTest('Unable to bind the HKP server: {}'
- .format(e))
- else:
- break
+@pytest.fixture
+def hkp_server():
+ keys = {}
+ # try 10 randomly selected ports before giving up
+ for port in random.sample(range(1024, 32768), 10):
+ try:
+ server = HTTPServer(
+ ('127.0.0.1', port),
+ functools.partial(HKPServerRequestHandler, keys))
+ except OSError as e:
+ if e.errno != errno.EADDRINUSE:
+ raise unittest.SkipTest('Unable to bind the HKP server: {}'
+ .format(e))
else:
- raise unittest.SkipTest('Unable to find a free port for HKP server')
+ break
+ else:
+ pytest.skip('Unable to find a free port for HKP server')
- self.server_addr = 'hkp://127.0.0.1:{}'.format(port)
- self.server_thread = threading.Thread(
- target=self.server.serve_forever)
- self.server_thread.start()
+ server_addr = f'hkp://127.0.0.1:{port}'
+ server_thread = threading.Thread(target=server.serve_forever)
+ server_thread.start()
- def tearDown(self):
- self.server.shutdown()
- self.server.server_close()
- self.server_thread.join()
+ yield collections.namedtuple('HKPServerTuple', ('addr', 'keys'))(
+ server_addr, keys)
+
+ server.shutdown()
+ server.server_close()
+ server_thread.join()