summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2018-07-25 15:54:32 +0200
committerMichał Górny <mgorny@gentoo.org>2018-07-25 15:54:32 +0200
commit2356749bae4c523fbe0ed3a0fcd593b16e138594 (patch)
treefbf2bc834bcdd08c3caf5c1fcc780345b2f56769
parent47e2a99c4497ea5c728ee4c9d5feb27076aff74d (diff)
downloadgemato-2356749bae4c523fbe0ed3a0fcd593b16e138594.tar.gz
tests: Add tests for keyserver-based refreshing
-rw-r--r--tests/test_openpgp.py169
-rw-r--r--tests/testutil.py82
2 files changed, 250 insertions, 1 deletions
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index 81aca19..6452adb 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -17,6 +17,8 @@ import gemato.manifest
import gemato.openpgp
import gemato.recursiveloader
+from tests.testutil import HKPServerTestCase
+
PUBLIC_KEY = b'''
-----BEGIN PGP PUBLIC KEY BLOCK-----
@@ -237,6 +239,28 @@ t5pTRGhLWgdLUrs7vRB7wf7F8h4sci/YBKJRFA==
KEY_FINGERPRINT = '81E12C16BD8DCD60BE180845136880E72A7B1384'
SIG_TIMESTAMP = datetime.datetime(2017, 11, 8, 9, 1, 26)
+OTHER_PUBLIC_KEY = b'''
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+
+mQENBFtYfqUBCAC5OuNuaZOMwyegRtKFzzLlwsJaO+q1L5EN8tVHdzRUwBmwKgC8
+PDNiM7UGOhyN9Zasbeqvy1oF22nHIUgrDRkiB9m1k6E0FPvD2VzN1O7QiuKCjP8W
+aYhVRGYOXyCaaSPegqyidqPJz6AMDaZ38EWaZwGgJXmxzewUINPbepvyboTMZy5L
+9QiyfmKbsaW9BhW3qkKyIEnV+k/S/NQdKcVX5xEXriDt0E5r3NNMC0pxIhwpchLP
+MnHohMKBUYn3BNA9CyN0V/lRYJFJUrh9MnGkDkdYPSw9aYhvWEGOYnhW1bCl3ZLW
+6n2xVBpo5tK6PhJ+3lBCbzU3Lo6CtEbimkTxABEBAAG0Kk90aGVyIGdlbWF0byB0
+ZXN0IGtleSA8Z2VtYXRvQGV4YW1wbGUuY29tPokBVAQTAQgAPhYhBEuDSbkMVu5/
+BU1Shxgi9UJOttqBBQJbWH6lAhsDBQkDwmcABQsJCAcCBhUKCQgLAgQWAgMBAh4B
+AheAAAoJEBgi9UJOttqBlykH/37OTq4kUgN6Y/O91h95KfuyD+SE55rMCyZL2TpN
+mlN+Y9rO6WZIGyjEWW1cbB4VNdKvfyw/R4IBqHe19rEEg/gfosbZZl4ckOJgM02t
+Ksx6dtppzFa9lZmhkUBh+yMwF65Q9QR+xFX20lWanUqykXF8I5AMZbD8y00PaJ4Y
+C5JO+yd8tHqz9/aLv+uKYdkGd04JQVyXNYArJ7VeGfD/ixngoqKdKwIFx03tBXX/
+ADzqCOKsHytILIvYdWjz7qRL/l1Pv4kOEG07Ci2Cvl5wgMmqYIltr42G50UHqZgf
+M8n77OFu8x94uwIfsLqUrYnSg8Xy7Srx56Yl//ZnzMa3tcw=
+=SzMD
+-----END PGP PUBLIC KEY BLOCK-----
+'''
+OTHER_KEY_FINGERPRINT = '4B8349B90C56EE7F054D52871822F5424EB6DA81'
+
def strip_openpgp(text):
lines = text.lstrip().splitlines()
@@ -1021,3 +1045,148 @@ class OpenPGPPrivateKeyTest(unittest.TestCase):
self.assertIsNone(m2.openpgp_signature)
finally:
shutil.rmtree(d)
+
+
+class OpenPGPRefreshTest(HKPServerTestCase):
+ """
+ Test that refresh_keys() correctly handles revocation.
+ """
+
+ SERVER_KEYS = {
+ KEY_FINGERPRINT: REVOKED_PUBLIC_KEY,
+ }
+
+ def setUp(self):
+ self.env = gemato.openpgp.OpenPGPEnvironment()
+ try:
+ self.env.import_key(io.BytesIO(PUBLIC_KEY))
+ except gemato.exceptions.OpenPGPRuntimeError as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ super(OpenPGPRefreshTest, self).setUp()
+
+ def tearDown(self):
+ self.env.close()
+ super(OpenPGPRefreshTest, self).tearDown()
+
+ def test_refresh_keys(self):
+ try:
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ self.env.verify_file(f)
+
+ self.env.refresh_keys(allow_wkd=False,
+ keyserver=self.server_addr)
+
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure,
+ self.env.verify_file, f)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ raise unittest.SkipTest(str(e))
+
+
+class OpenPGPFailRefreshTest(HKPServerTestCase):
+ """
+ Test that refresh_keys() correctly handles missing key on server.
+ """
+
+ SERVER_KEYS = {}
+
+ def setUp(self):
+ self.env = gemato.openpgp.OpenPGPEnvironment()
+ try:
+ self.env.import_key(io.BytesIO(PUBLIC_KEY))
+ except gemato.exceptions.OpenPGPRuntimeError as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ super(OpenPGPFailRefreshTest, self).setUp()
+
+ def tearDown(self):
+ self.env.close()
+ super(OpenPGPFailRefreshTest, self).tearDown()
+
+ def test_refresh_keys(self):
+ try:
+ self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
+ self.env.refresh_keys, allow_wkd=False,
+ keyserver=self.server_addr)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ raise unittest.SkipTest(str(e))
+
+
+class OpenPGPUnrevokeRefreshTest(HKPServerTestCase):
+ """
+ Test that refresh_keys() does not ignore local revocation when
+ keyserver sends outdated (non-revoked) key.
+ """
+
+ SERVER_KEYS = {
+ KEY_FINGERPRINT: PUBLIC_KEY,
+ }
+
+ def setUp(self):
+ self.env = gemato.openpgp.OpenPGPEnvironment()
+ try:
+ self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY))
+ except gemato.exceptions.OpenPGPRuntimeError as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ super(OpenPGPUnrevokeRefreshTest, self).setUp()
+
+ def tearDown(self):
+ self.env.close()
+ super(OpenPGPUnrevokeRefreshTest, self).tearDown()
+
+ def test_refresh_keys(self):
+ try:
+ self.env.refresh_keys(allow_wkd=False,
+ keyserver=self.server_addr)
+
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure,
+ self.env.verify_file, f)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ raise unittest.SkipTest(str(e))
+
+
+class OpenPGPFakeKeyRefreshTest(HKPServerTestCase):
+ """
+ Test that refresh_keys() does not allow maliciously replacing key
+ with another.
+ """
+
+ SERVER_KEYS = {
+ OTHER_KEY_FINGERPRINT: PUBLIC_KEY,
+ }
+
+ def setUp(self):
+ self.env = gemato.openpgp.OpenPGPEnvironment()
+ try:
+ self.env.import_key(io.BytesIO(OTHER_PUBLIC_KEY))
+ except gemato.exceptions.OpenPGPRuntimeError as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ self.env.close()
+ raise unittest.SkipTest(str(e))
+ super(OpenPGPFakeKeyRefreshTest, self).setUp()
+
+ def tearDown(self):
+ self.env.close()
+ super(OpenPGPFakeKeyRefreshTest, self).tearDown()
+
+ def test_refresh_keys(self):
+ try:
+ self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError,
+ self.env.refresh_keys, allow_wkd=False,
+ keyserver=self.server_addr)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ raise unittest.SkipTest(str(e))
diff --git a/tests/testutil.py b/tests/testutil.py
index 9e294a5..cfc9879 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -1,17 +1,28 @@
# gemato: Test utility functions
# vim:fileencoding=utf-8
-# (c) 2017 Michał Górny
+# (c) 2017-2018 Michał Górny
# Licensed under the terms of 2-clause BSD license
+import errno
+import functools
import io
import logging
import os
import os.path
+import random
import shutil
import sys
import tempfile
+import threading
import unittest
+if sys.hexversion >= 0x03000000:
+ from http.server import HTTPServer, BaseHTTPRequestHandler
+ from urllib.parse import urlparse, parse_qs
+else:
+ from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+ from urlparse import urlparse, parse_qs
+
class LoggingTestCase(unittest.TestCase):
def setUp(self):
@@ -43,3 +54,72 @@ class TempDirTestCase(LoggingTestCase):
def tearDown(self):
shutil.rmtree(self.dir)
super(TempDirTestCase, self).tearDown()
+
+
+class HKPServerRequestHandler(BaseHTTPRequestHandler):
+ def __init__(self, keys, *args, **kwargs):
+ self.keys = keys
+ BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
+
+ def log_message(self, *args, **kwargs):
+ pass
+
+ def do_GET(self):
+ try:
+ parsed = urlparse(self.path)
+ assert parsed.path == '/pks/lookup'
+
+ qs = parse_qs(parsed.query)
+ assert qs.get('op') == ['get']
+ assert len(qs.get('search', [])) == 1
+
+ key = qs['search'][0]
+ assert key.startswith('0x')
+ key = key[2:]
+ except AssertionError:
+ self.send_error(400, "Bad request")
+ return
+
+ if key not in self.keys:
+ self.send_error(404, "Not found")
+ return
+
+ self.send_response(200, "OK")
+ self.send_header("Content-type", "text/plain")
+ self.end_headers()
+ self.wfile.write(self.keys[key])
+ self.wfile.flush()
+
+
+class HKPServerTestCase(unittest.TestCase):
+ """
+ A test case deploying HKP server for OpenPGP client to use.
+ """
+
+ SERVER_KEYS = {}
+
+ def setUp(self):
+ # try 10 randomly selected ports before giving up
+ for port in random.sample(range(1024, 32768), 10):
+ try:
+ self.server = HTTPServer(('127.0.0.1', port),
+ functools.partial(HKPServerRequestHandler,
+ self.SERVER_KEYS))
+ except OSError as e:
+ if e.errno != errno.EADDRINUSE:
+ raise unittest.SkipTest('Unable to bind the HKP server: {}'
+ .format(e))
+ else:
+ break
+ else:
+ raise unittest.SkipTest('Unable to find a free port for HKP server')
+
+ self.server_addr = 'hkp://127.0.0.1:{}'.format(port)
+ self.server_thread = threading.Thread(
+ target=self.server.serve_forever)
+ self.server_thread.start()
+
+ def tearDown(self):
+ self.server.shutdown()
+ self.server.server_close()
+ self.server_thread.join()