diff options
author | Michał Górny <mgorny@gentoo.org> | 2020-05-16 17:09:46 +0200 |
---|---|---|
committer | Michał Górny <mgorny@gentoo.org> | 2020-05-16 17:10:14 +0200 |
commit | 73f9a2f9fb0990b964dc8fadf32ee2b76c30419c (patch) | |
tree | 452b10773903c8dd3bcd55182400e99648e03d19 | |
parent | f3d79b33a21c4683c39600b63b958c3b0e2c4e2d (diff) | |
download | gemato-73f9a2f9fb0990b964dc8fadf32ee2b76c30419c.tar.gz |
openpgp: Pass 'gpg --batch' to _spawn_gpg() explicitly
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r-- | gemato/openpgp.py | 35 | ||||
-rw-r--r-- | tests/testutil.py | 19 |
2 files changed, 31 insertions, 23 deletions
diff --git a/gemato/openpgp.py b/gemato/openpgp.py index 9cd86a6..364da66 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -1,6 +1,6 @@ # gemato: OpenPGP verification support # vim:fileencoding=utf-8 -# (c) 2017-2019 Michał Górny +# (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license import datetime @@ -96,8 +96,9 @@ class OpenPGPSystemEnvironment(object): fails. """ - exitst, out, err = self._spawn_gpg(['--status-fd', '1', '--verify'], - f.read().encode('utf8')) + exitst, out, err = self._spawn_gpg( + ['gpg', '--batch', '--status-fd', '1', '--verify'], + f.read().encode('utf8')) if exitst != 0: raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8')) @@ -141,20 +142,21 @@ class OpenPGPSystemEnvironment(object): args = [] if keyid is not None: args += ['--local-user', keyid] - exitst, out, err = self._spawn_gpg(['--clearsign'] + args, - f.read().encode('utf8')) + exitst, out, err = self._spawn_gpg( + ['gpg', '--batch', '--clearsign'] + args, + f.read().encode('utf8')) if exitst != 0: raise gemato.exceptions.OpenPGPSigningFailure(err.decode('utf8')) outf.write(out.decode('utf8')) - def _spawn_gpg(self, options, stdin, env_override={}): + def _spawn_gpg(self, argv, stdin, env_override={}): env = os.environ.copy() env['TZ'] = 'UTC' env.update(env_override) try: - p = subprocess.Popen(['gpg', '--batch'] + options, + p = subprocess.Popen(argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -243,7 +245,8 @@ debug-level guru self._home = None def import_key(self, keyfile): - exitst, out, err = self._spawn_gpg(['--import'], keyfile.read()) + exitst, out, err = self._spawn_gpg( + ['gpg', '--batch', '--import'], keyfile.read()) if exitst != 0: raise gemato.exceptions.OpenPGPKeyImportError(err.decode('utf8')) @@ -254,7 +257,7 @@ debug-level guru """ # list all keys in the keyring exitst, out, err = self._spawn_gpg( - ['--with-colons', '--list-keys'], '') + ['gpg', '--batch', '--with-colons', '--list-keys'], '') if exitst != 0: raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) @@ -311,8 +314,8 @@ debug-level guru # create another isolated environment to fetch keys cleanly with self.clone() as subenv: # use --locate-keys to fetch keys via WKD - exitst, out, err = subenv._spawn_gpg(['--locate-keys'] - + list(addrs), '') + exitst, out, err = subenv._spawn_gpg( + ['gpg', '--batch', '--locate-keys'] + list(addrs), '') # if at least one fetch failed, gpg returns unsuccessfully if exitst != 0: logging.debug('refresh_keys_wkd(): gpg --locate-keys failed: {}' @@ -320,14 +323,15 @@ debug-level guru return False # otherwise, xfer the keys - exitst, out, err = subenv._spawn_gpg(['--export'] + list(keys), '') + exitst, out, err = subenv._spawn_gpg( + ['gpg', '--batch', '--export'] + list(keys), '') if exitst != 0: logging.debug('refresh_keys_wkd(): gpg --export failed: {}' .format(err.decode('utf8'))) return False - exitst, out, err = self._spawn_gpg(['--import', - '--status-fd', '1'], out) + exitst, out, err = self._spawn_gpg( + ['gpg', '--batch', '--import', '--status-fd', '1'], out) if exitst != 0: # there's no valid reason for import to fail here raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) @@ -351,7 +355,8 @@ debug-level guru if keyserver is not None: ks_args = ['--keyserver', keyserver] - exitst, out, err = self._spawn_gpg(ks_args + ['--refresh-keys'], '') + exitst, out, err = self._spawn_gpg( + ['gpg', '--batch', '--refresh-keys'] + ks_args, '') if exitst != 0: raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) diff --git a/tests/testutil.py b/tests/testutil.py index ad9525d..572dad6 100644 --- a/tests/testutil.py +++ b/tests/testutil.py @@ -1,6 +1,6 @@ # gemato: Test utility functions # vim:fileencoding=utf-8 -# (c) 2017-2018 Michał Górny +# (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license import errno @@ -141,16 +141,19 @@ class MockedWKDOpenPGPEnvironment(gemato.openpgp.OpenPGPEnvironment): def clone(self): return MockedWKDOpenPGPEnvironment(self.keys) - def _spawn_gpg(self, args, stdin): - if '--locate-keys' in args: - args.remove('--locate-keys') - assert len(args) == 1 - if args[0] in self.keys: + def _spawn_gpg(self, argv, stdin): + if '--locate-keys' in argv: + argv.remove('--locate-keys') + assert len(argv) == 3 + assert argv[:2] == ['gpg', '--batch'] + if argv[2] in self.keys: ret, sout, serr = super(MockedWKDOpenPGPEnvironment, - self)._spawn_gpg(['--import'], self.keys[args[0]]) + self)._spawn_gpg( + ['gpg', '--batch', '--import'], + self.keys[argv[2]]) else: ret = 2 return (ret, b'', b'') return super(MockedWKDOpenPGPEnvironment, self)._spawn_gpg( - args, stdin) + argv, stdin) |