summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2018-02-10 08:58:16 +0100
committerMichał Górny <mgorny@gentoo.org>2018-02-10 08:58:16 +0100
commitc1471b7595d97b90dbd45755861195b3952f7ffa (patch)
treeeb3b0207290a334f97a41758829f5c1580164418
parentf7eb4c073b3357379d7f1d47310d33891be3dbc0 (diff)
downloadgemato-c1471b7595d97b90dbd45755861195b3952f7ffa.tar.gz
cli: Split common OpenPGP logic into a base class
-rw-r--r--gemato/cli.py109
1 files changed, 49 insertions, 60 deletions
diff --git a/gemato/cli.py b/gemato/cli.py
index bf74dc1..d2fdbaa 100644
--- a/gemato/cli.py
+++ b/gemato/cli.py
@@ -71,14 +71,45 @@ class GematoCommand(object):
pass
-class VerifyCommand(GematoCommand):
- name = 'verify'
- help = 'Verify one or more directories against Manifests'
+class BaseOpenPGPCommand(GematoCommand):
+ """
+ A base class wrapper that adds logic to load and use OpenPGP keys.
+ """
+
+ __slots__ = ['openpgp_env']
def __init__(self):
self.openpgp_env = None
+ def add_options(self, subp):
+ subp.add_argument('-K', '--openpgp-key',
+ help='Use only the OpenPGP key(s) from a specific file')
+
+ def parse_args(self, args, argp):
+ # use isolated environment if key is specified;
+ # system environment otherwise
+ if args.openpgp_key is not None:
+ env_class = gemato.openpgp.OpenPGPEnvironment
+ else:
+ env_class = gemato.openpgp.OpenPGPSystemEnvironment
+ self.openpgp_env = env_class()
+
+ if args.openpgp_key is not None:
+ with io.open(args.openpgp_key, 'rb') as f:
+ self.openpgp_env.import_key(f)
+
+ def cleanup(self):
+ if self.openpgp_env is not None:
+ self.openpgp_env.close()
+
+
+class VerifyCommand(BaseOpenPGPCommand):
+ name = 'verify'
+ help = 'Verify one or more directories against Manifests'
+
def add_options(self, verify):
+ super(VerifyCommand, self).add_options(verify)
+
verify.add_argument('paths', nargs='*', default=['.'],
help='Paths to verify (defaults to "." if none specified)')
verify.add_argument('-j', '--jobs', type=int,
@@ -86,8 +117,6 @@ class VerifyCommand(GematoCommand):
.format(multiprocessing.cpu_count()))
verify.add_argument('-k', '--keep-going', action='store_true',
help='Continue reporting errors rather than terminating on the first failure')
- verify.add_argument('-K', '--openpgp-key',
- help='Use only the OpenPGP key(s) from a specific file')
verify.add_argument('-P', '--no-openpgp-verify', action='store_false',
dest='openpgp_verify',
help='Disable OpenPGP verification of signed Manifests')
@@ -99,10 +128,13 @@ class VerifyCommand(GematoCommand):
help='Require that the top-level Manifest is OpenPGP signed')
def parse_args(self, args, argp):
+ super(VerifyCommand, self).parse_args(args, argp)
+
self.paths = args.paths
self.require_signed_manifest = args.require_signed_manifest
self.init_kwargs = {}
self.kwargs = {}
+ self.init_kwargs['openpgp_env'] = self.openpgp_env
if args.jobs is not None:
if args.jobs < 1:
@@ -113,24 +145,13 @@ class VerifyCommand(GematoCommand):
if not args.openpgp_verify:
self.init_kwargs['verify_openpgp'] = False
- # use isolated environment if key is specified;
- # system environment otherwise
- if args.openpgp_key is not None:
- env_class = gemato.openpgp.OpenPGPEnvironment
- else:
- env_class = gemato.openpgp.OpenPGPSystemEnvironment
- self.openpgp_env = env_class()
-
if args.openpgp_key is not None:
- with io.open(args.openpgp_key, 'rb') as f:
- self.openpgp_env.import_key(f)
# always refresh keys to check for revocation
# (unless user specifically asked us not to)
if args.refresh_keys:
logging.info('Refreshing keys from keyserver...')
self.openpgp_env.refresh_keys()
logging.info('Keys refreshed.')
- self.init_kwargs['openpgp_env'] = self.openpgp_env
def __call__(self):
ret = True
@@ -173,16 +194,14 @@ class VerifyCommand(GematoCommand):
return 0 if ret else 1
- def cleanup(self):
- if self.openpgp_env is not None:
- self.openpgp_env.close()
-
-class UpdateCommand(GematoCommand):
+class UpdateCommand(BaseOpenPGPCommand):
name = 'update'
help = 'Update the Manifest entries for one or more directory trees'
def add_options(self, update):
+ super(UpdateCommand, self).add_options(update)
+
update.add_argument('paths', nargs='*', default=['.'],
help='Paths to update (defaults to "." if none specified)')
update.add_argument('-c', '--compress-watermark', type=int,
@@ -200,8 +219,6 @@ class UpdateCommand(GematoCommand):
.format(multiprocessing.cpu_count()))
update.add_argument('-k', '--openpgp-id',
help='Use the specified OpenPGP key (by ID or user)')
- update.add_argument('-K', '--openpgp-key',
- help='Use only the OpenPGP key(s) from a specific file')
update.add_argument('-p', '--profile',
help='Use the specified profile ("default", "ebuild", "old-ebuild"...)')
signgroup = update.add_mutually_exclusive_group()
@@ -215,6 +232,8 @@ class UpdateCommand(GematoCommand):
help='Include TIMESTAMP entry in Manifest')
def parse_args(self, args, argp):
+ super(UpdateCommand, self).parse_args(args, argp)
+
self.paths = args.paths
self.timestamp = args.timestamp
self.incremental = args.incremental
@@ -222,6 +241,7 @@ class UpdateCommand(GematoCommand):
self.init_kwargs = {}
self.save_kwargs = {}
self.update_kwargs = {}
+ self.init_kwargs['openpgp_env'] = self.openpgp_env
if args.hashes is not None:
self.init_kwargs['hashes'] = args.hashes.split()
@@ -245,19 +265,6 @@ class UpdateCommand(GematoCommand):
if args.sign is not None:
self.init_kwargs['sign_openpgp'] = args.sign
- # use isolated environment if key is specified;
- # system environment otherwise
- if args.openpgp_key is not None:
- env_class = gemato.openpgp.OpenPGPEnvironment
- else:
- env_class = gemato.openpgp.OpenPGPSystemEnvironment
- self.openpgp_env = env_class()
-
- if args.openpgp_key is not None:
- with io.open(args.openpgp_key, 'rb') as f:
- self.openpgp_env.import_key(f)
- self.init_kwargs['openpgp_env'] = self.openpgp_env
-
def __call__(self):
for p in self.paths:
tlm = gemato.find_top_level.find_top_level_manifest(p)
@@ -313,16 +320,14 @@ class UpdateCommand(GematoCommand):
return 0
- def cleanup(self):
- if self.openpgp_env is not None:
- self.openpgp_env.close()
-
-class CreateCommand(GematoCommand):
+class CreateCommand(BaseOpenPGPCommand):
name = 'create'
help = 'Create a Manifest tree starting at the specified file'
def add_options(self, create):
+ super(CreateCommand, self).add_options(create)
+
create.add_argument('paths', nargs='*', default=['.'],
help='Paths to create (defaults to "Manifest" if none specified)')
create.add_argument('-c', '--compress-watermark', type=int,
@@ -338,8 +343,6 @@ class CreateCommand(GematoCommand):
.format(multiprocessing.cpu_count()))
create.add_argument('-k', '--openpgp-id',
help='Use the specified OpenPGP key (by ID or user)')
- create.add_argument('-K', '--openpgp-key',
- help='Use only the OpenPGP key(s) from a specific file')
create.add_argument('-p', '--profile',
help='Use the specified profile ("default", "ebuild", "old-ebuild"...)')
signgroup = create.add_mutually_exclusive_group()
@@ -353,12 +356,15 @@ class CreateCommand(GematoCommand):
help='Include TIMESTAMP entry in Manifest')
def parse_args(self, args, argp):
+ super(CreateCommand, self).parse_args(args, argp)
+
self.paths = args.paths
self.timestamp = args.timestamp
self.init_kwargs = {}
self.save_kwargs = {}
self.init_kwargs['allow_create'] = True
+ self.init_kwargs['openpgp_env'] = self.openpgp_env
if args.hashes is not None:
self.init_kwargs['hashes'] = args.hashes.split()
@@ -382,19 +388,6 @@ class CreateCommand(GematoCommand):
if args.sign is not None:
self.init_kwargs['sign_openpgp'] = args.sign
- # use isolated environment if key is specified;
- # system environment otherwise
- if args.openpgp_key is not None:
- env_class = gemato.openpgp.OpenPGPEnvironment
- else:
- env_class = gemato.openpgp.OpenPGPSystemEnvironment
- self.openpgp_env = env_class()
-
- if args.openpgp_key is not None:
- with io.open(args.openpgp_key, 'rb') as f:
- self.openpgp_env.import_key(f)
- self.init_kwargs['openpgp_env'] = self.openpgp_env
-
def __call__(self):
for p in self.paths:
start = timeit.default_timer()
@@ -422,10 +415,6 @@ class CreateCommand(GematoCommand):
return 0
- def cleanup(self):
- if self.openpgp_env is not None:
- self.openpgp_env.close()
-
def main(argv):
argp = argparse.ArgumentParser(