summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2018-02-10 00:20:55 +0100
committerMichał Górny <mgorny@gentoo.org>2018-02-10 00:20:55 +0100
commitf7eb4c073b3357379d7f1d47310d33891be3dbc0 (patch)
tree5b465c650d968af2b6d54cb2eea65545f844ff75
parent21f2da234216a4413d4060ae7442345f519cff8c (diff)
downloadgemato-f7eb4c073b3357379d7f1d47310d33891be3dbc0.tar.gz
cli: Split argument processing into separate method
-rw-r--r--gemato/cli.py451
1 files changed, 253 insertions, 198 deletions
diff --git a/gemato/cli.py b/gemato/cli.py
index 07dc534..bf74dc1 100644
--- a/gemato/cli.py
+++ b/gemato/cli.py
@@ -51,10 +51,22 @@ class GematoCommand(object):
"""
pass
- def __call__(self, args, argp):
+ def parse_args(self, args, argp):
"""
- Perform the command, given command-line arguments @args.
- @argp is argparse instance provided for error reporting.
+ Process command-line arguments @args. @argp is the argparse
+ instance provided for error reporting.
+ """
+ pass
+
+ def __call__(self):
+ """
+ Perform the command. Returns the exit status.
+ """
+ pass
+
+ def cleanup(self):
+ """
+ Perform any cleanups necessary. Called on program termination.
"""
pass
@@ -63,6 +75,9 @@ class VerifyCommand(GematoCommand):
name = 'verify'
help = 'Verify one or more directories against Manifests'
+ def __init__(self):
+ self.openpgp_env = None
+
def add_options(self, verify):
verify.add_argument('paths', nargs='*', default=['.'],
help='Paths to verify (defaults to "." if none specified)')
@@ -83,75 +98,85 @@ class VerifyCommand(GematoCommand):
verify.add_argument('-s', '--require-signed-manifest', action='store_true',
help='Require that the top-level Manifest is OpenPGP signed')
- def __call__(self, args, argp):
+ def parse_args(self, args, argp):
+ self.paths = args.paths
+ self.require_signed_manifest = args.require_signed_manifest
+ self.init_kwargs = {}
+ self.kwargs = {}
+
+ if args.jobs is not None:
+ if args.jobs < 1:
+ argp.error('--jobs must be positive')
+ self.init_kwargs['max_jobs'] = args.jobs
+ if args.keep_going:
+ self.kwargs['fail_handler'] = verify_failure
+ if not args.openpgp_verify:
+ self.init_kwargs['verify_openpgp'] = False
+
+ # use isolated environment if key is specified;
+ # system environment otherwise
+ if args.openpgp_key is not None:
+ env_class = gemato.openpgp.OpenPGPEnvironment
+ else:
+ env_class = gemato.openpgp.OpenPGPSystemEnvironment
+ self.openpgp_env = env_class()
+
+ if args.openpgp_key is not None:
+ with io.open(args.openpgp_key, 'rb') as f:
+ self.openpgp_env.import_key(f)
+ # always refresh keys to check for revocation
+ # (unless user specifically asked us not to)
+ if args.refresh_keys:
+ logging.info('Refreshing keys from keyserver...')
+ self.openpgp_env.refresh_keys()
+ logging.info('Keys refreshed.')
+ self.init_kwargs['openpgp_env'] = self.openpgp_env
+
+ def __call__(self):
ret = True
- for p in args.paths:
+ for p in self.paths:
tlm = gemato.find_top_level.find_top_level_manifest(p)
if tlm is None:
logging.error('Top-level Manifest not found in {}'.format(p))
return 1
- init_kwargs = {}
- kwargs = {}
- if args.jobs is not None:
- if args.jobs < 1:
- argp.error('--jobs must be positive')
- init_kwargs['max_jobs'] = args.jobs
- if args.keep_going:
- kwargs['fail_handler'] = verify_failure
- if not args.openpgp_verify:
- init_kwargs['verify_openpgp'] = False
-
- # use isolated environment if key is specified;
- # system environment otherwise
- if args.openpgp_key is not None:
- env_class = gemato.openpgp.OpenPGPEnvironment
- else:
- env_class = gemato.openpgp.OpenPGPSystemEnvironment
-
- with env_class() as env:
- if args.openpgp_key is not None:
- with io.open(args.openpgp_key, 'rb') as f:
- env.import_key(f)
- # always refresh keys to check for revocation
- # (unless user specifically asked us not to)
- if args.refresh_keys:
- logging.info('Refreshing keys from keyserver...')
- env.refresh_keys()
- logging.info('Keys refreshed.')
- init_kwargs['openpgp_env'] = env
-
- start = timeit.default_timer()
- m = gemato.recursiveloader.ManifestRecursiveLoader(tlm, **init_kwargs)
- if args.require_signed_manifest and not m.openpgp_signed:
- logging.error('Top-level Manifest {} is not OpenPGP signed'.format(tlm))
- return 1
+ start = timeit.default_timer()
+ m = gemato.recursiveloader.ManifestRecursiveLoader(tlm,
+ **self.init_kwargs)
+ if self.require_signed_manifest and not m.openpgp_signed:
+ logging.error('Top-level Manifest {} is not OpenPGP signed'.format(tlm))
+ return 1
+
+ ts = m.find_timestamp()
+ if ts:
+ logging.info('Manifest timestamp: {} UTC'.format(ts.ts))
+
+ if m.openpgp_signed:
+ logging.info('Valid OpenPGP signature found:')
+ logging.info('- primary key: {}'.format(
+ m.openpgp_signature.primary_key_fingerprint))
+ logging.info('- subkey: {}'.format(
+ m.openpgp_signature.fingerprint))
+ logging.info('- timestamp: {} UTC'.format(
+ m.openpgp_signature.timestamp))
+
+ logging.info('Verifying {}...'.format(p))
+
+ relpath = os.path.relpath(p, os.path.dirname(tlm))
+ if relpath == '.':
+ relpath = ''
+ ret &= m.assert_directory_verifies(relpath, **self.kwargs)
+
+ stop = timeit.default_timer()
+ logging.info('{} verified in {:.2f} seconds'.format(p, stop - start))
- ts = m.find_timestamp()
- if ts:
- logging.info('Manifest timestamp: {} UTC'.format(ts.ts))
-
- if m.openpgp_signed:
- logging.info('Valid OpenPGP signature found:')
- logging.info('- primary key: {}'.format(
- m.openpgp_signature.primary_key_fingerprint))
- logging.info('- subkey: {}'.format(
- m.openpgp_signature.fingerprint))
- logging.info('- timestamp: {} UTC'.format(
- m.openpgp_signature.timestamp))
-
- logging.info('Verifying {}...'.format(p))
-
- relpath = os.path.relpath(p, os.path.dirname(tlm))
- if relpath == '.':
- relpath = ''
- ret &= m.assert_directory_verifies(relpath, **kwargs)
-
- stop = timeit.default_timer()
- logging.info('{} verified in {:.2f} seconds'.format(p, stop - start))
return 0 if ret else 1
+ def cleanup(self):
+ if self.openpgp_env is not None:
+ self.openpgp_env.close()
+
class UpdateCommand(GematoCommand):
name = 'update'
@@ -189,94 +214,109 @@ class UpdateCommand(GematoCommand):
update.add_argument('-t', '--timestamp', action='store_true',
help='Include TIMESTAMP entry in Manifest')
- def __call__(self, args, argp):
- for p in args.paths:
+ def parse_args(self, args, argp):
+ self.paths = args.paths
+ self.timestamp = args.timestamp
+ self.incremental = args.incremental
+
+ self.init_kwargs = {}
+ self.save_kwargs = {}
+ self.update_kwargs = {}
+
+ if args.hashes is not None:
+ self.init_kwargs['hashes'] = args.hashes.split()
+ if args.compress_watermark is not None:
+ if args.compress_watermark < 0:
+ argp.error('--compress-watermark must not be negative!')
+ self.init_kwargs['compress_watermark'] = args.compress_watermark
+ if args.compress_format is not None:
+ self.init_kwargs['compress_format'] = args.compress_format
+ if args.force_rewrite:
+ self.save_kwargs['force'] = True
+ if args.jobs is not None:
+ if args.jobs < 1:
+ argp.error('--jobs must be positive')
+ self.init_kwargs['max_jobs'] = args.jobs
+ if args.openpgp_id is not None:
+ self.init_kwargs['openpgp_keyid'] = args.openpgp_id
+ if args.profile is not None:
+ self.init_kwargs['profile'] = gemato.profile.get_profile_by_name(
+ args.profile)
+ if args.sign is not None:
+ self.init_kwargs['sign_openpgp'] = args.sign
+
+ # use isolated environment if key is specified;
+ # system environment otherwise
+ if args.openpgp_key is not None:
+ env_class = gemato.openpgp.OpenPGPEnvironment
+ else:
+ env_class = gemato.openpgp.OpenPGPSystemEnvironment
+ self.openpgp_env = env_class()
+
+ if args.openpgp_key is not None:
+ with io.open(args.openpgp_key, 'rb') as f:
+ self.openpgp_env.import_key(f)
+ self.init_kwargs['openpgp_env'] = self.openpgp_env
+
+ def __call__(self):
+ for p in self.paths:
tlm = gemato.find_top_level.find_top_level_manifest(p)
if tlm is None:
logging.error('Top-level Manifest not found in {}'.format(p))
return 1
- init_kwargs = {}
- save_kwargs = {}
- update_kwargs = {}
- if args.hashes is not None:
- init_kwargs['hashes'] = args.hashes.split()
- if args.compress_watermark is not None:
- if args.compress_watermark < 0:
- argp.error('--compress-watermark must not be negative!')
- init_kwargs['compress_watermark'] = args.compress_watermark
- if args.compress_format is not None:
- init_kwargs['compress_format'] = args.compress_format
- if args.force_rewrite:
- save_kwargs['force'] = True
- if args.jobs is not None:
- if args.jobs < 1:
- argp.error('--jobs must be positive')
- init_kwargs['max_jobs'] = args.jobs
- if args.openpgp_id is not None:
- init_kwargs['openpgp_keyid'] = args.openpgp_id
- if args.profile is not None:
- init_kwargs['profile'] = gemato.profile.get_profile_by_name(
- args.profile)
- if args.sign is not None:
- init_kwargs['sign_openpgp'] = args.sign
-
- # use isolated environment if key is specified;
- # system environment otherwise
- if args.openpgp_key is not None:
- env_class = gemato.openpgp.OpenPGPEnvironment
- else:
- env_class = gemato.openpgp.OpenPGPSystemEnvironment
-
- with env_class() as env:
- if args.openpgp_key is not None:
- with io.open(args.openpgp_key, 'rb') as f:
- env.import_key(f)
- init_kwargs['openpgp_env'] = env
-
- start = timeit.default_timer()
- m = gemato.recursiveloader.ManifestRecursiveLoader(tlm,
- **init_kwargs)
-
- # if not specified by user, profile must set it
- if m.hashes is None:
- argp.error('--hashes must be specified if not implied by --profile')
-
- relpath = os.path.relpath(p, os.path.dirname(tlm))
- if relpath == '.':
- relpath = ''
- if args.timestamp and relpath != '':
- argp.error('Timestamp can only be updated if doing full-tree update')
- if args.incremental:
- if relpath != '':
- argp.error('Incremental works only for full-tree update')
- last_ts = m.find_timestamp()
- if last_ts is None:
- argp.error('Incremental specified but no timestamp in Manifest')
- update_kwargs['last_mtime'] = last_ts.ts.timestamp()
-
- logging.info('Updating Manifests in {}...'.format(p))
-
- start_ts = datetime.datetime.utcnow()
- m.update_entries_for_directory(relpath, **update_kwargs)
-
- # write TIMESTAMP if requested, or if already there
+ start = timeit.default_timer()
+ m = gemato.recursiveloader.ManifestRecursiveLoader(tlm,
+ **self.init_kwargs)
+
+ # if not specified by user, profile must set it
+ if m.hashes is None:
+ logging.error('--hashes must be specified if not implied by --profile')
+ return 1
+
+ relpath = os.path.relpath(p, os.path.dirname(tlm))
+ if relpath == '.':
+ relpath = ''
+ if self.timestamp and relpath != '':
+ logging.error('Timestamp can only be updated if doing full-tree update')
+ return 1
+ if self.incremental:
if relpath != '':
- # skip timestamp if not doing full update
- pass
- elif args.timestamp:
- m.set_timestamp(start_ts)
- else:
- ts = m.find_timestamp()
- if ts is not None:
- ts.ts = start_ts
-
- m.save_manifests(**save_kwargs)
-
- stop = timeit.default_timer()
- logging.info('{} updated in {:.2f} seconds'.format(p, stop - start))
+ logging.error('Incremental works only for full-tree update')
+ return 1
+ last_ts = m.find_timestamp()
+ if last_ts is None:
+ loggng.error('Incremental specified but no timestamp in Manifest')
+ return 1
+ self.update_kwargs['last_mtime'] = last_ts.ts.timestamp()
+
+ logging.info('Updating Manifests in {}...'.format(p))
+
+ start_ts = datetime.datetime.utcnow()
+ m.update_entries_for_directory(relpath, **self.update_kwargs)
+
+ # write TIMESTAMP if requested, or if already there
+ if relpath != '':
+ # skip timestamp if not doing full update
+ pass
+ elif self.timestamp:
+ m.set_timestamp(start_ts)
+ else:
+ ts = m.find_timestamp()
+ if ts is not None:
+ ts.ts = start_ts
+
+ m.save_manifests(**self.save_kwargs)
+
+ stop = timeit.default_timer()
+ logging.info('{} updated in {:.2f} seconds'.format(p, stop - start))
+
return 0
+ def cleanup(self):
+ if self.openpgp_env is not None:
+ self.openpgp_env.close()
+
class CreateCommand(GematoCommand):
name = 'create'
@@ -312,69 +352,80 @@ class CreateCommand(GematoCommand):
create.add_argument('-t', '--timestamp', action='store_true',
help='Include TIMESTAMP entry in Manifest')
- def __call__(self, args, argp):
- for p in args.paths:
- init_kwargs = {}
- save_kwargs = {}
- init_kwargs['allow_create'] = True
- if args.hashes is not None:
- init_kwargs['hashes'] = args.hashes.split()
- if args.compress_watermark is not None:
- if args.compress_watermark < 0:
- argp.error('--compress-watermark must not be negative!')
- init_kwargs['compress_watermark'] = args.compress_watermark
- if args.compress_format is not None:
- init_kwargs['compress_format'] = args.compress_format
- if args.force_rewrite:
- save_kwargs['force'] = True
- if args.jobs is not None:
- if args.jobs < 1:
- argp.error('--jobs must be positive')
- init_kwargs['max_jobs'] = args.jobs
- if args.openpgp_id is not None:
- init_kwargs['openpgp_keyid'] = args.openpgp_id
- if args.profile is not None:
- init_kwargs['profile'] = gemato.profile.get_profile_by_name(
- args.profile)
- if args.sign is not None:
- init_kwargs['sign_openpgp'] = args.sign
-
- # use isolated environment if key is specified;
- # system environment otherwise
- if args.openpgp_key is not None:
- env_class = gemato.openpgp.OpenPGPEnvironment
- else:
- env_class = gemato.openpgp.OpenPGPSystemEnvironment
-
- with env_class() as env:
- if args.openpgp_key is not None:
- with io.open(args.openpgp_key, 'rb') as f:
- env.import_key(f)
- init_kwargs['openpgp_env'] = env
-
- start = timeit.default_timer()
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(p, 'Manifest'), **init_kwargs)
+ def parse_args(self, args, argp):
+ self.paths = args.paths
+ self.timestamp = args.timestamp
+
+ self.init_kwargs = {}
+ self.save_kwargs = {}
+ self.init_kwargs['allow_create'] = True
+
+ if args.hashes is not None:
+ self.init_kwargs['hashes'] = args.hashes.split()
+ if args.compress_watermark is not None:
+ if args.compress_watermark < 0:
+ argp.error('--compress-watermark must not be negative!')
+ self.init_kwargs['compress_watermark'] = args.compress_watermark
+ if args.compress_format is not None:
+ self.init_kwargs['compress_format'] = args.compress_format
+ if args.force_rewrite:
+ self.save_kwargs['force'] = True
+ if args.jobs is not None:
+ if args.jobs < 1:
+ argp.error('--jobs must be positive')
+ self.init_kwargs['max_jobs'] = args.jobs
+ if args.openpgp_id is not None:
+ self.init_kwargs['openpgp_keyid'] = args.openpgp_id
+ if args.profile is not None:
+ self.init_kwargs['profile'] = gemato.profile.get_profile_by_name(
+ args.profile)
+ if args.sign is not None:
+ self.init_kwargs['sign_openpgp'] = args.sign
+
+ # use isolated environment if key is specified;
+ # system environment otherwise
+ if args.openpgp_key is not None:
+ env_class = gemato.openpgp.OpenPGPEnvironment
+ else:
+ env_class = gemato.openpgp.OpenPGPSystemEnvironment
+ self.openpgp_env = env_class()
+
+ if args.openpgp_key is not None:
+ with io.open(args.openpgp_key, 'rb') as f:
+ self.openpgp_env.import_key(f)
+ self.init_kwargs['openpgp_env'] = self.openpgp_env
+
+ def __call__(self):
+ for p in self.paths:
+ start = timeit.default_timer()
+ m = gemato.recursiveloader.ManifestRecursiveLoader(
+ os.path.join(p, 'Manifest'), **self.init_kwargs)
+
+ # if not specified by user, profile must set it
+ if m.hashes is None:
+ logging.error('--hashes must be specified if not implied by --profile')
+ return 1
- # if not specified by user, profile must set it
- if m.hashes is None:
- argp.error('--hashes must be specified if not implied by --profile')
+ logging.info('Creating Manifests in {}...'.format(p))
- logging.info('Creating Manifests in {}...'.format(p))
+ start_ts = datetime.datetime.utcnow()
+ m.update_entries_for_directory()
- start_ts = datetime.datetime.utcnow()
- m.update_entries_for_directory()
+ # write TIMESTAMP if requested
+ if self.timestamp:
+ m.set_timestamp(start_ts)
- # write TIMESTAMP if requested, or if already there
- if args.timestamp:
- m.set_timestamp(start_ts)
+ m.save_manifests(**self.save_kwargs)
- m.save_manifests(**save_kwargs)
+ stop = timeit.default_timer()
+ logging.info('{} updated in {:.2f} seconds'.format(p, stop - start))
- stop = timeit.default_timer()
- logging.info('{} updated in {:.2f} seconds'.format(p, stop - start))
return 0
+ def cleanup(self):
+ if self.openpgp_env is not None:
+ self.openpgp_env.close()
+
def main(argv):
argp = argparse.ArgumentParser(
@@ -393,7 +444,11 @@ def main(argv):
if not hasattr(vals, 'cmd'):
argp.error('No function specified')
try:
- return vals.cmd(vals, argp)
+ try:
+ vals.cmd.parse_args(vals, argp)
+ return vals.cmd()
+ finally:
+ vals.cmd.cleanup()
except gemato.exceptions.GematoException as e:
logging.error(str(e))
return 1