diff options
-rw-r--r-- | gemato/cli.py | 73 | ||||
-rw-r--r-- | tests/test_recursiveloader.py | 34 |
2 files changed, 107 insertions, 0 deletions
diff --git a/gemato/cli.py b/gemato/cli.py index 737814c..2a32cf2 100644 --- a/gemato/cli.py +++ b/gemato/cli.py @@ -6,6 +6,7 @@ from __future__ import print_function import argparse +import datetime import io import logging import os.path @@ -79,6 +80,59 @@ def do_verify(args): return 0 if ret else 1 +def do_update(args): + for p in args.paths: + tlm = gemato.find_top_level.find_top_level_manifest(p) + if tlm is None: + logging.error('Top-level Manifest not found in {}'.format(p)) + return 1 + + init_kwargs = {} + init_kwargs['hashes'] = args.hashes.split() + if args.openpgp_id is not None: + init_kwargs['openpgp_keyid'] = args.openpgp_id + if args.sign is not None: + init_kwargs['sign_openpgp'] = args.sign + with gemato.openpgp.OpenPGPEnvironment() as env: + if args.openpgp_key is not None: + with io.open(args.openpgp_key, 'rb') as f: + env.import_key(f) + init_kwargs['openpgp_env'] = env + + start = timeit.default_timer() + try: + m = gemato.recursiveloader.ManifestRecursiveLoader(tlm, + **init_kwargs) + except gemato.exceptions.OpenPGPNoImplementation as e: + logging.error(str(e)) + return 1 + except gemato.exceptions.OpenPGPVerificationFailure as e: + logging.error(str(e)) + return 1 + + relpath = os.path.relpath(p, os.path.dirname(tlm)) + if relpath == '.': + relpath = '' + try: + m.update_entries_for_directory(relpath) + + ts = m.find_timestamp() + if ts is not None: + ts.ts = datetime.datetime.utcnow() + + m.save_manifests() + except gemato.exceptions.ManifestCrossDevice as e: + logging.error(str(e)) + return 1 + except gemato.exceptions.ManifestInvalidPath as e: + logging.error(str(e)) + return 1 + + stop = timeit.default_timer() + logging.info('{} updated in {:.2f} seconds'.format(p, stop - start)) + return 0 + + def main(argv): argp = argparse.ArgumentParser( prog=argv[0], @@ -103,5 +157,24 @@ def main(argv): help='Do not fail on non-strict Manifest issues (MISC/OPTIONAL entries)') verify.set_defaults(func=do_verify) + update = subp.add_parser('update', + help='Update the Manifest entries for one or more directory trees') + update.add_argument('paths', nargs='*', default=['.'], + help='Paths to update (defaults to "." if none specified)') + update.add_argument('-H', '--hashes', required=True, + help='Whitespace-separated list of hashes to use') + update.add_argument('-k', '--openpgp-id', + help='Use the specified OpenPGP key (by ID or user)') + update.add_argument('-K', '--openpgp-key', + help='Use only the OpenPGP key(s) from a specific file') + signgroup = update.add_mutually_exclusive_group() + signgroup.add_argument('-s', '--sign', action='store_true', + default=None, + help='Force signing the top-level Manifest') + signgroup.add_argument('-S', '--no-sign', action='store_false', + dest='sign', + help='Disable signing the top-level Manifest') + update.set_defaults(func=do_update) + vals = argp.parse_args(argv[1:]) return vals.func(vals) diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py index 498498b..ccab094 100644 --- a/tests/test_recursiveloader.py +++ b/tests/test_recursiveloader.py @@ -501,6 +501,22 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e self.assertNotEqual(f.read(), self.FILES['Manifest']) m.assert_directory_verifies() + def test_cli_update(self): + self.assertEqual( + gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', + self.dir]), + 0) + # relevant Manifests should have been updated + with io.open(os.path.join(self.dir, 'sub/Manifest'), + 'r', encoding='utf8') as f: + self.assertNotEqual(f.read(), self.FILES['sub/Manifest']) + m = gemato.manifest.ManifestFile() + with io.open(os.path.join(self.dir, 'Manifest'), + 'r', encoding='utf8') as f: + m.load(f) + self.assertNotEqual(m.find_timestamp().ts, + datetime.datetime(2017, 1, 1, 1, 1, 1)) + class MultipleManifestTest(TempDirTestCase): DIRS = ['sub'] @@ -1552,6 +1568,12 @@ DATA sub/version 0 MD5 d41d8cd98f00b204e9800998ecf8427e self.assertRaises(gemato.exceptions.ManifestCrossDevice, m.update_entries_for_directory, '') + def test_cli_update(self): + self.assertEqual( + gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', + self.dir]), + 1) + class CrossDeviceEmptyManifestTest(TempDirTestCase): """ @@ -1602,6 +1624,12 @@ class CrossDeviceEmptyManifestTest(TempDirTestCase): self.assertRaises(gemato.exceptions.ManifestCrossDevice, m.update_entries_for_directory, '') + def test_cli_update(self): + self.assertEqual( + gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', + self.dir]), + 1) + class CrossDeviceIgnoreManifestTest(TempDirTestCase): """ @@ -1713,6 +1741,12 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e self.assertRaises(gemato.exceptions.ManifestInvalidPath, m.update_entries_for_directory, '') + def test_cli_update(self): + self.assertEqual( + gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512', + self.dir]), + 1) + class UnreadableDirectoryTest(TempDirTestCase): """ |