diff options
author | Michał Górny <mgorny@gentoo.org> | 2020-09-08 11:14:48 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2020-09-08 11:14:48 +0200 |
commit | c8300a0e55a58d3eebbaa11101d0e26f6aa63d40 (patch) | |
tree | 45719615f247ea43a6147e840e11137f68718d51 | |
parent | 1536c5215669daba1ffe5791f2dd3d8b06a9a7bc (diff) | |
download | gemato-c8300a0e55a58d3eebbaa11101d0e26f6aa63d40.tar.gz |
openpgp: Streamline GPG error handling, and handle malformed utf8
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r-- | gemato/openpgp.py | 71 |
1 files changed, 37 insertions, 34 deletions
diff --git a/gemato/openpgp.py b/gemato/openpgp.py index 9668b0f..fceec40 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -145,9 +145,8 @@ class SystemGPGEnvironment: exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--status-fd', '1', '--verify'], - f.read().encode('utf8')) - if exitst != 0: - raise OpenPGPVerificationFailure(err.decode('utf8')) + f.read().encode('utf8'), + raise_on_error=OpenPGPVerificationFailure) is_good = False is_trusted = False @@ -158,9 +157,11 @@ class SystemGPGEnvironment: if line.startswith(b'[GNUPG:] GOODSIG'): is_good = True elif line.startswith(b'[GNUPG:] EXPKEYSIG'): - raise OpenPGPExpiredKeyFailure(err.decode('utf8')) + raise OpenPGPExpiredKeyFailure( + err.decode('utf8', errors='backslashreplace')) elif line.startswith(b'[GNUPG:] REVKEYSIG'): - raise OpenPGPRevokedKeyFailure(err.decode('utf8')) + raise OpenPGPRevokedKeyFailure( + err.decode('utf8', errors='backslashreplace')) elif line.startswith(b'[GNUPG:] VALIDSIG'): spl = line.split(b' ') assert len(spl) >= 12 @@ -179,9 +180,11 @@ class SystemGPGEnvironment: # require both GOODSIG and VALIDSIG if not is_good or sig_data is None: - raise OpenPGPUnknownSigFailure(err.decode('utf8')) + raise OpenPGPUnknownSigFailure( + err.decode('utf8', errors='backslashreplace')) if not is_trusted: - raise OpenPGPUntrustedSigFailure(err.decode('utf8')) + raise OpenPGPUntrustedSigFailure( + err.decode('utf8', errors='backslashreplace')) return sig_data def clear_sign_file(self, f, outf, keyid=None): @@ -200,13 +203,13 @@ class SystemGPGEnvironment: args += ['--local-user', keyid] exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--clearsign'] + args, - f.read().encode('utf8')) - if exitst != 0: - raise OpenPGPSigningFailure(err.decode('utf8')) + f.read().encode('utf8'), + raise_on_error=OpenPGPSigningFailure) outf.write(out.decode('utf8')) - def _spawn_gpg(self, argv, stdin='', env_override={}): + def _spawn_gpg(self, argv, stdin='', env_override={}, + raise_on_error=None): env = os.environ.copy() env['TZ'] = 'UTC' env.update(env_override) @@ -221,6 +224,9 @@ class SystemGPGEnvironment: raise OpenPGPNoImplementation('install gpg') out, err = p.communicate(stdin) + if raise_on_error is not None and p.wait() != 0: + raise raise_on_error( + err.decode('utf8', errors='backslashreplace')) return (p.wait(), out, err) @@ -292,7 +298,9 @@ debug-level guru ret, sout, serr = self._spawn_gpg( [GNUPGCONF, '--kill', 'all']) if ret != 0: - logging.warning(f'{GNUPGCONF} --kill failed: {serr!r}') + logging.warning( + f'{GNUPGCONF} --kill failed:\n' + f'{serr.decode("utf8", errors="backslashescape")}') if not self.debug: # we need to loop due to ENOTEMPTY potential while os.path.isdir(self._home): @@ -306,9 +314,8 @@ debug-level guru def import_key(self, keyfile, trust=True): exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--import', '--status-fd', '1'], - keyfile.read()) - if exitst != 0: - raise OpenPGPKeyImportError(err.decode('utf8')) + keyfile.read(), + raise_on_error=OpenPGPKeyImportError) if trust: fprs = set() @@ -319,9 +326,8 @@ debug-level guru ownertrust = ''.join(f'{fpr}:6:\n' for fpr in fprs).encode('utf8') exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--import-ownertrust'], - ownertrust) - if exitst != 0: - raise OpenPGPKeyImportError(err.decode('utf8')) + ownertrust, + raise_on_error=OpenPGPKeyImportError) def list_keys(self): """ @@ -332,9 +338,8 @@ debug-level guru """ exitst, out, err = self._spawn_gpg( - [GNUPG, '--batch', '--with-colons', '--list-keys']) - if exitst != 0: - raise OpenPGPKeyListingError(err.decode('utf8')) + [GNUPG, '--batch', '--with-colons', '--list-keys'], + raise_on_error=OpenPGPKeyListingError) prev_pub = None fpr = None @@ -419,10 +424,9 @@ debug-level guru data += resp.content exitst, out, err = self._spawn_gpg( - [GNUPG, '--batch', '--import', '--status-fd', '1'], data) - if exitst != 0: - # there's no valid reason for import to fail here - raise OpenPGPKeyRefreshError(err.decode('utf8')) + [GNUPG, '--batch', '--import', '--status-fd', '1'], + data, + raise_on_error=OpenPGPKeyRefreshError) # we need to explicitly ensure all keys were fetched for line in out.splitlines(): @@ -435,10 +439,8 @@ debug-level guru else: # we need to delete unexpected keys exitst, out, err = self._spawn_gpg( - [GNUPG, '--batch', '--delete-keys', fpr]) - if exitst != 0: - raise OpenPGPKeyRefreshError( - err.decode('utf8')) + [GNUPG, '--batch', '--delete-keys', fpr], + raise_on_error=OpenPGPKeyRefreshError) if keys: logging.debug( f'refresh_keys_wkd(): failing due to non-updated keys: ' @@ -453,9 +455,8 @@ debug-level guru ks_args = ['--keyserver', keyserver] exitst, out, err = self._spawn_gpg( - [GNUPG, '--batch', '--refresh-keys'] + ks_args) - if exitst != 0: - raise OpenPGPKeyRefreshError(err.decode('utf8')) + [GNUPG, '--batch', '--refresh-keys'] + ks_args, + raise_on_error=OpenPGPKeyRefreshError) def refresh_keys(self, allow_wkd=True, keyserver=None): logging.debug(f'refresh_keys(allow_wkd={allow_wkd}, ' @@ -471,11 +472,13 @@ debug-level guru assert self._home is not None return self._home - def _spawn_gpg(self, options, stdin=''): + def _spawn_gpg(self, *args, **kwargs): env_override = {'GNUPGHOME': self.home} if self.proxy is not None: env_override['http_proxy'] = self.proxy - return (super()._spawn_gpg(options, stdin, env_override)) + assert 'env_override' not in kwargs + kwargs['env_override'] = env_override + return super()._spawn_gpg(*args, **kwargs) class PGPyEnvironment: |