summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2020-09-08 11:14:48 +0200
committerMichał Górny <mgorny@gentoo.org>2020-09-08 11:14:48 +0200
commitc8300a0e55a58d3eebbaa11101d0e26f6aa63d40 (patch)
tree45719615f247ea43a6147e840e11137f68718d51
parent1536c5215669daba1ffe5791f2dd3d8b06a9a7bc (diff)
downloadgemato-c8300a0e55a58d3eebbaa11101d0e26f6aa63d40.tar.gz
openpgp: Streamline GPG error handling, and handle malformed utf8
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r--gemato/openpgp.py71
1 files changed, 37 insertions, 34 deletions
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index 9668b0f..fceec40 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -145,9 +145,8 @@ class SystemGPGEnvironment:
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--status-fd', '1', '--verify'],
- f.read().encode('utf8'))
- if exitst != 0:
- raise OpenPGPVerificationFailure(err.decode('utf8'))
+ f.read().encode('utf8'),
+ raise_on_error=OpenPGPVerificationFailure)
is_good = False
is_trusted = False
@@ -158,9 +157,11 @@ class SystemGPGEnvironment:
if line.startswith(b'[GNUPG:] GOODSIG'):
is_good = True
elif line.startswith(b'[GNUPG:] EXPKEYSIG'):
- raise OpenPGPExpiredKeyFailure(err.decode('utf8'))
+ raise OpenPGPExpiredKeyFailure(
+ err.decode('utf8', errors='backslashreplace'))
elif line.startswith(b'[GNUPG:] REVKEYSIG'):
- raise OpenPGPRevokedKeyFailure(err.decode('utf8'))
+ raise OpenPGPRevokedKeyFailure(
+ err.decode('utf8', errors='backslashreplace'))
elif line.startswith(b'[GNUPG:] VALIDSIG'):
spl = line.split(b' ')
assert len(spl) >= 12
@@ -179,9 +180,11 @@ class SystemGPGEnvironment:
# require both GOODSIG and VALIDSIG
if not is_good or sig_data is None:
- raise OpenPGPUnknownSigFailure(err.decode('utf8'))
+ raise OpenPGPUnknownSigFailure(
+ err.decode('utf8', errors='backslashreplace'))
if not is_trusted:
- raise OpenPGPUntrustedSigFailure(err.decode('utf8'))
+ raise OpenPGPUntrustedSigFailure(
+ err.decode('utf8', errors='backslashreplace'))
return sig_data
def clear_sign_file(self, f, outf, keyid=None):
@@ -200,13 +203,13 @@ class SystemGPGEnvironment:
args += ['--local-user', keyid]
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--clearsign'] + args,
- f.read().encode('utf8'))
- if exitst != 0:
- raise OpenPGPSigningFailure(err.decode('utf8'))
+ f.read().encode('utf8'),
+ raise_on_error=OpenPGPSigningFailure)
outf.write(out.decode('utf8'))
- def _spawn_gpg(self, argv, stdin='', env_override={}):
+ def _spawn_gpg(self, argv, stdin='', env_override={},
+ raise_on_error=None):
env = os.environ.copy()
env['TZ'] = 'UTC'
env.update(env_override)
@@ -221,6 +224,9 @@ class SystemGPGEnvironment:
raise OpenPGPNoImplementation('install gpg')
out, err = p.communicate(stdin)
+ if raise_on_error is not None and p.wait() != 0:
+ raise raise_on_error(
+ err.decode('utf8', errors='backslashreplace'))
return (p.wait(), out, err)
@@ -292,7 +298,9 @@ debug-level guru
ret, sout, serr = self._spawn_gpg(
[GNUPGCONF, '--kill', 'all'])
if ret != 0:
- logging.warning(f'{GNUPGCONF} --kill failed: {serr!r}')
+ logging.warning(
+ f'{GNUPGCONF} --kill failed:\n'
+ f'{serr.decode("utf8", errors="backslashescape")}')
if not self.debug:
# we need to loop due to ENOTEMPTY potential
while os.path.isdir(self._home):
@@ -306,9 +314,8 @@ debug-level guru
def import_key(self, keyfile, trust=True):
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--import', '--status-fd', '1'],
- keyfile.read())
- if exitst != 0:
- raise OpenPGPKeyImportError(err.decode('utf8'))
+ keyfile.read(),
+ raise_on_error=OpenPGPKeyImportError)
if trust:
fprs = set()
@@ -319,9 +326,8 @@ debug-level guru
ownertrust = ''.join(f'{fpr}:6:\n' for fpr in fprs).encode('utf8')
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--import-ownertrust'],
- ownertrust)
- if exitst != 0:
- raise OpenPGPKeyImportError(err.decode('utf8'))
+ ownertrust,
+ raise_on_error=OpenPGPKeyImportError)
def list_keys(self):
"""
@@ -332,9 +338,8 @@ debug-level guru
"""
exitst, out, err = self._spawn_gpg(
- [GNUPG, '--batch', '--with-colons', '--list-keys'])
- if exitst != 0:
- raise OpenPGPKeyListingError(err.decode('utf8'))
+ [GNUPG, '--batch', '--with-colons', '--list-keys'],
+ raise_on_error=OpenPGPKeyListingError)
prev_pub = None
fpr = None
@@ -419,10 +424,9 @@ debug-level guru
data += resp.content
exitst, out, err = self._spawn_gpg(
- [GNUPG, '--batch', '--import', '--status-fd', '1'], data)
- if exitst != 0:
- # there's no valid reason for import to fail here
- raise OpenPGPKeyRefreshError(err.decode('utf8'))
+ [GNUPG, '--batch', '--import', '--status-fd', '1'],
+ data,
+ raise_on_error=OpenPGPKeyRefreshError)
# we need to explicitly ensure all keys were fetched
for line in out.splitlines():
@@ -435,10 +439,8 @@ debug-level guru
else:
# we need to delete unexpected keys
exitst, out, err = self._spawn_gpg(
- [GNUPG, '--batch', '--delete-keys', fpr])
- if exitst != 0:
- raise OpenPGPKeyRefreshError(
- err.decode('utf8'))
+ [GNUPG, '--batch', '--delete-keys', fpr],
+ raise_on_error=OpenPGPKeyRefreshError)
if keys:
logging.debug(
f'refresh_keys_wkd(): failing due to non-updated keys: '
@@ -453,9 +455,8 @@ debug-level guru
ks_args = ['--keyserver', keyserver]
exitst, out, err = self._spawn_gpg(
- [GNUPG, '--batch', '--refresh-keys'] + ks_args)
- if exitst != 0:
- raise OpenPGPKeyRefreshError(err.decode('utf8'))
+ [GNUPG, '--batch', '--refresh-keys'] + ks_args,
+ raise_on_error=OpenPGPKeyRefreshError)
def refresh_keys(self, allow_wkd=True, keyserver=None):
logging.debug(f'refresh_keys(allow_wkd={allow_wkd}, '
@@ -471,11 +472,13 @@ debug-level guru
assert self._home is not None
return self._home
- def _spawn_gpg(self, options, stdin=''):
+ def _spawn_gpg(self, *args, **kwargs):
env_override = {'GNUPGHOME': self.home}
if self.proxy is not None:
env_override['http_proxy'] = self.proxy
- return (super()._spawn_gpg(options, stdin, env_override))
+ assert 'env_override' not in kwargs
+ kwargs['env_override'] = env_override
+ return super()._spawn_gpg(*args, **kwargs)
class PGPyEnvironment: