diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_openpgp.py | 169 | ||||
-rw-r--r-- | tests/testutil.py | 82 |
2 files changed, 250 insertions, 1 deletions
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 81aca19..6452adb 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -17,6 +17,8 @@ import gemato.manifest import gemato.openpgp import gemato.recursiveloader +from tests.testutil import HKPServerTestCase + PUBLIC_KEY = b''' -----BEGIN PGP PUBLIC KEY BLOCK----- @@ -237,6 +239,28 @@ t5pTRGhLWgdLUrs7vRB7wf7F8h4sci/YBKJRFA== KEY_FINGERPRINT = '81E12C16BD8DCD60BE180845136880E72A7B1384' SIG_TIMESTAMP = datetime.datetime(2017, 11, 8, 9, 1, 26) +OTHER_PUBLIC_KEY = b''' +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mQENBFtYfqUBCAC5OuNuaZOMwyegRtKFzzLlwsJaO+q1L5EN8tVHdzRUwBmwKgC8 +PDNiM7UGOhyN9Zasbeqvy1oF22nHIUgrDRkiB9m1k6E0FPvD2VzN1O7QiuKCjP8W +aYhVRGYOXyCaaSPegqyidqPJz6AMDaZ38EWaZwGgJXmxzewUINPbepvyboTMZy5L +9QiyfmKbsaW9BhW3qkKyIEnV+k/S/NQdKcVX5xEXriDt0E5r3NNMC0pxIhwpchLP +MnHohMKBUYn3BNA9CyN0V/lRYJFJUrh9MnGkDkdYPSw9aYhvWEGOYnhW1bCl3ZLW +6n2xVBpo5tK6PhJ+3lBCbzU3Lo6CtEbimkTxABEBAAG0Kk90aGVyIGdlbWF0byB0 +ZXN0IGtleSA8Z2VtYXRvQGV4YW1wbGUuY29tPokBVAQTAQgAPhYhBEuDSbkMVu5/ +BU1Shxgi9UJOttqBBQJbWH6lAhsDBQkDwmcABQsJCAcCBhUKCQgLAgQWAgMBAh4B +AheAAAoJEBgi9UJOttqBlykH/37OTq4kUgN6Y/O91h95KfuyD+SE55rMCyZL2TpN +mlN+Y9rO6WZIGyjEWW1cbB4VNdKvfyw/R4IBqHe19rEEg/gfosbZZl4ckOJgM02t +Ksx6dtppzFa9lZmhkUBh+yMwF65Q9QR+xFX20lWanUqykXF8I5AMZbD8y00PaJ4Y +C5JO+yd8tHqz9/aLv+uKYdkGd04JQVyXNYArJ7VeGfD/ixngoqKdKwIFx03tBXX/ +ADzqCOKsHytILIvYdWjz7qRL/l1Pv4kOEG07Ci2Cvl5wgMmqYIltr42G50UHqZgf +M8n77OFu8x94uwIfsLqUrYnSg8Xy7Srx56Yl//ZnzMa3tcw= +=SzMD +-----END PGP PUBLIC KEY BLOCK----- +''' +OTHER_KEY_FINGERPRINT = '4B8349B90C56EE7F054D52871822F5424EB6DA81' + def strip_openpgp(text): lines = text.lstrip().splitlines() @@ -1021,3 +1045,148 @@ class OpenPGPPrivateKeyTest(unittest.TestCase): self.assertIsNone(m2.openpgp_signature) finally: shutil.rmtree(d) + + +class OpenPGPRefreshTest(HKPServerTestCase): + """ + Test that refresh_keys() correctly handles revocation. + """ + + SERVER_KEYS = { + KEY_FINGERPRINT: REVOKED_PUBLIC_KEY, + } + + def setUp(self): + self.env = gemato.openpgp.OpenPGPEnvironment() + try: + self.env.import_key(io.BytesIO(PUBLIC_KEY)) + except gemato.exceptions.OpenPGPRuntimeError as e: + self.env.close() + raise unittest.SkipTest(str(e)) + except gemato.exceptions.OpenPGPNoImplementation as e: + self.env.close() + raise unittest.SkipTest(str(e)) + super(OpenPGPRefreshTest, self).setUp() + + def tearDown(self): + self.env.close() + super(OpenPGPRefreshTest, self).tearDown() + + def test_refresh_keys(self): + try: + with io.StringIO(SIGNED_MANIFEST) as f: + self.env.verify_file(f) + + self.env.refresh_keys(allow_wkd=False, + keyserver=self.server_addr) + + with io.StringIO(SIGNED_MANIFEST) as f: + self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure, + self.env.verify_file, f) + except gemato.exceptions.OpenPGPNoImplementation as e: + raise unittest.SkipTest(str(e)) + + +class OpenPGPFailRefreshTest(HKPServerTestCase): + """ + Test that refresh_keys() correctly handles missing key on server. + """ + + SERVER_KEYS = {} + + def setUp(self): + self.env = gemato.openpgp.OpenPGPEnvironment() + try: + self.env.import_key(io.BytesIO(PUBLIC_KEY)) + except gemato.exceptions.OpenPGPRuntimeError as e: + self.env.close() + raise unittest.SkipTest(str(e)) + except gemato.exceptions.OpenPGPNoImplementation as e: + self.env.close() + raise unittest.SkipTest(str(e)) + super(OpenPGPFailRefreshTest, self).setUp() + + def tearDown(self): + self.env.close() + super(OpenPGPFailRefreshTest, self).tearDown() + + def test_refresh_keys(self): + try: + self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, + self.env.refresh_keys, allow_wkd=False, + keyserver=self.server_addr) + except gemato.exceptions.OpenPGPNoImplementation as e: + raise unittest.SkipTest(str(e)) + + +class OpenPGPUnrevokeRefreshTest(HKPServerTestCase): + """ + Test that refresh_keys() does not ignore local revocation when + keyserver sends outdated (non-revoked) key. + """ + + SERVER_KEYS = { + KEY_FINGERPRINT: PUBLIC_KEY, + } + + def setUp(self): + self.env = gemato.openpgp.OpenPGPEnvironment() + try: + self.env.import_key(io.BytesIO(REVOKED_PUBLIC_KEY)) + except gemato.exceptions.OpenPGPRuntimeError as e: + self.env.close() + raise unittest.SkipTest(str(e)) + except gemato.exceptions.OpenPGPNoImplementation as e: + self.env.close() + raise unittest.SkipTest(str(e)) + super(OpenPGPUnrevokeRefreshTest, self).setUp() + + def tearDown(self): + self.env.close() + super(OpenPGPUnrevokeRefreshTest, self).tearDown() + + def test_refresh_keys(self): + try: + self.env.refresh_keys(allow_wkd=False, + keyserver=self.server_addr) + + with io.StringIO(SIGNED_MANIFEST) as f: + self.assertRaises(gemato.exceptions.OpenPGPRevokedKeyFailure, + self.env.verify_file, f) + except gemato.exceptions.OpenPGPNoImplementation as e: + raise unittest.SkipTest(str(e)) + + +class OpenPGPFakeKeyRefreshTest(HKPServerTestCase): + """ + Test that refresh_keys() does not allow maliciously replacing key + with another. + """ + + SERVER_KEYS = { + OTHER_KEY_FINGERPRINT: PUBLIC_KEY, + } + + def setUp(self): + self.env = gemato.openpgp.OpenPGPEnvironment() + try: + self.env.import_key(io.BytesIO(OTHER_PUBLIC_KEY)) + except gemato.exceptions.OpenPGPRuntimeError as e: + self.env.close() + raise unittest.SkipTest(str(e)) + except gemato.exceptions.OpenPGPNoImplementation as e: + self.env.close() + raise unittest.SkipTest(str(e)) + super(OpenPGPFakeKeyRefreshTest, self).setUp() + + def tearDown(self): + self.env.close() + super(OpenPGPFakeKeyRefreshTest, self).tearDown() + + def test_refresh_keys(self): + try: + self.assertRaises(gemato.exceptions.OpenPGPKeyRefreshError, + self.env.refresh_keys, allow_wkd=False, + keyserver=self.server_addr) + except gemato.exceptions.OpenPGPNoImplementation as e: + raise unittest.SkipTest(str(e)) diff --git a/tests/testutil.py b/tests/testutil.py index 9e294a5..cfc9879 100644 --- a/tests/testutil.py +++ b/tests/testutil.py @@ -1,17 +1,28 @@ # gemato: Test utility functions # vim:fileencoding=utf-8 -# (c) 2017 Michał Górny +# (c) 2017-2018 Michał Górny # Licensed under the terms of 2-clause BSD license +import errno +import functools import io import logging import os import os.path +import random import shutil import sys import tempfile +import threading import unittest +if sys.hexversion >= 0x03000000: + from http.server import HTTPServer, BaseHTTPRequestHandler + from urllib.parse import urlparse, parse_qs +else: + from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler + from urlparse import urlparse, parse_qs + class LoggingTestCase(unittest.TestCase): def setUp(self): @@ -43,3 +54,72 @@ class TempDirTestCase(LoggingTestCase): def tearDown(self): shutil.rmtree(self.dir) super(TempDirTestCase, self).tearDown() + + +class HKPServerRequestHandler(BaseHTTPRequestHandler): + def __init__(self, keys, *args, **kwargs): + self.keys = keys + BaseHTTPRequestHandler.__init__(self, *args, **kwargs) + + def log_message(self, *args, **kwargs): + pass + + def do_GET(self): + try: + parsed = urlparse(self.path) + assert parsed.path == '/pks/lookup' + + qs = parse_qs(parsed.query) + assert qs.get('op') == ['get'] + assert len(qs.get('search', [])) == 1 + + key = qs['search'][0] + assert key.startswith('0x') + key = key[2:] + except AssertionError: + self.send_error(400, "Bad request") + return + + if key not in self.keys: + self.send_error(404, "Not found") + return + + self.send_response(200, "OK") + self.send_header("Content-type", "text/plain") + self.end_headers() + self.wfile.write(self.keys[key]) + self.wfile.flush() + + +class HKPServerTestCase(unittest.TestCase): + """ + A test case deploying HKP server for OpenPGP client to use. + """ + + SERVER_KEYS = {} + + def setUp(self): + # try 10 randomly selected ports before giving up + for port in random.sample(range(1024, 32768), 10): + try: + self.server = HTTPServer(('127.0.0.1', port), + functools.partial(HKPServerRequestHandler, + self.SERVER_KEYS)) + except OSError as e: + if e.errno != errno.EADDRINUSE: + raise unittest.SkipTest('Unable to bind the HKP server: {}' + .format(e)) + else: + break + else: + raise unittest.SkipTest('Unable to find a free port for HKP server') + + self.server_addr = 'hkp://127.0.0.1:{}'.format(port) + self.server_thread = threading.Thread( + target=self.server.serve_forever) + self.server_thread.start() + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + self.server_thread.join() |