diff options
author | Michał Górny <mgorny@gentoo.org> | 2018-02-10 08:58:16 +0100 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2018-02-10 08:58:16 +0100 |
commit | c1471b7595d97b90dbd45755861195b3952f7ffa (patch) | |
tree | eb3b0207290a334f97a41758829f5c1580164418 | |
parent | f7eb4c073b3357379d7f1d47310d33891be3dbc0 (diff) | |
download | gemato-c1471b7595d97b90dbd45755861195b3952f7ffa.tar.gz |
cli: Split common OpenPGP logic into a base class
-rw-r--r-- | gemato/cli.py | 109 |
1 files changed, 49 insertions, 60 deletions
diff --git a/gemato/cli.py b/gemato/cli.py index bf74dc1..d2fdbaa 100644 --- a/gemato/cli.py +++ b/gemato/cli.py @@ -71,14 +71,45 @@ class GematoCommand(object): pass -class VerifyCommand(GematoCommand): - name = 'verify' - help = 'Verify one or more directories against Manifests' +class BaseOpenPGPCommand(GematoCommand): + """ + A base class wrapper that adds logic to load and use OpenPGP keys. + """ + + __slots__ = ['openpgp_env'] def __init__(self): self.openpgp_env = None + def add_options(self, subp): + subp.add_argument('-K', '--openpgp-key', + help='Use only the OpenPGP key(s) from a specific file') + + def parse_args(self, args, argp): + # use isolated environment if key is specified; + # system environment otherwise + if args.openpgp_key is not None: + env_class = gemato.openpgp.OpenPGPEnvironment + else: + env_class = gemato.openpgp.OpenPGPSystemEnvironment + self.openpgp_env = env_class() + + if args.openpgp_key is not None: + with io.open(args.openpgp_key, 'rb') as f: + self.openpgp_env.import_key(f) + + def cleanup(self): + if self.openpgp_env is not None: + self.openpgp_env.close() + + +class VerifyCommand(BaseOpenPGPCommand): + name = 'verify' + help = 'Verify one or more directories against Manifests' + def add_options(self, verify): + super(VerifyCommand, self).add_options(verify) + verify.add_argument('paths', nargs='*', default=['.'], help='Paths to verify (defaults to "." if none specified)') verify.add_argument('-j', '--jobs', type=int, @@ -86,8 +117,6 @@ class VerifyCommand(GematoCommand): .format(multiprocessing.cpu_count())) verify.add_argument('-k', '--keep-going', action='store_true', help='Continue reporting errors rather than terminating on the first failure') - verify.add_argument('-K', '--openpgp-key', - help='Use only the OpenPGP key(s) from a specific file') verify.add_argument('-P', '--no-openpgp-verify', action='store_false', dest='openpgp_verify', help='Disable OpenPGP verification of signed Manifests') @@ -99,10 +128,13 @@ class VerifyCommand(GematoCommand): help='Require that the top-level Manifest is OpenPGP signed') def parse_args(self, args, argp): + super(VerifyCommand, self).parse_args(args, argp) + self.paths = args.paths self.require_signed_manifest = args.require_signed_manifest self.init_kwargs = {} self.kwargs = {} + self.init_kwargs['openpgp_env'] = self.openpgp_env if args.jobs is not None: if args.jobs < 1: @@ -113,24 +145,13 @@ class VerifyCommand(GematoCommand): if not args.openpgp_verify: self.init_kwargs['verify_openpgp'] = False - # use isolated environment if key is specified; - # system environment otherwise - if args.openpgp_key is not None: - env_class = gemato.openpgp.OpenPGPEnvironment - else: - env_class = gemato.openpgp.OpenPGPSystemEnvironment - self.openpgp_env = env_class() - if args.openpgp_key is not None: - with io.open(args.openpgp_key, 'rb') as f: - self.openpgp_env.import_key(f) # always refresh keys to check for revocation # (unless user specifically asked us not to) if args.refresh_keys: logging.info('Refreshing keys from keyserver...') self.openpgp_env.refresh_keys() logging.info('Keys refreshed.') - self.init_kwargs['openpgp_env'] = self.openpgp_env def __call__(self): ret = True @@ -173,16 +194,14 @@ class VerifyCommand(GematoCommand): return 0 if ret else 1 - def cleanup(self): - if self.openpgp_env is not None: - self.openpgp_env.close() - -class UpdateCommand(GematoCommand): +class UpdateCommand(BaseOpenPGPCommand): name = 'update' help = 'Update the Manifest entries for one or more directory trees' def add_options(self, update): + super(UpdateCommand, self).add_options(update) + update.add_argument('paths', nargs='*', default=['.'], help='Paths to update (defaults to "." if none specified)') update.add_argument('-c', '--compress-watermark', type=int, @@ -200,8 +219,6 @@ class UpdateCommand(GematoCommand): .format(multiprocessing.cpu_count())) update.add_argument('-k', '--openpgp-id', help='Use the specified OpenPGP key (by ID or user)') - update.add_argument('-K', '--openpgp-key', - help='Use only the OpenPGP key(s) from a specific file') update.add_argument('-p', '--profile', help='Use the specified profile ("default", "ebuild", "old-ebuild"...)') signgroup = update.add_mutually_exclusive_group() @@ -215,6 +232,8 @@ class UpdateCommand(GematoCommand): help='Include TIMESTAMP entry in Manifest') def parse_args(self, args, argp): + super(UpdateCommand, self).parse_args(args, argp) + self.paths = args.paths self.timestamp = args.timestamp self.incremental = args.incremental @@ -222,6 +241,7 @@ class UpdateCommand(GematoCommand): self.init_kwargs = {} self.save_kwargs = {} self.update_kwargs = {} + self.init_kwargs['openpgp_env'] = self.openpgp_env if args.hashes is not None: self.init_kwargs['hashes'] = args.hashes.split() @@ -245,19 +265,6 @@ class UpdateCommand(GematoCommand): if args.sign is not None: self.init_kwargs['sign_openpgp'] = args.sign - # use isolated environment if key is specified; - # system environment otherwise - if args.openpgp_key is not None: - env_class = gemato.openpgp.OpenPGPEnvironment - else: - env_class = gemato.openpgp.OpenPGPSystemEnvironment - self.openpgp_env = env_class() - - if args.openpgp_key is not None: - with io.open(args.openpgp_key, 'rb') as f: - self.openpgp_env.import_key(f) - self.init_kwargs['openpgp_env'] = self.openpgp_env - def __call__(self): for p in self.paths: tlm = gemato.find_top_level.find_top_level_manifest(p) @@ -313,16 +320,14 @@ class UpdateCommand(GematoCommand): return 0 - def cleanup(self): - if self.openpgp_env is not None: - self.openpgp_env.close() - -class CreateCommand(GematoCommand): +class CreateCommand(BaseOpenPGPCommand): name = 'create' help = 'Create a Manifest tree starting at the specified file' def add_options(self, create): + super(CreateCommand, self).add_options(create) + create.add_argument('paths', nargs='*', default=['.'], help='Paths to create (defaults to "Manifest" if none specified)') create.add_argument('-c', '--compress-watermark', type=int, @@ -338,8 +343,6 @@ class CreateCommand(GematoCommand): .format(multiprocessing.cpu_count())) create.add_argument('-k', '--openpgp-id', help='Use the specified OpenPGP key (by ID or user)') - create.add_argument('-K', '--openpgp-key', - help='Use only the OpenPGP key(s) from a specific file') create.add_argument('-p', '--profile', help='Use the specified profile ("default", "ebuild", "old-ebuild"...)') signgroup = create.add_mutually_exclusive_group() @@ -353,12 +356,15 @@ class CreateCommand(GematoCommand): help='Include TIMESTAMP entry in Manifest') def parse_args(self, args, argp): + super(CreateCommand, self).parse_args(args, argp) + self.paths = args.paths self.timestamp = args.timestamp self.init_kwargs = {} self.save_kwargs = {} self.init_kwargs['allow_create'] = True + self.init_kwargs['openpgp_env'] = self.openpgp_env if args.hashes is not None: self.init_kwargs['hashes'] = args.hashes.split() @@ -382,19 +388,6 @@ class CreateCommand(GematoCommand): if args.sign is not None: self.init_kwargs['sign_openpgp'] = args.sign - # use isolated environment if key is specified; - # system environment otherwise - if args.openpgp_key is not None: - env_class = gemato.openpgp.OpenPGPEnvironment - else: - env_class = gemato.openpgp.OpenPGPSystemEnvironment - self.openpgp_env = env_class() - - if args.openpgp_key is not None: - with io.open(args.openpgp_key, 'rb') as f: - self.openpgp_env.import_key(f) - self.init_kwargs['openpgp_env'] = self.openpgp_env - def __call__(self): for p in self.paths: start = timeit.default_timer() @@ -422,10 +415,6 @@ class CreateCommand(GematoCommand): return 0 - def cleanup(self): - if self.openpgp_env is not None: - self.openpgp_env.close() - def main(argv): argp = argparse.ArgumentParser( |