diff options
-rw-r--r-- | gemato/cli.py | 87 | ||||
-rw-r--r-- | gemato/openpgp.py | 42 |
2 files changed, 67 insertions, 62 deletions
diff --git a/gemato/cli.py b/gemato/cli.py index 276b6b0..9c5c6cd 100644 --- a/gemato/cli.py +++ b/gemato/cli.py @@ -31,8 +31,11 @@ from gemato.profile import get_profile_by_name from gemato.recursiveloader import ManifestRecursiveLoader +LOGGER = logging.getLogger(__name__) + + def verify_failure(e): - logging.error(e) + LOGGER.error(e) return False @@ -167,25 +170,25 @@ class VerifyingOpenPGPMixin(BaseOpenPGPMixin): # always refresh keys to check for revocation # (unless user specifically asked us not to) if args.refresh_keys: - logging.info('Refreshing keys...') + LOGGER.info('Refreshing keys...') self.openpgp_env.refresh_keys(allow_wkd=args.allow_wkd, keyserver=args.keyserver) - logging.info('Keys refreshed.') + LOGGER.info('Keys refreshed.') def print_signatures(self, sigs): for i, sig in enumerate(sigs): if len(sigs) > 1: - logging.info(f"-- signature {i}") - logging.info(f"- status: {sig.sig_status}") - logging.info(f"- valid: {sig.valid_sig}, " - f"trusted: {sig.trusted_sig}") + LOGGER.info(f"-- signature {i}") + LOGGER.info(f"- status: {sig.sig_status}") + LOGGER.info(f"- valid: {sig.valid_sig}, " + f"trusted: {sig.trusted_sig}") if sig.valid_sig: - logging.info("- primary key: " - f"{sig.primary_key_fingerprint}") - logging.info(f"- subkey: {sig.fingerprint}") - logging.info(f"- timestamp: {sig.timestamp} UTC") + LOGGER.info("- primary key: " + f"{sig.primary_key_fingerprint}") + LOGGER.info(f"- subkey: {sig.fingerprint}") + LOGGER.info(f"- timestamp: {sig.timestamp} UTC") if sig.key_expiration is not None: - logging.info(f"- key expiration: {sig.key_expiration} UTC") + LOGGER.info(f"- key expiration: {sig.key_expiration} UTC") class BaseManifestLoaderMixin: @@ -274,27 +277,27 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin, for p in self.paths: tlm = find_top_level_manifest(p) if tlm is None: - logging.error(f'Top-level Manifest not found in {p}') + LOGGER.error(f'Top-level Manifest not found in {p}') return 1 - logging.info(f"Using top-level Manifest: {tlm}") + LOGGER.info(f"Using top-level Manifest: {tlm}") start = timeit.default_timer() m = ManifestRecursiveLoader(tlm, **self.init_kwargs) if self.require_signed_manifest and not m.openpgp_signed: - logging.error(f'Top-level Manifest {tlm} is not ' - f'OpenPGP signed') + LOGGER.error(f'Top-level Manifest {tlm} is not ' + f'OpenPGP signed') return 1 ts = m.find_timestamp() if ts: - logging.info(f'Manifest timestamp: {ts.ts} UTC') + LOGGER.info(f'Manifest timestamp: {ts.ts} UTC') if m.openpgp_signed: - logging.info('Valid OpenPGP signature found:') + LOGGER.info('Valid OpenPGP signature found:') self.print_signatures(m.openpgp_signature) - logging.info(f'Verifying {p}...') + LOGGER.info(f'Verifying {p}...') tlmdir = os.path.dirname(tlm) relpath = os.path.relpath(p, tlmdir) @@ -306,12 +309,12 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin, apparent_path = os.path.join(tlmdir, e.path) real_path = os.path.realpath(apparent_path) if apparent_path != real_path: - logging.warning( + LOGGER.warning( f"Path contains symlinks. Real file path: {real_path}") raise stop = timeit.default_timer() - logging.info(f'{p} verified in {stop - start:.2f} seconds') + LOGGER.info(f'{p} verified in {stop - start:.2f} seconds') return 0 if ret else 1 @@ -411,7 +414,7 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand): for p in self.paths: tlm = find_top_level_manifest(p) if tlm is None: - logging.error(f'Top-level Manifest not found in {p}') + LOGGER.error(f'Top-level Manifest not found in {p}') return 1 start = timeit.default_timer() @@ -419,32 +422,32 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand): # if not specified by user, profile must set it if m.hashes is None: - logging.error('--hashes must be specified if not ' - 'implied by --profile') + LOGGER.error('--hashes must be specified if not ' + 'implied by --profile') return 1 relpath = os.path.relpath(p, os.path.dirname(tlm)) if relpath == '.': relpath = '' if self.timestamp and relpath != '': - logging.error('Timestamp can only be updated if doing ' - 'full-tree update') + LOGGER.error('Timestamp can only be updated if doing ' + 'full-tree update') return 1 update_kwargs = {} if self.incremental: if relpath != '': - logging.error('Incremental works only for ' - 'full-tree update') + LOGGER.error('Incremental works only for ' + 'full-tree update') return 1 last_ts = m.find_timestamp() if last_ts is None: - logging.error('Incremental specified but no ' - 'timestamp in Manifest') + LOGGER.error('Incremental specified but no ' + 'timestamp in Manifest') return 1 update_kwargs['last_mtime'] = last_ts.ts.timestamp() - logging.info(f'Updating Manifests in {p}...') + LOGGER.info(f'Updating Manifests in {p}...') start_ts = datetime.datetime.utcnow() m.update_entries_for_directory(relpath, **update_kwargs) @@ -463,7 +466,7 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand): m.save_manifests(**self.save_kwargs) stop = timeit.default_timer() - logging.info(f'{p} updated in {stop - start:.2f} seconds') + LOGGER.info(f'{p} updated in {stop - start:.2f} seconds') return 0 @@ -496,11 +499,11 @@ class CreateCommand(BaseUpdateMixin, GematoCommand): # if not specified by user, profile must set it if m.hashes is None: - logging.error('--hashes must be specified if not ' - 'implied by --profile') + LOGGER.error('--hashes must be specified if not ' + 'implied by --profile') return 1 - logging.info(f'Creating Manifests in {p}...') + LOGGER.info(f'Creating Manifests in {p}...') start_ts = datetime.datetime.utcnow() m.update_entries_for_directory() @@ -512,7 +515,7 @@ class CreateCommand(BaseUpdateMixin, GematoCommand): m.save_manifests(**self.save_kwargs) stop = timeit.default_timer() - logging.info(f'{p} updated in {stop - start:.2f} seconds') + LOGGER.info(f'{p} updated in {stop - start:.2f} seconds') return 0 @@ -598,11 +601,11 @@ class OpenPGPVerifyCommand(VerifyingOpenPGPMixin, GematoCommand): sigs = self.openpgp_env.verify_file( f, require_all_good=self.require_all_good) except GematoException as e: - logging.error( + LOGGER.error( f'OpenPGP verification failed for {p}:\n{e}') ret = False else: - logging.info( + LOGGER.info( f'Valid OpenPGP signature found in {p}:') self.print_signatures(sigs) finally: @@ -649,12 +652,12 @@ class OpenPGPVerifyDetachedCommand(VerifyingOpenPGPMixin, GematoCommand): self.signature_file, self.data_file, require_all_good=self.require_all_good) except GematoException as e: - logging.error( + LOGGER.error( f"OpenPGP verification failed for {self.data_file} " f"(sig in {self.signature_file}):\n{e}") return 1 else: - logging.info( + LOGGER.info( f"File {self.data_file.name} verified successfully against " f"the signature in {self.signature_file}:") self.print_signatures(sigs) @@ -692,7 +695,7 @@ class GnuPGWrapCommand(VerifyingOpenPGPMixin, GematoCommand): sig = signal.strsignal(-ret) else: sig = -ret - logging.error( + LOGGER.error( f'Child process terminated due to signal: {sig}') return ret @@ -729,7 +732,7 @@ def main(argv): finally: vals.cmd.cleanup() except GematoException as e: - logging.error(e) + LOGGER.error(e) return 1 diff --git a/gemato/openpgp.py b/gemato/openpgp.py index 3f659fa..52275a7 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -49,6 +49,8 @@ except ImportError: GNUPG = os.environ.get('GNUPG', 'gpg') GNUPGCONF = os.environ.get('GNUPGCONF', 'gpgconf') +LOGGER = logging.getLogger(__name__) + class OpenPGPSignatureStatus(enum.Enum): GOOD = enum.auto() @@ -465,7 +467,7 @@ debug-level guru ret, sout, serr = self._spawn_gpg( [GNUPGCONF, '--kill', 'all']) if ret != 0: - logging.warning( + LOGGER.warning( f'{GNUPGCONF} --kill failed:\n' f'{serr.decode("utf8", errors="backslashescape")}') if not self.debug: @@ -474,8 +476,8 @@ debug-level guru shutil.rmtree(self._home, onerror=_rmtree_error_handler) else: - logging.debug(f'GNUPGHOME left for debug purposes: ' - f'{self._home}') + LOGGER.debug(f'GNUPGHOME left for debug purposes: ' + f'{self._home}') self._home = None def import_key(self, keyfile, trust=True): @@ -522,7 +524,7 @@ debug-level guru raise OpenPGPKeyListingError( f'Incorrect fingerprint {fpr} for key ' f'{prev_pub}') - logging.debug( + LOGGER.debug( f'list_keys(): fingerprint: {fpr}') ret[fpr] = [] prev_pub = None @@ -533,7 +535,7 @@ debug-level guru elif line.startswith(b'pub:'): # wait for the fingerprint prev_pub = line.split(b':')[4].decode('ASCII') - logging.debug(f'list_keys(): keyid: {prev_pub}') + LOGGER.debug(f'list_keys(): keyid: {prev_pub}') elif line.startswith(b'uid:'): if fpr is None: raise OpenPGPKeyListingError( @@ -542,10 +544,10 @@ debug-level guru _, addr = email.utils.parseaddr( uid.decode('utf8', errors='replace')) if '@' in addr: - logging.debug(f'list_keys(): UID: {addr}') + LOGGER.debug(f'list_keys(): UID: {addr}') ret[fpr].append(addr) else: - logging.debug( + LOGGER.debug( f'list_keys(): ignoring UID without mail: ' f'{uid!r}') @@ -557,19 +559,19 @@ debug-level guru keys were successfully found. Otherwise, returns false. """ if requests is None: - logging.debug('refresh_keys_wkd(): failing because requests' - 'module is missing') + LOGGER.debug('refresh_keys_wkd(): failing because requests' + 'module is missing') return False # list all keys in the keyring keys = self.list_keys() if not keys: - logging.debug('refresh_keys_wkd(): no keys found') + LOGGER.debug('refresh_keys_wkd(): no keys found') return False addrs = set() for key, uids in keys.items(): if not uids: - logging.debug( + LOGGER.debug( f'refresh_keys_wkd(): failing due to no UIDs on ' f'key {key}') return False @@ -591,8 +593,8 @@ debug-level guru except (requests.exceptions.ConnectionError, requests.exceptions.HTTPError, ) as e: - logging.debug(f'refresh_keys_wkd(): failing due to failed ' - f'request for {url}: {e}') + LOGGER.debug(f'refresh_keys_wkd(): failing due to failed ' + f'request for {url}: {e}') return False data += resp.content @@ -605,7 +607,7 @@ debug-level guru for line in out.splitlines(): if line.startswith(b'[GNUPG:] IMPORT_OK'): fpr = line.split(b' ')[3].decode('ASCII') - logging.debug( + LOGGER.debug( f'refresh_keys_wkd(): import successful for key: {fpr}') imported_keys.add(fpr) @@ -617,7 +619,7 @@ debug-level guru unexpected_keys = imported_keys.difference(expected_keys) if unexpected_keys: # we need to delete unexpected keys - logging.debug( + LOGGER.debug( f'refresh_keys_wkd(): got unexpected key, will remove: ' f'{unexpected_keys}') # 128x 40-byte fingerprints = 5KiB commandline max @@ -629,7 +631,7 @@ debug-level guru not_updated_keys = expected_keys.difference(imported_keys) if not_updated_keys: - logging.debug( + LOGGER.debug( f'refresh_keys_wkd(): failing due to non-updated keys: ' f'{not_updated_keys}') return False @@ -646,8 +648,8 @@ debug-level guru raise_on_error=OpenPGPKeyRefreshError) def refresh_keys(self, allow_wkd=True, keyserver=None): - logging.debug(f'refresh_keys(allow_wkd={allow_wkd}, ' - f'keyserver={keyserver}) called') + LOGGER.debug(f'refresh_keys(allow_wkd={allow_wkd}, ' + f'keyserver={keyserver}) called') if allow_wkd and self.refresh_keys_wkd(): return @@ -710,12 +712,12 @@ class PGPyEnvironment: try: verifies = k.parent.verify(k) except pgpy.errors.PGPError: - logging.debug( + LOGGER.debug( f'Rejecting subkey {fpr} due to missing sig') self.keyring.unload(k) else: if not verifies: - logging.debug( + LOGGER.debug( f'Rejecting subkey {fpr} since parent ' f'key signature does not check out') self.keyring.unload(k) |