summaryrefslogtreecommitdiff
path: root/tests/testutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/testutil.py')
-rw-r--r--tests/testutil.py57
1 files changed, 28 insertions, 29 deletions
diff --git a/tests/testutil.py b/tests/testutil.py
index 45d1cd6..c00ef75 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -3,6 +3,7 @@
# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
+import collections
import errno
import functools
import io
@@ -15,6 +16,8 @@ import tempfile
import threading
import unittest
+import pytest
+
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
@@ -85,35 +88,31 @@ class HKPServerRequestHandler(BaseHTTPRequestHandler):
self.wfile.flush()
-class HKPServerTestCase(unittest.TestCase):
- """
- A test case deploying HKP server for OpenPGP client to use.
- """
-
- SERVER_KEYS = {}
-
- def setUp(self):
- # try 10 randomly selected ports before giving up
- for port in random.sample(range(1024, 32768), 10):
- try:
- self.server = HTTPServer(('127.0.0.1', port),
- functools.partial(HKPServerRequestHandler,
- self.SERVER_KEYS))
- except OSError as e:
- if e.errno != errno.EADDRINUSE:
- raise unittest.SkipTest('Unable to bind the HKP server: {}'
- .format(e))
- else:
- break
+@pytest.fixture
+def hkp_server():
+ keys = {}
+ # try 10 randomly selected ports before giving up
+ for port in random.sample(range(1024, 32768), 10):
+ try:
+ server = HTTPServer(
+ ('127.0.0.1', port),
+ functools.partial(HKPServerRequestHandler, keys))
+ except OSError as e:
+ if e.errno != errno.EADDRINUSE:
+ raise unittest.SkipTest('Unable to bind the HKP server: {}'
+ .format(e))
else:
- raise unittest.SkipTest('Unable to find a free port for HKP server')
+ break
+ else:
+ pytest.skip('Unable to find a free port for HKP server')
- self.server_addr = 'hkp://127.0.0.1:{}'.format(port)
- self.server_thread = threading.Thread(
- target=self.server.serve_forever)
- self.server_thread.start()
+ server_addr = f'hkp://127.0.0.1:{port}'
+ server_thread = threading.Thread(target=server.serve_forever)
+ server_thread.start()
- def tearDown(self):
- self.server.shutdown()
- self.server.server_close()
- self.server_thread.join()
+ yield collections.namedtuple('HKPServerTuple', ('addr', 'keys'))(
+ server_addr, keys)
+
+ server.shutdown()
+ server.server_close()
+ server_thread.join()