diff options
-rw-r--r-- | gemato/cli.py | 65 | ||||
-rw-r--r-- | tests/test_recursiveloader.py | 33 |
2 files changed, 98 insertions, 0 deletions
diff --git a/gemato/cli.py b/gemato/cli.py index ca35d57..21c984f 100644 --- a/gemato/cli.py +++ b/gemato/cli.py @@ -135,6 +135,52 @@ def do_update(args): return 0 +def do_create(args): + for p in args.paths: + init_kwargs = {} + init_kwargs['allow_create'] = True + init_kwargs['hashes'] = args.hashes.split() + if args.openpgp_id is not None: + init_kwargs['openpgp_keyid'] = args.openpgp_id + if args.sign is not None: + init_kwargs['sign_openpgp'] = args.sign + with gemato.openpgp.OpenPGPEnvironment() as env: + if args.openpgp_key is not None: + with io.open(args.openpgp_key, 'rb') as f: + env.import_key(f) + init_kwargs['openpgp_env'] = env + + start = timeit.default_timer() + try: + m = gemato.recursiveloader.ManifestRecursiveLoader( + p, **init_kwargs) + except gemato.exceptions.OpenPGPNoImplementation as e: + logging.error(str(e)) + return 1 + except gemato.exceptions.OpenPGPVerificationFailure as e: + logging.error(str(e)) + return 1 + + try: + m.update_entries_for_directory() + + ts = m.find_timestamp() + if ts is not None: + ts.ts = datetime.datetime.utcnow() + + m.save_manifests() + except gemato.exceptions.ManifestCrossDevice as e: + logging.error(str(e)) + return 1 + except gemato.exceptions.ManifestInvalidPath as e: + logging.error(str(e)) + return 1 + + stop = timeit.default_timer() + logging.info('{} updated in {:.2f} seconds'.format(p, stop - start)) + return 0 + + def main(argv): argp = argparse.ArgumentParser( prog=argv[0], @@ -178,5 +224,24 @@ def main(argv): help='Disable signing the top-level Manifest') update.set_defaults(func=do_update) + create = subp.add_parser('create', + help='Create a Manifest tree starting at the specified file') + create.add_argument('paths', nargs='*', default=['Manifest'], + help='Paths to create (defaults to "Manifest" if none specified)') + create.add_argument('-H', '--hashes', required=True, + help='Whitespace-separated list of hashes to use') + create.add_argument('-k', '--openpgp-id', + help='Use the specified OpenPGP key (by ID or user)') + create.add_argument('-K', '--openpgp-key', + help='Use only the OpenPGP key(s) from a specific file') + signgroup = create.add_mutually_exclusive_group() + signgroup.add_argument('-s', '--sign', action='store_true', + default=None, + help='Force signing the top-level Manifest') + signgroup.add_argument('-S', '--no-sign', action='store_false', + dest='sign', + help='Disable signing the top-level Manifest') + create.set_defaults(func=do_create) + vals = argp.parse_args(argv[1:]) return vals.func(vals) diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py index fb4b67d..e257a10 100644 --- a/tests/test_recursiveloader.py +++ b/tests/test_recursiveloader.py @@ -2178,6 +2178,22 @@ class CreateNewManifestTest(TempDirTestCase): m2.load(f) self.assertEqual(len(m2.entries), 2) + def test_cli(self): + self.assertEqual( + gemato.cli.main(['gemato', 'create', '--hashes=SHA256 SHA512', + self.path]), + 0) + + m2 = gemato.manifest.ManifestFile() + with io.open(self.path, 'r', encoding='utf8') as f: + m2.load(f) + self.assertEqual(len(m2.entries), 2) + + self.assertEqual( + gemato.cli.main(['gemato', 'verify', + self.dir]), + 0) + class CreateNewCompressedManifestTest(TempDirTestCase): DIRS = ['sub'] @@ -2228,3 +2244,20 @@ class CreateNewCompressedManifestTest(TempDirTestCase): self.path, 'r', encoding='utf8') as f: m2.load(f) self.assertEqual(len(m2.entries), 2) + + def test_cli(self): + self.assertEqual( + gemato.cli.main(['gemato', 'create', '--hashes=SHA256 SHA512', + self.path]), + 0) + + m2 = gemato.manifest.ManifestFile() + with gemato.compression.open_potentially_compressed_path( + self.path, 'r', encoding='utf8') as f: + m2.load(f) + self.assertEqual(len(m2.entries), 2) + + self.assertEqual( + gemato.cli.main(['gemato', 'verify', + self.dir]), + 0) |