summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2017-10-26 23:00:18 +0200
committerMichał Górny <mgorny@gentoo.org>2017-10-26 23:00:18 +0200
commitc3b591111ee7ae32eba4eae47dff95a5a338b942 (patch)
treebf430166dff4d6f17084f48b18c7748c59def1f1
parentea4d757852786ea334a034c30b4cd4cd34b76a21 (diff)
downloadgemato-c3b591111ee7ae32eba4eae47dff95a5a338b942.tar.gz
manifest: Support implicit OpenPGP verification
-rw-r--r--gemato/manifest.py30
-rw-r--r--tests/test_openpgp.py66
2 files changed, 87 insertions, 9 deletions
diff --git a/gemato/manifest.py b/gemato/manifest.py
index e6c9837..3743d5c 100644
--- a/gemato/manifest.py
+++ b/gemato/manifest.py
@@ -4,9 +4,11 @@
# Licensed under the terms of 2-clause BSD license
import datetime
+import io
import os.path
import gemato.exceptions
+import gemato.openpgp
import gemato.util
@@ -298,31 +300,50 @@ class ManifestFile(object):
"""
self.entries = []
+ self.openpgp_signed = None
if f is not None:
self.load(f)
- def load(self, f):
+ def load(self, f, verify_openpgp=True, openpgp_env=None):
"""
Load data from file @f. The file should be open for reading
in text mode, and oriented at the beginning.
+
+ If @verify_openpgp is True and the Manifest contains an OpenPGP
+ signature, the signature will be verified. Provide @openpgp_env
+ to perform the verification in specific environment.
+
+ If the verification succeeds, the openpgp_signed property will
+ be set to True. If it fails or OpenPGP is not available,
+ an exception will be raised. If the exception is caught,
+ the caller can continue using the ManifestFile instance
+ -- it will be loaded completely.
"""
self.entries = []
+ self.openpgp_signed = False
state = ManifestState.DATA
+ openpgp_data = ''
for l in f:
if state == ManifestState.DATA:
if l == '-----BEGIN PGP SIGNED MESSAGE-----\n':
if self.entries:
raise gemato.exceptions.ManifestUnsignedData()
+ if verify_openpgp:
+ openpgp_data += l
state = ManifestState.SIGNED_PREAMBLE
continue
elif state == ManifestState.SIGNED_PREAMBLE:
+ if verify_openpgp:
+ openpgp_data += l
# skip header lines up to the empty line
if l.strip():
continue
state = ManifestState.SIGNED_DATA
elif state == ManifestState.SIGNED_DATA:
+ if verify_openpgp:
+ openpgp_data += l
if l == '-----BEGIN PGP SIGNATURE-----\n':
state = ManifestState.SIGNATURE
continue
@@ -330,6 +351,8 @@ class ManifestFile(object):
if l.startswith('- '):
l = l[2:]
elif state == ManifestState.SIGNATURE:
+ if verify_openpgp:
+ openpgp_data += l
if l == '-----END PGP SIGNATURE-----\n':
state = ManifestState.POST_SIGNED_DATA
continue
@@ -359,6 +382,11 @@ class ManifestFile(object):
raise gemato.exceptions.ManifestSyntaxError(
"Manifest terminated early, inside signature")
+ if verify_openpgp and state == ManifestState.POST_SIGNED_DATA:
+ with io.BytesIO(openpgp_data.encode('utf8')) as f:
+ gemato.openpgp.verify_file(f, env=openpgp_env)
+ self.openpgp_signed = True
+
def dump(self, f):
"""
Dump data into file @f. The file should be open for writing
diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py
index 32028f0..9d37716 100644
--- a/tests/test_openpgp.py
+++ b/tests/test_openpgp.py
@@ -137,16 +137,18 @@ class SignedManifestTest(unittest.TestCase):
def test_manifest_load(self):
m = gemato.manifest.ManifestFile()
with io.StringIO(SIGNED_MANIFEST) as f:
- m.load(f)
+ m.load(f, verify_openpgp=False)
self.assertIsNotNone(m.find_timestamp())
self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
+ self.assertFalse(m.openpgp_signed)
def test_dash_escaped_manifest_load(self):
m = gemato.manifest.ManifestFile()
with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f:
- m.load(f)
+ m.load(f, verify_openpgp=False)
self.assertIsNotNone(m.find_timestamp())
self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
+ self.assertFalse(m.openpgp_signed)
def test_modified_manifest_load(self):
"""
@@ -155,39 +157,40 @@ class SignedManifestTest(unittest.TestCase):
"""
m = gemato.manifest.ManifestFile()
with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f:
- m.load(f)
+ m.load(f, verify_openpgp=False)
self.assertIsNotNone(m.find_timestamp())
self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
+ self.assertFalse(m.openpgp_signed)
def test_junk_before_manifest_load(self):
m = gemato.manifest.ManifestFile()
with io.StringIO('OPTIONAL test\n' + SIGNED_MANIFEST) as f:
self.assertRaises(gemato.exceptions.ManifestUnsignedData,
- m.load, f)
+ m.load, f, verify_openpgp=False)
def test_junk_after_manifest_load(self):
m = gemato.manifest.ManifestFile()
with io.StringIO(SIGNED_MANIFEST + 'OPTIONAL test\n') as f:
self.assertRaises(gemato.exceptions.ManifestUnsignedData,
- m.load, f)
+ m.load, f, verify_openpgp=False)
def test_signed_manifest_terminated_before_data(self):
m = gemato.manifest.ManifestFile()
with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:3])) as f:
self.assertRaises(gemato.exceptions.ManifestSyntaxError,
- m.load, f)
+ m.load, f, verify_openpgp=False)
def test_signed_manifest_terminated_before_signature(self):
m = gemato.manifest.ManifestFile()
with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:7])) as f:
self.assertRaises(gemato.exceptions.ManifestSyntaxError,
- m.load, f)
+ m.load, f, verify_openpgp=False)
def test_signed_manifest_terminated_before_end(self):
m = gemato.manifest.ManifestFile()
with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:15])) as f:
self.assertRaises(gemato.exceptions.ManifestSyntaxError,
- m.load, f)
+ m.load, f, verify_openpgp=False)
class OpenPGPCorrectKeyTest(unittest.TestCase):
@@ -221,6 +224,28 @@ class OpenPGPCorrectKeyTest(unittest.TestCase):
self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
self.env.verify_file, f)
+ def test_manifest_load(self):
+ m = gemato.manifest.ManifestFile()
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ m.load(f, openpgp_env=self.env)
+ self.assertIsNotNone(m.find_timestamp())
+ self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
+ self.assertTrue(m.openpgp_signed)
+
+ def test_dash_escaped_manifest_load(self):
+ m = gemato.manifest.ManifestFile()
+ with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f:
+ m.load(f, openpgp_env=self.env)
+ self.assertIsNotNone(m.find_timestamp())
+ self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
+ self.assertTrue(m.openpgp_signed)
+
+ def test_modified_manifest_load(self):
+ m = gemato.manifest.ManifestFile()
+ with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f:
+ self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
+ m.load, f, openpgp_env=self.env)
+
class OpenPGPNoKeyTest(unittest.TestCase):
"""
@@ -241,6 +266,31 @@ class OpenPGPNoKeyTest(unittest.TestCase):
except gemato.exceptions.OpenPGPNoImplementation as e:
raise unittest.SkipTest(str(e))
+ def test_manifest_load(self):
+ m = gemato.manifest.ManifestFile()
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ try:
+ self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure,
+ m.load, f, openpgp_env=self.env)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ raise unittest.SkipTest(str(e))
+
+ def test_manifest_load_exception_caught(self):
+ """
+ Test that the Manifest is loaded even if exception is raised.
+ """
+ m = gemato.manifest.ManifestFile()
+ with io.StringIO(SIGNED_MANIFEST) as f:
+ try:
+ m.load(f, openpgp_env=self.env)
+ except gemato.exceptions.OpenPGPVerificationFailure:
+ pass
+ except gemato.exceptions.OpenPGPNoImplementation:
+ pass
+ self.assertIsNotNone(m.find_timestamp())
+ self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild'))
+ self.assertFalse(m.openpgp_signed)
+
class OpenPGPContextManagerTest(unittest.TestCase):
"""