summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/openpgp.py82
1 files changed, 43 insertions, 39 deletions
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index 2d6dd84..13f8c3f 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -12,41 +12,16 @@ import tempfile
import gemato.exceptions
-def _spawn_gpg(options, env_instance, stdin):
- env = None
- impls = ['gpg2', 'gpg']
- if env_instance is not None:
- env={'GNUPGHOME': env_instance.home}
- if env_instance._impl is not None:
- impls = [env_instance._impl]
-
- for impl in impls:
- try:
- p = subprocess.Popen([impl, '--batch'] + options,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=env)
- except OSError as e:
- if e.errno != errno.ENOENT:
- raise
- else:
- break
- else:
- raise gemato.exceptions.OpenPGPNoImplementation()
-
- if env_instance is not None:
- env_instance._impl = impl
-
- out, err = p.communicate(stdin)
- return (p.wait(), out, err)
-
-
class OpenPGPEnvironmentBase(object):
"""
Base class for OpenPGP environment.
"""
+ __slots__ = ['_impl']
+
+ def __init__(self):
+ self._impl = None
+
def __enter__(self):
return self
@@ -88,6 +63,31 @@ class OpenPGPEnvironmentBase(object):
raise NotImplementedError('clear_sign_file() is not implemented by this OpenPGP provider')
+ def _spawn_gpg(self, options, stdin, env=None):
+ impls = ['gpg2', 'gpg']
+ if self._impl is not None:
+ impls = [self._impl]
+
+ for impl in impls:
+ try:
+ p = subprocess.Popen([impl, '--batch'] + options,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=env)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ else:
+ break
+ else:
+ raise gemato.exceptions.OpenPGPNoImplementation()
+
+ self._impl = impl
+
+ out, err = p.communicate(stdin)
+ return (p.wait(), out, err)
+
class OpenPGPSystemEnvironment(OpenPGPEnvironmentBase):
"""
@@ -95,7 +95,7 @@ class OpenPGPSystemEnvironment(OpenPGPEnvironmentBase):
"""
def verify_file(self, f):
- exitst, out, err = _spawn_gpg(['--verify'], None, f.read().encode('utf8'))
+ exitst, out, err = self._spawn_gpg(['--verify'], f.read().encode('utf8'))
if exitst != 0:
raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8'))
@@ -103,8 +103,8 @@ class OpenPGPSystemEnvironment(OpenPGPEnvironmentBase):
args = []
if keyid is not None:
args += ['--local-user', keyid]
- exitst, out, err = _spawn_gpg(['--clearsign'] + args, None,
- f.read().encode('utf8'))
+ exitst, out, err = self._spawn_gpg(['--clearsign'] + args,
+ f.read().encode('utf8'))
if exitst != 0:
raise gemato.exceptions.OpenPGPSigningFailure(err.decode('utf8'))
@@ -120,11 +120,11 @@ class OpenPGPEnvironment(OpenPGPEnvironmentBase):
or use as a context manager (via 'with').
"""
- __slots__ = ['_home', '_impl']
+ __slots__ = ['_home']
def __init__(self):
+ super(OpenPGPEnvironment, self).__init__()
self._home = tempfile.mkdtemp()
- self._impl = None
with open(os.path.join(self._home, 'gpg-agent.conf'), 'w') as f:
f.write('''# autogenerated by gemato
@@ -161,12 +161,12 @@ disable-scdaemon
self._home = None
def import_key(self, keyfile):
- exitst, out, err = _spawn_gpg(['--import'], self, keyfile.read())
+ exitst, out, err = self._spawn_gpg(['--import'], keyfile.read())
if exitst != 0:
raise RuntimeError('Unable to import key: {}'.format(err.decode('utf8')))
def verify_file(self, f):
- exitst, out, err = _spawn_gpg(['--verify'], self, f.read().encode('utf8'))
+ exitst, out, err = self._spawn_gpg(['--verify'], f.read().encode('utf8'))
if exitst != 0:
raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8'))
@@ -174,8 +174,8 @@ disable-scdaemon
args = []
if keyid is not None:
args += ['--local-user', keyid]
- exitst, out, err = _spawn_gpg(['--clearsign'] + args, self,
- f.read().encode('utf8'))
+ exitst, out, err = self._spawn_gpg(['--clearsign'] + args,
+ f.read().encode('utf8'))
if exitst != 0:
raise gemato.exceptions.OpenPGPSigningFailure(err.decode('utf8'))
@@ -187,3 +187,7 @@ disable-scdaemon
raise RuntimeError(
'OpenPGPEnvironment has been closed')
return self._home
+
+ def _spawn_gpg(self, options, stdin):
+ env = {'GNUPGHOME': self.home}
+ return super(OpenPGPEnvironment, self)._spawn_gpg(options, stdin, env)