diff options
-rw-r--r-- | gemato/manifest.py | 30 | ||||
-rw-r--r-- | tests/test_openpgp.py | 66 |
2 files changed, 87 insertions, 9 deletions
diff --git a/gemato/manifest.py b/gemato/manifest.py index e6c9837..3743d5c 100644 --- a/gemato/manifest.py +++ b/gemato/manifest.py @@ -4,9 +4,11 @@ # Licensed under the terms of 2-clause BSD license import datetime +import io import os.path import gemato.exceptions +import gemato.openpgp import gemato.util @@ -298,31 +300,50 @@ class ManifestFile(object): """ self.entries = [] + self.openpgp_signed = None if f is not None: self.load(f) - def load(self, f): + def load(self, f, verify_openpgp=True, openpgp_env=None): """ Load data from file @f. The file should be open for reading in text mode, and oriented at the beginning. + + If @verify_openpgp is True and the Manifest contains an OpenPGP + signature, the signature will be verified. Provide @openpgp_env + to perform the verification in specific environment. + + If the verification succeeds, the openpgp_signed property will + be set to True. If it fails or OpenPGP is not available, + an exception will be raised. If the exception is caught, + the caller can continue using the ManifestFile instance + -- it will be loaded completely. """ self.entries = [] + self.openpgp_signed = False state = ManifestState.DATA + openpgp_data = '' for l in f: if state == ManifestState.DATA: if l == '-----BEGIN PGP SIGNED MESSAGE-----\n': if self.entries: raise gemato.exceptions.ManifestUnsignedData() + if verify_openpgp: + openpgp_data += l state = ManifestState.SIGNED_PREAMBLE continue elif state == ManifestState.SIGNED_PREAMBLE: + if verify_openpgp: + openpgp_data += l # skip header lines up to the empty line if l.strip(): continue state = ManifestState.SIGNED_DATA elif state == ManifestState.SIGNED_DATA: + if verify_openpgp: + openpgp_data += l if l == '-----BEGIN PGP SIGNATURE-----\n': state = ManifestState.SIGNATURE continue @@ -330,6 +351,8 @@ class ManifestFile(object): if l.startswith('- '): l = l[2:] elif state == ManifestState.SIGNATURE: + if verify_openpgp: + openpgp_data += l if l == '-----END PGP SIGNATURE-----\n': state = ManifestState.POST_SIGNED_DATA continue @@ -359,6 +382,11 @@ class ManifestFile(object): raise gemato.exceptions.ManifestSyntaxError( "Manifest terminated early, inside signature") + if verify_openpgp and state == ManifestState.POST_SIGNED_DATA: + with io.BytesIO(openpgp_data.encode('utf8')) as f: + gemato.openpgp.verify_file(f, env=openpgp_env) + self.openpgp_signed = True + def dump(self, f): """ Dump data into file @f. The file should be open for writing diff --git a/tests/test_openpgp.py b/tests/test_openpgp.py index 32028f0..9d37716 100644 --- a/tests/test_openpgp.py +++ b/tests/test_openpgp.py @@ -137,16 +137,18 @@ class SignedManifestTest(unittest.TestCase): def test_manifest_load(self): m = gemato.manifest.ManifestFile() with io.StringIO(SIGNED_MANIFEST) as f: - m.load(f) + m.load(f, verify_openpgp=False) self.assertIsNotNone(m.find_timestamp()) self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) + self.assertFalse(m.openpgp_signed) def test_dash_escaped_manifest_load(self): m = gemato.manifest.ManifestFile() with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f: - m.load(f) + m.load(f, verify_openpgp=False) self.assertIsNotNone(m.find_timestamp()) self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) + self.assertFalse(m.openpgp_signed) def test_modified_manifest_load(self): """ @@ -155,39 +157,40 @@ class SignedManifestTest(unittest.TestCase): """ m = gemato.manifest.ManifestFile() with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f: - m.load(f) + m.load(f, verify_openpgp=False) self.assertIsNotNone(m.find_timestamp()) self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) + self.assertFalse(m.openpgp_signed) def test_junk_before_manifest_load(self): m = gemato.manifest.ManifestFile() with io.StringIO('OPTIONAL test\n' + SIGNED_MANIFEST) as f: self.assertRaises(gemato.exceptions.ManifestUnsignedData, - m.load, f) + m.load, f, verify_openpgp=False) def test_junk_after_manifest_load(self): m = gemato.manifest.ManifestFile() with io.StringIO(SIGNED_MANIFEST + 'OPTIONAL test\n') as f: self.assertRaises(gemato.exceptions.ManifestUnsignedData, - m.load, f) + m.load, f, verify_openpgp=False) def test_signed_manifest_terminated_before_data(self): m = gemato.manifest.ManifestFile() with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:3])) as f: self.assertRaises(gemato.exceptions.ManifestSyntaxError, - m.load, f) + m.load, f, verify_openpgp=False) def test_signed_manifest_terminated_before_signature(self): m = gemato.manifest.ManifestFile() with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:7])) as f: self.assertRaises(gemato.exceptions.ManifestSyntaxError, - m.load, f) + m.load, f, verify_openpgp=False) def test_signed_manifest_terminated_before_end(self): m = gemato.manifest.ManifestFile() with io.StringIO('\n'.join(SIGNED_MANIFEST.splitlines()[:15])) as f: self.assertRaises(gemato.exceptions.ManifestSyntaxError, - m.load, f) + m.load, f, verify_openpgp=False) class OpenPGPCorrectKeyTest(unittest.TestCase): @@ -221,6 +224,28 @@ class OpenPGPCorrectKeyTest(unittest.TestCase): self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, self.env.verify_file, f) + def test_manifest_load(self): + m = gemato.manifest.ManifestFile() + with io.StringIO(SIGNED_MANIFEST) as f: + m.load(f, openpgp_env=self.env) + self.assertIsNotNone(m.find_timestamp()) + self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) + self.assertTrue(m.openpgp_signed) + + def test_dash_escaped_manifest_load(self): + m = gemato.manifest.ManifestFile() + with io.StringIO(DASH_ESCAPED_SIGNED_MANIFEST) as f: + m.load(f, openpgp_env=self.env) + self.assertIsNotNone(m.find_timestamp()) + self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) + self.assertTrue(m.openpgp_signed) + + def test_modified_manifest_load(self): + m = gemato.manifest.ManifestFile() + with io.StringIO(MODIFIED_SIGNED_MANIFEST) as f: + self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, + m.load, f, openpgp_env=self.env) + class OpenPGPNoKeyTest(unittest.TestCase): """ @@ -241,6 +266,31 @@ class OpenPGPNoKeyTest(unittest.TestCase): except gemato.exceptions.OpenPGPNoImplementation as e: raise unittest.SkipTest(str(e)) + def test_manifest_load(self): + m = gemato.manifest.ManifestFile() + with io.StringIO(SIGNED_MANIFEST) as f: + try: + self.assertRaises(gemato.exceptions.OpenPGPVerificationFailure, + m.load, f, openpgp_env=self.env) + except gemato.exceptions.OpenPGPNoImplementation as e: + raise unittest.SkipTest(str(e)) + + def test_manifest_load_exception_caught(self): + """ + Test that the Manifest is loaded even if exception is raised. + """ + m = gemato.manifest.ManifestFile() + with io.StringIO(SIGNED_MANIFEST) as f: + try: + m.load(f, openpgp_env=self.env) + except gemato.exceptions.OpenPGPVerificationFailure: + pass + except gemato.exceptions.OpenPGPNoImplementation: + pass + self.assertIsNotNone(m.find_timestamp()) + self.assertIsNotNone(m.find_path_entry('myebuild-0.ebuild')) + self.assertFalse(m.openpgp_signed) + class OpenPGPContextManagerTest(unittest.TestCase): """ |