summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2020-08-25 17:54:33 +0200
committerMichał Górny <mgorny@gentoo.org>2020-08-25 17:54:33 +0200
commit6bc1e7a70741f073194b88998efe7f0519074222 (patch)
tree7c97a681fb4c10b92975ca5058de828f999efe5a /tests
parentf1c904feb96dac663fd3e93152c9febf8a8a640a (diff)
downloadgemato-6bc1e7a70741f073194b88998efe7f0519074222.tar.gz
openpgp: Reimplement WKD support via requests
Signed-off-by: Michał Górny <mgorny@gentoo.org>
Diffstat (limited to 'tests')
-rw-r--r--tests/test_openpgp.py85
-rw-r--r--tests/testutil.py34
2 files changed, 61 insertions, 58 deletions
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index 815de7e..2fd5365 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -23,7 +23,12 @@ from tests.keydata import (
OTHER_PUBLIC_KEY, OTHER_PUBLIC_KEY_UID, OTHER_PUBLIC_KEY_SIG,
UNEXPIRE_SIG,
)
-from tests.testutil import HKPServerTestCase, MockedWKDOpenPGPEnvironment
+from tests.testutil import HKPServerTestCase
+
+try:
+ import responses
+except ImportError:
+ responses = None
VALID_PUBLIC_KEY = PUBLIC_KEY + UID + PUBLIC_KEY_SIG
@@ -147,7 +152,6 @@ t5pTRGhLWgdLUrs7vRB7wf7F8h4sci/YBKJRFA==
'''
KEY_FINGERPRINT = '81E12C16BD8DCD60BE180845136880E72A7B1384'
-KEY_UID = 'gemato@example.com'
SIG_TIMESTAMP = datetime.datetime(2017, 11, 8, 9, 1, 26)
OTHER_VALID_PUBLIC_KEY = (OTHER_PUBLIC_KEY + OTHER_PUBLIC_KEY_UID +
@@ -199,6 +203,14 @@ def strip_openpgp(text):
return '\n'.join(lines[start+1:stop-start+2]) + '\n'
+def need_responses(func):
+ def skipper(*args, **kwargs):
+ raise unittest.SkipTest('responses module is needed for WKD tests')
+ if responses is None:
+ return skipper
+ return responses.activate(func)
+
+
class SignedManifestTest(unittest.TestCase):
"""
Test whether signed Manifest is read correctly.
@@ -1129,12 +1141,8 @@ class OpenPGPWKDRefreshTest(unittest.TestCase):
revocation.
"""
- KEYS = {
- KEY_UID: REVOKED_PUBLIC_KEY,
- }
-
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1147,11 +1155,18 @@ class OpenPGPWKDRefreshTest(unittest.TestCase):
def tearDown(self):
self.env.close()
+ @need_responses
def test_refresh_keys(self):
try:
with io.StringIO(SIGNED_MANIFEST) as f:
self.env.verify_file(f)
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=REVOKED_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.env.refresh_keys(allow_wkd=True)
with io.StringIO(SIGNED_MANIFEST) as f:
@@ -1172,7 +1187,7 @@ class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase):
}
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment()
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1187,11 +1202,17 @@ class OpenPGPWKDFallbackRefreshTest(HKPServerTestCase):
self.env.close()
super(OpenPGPWKDFallbackRefreshTest, self).tearDown()
+ @need_responses
def test_refresh_keys(self):
try:
with io.StringIO(SIGNED_MANIFEST) as f:
self.env.verify_file(f)
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ status=404)
self.env.refresh_keys(allow_wkd=True,
keyserver=self.server_addr)
@@ -1213,7 +1234,7 @@ class OpenPGPWKDFailRefreshTest(HKPServerTestCase):
SERVER_KEYS = {}
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment()
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1228,8 +1249,14 @@ class OpenPGPWKDFailRefreshTest(HKPServerTestCase):
self.env.close()
super(OpenPGPWKDFailRefreshTest, self).tearDown()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ status=404)
self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
self.env.refresh_keys, allow_wkd=True,
keyserver=self.server_addr)
@@ -1243,12 +1270,8 @@ class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase):
keyserver sends outdated (non-revoked) key.
"""
- KEYS = {
- KEY_UID: VALID_PUBLIC_KEY,
- }
-
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1261,8 +1284,15 @@ class OpenPGPWKDUnrevokeRefreshTest(unittest.TestCase):
def tearDown(self):
self.env.close()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=VALID_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.env.refresh_keys(allow_wkd=True)
with io.StringIO(SIGNED_MANIFEST) as f:
@@ -1277,12 +1307,8 @@ class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase):
Test that WKD refresh_keys() does not allow injecting another key.
"""
- KEYS = {
- KEY_UID: OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY,
- }
-
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1295,8 +1321,15 @@ class OpenPGPWKDFakeKeyRefreshTest(unittest.TestCase):
def tearDown(self):
self.env.close()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=OTHER_VALID_PUBLIC_KEY + VALID_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.env.refresh_keys(allow_wkd=True)
with io.StringIO(SIGNED_MANIFEST) as f:
@@ -1312,13 +1345,10 @@ class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase):
another (of the same UID).
"""
- KEYS = {
- KEY_UID: VALID_PUBLIC_KEY,
- }
SERVER_KEYS = {}
def setUp(self):
- self.env = MockedWKDOpenPGPEnvironment(self.KEYS)
+ self.env = gemato.openpgp.OpenPGPEnvironment()
try:
self.env.import_key(io.BytesIO(OTHER_VALID_PUBLIC_KEY))
except gemato.exceptions.OpenPGPRuntimeError as e:
@@ -1333,8 +1363,15 @@ class OpenPGPWKDReplaceKeyRefreshTest(HKPServerTestCase):
self.env.close()
super(OpenPGPWKDReplaceKeyRefreshTest, self).tearDown()
+ @need_responses
def test_refresh_keys(self):
try:
+ responses.add(
+ responses.GET,
+ 'https://example.com/.well-known/openpgpkey/hu/'
+ '5x66h616iaskmnadrm86ndo6xnxbxjxb?l=gemato',
+ body=VALID_PUBLIC_KEY,
+ content_type='application/pgp-keys')
self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
self.env.refresh_keys, allow_wkd=True,
keyserver=self.server_addr)
@@ -1422,7 +1459,7 @@ class OpenPGPForgedSubKeyKeyserverTest(HKPServerTestCase):
super(OpenPGPForgedSubKeyKeyserverTest, self).tearDown()
def test_verify_manifest(self):
- self.env.refresh_keys(allow_wkd=True,
+ self.env.refresh_keys(allow_wkd=False,
keyserver=self.server_addr)
with io.StringIO(SUBKEY_SIGNED_MANIFEST) as f:
diff --git a/tests/testutil.py b/tests/testutil.py
index f39533a..45d1cd6 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -18,8 +18,6 @@ import unittest
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
-import gemato.openpgp
-
class LoggingTestCase(unittest.TestCase):
def setUp(self):
@@ -119,35 +117,3 @@ class HKPServerTestCase(unittest.TestCase):
self.server.shutdown()
self.server.server_close()
self.server_thread.join()
-
-
-class MockedWKDOpenPGPEnvironment(gemato.openpgp.OpenPGPEnvironment):
- """
- A subclass of OpenPGPEnvironment that partially mocks spawning
- OpenPGP in order to inject keys without having to implement
- full HTTPS server with domain satisfactory to GnuPG.
- """
-
- def __init__(self, keys={}):
- self.keys = keys
- super(MockedWKDOpenPGPEnvironment, self).__init__()
-
- def clone(self):
- return MockedWKDOpenPGPEnvironment(self.keys)
-
- def _spawn_gpg(self, argv, stdin=''):
- if '--locate-keys' in argv:
- argv.remove('--locate-keys')
- assert len(argv) == 3
- assert argv[:2] == ['gpg', '--batch']
- if argv[2] in self.keys:
- ret, sout, serr = super(MockedWKDOpenPGPEnvironment,
- self)._spawn_gpg(
- ['gpg', '--batch', '--import'],
- self.keys[argv[2]])
- else:
- ret = 2
- return (ret, b'', b'')
-
- return super(MockedWKDOpenPGPEnvironment, self)._spawn_gpg(
- argv, stdin)