summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/cli.py73
-rw-r--r--tests/test_recursiveloader.py34
2 files changed, 107 insertions, 0 deletions
diff --git a/gemato/cli.py b/gemato/cli.py
index 737814c..2a32cf2 100644
--- a/gemato/cli.py
+++ b/gemato/cli.py
@@ -6,6 +6,7 @@
from __future__ import print_function
import argparse
+import datetime
import io
import logging
import os.path
@@ -79,6 +80,59 @@ def do_verify(args):
return 0 if ret else 1
+def do_update(args):
+ for p in args.paths:
+ tlm = gemato.find_top_level.find_top_level_manifest(p)
+ if tlm is None:
+ logging.error('Top-level Manifest not found in {}'.format(p))
+ return 1
+
+ init_kwargs = {}
+ init_kwargs['hashes'] = args.hashes.split()
+ if args.openpgp_id is not None:
+ init_kwargs['openpgp_keyid'] = args.openpgp_id
+ if args.sign is not None:
+ init_kwargs['sign_openpgp'] = args.sign
+ with gemato.openpgp.OpenPGPEnvironment() as env:
+ if args.openpgp_key is not None:
+ with io.open(args.openpgp_key, 'rb') as f:
+ env.import_key(f)
+ init_kwargs['openpgp_env'] = env
+
+ start = timeit.default_timer()
+ try:
+ m = gemato.recursiveloader.ManifestRecursiveLoader(tlm,
+ **init_kwargs)
+ except gemato.exceptions.OpenPGPNoImplementation as e:
+ logging.error(str(e))
+ return 1
+ except gemato.exceptions.OpenPGPVerificationFailure as e:
+ logging.error(str(e))
+ return 1
+
+ relpath = os.path.relpath(p, os.path.dirname(tlm))
+ if relpath == '.':
+ relpath = ''
+ try:
+ m.update_entries_for_directory(relpath)
+
+ ts = m.find_timestamp()
+ if ts is not None:
+ ts.ts = datetime.datetime.utcnow()
+
+ m.save_manifests()
+ except gemato.exceptions.ManifestCrossDevice as e:
+ logging.error(str(e))
+ return 1
+ except gemato.exceptions.ManifestInvalidPath as e:
+ logging.error(str(e))
+ return 1
+
+ stop = timeit.default_timer()
+ logging.info('{} updated in {:.2f} seconds'.format(p, stop - start))
+ return 0
+
+
def main(argv):
argp = argparse.ArgumentParser(
prog=argv[0],
@@ -103,5 +157,24 @@ def main(argv):
help='Do not fail on non-strict Manifest issues (MISC/OPTIONAL entries)')
verify.set_defaults(func=do_verify)
+ update = subp.add_parser('update',
+ help='Update the Manifest entries for one or more directory trees')
+ update.add_argument('paths', nargs='*', default=['.'],
+ help='Paths to update (defaults to "." if none specified)')
+ update.add_argument('-H', '--hashes', required=True,
+ help='Whitespace-separated list of hashes to use')
+ update.add_argument('-k', '--openpgp-id',
+ help='Use the specified OpenPGP key (by ID or user)')
+ update.add_argument('-K', '--openpgp-key',
+ help='Use only the OpenPGP key(s) from a specific file')
+ signgroup = update.add_mutually_exclusive_group()
+ signgroup.add_argument('-s', '--sign', action='store_true',
+ default=None,
+ help='Force signing the top-level Manifest')
+ signgroup.add_argument('-S', '--no-sign', action='store_false',
+ dest='sign',
+ help='Disable signing the top-level Manifest')
+ update.set_defaults(func=do_update)
+
vals = argp.parse_args(argv[1:])
return vals.func(vals)
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py
index 498498b..ccab094 100644
--- a/tests/test_recursiveloader.py
+++ b/tests/test_recursiveloader.py
@@ -501,6 +501,22 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
self.assertNotEqual(f.read(), self.FILES['Manifest'])
m.assert_directory_verifies()
+ def test_cli_update(self):
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
+ self.dir]),
+ 0)
+ # relevant Manifests should have been updated
+ with io.open(os.path.join(self.dir, 'sub/Manifest'),
+ 'r', encoding='utf8') as f:
+ self.assertNotEqual(f.read(), self.FILES['sub/Manifest'])
+ m = gemato.manifest.ManifestFile()
+ with io.open(os.path.join(self.dir, 'Manifest'),
+ 'r', encoding='utf8') as f:
+ m.load(f)
+ self.assertNotEqual(m.find_timestamp().ts,
+ datetime.datetime(2017, 1, 1, 1, 1, 1))
+
class MultipleManifestTest(TempDirTestCase):
DIRS = ['sub']
@@ -1552,6 +1568,12 @@ DATA sub/version 0 MD5 d41d8cd98f00b204e9800998ecf8427e
self.assertRaises(gemato.exceptions.ManifestCrossDevice,
m.update_entries_for_directory, '')
+ def test_cli_update(self):
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
+ self.dir]),
+ 1)
+
class CrossDeviceEmptyManifestTest(TempDirTestCase):
"""
@@ -1602,6 +1624,12 @@ class CrossDeviceEmptyManifestTest(TempDirTestCase):
self.assertRaises(gemato.exceptions.ManifestCrossDevice,
m.update_entries_for_directory, '')
+ def test_cli_update(self):
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
+ self.dir]),
+ 1)
+
class CrossDeviceIgnoreManifestTest(TempDirTestCase):
"""
@@ -1713,6 +1741,12 @@ DATA test 0 MD5 d41d8cd98f00b204e9800998ecf8427e
self.assertRaises(gemato.exceptions.ManifestInvalidPath,
m.update_entries_for_directory, '')
+ def test_cli_update(self):
+ self.assertEqual(
+ gemato.cli.main(['gemato', 'update', '--hashes=SHA256 SHA512',
+ self.dir]),
+ 1)
+
class UnreadableDirectoryTest(TempDirTestCase):
"""