summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/cli.py87
-rw-r--r--gemato/openpgp.py42
2 files changed, 67 insertions, 62 deletions
diff --git a/gemato/cli.py b/gemato/cli.py
index 276b6b0..9c5c6cd 100644
--- a/gemato/cli.py
+++ b/gemato/cli.py
@@ -31,8 +31,11 @@ from gemato.profile import get_profile_by_name
from gemato.recursiveloader import ManifestRecursiveLoader
+LOGGER = logging.getLogger(__name__)
+
+
def verify_failure(e):
- logging.error(e)
+ LOGGER.error(e)
return False
@@ -167,25 +170,25 @@ class VerifyingOpenPGPMixin(BaseOpenPGPMixin):
# always refresh keys to check for revocation
# (unless user specifically asked us not to)
if args.refresh_keys:
- logging.info('Refreshing keys...')
+ LOGGER.info('Refreshing keys...')
self.openpgp_env.refresh_keys(allow_wkd=args.allow_wkd,
keyserver=args.keyserver)
- logging.info('Keys refreshed.')
+ LOGGER.info('Keys refreshed.')
def print_signatures(self, sigs):
for i, sig in enumerate(sigs):
if len(sigs) > 1:
- logging.info(f"-- signature {i}")
- logging.info(f"- status: {sig.sig_status}")
- logging.info(f"- valid: {sig.valid_sig}, "
- f"trusted: {sig.trusted_sig}")
+ LOGGER.info(f"-- signature {i}")
+ LOGGER.info(f"- status: {sig.sig_status}")
+ LOGGER.info(f"- valid: {sig.valid_sig}, "
+ f"trusted: {sig.trusted_sig}")
if sig.valid_sig:
- logging.info("- primary key: "
- f"{sig.primary_key_fingerprint}")
- logging.info(f"- subkey: {sig.fingerprint}")
- logging.info(f"- timestamp: {sig.timestamp} UTC")
+ LOGGER.info("- primary key: "
+ f"{sig.primary_key_fingerprint}")
+ LOGGER.info(f"- subkey: {sig.fingerprint}")
+ LOGGER.info(f"- timestamp: {sig.timestamp} UTC")
if sig.key_expiration is not None:
- logging.info(f"- key expiration: {sig.key_expiration} UTC")
+ LOGGER.info(f"- key expiration: {sig.key_expiration} UTC")
class BaseManifestLoaderMixin:
@@ -274,27 +277,27 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin,
for p in self.paths:
tlm = find_top_level_manifest(p)
if tlm is None:
- logging.error(f'Top-level Manifest not found in {p}')
+ LOGGER.error(f'Top-level Manifest not found in {p}')
return 1
- logging.info(f"Using top-level Manifest: {tlm}")
+ LOGGER.info(f"Using top-level Manifest: {tlm}")
start = timeit.default_timer()
m = ManifestRecursiveLoader(tlm, **self.init_kwargs)
if self.require_signed_manifest and not m.openpgp_signed:
- logging.error(f'Top-level Manifest {tlm} is not '
- f'OpenPGP signed')
+ LOGGER.error(f'Top-level Manifest {tlm} is not '
+ f'OpenPGP signed')
return 1
ts = m.find_timestamp()
if ts:
- logging.info(f'Manifest timestamp: {ts.ts} UTC')
+ LOGGER.info(f'Manifest timestamp: {ts.ts} UTC')
if m.openpgp_signed:
- logging.info('Valid OpenPGP signature found:')
+ LOGGER.info('Valid OpenPGP signature found:')
self.print_signatures(m.openpgp_signature)
- logging.info(f'Verifying {p}...')
+ LOGGER.info(f'Verifying {p}...')
tlmdir = os.path.dirname(tlm)
relpath = os.path.relpath(p, tlmdir)
@@ -306,12 +309,12 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin,
apparent_path = os.path.join(tlmdir, e.path)
real_path = os.path.realpath(apparent_path)
if apparent_path != real_path:
- logging.warning(
+ LOGGER.warning(
f"Path contains symlinks. Real file path: {real_path}")
raise
stop = timeit.default_timer()
- logging.info(f'{p} verified in {stop - start:.2f} seconds')
+ LOGGER.info(f'{p} verified in {stop - start:.2f} seconds')
return 0 if ret else 1
@@ -411,7 +414,7 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand):
for p in self.paths:
tlm = find_top_level_manifest(p)
if tlm is None:
- logging.error(f'Top-level Manifest not found in {p}')
+ LOGGER.error(f'Top-level Manifest not found in {p}')
return 1
start = timeit.default_timer()
@@ -419,32 +422,32 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand):
# if not specified by user, profile must set it
if m.hashes is None:
- logging.error('--hashes must be specified if not '
- 'implied by --profile')
+ LOGGER.error('--hashes must be specified if not '
+ 'implied by --profile')
return 1
relpath = os.path.relpath(p, os.path.dirname(tlm))
if relpath == '.':
relpath = ''
if self.timestamp and relpath != '':
- logging.error('Timestamp can only be updated if doing '
- 'full-tree update')
+ LOGGER.error('Timestamp can only be updated if doing '
+ 'full-tree update')
return 1
update_kwargs = {}
if self.incremental:
if relpath != '':
- logging.error('Incremental works only for '
- 'full-tree update')
+ LOGGER.error('Incremental works only for '
+ 'full-tree update')
return 1
last_ts = m.find_timestamp()
if last_ts is None:
- logging.error('Incremental specified but no '
- 'timestamp in Manifest')
+ LOGGER.error('Incremental specified but no '
+ 'timestamp in Manifest')
return 1
update_kwargs['last_mtime'] = last_ts.ts.timestamp()
- logging.info(f'Updating Manifests in {p}...')
+ LOGGER.info(f'Updating Manifests in {p}...')
start_ts = datetime.datetime.utcnow()
m.update_entries_for_directory(relpath, **update_kwargs)
@@ -463,7 +466,7 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand):
m.save_manifests(**self.save_kwargs)
stop = timeit.default_timer()
- logging.info(f'{p} updated in {stop - start:.2f} seconds')
+ LOGGER.info(f'{p} updated in {stop - start:.2f} seconds')
return 0
@@ -496,11 +499,11 @@ class CreateCommand(BaseUpdateMixin, GematoCommand):
# if not specified by user, profile must set it
if m.hashes is None:
- logging.error('--hashes must be specified if not '
- 'implied by --profile')
+ LOGGER.error('--hashes must be specified if not '
+ 'implied by --profile')
return 1
- logging.info(f'Creating Manifests in {p}...')
+ LOGGER.info(f'Creating Manifests in {p}...')
start_ts = datetime.datetime.utcnow()
m.update_entries_for_directory()
@@ -512,7 +515,7 @@ class CreateCommand(BaseUpdateMixin, GematoCommand):
m.save_manifests(**self.save_kwargs)
stop = timeit.default_timer()
- logging.info(f'{p} updated in {stop - start:.2f} seconds')
+ LOGGER.info(f'{p} updated in {stop - start:.2f} seconds')
return 0
@@ -598,11 +601,11 @@ class OpenPGPVerifyCommand(VerifyingOpenPGPMixin, GematoCommand):
sigs = self.openpgp_env.verify_file(
f, require_all_good=self.require_all_good)
except GematoException as e:
- logging.error(
+ LOGGER.error(
f'OpenPGP verification failed for {p}:\n{e}')
ret = False
else:
- logging.info(
+ LOGGER.info(
f'Valid OpenPGP signature found in {p}:')
self.print_signatures(sigs)
finally:
@@ -649,12 +652,12 @@ class OpenPGPVerifyDetachedCommand(VerifyingOpenPGPMixin, GematoCommand):
self.signature_file, self.data_file,
require_all_good=self.require_all_good)
except GematoException as e:
- logging.error(
+ LOGGER.error(
f"OpenPGP verification failed for {self.data_file} "
f"(sig in {self.signature_file}):\n{e}")
return 1
else:
- logging.info(
+ LOGGER.info(
f"File {self.data_file.name} verified successfully against "
f"the signature in {self.signature_file}:")
self.print_signatures(sigs)
@@ -692,7 +695,7 @@ class GnuPGWrapCommand(VerifyingOpenPGPMixin, GematoCommand):
sig = signal.strsignal(-ret)
else:
sig = -ret
- logging.error(
+ LOGGER.error(
f'Child process terminated due to signal: {sig}')
return ret
@@ -729,7 +732,7 @@ def main(argv):
finally:
vals.cmd.cleanup()
except GematoException as e:
- logging.error(e)
+ LOGGER.error(e)
return 1
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index 3f659fa..52275a7 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -49,6 +49,8 @@ except ImportError:
GNUPG = os.environ.get('GNUPG', 'gpg')
GNUPGCONF = os.environ.get('GNUPGCONF', 'gpgconf')
+LOGGER = logging.getLogger(__name__)
+
class OpenPGPSignatureStatus(enum.Enum):
GOOD = enum.auto()
@@ -465,7 +467,7 @@ debug-level guru
ret, sout, serr = self._spawn_gpg(
[GNUPGCONF, '--kill', 'all'])
if ret != 0:
- logging.warning(
+ LOGGER.warning(
f'{GNUPGCONF} --kill failed:\n'
f'{serr.decode("utf8", errors="backslashescape")}')
if not self.debug:
@@ -474,8 +476,8 @@ debug-level guru
shutil.rmtree(self._home,
onerror=_rmtree_error_handler)
else:
- logging.debug(f'GNUPGHOME left for debug purposes: '
- f'{self._home}')
+ LOGGER.debug(f'GNUPGHOME left for debug purposes: '
+ f'{self._home}')
self._home = None
def import_key(self, keyfile, trust=True):
@@ -522,7 +524,7 @@ debug-level guru
raise OpenPGPKeyListingError(
f'Incorrect fingerprint {fpr} for key '
f'{prev_pub}')
- logging.debug(
+ LOGGER.debug(
f'list_keys(): fingerprint: {fpr}')
ret[fpr] = []
prev_pub = None
@@ -533,7 +535,7 @@ debug-level guru
elif line.startswith(b'pub:'):
# wait for the fingerprint
prev_pub = line.split(b':')[4].decode('ASCII')
- logging.debug(f'list_keys(): keyid: {prev_pub}')
+ LOGGER.debug(f'list_keys(): keyid: {prev_pub}')
elif line.startswith(b'uid:'):
if fpr is None:
raise OpenPGPKeyListingError(
@@ -542,10 +544,10 @@ debug-level guru
_, addr = email.utils.parseaddr(
uid.decode('utf8', errors='replace'))
if '@' in addr:
- logging.debug(f'list_keys(): UID: {addr}')
+ LOGGER.debug(f'list_keys(): UID: {addr}')
ret[fpr].append(addr)
else:
- logging.debug(
+ LOGGER.debug(
f'list_keys(): ignoring UID without mail: '
f'{uid!r}')
@@ -557,19 +559,19 @@ debug-level guru
keys were successfully found. Otherwise, returns false.
"""
if requests is None:
- logging.debug('refresh_keys_wkd(): failing because requests'
- 'module is missing')
+ LOGGER.debug('refresh_keys_wkd(): failing because requests'
+ 'module is missing')
return False
# list all keys in the keyring
keys = self.list_keys()
if not keys:
- logging.debug('refresh_keys_wkd(): no keys found')
+ LOGGER.debug('refresh_keys_wkd(): no keys found')
return False
addrs = set()
for key, uids in keys.items():
if not uids:
- logging.debug(
+ LOGGER.debug(
f'refresh_keys_wkd(): failing due to no UIDs on '
f'key {key}')
return False
@@ -591,8 +593,8 @@ debug-level guru
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
) as e:
- logging.debug(f'refresh_keys_wkd(): failing due to failed '
- f'request for {url}: {e}')
+ LOGGER.debug(f'refresh_keys_wkd(): failing due to failed '
+ f'request for {url}: {e}')
return False
data += resp.content
@@ -605,7 +607,7 @@ debug-level guru
for line in out.splitlines():
if line.startswith(b'[GNUPG:] IMPORT_OK'):
fpr = line.split(b' ')[3].decode('ASCII')
- logging.debug(
+ LOGGER.debug(
f'refresh_keys_wkd(): import successful for key: {fpr}')
imported_keys.add(fpr)
@@ -617,7 +619,7 @@ debug-level guru
unexpected_keys = imported_keys.difference(expected_keys)
if unexpected_keys:
# we need to delete unexpected keys
- logging.debug(
+ LOGGER.debug(
f'refresh_keys_wkd(): got unexpected key, will remove: '
f'{unexpected_keys}')
# 128x 40-byte fingerprints = 5KiB commandline max
@@ -629,7 +631,7 @@ debug-level guru
not_updated_keys = expected_keys.difference(imported_keys)
if not_updated_keys:
- logging.debug(
+ LOGGER.debug(
f'refresh_keys_wkd(): failing due to non-updated keys: '
f'{not_updated_keys}')
return False
@@ -646,8 +648,8 @@ debug-level guru
raise_on_error=OpenPGPKeyRefreshError)
def refresh_keys(self, allow_wkd=True, keyserver=None):
- logging.debug(f'refresh_keys(allow_wkd={allow_wkd}, '
- f'keyserver={keyserver}) called')
+ LOGGER.debug(f'refresh_keys(allow_wkd={allow_wkd}, '
+ f'keyserver={keyserver}) called')
if allow_wkd and self.refresh_keys_wkd():
return
@@ -710,12 +712,12 @@ class PGPyEnvironment:
try:
verifies = k.parent.verify(k)
except pgpy.errors.PGPError:
- logging.debug(
+ LOGGER.debug(
f'Rejecting subkey {fpr} due to missing sig')
self.keyring.unload(k)
else:
if not verifies:
- logging.debug(
+ LOGGER.debug(
f'Rejecting subkey {fpr} since parent '
f'key signature does not check out')
self.keyring.unload(k)