diff options
author | Michał Górny <mgorny@gentoo.org> | 2018-07-25 15:54:32 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2018-07-25 15:54:32 +0200 |
commit | 2356749bae4c523fbe0ed3a0fcd593b16e138594 (patch) | |
tree | fbf2bc834bcdd08c3caf5c1fcc780345b2f56769 /tests/testutil.py | |
parent | 47e2a99c4497ea5c728ee4c9d5feb27076aff74d (diff) | |
download | gemato-2356749bae4c523fbe0ed3a0fcd593b16e138594.tar.gz |
tests: Add tests for keyserver-based refreshing
Diffstat (limited to 'tests/testutil.py')
-rw-r--r-- | tests/testutil.py | 82 |
1 files changed, 81 insertions, 1 deletions
diff --git a/tests/testutil.py b/tests/testutil.py index 9e294a5..cfc9879 100644 --- a/tests/testutil.py +++ b/tests/testutil.py @@ -1,17 +1,28 @@ # gemato: Test utility functions # vim:fileencoding=utf-8 -# (c) 2017 Michał Górny +# (c) 2017-2018 Michał Górny # Licensed under the terms of 2-clause BSD license +import errno +import functools import io import logging import os import os.path +import random import shutil import sys import tempfile +import threading import unittest +if sys.hexversion >= 0x03000000: + from http.server import HTTPServer, BaseHTTPRequestHandler + from urllib.parse import urlparse, parse_qs +else: + from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler + from urlparse import urlparse, parse_qs + class LoggingTestCase(unittest.TestCase): def setUp(self): @@ -43,3 +54,72 @@ class TempDirTestCase(LoggingTestCase): def tearDown(self): shutil.rmtree(self.dir) super(TempDirTestCase, self).tearDown() + + +class HKPServerRequestHandler(BaseHTTPRequestHandler): + def __init__(self, keys, *args, **kwargs): + self.keys = keys + BaseHTTPRequestHandler.__init__(self, *args, **kwargs) + + def log_message(self, *args, **kwargs): + pass + + def do_GET(self): + try: + parsed = urlparse(self.path) + assert parsed.path == '/pks/lookup' + + qs = parse_qs(parsed.query) + assert qs.get('op') == ['get'] + assert len(qs.get('search', [])) == 1 + + key = qs['search'][0] + assert key.startswith('0x') + key = key[2:] + except AssertionError: + self.send_error(400, "Bad request") + return + + if key not in self.keys: + self.send_error(404, "Not found") + return + + self.send_response(200, "OK") + self.send_header("Content-type", "text/plain") + self.end_headers() + self.wfile.write(self.keys[key]) + self.wfile.flush() + + +class HKPServerTestCase(unittest.TestCase): + """ + A test case deploying HKP server for OpenPGP client to use. + """ + + SERVER_KEYS = {} + + def setUp(self): + # try 10 randomly selected ports before giving up + for port in random.sample(range(1024, 32768), 10): + try: + self.server = HTTPServer(('127.0.0.1', port), + functools.partial(HKPServerRequestHandler, + self.SERVER_KEYS)) + except OSError as e: + if e.errno != errno.EADDRINUSE: + raise unittest.SkipTest('Unable to bind the HKP server: {}' + .format(e)) + else: + break + else: + raise unittest.SkipTest('Unable to find a free port for HKP server') + + self.server_addr = 'hkp://127.0.0.1:{}'.format(port) + self.server_thread = threading.Thread( + target=self.server.serve_forever) + self.server_thread.start() + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + self.server_thread.join() |