author     Michał Górny <mgorny@gentoo.org>   2020-08-28 22:18:49 +0200
committer  Michał Górny <mgorny@gentoo.org>   2020-08-28 23:45:02 +0200
commit     3e0b5c687d8b6216226d610e4492d20db4f1bef1 (patch)
tree       538be74fb963ee336f1745506f5243aa1e95d4c8
parent     9021d449d35da8d40999bc0a945b52f9d8bc1640 (diff)
download   gemato-3e0b5c687d8b6216226d610e4492d20db4f1bef1.tar.gz
Reformat gemato code and fix PEP8 compliance
Signed-off-by: Michał Górny <mgorny@gentoo.org>
-rw-r--r--  gemato/cli.py              | 344
-rw-r--r--  gemato/compression.py      |  10
-rw-r--r--  gemato/exceptions.py       |  75
-rw-r--r--  gemato/find_top_level.py   |  20
-rw-r--r--  gemato/hash.py             | 117
-rw-r--r--  gemato/manifest.py         | 229
-rw-r--r--  gemato/openpgp.py          | 133
-rw-r--r--  gemato/profile.py          |  24
-rw-r--r--  gemato/recursiveloader.py  | 348
-rw-r--r--  gemato/verify.py           |  37
-rw-r--r--  tox.ini                    |   2
11 files changed, 732 insertions(+), 607 deletions(-)
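
The diff below applies a handful of mechanical modernizations across the whole tree: namespace imports (import gemato.x) become explicit "from gemato.x import ..." imports, Python-2-style super(Class, self) calls become argument-less super(), and str.format()/u'' literals become f-strings. A minimal, self-contained sketch of the before/after (class and attribute names here are illustrative, not gemato's):

    class BaseCommand:
        def __init__(self):
            self.errors = 0

    class VerifyLikeCommand(BaseCommand):
        def __init__(self):
            # before: super(VerifyLikeCommand, self).__init__()
            super().__init__()  # argument-less super(), PEP 3135

        def report(self, path, seconds):
            # before: '{} verified in {:.2f} seconds'.format(path, seconds)
            return f'{path} verified in {seconds:.2f} seconds'

    print(VerifyLikeCommand().report('.', 1.2345))
    # -> . verified in 1.23 seconds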
diff --git a/gemato/cli.py b/gemato/cli.py
index c8e1813..08d908a 100644
--- a/gemato/cli.py
+++ b/gemato/cli.py
@@ -14,13 +14,19 @@ import os.path
import sys
import timeit
-import gemato.exceptions
-import gemato.find_top_level
-import gemato.hash
-import gemato.manifest
-import gemato.openpgp
-import gemato.profile
-import gemato.recursiveloader
+from gemato.exceptions import GematoException
+from gemato.find_top_level import find_top_level_manifest
+from gemato.hash import hash_file, hash_path
+from gemato.manifest import (
+ ManifestFileEntry,
+ manifest_hashes_to_hashlib,
+ )
+from gemato.openpgp import (
+ OpenPGPEnvironment,
+ OpenPGPSystemEnvironment,
+ )
+from gemato.profile import get_profile_by_name
+from gemato.recursiveloader import ManifestRecursiveLoader
def verify_failure(e):
@@ -81,26 +87,28 @@ class BaseOpenPGPMixin(object):
"""
def __init__(self):
- super(BaseOpenPGPMixin, self).__init__()
+ super().__init__()
self.openpgp_env = None
def add_options(self, subp):
- super(BaseOpenPGPMixin, self).add_options(subp)
+ super().add_options(subp)
- subp.add_argument('-K', '--openpgp-key',
- help='Use only the OpenPGP key(s) from a specific file')
- subp.add_argument('--proxy',
- help='Use HTTP proxy')
+ subp.add_argument(
+ '-K', '--openpgp-key',
+ help='Use only the OpenPGP key(s) from a specific file')
+ subp.add_argument(
+ '--proxy',
+ help='Use HTTP proxy')
def parse_args(self, args, argp):
- super(BaseOpenPGPMixin, self).parse_args(args, argp)
+ super().parse_args(args, argp)
# use isolated environment if key is specified;
# system environment otherwise
if args.openpgp_key is not None:
- env_class = gemato.openpgp.OpenPGPEnvironment
+ env_class = OpenPGPEnvironment
else:
- env_class = gemato.openpgp.OpenPGPSystemEnvironment
+ env_class = OpenPGPSystemEnvironment
self.openpgp_env = env_class(debug=args.debug,
proxy=args.proxy)
@@ -109,7 +117,7 @@ class BaseOpenPGPMixin(object):
self.openpgp_env.import_key(f)
def cleanup(self):
- super(BaseOpenPGPMixin, self).cleanup()
+ super().cleanup()
if self.openpgp_env is not None:
self.openpgp_env.close()
@@ -121,21 +129,24 @@ class VerifyingOpenPGPMixin(BaseOpenPGPMixin):
"""
def add_options(self, subp):
- super(VerifyingOpenPGPMixin, self).add_options(subp)
-
- subp.add_argument('-R', '--no-refresh-keys', action='store_false',
- dest='refresh_keys',
- help='Disable refreshing OpenPGP key (prevents network access, '
- +'applicable when using -K only)')
- subp.add_argument('-W', '--no-wkd', action='store_false',
- dest='allow_wkd',
- help='Do not attempt to use WKD to refetch keys (use '
- +'keyservers only)')
- subp.add_argument('--keyserver',
- help='Force custom keyserver URL')
+ super().add_options(subp)
+
+ subp.add_argument(
+ '-R', '--no-refresh-keys', action='store_false',
+ dest='refresh_keys',
+ help='Disable refreshing OpenPGP key (prevents network '
+ 'access, applicable when using -K only)')
+ subp.add_argument(
+ '-W', '--no-wkd', action='store_false',
+ dest='allow_wkd',
+ help='Do not attempt to use WKD to refetch keys (use '
+ 'keyservers only)')
+ subp.add_argument(
+ '--keyserver',
+ help='Force custom keyserver URL')
def parse_args(self, args, argp):
- super(VerifyingOpenPGPMixin, self).parse_args(args, argp)
+ super().parse_args(args, argp)
if args.openpgp_key is not None:
# always refresh keys to check for revocation
@@ -153,16 +164,19 @@ class BaseManifestLoaderMixin(object):
"""
def add_options(self, subp):
- super(BaseManifestLoaderMixin, self).add_options(subp)
+ super().add_options(subp)
- subp.add_argument('-j', '--jobs', type=int,
- help='Specify the maximum number of parallel jobs to use (default: {})'
- .format(multiprocessing.cpu_count()))
- subp.add_argument('-x', '--one-file-system', action='store_true',
- help='Do not cross filesystem boundaries (report an error instead)')
+ subp.add_argument(
+ '-j', '--jobs', type=int,
+ help=f'Specify the maximum number of parallel jobs to use '
+ f'(default: {multiprocessing.cpu_count()})')
+ subp.add_argument(
+ '-x', '--one-file-system', action='store_true',
+ help='Do not cross filesystem boundaries (report an error '
+ 'instead)')
def parse_args(self, args, argp):
- super(BaseManifestLoaderMixin, self).parse_args(args, argp)
+ super().parse_args(args, argp)
self.init_kwargs = {}
if args.jobs is not None:
@@ -179,20 +193,25 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin,
help = 'Verify one or more directories against Manifests'
def add_options(self, verify):
- super(VerifyCommand, self).add_options(verify)
-
- verify.add_argument('paths', nargs='*', default=['.'],
- help='Paths to verify (defaults to "." if none specified)')
- verify.add_argument('-k', '--keep-going', action='store_true',
- help='Continue reporting errors rather than terminating on the first failure')
- verify.add_argument('-P', '--no-openpgp-verify', action='store_false',
- dest='openpgp_verify',
- help='Disable OpenPGP verification of signed Manifests')
- verify.add_argument('-s', '--require-signed-manifest', action='store_true',
- help='Require that the top-level Manifest is OpenPGP signed')
+ super().add_options(verify)
+
+ verify.add_argument(
+ 'paths', nargs='*', default=['.'],
+ help='Paths to verify (defaults to "." if none specified)')
+ verify.add_argument(
+ '-k', '--keep-going', action='store_true',
+ help='Continue reporting errors rather than terminating '
+ 'on the first failure')
+ verify.add_argument(
+ '-P', '--no-openpgp-verify', action='store_false',
+ dest='openpgp_verify',
+ help='Disable OpenPGP verification of signed Manifests')
+ verify.add_argument(
+ '-s', '--require-signed-manifest', action='store_true',
+ help='Require that the top-level Manifest is OpenPGP signed')
def parse_args(self, args, argp):
- super(VerifyCommand, self).parse_args(args, argp)
+ super().parse_args(args, argp)
self.paths = args.paths
self.require_signed_manifest = args.require_signed_manifest
@@ -205,37 +224,40 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin,
self.init_kwargs['verify_openpgp'] = False
def __call__(self):
- super(VerifyCommand, self).__call__()
+ super().__call__()
ret = True
for p in self.paths:
- tlm = gemato.find_top_level.find_top_level_manifest(p)
+ tlm = find_top_level_manifest(p)
if tlm is None:
- logging.error('Top-level Manifest not found in {}'.format(p))
+ logging.error(f'Top-level Manifest not found in {p}')
return 1
start = timeit.default_timer()
- m = gemato.recursiveloader.ManifestRecursiveLoader(tlm,
- **self.init_kwargs)
+ m = ManifestRecursiveLoader(tlm, **self.init_kwargs)
if self.require_signed_manifest and not m.openpgp_signed:
- logging.error('Top-level Manifest {} is not OpenPGP signed'.format(tlm))
+ logging.error(f'Top-level Manifest {tlm} is not '
+ f'OpenPGP signed')
return 1
ts = m.find_timestamp()
if ts:
- logging.info('Manifest timestamp: {} UTC'.format(ts.ts))
+ logging.info(f'Manifest timestamp: {ts.ts} UTC')
if m.openpgp_signed:
logging.info('Valid OpenPGP signature found:')
- logging.info('- primary key: {}'.format(
- m.openpgp_signature.primary_key_fingerprint))
- logging.info('- subkey: {}'.format(
- m.openpgp_signature.fingerprint))
- logging.info('- timestamp: {} UTC'.format(
- m.openpgp_signature.timestamp))
-
- logging.info('Verifying {}...'.format(p))
+ logging.info(
+ f'- primary key: '
+ f'{m.openpgp_signature.primary_key_fingerprint}')
+ logging.info(
+ f'- subkey: '
+ f'{m.openpgp_signature.fingerprint}')
+ logging.info(
+ f'- timestamp: '
+ f'{m.openpgp_signature.timestamp} UTC')
+
+ logging.info(f'Verifying {p}...')
relpath = os.path.relpath(p, os.path.dirname(tlm))
if relpath == '.':
@@ -243,7 +265,7 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin,
ret &= m.assert_directory_verifies(relpath, **self.kwargs)
stop = timeit.default_timer()
- logging.info('{} verified in {:.2f} seconds'.format(p, stop - start))
+ logging.info(f'{p} verified in {stop - start:.2f} seconds')
return 0 if ret else 1
@@ -254,32 +276,43 @@ class BaseUpdateMixin(BaseManifestLoaderMixin, BaseOpenPGPMixin):
"""
def add_options(self, update):
- super(BaseUpdateMixin, self).add_options(update)
-
- update.add_argument('-c', '--compress-watermark', type=int,
- help='Minimum Manifest size for files to be compressed')
- update.add_argument('-C', '--compress-format',
- help='Format for compressed files (e.g. "gz", "bz2"...)')
- update.add_argument('-f', '--force-rewrite', action='store_true',
- help='Force rewriting all the Manifests, even if they did not change')
- update.add_argument('-H', '--hashes',
- help='Whitespace-separated list of hashes to use')
- update.add_argument('-k', '--openpgp-id',
- help='Use the specified OpenPGP key (by ID or user)')
- update.add_argument('-p', '--profile',
- help='Use the specified profile ("default", "ebuild", "old-ebuild"...)')
+ super().add_options(update)
+
+ update.add_argument(
+ '-c', '--compress-watermark', type=int,
+ help='Minimum Manifest size for files to be compressed')
+ update.add_argument(
+ '-C', '--compress-format',
+ help='Format for compressed files (e.g. "gz", "bz2"...)')
+ update.add_argument(
+ '-f', '--force-rewrite', action='store_true',
+ help='Force rewriting all the Manifests, even if they did '
+ 'not change')
+ update.add_argument(
+ '-H', '--hashes',
+ help='Whitespace-separated list of hashes to use')
+ update.add_argument(
+ '-k', '--openpgp-id',
+ help='Use the specified OpenPGP key (by ID or user)')
+ update.add_argument(
+ '-p', '--profile',
+ help='Use the specified profile ("default", "ebuild", '
+ '"old-ebuild"...)')
signgroup = update.add_mutually_exclusive_group()
- signgroup.add_argument('-s', '--sign', action='store_true',
- default=None,
- help='Force signing the top-level Manifest')
- signgroup.add_argument('-S', '--no-sign', action='store_false',
- dest='sign',
- help='Disable signing the top-level Manifest')
- update.add_argument('-t', '--timestamp', action='store_true',
- help='Include TIMESTAMP entry in Manifest')
+ signgroup.add_argument(
+ '-s', '--sign', action='store_true',
+ default=None,
+ help='Force signing the top-level Manifest')
+ signgroup.add_argument(
+ '-S', '--no-sign', action='store_false',
+ dest='sign',
+ help='Disable signing the top-level Manifest')
+ update.add_argument(
+ '-t', '--timestamp', action='store_true',
+ help='Include TIMESTAMP entry in Manifest')
def parse_args(self, args, argp):
- super(BaseUpdateMixin, self).parse_args(args, argp)
+ super().parse_args(args, argp)
self.timestamp = args.timestamp
@@ -299,8 +332,8 @@ class BaseUpdateMixin(BaseManifestLoaderMixin, BaseOpenPGPMixin):
if args.openpgp_id is not None:
self.init_kwargs['openpgp_keyid'] = args.openpgp_id
if args.profile is not None:
- self.init_kwargs['profile'] = gemato.profile.get_profile_by_name(
- args.profile)
+ self.init_kwargs['profile'] = (
+ get_profile_by_name(args.profile))
if args.sign is not None:
self.init_kwargs['sign_openpgp'] = args.sign
@@ -310,56 +343,62 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand):
help = 'Update the Manifest entries for one or more directory trees'
def add_options(self, update):
- super(UpdateCommand, self).add_options(update)
+ super().add_options(update)
- update.add_argument('paths', nargs='*', default=['.'],
- help='Paths to update (defaults to "." if none specified)')
- update.add_argument('-i', '--incremental', action='store_true',
- help='Perform incremental update by comparing mtimes against TIMESTAMP')
+ update.add_argument(
+ 'paths', nargs='*', default=['.'],
+ help='Paths to update (defaults to "." if none specified)')
+ update.add_argument(
+ '-i', '--incremental', action='store_true',
+ help='Perform incremental update by comparing mtimes '
+ 'against TIMESTAMP')
def parse_args(self, args, argp):
- super(UpdateCommand, self).parse_args(args, argp)
+ super().parse_args(args, argp)
self.paths = args.paths
self.incremental = args.incremental
def __call__(self):
- super(UpdateCommand, self).__call__()
+ super().__call__()
for p in self.paths:
- tlm = gemato.find_top_level.find_top_level_manifest(p)
+ tlm = find_top_level_manifest(p)
if tlm is None:
- logging.error('Top-level Manifest not found in {}'.format(p))
+ logging.error(f'Top-level Manifest not found in {p}')
return 1
start = timeit.default_timer()
- m = gemato.recursiveloader.ManifestRecursiveLoader(tlm,
- **self.init_kwargs)
+ m = ManifestRecursiveLoader(tlm, **self.init_kwargs)
# if not specified by user, profile must set it
if m.hashes is None:
- logging.error('--hashes must be specified if not implied by --profile')
+ logging.error('--hashes must be specified if not '
+ 'implied by --profile')
return 1
relpath = os.path.relpath(p, os.path.dirname(tlm))
if relpath == '.':
relpath = ''
if self.timestamp and relpath != '':
- logging.error('Timestamp can only be updated if doing full-tree update')
+ logging.error('Timestamp can only be updated if doing '
+ 'full-tree update')
return 1
update_kwargs = {}
if self.incremental:
if relpath != '':
- logging.error('Incremental works only for full-tree update')
+ logging.error('Incremental works only for '
+ 'full-tree update')
return 1
last_ts = m.find_timestamp()
if last_ts is None:
- logging.error('Incremental specified but no timestamp in Manifest')
+ logging.error('Incremental specified but no '
+ 'timestamp in Manifest')
return 1
update_kwargs['last_mtime'] = last_ts.ts.timestamp()
- logging.info('Updating Manifests in {}...'.format(p))
+ logging.info(f'Updating Manifests in {p}...')
start_ts = datetime.datetime.utcnow()
m.update_entries_for_directory(relpath, **update_kwargs)
@@ -378,7 +417,7 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand):
m.save_manifests(**self.save_kwargs)
stop = timeit.default_timer()
- logging.info('{} updated in {:.2f} seconds'.format(p, stop - start))
+ logging.info(f'{p} updated in {stop - start:.2f} seconds')
return 0
@@ -388,31 +427,34 @@ class CreateCommand(BaseUpdateMixin, GematoCommand):
help = 'Create a Manifest tree starting at the specified file'
def add_options(self, create):
- super(CreateCommand, self).add_options(create)
+ super().add_options(create)
- create.add_argument('paths', nargs='*', default=['.'],
- help='Paths to create Manifest in (defaults to "." if none specified)')
+ create.add_argument(
+ 'paths', nargs='*', default=['.'],
+ help='Paths to create Manifest in (defaults to "." '
+ 'if none specified)')
def parse_args(self, args, argp):
- super(CreateCommand, self).parse_args(args, argp)
+ super().parse_args(args, argp)
self.init_kwargs['allow_create'] = True
self.paths = args.paths
def __call__(self):
- super(CreateCommand, self).__call__()
+ super().__call__()
for p in self.paths:
start = timeit.default_timer()
- m = gemato.recursiveloader.ManifestRecursiveLoader(
- os.path.join(p, 'Manifest'), **self.init_kwargs)
+ m = ManifestRecursiveLoader(
+ os.path.join(p, 'Manifest'), **self.init_kwargs)
# if not specified by user, profile must set it
if m.hashes is None:
- logging.error('--hashes must be specified if not implied by --profile')
+ logging.error('--hashes must be specified if not '
+ 'implied by --profile')
return 1
- logging.info('Creating Manifests in {}...'.format(p))
+ logging.info(f'Creating Manifests in {p}...')
start_ts = datetime.datetime.utcnow()
m.update_entries_for_directory()
@@ -424,7 +466,7 @@ class CreateCommand(BaseUpdateMixin, GematoCommand):
m.save_manifests(**self.save_kwargs)
stop = timeit.default_timer()
- logging.info('{} updated in {:.2f} seconds'.format(p, stop - start))
+ logging.info(f'{p} updated in {stop - start:.2f} seconds')
return 0
@@ -434,56 +476,62 @@ class HashCommand(GematoCommand):
help = 'Generate hashes for specified file(s) and/or stdin'
def add_options(self, subp):
- super(HashCommand, self).add_options(subp)
+ super().add_options(subp)
- subp.add_argument('paths', nargs='*', default=['-'],
- help='Paths to hash (defaults to "-" (stdin) if not specified)')
- subp.add_argument('-H', '--hashes', required=True,
- help='Whitespace-separated list of hashes to use')
+ subp.add_argument(
+ 'paths', nargs='*', default=['-'],
+ help='Paths to hash (defaults to "-" (stdin) if not '
+ 'specified)')
+ subp.add_argument(
+ '-H', '--hashes', required=True,
+ help='Whitespace-separated list of hashes to use')
def parse_args(self, args, argp):
- super(HashCommand, self).parse_args(args, argp)
+ super().parse_args(args, argp)
self.hashes = sorted(args.hashes.split())
self.paths = args.paths
def __call__(self):
- super(HashCommand, self).__call__()
+ super().__call__()
- hashlib_hashes = list(
- gemato.manifest.manifest_hashes_to_hashlib(self.hashes))
+ hashlib_hashes = list(manifest_hashes_to_hashlib(self.hashes))
hashlib_hashes.append('__size__')
for p in self.paths:
if p == '-':
- h = gemato.hash.hash_file(sys.stdin.buffer,
- hashlib_hashes)
+ h = hash_file(sys.stdin.buffer, hashlib_hashes)
else:
- h = gemato.hash.hash_path(p, hashlib_hashes)
+ h = hash_path(p, hashlib_hashes)
sz = h.pop('__size__')
- e = gemato.manifest.ManifestFileEntry(p, sz,
- dict((mh, h[hh]) for mh, hh in zip(self.hashes, hashlib_hashes)))
+ e = ManifestFileEntry(
+ p, sz,
+ dict((mh, h[hh]) for mh, hh
+ in zip(self.hashes, hashlib_hashes)))
print(' '.join(e.to_list('DATA' if p != '-' else 'STDIN')))
class OpenPGPVerifyCommand(VerifyingOpenPGPMixin, GematoCommand):
name = 'openpgp-verify'
- help = 'Verify OpenPGP signatures embedded in specified file(s) and/or stdin'
+ help = ('Verify OpenPGP signatures embedded in specified file(s) '
+ 'and/or stdin')
def add_options(self, subp):
- super(OpenPGPVerifyCommand, self).add_options(subp)
+ super().add_options(subp)
- subp.add_argument('paths', nargs='*', default=['-'],
- help='Paths to hash (defaults to "-" (stdin) if not specified)')
+ subp.add_argument(
+ 'paths', nargs='*', default=['-'],
+ help='Paths to hash (defaults to "-" (stdin) if not '
+ 'specified)')
def parse_args(self, args, argp):
- super(OpenPGPVerifyCommand, self).parse_args(args, argp)
+ super().parse_args(args, argp)
self.paths = args.paths
def __call__(self):
- super(OpenPGPVerifyCommand, self).__call__()
+ super().__call__()
ret = True
@@ -496,19 +544,17 @@ class OpenPGPVerifyCommand(VerifyingOpenPGPMixin, GematoCommand):
try:
try:
sig = self.openpgp_env.verify_file(f)
- except gemato.exceptions.GematoException as e:
- logging.error(u'OpenPGP verification failed for {}:\n{}'
- .format(p, e))
+ except GematoException as e:
+ logging.error(
+ f'OpenPGP verification failed for {p}:\n{e}')
ret = False
else:
- logging.info('Valid OpenPGP signature found in {}:'
- .format(p))
- logging.info('- primary key: {}'.format(
- sig.primary_key_fingerprint))
- logging.info('- subkey: {}'.format(
- sig.fingerprint))
- logging.info('- timestamp: {} UTC'.format(
- sig.timestamp))
+ logging.info(
+ f'Valid OpenPGP signature found in {p}:')
+ logging.info(
+ f'- primary key: {sig.primary_key_fingerprint}')
+ logging.info(f'- subkey: {sig.fingerprint}')
+ logging.info(f'- timestamp: {sig.timestamp} UTC')
finally:
if p != '-':
f.close()
@@ -541,7 +587,7 @@ def main(argv):
return vals.cmd()
finally:
vals.cmd.cleanup()
- except gemato.exceptions.GematoException as e:
+ except GematoException as e:
logging.error(e)
return 1
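
Most of the cli.py churn reflows the cooperative mixin chain in which every command class adds its own options and delegates upward via super(). A reduced, runnable sketch of that pattern (the class names are simplified stand-ins, not the actual gemato classes):

    import argparse

    class BaseMixin:
        def add_options(self, argp):
            pass  # end of the cooperative chain

    class OpenPGPMixin(BaseMixin):
        def add_options(self, argp):
            super().add_options(argp)  # delegate up the MRO first
            argp.add_argument('-K', '--openpgp-key')

    class VerifyLikeCommand(OpenPGPMixin):
        def add_options(self, argp):
            super().add_options(argp)
            argp.add_argument('paths', nargs='*', default=['.'])

    argp = argparse.ArgumentParser()
    VerifyLikeCommand().add_options(argp)
    print(argp.parse_args(['-K', 'key.asc', 'a']))
    # -> Namespace(openpgp_key='key.asc', paths=['a'])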
diff --git a/gemato/compression.py b/gemato/compression.py
index cf021cd..e573153 100644
--- a/gemato/compression.py
+++ b/gemato/compression.py
@@ -9,7 +9,7 @@ import io
import lzma
import os.path
-import gemato.exceptions
+from gemato.exceptions import UnsupportedCompression
def open_compressed_file(suffix, f, mode='rb'):
@@ -34,7 +34,7 @@ def open_compressed_file(suffix, f, mode='rb'):
elif suffix == "xz" and lzma is not None:
return lzma.LZMAFile(f, format=lzma.FORMAT_XZ, mode=mode)
- raise gemato.exceptions.UnsupportedCompression(suffix)
+ raise UnsupportedCompression(suffix)
class FileStack(object):
@@ -84,8 +84,8 @@ def open_potentially_compressed_path(path, mode, **kwargs):
f = io.open(path, bmode)
fs = FileStack([f])
try:
- cf = open_compressed_file(compression, f,
- bmode if kwargs else mode)
+ cf = open_compressed_file(
+ compression, f, bmode if kwargs else mode)
fs.files.append(cf)
# add a TextIOWrapper on top whenever we do not want
@@ -93,7 +93,7 @@ def open_potentially_compressed_path(path, mode, **kwargs):
if 'b' not in mode:
iow = io.TextIOWrapper(cf, **kwargs)
fs.files.append(iow)
- except:
+ except Exception:
fs.close()
raise
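
The one semantic change in compression.py is the bare "except:" becoming "except Exception:". A bare except also traps KeyboardInterrupt and SystemExit (both derive from BaseException), so a Ctrl-C during cleanup would be silently swallowed; except Exception lets them propagate. A small illustration of the clean-up-then-reraise idiom used there (open_all is a hypothetical helper, not gemato API):

    def open_all(paths):
        files = []
        try:
            for p in paths:
                files.append(open(p, 'rb'))
        except Exception:  # not bare: KeyboardInterrupt still propagates
            for f in files:
                f.close()  # undo partial work, as FileStack.close() does
            raise          # re-raise the original error
        return files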
diff --git a/gemato/exceptions.py b/gemato/exceptions.py
index 4c53be4..22766ab 100644
--- a/gemato/exceptions.py
+++ b/gemato/exceptions.py
@@ -1,6 +1,6 @@
# gemato: exceptions
# vim:fileencoding=utf-8
-# (c) 2017 Michał Górny
+# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
class GematoException(Exception):
@@ -14,42 +14,42 @@ class UnsupportedCompression(GematoException):
__slots__ = ['suffix']
def __init__(self, suffix):
- super(UnsupportedCompression, self).__init__(suffix)
+ super().__init__(suffix)
self.suffix = suffix
def __str__(self):
- return u'Unsupported compression suffix: {}'.format(self.suffix)
+ return f'Unsupported compression suffix: {self.suffix}'
class UnsupportedHash(GematoException):
__slots__ = ['hash_name']
def __init__(self, hash_name):
- super(UnsupportedHash, self).__init__(hash_name)
+ super().__init__(hash_name)
self.hash_name = hash_name
def __str__(self):
- return u'Unsupported hash name: {}'.format(self.hash_name)
+ return f'Unsupported hash name: {self.hash_name}'
class ManifestSyntaxError(GematoException):
def __init__(self, message):
- super(ManifestSyntaxError, self).__init__(message)
+ super().__init__(message)
class ManifestIncompatibleEntry(GematoException):
__slots__ = ['e1', 'e2', 'diff']
def __init__(self, e1, e2, diff):
- super(ManifestIncompatibleEntry, self).__init__(e1, e2, diff)
+ super().__init__(e1, e2, diff)
self.e1 = e1
self.e2 = e2
self.diff = diff
def __str__(self):
- msg = u"Incompatible Manifest entries for {}".format(self.e1.path)
+ msg = f'Incompatible Manifest entries for {self.e1.path}'
for k, d1, d2 in self.diff:
- msg += u"\n {}: e1: {}, e2: {}".format(k, d1, d2)
+ msg += f'\n {k}: e1: {d1}, e2: {d2}'
return msg
@@ -61,15 +61,15 @@ class ManifestMismatch(GematoException):
__slots__ = ['path', 'entry', 'diff']
def __init__(self, path, entry, diff):
- super(ManifestMismatch, self).__init__(path, entry, diff)
+ super().__init__(path, entry, diff)
self.path = path
self.entry = entry
self.diff = diff
def __str__(self):
- msg = u"Manifest mismatch for {}".format(self.path)
+ msg = f'Manifest mismatch for {self.path}'
for k, exp, got in self.diff:
- msg += u"\n {}: expected: {}, have: {}".format(k, exp, got)
+ msg += f'\n {k}: expected: {exp}, have: {got}'
return msg
@@ -81,12 +81,12 @@ class ManifestCrossDevice(GematoException):
__slots__ = ['path']
def __init__(self, path):
- super(ManifestCrossDevice, self).__init__(path)
+ super().__init__(path)
self.path = path
def __str__(self):
- return (u"Path {} crosses filesystem boundaries, it must be IGNORE-d explicitly"
- .format(self.path))
+ return (f'Path {self.path} crosses filesystem boundaries, it '
+ f'must be IGNORE-d explicitly')
class ManifestSymlinkLoop(GematoException):
@@ -98,12 +98,12 @@ class ManifestSymlinkLoop(GematoException):
__slots__ = ['path']
def __init__(self, path):
- super(ManifestSymlinkLoop, self).__init__(path)
+ super().__init__(path)
self.path = path
def __str__(self):
- return (u"Path {} is a symlink to one of its parent directories, it must be IGNORE-d explicitly"
- .format(self.path))
+ return (f'Path {self.path} is a symlink to one of its parent '
+ f'directories, it must be IGNORE-d explicitly')
class ManifestUnsignedData(GematoException):
@@ -113,7 +113,7 @@ class ManifestUnsignedData(GematoException):
"""
def __str__(self):
- return u"Unsigned data found in an OpenPGP signed Manifest"
+ return 'Unsigned data found in an OpenPGP signed Manifest'
class OpenPGPRuntimeError(GematoException):
@@ -124,7 +124,7 @@ class OpenPGPRuntimeError(GematoException):
__slots__ = ['output']
def __init__(self, output):
- super(OpenPGPRuntimeError, self).__init__(output)
+ super().__init__(output)
self.output = output
@@ -134,7 +134,7 @@ class OpenPGPKeyImportError(OpenPGPRuntimeError):
"""
def __str__(self):
- return u"OpenPGP key import failed:\n{}".format(self.output)
+ return f'OpenPGP key import failed:\n{self.output}'
class OpenPGPKeyRefreshError(OpenPGPRuntimeError):
@@ -143,7 +143,7 @@ class OpenPGPKeyRefreshError(OpenPGPRuntimeError):
"""
def __str__(self):
- return u"OpenPGP keyring refresh failed:\n{}".format(self.output)
+ return f'OpenPGP keyring refresh failed:\n{self.output}'
class OpenPGPVerificationFailure(OpenPGPRuntimeError):
@@ -152,7 +152,7 @@ class OpenPGPVerificationFailure(OpenPGPRuntimeError):
"""
def __str__(self):
- return u"OpenPGP verification failed:\n{}".format(self.output)
+ return f'OpenPGP verification failed:\n{self.output}'
class OpenPGPExpiredKeyFailure(OpenPGPRuntimeError):
@@ -161,7 +161,8 @@ class OpenPGPExpiredKeyFailure(OpenPGPRuntimeError):
"""
def __str__(self):
- return u"OpenPGP signature rejected because of expired key:\n{}".format(self.output)
+ return (f'OpenPGP signature rejected because of expired key:\n'
+ f'{self.output}')
class OpenPGPRevokedKeyFailure(OpenPGPRuntimeError):
@@ -170,7 +171,8 @@ class OpenPGPRevokedKeyFailure(OpenPGPRuntimeError):
"""
def __str__(self):
- return u"OpenPGP signature rejected because of revoked key:\n{}".format(self.output)
+ return (f'OpenPGP signature rejected because of revoked key:\n'
+ f'{self.output}')
class OpenPGPUnknownSigFailure(OpenPGPRuntimeError):
@@ -180,7 +182,8 @@ class OpenPGPUnknownSigFailure(OpenPGPRuntimeError):
"""
def __str__(self):
- return u"OpenPGP signature rejected for unknown reason:\n{}".format(self.output)
+ return (f'OpenPGP signature rejected for unknown reason:\n'
+ f'{self.output}')
class OpenPGPSigningFailure(OpenPGPRuntimeError):
@@ -189,7 +192,7 @@ class OpenPGPSigningFailure(OpenPGPRuntimeError):
"""
def __str__(self):
- return u"OpenPGP signing failed:\n{}".format(self.output)
+ return f'OpenPGP signing failed:\n{self.output}'
class OpenPGPNoImplementation(GematoException):
@@ -199,7 +202,8 @@ class OpenPGPNoImplementation(GematoException):
"""
def __str__(self):
- return u"No supported OpenPGP implementation found (install gnupg)"
+ return ('No supported OpenPGP implementation found (install '
+ 'gnupg)')
class ManifestInvalidPath(GematoException):
@@ -211,13 +215,14 @@ class ManifestInvalidPath(GematoException):
__slots__ = ['path', 'detail']
def __init__(self, path, detail):
- super(ManifestInvalidPath, self).__init__(path, detail)
+ super().__init__(path, detail)
self.path = path
self.detail = detail
def __str__(self):
- return (u"Attempting to add invalid path {} to Manifest: {} must not be {}"
- .format(self.path, self.detail[0], self.detail[1]))
+ return (f'Attempting to add invalid path {self.path} to '
+ f'Manifest: {self.detail[0]} must not be '
+ f'{self.detail[1]}')
class ManifestInvalidFilename(GematoException):
@@ -228,10 +233,12 @@ class ManifestInvalidFilename(GematoException):
__slots__ = ['filename', 'pos']
def __init__(self, filename, pos):
- super(ManifestInvalidFilename, self).__init__(filename, pos)
+ super().__init__(filename, pos)
self.filename = filename
self.pos = pos
def __str__(self):
- return (u"Attempting to add invalid filename {!r} to Manifest: disallowed character U+{:04X} at position {}"
- .format(self.filename, ord(self.filename[self.pos]), self.pos))
+ return (f'Attempting to add invalid filename {self.filename!r} '
+ f'to Manifest: disallowed character '
+ f'U+{ord(self.filename[self.pos]):04X} at position '
+ f'{self.pos}')
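
All exception classes in exceptions.py follow the same pattern visible in the hunks above: declare __slots__, pass the payload to super().__init__() so e.args stays populated, and format the message lazily in __str__(). Condensed to a runnable snippet that closely mirrors the diff:

    class GematoException(Exception):
        pass

    class UnsupportedHash(GematoException):
        __slots__ = ['hash_name']

        def __init__(self, hash_name):
            super().__init__(hash_name)  # keeps args/repr() meaningful
            self.hash_name = hash_name

        def __str__(self):
            return f'Unsupported hash name: {self.hash_name}'

    try:
        raise UnsupportedHash('md4')
    except GematoException as e:
        print(e)  # -> Unsupported hash name: md4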
diff --git a/gemato/find_top_level.py b/gemato/find_top_level.py
index 17e743b..d049f17 100644
--- a/gemato/find_top_level.py
+++ b/gemato/find_top_level.py
@@ -1,14 +1,17 @@
# gemato: Top-level Manifest finding routine
# vim:fileencoding=utf-8
-# (c) 2017-2018 Michał Górny
+# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
import errno
import os
import os.path
-import gemato.compression
-import gemato.manifest
+from gemato.compression import (
+ get_potential_compressed_names,
+ open_potentially_compressed_path,
+ )
+from gemato.manifest import ManifestFile
def find_top_level_manifest(path='.', allow_xdev=True, allow_compressed=False):
@@ -31,14 +34,14 @@ def find_top_level_manifest(path='.', allow_xdev=True, allow_compressed=False):
cur_path = path
last_found = None
original_dev = None
- m = gemato.manifest.ManifestFile()
+ m = ManifestFile()
root_st = os.stat('/')
manifest_filenames = ('Manifest',)
if allow_compressed:
- manifest_filenames = list(gemato.compression
- .get_potential_compressed_names('Manifest'))
+ manifest_filenames = list(
+ get_potential_compressed_names('Manifest'))
while True:
st = os.stat(cur_path)
@@ -54,9 +57,8 @@ def find_top_level_manifest(path='.', allow_xdev=True, allow_compressed=False):
try:
# note: this is safe for allow_compressed=False
# since it detects compression by filename suffix
- with (gemato.compression
- .open_potentially_compressed_path(m_path, 'r',
- encoding='utf8')) as f:
+ with open_potentially_compressed_path(
+ m_path, 'r', encoding='utf8') as f:
fst = os.fstat(f.fileno())
if fst.st_dev != original_dev and not allow_xdev:
return last_found
diff --git a/gemato/hash.py b/gemato/hash.py
index 515bfd9..1423994 100644
--- a/gemato/hash.py
+++ b/gemato/hash.py
@@ -1,90 +1,89 @@
# gemato: hash support
# vim:fileencoding=utf-8
-# (c) 2017 Michał Górny
+# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
import hashlib
import io
-import gemato.exceptions
+from gemato.exceptions import UnsupportedHash
+
HASH_BUFFER_SIZE = 65536
MAX_SLURP_SIZE = 1048576
class SizeHash(object):
- """
- A cheap wrapper to count file size via hashlib-like interface.
- """
+ """A cheap wrapper to count file size via hashlib-like interface"""
- __slots__ = ['size']
+ __slots__ = ['size']
- def __init__(self):
- self.size = 0
+ def __init__(self):
+ self.size = 0
- def update(self, data):
- self.size += len(data)
+ def update(self, data):
+ self.size += len(data)
- def hexdigest(self):
- return self.size
+ def hexdigest(self):
+ return self.size
def get_hash_by_name(name):
- """
- Get a hashlib-compatible hash object for hash named @name. Supports
- multiple backends.
- """
- # special case hashes
- if name == '__size__':
- return SizeHash()
+ """
+ Get a hashlib-compatible hash object for hash named @name. Supports
+ multiple backends.
+ """
+ # special case hashes
+ if name == '__size__':
+ return SizeHash()
- # general hash support
- if name in hashlib.algorithms_available:
- return hashlib.new(name)
+ # general hash support
+ if name in hashlib.algorithms_available:
+ return hashlib.new(name)
- raise gemato.exceptions.UnsupportedHash(name)
+ raise UnsupportedHash(name)
def hash_file(f, hash_names, _apparent_size=0):
- """
- Hash the contents of file object @f using all hashes specified
- as @hash_names. Returns a dict of (hash_name -> hex value) mappings.
-
- @_apparent_size can be given as a tip on how large is the file
- expected to be. This is a private API used to workaround bug in PyPy
- and should not be relied on being present long-term.
- """
- hashes = {}
- for h in hash_names:
- hashes[h] = get_hash_by_name(h)
- if _apparent_size != 0 and _apparent_size < MAX_SLURP_SIZE:
- # if the file is reasonably small, read it all into one buffer;
- # we do this since PyPy has some serious bug in dealing with
- # passing buffers to C extensions and this apparently fails
- # less; https://bitbucket.org/pypy/pypy/issues/2752
- block = f.read()
- for h in hashes.values():
- h.update(block)
- else:
- for block in iter(lambda: f.read1(HASH_BUFFER_SIZE), b''):
- for h in hashes.values():
- h.update(block)
- return dict((k, h.hexdigest()) for k, h in hashes.items())
+ """
+ Hash the contents of file object @f using all hashes specified
+ as @hash_names. Returns a dict of (hash_name -> hex value) mappings.
+
+ @_apparent_size can be given as a tip on how large is the file
+ expected to be. This is a private API used to workaround bug in PyPy
+ and should not be relied on being present long-term.
+ """
+ hashes = {}
+ for h in hash_names:
+ hashes[h] = get_hash_by_name(h)
+ if _apparent_size != 0 and _apparent_size < MAX_SLURP_SIZE:
+ # if the file is reasonably small, read it all into one buffer;
+ # we do this since PyPy has some serious bug in dealing with
+ # passing buffers to C extensions and this apparently fails
+ # less; https://bitbucket.org/pypy/pypy/issues/2752
+ block = f.read()
+ for h in hashes.values():
+ h.update(block)
+ else:
+ for block in iter(lambda: f.read1(HASH_BUFFER_SIZE), b''):
+ for h in hashes.values():
+ h.update(block)
+ return dict((k, h.hexdigest()) for k, h in hashes.items())
def hash_path(path, hash_names):
- """
- Hash the contents of file at specified path @path using all hashes
- specified as @hash_names. Returns a dict of (hash_name -> hex value)
- mappings.
- """
- with io.open(path, 'rb') as f:
- return hash_file(f, hash_names)
+ """
+ Hash the contents of file at specified path @path using all hashes
+ specified as @hash_names. Returns a dict of (hash_name -> hex value)
+ mappings.
+ """
+ with io.open(path, 'rb') as f:
+ return hash_file(f, hash_names)
def hash_bytes(buf, hash_name):
- """
- Hash the data in provided buffer @buf using the hash @hash_name.
- Returns the hex value.
- """
- return hash_file(io.BytesIO(buf), (hash_name,))[hash_name]
+ """
+ Hash the data in provided buffer @buf using the hash @hash_name.
+ Returns the hex value.
+ """
+ return hash_file(io.BytesIO(buf), (hash_name,))[hash_name]
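
hash.py is re-indented wholesale, but its interface is unchanged: hash_file() accepts any mix of hashlib algorithm names plus the __size__ pseudo-hash implemented by SizeHash. A short usage sketch, assuming gemato is importable:

    import io

    from gemato.hash import hash_file

    data = io.BytesIO(b'hello world\n')
    result = hash_file(data, ['sha256', '__size__'])
    print(result['__size__'])  # -> 12, counted by the SizeHash wrapper
    print(result['sha256'])    # hex digest string from hashlib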
diff --git a/gemato/manifest.py b/gemato/manifest.py
index ae1eb68..aee035c 100644
--- a/gemato/manifest.py
+++ b/gemato/manifest.py
@@ -8,14 +8,18 @@ import io
import os.path
import re
-import gemato.exceptions
-import gemato.util
+from gemato.exceptions import (
+ ManifestSyntaxError,
+ ManifestUnsignedData,
+ )
+from gemato.util import (
+ path_starts_with,
+ path_inside_dir,
+ )
class ManifestEntryTIMESTAMP(object):
- """
- ISO-8601 timestamp.
- """
+ """ISO-8601 timestamp"""
__slots__ = ['ts']
tag = 'TIMESTAMP'
@@ -25,16 +29,18 @@ class ManifestEntryTIMESTAMP(object):
self.ts = ts
@classmethod
- def from_list(cls, l):
- assert l[0] == cls.tag
- if len(l) != 2:
- raise gemato.exceptions.ManifestSyntaxError(
- '{} line: expects 1 value, got: {}'.format(l[0], l[1:]))
+ def from_list(cls, data):
+ assert data[0] == cls.tag
+ if len(data) != 2:
+ raise ManifestSyntaxError(
+ f'{data[0]} line: expects 1 value, got: {data[1:]}')
try:
- ts = datetime.datetime.strptime(l[1], '%Y-%m-%dT%H:%M:%SZ')
+ ts = datetime.datetime.strptime(data[1],
+ '%Y-%m-%dT%H:%M:%SZ')
except ValueError:
- raise gemato.exceptions.ManifestSyntaxError(
- '{} line: expected ISO8601 timestamp, got: {}'.format(l[0], l[1:]))
+ raise ManifestSyntaxError(
+ f'{data[0]} line: expected ISO8601 timestamp, '
+ f'got: {data[1:]}')
return cls(ts)
def to_list(self):
@@ -49,13 +55,12 @@ class ManifestEntryTIMESTAMP(object):
class ManifestPathEntry(object):
- """
- Base class for entries using a path.
- """
+ """Base class for entries using a path"""
__slots__ = ['path']
disallowed_path_re = re.compile(r'[\x00-\x1F\x7F-\x9F\s\\]', re.U)
- escape_seq_re = re.compile(r'\\(x[0-9a-fA-F]{2}|u[0-9a-fA-F]{4}|U[0-9a-fA-F]{8})?')
+ escape_seq_re = re.compile(
+ r'\\(x[0-9a-fA-F]{2}|u[0-9a-fA-F]{4}|U[0-9a-fA-F]{8})?')
def __init__(self, path):
self.path = path
@@ -64,30 +69,32 @@ class ManifestPathEntry(object):
def decode_char(m):
val = m.group(1)
if val is None:
- raise gemato.exceptions.ManifestSyntaxError(
- 'Invalid escape sequence at pos {} of: {}'.format(m.start(), m.string))
+ raise ManifestSyntaxError(
+ f'Invalid escape sequence at pos {m.start()} '
+ f'of: {m.string}')
return chr(int(val[1:], base=16))
@classmethod
- def process_path(cls, l):
- if len(l) != 2:
- raise gemato.exceptions.ManifestSyntaxError(
- '{} line: expects 1 value, got: {}'.format(l[0], l[1:]))
- if not l[1] or l[1][0] == '/':
- raise gemato.exceptions.ManifestSyntaxError(
- '{} line: expected relative path, got: {}'.format(l[0], l[1:]))
- return cls.escape_seq_re.sub(cls.decode_char, l[1])
+ def process_path(cls, data):
+ if len(data) != 2:
+ raise ManifestSyntaxError(
+ f'{data[0]} line: expects 1 value, got: {data[1:]}')
+ if not data[1] or data[1][0] == '/':
+ raise ManifestSyntaxError(
+ f'{data[0]} line: expected relative path, '
+ f'got: {data[1:]}')
+ return cls.escape_seq_re.sub(cls.decode_char, data[1])
@staticmethod
def encode_char(m):
assert len(m.group(0)) == 1
cp = ord(m.group(0))
if cp <= 0x7F:
- return '\\x{:02X}'.format(cp)
+ return f'\\x{cp:02X}'
elif cp <= 0xFFFF:
- return '\\u{:04X}'.format(cp)
+ return f'\\u{cp:04X}'
else:
- return '\\U{:08X}'.format(cp)
+ return f'\\U{cp:08X}'
@property
def encoded_path(self):
@@ -102,16 +109,14 @@ class ManifestPathEntry(object):
class ManifestEntryIGNORE(ManifestPathEntry):
- """
- Ignored path.
- """
+ """Ignored path"""
tag = 'IGNORE'
@classmethod
- def from_list(cls, l):
- assert l[0] == cls.tag
- return cls(cls.process_path(l))
+ def from_list(cls, data):
+ assert data[0] == cls.tag
+ return cls(cls.process_path(data))
def to_list(self):
return (self.tag, self.encoded_path)
@@ -125,26 +130,28 @@ class ManifestFileEntry(ManifestPathEntry):
__slots__ = ['checksums', 'size']
def __init__(self, path, size, checksums):
- super(ManifestFileEntry, self).__init__(path)
+ super().__init__(path)
self.size = size
self.checksums = checksums
@staticmethod
- def process_checksums(l):
- if len(l) < 3:
- raise gemato.exceptions.ManifestSyntaxError(
- '{} line: expects at least 2 values, got: {}'.format(l[0], l[1:]))
+ def process_checksums(data):
+ if len(data) < 3:
+ raise ManifestSyntaxError(
+ f'{data[0]} line: expects at least 2 values, '
+ f'got: {data[1:]}')
try:
- size = int(l[2])
+ size = int(data[2])
if size < 0:
raise ValueError()
except ValueError:
- raise gemato.exceptions.ManifestSyntaxError(
- '{} line: size must be a non-negative integer, got: {}'.format(l[0], l[2]))
+ raise ManifestSyntaxError(
+ f'{data[0]} line: size must be a non-negative integer, '
+ f'got: {data[2]}')
checksums = {}
- it = iter(l[3:])
+ it = iter(data[3:])
while True:
try:
ckname = next(it)
@@ -153,8 +160,8 @@ class ManifestFileEntry(ManifestPathEntry):
try:
ckval = next(it)
except StopIteration:
- raise gemato.exceptions.ManifestSyntaxError(
- '{} line: checksum {} has no value'.format(l[0], ckname))
+ raise ManifestSyntaxError(
+ f'{data[0]} line: checksum {ckname} has no value')
checksums[ckname] = ckval
return size, checksums
@@ -166,7 +173,7 @@ class ManifestFileEntry(ManifestPathEntry):
return ret
def __eq__(self, other):
- return (super(ManifestFileEntry, self).__eq__(other)
+ return (super().__eq__(other)
and self.size == other.size
and self.checksums == other.checksums)
@@ -181,14 +188,14 @@ class ManifestEntryMANIFEST(ManifestFileEntry):
tag = 'MANIFEST'
@classmethod
- def from_list(cls, l):
- assert l[0] == cls.tag
- path = cls.process_path(l[:2])
- size, checksums = cls.process_checksums(l)
+ def from_list(cls, data):
+ assert data[0] == cls.tag
+ path = cls.process_path(data[:2])
+ size, checksums = cls.process_checksums(data)
return cls(path, size, checksums)
def to_list(self):
- return super(ManifestEntryMANIFEST, self).to_list(self.tag)
+ return super().to_list(self.tag)
class ManifestEntryDATA(ManifestFileEntry):
@@ -199,14 +206,14 @@ class ManifestEntryDATA(ManifestFileEntry):
tag = 'DATA'
@classmethod
- def from_list(cls, l):
- assert l[0] == cls.tag
- path = cls.process_path(l[:2])
- size, checksums = cls.process_checksums(l)
+ def from_list(cls, data):
+ assert data[0] == cls.tag
+ path = cls.process_path(data[:2])
+ size, checksums = cls.process_checksums(data)
return cls(path, size, checksums)
def to_list(self):
- return super(ManifestEntryDATA, self).to_list(self.tag)
+ return super().to_list(self.tag)
class ManifestEntryDIST(ManifestFileEntry):
@@ -217,16 +224,17 @@ class ManifestEntryDIST(ManifestFileEntry):
tag = 'DIST'
@classmethod
- def from_list(cls, l):
- path = cls.process_path(l[:2])
+ def from_list(cls, data):
+ path = cls.process_path(data[:2])
if '/' in path:
- raise gemato.exceptions.ManifestSyntaxError(
- 'DIST line: file name expected, got directory path: {}'.format(path))
- size, checksums = cls.process_checksums(l)
+ raise ManifestSyntaxError(
+ f'{data[0]} line: file name expected, got directory '
+ f'path: {path}')
+ size, checksums = cls.process_checksums(data)
return cls(path, size, checksums)
def to_list(self):
- return super(ManifestEntryDIST, self).to_list(self.tag)
+ return super().to_list(self.tag)
class ManifestEntryEBUILD(ManifestFileEntry):
@@ -237,14 +245,14 @@ class ManifestEntryEBUILD(ManifestFileEntry):
tag = 'EBUILD'
@classmethod
- def from_list(cls, l):
- assert l[0] == cls.tag
- path = cls.process_path(l[:2])
- size, checksums = cls.process_checksums(l)
+ def from_list(cls, data):
+ assert data[0] == cls.tag
+ path = cls.process_path(data[:2])
+ size, checksums = cls.process_checksums(data)
return cls(path, size, checksums)
def to_list(self):
- return super(ManifestEntryEBUILD, self).to_list(self.tag)
+ return super().to_list(self.tag)
class ManifestEntryMISC(ManifestFileEntry):
@@ -255,14 +263,14 @@ class ManifestEntryMISC(ManifestFileEntry):
tag = 'MISC'
@classmethod
- def from_list(cls, l):
- assert l[0] == cls.tag
- path = cls.process_path(l[:2])
- size, checksums = cls.process_checksums(l)
+ def from_list(cls, data):
+ assert data[0] == cls.tag
+ path = cls.process_path(data[:2])
+ size, checksums = cls.process_checksums(data)
return cls(path, size, checksums)
def to_list(self):
- return super(ManifestEntryMISC, self).to_list(self.tag)
+ return super().to_list(self.tag)
class ManifestEntryAUX(ManifestFileEntry):
@@ -275,19 +283,19 @@ class ManifestEntryAUX(ManifestFileEntry):
def __init__(self, aux_path, size, checksums):
self.aux_path = aux_path
- super(ManifestEntryAUX, self).__init__(
+ super().__init__(
os.path.join('files', aux_path), size, checksums)
@classmethod
- def from_list(cls, l):
- assert l[0] == cls.tag
- path = cls.process_path(l[:2])
- size, checksums = cls.process_checksums(l)
+ def from_list(cls, data):
+ assert data[0] == cls.tag
+ path = cls.process_path(data[:2])
+ size, checksums = cls.process_checksums(data)
return cls(path, size, checksums)
def to_list(self):
- ret = super(ManifestEntryAUX, self).to_list(self.tag)
- assert gemato.util.path_inside_dir(ret[1], 'files')
+ ret = super().to_list(self.tag)
+ assert path_inside_dir(ret[1], 'files')
ret[1] = ret[1][6:]
return ret
@@ -367,66 +375,67 @@ class ManifestFile(object):
state = ManifestState.DATA
openpgp_data = ''
- for l in f:
+ for line in f:
if state == ManifestState.DATA:
- if l == '-----BEGIN PGP SIGNED MESSAGE-----\n':
+ if line == '-----BEGIN PGP SIGNED MESSAGE-----\n':
if self.entries:
- raise gemato.exceptions.ManifestUnsignedData()
+ raise ManifestUnsignedData()
if verify_openpgp:
- openpgp_data += l
+ openpgp_data += line
state = ManifestState.SIGNED_PREAMBLE
continue
elif state == ManifestState.SIGNED_PREAMBLE:
if verify_openpgp:
- openpgp_data += l
+ openpgp_data += line
# skip header lines up to the empty line
- if l.strip():
+ if line.strip():
continue
state = ManifestState.SIGNED_DATA
elif state == ManifestState.SIGNED_DATA:
if verify_openpgp:
- openpgp_data += l
- if l == '-----BEGIN PGP SIGNATURE-----\n':
+ openpgp_data += line
+ if line == '-----BEGIN PGP SIGNATURE-----\n':
state = ManifestState.SIGNATURE
continue
# dash-escaping, RFC 4880 says any line can suffer from it
- if l.startswith('- '):
- l = l[2:]
+ if line.startswith('- '):
+ line = line[2:]
elif state == ManifestState.SIGNATURE:
if verify_openpgp:
- openpgp_data += l
- if l == '-----END PGP SIGNATURE-----\n':
+ openpgp_data += line
+ if line == '-----END PGP SIGNATURE-----\n':
state = ManifestState.POST_SIGNED_DATA
continue
- if l.startswith('-----') and l.rstrip().endswith('-----'):
- raise gemato.exceptions.ManifestSyntaxError(
- "Unexpected OpenPGP header: {}".format(l))
- if state in (ManifestState.SIGNED_PREAMBLE, ManifestState.SIGNATURE):
+ if line.startswith('-----') and line.rstrip().endswith('-----'):
+ raise ManifestSyntaxError(
+ f'Unexpected OpenPGP header: {line}')
+ if state in (ManifestState.SIGNED_PREAMBLE,
+ ManifestState.SIGNATURE):
continue
- sl = l.strip().split()
+ sl = line.strip().split()
# skip empty lines
if not sl:
continue
if state == ManifestState.POST_SIGNED_DATA:
- raise gemato.exceptions.ManifestUnsignedData()
+ raise ManifestUnsignedData()
tag = sl[0]
try:
- self.entries.append(MANIFEST_TAG_MAPPING[tag]
- .from_list(sl))
+ self.entries.append(
+ MANIFEST_TAG_MAPPING[tag].from_list(sl))
except KeyError:
- raise gemato.exceptions.ManifestSyntaxError(
- "Invalid Manifest line: {}".format(l))
+ raise ManifestSyntaxError(
+ f'Invalid Manifest line: {line}')
if state == ManifestState.SIGNED_PREAMBLE:
- raise gemato.exceptions.ManifestSyntaxError(
+ raise ManifestSyntaxError(
"Manifest terminated early, in OpenPGP headers")
elif state == ManifestState.SIGNED_DATA:
- raise gemato.exceptions.ManifestSyntaxError(
+ raise ManifestSyntaxError(
"Manifest terminated early, before signature")
elif state == ManifestState.SIGNATURE:
- raise gemato.exceptions.ManifestSyntaxError(
+ raise ManifestSyntaxError(
"Manifest terminated early, inside signature")
if verify_openpgp and state == ManifestState.POST_SIGNED_DATA:
@@ -436,7 +445,7 @@ class ManifestFile(object):
self.openpgp_signed = True
def dump(self, f, sign_openpgp=None, openpgp_keyid=None,
- openpgp_env=None, sort=False):
+ openpgp_env=None, sort=False):
"""
Dump data into file @f. The file should be open for writing
in text mode, and truncated to zero length.
@@ -468,7 +477,7 @@ class ManifestFile(object):
openpgp_env.clear_sign_file(data, f, keyid=openpgp_keyid)
else:
for e in self.entries:
- f.write(u' '.join(e.to_list()) + '\n')
+ f.write(' '.join(e.to_list()) + '\n')
def find_timestamp(self):
"""
@@ -491,7 +500,7 @@ class ManifestFile(object):
if e.tag == 'IGNORE':
# ignore matches recursively, so we process it separately
# py<3.5 does not have os.path.commonpath()
- if gemato.util.path_starts_with(path, e.path):
+ if path_starts_with(path, e.path):
return e
elif e.tag in ('DIST', 'TIMESTAMP'):
# distfiles are not local files, so skip them
@@ -523,7 +532,7 @@ class ManifestFile(object):
for e in self.entries:
if e.tag == 'MANIFEST':
mdir = os.path.dirname(e.path)
- if gemato.util.path_inside_dir(path, mdir):
+ if path_inside_dir(path, mdir):
yield e
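
The manifest.py renames (l to data/line) are purely cosmetic, but the hunks also show the path-escaping scheme: whitespace and control characters in Manifest paths are written as \xXX, \uXXXX, or \UXXXXXXXX escapes depending on the code point. Extracted from ManifestPathEntry.encode_char into a stand-alone sketch:

    # U+0000..U+007F -> \xXX, up to U+FFFF -> \uXXXX, above -> \UXXXXXXXX
    def encode_char(ch):
        cp = ord(ch)
        if cp <= 0x7F:
            return f'\\x{cp:02X}'
        elif cp <= 0xFFFF:
            return f'\\u{cp:04X}'
        return f'\\U{cp:08X}'

    print(encode_char(' '))           # -> \x20
    print(encode_char('\u0142'))      # -> \u0142 ('ł')
    print(encode_char('\U0001f642'))  # -> \U0001F642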
diff --git a/gemato/openpgp.py b/gemato/openpgp.py
index a1c986d..09525a9 100644
--- a/gemato/openpgp.py
+++ b/gemato/openpgp.py
@@ -16,7 +16,16 @@ import subprocess
import tempfile
import urllib.parse
-import gemato.exceptions
+from gemato.exceptions import (
+ OpenPGPNoImplementation,
+ OpenPGPVerificationFailure,
+ OpenPGPExpiredKeyFailure,
+ OpenPGPRevokedKeyFailure,
+ OpenPGPKeyImportError,
+ OpenPGPKeyRefreshError,
+ OpenPGPUnknownSigFailure,
+ OpenPGPSigningFailure,
+ )
try:
import requests
@@ -67,7 +76,8 @@ class OpenPGPSystemEnvironment(object):
at the beginning.
"""
- raise NotImplementedError('import_key() is not implemented by this OpenPGP provider')
+ raise NotImplementedError(
+ 'import_key() is not implemented by this OpenPGP provider')
def refresh_keys(self, allow_wkd=True, keyserver=None):
"""
@@ -83,7 +93,8 @@ class OpenPGPSystemEnvironment(object):
it should specify a keyserver URL.
"""
- raise NotImplementedError('refresh_keys() is not implemented by this OpenPGP provider')
+ raise NotImplementedError(
+ 'refresh_keys() is not implemented by this OpenPGP provider')
def _parse_gpg_ts(self, ts):
"""
@@ -112,21 +123,21 @@ class OpenPGPSystemEnvironment(object):
[GNUPG, '--batch', '--status-fd', '1', '--verify'],
f.read().encode('utf8'))
if exitst != 0:
- raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8'))
+ raise OpenPGPVerificationFailure(err.decode('utf8'))
is_good = False
sig_data = None
# process the output of gpg to find the exact result
- for l in out.splitlines():
- if l.startswith(b'[GNUPG:] GOODSIG'):
+ for line in out.splitlines():
+ if line.startswith(b'[GNUPG:] GOODSIG'):
is_good = True
- elif l.startswith(b'[GNUPG:] EXPKEYSIG'):
- raise gemato.exceptions.OpenPGPExpiredKeyFailure(err.decode('utf8'))
- elif l.startswith(b'[GNUPG:] REVKEYSIG'):
- raise gemato.exceptions.OpenPGPRevokedKeyFailure(err.decode('utf8'))
- elif l.startswith(b'[GNUPG:] VALIDSIG'):
- spl = l.split(b' ')
+ elif line.startswith(b'[GNUPG:] EXPKEYSIG'):
+ raise OpenPGPExpiredKeyFailure(err.decode('utf8'))
+ elif line.startswith(b'[GNUPG:] REVKEYSIG'):
+ raise OpenPGPRevokedKeyFailure(err.decode('utf8'))
+ elif line.startswith(b'[GNUPG:] VALIDSIG'):
+ spl = line.split(b' ')
assert len(spl) >= 12
fp = spl[2].decode('utf8')
ts = self._parse_gpg_ts(spl[4].decode('utf8'))
@@ -137,7 +148,7 @@ class OpenPGPSystemEnvironment(object):
# require both GOODSIG and VALIDSIG
if not is_good or sig_data is None:
- raise gemato.exceptions.OpenPGPUnknownSigFailure(err.decode('utf8'))
+ raise OpenPGPUnknownSigFailure(err.decode('utf8'))
return sig_data
def clear_sign_file(self, f, outf, keyid=None):
@@ -158,7 +169,7 @@ class OpenPGPSystemEnvironment(object):
[GNUPG, '--batch', '--clearsign'] + args,
f.read().encode('utf8'))
if exitst != 0:
- raise gemato.exceptions.OpenPGPSigningFailure(err.decode('utf8'))
+ raise OpenPGPSigningFailure(err.decode('utf8'))
outf.write(out.decode('utf8'))
@@ -176,7 +187,7 @@ class OpenPGPSystemEnvironment(object):
except OSError as e:
if e.errno != errno.ENOENT:
raise
- raise gemato.exceptions.OpenPGPNoImplementation()
+ raise OpenPGPNoImplementation()
out, err = p.communicate(stdin)
return (p.wait(), out, err)
@@ -194,20 +205,20 @@ class OpenPGPEnvironment(OpenPGPSystemEnvironment):
__slots__ = ['_home', 'proxy']
def __init__(self, debug=False, proxy=None):
- super(OpenPGPEnvironment, self).__init__(debug=debug)
+ super().__init__(debug=debug)
self.proxy = proxy
self._home = tempfile.mkdtemp(prefix='gemato.')
with open(os.path.join(self._home, 'dirmngr.conf'), 'w') as f:
- f.write('''# autogenerated by gemato
+ f.write(f'''# autogenerated by gemato
# honor user's http_proxy setting
honor-http-proxy
# enable debugging, in case we needed it
-log-file {debug_file}
+log-file {os.path.join(self._home, 'dirmngr.log')}
debug-level guru
-'''.format(debug_file=os.path.join(self._home, 'dirmngr.log')))
+''')
with open(os.path.join(self._home, 'gpg.conf'), 'w') as f:
f.write('''# autogenerated by gemato
@@ -215,15 +226,15 @@ debug-level guru
trust-model always
''')
with open(os.path.join(self._home, 'gpg-agent.conf'), 'w') as f:
- f.write('''# autogenerated by gemato
+ f.write(f'''# autogenerated by gemato
# avoid any smartcard operations, we are running in isolation
disable-scdaemon
# enable debugging, in case we needed it
-log-file {debug_file}
+log-file {os.path.join(self._home, 'gpg-agent.log')}
debug-level guru
-'''.format(debug_file=os.path.join(self._home, 'gpg-agent.log')))
+''')
def __exit__(self, exc_type, exc_value, exc_cb):
if self._home is not None:
@@ -250,23 +261,22 @@ debug-level guru
ret, sout, serr = self._spawn_gpg(
[GNUPGCONF, '--kill', 'all'])
if ret != 0:
- logging.warning('{} --kill failed: {}'
- .format(GNUPGCONF, serr))
+ logging.warning(f'{GNUPGCONF} --kill failed: {serr}')
if not self.debug:
# we need to loop due to ENOTEMPTY potential
while os.path.isdir(self._home):
shutil.rmtree(self._home,
onerror=self._rmtree_error_handler)
else:
- logging.debug('GNUPGHOME left for debug purposes: {}'
- .format(self._home))
+ logging.debug(f'GNUPGHOME left for debug purposes: '
+ f'{self._home}')
self._home = None
def import_key(self, keyfile):
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--import'], keyfile.read())
if exitst != 0:
- raise gemato.exceptions.OpenPGPKeyImportError(err.decode('utf8'))
+ raise OpenPGPKeyImportError(err.decode('utf8'))
zbase32_translate = bytes.maketrans(
b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567',
@@ -275,8 +285,8 @@ debug-level guru
@classmethod
def get_wkd_url(cls, email):
localname, domain = email.encode('utf8').split(b'@', 1)
- b32 = (base64.b32encode(
- hashlib.sha1(localname.lower()).digest())
+ b32 = (
+ base64.b32encode(hashlib.sha1(localname.lower()).digest())
.translate(cls.zbase32_translate).decode())
uenc = urllib.parse.quote(localname)
ldomain = domain.lower().decode('utf8')
@@ -297,51 +307,52 @@ debug-level guru
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--with-colons', '--list-keys'])
if exitst != 0:
- raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8'))
+ raise OpenPGPKeyRefreshError(err.decode('utf8'))
# find keys and UIDs
addrs = set()
addrs_key = set()
keys = set()
prev_pub = None
- for l in out.splitlines():
+ for line in out.splitlines():
# were we expecting a fingerprint?
if prev_pub is not None:
- if l.startswith(b'fpr:'):
- fpr = l.split(b':')[9].decode('ASCII')
+ if line.startswith(b'fpr:'):
+ fpr = line.split(b':')[9].decode('ASCII')
assert fpr.endswith(prev_pub)
- logging.debug('refresh_keys_wkd(): fingerprint: {}'
- .format(fpr))
+ logging.debug(
+ f'refresh_keys_wkd(): fingerprint: {fpr}')
keys.add(fpr)
prev_pub = None
else:
# old GnuPG doesn't give fingerprints by default
# (but it doesn't support WKD either)
- logging.debug('refresh_keys_wkd(): failing due to old gpg')
+ logging.debug(
+ 'refresh_keys_wkd(): failing due to old gpg')
return False
- elif l.startswith(b'pub:'):
+ elif line.startswith(b'pub:'):
if keys:
# every key must have at least one UID
if not addrs_key:
- logging.debug('refresh_keys_wkd(): failing due to no UIDs')
+ logging.debug(
+ 'refresh_keys_wkd(): failing due to no UIDs')
return False
addrs.update(addrs_key)
addrs_key = set()
# wait for the fingerprint
- prev_pub = l.split(b':')[4].decode('ASCII')
- logging.debug('refresh_keys_wkd(): keyid: {}'
- .format(prev_pub))
- elif l.startswith(b'uid:'):
- uid = l.split(b':')[9]
+ prev_pub = line.split(b':')[4].decode('ASCII')
+ logging.debug(f'refresh_keys_wkd(): keyid: {prev_pub}')
+ elif line.startswith(b'uid:'):
+ uid = line.split(b':')[9]
name, addr = email.utils.parseaddr(uid.decode('utf8'))
if '@' in addr:
- logging.debug('refresh_keys_wkd(): UID: {}'
- .format(addr))
+ logging.debug(f'refresh_keys_wkd(): UID: {addr}')
addrs_key.add(addr)
else:
- logging.debug('refresh_keys_wkd(): ignoring UID without mail: {}'
- .format(uid.decode('utf8')))
+ logging.debug(
+ f'refresh_keys_wkd(): ignoring UID without '
+ f'mail: {uid.decode("utf8")}')
# grab the final set (also aborts when there are no keys)
if not addrs_key:
@@ -369,14 +380,14 @@ debug-level guru
[GNUPG, '--batch', '--import', '--status-fd', '1'], data)
if exitst != 0:
# there's no valid reason for import to fail here
- raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8'))
+ raise OpenPGPKeyRefreshError(err.decode('utf8'))
# we need to explicitly ensure all keys were fetched
- for l in out.splitlines():
- if l.startswith(b'[GNUPG:] IMPORT_OK'):
- fpr = l.split(b' ')[3].decode('ASCII')
- logging.debug('refresh_keys_wkd(): import successful for key: {}'
- .format(fpr))
+ for line in out.splitlines():
+ if line.startswith(b'[GNUPG:] IMPORT_OK'):
+ fpr = line.split(b' ')[3].decode('ASCII')
+ logging.debug(
+ f'refresh_keys_wkd(): import successful for key: {fpr}')
if fpr in keys:
keys.remove(fpr)
else:
@@ -384,11 +395,12 @@ debug-level guru
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--delete-keys', fpr])
if exitst != 0:
- raise gemato.exceptions.OpenPGPKeyRefreshError(
+ raise OpenPGPKeyRefreshError(
err.decode('utf8'))
if keys:
- logging.debug('refresh_keys_wkd(): failing due to non-updated keys: {}'
- .format(keys))
+ logging.debug(
+ f'refresh_keys_wkd(): failing due to non-updated keys: '
+ f'{keys}')
return False
return True
@@ -401,11 +413,11 @@ debug-level guru
exitst, out, err = self._spawn_gpg(
[GNUPG, '--batch', '--refresh-keys'] + ks_args)
if exitst != 0:
- raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8'))
+ raise OpenPGPKeyRefreshError(err.decode('utf8'))
def refresh_keys(self, allow_wkd=True, keyserver=None):
- logging.debug('refresh_keys(allow_wkd={}, keyserver={}) called'
- .format(allow_wkd, keyserver))
+ logging.debug(f'refresh_keys(allow_wkd={allow_wkd}, '
+ f'keyserver={keyserver}) called')
if allow_wkd and self.refresh_keys_wkd():
return
@@ -421,5 +433,4 @@ debug-level guru
env_override = {'GNUPGHOME': self.home}
if self.proxy is not None:
env_override['http_proxy'] = self.proxy
- return (super(OpenPGPEnvironment, self)
- ._spawn_gpg(options, stdin, env_override))
+ return (super()._spawn_gpg(options, stdin, env_override))
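
For context on the hunks above: the key-listing loop consumes GnuPG's machine-readable colon format, where each line is a record type followed by colon-separated fields (the fingerprint and user ID both live in field 10, index 9, as the `split(b':')[9]` calls show). A minimal standalone sketch of the same parsing, assuming a plain `gpg` on PATH; the `run_gpg` helper is hypothetical and stands in for gemato's `_spawn_gpg`:

import email.utils
import subprocess


def run_gpg(args):
    # hypothetical helper; gemato drives gpg via _spawn_gpg instead
    result = subprocess.run(['gpg', '--batch', '--with-colons'] + args,
                            capture_output=True)
    return result.returncode, result.stdout


def list_key_addresses():
    exitst, out = run_gpg(['--list-keys'])
    if exitst != 0:
        raise RuntimeError('gpg --list-keys failed')
    keys = {}
    fpr = None
    for line in out.splitlines():
        fields = line.split(b':')
        if fields[0] == b'fpr':
            # fpr records carry the full fingerprint in field 10
            fpr = fields[9].decode('ASCII')
            keys[fpr] = set()
        elif fields[0] == b'uid' and fpr is not None:
            # uid records carry the user ID string in field 10
            _, addr = email.utils.parseaddr(fields[9].decode('utf8'))
            if '@' in addr:
                keys[fpr].add(addr)
    return keys
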
diff --git a/gemato/profile.py b/gemato/profile.py
index 87e491e..463b72c 100644
--- a/gemato/profile.py
+++ b/gemato/profile.py
@@ -54,7 +54,7 @@ class DefaultProfile(object):
return ()
def want_compressed_manifest(self, relpath, manifest, unc_size,
- compress_watermark):
+ compress_watermark):
"""
Determine whether the specified Manifest (at @relpath) can
be compressed. @manifest is the Manifest instance. @unc_size
@@ -87,15 +87,18 @@ class EbuildRepositoryProfile(DefaultProfile):
return True
# plus some unconditional standard directories
if relpath in ('eclass', 'licenses', 'metadata',
- 'profiles'):
+ 'profiles'):
return True
elif len(spl) == 2:
# 'slow' way of detecting package directories
if any(f.endswith('.ebuild') for f in filenames):
return True
# some standard directories worth separate Manifests
- if spl[0] == 'metadata' and spl[1] in ('dtd', 'glsa',
- 'md5-cache', 'news', 'xml-schema'):
+ if spl[0] == 'metadata' and spl[1] in ('dtd',
+ 'glsa',
+ 'md5-cache',
+ 'news',
+ 'xml-schema'):
return True
elif len(spl) == 3:
# metadata cache -> per-directory Manifests
@@ -145,19 +148,20 @@ class BackwardsCompatEbuildRepositoryProfile(EbuildRepositoryProfile):
if spl[2:3] == ['files']:
return 'AUX'
- return (super(BackwardsCompatEbuildRepositoryProfile, self)
- .get_entry_type_for_path(path))
+ return (super().get_entry_type_for_path(path))
def want_compressed_manifest(self, relpath, manifest, unc_size,
- compress_watermark):
+ compress_watermark):
for e in manifest.entries:
# disable compression in package directories
if e.tag == 'EBUILD':
return False
- return (super(BackwardsCompatEbuildRepositoryProfile, self)
- .want_compressed_manifest(relpath, manifest, unc_size,
- compress_watermark))
+ return (
+ super().want_compressed_manifest(relpath,
+ manifest,
+ unc_size,
+ compress_watermark))
PROFILE_MAPPING = {
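
The profile hunks above only re-wrap the path-component checks; the underlying dispatch is depth-based. A simplified sketch of that pattern (not the complete rule set, and `wants_manifest` is an illustrative name, not gemato's API):

import os.path


def wants_manifest(relpath, filenames):
    # classify a directory by the depth of its relative path
    spl = relpath.split(os.path.sep)
    if len(spl) == 1:
        # unconditional top-level directories
        return spl[0] in ('eclass', 'licenses', 'metadata', 'profiles')
    if len(spl) == 2:
        # package directories are detected by their ebuilds
        if any(f.endswith('.ebuild') for f in filenames):
            return True
        # standard metadata subdirectories worth separate Manifests
        return spl[0] == 'metadata' and spl[1] in (
            'dtd', 'glsa', 'md5-cache', 'news', 'xml-schema')
    return False
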
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index 0b98c3a..530b61d 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -1,17 +1,43 @@
# gemato: Recursive loader for Manifests
# vim:fileencoding=utf-8
-# (c) 2017-2018 Michał Górny
+# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
import errno
import os.path
-import gemato.compression
-import gemato.exceptions
-import gemato.manifest
-import gemato.profile
-import gemato.util
-import gemato.verify
+from gemato.compression import (
+ open_potentially_compressed_path,
+ get_potential_compressed_names,
+ get_compressed_suffix_from_filename,
+ )
+from gemato.exceptions import (
+ ManifestMismatch,
+ ManifestIncompatibleEntry,
+ ManifestCrossDevice,
+ ManifestSymlinkLoop,
+ ManifestInvalidPath,
+ ManifestSyntaxError,
+ )
+from gemato.manifest import (
+ ManifestFile,
+ ManifestEntryIGNORE,
+ ManifestEntryTIMESTAMP,
+ new_manifest_entry,
+ ManifestEntryMANIFEST,
+ )
+from gemato.profile import DefaultProfile
+from gemato.util import (
+ path_starts_with,
+ MultiprocessingPoolWrapper,
+ throw_exception,
+ path_inside_dir,
+ )
+from gemato.verify import (
+ verify_path,
+ verify_entry_compatibility,
+ update_entry_for_path,
+ )
class ManifestLoader(object):
@@ -45,17 +71,16 @@ class ManifestLoader(object):
Returns a tuple of (ManifestFile instance, file stat result).
"""
- m = gemato.manifest.ManifestFile()
+ m = ManifestFile()
path = os.path.join(self.root_directory, relpath)
if verify_entry is not None:
- ret, diff = gemato.verify.verify_path(path, verify_entry)
+ ret, diff = verify_path(path, verify_entry)
if not ret:
- raise gemato.exceptions.ManifestMismatch(
- relpath, verify_entry, diff)
+ raise ManifestMismatch(relpath, verify_entry, diff)
- with gemato.compression.open_potentially_compressed_path(
- path, 'r', encoding='utf8') as f:
+ with open_potentially_compressed_path(path, 'r',
+ encoding='utf8') as f:
m.load(f, self.verify_openpgp, self.openpgp_env)
st = os.fstat(f.fileno())
@@ -90,12 +115,12 @@ class SubprocessVerifier(object):
self.last_mtime = last_mtime
def _verify_one_file(self, path, relpath, e):
- ret, diff = gemato.verify.verify_path(path, e,
- expected_dev=self.manifest_device,
- last_mtime=self.last_mtime)
+ ret, diff = verify_path(path, e,
+ expected_dev=self.manifest_device,
+ last_mtime=self.last_mtime)
if not ret:
- err = gemato.exceptions.ManifestMismatch(relpath, e, diff)
+ err = ManifestMismatch(relpath, e, diff)
ret = self.fail_handler(err)
if ret is None:
ret = True
@@ -118,7 +143,7 @@ class SubprocessVerifier(object):
if de is not None:
dpath = os.path.join(relpath, d)
ret &= self._verify_one_file(os.path.join(dirpath, d),
- dpath, de)
+ dpath, de)
for f in filenames:
# skip dotfiles
@@ -132,14 +157,13 @@ class SubprocessVerifier(object):
continue
fe = dirdict.pop(f, None)
ret &= self._verify_one_file(os.path.join(dirpath, f),
- fpath, fe)
-
+ fpath, fe)
# check for missing files
for f, e in dirdict.items():
fpath = os.path.join(relpath, f)
ret &= self._verify_one_file(os.path.join(dirpath, f),
- fpath, e)
+ fpath, e)
return ret
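
The `_verify_one_file()` reindentation above preserves gemato's fail-handler contract: a mismatch is wrapped in an exception and handed to a callback that may raise (the default `throw_exception`) or record the error and return a verdict, with a `None` return counting as success. A minimal sketch of that contract, where `check` stands in for `verify_path()`:

def throw_exception(err):
    # default handler: abort on the first mismatch
    raise err


def verify_one(path, entry, check, fail_handler=throw_exception):
    ok, diff = check(path, entry)
    if not ok:
        ret = fail_handler(ValueError(f'{path}: {diff}'))
        # a handler that returns None means "keep going"
        ok = True if ret is None else ret
    return ok
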
@@ -173,13 +197,21 @@ class ManifestRecursiveLoader(object):
'max_jobs',
]
- def __init__(self, top_manifest_path,
- verify_openpgp=None, openpgp_env=None,
- sign_openpgp=None, openpgp_keyid=None,
- hashes=None, allow_create=False, sort=None,
- compress_watermark=None, compress_format=None,
- profile=gemato.profile.DefaultProfile(),
- max_jobs=None, allow_xdev=True):
+ def __init__(self,
+ top_manifest_path,
+ verify_openpgp=None,
+ openpgp_env=None,
+ sign_openpgp=None,
+ openpgp_keyid=None,
+ hashes=None,
+ allow_create=False,
+ sort=None,
+ compress_watermark=None,
+ compress_format=None,
+ profile=DefaultProfile(),
+ max_jobs=None,
+ allow_xdev=True,
+ ):
"""
Instantiate the loader for a Manifest tree starting at top-level
Manifest @top_manifest_path.
@@ -252,22 +284,27 @@ class ManifestRecursiveLoader(object):
if self.compress_format is None:
self.compress_format = 'gz'
- self.manifest_loader = ManifestLoader(self.root_directory,
- verify_openpgp, self.openpgp_env)
+ self.manifest_loader = ManifestLoader(
+ self.root_directory, verify_openpgp, self.openpgp_env)
self.top_level_manifest_filename = os.path.basename(
- top_manifest_path)
+ top_manifest_path)
self.loaded_manifests = {}
self.updated_manifests = set()
self.manifest_device = None
# TODO: allow catching OpenPGP exceptions somehow?
m = self.load_manifest(self.top_level_manifest_filename,
- allow_create=allow_create, store_dev=not allow_xdev)
+ allow_create=allow_create,
+ store_dev=not allow_xdev)
self.openpgp_signed = m.openpgp_signed
self.openpgp_signature = m.openpgp_signature
- def load_manifest(self, relpath, verify_entry=None,
- allow_create=False, store_dev=False):
+ def load_manifest(self,
+ relpath,
+ verify_entry=None,
+ allow_create=False,
+ store_dev=False,
+ ):
"""
Load a single Manifest file whose relative path within Manifest
tree is @relpath. If @verify_entry is not null, the Manifest
@@ -287,7 +324,7 @@ class ManifestRecursiveLoader(object):
relpath, verify_entry)
except IOError as err:
if err.errno == errno.ENOENT and allow_create:
- m = gemato.manifest.ManifestFile()
+ m = ManifestFile()
path = os.path.join(self.root_directory, relpath)
st = os.stat(os.path.dirname(path))
# trigger saving
@@ -297,7 +334,7 @@ class ManifestRecursiveLoader(object):
if relpath == 'Manifest':
for ip in (self.profile
.get_ignore_paths_for_new_manifest('')):
- ie = gemato.manifest.ManifestEntryIGNORE(ip)
+ ie = ManifestEntryIGNORE(ip)
m.entries.append(ie)
else:
raise err
@@ -330,11 +367,13 @@ class ManifestRecursiveLoader(object):
else:
sign = False
- with gemato.compression.open_potentially_compressed_path(
- path, 'w', encoding='utf8') as f:
- m.dump(f, sign_openpgp=sign, sort=sort,
- openpgp_env=self.openpgp_env,
- openpgp_keyid=self.openpgp_keyid)
+ with open_potentially_compressed_path(path, 'w',
+ encoding='utf8') as f:
+ m.dump(f,
+ sign_openpgp=sign,
+ sort=sort,
+ openpgp_env=self.openpgp_env,
+ openpgp_keyid=self.openpgp_keyid)
return f.buffer.tell()
def _iter_unordered_manifests_for_path(self, path, recursive=False):
@@ -347,9 +386,9 @@ class ManifestRecursiveLoader(object):
"""
for k, v in self.loaded_manifests.items():
d = os.path.dirname(k)
- if gemato.util.path_starts_with(path, d):
+ if path_starts_with(path, d):
yield (k, d, v)
- elif recursive and gemato.util.path_starts_with(d, path):
+ elif recursive and path_starts_with(d, path):
yield (k, d, v)
def _iter_manifests_for_path(self, path, recursive=False):
@@ -380,7 +419,7 @@ class ManifestRecursiveLoader(object):
unconditionally of whether they match parent checksums.
"""
- with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool:
+ with MultiprocessingPoolWrapper(self.max_jobs) as pool:
# TODO: figure out how to avoid confusing uses of 'recursive'
while True:
to_load = []
@@ -395,15 +434,15 @@ class ManifestRecursiveLoader(object):
mdir = os.path.dirname(mpath)
if not verify:
e = None
- if gemato.util.path_starts_with(path, mdir):
+ if path_starts_with(path, mdir):
to_load.append((mpath, e))
- elif recursive and gemato.util.path_starts_with(mdir, path):
+ elif recursive and path_starts_with(mdir, path):
to_load.append((mpath, e))
if not to_load:
break
- manifests = pool.imap_unordered(self.manifest_loader, to_load,
- chunksize=16)
+ manifests = pool.imap_unordered(
+ self.manifest_loader, to_load, chunksize=16)
self.loaded_manifests.update(manifests)
def find_timestamp(self):
@@ -432,7 +471,7 @@ class ManifestRecursiveLoader(object):
e.ts = ts
else:
m = self.loaded_manifests[self.top_level_manifest_filename]
- e = gemato.manifest.ManifestEntryTIMESTAMP(ts)
+ e = ManifestEntryTIMESTAMP(ts)
m.entries.append(e)
def find_path_entry(self, path):
@@ -448,7 +487,7 @@ class ManifestRecursiveLoader(object):
# ignore matches recursively, so we process it separately
# py<3.5 does not have os.path.commonpath()
fullpath = os.path.join(relpath, e.path)
- if gemato.util.path_starts_with(path, fullpath):
+ if path_starts_with(path, fullpath):
return e
elif e.tag in ('DIST', 'TIMESTAMP'):
# distfiles are not local files, so skip them
@@ -468,7 +507,7 @@ class ManifestRecursiveLoader(object):
"""
real_path = os.path.join(self.root_directory, relpath)
path_entry = self.find_path_entry(relpath)
- return gemato.verify.verify_path(real_path, path_entry)
+ return verify_path(real_path, path_entry)
def assert_path_verifies(self, relpath):
"""
@@ -478,11 +517,10 @@ class ManifestRecursiveLoader(object):
"""
real_path = os.path.join(self.root_directory, relpath)
path_entry = self.find_path_entry(relpath)
- ret, diff = gemato.verify.verify_path(real_path, path_entry,
- expected_dev=self.manifest_device)
+ ret, diff = verify_path(real_path, path_entry,
+ expected_dev=self.manifest_device)
if not ret:
- raise gemato.exceptions.ManifestMismatch(
- relpath, path_entry, diff)
+ raise ManifestMismatch(relpath, path_entry, diff)
def find_dist_entry(self, filename, relpath=''):
"""
@@ -519,8 +557,8 @@ class ManifestRecursiveLoader(object):
self.load_manifests_for_path(path, recursive=True,
verify=verify_manifests)
out = {}
- for mpath, relpath, m in self._iter_manifests_for_path(path,
- recursive=True):
+ for mpath, relpath, m in self._iter_manifests_for_path(
+ path, recursive=True):
for e in m.entries:
if only_types is not None:
if e.tag not in only_types:
@@ -534,18 +572,19 @@ class ManifestRecursiveLoader(object):
continue
fullpath = os.path.join(relpath, e.path)
- if gemato.util.path_starts_with(fullpath, path):
+ if path_starts_with(fullpath, path):
dirpath = os.path.dirname(fullpath)
filename = os.path.basename(e.path)
dirout = out.setdefault(dirpath, {})
if filename in dirout:
# compare the two entries
- ret, diff = gemato.verify.verify_entry_compatibility(
- dirout[filename], e)
+ ret, diff = verify_entry_compatibility(
+ dirout[filename], e)
if not ret:
- raise gemato.exceptions.ManifestIncompatibleEntry(
- dirout[filename], e, diff)
- # we need to construct a single entry with both checksums
+ raise ManifestIncompatibleEntry(
+ dirout[filename], e, diff)
+ # we need to construct a single entry with both
+ # checksums
if diff:
new_checksums = dict(e.checksums)
for k, d1, d2 in diff:
@@ -555,9 +594,11 @@ class ManifestRecursiveLoader(object):
dirout[filename] = e
return out
- def assert_directory_verifies(self, path='',
- fail_handler=gemato.util.throw_exception,
- last_mtime=None):
+ def assert_directory_verifies(self,
+ path='',
+ fail_handler=throw_exception,
+ last_mtime=None,
+ ):
"""
Verify the complete directory tree starting at @path (relative
to top Manifest directory). Includes testing for stray files.
@@ -586,8 +627,8 @@ class ManifestRecursiveLoader(object):
entry_dict = self.get_file_entry_dict(path)
it = os.walk(os.path.join(self.root_directory, path),
- onerror=gemato.util.throw_exception,
- followlinks=True)
+ onerror=throw_exception,
+ followlinks=True)
def _walk_directory(it):
"""
@@ -600,7 +641,7 @@ class ManifestRecursiveLoader(object):
dir_st = os.stat(dirpath)
if (self.manifest_device is not None
and dir_st.st_dev != self.manifest_device):
- raise gemato.exceptions.ManifestCrossDevice(dirpath)
+ raise ManifestCrossDevice(dirpath)
dir_id = (dir_st.st_dev, dir_st.st_ino)
# if this directory was already processed for one of its
@@ -608,7 +649,7 @@ class ManifestRecursiveLoader(object):
parent_dir = os.path.dirname(dirpath)
parent_dir_ids = directory_ids.get(parent_dir, [])
if dir_id in parent_dir_ids:
- raise gemato.exceptions.ManifestSymlinkLoop(dirpath)
+ raise ManifestSymlinkLoop(dirpath)
relpath = os.path.relpath(dirpath, self.root_directory)
# strip dot to avoid matching problems
@@ -648,10 +689,10 @@ class ManifestRecursiveLoader(object):
self.manifest_device,
fail_handler, last_mtime)
- with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool:
+ with MultiprocessingPoolWrapper(self.max_jobs) as pool:
# verify the directories in parallel
- ret = all(pool.imap_unordered(verifier, _walk_directory(it),
- chunksize=64))
+ ret = all(pool.imap_unordered(
+ verifier, _walk_directory(it), chunksize=64))
# check for missing directories
for relpath, dirdict in entry_dict.items():
@@ -662,8 +703,13 @@ class ManifestRecursiveLoader(object):
return ret
- def save_manifests(self, hashes=None, force=False, sort=None,
- compress_watermark=None, compress_format=None):
+ def save_manifests(self,
+ hashes=None,
+ force=False,
+ sort=None,
+ compress_watermark=None,
+ compress_format=None,
+ ):
"""
Save the Manifests modified since the last save_manifests()
call.
@@ -687,7 +733,7 @@ class ManifestRecursiveLoader(object):
will be compressed using @compress_format. The Manifest files
whose size is smaller will be uncompressed. To compress all
Manifest files, pass a size of 0.
-
+
If @compress_watermark is None, the compression is left as-is.
"""
@@ -704,8 +750,8 @@ class ManifestRecursiveLoader(object):
fixed_manifests = set()
renamed_manifests = {}
- for mpath, relpath, m in self._iter_manifests_for_path('',
- recursive=True):
+ for mpath, relpath, m in self._iter_manifests_for_path(
+ '', recursive=True):
for e in m.entries:
if e.tag != 'MANIFEST':
continue
@@ -718,7 +764,7 @@ class ManifestRecursiveLoader(object):
fullpath = renamed_manifests[fullpath]
e.path = os.path.relpath(fullpath, relpath)
- gemato.verify.update_entry_for_path(
+ update_entry_for_path(
os.path.join(self.root_directory, fullpath),
e,
hashes=hashes,
@@ -735,8 +781,7 @@ class ManifestRecursiveLoader(object):
unc_size = self.save_manifest(mpath, sort=sort)
# let's see if we want to recompress it
if compress_watermark is not None:
- compr = (gemato.compression
- .get_compressed_suffix_from_filename(mpath))
+ compr = get_compressed_suffix_from_filename(mpath)
is_compr = compr is not None
want_compr = self.profile.want_compressed_manifest(
mpath, m, unc_size, compress_watermark)
@@ -752,7 +797,7 @@ class ManifestRecursiveLoader(object):
self.save_manifest(new_mpath)
del self.loaded_manifests[mpath]
os.unlink(os.path.join(self.root_directory,
- mpath))
+ mpath))
renamed_manifests[mpath] = new_mpath
if mpath == self.top_level_manifest_filename:
@@ -766,11 +811,13 @@ class ManifestRecursiveLoader(object):
self.updated_manifests.discard(self.top_level_manifest_filename)
# at this point, the list should be empty
assert not self.updated_manifests, (
- "Unlinked but updated Manifests: {}".format(
- self.updated_manifests))
+ f'Unlinked but updated Manifests: {self.updated_manifests}')
- def update_entry_for_path(self, path, new_entry_type='DATA',
- hashes=None):
+ def update_entry_for_path(self,
+ path,
+ new_entry_type='DATA',
+ hashes=None,
+ ):
"""
Update the Manifest entries for @path and queue the containing
Manifests for update. @path must not be covered by IGNORE.
@@ -814,7 +861,7 @@ class ManifestRecursiveLoader(object):
# ignore matches recursively, so we process it separately
# py<3.5 does not have os.path.commonpath()
fullpath = os.path.join(relpath, e.path)
- assert not gemato.util.path_starts_with(path, fullpath)
+ assert not path_starts_with(path, fullpath)
elif e.tag in ('DIST', 'TIMESTAMP'):
# distfiles are not local files, so skip them
# timestamp is not a file ;-)
@@ -832,13 +879,12 @@ class ManifestRecursiveLoader(object):
continue
try:
- gemato.verify.update_entry_for_path(
- os.path.join(self.root_directory,
- fullpath),
+ update_entry_for_path(
+ os.path.join(self.root_directory, fullpath),
e,
hashes=hashes,
expected_dev=self.manifest_device)
- except gemato.exceptions.ManifestInvalidPath as err:
+ except ManifestInvalidPath as err:
if err.detail[0] == '__exists__':
# file does not exist anymore, so remove
# the entry
@@ -863,13 +909,11 @@ class ManifestRecursiveLoader(object):
newpath = os.path.relpath(path, relpath)
if new_entry_type == 'AUX':
# AUX has implicit files/ prefix
- assert gemato.util.path_inside_dir(newpath,
- 'files')
+ assert path_inside_dir(newpath, 'files')
# drop files/ prefix
newpath = os.path.relpath(newpath, 'files')
- e = gemato.manifest.new_manifest_entry(
- new_entry_type, newpath, 0, {})
- gemato.verify.update_entry_for_path(
+ e = new_manifest_entry(new_entry_type, newpath, 0, {})
+ update_entry_for_path(
os.path.join(self.root_directory, path),
e,
hashes=hashes,
@@ -879,8 +923,10 @@ class ManifestRecursiveLoader(object):
had_entry = True
break
- def get_deduplicated_file_entry_dict_for_update(self, path='',
- verify_manifests=True):
+ def get_deduplicated_file_entry_dict_for_update(self,
+ path='',
+ verify_manifests=True,
+ ):
"""
Find all file entries that apply to paths starting with @path.
Remove all duplicate entries and queue the relevant Manifests
@@ -906,8 +952,8 @@ class ManifestRecursiveLoader(object):
self.load_manifests_for_path(path, recursive=True,
verify=verify_manifests)
out = {}
- for mpath, relpath, m in self._iter_manifests_for_path(path,
- recursive=True):
+ for mpath, relpath, m in self._iter_manifests_for_path(
+ path, recursive=True):
entries_to_remove = []
for e in m.entries:
if e.tag in ('DIST', 'TIMESTAMP'):
@@ -916,16 +962,15 @@ class ManifestRecursiveLoader(object):
continue
fullpath = os.path.join(relpath, e.path)
- if gemato.util.path_starts_with(fullpath, path):
+ if path_starts_with(fullpath, path):
if fullpath in out:
# compare the two entries
- ret, diff = gemato.verify.verify_entry_compatibility(
- out[fullpath][1], e)
+ ret, diff = verify_entry_compatibility(
+ out[fullpath][1], e)
# if semantically incompatible, throw
if not ret and diff[0][0] == '__type__':
- raise (gemato.exceptions
- .ManifestIncompatibleEntry(
- out[fullpath][1], e, diff))
+ raise ManifestIncompatibleEntry(
+ out[fullpath][1], e, diff)
# otherwise, make sure we have all checksums
out[fullpath][1].checksums.update(e.checksums)
# and drop the duplicate
@@ -957,22 +1002,23 @@ class ManifestRecursiveLoader(object):
doing updates.
"""
- manifest_filenames = (gemato.compression
- .get_potential_compressed_names('Manifest'))
+ manifest_filenames = get_potential_compressed_names('Manifest')
- entry_dict = self.get_file_entry_dict(path,
- only_types=['IGNORE'], verify_manifests=verify_manifests)
+ entry_dict = self.get_file_entry_dict(
+ path,
+ only_types=['IGNORE'],
+ verify_manifests=verify_manifests)
new_manifests = []
directory_ids = {}
it = os.walk(os.path.join(self.root_directory, path),
- onerror=gemato.util.throw_exception,
- followlinks=True)
+ onerror=throw_exception,
+ followlinks=True)
for dirpath, dirnames, filenames in it:
dir_st = os.stat(dirpath)
if (self.manifest_device is not None
and dir_st.st_dev != self.manifest_device):
- raise gemato.exceptions.ManifestCrossDevice(dirpath)
+ raise ManifestCrossDevice(dirpath)
dir_id = (dir_st.st_dev, dir_st.st_ino)
# if this directory was already processed for one of its
@@ -980,7 +1026,7 @@ class ManifestRecursiveLoader(object):
parent_dir = os.path.dirname(dirpath)
parent_dir_ids = directory_ids.get(parent_dir, [])
if dir_id in parent_dir_ids:
- raise gemato.exceptions.ManifestSymlinkLoop(dirpath)
+ raise ManifestSymlinkLoop(dirpath)
relpath = os.path.relpath(dirpath, self.root_directory)
# strip dot to avoid matching problems
@@ -1020,7 +1066,7 @@ class ManifestRecursiveLoader(object):
# let's try to load it
try:
self.load_manifest(fpath)
- except gemato.exceptions.ManifestSyntaxError:
+ except ManifestSyntaxError:
# syntax error? probably not a Manifest then.
pass
else:
@@ -1028,9 +1074,12 @@ class ManifestRecursiveLoader(object):
return new_manifests
-
- def update_entries_for_directory(self, path='', hashes=None,
- last_mtime=None, verify_manifests=False):
+ def update_entries_for_directory(self,
+ path='',
+ hashes=None,
+ last_mtime=None,
+ verify_manifests=False,
+ ):
"""
Update the Manifest entries for the contents of directory
@path (top directory by default), recursively. Includes adding
@@ -1063,29 +1112,27 @@ class ManifestRecursiveLoader(object):
hashes = self.hashes
assert hashes is not None
- manifest_filenames = (gemato.compression
- .get_potential_compressed_names('Manifest'))
+ manifest_filenames = get_potential_compressed_names('Manifest')
- new_manifests = self.load_unregistered_manifests(path,
- verify_manifests=verify_manifests)
+ new_manifests = self.load_unregistered_manifests(
+ path, verify_manifests=verify_manifests)
entry_dict = self.get_deduplicated_file_entry_dict_for_update(
- path, verify_manifests=verify_manifests)
+ path, verify_manifests=verify_manifests)
manifest_stack = []
- for mpath, mrpath, m in (self
- ._iter_manifests_for_path(path)):
+ for mpath, mrpath, m in (self._iter_manifests_for_path(path)):
manifest_stack.append((mpath, mrpath, m))
break
directory_ids = {}
it = os.walk(os.path.join(self.root_directory, path),
- onerror=gemato.util.throw_exception,
- followlinks=True)
+ onerror=throw_exception,
+ followlinks=True)
for dirpath, dirnames, filenames in it:
dir_st = os.stat(dirpath)
if (self.manifest_device is not None
and dir_st.st_dev != self.manifest_device):
- raise gemato.exceptions.ManifestCrossDevice(dirpath)
+ raise ManifestCrossDevice(dirpath)
dir_id = (dir_st.st_dev, dir_st.st_ino)
# if this directory was already processed for one of its
@@ -1093,7 +1140,7 @@ class ManifestRecursiveLoader(object):
parent_dir = os.path.dirname(dirpath)
parent_dir_ids = directory_ids.get(parent_dir, [])
if dir_id in parent_dir_ids:
- raise gemato.exceptions.ManifestSymlinkLoop(dirpath)
+ raise ManifestSymlinkLoop(dirpath)
relpath = os.path.relpath(dirpath, self.root_directory)
# strip dot to avoid matching problems
@@ -1101,8 +1148,7 @@ class ManifestRecursiveLoader(object):
relpath = ''
# drop Manifest paths until we get to a common directory
- while not gemato.util.path_starts_with(relpath,
- manifest_stack[-1][1]):
+ while not path_starts_with(relpath, manifest_stack[-1][1]):
manifest_stack.pop()
want_manifest = self.profile.want_manifest_in_directory(
@@ -1124,11 +1170,10 @@ class ManifestRecursiveLoader(object):
skip_dirs.append(d)
else:
# trigger the exception indirectly
- gemato.verify.update_entry_for_path(
- os.path.join(dirpath, d),
- de,
- hashes=hashes,
- expected_dev=self.manifest_device)
+ update_entry_for_path(os.path.join(dirpath, d),
+ de,
+ hashes=hashes,
+ expected_dev=self.manifest_device)
assert False, "exception should have been raised"
# skip scanning ignored directories
@@ -1150,8 +1195,8 @@ class ManifestRecursiveLoader(object):
if fe.tag == 'IGNORE':
continue
if fe.tag == 'MANIFEST':
- manifest_stack.append((fpath, relpath,
- self.loaded_manifests[fpath]))
+ manifest_stack.append(
+ (fpath, relpath, self.loaded_manifests[fpath]))
# do not update the Manifest entry if
# the relevant Manifest is going to be updated
# anyway
@@ -1164,20 +1209,19 @@ class ManifestRecursiveLoader(object):
continue
if fpath in new_manifests:
ftype = 'MANIFEST'
- manifest_stack.append((fpath, relpath,
- self.loaded_manifests[fpath]))
+ manifest_stack.append(
+ (fpath, relpath, self.loaded_manifests[fpath]))
else:
ftype = self.profile.get_entry_type_for_path(
fpath)
# note: .path needs to be corrected below
- fe = gemato.manifest.new_manifest_entry(ftype,
- fpath, 0, {})
+ fe = new_manifest_entry(ftype, fpath, 0, {})
new_entries.append(fe)
if relpath in self.updated_manifests:
continue
- changed = gemato.verify.update_entry_for_path(
+ changed = update_entry_for_path(
os.path.join(dirpath, f),
fe,
hashes=hashes,
@@ -1192,17 +1236,18 @@ class ManifestRecursiveLoader(object):
mpath = os.path.join(relpath, 'Manifest')
m = self.create_manifest(mpath)
manifest_stack.append((mpath, relpath, m))
- fe = gemato.manifest.ManifestEntryMANIFEST(
- mpath, 0, {})
+ fe = ManifestEntryMANIFEST(mpath, 0, {})
new_entries.append(fe)
for ip in (self.profile
.get_ignore_paths_for_new_manifest(relpath)):
- ie = gemato.manifest.ManifestEntryIGNORE(ip)
+ ie = ManifestEntryIGNORE(ip)
iep = os.path.join(relpath, ip)
if self.find_path_entry(iep):
- raise NotImplementedError('Need to remove old parent entry for now-ignored path')
+ raise NotImplementedError(
+ 'Need to remove old parent entry for '
+ 'now-ignored path')
m.entries.append(ie)
new_ignore_paths.append(iep)
@@ -1233,12 +1278,11 @@ class ManifestRecursiveLoader(object):
# but for now, we've shoved our path
# into .aux_path
fe.path = os.path.relpath(fe.aux_path,
- mdirpath)
- assert gemato.util.path_inside_dir(
- fe.path, 'files')
+ mdirpath)
+ assert path_inside_dir(fe.path, 'files')
# drop files/ prefix for the entry
- fe.aux_path = os.path.relpath(fe.path,
- 'files')
+ fe.aux_path = os.path.relpath(
+ fe.path, 'files')
else:
fe.path = os.path.relpath(fe.path, mdirpath)
# do not add duplicate entry if the path is ignored
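
Several hunks in this file re-wrap the same symlink-loop guard: while walking with followlinks=True, each directory's (st_dev, st_ino) identity is recorded, and a directory whose identity already appears among its parent's ancestors aborts the walk. A self-contained sketch of the idea, with the bookkeeping assignment inferred from the visible `directory_ids.get(parent_dir, [])` lookups and RuntimeError substituting for ManifestSymlinkLoop:

import os


def walk_guarding_loops(top):
    # ancestor identities per directory, keyed by path
    directory_ids = {}
    for dirpath, dirnames, filenames in os.walk(top, followlinks=True):
        st = os.stat(dirpath)
        dir_id = (st.st_dev, st.st_ino)
        parent_ids = directory_ids.get(os.path.dirname(dirpath), [])
        if dir_id in parent_ids:
            raise RuntimeError(f'symlink loop at {dirpath}')
        directory_ids[dirpath] = parent_ids + [dir_id]
        yield dirpath, dirnames, filenames
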
diff --git a/gemato/verify.py b/gemato/verify.py
index 8a90506..707a43f 100644
--- a/gemato/verify.py
+++ b/gemato/verify.py
@@ -1,6 +1,6 @@
# gemato: File verification routines
# vim:fileencoding=utf-8
-# (c) 2017 Michał Górny
+# (c) 2017-2020 Michał Górny
# Licensed under the terms of 2-clause BSD license
import contextlib
@@ -10,9 +10,12 @@ import io
import os
import stat
-import gemato.exceptions
-import gemato.hash
-import gemato.manifest
+from gemato.exceptions import (
+ ManifestCrossDevice,
+ ManifestInvalidPath,
+ )
+from gemato.hash import hash_file
+from gemato.manifest import manifest_hashes_to_hashlib
def get_file_metadata(path, hashes):
@@ -37,7 +40,7 @@ def get_file_metadata(path, hashes):
try:
# we want O_NONBLOCK to avoid blocking when opening pipes
- fd = os.open(path, os.O_RDONLY|os.O_NONBLOCK)
+ fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
except OSError as err:
if err.errno == errno.ENOENT:
exists = False
@@ -98,7 +101,7 @@ def get_file_metadata(path, hashes):
yield st.st_mtime
f = io.open(fd, 'rb')
- except:
+ except Exception:
if opened:
os.close(fd)
raise
@@ -110,11 +113,10 @@ def get_file_metadata(path, hashes):
# 5. checksums
e_hashes = sorted(hashes)
- hashes = list(gemato.manifest.manifest_hashes_to_hashlib(e_hashes))
+ hashes = list(manifest_hashes_to_hashlib(e_hashes))
e_hashes.append('__size__')
hashes.append('__size__')
- checksums = gemato.hash.hash_file(f, hashes,
- _apparent_size=st.st_size)
+ checksums = hash_file(f, hashes, _apparent_size=st.st_size)
ret = {}
for ek, k in zip(e_hashes, hashes):
@@ -175,7 +177,7 @@ def verify_path(path, e, expected_dev=None, last_mtime=None):
# 2. check for xdev condition
st_dev = next(g)
if expected_dev is not None and st_dev != expected_dev:
- raise gemato.exceptions.ManifestCrossDevice(path)
+ raise ManifestCrossDevice(path)
# 3. verify whether the file is a regular file
ifmt, ftype = next(g)
@@ -216,7 +218,7 @@ def verify_path(path, e, expected_dev=None, last_mtime=None):
def update_entry_for_path(path, e, hashes=None, expected_dev=None,
- last_mtime=None):
+ last_mtime=None):
"""
Update the data in entry @e to match the current state of file
at path @path. Uses hashes listed in @hashes (using Manifest names),
@@ -248,19 +250,17 @@ def update_entry_for_path(path, e, hashes=None, expected_dev=None,
# 1. verify whether the file existed in the first place
exists = next(g)
if not exists:
- raise gemato.exceptions.ManifestInvalidPath(path,
- ('__exists__', exists))
+ raise ManifestInvalidPath(path, ('__exists__', exists))
# 2. check for xdev condition
st_dev = next(g)
if expected_dev is not None and st_dev != expected_dev:
- raise gemato.exceptions.ManifestCrossDevice(path)
+ raise ManifestCrossDevice(path)
# 3. verify whether the file is a regular file
ifmt, ftype = next(g)
if not stat.S_ISREG(ifmt):
- raise gemato.exceptions.ManifestInvalidPath(path,
- ('__type__', ftype))
+ raise ManifestInvalidPath(path, ('__type__', ftype))
# 4. get the apparent file size
st_size = next(g)
@@ -276,8 +276,9 @@ def update_entry_for_path(path, e, hashes=None, expected_dev=None,
checksums = next(g)
size = checksums.pop('__size__')
if st_size != 0:
- assert st_size == size, ('Apparent size (st_size = {}) and real size ({}) are different!'
- .format(st_size, size))
+ assert st_size == size, (
+ f'Apparent size (st_size = {st_size}) and real size '
+ f'({size}) are different!')
if e.size != size or e.checksums != checksums:
e.size = size
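
The `O_RDONLY | O_NONBLOCK` and bare-`except` fixes above both sit in the fd-opening path of `get_file_metadata()`: the file is opened non-blocking so that FIFOs cannot stall verification, and the raw descriptor is closed again if wrapping it in a file object fails. The pattern in isolation, as a sketch:

import io
import os


def open_nonblocking(path):
    # O_NONBLOCK keeps open() from hanging on unconnected FIFOs
    fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        return io.open(fd, 'rb')
    except Exception:
        # never leak the descriptor if wrapping fails
        os.close(fd)
        raise
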
diff --git a/tox.ini b/tox.ini
index 869c922..a8bf6e7 100644
--- a/tox.ini
+++ b/tox.ini
@@ -6,9 +6,11 @@ skipsdist = True
[testenv:qa]
deps =
+ pycodestyle
pyflakes
commands =
pyflakes {posargs:gemato tests}
+ pycodestyle {posargs:gemato}
[testenv]
deps =
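
With pycodestyle added to the qa environment, `tox -e qa` now runs both checkers: pyflakes keeps its default `gemato tests` target while pycodestyle defaults to `gemato` only, and positional arguments passed after `--` override both defaults (standard tox `{posargs}` substitution).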