summaryrefslogtreecommitdiff
path: root/tests/testutil.py
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2018-07-25 15:54:32 +0200
committerMichał Górny <mgorny@gentoo.org>2018-07-25 15:54:32 +0200
commit2356749bae4c523fbe0ed3a0fcd593b16e138594 (patch)
treefbf2bc834bcdd08c3caf5c1fcc780345b2f56769 /tests/testutil.py
parent47e2a99c4497ea5c728ee4c9d5feb27076aff74d (diff)
downloadgemato-2356749bae4c523fbe0ed3a0fcd593b16e138594.tar.gz
tests: Add tests for keyserver-based refreshing
Diffstat (limited to 'tests/testutil.py')
-rw-r--r--tests/testutil.py82
1 files changed, 81 insertions, 1 deletions
diff --git a/tests/testutil.py b/tests/testutil.py
index 9e294a5..cfc9879 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -1,17 +1,28 @@
# gemato: Test utility functions
# vim:fileencoding=utf-8
-# (c) 2017 Michał Górny
+# (c) 2017-2018 Michał Górny
# Licensed under the terms of 2-clause BSD license
+import errno
+import functools
import io
import logging
import os
import os.path
+import random
import shutil
import sys
import tempfile
+import threading
import unittest
+if sys.hexversion >= 0x03000000:
+ from http.server import HTTPServer, BaseHTTPRequestHandler
+ from urllib.parse import urlparse, parse_qs
+else:
+ from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+ from urlparse import urlparse, parse_qs
+
class LoggingTestCase(unittest.TestCase):
def setUp(self):
@@ -43,3 +54,72 @@ class TempDirTestCase(LoggingTestCase):
def tearDown(self):
shutil.rmtree(self.dir)
super(TempDirTestCase, self).tearDown()
+
+
+class HKPServerRequestHandler(BaseHTTPRequestHandler):
+ def __init__(self, keys, *args, **kwargs):
+ self.keys = keys
+ BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
+
+ def log_message(self, *args, **kwargs):
+ pass
+
+ def do_GET(self):
+ try:
+ parsed = urlparse(self.path)
+ assert parsed.path == '/pks/lookup'
+
+ qs = parse_qs(parsed.query)
+ assert qs.get('op') == ['get']
+ assert len(qs.get('search', [])) == 1
+
+ key = qs['search'][0]
+ assert key.startswith('0x')
+ key = key[2:]
+ except AssertionError:
+ self.send_error(400, "Bad request")
+ return
+
+ if key not in self.keys:
+ self.send_error(404, "Not found")
+ return
+
+ self.send_response(200, "OK")
+ self.send_header("Content-type", "text/plain")
+ self.end_headers()
+ self.wfile.write(self.keys[key])
+ self.wfile.flush()
+
+
+class HKPServerTestCase(unittest.TestCase):
+ """
+ A test case deploying HKP server for OpenPGP client to use.
+ """
+
+ SERVER_KEYS = {}
+
+ def setUp(self):
+ # try 10 randomly selected ports before giving up
+ for port in random.sample(range(1024, 32768), 10):
+ try:
+ self.server = HTTPServer(('127.0.0.1', port),
+ functools.partial(HKPServerRequestHandler,
+ self.SERVER_KEYS))
+ except OSError as e:
+ if e.errno != errno.EADDRINUSE:
+ raise unittest.SkipTest('Unable to bind the HKP server: {}'
+ .format(e))
+ else:
+ break
+ else:
+ raise unittest.SkipTest('Unable to find a free port for HKP server')
+
+ self.server_addr = 'hkp://127.0.0.1:{}'.format(port)
+ self.server_thread = threading.Thread(
+ target=self.server.serve_forever)
+ self.server_thread.start()
+
+ def tearDown(self):
+ self.server.shutdown()
+ self.server.server_close()
+ self.server_thread.join()