-rw-r--r--  gemato/cli.py             | 344
-rw-r--r--  gemato/compression.py     |  10
-rw-r--r--  gemato/exceptions.py      |  75
-rw-r--r--  gemato/find_top_level.py  |  20
-rw-r--r--  gemato/hash.py            | 117
-rw-r--r--  gemato/manifest.py        | 229
-rw-r--r--  gemato/openpgp.py         | 133
-rw-r--r--  gemato/profile.py         |  24
-rw-r--r--  gemato/recursiveloader.py | 348
-rw-r--r--  gemato/verify.py          |  37
-rw-r--r--  tox.ini                   |   2
11 files changed, 732 insertions(+), 607 deletions(-)
diff --git a/gemato/cli.py b/gemato/cli.py index c8e1813..08d908a 100644 --- a/gemato/cli.py +++ b/gemato/cli.py @@ -14,13 +14,19 @@ import os.path import sys import timeit -import gemato.exceptions -import gemato.find_top_level -import gemato.hash -import gemato.manifest -import gemato.openpgp -import gemato.profile -import gemato.recursiveloader +from gemato.exceptions import GematoException +from gemato.find_top_level import find_top_level_manifest +from gemato.hash import hash_file, hash_path +from gemato.manifest import ( + ManifestFileEntry, + manifest_hashes_to_hashlib, + ) +from gemato.openpgp import ( + OpenPGPEnvironment, + OpenPGPSystemEnvironment, + ) +from gemato.profile import get_profile_by_name +from gemato.recursiveloader import ManifestRecursiveLoader def verify_failure(e): @@ -81,26 +87,28 @@ class BaseOpenPGPMixin(object): """ def __init__(self): - super(BaseOpenPGPMixin, self).__init__() + super().__init__() self.openpgp_env = None def add_options(self, subp): - super(BaseOpenPGPMixin, self).add_options(subp) + super().add_options(subp) - subp.add_argument('-K', '--openpgp-key', - help='Use only the OpenPGP key(s) from a specific file') - subp.add_argument('--proxy', - help='Use HTTP proxy') + subp.add_argument( + '-K', '--openpgp-key', + help='Use only the OpenPGP key(s) from a specific file') + subp.add_argument( + '--proxy', + help='Use HTTP proxy') def parse_args(self, args, argp): - super(BaseOpenPGPMixin, self).parse_args(args, argp) + super().parse_args(args, argp) # use isolated environment if key is specified; # system environment otherwise if args.openpgp_key is not None: - env_class = gemato.openpgp.OpenPGPEnvironment + env_class = OpenPGPEnvironment else: - env_class = gemato.openpgp.OpenPGPSystemEnvironment + env_class = OpenPGPSystemEnvironment self.openpgp_env = env_class(debug=args.debug, proxy=args.proxy) @@ -109,7 +117,7 @@ class BaseOpenPGPMixin(object): self.openpgp_env.import_key(f) def cleanup(self): - super(BaseOpenPGPMixin, self).cleanup() + super().cleanup() if self.openpgp_env is not None: self.openpgp_env.close() @@ -121,21 +129,24 @@ class VerifyingOpenPGPMixin(BaseOpenPGPMixin): """ def add_options(self, subp): - super(VerifyingOpenPGPMixin, self).add_options(subp) - - subp.add_argument('-R', '--no-refresh-keys', action='store_false', - dest='refresh_keys', - help='Disable refreshing OpenPGP key (prevents network access, ' - +'applicable when using -K only)') - subp.add_argument('-W', '--no-wkd', action='store_false', - dest='allow_wkd', - help='Do not attempt to use WKD to refetch keys (use ' - +'keyservers only)') - subp.add_argument('--keyserver', - help='Force custom keyserver URL') + super().add_options(subp) + + subp.add_argument( + '-R', '--no-refresh-keys', action='store_false', + dest='refresh_keys', + help='Disable refreshing OpenPGP key (prevents network ' + 'access, applicable when using -K only)') + subp.add_argument( + '-W', '--no-wkd', action='store_false', + dest='allow_wkd', + help='Do not attempt to use WKD to refetch keys (use ' + 'keyservers only)') + subp.add_argument( + '--keyserver', + help='Force custom keyserver URL') def parse_args(self, args, argp): - super(VerifyingOpenPGPMixin, self).parse_args(args, argp) + super().parse_args(args, argp) if args.openpgp_key is not None: # always refresh keys to check for revocation @@ -153,16 +164,19 @@ class BaseManifestLoaderMixin(object): """ def add_options(self, subp): - super(BaseManifestLoaderMixin, self).add_options(subp) + super().add_options(subp) - 
subp.add_argument('-j', '--jobs', type=int, - help='Specify the maximum number of parallel jobs to use (default: {})' - .format(multiprocessing.cpu_count())) - subp.add_argument('-x', '--one-file-system', action='store_true', - help='Do not cross filesystem boundaries (report an error instead)') + subp.add_argument( + '-j', '--jobs', type=int, + help=f'Specify the maximum number of parallel jobs to use ' + f'(default: {multiprocessing.cpu_count()})') + subp.add_argument( + '-x', '--one-file-system', action='store_true', + help='Do not cross filesystem boundaries (report an error ' + 'instead)') def parse_args(self, args, argp): - super(BaseManifestLoaderMixin, self).parse_args(args, argp) + super().parse_args(args, argp) self.init_kwargs = {} if args.jobs is not None: @@ -179,20 +193,25 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin, help = 'Verify one or more directories against Manifests' def add_options(self, verify): - super(VerifyCommand, self).add_options(verify) - - verify.add_argument('paths', nargs='*', default=['.'], - help='Paths to verify (defaults to "." if none specified)') - verify.add_argument('-k', '--keep-going', action='store_true', - help='Continue reporting errors rather than terminating on the first failure') - verify.add_argument('-P', '--no-openpgp-verify', action='store_false', - dest='openpgp_verify', - help='Disable OpenPGP verification of signed Manifests') - verify.add_argument('-s', '--require-signed-manifest', action='store_true', - help='Require that the top-level Manifest is OpenPGP signed') + super().add_options(verify) + + verify.add_argument( + 'paths', nargs='*', default=['.'], + help='Paths to verify (defaults to "." if none specified)') + verify.add_argument( + '-k', '--keep-going', action='store_true', + help='Continue reporting errors rather than terminating ' + 'on the first failure') + verify.add_argument( + '-P', '--no-openpgp-verify', action='store_false', + dest='openpgp_verify', + help='Disable OpenPGP verification of signed Manifests') + verify.add_argument( + '-s', '--require-signed-manifest', action='store_true', + help='Require that the top-level Manifest is OpenPGP signed') def parse_args(self, args, argp): - super(VerifyCommand, self).parse_args(args, argp) + super().parse_args(args, argp) self.paths = args.paths self.require_signed_manifest = args.require_signed_manifest @@ -205,37 +224,40 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin, self.init_kwargs['verify_openpgp'] = False def __call__(self): - super(VerifyCommand, self).__call__() + super().__call__() ret = True for p in self.paths: - tlm = gemato.find_top_level.find_top_level_manifest(p) + tlm = find_top_level_manifest(p) if tlm is None: - logging.error('Top-level Manifest not found in {}'.format(p)) + logging.error(f'Top-level Manifest not found in {p}') return 1 start = timeit.default_timer() - m = gemato.recursiveloader.ManifestRecursiveLoader(tlm, - **self.init_kwargs) + m = ManifestRecursiveLoader(tlm, **self.init_kwargs) if self.require_signed_manifest and not m.openpgp_signed: - logging.error('Top-level Manifest {} is not OpenPGP signed'.format(tlm)) + logging.error(f'Top-level Manifest {tlm} is not ' + f'OpenPGP signed') return 1 ts = m.find_timestamp() if ts: - logging.info('Manifest timestamp: {} UTC'.format(ts.ts)) + logging.info(f'Manifest timestamp: {ts.ts} UTC') if m.openpgp_signed: logging.info('Valid OpenPGP signature found:') - logging.info('- primary key: {}'.format( - m.openpgp_signature.primary_key_fingerprint)) - 
logging.info('- subkey: {}'.format( - m.openpgp_signature.fingerprint)) - logging.info('- timestamp: {} UTC'.format( - m.openpgp_signature.timestamp)) - - logging.info('Verifying {}...'.format(p)) + logging.info( + f'- primary key: ' + f'{m.openpgp_signature.primary_key_fingerprint}') + logging.info( + f'- subkey: ' + f'{m.openpgp_signature.fingerprint}') + logging.info( + f'- timestamp: ' + f'{m.openpgp_signature.timestamp} UTC') + + logging.info(f'Verifying {p}...') relpath = os.path.relpath(p, os.path.dirname(tlm)) if relpath == '.': @@ -243,7 +265,7 @@ class VerifyCommand(BaseManifestLoaderMixin, VerifyingOpenPGPMixin, ret &= m.assert_directory_verifies(relpath, **self.kwargs) stop = timeit.default_timer() - logging.info('{} verified in {:.2f} seconds'.format(p, stop - start)) + logging.info(f'{p} verified in {stop - start:.2f} seconds') return 0 if ret else 1 @@ -254,32 +276,43 @@ class BaseUpdateMixin(BaseManifestLoaderMixin, BaseOpenPGPMixin): """ def add_options(self, update): - super(BaseUpdateMixin, self).add_options(update) - - update.add_argument('-c', '--compress-watermark', type=int, - help='Minimum Manifest size for files to be compressed') - update.add_argument('-C', '--compress-format', - help='Format for compressed files (e.g. "gz", "bz2"...)') - update.add_argument('-f', '--force-rewrite', action='store_true', - help='Force rewriting all the Manifests, even if they did not change') - update.add_argument('-H', '--hashes', - help='Whitespace-separated list of hashes to use') - update.add_argument('-k', '--openpgp-id', - help='Use the specified OpenPGP key (by ID or user)') - update.add_argument('-p', '--profile', - help='Use the specified profile ("default", "ebuild", "old-ebuild"...)') + super().add_options(update) + + update.add_argument( + '-c', '--compress-watermark', type=int, + help='Minimum Manifest size for files to be compressed') + update.add_argument( + '-C', '--compress-format', + help='Format for compressed files (e.g. 
"gz", "bz2"...)') + update.add_argument( + '-f', '--force-rewrite', action='store_true', + help='Force rewriting all the Manifests, even if they did ' + 'not change') + update.add_argument( + '-H', '--hashes', + help='Whitespace-separated list of hashes to use') + update.add_argument( + '-k', '--openpgp-id', + help='Use the specified OpenPGP key (by ID or user)') + update.add_argument( + '-p', '--profile', + help='Use the specified profile ("default", "ebuild", ' + '"old-ebuild"...)') signgroup = update.add_mutually_exclusive_group() - signgroup.add_argument('-s', '--sign', action='store_true', - default=None, - help='Force signing the top-level Manifest') - signgroup.add_argument('-S', '--no-sign', action='store_false', - dest='sign', - help='Disable signing the top-level Manifest') - update.add_argument('-t', '--timestamp', action='store_true', - help='Include TIMESTAMP entry in Manifest') + signgroup.add_argument( + '-s', '--sign', action='store_true', + default=None, + help='Force signing the top-level Manifest') + signgroup.add_argument( + '-S', '--no-sign', action='store_false', + dest='sign', + help='Disable signing the top-level Manifest') + update.add_argument( + '-t', '--timestamp', action='store_true', + help='Include TIMESTAMP entry in Manifest') def parse_args(self, args, argp): - super(BaseUpdateMixin, self).parse_args(args, argp) + super().parse_args(args, argp) self.timestamp = args.timestamp @@ -299,8 +332,8 @@ class BaseUpdateMixin(BaseManifestLoaderMixin, BaseOpenPGPMixin): if args.openpgp_id is not None: self.init_kwargs['openpgp_keyid'] = args.openpgp_id if args.profile is not None: - self.init_kwargs['profile'] = gemato.profile.get_profile_by_name( - args.profile) + self.init_kwargs['profile'] = ( + get_profile_by_name(args.profile)) if args.sign is not None: self.init_kwargs['sign_openpgp'] = args.sign @@ -310,56 +343,62 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand): help = 'Update the Manifest entries for one or more directory trees' def add_options(self, update): - super(UpdateCommand, self).add_options(update) + super().add_options(update) - update.add_argument('paths', nargs='*', default=['.'], - help='Paths to update (defaults to "." if none specified)') - update.add_argument('-i', '--incremental', action='store_true', - help='Perform incremental update by comparing mtimes against TIMESTAMP') + update.add_argument( + 'paths', nargs='*', default=['.'], + help='Paths to update (defaults to "." 
if none specified)') + update.add_argument( + '-i', '--incremental', action='store_true', + help='Perform incremental update by comparing mtimes ' + 'against TIMESTAMP') def parse_args(self, args, argp): - super(UpdateCommand, self).parse_args(args, argp) + super().parse_args(args, argp) self.paths = args.paths self.incremental = args.incremental def __call__(self): - super(UpdateCommand, self).__call__() + super().__call__() for p in self.paths: - tlm = gemato.find_top_level.find_top_level_manifest(p) + tlm = find_top_level_manifest(p) if tlm is None: - logging.error('Top-level Manifest not found in {}'.format(p)) + logging.error(f'Top-level Manifest not found in {p}') return 1 start = timeit.default_timer() - m = gemato.recursiveloader.ManifestRecursiveLoader(tlm, - **self.init_kwargs) + m = ManifestRecursiveLoader(tlm, **self.init_kwargs) # if not specified by user, profile must set it if m.hashes is None: - logging.error('--hashes must be specified if not implied by --profile') + logging.error('--hashes must be specified if not ' + 'implied by --profile') return 1 relpath = os.path.relpath(p, os.path.dirname(tlm)) if relpath == '.': relpath = '' if self.timestamp and relpath != '': - logging.error('Timestamp can only be updated if doing full-tree update') + logging.error('Timestamp can only be updated if doing ' + 'full-tree update') return 1 update_kwargs = {} if self.incremental: if relpath != '': - logging.error('Incremental works only for full-tree update') + logging.error('Incremental works only for ' + 'full-tree update') return 1 last_ts = m.find_timestamp() if last_ts is None: - logging.error('Incremental specified but no timestamp in Manifest') + logging.error('Incremental specified but no ' + 'timestamp in Manifest') return 1 update_kwargs['last_mtime'] = last_ts.ts.timestamp() - logging.info('Updating Manifests in {}...'.format(p)) + logging.info(f'Updating Manifests in {p}...') start_ts = datetime.datetime.utcnow() m.update_entries_for_directory(relpath, **update_kwargs) @@ -378,7 +417,7 @@ class UpdateCommand(BaseUpdateMixin, GematoCommand): m.save_manifests(**self.save_kwargs) stop = timeit.default_timer() - logging.info('{} updated in {:.2f} seconds'.format(p, stop - start)) + logging.info(f'{p} updated in {stop - start:.2f} seconds') return 0 @@ -388,31 +427,34 @@ class CreateCommand(BaseUpdateMixin, GematoCommand): help = 'Create a Manifest tree starting at the specified file' def add_options(self, create): - super(CreateCommand, self).add_options(create) + super().add_options(create) - create.add_argument('paths', nargs='*', default=['.'], - help='Paths to create Manifest in (defaults to "." if none specified)') + create.add_argument( + 'paths', nargs='*', default=['.'], + help='Paths to create Manifest in (defaults to "." 
' + 'if none specified)') def parse_args(self, args, argp): - super(CreateCommand, self).parse_args(args, argp) + super().parse_args(args, argp) self.init_kwargs['allow_create'] = True self.paths = args.paths def __call__(self): - super(CreateCommand, self).__call__() + super().__call__() for p in self.paths: start = timeit.default_timer() - m = gemato.recursiveloader.ManifestRecursiveLoader( - os.path.join(p, 'Manifest'), **self.init_kwargs) + m = ManifestRecursiveLoader( + os.path.join(p, 'Manifest'), **self.init_kwargs) # if not specified by user, profile must set it if m.hashes is None: - logging.error('--hashes must be specified if not implied by --profile') + logging.error('--hashes must be specified if not ' + 'implied by --profile') return 1 - logging.info('Creating Manifests in {}...'.format(p)) + logging.info(f'Creating Manifests in {p}...') start_ts = datetime.datetime.utcnow() m.update_entries_for_directory() @@ -424,7 +466,7 @@ class CreateCommand(BaseUpdateMixin, GematoCommand): m.save_manifests(**self.save_kwargs) stop = timeit.default_timer() - logging.info('{} updated in {:.2f} seconds'.format(p, stop - start)) + logging.info(f'{p} updated in {stop - start:.2f} seconds') return 0 @@ -434,56 +476,62 @@ class HashCommand(GematoCommand): help = 'Generate hashes for specified file(s) and/or stdin' def add_options(self, subp): - super(HashCommand, self).add_options(subp) + super().add_options(subp) - subp.add_argument('paths', nargs='*', default=['-'], - help='Paths to hash (defaults to "-" (stdin) if not specified)') - subp.add_argument('-H', '--hashes', required=True, - help='Whitespace-separated list of hashes to use') + subp.add_argument( + 'paths', nargs='*', default=['-'], + help='Paths to hash (defaults to "-" (stdin) if not ' + 'specified)') + subp.add_argument( + '-H', '--hashes', required=True, + help='Whitespace-separated list of hashes to use') def parse_args(self, args, argp): - super(HashCommand, self).parse_args(args, argp) + super().parse_args(args, argp) self.hashes = sorted(args.hashes.split()) self.paths = args.paths def __call__(self): - super(HashCommand, self).__call__() + super().__call__() - hashlib_hashes = list( - gemato.manifest.manifest_hashes_to_hashlib(self.hashes)) + hashlib_hashes = list(manifest_hashes_to_hashlib(self.hashes)) hashlib_hashes.append('__size__') for p in self.paths: if p == '-': - h = gemato.hash.hash_file(sys.stdin.buffer, - hashlib_hashes) + h = hash_file(sys.stdin.buffer, hashlib_hashes) else: - h = gemato.hash.hash_path(p, hashlib_hashes) + h = hash_path(p, hashlib_hashes) sz = h.pop('__size__') - e = gemato.manifest.ManifestFileEntry(p, sz, - dict((mh, h[hh]) for mh, hh in zip(self.hashes, hashlib_hashes))) + e = ManifestFileEntry( + p, sz, + dict((mh, h[hh]) for mh, hh + in zip(self.hashes, hashlib_hashes))) print(' '.join(e.to_list('DATA' if p != '-' else 'STDIN'))) class OpenPGPVerifyCommand(VerifyingOpenPGPMixin, GematoCommand): name = 'openpgp-verify' - help = 'Verify OpenPGP signatures embedded in specified file(s) and/or stdin' + help = ('Verify OpenPGP signatures embedded in specified file(s) ' + 'and/or stdin') def add_options(self, subp): - super(OpenPGPVerifyCommand, self).add_options(subp) + super().add_options(subp) - subp.add_argument('paths', nargs='*', default=['-'], - help='Paths to hash (defaults to "-" (stdin) if not specified)') + subp.add_argument( + 'paths', nargs='*', default=['-'], + help='Paths to hash (defaults to "-" (stdin) if not ' + 'specified)') def parse_args(self, args, argp): - 
super(OpenPGPVerifyCommand, self).parse_args(args, argp) + super().parse_args(args, argp) self.paths = args.paths def __call__(self): - super(OpenPGPVerifyCommand, self).__call__() + super().__call__() ret = True @@ -496,19 +544,17 @@ class OpenPGPVerifyCommand(VerifyingOpenPGPMixin, GematoCommand): try: try: sig = self.openpgp_env.verify_file(f) - except gemato.exceptions.GematoException as e: - logging.error(u'OpenPGP verification failed for {}:\n{}' - .format(p, e)) + except GematoException as e: + logging.error( + f'OpenPGP verification failed for {p}:\n{e}') ret = False else: - logging.info('Valid OpenPGP signature found in {}:' - .format(p)) - logging.info('- primary key: {}'.format( - sig.primary_key_fingerprint)) - logging.info('- subkey: {}'.format( - sig.fingerprint)) - logging.info('- timestamp: {} UTC'.format( - sig.timestamp)) + logging.info( + f'Valid OpenPGP signature found in {p}:') + logging.info( + f'- primary key: {sig.primary_key_fingerprint}') + logging.info(f'- subkey: {sig.fingerprint}') + logging.info(f'- timestamp: {sig.timestamp} UTC') finally: if p != '-': f.close() @@ -541,7 +587,7 @@ def main(argv): return vals.cmd() finally: vals.cmd.cleanup() - except gemato.exceptions.GematoException as e: + except GematoException as e: logging.error(e) return 1 diff --git a/gemato/compression.py b/gemato/compression.py index cf021cd..e573153 100644 --- a/gemato/compression.py +++ b/gemato/compression.py @@ -9,7 +9,7 @@ import io import lzma import os.path -import gemato.exceptions +from gemato.exceptions import UnsupportedCompression def open_compressed_file(suffix, f, mode='rb'): @@ -34,7 +34,7 @@ def open_compressed_file(suffix, f, mode='rb'): elif suffix == "xz" and lzma is not None: return lzma.LZMAFile(f, format=lzma.FORMAT_XZ, mode=mode) - raise gemato.exceptions.UnsupportedCompression(suffix) + raise UnsupportedCompression(suffix) class FileStack(object): @@ -84,8 +84,8 @@ def open_potentially_compressed_path(path, mode, **kwargs): f = io.open(path, bmode) fs = FileStack([f]) try: - cf = open_compressed_file(compression, f, - bmode if kwargs else mode) + cf = open_compressed_file( + compression, f, bmode if kwargs else mode) fs.files.append(cf) # add a TextIOWrapper on top whenever we do not want @@ -93,7 +93,7 @@ def open_potentially_compressed_path(path, mode, **kwargs): if 'b' not in mode: iow = io.TextIOWrapper(cf, **kwargs) fs.files.append(iow) - except: + except Exception: fs.close() raise diff --git a/gemato/exceptions.py b/gemato/exceptions.py index 4c53be4..22766ab 100644 --- a/gemato/exceptions.py +++ b/gemato/exceptions.py @@ -1,6 +1,6 @@ # gemato: exceptions # vim:fileencoding=utf-8 -# (c) 2017 Michał Górny +# (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license class GematoException(Exception): @@ -14,42 +14,42 @@ class UnsupportedCompression(GematoException): __slots__ = ['suffix'] def __init__(self, suffix): - super(UnsupportedCompression, self).__init__(suffix) + super().__init__(suffix) self.suffix = suffix def __str__(self): - return u'Unsupported compression suffix: {}'.format(self.suffix) + return f'Unsupported compression suffix: {self.suffix}' class UnsupportedHash(GematoException): __slots__ = ['hash_name'] def __init__(self, hash_name): - super(UnsupportedHash, self).__init__(hash_name) + super().__init__(hash_name) self.hash_name = hash_name def __str__(self): - return u'Unsupported hash name: {}'.format(self.hash_name) + return f'Unsupported hash name: {self.hash_name}' class ManifestSyntaxError(GematoException): def 
__init__(self, message): - super(ManifestSyntaxError, self).__init__(message) + super().__init__(message) class ManifestIncompatibleEntry(GematoException): __slots__ = ['e1', 'e2', 'diff'] def __init__(self, e1, e2, diff): - super(ManifestIncompatibleEntry, self).__init__(e1, e2, diff) + super().__init__(e1, e2, diff) self.e1 = e1 self.e2 = e2 self.diff = diff def __str__(self): - msg = u"Incompatible Manifest entries for {}".format(self.e1.path) + msg = f'Incompatible Manifest entries for {self.e1.path}' for k, d1, d2 in self.diff: - msg += u"\n {}: e1: {}, e2: {}".format(k, d1, d2) + msg += f'\n {k}: e1: {d1}, e2: {d2}' return msg @@ -61,15 +61,15 @@ class ManifestMismatch(GematoException): __slots__ = ['path', 'entry', 'diff'] def __init__(self, path, entry, diff): - super(ManifestMismatch, self).__init__(path, entry, diff) + super().__init__(path, entry, diff) self.path = path self.entry = entry self.diff = diff def __str__(self): - msg = u"Manifest mismatch for {}".format(self.path) + msg = f'Manifest mismatch for {self.path}' for k, exp, got in self.diff: - msg += u"\n {}: expected: {}, have: {}".format(k, exp, got) + msg += f'\n {k}: expected: {exp}, have: {got}' return msg @@ -81,12 +81,12 @@ class ManifestCrossDevice(GematoException): __slots__ = ['path'] def __init__(self, path): - super(ManifestCrossDevice, self).__init__(path) + super().__init__(path) self.path = path def __str__(self): - return (u"Path {} crosses filesystem boundaries, it must be IGNORE-d explicitly" - .format(self.path)) + return (f'Path {self.path} crosses filesystem boundaries, it ' + f'must be IGNORE-d explicitly') class ManifestSymlinkLoop(GematoException): @@ -98,12 +98,12 @@ class ManifestSymlinkLoop(GematoException): __slots__ = ['path'] def __init__(self, path): - super(ManifestSymlinkLoop, self).__init__(path) + super().__init__(path) self.path = path def __str__(self): - return (u"Path {} is a symlink to one of its parent directories, it must be IGNORE-d explicitly" - .format(self.path)) + return (f'Path {self.path} is a symlink to one of its parent ' + f'directories, it must be IGNORE-d explicitly') class ManifestUnsignedData(GematoException): @@ -113,7 +113,7 @@ class ManifestUnsignedData(GematoException): """ def __str__(self): - return u"Unsigned data found in an OpenPGP signed Manifest" + return 'Unsigned data found in an OpenPGP signed Manifest' class OpenPGPRuntimeError(GematoException): @@ -124,7 +124,7 @@ class OpenPGPRuntimeError(GematoException): __slots__ = ['output'] def __init__(self, output): - super(OpenPGPRuntimeError, self).__init__(output) + super().__init__(output) self.output = output @@ -134,7 +134,7 @@ class OpenPGPKeyImportError(OpenPGPRuntimeError): """ def __str__(self): - return u"OpenPGP key import failed:\n{}".format(self.output) + return f'OpenPGP key import failed:\n{self.output}' class OpenPGPKeyRefreshError(OpenPGPRuntimeError): @@ -143,7 +143,7 @@ class OpenPGPKeyRefreshError(OpenPGPRuntimeError): """ def __str__(self): - return u"OpenPGP keyring refresh failed:\n{}".format(self.output) + return f'OpenPGP keyring refresh failed:\n{self.output}' class OpenPGPVerificationFailure(OpenPGPRuntimeError): @@ -152,7 +152,7 @@ class OpenPGPVerificationFailure(OpenPGPRuntimeError): """ def __str__(self): - return u"OpenPGP verification failed:\n{}".format(self.output) + return f'OpenPGP verification failed:\n{self.output}' class OpenPGPExpiredKeyFailure(OpenPGPRuntimeError): @@ -161,7 +161,8 @@ class OpenPGPExpiredKeyFailure(OpenPGPRuntimeError): """ def __str__(self): - 
return u"OpenPGP signature rejected because of expired key:\n{}".format(self.output) + return (f'OpenPGP signature rejected because of expired key:\n' + f'{self.output}') class OpenPGPRevokedKeyFailure(OpenPGPRuntimeError): @@ -170,7 +171,8 @@ class OpenPGPRevokedKeyFailure(OpenPGPRuntimeError): """ def __str__(self): - return u"OpenPGP signature rejected because of revoked key:\n{}".format(self.output) + return (f'OpenPGP signature rejected because of revoked key:\n' + f'{self.output}') class OpenPGPUnknownSigFailure(OpenPGPRuntimeError): @@ -180,7 +182,8 @@ class OpenPGPUnknownSigFailure(OpenPGPRuntimeError): """ def __str__(self): - return u"OpenPGP signature rejected for unknown reason:\n{}".format(self.output) + return (f'OpenPGP signature rejected for unknown reason:\n' + f'{self.output}') class OpenPGPSigningFailure(OpenPGPRuntimeError): @@ -189,7 +192,7 @@ class OpenPGPSigningFailure(OpenPGPRuntimeError): """ def __str__(self): - return u"OpenPGP signing failed:\n{}".format(self.output) + return f'OpenPGP signing failed:\n{self.output}' class OpenPGPNoImplementation(GematoException): @@ -199,7 +202,8 @@ class OpenPGPNoImplementation(GematoException): """ def __str__(self): - return u"No supported OpenPGP implementation found (install gnupg)" + return ('No supported OpenPGP implementation found (install ' + 'gnupg)') class ManifestInvalidPath(GematoException): @@ -211,13 +215,14 @@ class ManifestInvalidPath(GematoException): __slots__ = ['path', 'detail'] def __init__(self, path, detail): - super(ManifestInvalidPath, self).__init__(path, detail) + super().__init__(path, detail) self.path = path self.detail = detail def __str__(self): - return (u"Attempting to add invalid path {} to Manifest: {} must not be {}" - .format(self.path, self.detail[0], self.detail[1])) + return (f'Attempting to add invalid path {self.path} to ' + f'Manifest: {self.detail[0]} must not be ' + f'{self.detail[1]}') class ManifestInvalidFilename(GematoException): @@ -228,10 +233,12 @@ class ManifestInvalidFilename(GematoException): __slots__ = ['filename', 'pos'] def __init__(self, filename, pos): - super(ManifestInvalidFilename, self).__init__(filename, pos) + super().__init__(filename, pos) self.filename = filename self.pos = pos def __str__(self): - return (u"Attempting to add invalid filename {!r} to Manifest: disallowed character U+{:04X} at position {}" - .format(self.filename, ord(self.filename[self.pos]), self.pos)) + return (f'Attempting to add invalid filename {self.filename!r} ' + f'to Manifest: disallowed character ' + f'U+{ord(self.filename[self.pos]):04X} at position ' + f'{self.pos}') diff --git a/gemato/find_top_level.py b/gemato/find_top_level.py index 17e743b..d049f17 100644 --- a/gemato/find_top_level.py +++ b/gemato/find_top_level.py @@ -1,14 +1,17 @@ # gemato: Top-level Manifest finding routine # vim:fileencoding=utf-8 -# (c) 2017-2018 Michał Górny +# (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license import errno import os import os.path -import gemato.compression -import gemato.manifest +from gemato.compression import ( + get_potential_compressed_names, + open_potentially_compressed_path, + ) +from gemato.manifest import ManifestFile def find_top_level_manifest(path='.', allow_xdev=True, allow_compressed=False): @@ -31,14 +34,14 @@ def find_top_level_manifest(path='.', allow_xdev=True, allow_compressed=False): cur_path = path last_found = None original_dev = None - m = gemato.manifest.ManifestFile() + m = ManifestFile() root_st = os.stat('/') manifest_filenames 
= ('Manifest',) if allow_compressed: - manifest_filenames = list(gemato.compression - .get_potential_compressed_names('Manifest')) + manifest_filenames = list( + get_potential_compressed_names('Manifest')) while True: st = os.stat(cur_path) @@ -54,9 +57,8 @@ def find_top_level_manifest(path='.', allow_xdev=True, allow_compressed=False): try: # note: this is safe for allow_compressed=False # since it detects compression by filename suffix - with (gemato.compression - .open_potentially_compressed_path(m_path, 'r', - encoding='utf8')) as f: + with open_potentially_compressed_path( + m_path, 'r', encoding='utf8') as f: fst = os.fstat(f.fileno()) if fst.st_dev != original_dev and not allow_xdev: return last_found diff --git a/gemato/hash.py b/gemato/hash.py index 515bfd9..1423994 100644 --- a/gemato/hash.py +++ b/gemato/hash.py @@ -1,90 +1,89 @@ # gemato: hash support # vim:fileencoding=utf-8 -# (c) 2017 Michał Górny +# (c) 2017-2020 Michał Górny # Licensed under the terms of 2-clause BSD license import hashlib import io -import gemato.exceptions +from gemato.exceptions import UnsupportedHash + HASH_BUFFER_SIZE = 65536 MAX_SLURP_SIZE = 1048576 class SizeHash(object): - """ - A cheap wrapper to count file size via hashlib-like interface. - """ + """A cheap wrapper to count file size via hashlib-like interface""" - __slots__ = ['size'] + __slots__ = ['size'] - def __init__(self): - self.size = 0 + def __init__(self): + self.size = 0 - def update(self, data): - self.size += len(data) + def update(self, data): + self.size += len(data) - def hexdigest(self): - return self.size + def hexdigest(self): + return self.size def get_hash_by_name(name): - """ - Get a hashlib-compatible hash object for hash named @name. Supports - multiple backends. - """ - # special case hashes - if name == '__size__': - return SizeHash() + """ + Get a hashlib-compatible hash object for hash named @name. Supports + multiple backends. + """ + # special case hashes + if name == '__size__': + return SizeHash() - # general hash support - if name in hashlib.algorithms_available: - return hashlib.new(name) + # general hash support + if name in hashlib.algorithms_available: + return hashlib.new(name) - raise gemato.exceptions.UnsupportedHash(name) + raise UnsupportedHash(name) def hash_file(f, hash_names, _apparent_size=0): - """ - Hash the contents of file object @f using all hashes specified - as @hash_names. Returns a dict of (hash_name -> hex value) mappings. - - @_apparent_size can be given as a tip on how large is the file - expected to be. This is a private API used to workaround bug in PyPy - and should not be relied on being present long-term. - """ - hashes = {} - for h in hash_names: - hashes[h] = get_hash_by_name(h) - if _apparent_size != 0 and _apparent_size < MAX_SLURP_SIZE: - # if the file is reasonably small, read it all into one buffer; - # we do this since PyPy has some serious bug in dealing with - # passing buffers to C extensions and this apparently fails - # less; https://bitbucket.org/pypy/pypy/issues/2752 - block = f.read() - for h in hashes.values(): - h.update(block) - else: - for block in iter(lambda: f.read1(HASH_BUFFER_SIZE), b''): - for h in hashes.values(): - h.update(block) - return dict((k, h.hexdigest()) for k, h in hashes.items()) + """ + Hash the contents of file object @f using all hashes specified + as @hash_names. Returns a dict of (hash_name -> hex value) mappings. + + @_apparent_size can be given as a tip on how large is the file + expected to be. 
This is a private API used to workaround bug in PyPy + and should not be relied on being present long-term. + """ + hashes = {} + for h in hash_names: + hashes[h] = get_hash_by_name(h) + if _apparent_size != 0 and _apparent_size < MAX_SLURP_SIZE: + # if the file is reasonably small, read it all into one buffer; + # we do this since PyPy has some serious bug in dealing with + # passing buffers to C extensions and this apparently fails + # less; https://bitbucket.org/pypy/pypy/issues/2752 + block = f.read() + for h in hashes.values(): + h.update(block) + else: + for block in iter(lambda: f.read1(HASH_BUFFER_SIZE), b''): + for h in hashes.values(): + h.update(block) + return dict((k, h.hexdigest()) for k, h in hashes.items()) def hash_path(path, hash_names): - """ - Hash the contents of file at specified path @path using all hashes - specified as @hash_names. Returns a dict of (hash_name -> hex value) - mappings. - """ - with io.open(path, 'rb') as f: - return hash_file(f, hash_names) + """ + Hash the contents of file at specified path @path using all hashes + specified as @hash_names. Returns a dict of (hash_name -> hex value) + mappings. + """ + with io.open(path, 'rb') as f: + return hash_file(f, hash_names) def hash_bytes(buf, hash_name): - """ - Hash the data in provided buffer @buf using the hash @hash_name. - Returns the hex value. - """ - return hash_file(io.BytesIO(buf), (hash_name,))[hash_name] + """ + Hash the data in provided buffer @buf using the hash @hash_name. + Returns the hex value. + """ + return hash_file(io.BytesIO(buf), (hash_name,))[hash_name] diff --git a/gemato/manifest.py b/gemato/manifest.py index ae1eb68..aee035c 100644 --- a/gemato/manifest.py +++ b/gemato/manifest.py @@ -8,14 +8,18 @@ import io import os.path import re -import gemato.exceptions -import gemato.util +from gemato.exceptions import ( + ManifestSyntaxError, + ManifestUnsignedData, + ) +from gemato.util import ( + path_starts_with, + path_inside_dir, + ) class ManifestEntryTIMESTAMP(object): - """ - ISO-8601 timestamp. - """ + """ISO-8601 timestamp""" __slots__ = ['ts'] tag = 'TIMESTAMP' @@ -25,16 +29,18 @@ class ManifestEntryTIMESTAMP(object): self.ts = ts @classmethod - def from_list(cls, l): - assert l[0] == cls.tag - if len(l) != 2: - raise gemato.exceptions.ManifestSyntaxError( - '{} line: expects 1 value, got: {}'.format(l[0], l[1:])) + def from_list(cls, data): + assert data[0] == cls.tag + if len(data) != 2: + raise ManifestSyntaxError( + f'{data[0]} line: expects 1 value, got: {data[1:]}') try: - ts = datetime.datetime.strptime(l[1], '%Y-%m-%dT%H:%M:%SZ') + ts = datetime.datetime.strptime(data[1], + '%Y-%m-%dT%H:%M:%SZ') except ValueError: - raise gemato.exceptions.ManifestSyntaxError( - '{} line: expected ISO8601 timestamp, got: {}'.format(l[0], l[1:])) + raise ManifestSyntaxError( + f'{data[0]} line: expected ISO8601 timestamp, ' + f'got: {data[1:]}') return cls(ts) def to_list(self): @@ -49,13 +55,12 @@ class ManifestEntryTIMESTAMP(object): class ManifestPathEntry(object): - """ - Base class for entries using a path. 
- """ + """Base class for entries using a path""" __slots__ = ['path'] disallowed_path_re = re.compile(r'[\x00-\x1F\x7F-\x9F\s\\]', re.U) - escape_seq_re = re.compile(r'\\(x[0-9a-fA-F]{2}|u[0-9a-fA-F]{4}|U[0-9a-fA-F]{8})?') + escape_seq_re = re.compile( + r'\\(x[0-9a-fA-F]{2}|u[0-9a-fA-F]{4}|U[0-9a-fA-F]{8})?') def __init__(self, path): self.path = path @@ -64,30 +69,32 @@ class ManifestPathEntry(object): def decode_char(m): val = m.group(1) if val is None: - raise gemato.exceptions.ManifestSyntaxError( - 'Invalid escape sequence at pos {} of: {}'.format(m.start(), m.string)) + raise ManifestSyntaxError( + f'Invalid escape sequence at pos {m.start()} ' + f'of: {m.string}') return chr(int(val[1:], base=16)) @classmethod - def process_path(cls, l): - if len(l) != 2: - raise gemato.exceptions.ManifestSyntaxError( - '{} line: expects 1 value, got: {}'.format(l[0], l[1:])) - if not l[1] or l[1][0] == '/': - raise gemato.exceptions.ManifestSyntaxError( - '{} line: expected relative path, got: {}'.format(l[0], l[1:])) - return cls.escape_seq_re.sub(cls.decode_char, l[1]) + def process_path(cls, data): + if len(data) != 2: + raise ManifestSyntaxError( + f'{data[0]} line: expects 1 value, got: {data[1:]}') + if not data[1] or data[1][0] == '/': + raise ManifestSyntaxError( + f'{data[0]} line: expected relative path, ' + f'got: {data[1:]}') + return cls.escape_seq_re.sub(cls.decode_char, data[1]) @staticmethod def encode_char(m): assert len(m.group(0)) == 1 cp = ord(m.group(0)) if cp <= 0x7F: - return '\\x{:02X}'.format(cp) + return f'\\x{cp:02X}' elif cp <= 0xFFFF: - return '\\u{:04X}'.format(cp) + return f'\\u{cp:04X}' else: - return '\\U{:08X}'.format(cp) + return f'\\U{cp:08X}' @property def encoded_path(self): @@ -102,16 +109,14 @@ class ManifestPathEntry(object): class ManifestEntryIGNORE(ManifestPathEntry): - """ - Ignored path. 
- """ + """Ignored path""" tag = 'IGNORE' @classmethod - def from_list(cls, l): - assert l[0] == cls.tag - return cls(cls.process_path(l)) + def from_list(cls, data): + assert data[0] == cls.tag + return cls(cls.process_path(data)) def to_list(self): return (self.tag, self.encoded_path) @@ -125,26 +130,28 @@ class ManifestFileEntry(ManifestPathEntry): __slots__ = ['checksums', 'size'] def __init__(self, path, size, checksums): - super(ManifestFileEntry, self).__init__(path) + super().__init__(path) self.size = size self.checksums = checksums @staticmethod - def process_checksums(l): - if len(l) < 3: - raise gemato.exceptions.ManifestSyntaxError( - '{} line: expects at least 2 values, got: {}'.format(l[0], l[1:])) + def process_checksums(data): + if len(data) < 3: + raise ManifestSyntaxError( + f'{data[0]} line: expects at least 2 values, ' + f'got: {data[1:]}') try: - size = int(l[2]) + size = int(data[2]) if size < 0: raise ValueError() except ValueError: - raise gemato.exceptions.ManifestSyntaxError( - '{} line: size must be a non-negative integer, got: {}'.format(l[0], l[2])) + raise ManifestSyntaxError( + f'{data[0]} line: size must be a non-negative integer, ' + f'got: {data[2]}') checksums = {} - it = iter(l[3:]) + it = iter(data[3:]) while True: try: ckname = next(it) @@ -153,8 +160,8 @@ class ManifestFileEntry(ManifestPathEntry): try: ckval = next(it) except StopIteration: - raise gemato.exceptions.ManifestSyntaxError( - '{} line: checksum {} has no value'.format(l[0], ckname)) + raise ManifestSyntaxError( + f'{data[0]} line: checksum {ckname} has no value') checksums[ckname] = ckval return size, checksums @@ -166,7 +173,7 @@ class ManifestFileEntry(ManifestPathEntry): return ret def __eq__(self, other): - return (super(ManifestFileEntry, self).__eq__(other) + return (super().__eq__(other) and self.size == other.size and self.checksums == other.checksums) @@ -181,14 +188,14 @@ class ManifestEntryMANIFEST(ManifestFileEntry): tag = 'MANIFEST' @classmethod - def from_list(cls, l): - assert l[0] == cls.tag - path = cls.process_path(l[:2]) - size, checksums = cls.process_checksums(l) + def from_list(cls, data): + assert data[0] == cls.tag + path = cls.process_path(data[:2]) + size, checksums = cls.process_checksums(data) return cls(path, size, checksums) def to_list(self): - return super(ManifestEntryMANIFEST, self).to_list(self.tag) + return super().to_list(self.tag) class ManifestEntryDATA(ManifestFileEntry): @@ -199,14 +206,14 @@ class ManifestEntryDATA(ManifestFileEntry): tag = 'DATA' @classmethod - def from_list(cls, l): - assert l[0] == cls.tag - path = cls.process_path(l[:2]) - size, checksums = cls.process_checksums(l) + def from_list(cls, data): + assert data[0] == cls.tag + path = cls.process_path(data[:2]) + size, checksums = cls.process_checksums(data) return cls(path, size, checksums) def to_list(self): - return super(ManifestEntryDATA, self).to_list(self.tag) + return super().to_list(self.tag) class ManifestEntryDIST(ManifestFileEntry): @@ -217,16 +224,17 @@ class ManifestEntryDIST(ManifestFileEntry): tag = 'DIST' @classmethod - def from_list(cls, l): - path = cls.process_path(l[:2]) + def from_list(cls, data): + path = cls.process_path(data[:2]) if '/' in path: - raise gemato.exceptions.ManifestSyntaxError( - 'DIST line: file name expected, got directory path: {}'.format(path)) - size, checksums = cls.process_checksums(l) + raise ManifestSyntaxError( + f'{data[0]} line: file name expected, got directory ' + f'path: {path}') + size, checksums = cls.process_checksums(data) 
return cls(path, size, checksums) def to_list(self): - return super(ManifestEntryDIST, self).to_list(self.tag) + return super().to_list(self.tag) class ManifestEntryEBUILD(ManifestFileEntry): @@ -237,14 +245,14 @@ class ManifestEntryEBUILD(ManifestFileEntry): tag = 'EBUILD' @classmethod - def from_list(cls, l): - assert l[0] == cls.tag - path = cls.process_path(l[:2]) - size, checksums = cls.process_checksums(l) + def from_list(cls, data): + assert data[0] == cls.tag + path = cls.process_path(data[:2]) + size, checksums = cls.process_checksums(data) return cls(path, size, checksums) def to_list(self): - return super(ManifestEntryEBUILD, self).to_list(self.tag) + return super().to_list(self.tag) class ManifestEntryMISC(ManifestFileEntry): @@ -255,14 +263,14 @@ class ManifestEntryMISC(ManifestFileEntry): tag = 'MISC' @classmethod - def from_list(cls, l): - assert l[0] == cls.tag - path = cls.process_path(l[:2]) - size, checksums = cls.process_checksums(l) + def from_list(cls, data): + assert data[0] == cls.tag + path = cls.process_path(data[:2]) + size, checksums = cls.process_checksums(data) return cls(path, size, checksums) def to_list(self): - return super(ManifestEntryMISC, self).to_list(self.tag) + return super().to_list(self.tag) class ManifestEntryAUX(ManifestFileEntry): @@ -275,19 +283,19 @@ class ManifestEntryAUX(ManifestFileEntry): def __init__(self, aux_path, size, checksums): self.aux_path = aux_path - super(ManifestEntryAUX, self).__init__( + super().__init__( os.path.join('files', aux_path), size, checksums) @classmethod - def from_list(cls, l): - assert l[0] == cls.tag - path = cls.process_path(l[:2]) - size, checksums = cls.process_checksums(l) + def from_list(cls, data): + assert data[0] == cls.tag + path = cls.process_path(data[:2]) + size, checksums = cls.process_checksums(data) return cls(path, size, checksums) def to_list(self): - ret = super(ManifestEntryAUX, self).to_list(self.tag) - assert gemato.util.path_inside_dir(ret[1], 'files') + ret = super().to_list(self.tag) + assert path_inside_dir(ret[1], 'files') ret[1] = ret[1][6:] return ret @@ -367,66 +375,67 @@ class ManifestFile(object): state = ManifestState.DATA openpgp_data = '' - for l in f: + for line in f: if state == ManifestState.DATA: - if l == '-----BEGIN PGP SIGNED MESSAGE-----\n': + if line == '-----BEGIN PGP SIGNED MESSAGE-----\n': if self.entries: - raise gemato.exceptions.ManifestUnsignedData() + raise ManifestUnsignedData() if verify_openpgp: - openpgp_data += l + openpgp_data += line state = ManifestState.SIGNED_PREAMBLE continue elif state == ManifestState.SIGNED_PREAMBLE: if verify_openpgp: - openpgp_data += l + openpgp_data += line # skip header lines up to the empty line - if l.strip(): + if line.strip(): continue state = ManifestState.SIGNED_DATA elif state == ManifestState.SIGNED_DATA: if verify_openpgp: - openpgp_data += l - if l == '-----BEGIN PGP SIGNATURE-----\n': + openpgp_data += line + if line == '-----BEGIN PGP SIGNATURE-----\n': state = ManifestState.SIGNATURE continue # dash-escaping, RFC 4880 says any line can suffer from it - if l.startswith('- '): - l = l[2:] + if line.startswith('- '): + line = line[2:] elif state == ManifestState.SIGNATURE: if verify_openpgp: - openpgp_data += l - if l == '-----END PGP SIGNATURE-----\n': + openpgp_data += line + if line == '-----END PGP SIGNATURE-----\n': state = ManifestState.POST_SIGNED_DATA continue - if l.startswith('-----') and l.rstrip().endswith('-----'): - raise gemato.exceptions.ManifestSyntaxError( - "Unexpected OpenPGP header: 
{}".format(l)) - if state in (ManifestState.SIGNED_PREAMBLE, ManifestState.SIGNATURE): + if line.startswith('-----') and line.rstrip().endswith('-----'): + raise ManifestSyntaxError( + f'Unexpected OpenPGP header: {line}') + if state in (ManifestState.SIGNED_PREAMBLE, + ManifestState.SIGNATURE): continue - sl = l.strip().split() + sl = line.strip().split() # skip empty lines if not sl: continue if state == ManifestState.POST_SIGNED_DATA: - raise gemato.exceptions.ManifestUnsignedData() + raise ManifestUnsignedData() tag = sl[0] try: - self.entries.append(MANIFEST_TAG_MAPPING[tag] - .from_list(sl)) + self.entries.append( + MANIFEST_TAG_MAPPING[tag].from_list(sl)) except KeyError: - raise gemato.exceptions.ManifestSyntaxError( - "Invalid Manifest line: {}".format(l)) + raise ManifestSyntaxError( + f'Invalid Manifest line: {line}') if state == ManifestState.SIGNED_PREAMBLE: - raise gemato.exceptions.ManifestSyntaxError( + raise ManifestSyntaxError( "Manifest terminated early, in OpenPGP headers") elif state == ManifestState.SIGNED_DATA: - raise gemato.exceptions.ManifestSyntaxError( + raise ManifestSyntaxError( "Manifest terminated early, before signature") elif state == ManifestState.SIGNATURE: - raise gemato.exceptions.ManifestSyntaxError( + raise ManifestSyntaxError( "Manifest terminated early, inside signature") if verify_openpgp and state == ManifestState.POST_SIGNED_DATA: @@ -436,7 +445,7 @@ class ManifestFile(object): self.openpgp_signed = True def dump(self, f, sign_openpgp=None, openpgp_keyid=None, - openpgp_env=None, sort=False): + openpgp_env=None, sort=False): """ Dump data into file @f. The file should be open for writing in text mode, and truncated to zero length. @@ -468,7 +477,7 @@ class ManifestFile(object): openpgp_env.clear_sign_file(data, f, keyid=openpgp_keyid) else: for e in self.entries: - f.write(u' '.join(e.to_list()) + '\n') + f.write(' '.join(e.to_list()) + '\n') def find_timestamp(self): """ @@ -491,7 +500,7 @@ class ManifestFile(object): if e.tag == 'IGNORE': # ignore matches recursively, so we process it separately # py<3.5 does not have os.path.commonpath() - if gemato.util.path_starts_with(path, e.path): + if path_starts_with(path, e.path): return e elif e.tag in ('DIST', 'TIMESTAMP'): # distfiles are not local files, so skip them @@ -523,7 +532,7 @@ class ManifestFile(object): for e in self.entries: if e.tag == 'MANIFEST': mdir = os.path.dirname(e.path) - if gemato.util.path_inside_dir(path, mdir): + if path_inside_dir(path, mdir): yield e diff --git a/gemato/openpgp.py b/gemato/openpgp.py index a1c986d..09525a9 100644 --- a/gemato/openpgp.py +++ b/gemato/openpgp.py @@ -16,7 +16,16 @@ import subprocess import tempfile import urllib.parse -import gemato.exceptions +from gemato.exceptions import ( + OpenPGPNoImplementation, + OpenPGPVerificationFailure, + OpenPGPExpiredKeyFailure, + OpenPGPRevokedKeyFailure, + OpenPGPKeyImportError, + OpenPGPKeyRefreshError, + OpenPGPUnknownSigFailure, + OpenPGPSigningFailure, + ) try: import requests @@ -67,7 +76,8 @@ class OpenPGPSystemEnvironment(object): at the beginning. """ - raise NotImplementedError('import_key() is not implemented by this OpenPGP provider') + raise NotImplementedError( + 'import_key() is not implemented by this OpenPGP provider') def refresh_keys(self, allow_wkd=True, keyserver=None): """ @@ -83,7 +93,8 @@ class OpenPGPSystemEnvironment(object): it should specify a keyserver URL. 
""" - raise NotImplementedError('refresh_keys() is not implemented by this OpenPGP provider') + raise NotImplementedError( + 'refresh_keys() is not implemented by this OpenPGP provider') def _parse_gpg_ts(self, ts): """ @@ -112,21 +123,21 @@ class OpenPGPSystemEnvironment(object): [GNUPG, '--batch', '--status-fd', '1', '--verify'], f.read().encode('utf8')) if exitst != 0: - raise gemato.exceptions.OpenPGPVerificationFailure(err.decode('utf8')) + raise OpenPGPVerificationFailure(err.decode('utf8')) is_good = False sig_data = None # process the output of gpg to find the exact result - for l in out.splitlines(): - if l.startswith(b'[GNUPG:] GOODSIG'): + for line in out.splitlines(): + if line.startswith(b'[GNUPG:] GOODSIG'): is_good = True - elif l.startswith(b'[GNUPG:] EXPKEYSIG'): - raise gemato.exceptions.OpenPGPExpiredKeyFailure(err.decode('utf8')) - elif l.startswith(b'[GNUPG:] REVKEYSIG'): - raise gemato.exceptions.OpenPGPRevokedKeyFailure(err.decode('utf8')) - elif l.startswith(b'[GNUPG:] VALIDSIG'): - spl = l.split(b' ') + elif line.startswith(b'[GNUPG:] EXPKEYSIG'): + raise OpenPGPExpiredKeyFailure(err.decode('utf8')) + elif line.startswith(b'[GNUPG:] REVKEYSIG'): + raise OpenPGPRevokedKeyFailure(err.decode('utf8')) + elif line.startswith(b'[GNUPG:] VALIDSIG'): + spl = line.split(b' ') assert len(spl) >= 12 fp = spl[2].decode('utf8') ts = self._parse_gpg_ts(spl[4].decode('utf8')) @@ -137,7 +148,7 @@ class OpenPGPSystemEnvironment(object): # require both GOODSIG and VALIDSIG if not is_good or sig_data is None: - raise gemato.exceptions.OpenPGPUnknownSigFailure(err.decode('utf8')) + raise OpenPGPUnknownSigFailure(err.decode('utf8')) return sig_data def clear_sign_file(self, f, outf, keyid=None): @@ -158,7 +169,7 @@ class OpenPGPSystemEnvironment(object): [GNUPG, '--batch', '--clearsign'] + args, f.read().encode('utf8')) if exitst != 0: - raise gemato.exceptions.OpenPGPSigningFailure(err.decode('utf8')) + raise OpenPGPSigningFailure(err.decode('utf8')) outf.write(out.decode('utf8')) @@ -176,7 +187,7 @@ class OpenPGPSystemEnvironment(object): except OSError as e: if e.errno != errno.ENOENT: raise - raise gemato.exceptions.OpenPGPNoImplementation() + raise OpenPGPNoImplementation() out, err = p.communicate(stdin) return (p.wait(), out, err) @@ -194,20 +205,20 @@ class OpenPGPEnvironment(OpenPGPSystemEnvironment): __slots__ = ['_home', 'proxy'] def __init__(self, debug=False, proxy=None): - super(OpenPGPEnvironment, self).__init__(debug=debug) + super().__init__(debug=debug) self.proxy = proxy self._home = tempfile.mkdtemp(prefix='gemato.') with open(os.path.join(self._home, 'dirmngr.conf'), 'w') as f: - f.write('''# autogenerated by gemato + f.write(f'''# autogenerated by gemato # honor user's http_proxy setting honor-http-proxy # enable debugging, in case we needed it -log-file {debug_file} +log-file {os.path.join(self._home, 'dirmngr.log')} debug-level guru -'''.format(debug_file=os.path.join(self._home, 'dirmngr.log'))) +''') with open(os.path.join(self._home, 'gpg.conf'), 'w') as f: f.write('''# autogenerated by gemato @@ -215,15 +226,15 @@ debug-level guru trust-model always ''') with open(os.path.join(self._home, 'gpg-agent.conf'), 'w') as f: - f.write('''# autogenerated by gemato + f.write(f'''# autogenerated by gemato # avoid any smartcard operations, we are running in isolation disable-scdaemon # enable debugging, in case we needed it -log-file {debug_file} +log-file {os.path.join(self._home, 'gpg-agent.log')} debug-level guru -'''.format(debug_file=os.path.join(self._home, 
'gpg-agent.log'))) +''') def __exit__(self, exc_type, exc_value, exc_cb): if self._home is not None: @@ -250,23 +261,22 @@ debug-level guru ret, sout, serr = self._spawn_gpg( [GNUPGCONF, '--kill', 'all']) if ret != 0: - logging.warning('{} --kill failed: {}' - .format(GNUPGCONF, serr)) + logging.warning(f'{GNUPGCONF} --kill failed: {serr}') if not self.debug: # we need to loop due to ENOTEMPTY potential while os.path.isdir(self._home): shutil.rmtree(self._home, onerror=self._rmtree_error_handler) else: - logging.debug('GNUPGHOME left for debug purposes: {}' - .format(self._home)) + logging.debug(f'GNUPGHOME left for debug purposes: ' + f'{self._home}') self._home = None def import_key(self, keyfile): exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--import'], keyfile.read()) if exitst != 0: - raise gemato.exceptions.OpenPGPKeyImportError(err.decode('utf8')) + raise OpenPGPKeyImportError(err.decode('utf8')) zbase32_translate = bytes.maketrans( b'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567', @@ -275,8 +285,8 @@ debug-level guru @classmethod def get_wkd_url(cls, email): localname, domain = email.encode('utf8').split(b'@', 1) - b32 = (base64.b32encode( - hashlib.sha1(localname.lower()).digest()) + b32 = ( + base64.b32encode(hashlib.sha1(localname.lower()).digest()) .translate(cls.zbase32_translate).decode()) uenc = urllib.parse.quote(localname) ldomain = domain.lower().decode('utf8') @@ -297,51 +307,52 @@ debug-level guru exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--with-colons', '--list-keys']) if exitst != 0: - raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) + raise OpenPGPKeyRefreshError(err.decode('utf8')) # find keys and UIDs addrs = set() addrs_key = set() keys = set() prev_pub = None - for l in out.splitlines(): + for line in out.splitlines(): # were we expecting a fingerprint? 
if prev_pub is not None: - if l.startswith(b'fpr:'): - fpr = l.split(b':')[9].decode('ASCII') + if line.startswith(b'fpr:'): + fpr = line.split(b':')[9].decode('ASCII') assert fpr.endswith(prev_pub) - logging.debug('refresh_keys_wkd(): fingerprint: {}' - .format(fpr)) + logging.debug( + f'refresh_keys_wkd(): fingerprint: {fpr}') keys.add(fpr) prev_pub = None else: # old GnuPG doesn't give fingerprints by default # (but it doesn't support WKD either) - logging.debug('refresh_keys_wkd(): failing due to old gpg') + logging.debug( + 'refresh_keys_wkd(): failing due to old gpg') return False - elif l.startswith(b'pub:'): + elif line.startswith(b'pub:'): if keys: # every key must have at least one UID if not addrs_key: - logging.debug('refresh_keys_wkd(): failing due to no UIDs') + logging.debug( + 'refresh_keys_wkd(): failing due to no UIDs') return False addrs.update(addrs_key) addrs_key = set() # wait for the fingerprint - prev_pub = l.split(b':')[4].decode('ASCII') - logging.debug('refresh_keys_wkd(): keyid: {}' - .format(prev_pub)) - elif l.startswith(b'uid:'): - uid = l.split(b':')[9] + prev_pub = line.split(b':')[4].decode('ASCII') + logging.debug(f'refresh_keys_wkd(): keyid: {prev_pub}') + elif line.startswith(b'uid:'): + uid = line.split(b':')[9] name, addr = email.utils.parseaddr(uid.decode('utf8')) if '@' in addr: - logging.debug('refresh_keys_wkd(): UID: {}' - .format(addr)) + logging.debug(f'refresh_keys_wkd(): UID: {addr}') addrs_key.add(addr) else: - logging.debug('refresh_keys_wkd(): ignoring UID without mail: {}' - .format(uid.decode('utf8'))) + logging.debug( + f'refresh_keys_wkd(): ignoring UID without ' + f'mail: {uid.decode("utf8")}') # grab the final set (also aborts when there are no keys) if not addrs_key: @@ -369,14 +380,14 @@ debug-level guru [GNUPG, '--batch', '--import', '--status-fd', '1'], data) if exitst != 0: # there's no valid reason for import to fail here - raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) + raise OpenPGPKeyRefreshError(err.decode('utf8')) # we need to explicitly ensure all keys were fetched - for l in out.splitlines(): - if l.startswith(b'[GNUPG:] IMPORT_OK'): - fpr = l.split(b' ')[3].decode('ASCII') - logging.debug('refresh_keys_wkd(): import successful for key: {}' - .format(fpr)) + for line in out.splitlines(): + if line.startswith(b'[GNUPG:] IMPORT_OK'): + fpr = line.split(b' ')[3].decode('ASCII') + logging.debug( + f'refresh_keys_wkd(): import successful for key: {fpr}') if fpr in keys: keys.remove(fpr) else: @@ -384,11 +395,12 @@ debug-level guru exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--delete-keys', fpr]) if exitst != 0: - raise gemato.exceptions.OpenPGPKeyRefreshError( + raise OpenPGPKeyRefreshError( err.decode('utf8')) if keys: - logging.debug('refresh_keys_wkd(): failing due to non-updated keys: {}' - .format(keys)) + logging.debug( + f'refresh_keys_wkd(): failing due to non-updated keys: ' + f'{keys}') return False return True @@ -401,11 +413,11 @@ debug-level guru exitst, out, err = self._spawn_gpg( [GNUPG, '--batch', '--refresh-keys'] + ks_args) if exitst != 0: - raise gemato.exceptions.OpenPGPKeyRefreshError(err.decode('utf8')) + raise OpenPGPKeyRefreshError(err.decode('utf8')) def refresh_keys(self, allow_wkd=True, keyserver=None): - logging.debug('refresh_keys(allow_wkd={}, keyserver={}) called' - .format(allow_wkd, keyserver)) + logging.debug(f'refresh_keys(allow_wkd={allow_wkd}, ' + f'keyserver={keyserver}) called') if allow_wkd and self.refresh_keys_wkd(): return @@ -421,5 +433,4 @@ 
diff --git a/gemato/profile.py b/gemato/profile.py
index 87e491e..463b72c 100644
--- a/gemato/profile.py
+++ b/gemato/profile.py
@@ -54,7 +54,7 @@ class DefaultProfile(object):
         return ()
 
     def want_compressed_manifest(self, relpath, manifest, unc_size,
-            compress_watermark):
+                                 compress_watermark):
         """
         Determine whether the specified Manifest (at @relpath)
         can be compressed. @manifest is the Manifest instance. @unc_size
@@ -87,15 +87,18 @@ class EbuildRepositoryProfile(DefaultProfile):
                 return True
             # plus some unconditional standard directories
             if relpath in ('eclass', 'licenses', 'metadata',
-                    'profiles'):
+                           'profiles'):
                 return True
         elif len(spl) == 2:
             # 'slow' way of detecting package directories
             if any(f.endswith('.ebuild') for f in filenames):
                 return True
             # some standard directories worth separate Manifests
-            if spl[0] == 'metadata' and spl[1] in ('dtd', 'glsa',
-                    'md5-cache', 'news', 'xml-schema'):
+            if spl[0] == 'metadata' and spl[1] in ('dtd',
+                                                   'glsa',
+                                                   'md5-cache',
+                                                   'news',
+                                                   'xml-schema'):
                 return True
         elif len(spl) == 3:
             # metadata cache -> per-directory Manifests
@@ -145,19 +148,20 @@ class BackwardsCompatEbuildRepositoryProfile(EbuildRepositoryProfile):
         if spl[2:3] == ['files']:
             return 'AUX'
 
-        return (super(BackwardsCompatEbuildRepositoryProfile, self)
-                .get_entry_type_for_path(path))
+        return (super().get_entry_type_for_path(path))
 
     def want_compressed_manifest(self, relpath, manifest, unc_size,
-            compress_watermark):
+                                 compress_watermark):
         for e in manifest.entries:
             # disable compression in package directories
             if e.tag == 'EBUILD':
                 return False
-        return (super(BackwardsCompatEbuildRepositoryProfile, self)
-                .want_compressed_manifest(relpath, manifest, unc_size,
-                    compress_watermark))
+        return (
+            super().want_compressed_manifest(relpath,
+                                             manifest,
+                                             unc_size,
+                                             compress_watermark))
 
 
 PROFILE_MAPPING = {
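The want_compressed_manifest() changes above are indentation-only; the hook's contract is unchanged: return True to store the Manifest at @relpath compressed. For illustration only, a hypothetical profile that opts out of compression entirely could subclass the hook with the signature shown above (not part of this change):

    from gemato.profile import DefaultProfile

    class UncompressedProfile(DefaultProfile):
        # keep every sub-Manifest uncompressed regardless of size
        def want_compressed_manifest(self, relpath, manifest, unc_size,
                                     compress_watermark):
            return False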
""" - m = gemato.manifest.ManifestFile() + m = ManifestFile() path = os.path.join(self.root_directory, relpath) if verify_entry is not None: - ret, diff = gemato.verify.verify_path(path, verify_entry) + ret, diff = verify_path(path, verify_entry) if not ret: - raise gemato.exceptions.ManifestMismatch( - relpath, verify_entry, diff) + raise ManifestMismatch(relpath, verify_entry, diff) - with gemato.compression.open_potentially_compressed_path( - path, 'r', encoding='utf8') as f: + with open_potentially_compressed_path(path, 'r', + encoding='utf8') as f: m.load(f, self.verify_openpgp, self.openpgp_env) st = os.fstat(f.fileno()) @@ -90,12 +115,12 @@ class SubprocessVerifier(object): self.last_mtime = last_mtime def _verify_one_file(self, path, relpath, e): - ret, diff = gemato.verify.verify_path(path, e, - expected_dev=self.manifest_device, - last_mtime=self.last_mtime) + ret, diff = verify_path(path, e, + expected_dev=self.manifest_device, + last_mtime=self.last_mtime) if not ret: - err = gemato.exceptions.ManifestMismatch(relpath, e, diff) + err = ManifestMismatch(relpath, e, diff) ret = self.fail_handler(err) if ret is None: ret = True @@ -118,7 +143,7 @@ class SubprocessVerifier(object): if de is not None: dpath = os.path.join(relpath, d) ret &= self._verify_one_file(os.path.join(dirpath, d), - dpath, de) + dpath, de) for f in filenames: # skip dotfiles @@ -132,14 +157,13 @@ class SubprocessVerifier(object): continue fe = dirdict.pop(f, None) ret &= self._verify_one_file(os.path.join(dirpath, f), - fpath, fe) - + fpath, fe) # check for missing files for f, e in dirdict.items(): fpath = os.path.join(relpath, f) ret &= self._verify_one_file(os.path.join(dirpath, f), - fpath, e) + fpath, e) return ret @@ -173,13 +197,21 @@ class ManifestRecursiveLoader(object): 'max_jobs', ] - def __init__(self, top_manifest_path, - verify_openpgp=None, openpgp_env=None, - sign_openpgp=None, openpgp_keyid=None, - hashes=None, allow_create=False, sort=None, - compress_watermark=None, compress_format=None, - profile=gemato.profile.DefaultProfile(), - max_jobs=None, allow_xdev=True): + def __init__(self, + top_manifest_path, + verify_openpgp=None, + openpgp_env=None, + sign_openpgp=None, + openpgp_keyid=None, + hashes=None, + allow_create=False, + sort=None, + compress_watermark=None, + compress_format=None, + profile=DefaultProfile(), + max_jobs=None, + allow_xdev=True, + ): """ Instantiate the loader for a Manifest tree starting at top-level Manifest @top_manifest_path. @@ -252,22 +284,27 @@ class ManifestRecursiveLoader(object): if self.compress_format is None: self.compress_format = 'gz' - self.manifest_loader = ManifestLoader(self.root_directory, - verify_openpgp, self.openpgp_env) + self.manifest_loader = ManifestLoader( + self.root_directory, verify_openpgp, self.openpgp_env) self.top_level_manifest_filename = os.path.basename( - top_manifest_path) + top_manifest_path) self.loaded_manifests = {} self.updated_manifests = set() self.manifest_device = None # TODO: allow catching OpenPGP exceptions somehow? 
m = self.load_manifest(self.top_level_manifest_filename, - allow_create=allow_create, store_dev=not allow_xdev) + allow_create=allow_create, + store_dev=not allow_xdev) self.openpgp_signed = m.openpgp_signed self.openpgp_signature = m.openpgp_signature - def load_manifest(self, relpath, verify_entry=None, - allow_create=False, store_dev=False): + def load_manifest(self, + relpath, + verify_entry=None, + allow_create=False, + store_dev=False, + ): """ Load a single Manifest file whose relative path within Manifest tree is @relpath. If @verify_entry is not null, the Manifest @@ -287,7 +324,7 @@ class ManifestRecursiveLoader(object): relpath, verify_entry) except IOError as err: if err.errno == errno.ENOENT and allow_create: - m = gemato.manifest.ManifestFile() + m = ManifestFile() path = os.path.join(self.root_directory, relpath) st = os.stat(os.path.dirname(path)) # trigger saving @@ -297,7 +334,7 @@ class ManifestRecursiveLoader(object): if relpath == 'Manifest': for ip in (self.profile .get_ignore_paths_for_new_manifest('')): - ie = gemato.manifest.ManifestEntryIGNORE(ip) + ie = ManifestEntryIGNORE(ip) m.entries.append(ie) else: raise err @@ -330,11 +367,13 @@ class ManifestRecursiveLoader(object): else: sign = False - with gemato.compression.open_potentially_compressed_path( - path, 'w', encoding='utf8') as f: - m.dump(f, sign_openpgp=sign, sort=sort, - openpgp_env=self.openpgp_env, - openpgp_keyid=self.openpgp_keyid) + with open_potentially_compressed_path(path, 'w', + encoding='utf8') as f: + m.dump(f, + sign_openpgp=sign, + sort=sort, + openpgp_env=self.openpgp_env, + openpgp_keyid=self.openpgp_keyid) return f.buffer.tell() def _iter_unordered_manifests_for_path(self, path, recursive=False): @@ -347,9 +386,9 @@ class ManifestRecursiveLoader(object): """ for k, v in self.loaded_manifests.items(): d = os.path.dirname(k) - if gemato.util.path_starts_with(path, d): + if path_starts_with(path, d): yield (k, d, v) - elif recursive and gemato.util.path_starts_with(d, path): + elif recursive and path_starts_with(d, path): yield (k, d, v) def _iter_manifests_for_path(self, path, recursive=False): @@ -380,7 +419,7 @@ class ManifestRecursiveLoader(object): unconditionally of whether they match parent checksums. 
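As the constructor hunk above shows, ManifestRecursiveLoader now takes its keyword arguments one per line; the semantics are unchanged, and the top-level Manifest's signature state is still exposed via openpgp_signed/openpgp_signature. A hypothetical usage sketch with signature verification (the repository path is illustrative):

    from gemato.openpgp import OpenPGPSystemEnvironment
    from gemato.recursiveloader import ManifestRecursiveLoader

    env = OpenPGPSystemEnvironment()
    try:
        # verify the top-level Manifest's OpenPGP signature on load
        m = ManifestRecursiveLoader('/srv/repo/Manifest',
                                    verify_openpgp=True,
                                    openpgp_env=env)
        print(m.openpgp_signed, m.openpgp_signature)
    finally:
        env.close()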
""" - with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool: + with MultiprocessingPoolWrapper(self.max_jobs) as pool: # TODO: figure out how to avoid confusing uses of 'recursive' while True: to_load = [] @@ -395,15 +434,15 @@ class ManifestRecursiveLoader(object): mdir = os.path.dirname(mpath) if not verify: e = None - if gemato.util.path_starts_with(path, mdir): + if path_starts_with(path, mdir): to_load.append((mpath, e)) - elif recursive and gemato.util.path_starts_with(mdir, path): + elif recursive and path_starts_with(mdir, path): to_load.append((mpath, e)) if not to_load: break - manifests = pool.imap_unordered(self.manifest_loader, to_load, - chunksize=16) + manifests = pool.imap_unordered( + self.manifest_loader, to_load, chunksize=16) self.loaded_manifests.update(manifests) def find_timestamp(self): @@ -432,7 +471,7 @@ class ManifestRecursiveLoader(object): e.ts = ts else: m = self.loaded_manifests[self.top_level_manifest_filename] - e = gemato.manifest.ManifestEntryTIMESTAMP(ts) + e = ManifestEntryTIMESTAMP(ts) m.entries.append(e) def find_path_entry(self, path): @@ -448,7 +487,7 @@ class ManifestRecursiveLoader(object): # ignore matches recursively, so we process it separately # py<3.5 does not have os.path.commonpath() fullpath = os.path.join(relpath, e.path) - if gemato.util.path_starts_with(path, fullpath): + if path_starts_with(path, fullpath): return e elif e.tag in ('DIST', 'TIMESTAMP'): # distfiles are not local files, so skip them @@ -468,7 +507,7 @@ class ManifestRecursiveLoader(object): """ real_path = os.path.join(self.root_directory, relpath) path_entry = self.find_path_entry(relpath) - return gemato.verify.verify_path(real_path, path_entry) + return verify_path(real_path, path_entry) def assert_path_verifies(self, relpath): """ @@ -478,11 +517,10 @@ class ManifestRecursiveLoader(object): """ real_path = os.path.join(self.root_directory, relpath) path_entry = self.find_path_entry(relpath) - ret, diff = gemato.verify.verify_path(real_path, path_entry, - expected_dev=self.manifest_device) + ret, diff = verify_path(real_path, path_entry, + expected_dev=self.manifest_device) if not ret: - raise gemato.exceptions.ManifestMismatch( - relpath, path_entry, diff) + raise ManifestMismatch(relpath, path_entry, diff) def find_dist_entry(self, filename, relpath=''): """ @@ -519,8 +557,8 @@ class ManifestRecursiveLoader(object): self.load_manifests_for_path(path, recursive=True, verify=verify_manifests) out = {} - for mpath, relpath, m in self._iter_manifests_for_path(path, - recursive=True): + for mpath, relpath, m in self._iter_manifests_for_path( + path, recursive=True): for e in m.entries: if only_types is not None: if e.tag not in only_types: @@ -534,18 +572,19 @@ class ManifestRecursiveLoader(object): continue fullpath = os.path.join(relpath, e.path) - if gemato.util.path_starts_with(fullpath, path): + if path_starts_with(fullpath, path): dirpath = os.path.dirname(fullpath) filename = os.path.basename(e.path) dirout = out.setdefault(dirpath, {}) if filename in dirout: # compare the two entries - ret, diff = gemato.verify.verify_entry_compatibility( - dirout[filename], e) + ret, diff = verify_entry_compatibility( + dirout[filename], e) if not ret: - raise gemato.exceptions.ManifestIncompatibleEntry( - dirout[filename], e, diff) - # we need to construct a single entry with both checksums + raise ManifestIncompatibleEntry( + dirout[filename], e, diff) + # we need to construct a single entry with both + # checksums if diff: new_checksums = dict(e.checksums) for k, 
d1, d2 in diff: @@ -555,9 +594,11 @@ class ManifestRecursiveLoader(object): dirout[filename] = e return out - def assert_directory_verifies(self, path='', - fail_handler=gemato.util.throw_exception, - last_mtime=None): + def assert_directory_verifies(self, + path='', + fail_handler=throw_exception, + last_mtime=None, + ): """ Verify the complete directory tree starting at @path (relative to top Manifest directory). Includes testing for stray files. @@ -586,8 +627,8 @@ class ManifestRecursiveLoader(object): entry_dict = self.get_file_entry_dict(path) it = os.walk(os.path.join(self.root_directory, path), - onerror=gemato.util.throw_exception, - followlinks=True) + onerror=throw_exception, + followlinks=True) def _walk_directory(it): """ @@ -600,7 +641,7 @@ class ManifestRecursiveLoader(object): dir_st = os.stat(dirpath) if (self.manifest_device is not None and dir_st.st_dev != self.manifest_device): - raise gemato.exceptions.ManifestCrossDevice(dirpath) + raise ManifestCrossDevice(dirpath) dir_id = (dir_st.st_dev, dir_st.st_ino) # if this directory was already processed for one of its @@ -608,7 +649,7 @@ class ManifestRecursiveLoader(object): parent_dir = os.path.dirname(dirpath) parent_dir_ids = directory_ids.get(parent_dir, []) if dir_id in parent_dir_ids: - raise gemato.exceptions.ManifestSymlinkLoop(dirpath) + raise ManifestSymlinkLoop(dirpath) relpath = os.path.relpath(dirpath, self.root_directory) # strip dot to avoid matching problems @@ -648,10 +689,10 @@ class ManifestRecursiveLoader(object): self.manifest_device, fail_handler, last_mtime) - with gemato.util.MultiprocessingPoolWrapper(self.max_jobs) as pool: + with MultiprocessingPoolWrapper(self.max_jobs) as pool: # verify the directories in parallel - ret = all(pool.imap_unordered(verifier, _walk_directory(it), - chunksize=64)) + ret = all(pool.imap_unordered( + verifier, _walk_directory(it), chunksize=64)) # check for missing directories for relpath, dirdict in entry_dict.items(): @@ -662,8 +703,13 @@ class ManifestRecursiveLoader(object): return ret - def save_manifests(self, hashes=None, force=False, sort=None, - compress_watermark=None, compress_format=None): + def save_manifests(self, + hashes=None, + force=False, + sort=None, + compress_watermark=None, + compress_format=None, + ): """ Save the Manifests modified since the last save_manifests() call. @@ -687,7 +733,7 @@ class ManifestRecursiveLoader(object): will be compressed using @compress_format. The Manifest files whose size is smaller will be uncompressed. To compress all Manifest files, pass a size of 0. - + If @compress_watermark is None, the compression is left as-is. 
""" @@ -704,8 +750,8 @@ class ManifestRecursiveLoader(object): fixed_manifests = set() renamed_manifests = {} - for mpath, relpath, m in self._iter_manifests_for_path('', - recursive=True): + for mpath, relpath, m in self._iter_manifests_for_path( + '', recursive=True): for e in m.entries: if e.tag != 'MANIFEST': continue @@ -718,7 +764,7 @@ class ManifestRecursiveLoader(object): fullpath = renamed_manifests[fullpath] e.path = os.path.relpath(fullpath, relpath) - gemato.verify.update_entry_for_path( + update_entry_for_path( os.path.join(self.root_directory, fullpath), e, hashes=hashes, @@ -735,8 +781,7 @@ class ManifestRecursiveLoader(object): unc_size = self.save_manifest(mpath, sort=sort) # let's see if we want to recompress it if compress_watermark is not None: - compr = (gemato.compression - .get_compressed_suffix_from_filename(mpath)) + compr = get_compressed_suffix_from_filename(mpath) is_compr = compr is not None want_compr = self.profile.want_compressed_manifest( mpath, m, unc_size, compress_watermark) @@ -752,7 +797,7 @@ class ManifestRecursiveLoader(object): self.save_manifest(new_mpath) del self.loaded_manifests[mpath] os.unlink(os.path.join(self.root_directory, - mpath)) + mpath)) renamed_manifests[mpath] = new_mpath if mpath == self.top_level_manifest_filename: @@ -766,11 +811,13 @@ class ManifestRecursiveLoader(object): self.updated_manifests.discard(self.top_level_manifest_filename) # at this point, the list should be empty assert not self.updated_manifests, ( - "Unlinked but updated Manifests: {}".format( - self.updated_manifests)) + f'Unlinked but updated Manifests: {self.updated_manifests}') - def update_entry_for_path(self, path, new_entry_type='DATA', - hashes=None): + def update_entry_for_path(self, + path, + new_entry_type='DATA', + hashes=None, + ): """ Update the Manifest entries for @path and queue the containing Manifests for update. @path must not be covered by IGNORE. 
@@ -814,7 +861,7 @@
                     # ignore matches recursively, so we process it separately
                     # py<3.5 does not have os.path.commonpath()
                     fullpath = os.path.join(relpath, e.path)
-                    assert not gemato.util.path_starts_with(path, fullpath)
+                    assert not path_starts_with(path, fullpath)
                 elif e.tag in ('DIST', 'TIMESTAMP'):
                     # distfiles are not local files, so skip them
                     # timestamp is not a file ;-)
@@ -832,13 +879,12 @@
                     continue
 
                 try:
-                    gemato.verify.update_entry_for_path(
-                        os.path.join(self.root_directory,
-                            fullpath),
+                    update_entry_for_path(
+                        os.path.join(self.root_directory, fullpath),
                         e,
                         hashes=hashes,
                         expected_dev=self.manifest_device)
-                except gemato.exceptions.ManifestInvalidPath as err:
+                except ManifestInvalidPath as err:
                     if err.detail[0] == '__exists__':
                         # file does not exist anymore, so remove
                         # the entry
@@ -863,13 +909,11 @@
                 newpath = os.path.relpath(path, relpath)
                 if new_entry_type == 'AUX':
                     # AUX has implicit files/ prefix
-                    assert gemato.util.path_inside_dir(newpath,
-                            'files')
+                    assert path_inside_dir(newpath, 'files')
                     # drop files/ prefix
                     newpath = os.path.relpath(newpath, 'files')
-                e = gemato.manifest.new_manifest_entry(
-                    new_entry_type, newpath, 0, {})
-                gemato.verify.update_entry_for_path(
+                e = new_manifest_entry(new_entry_type, newpath, 0, {})
+                update_entry_for_path(
                     os.path.join(self.root_directory, path),
                     e,
                     hashes=hashes,
@@ -879,8 +923,10 @@
                 had_entry = True
                 break
 
-    def get_deduplicated_file_entry_dict_for_update(self, path='',
-            verify_manifests=True):
+    def get_deduplicated_file_entry_dict_for_update(self,
+                                                    path='',
+                                                    verify_manifests=True,
+                                                    ):
         """
         Find all file entries that apply to paths starting with @path.
         Remove all duplicate entries and queue the relevant Manifests
@@ -906,8 +952,8 @@
         self.load_manifests_for_path(path, recursive=True,
                                      verify=verify_manifests)
         out = {}
-        for mpath, relpath, m in self._iter_manifests_for_path(path,
-                recursive=True):
+        for mpath, relpath, m in self._iter_manifests_for_path(
+                path, recursive=True):
             entries_to_remove = []
             for e in m.entries:
                 if e.tag in ('DIST', 'TIMESTAMP'):
@@ -916,16 +962,15 @@
                     continue
 
                 fullpath = os.path.join(relpath, e.path)
-                if gemato.util.path_starts_with(fullpath, path):
+                if path_starts_with(fullpath, path):
                     if fullpath in out:
                         # compare the two entries
-                        ret, diff = gemato.verify.verify_entry_compatibility(
-                                out[fullpath][1], e)
+                        ret, diff = verify_entry_compatibility(
+                            out[fullpath][1], e)
                         # if semantically incompatible, throw
                         if not ret and diff[0][0] == '__type__':
-                            raise (gemato.exceptions
-                                   .ManifestIncompatibleEntry(
-                                       out[fullpath][1], e, diff))
+                            raise ManifestIncompatibleEntry(
+                                out[fullpath][1], e, diff)
                         # otherwise, make sure we have all checksums
                         out[fullpath][1].checksums.update(e.checksums)
                         # and drop the duplicate
@@ -957,22 +1002,23 @@
         doing updates.
         """
-        manifest_filenames = (gemato.compression
-                .get_potential_compressed_names('Manifest'))
+        manifest_filenames = get_potential_compressed_names('Manifest')
 
-        entry_dict = self.get_file_entry_dict(path,
-                only_types=['IGNORE'], verify_manifests=verify_manifests)
+        entry_dict = self.get_file_entry_dict(
+            path,
+            only_types=['IGNORE'],
+            verify_manifests=verify_manifests)
         new_manifests = []
 
         directory_ids = {}
         it = os.walk(os.path.join(self.root_directory, path),
-                onerror=gemato.util.throw_exception,
-                followlinks=True)
+                     onerror=throw_exception,
+                     followlinks=True)
         for dirpath, dirnames, filenames in it:
             dir_st = os.stat(dirpath)
             if (self.manifest_device is not None
                     and dir_st.st_dev != self.manifest_device):
-                raise gemato.exceptions.ManifestCrossDevice(dirpath)
+                raise ManifestCrossDevice(dirpath)
             dir_id = (dir_st.st_dev, dir_st.st_ino)
 
             # if this directory was already processed for one of its
@@ -980,7 +1026,7 @@
             parent_dir = os.path.dirname(dirpath)
             parent_dir_ids = directory_ids.get(parent_dir, [])
             if dir_id in parent_dir_ids:
-                raise gemato.exceptions.ManifestSymlinkLoop(dirpath)
+                raise ManifestSymlinkLoop(dirpath)
 
             relpath = os.path.relpath(dirpath, self.root_directory)
             # strip dot to avoid matching problems
@@ -1020,7 +1066,7 @@
                 # let's try to load it
                 try:
                     self.load_manifest(fpath)
-                except gemato.exceptions.ManifestSyntaxError:
+                except ManifestSyntaxError:
                     # syntax error? probably not a Manifest then.
                     pass
                 else:
@@ -1028,9 +1074,12 @@
         return new_manifests
 
-
-    def update_entries_for_directory(self, path='', hashes=None,
-            last_mtime=None, verify_manifests=False):
+    def update_entries_for_directory(self,
+                                     path='',
+                                     hashes=None,
+                                     last_mtime=None,
+                                     verify_manifests=False,
+                                     ):
         """
         Update the Manifest entries for the contents of directory
         @path (top directory by default), recursively. Includes adding
@@ -1063,29 +1112,27 @@
             hashes = self.hashes
         assert hashes is not None
 
-        manifest_filenames = (gemato.compression
-                .get_potential_compressed_names('Manifest'))
+        manifest_filenames = get_potential_compressed_names('Manifest')
 
-        new_manifests = self.load_unregistered_manifests(path,
-                verify_manifests=verify_manifests)
+        new_manifests = self.load_unregistered_manifests(
+            path, verify_manifests=verify_manifests)
         entry_dict = self.get_deduplicated_file_entry_dict_for_update(
-                path, verify_manifests=verify_manifests)
+            path, verify_manifests=verify_manifests)
         manifest_stack = []
-        for mpath, mrpath, m in (self
-                ._iter_manifests_for_path(path)):
+        for mpath, mrpath, m in (self._iter_manifests_for_path(path)):
             manifest_stack.append((mpath, mrpath, m))
             break
 
         directory_ids = {}
         it = os.walk(os.path.join(self.root_directory, path),
-                onerror=gemato.util.throw_exception,
-                followlinks=True)
+                     onerror=throw_exception,
+                     followlinks=True)
         for dirpath, dirnames, filenames in it:
             dir_st = os.stat(dirpath)
             if (self.manifest_device is not None
                     and dir_st.st_dev != self.manifest_device):
-                raise gemato.exceptions.ManifestCrossDevice(dirpath)
+                raise ManifestCrossDevice(dirpath)
             dir_id = (dir_st.st_dev, dir_st.st_ino)
 
             # if this directory was already processed for one of its
@@ -1093,7 +1140,7 @@
             parent_dir = os.path.dirname(dirpath)
             parent_dir_ids = directory_ids.get(parent_dir, [])
             if dir_id in parent_dir_ids:
-                raise gemato.exceptions.ManifestSymlinkLoop(dirpath)
+                raise ManifestSymlinkLoop(dirpath)
 
             relpath = os.path.relpath(dirpath, self.root_directory)
             # strip dot to avoid matching problems
@@ -1101,8 +1148,7 @@
                 relpath = ''
 
             # drop Manifest paths until we get to a common directory
-            while not gemato.util.path_starts_with(relpath,
-                    manifest_stack[-1][1]):
+            while not path_starts_with(relpath, manifest_stack[-1][1]):
                 manifest_stack.pop()
 
             want_manifest = self.profile.want_manifest_in_directory(
@@ -1124,11 +1170,10 @@
                         skip_dirs.append(d)
                     else:
                         # trigger the exception indirectly
-                        gemato.verify.update_entry_for_path(
-                            os.path.join(dirpath, d),
-                            de,
-                            hashes=hashes,
-                            expected_dev=self.manifest_device)
+                        update_entry_for_path(os.path.join(dirpath, d),
+                                              de,
+                                              hashes=hashes,
+                                              expected_dev=self.manifest_device)
                         assert False, "exception should have been raised"
 
             # skip scanning ignored directories
@@ -1150,8 +1195,8 @@
                     if fe.tag == 'IGNORE':
                         continue
                     if fe.tag == 'MANIFEST':
-                        manifest_stack.append((fpath, relpath,
-                            self.loaded_manifests[fpath]))
+                        manifest_stack.append(
+                            (fpath, relpath, self.loaded_manifests[fpath]))
                         # do not update the Manifest entry if
                         # the relevant Manifest is going to be updated
                         # anyway
@@ -1164,20 +1209,19 @@
                         continue
                     if fpath in new_manifests:
                         ftype = 'MANIFEST'
-                        manifest_stack.append((fpath, relpath,
-                            self.loaded_manifests[fpath]))
+                        manifest_stack.append(
+                            (fpath, relpath, self.loaded_manifests[fpath]))
                     else:
                         ftype = self.profile.get_entry_type_for_path(
                             fpath)
                     # note: .path needs to be corrected below
-                    fe = gemato.manifest.new_manifest_entry(ftype,
-                            fpath, 0, {})
+                    fe = new_manifest_entry(ftype, fpath, 0, {})
                     new_entries.append(fe)
 
                 if relpath in self.updated_manifests:
                     continue
 
-                changed = gemato.verify.update_entry_for_path(
+                changed = update_entry_for_path(
                     os.path.join(dirpath, f),
                     fe,
                     hashes=hashes,
@@ -1192,17 +1236,18 @@
                 mpath = os.path.join(relpath, 'Manifest')
                 m = self.create_manifest(mpath)
                 manifest_stack.append((mpath, relpath, m))
-                fe = gemato.manifest.ManifestEntryMANIFEST(
-                    mpath, 0, {})
+                fe = ManifestEntryMANIFEST(mpath, 0, {})
                 new_entries.append(fe)
 
                 for ip in (self.profile
                            .get_ignore_paths_for_new_manifest(relpath)):
-                    ie = gemato.manifest.ManifestEntryIGNORE(ip)
+                    ie = ManifestEntryIGNORE(ip)
                     iep = os.path.join(relpath, ip)
                     if self.find_path_entry(iep):
-                        raise NotImplementedError('Need to remove old parent entry for now-ignored path')
+                        raise NotImplementedError(
+                            'Need to remove old parent entry for '
+                            'now-ignored path')
                     m.entries.append(ie)
                     new_ignore_paths.append(iep)
@@ -1233,12 +1278,11 @@
                             # but for now, we've shoved our path
                             # into .aux_path
                             fe.path = os.path.relpath(fe.aux_path,
-                                    mdirpath)
-                            assert gemato.util.path_inside_dir(
-                                fe.path, 'files')
+                                                      mdirpath)
+                            assert path_inside_dir(fe.path, 'files')
                             # drop files/ prefix for the entry
-                            fe.aux_path = os.path.relpath(fe.path,
-                                    'files')
+                            fe.aux_path = os.path.relpath(
+                                fe.path, 'files')
                         else:
                             fe.path = os.path.relpath(fe.path, mdirpath)
                     # do not add duplicate entry if the path is ignored
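The fail_handler contract is visible in SubprocessVerifier._verify_one_file above: the handler receives the ManifestMismatch, and a false return marks the tree as failed while any other result lets verification continue (the default, throw_exception, raises instead). A sketch of a non-raising run; the repository path is illustrative:

    from gemato.recursiveloader import ManifestRecursiveLoader

    def log_and_continue(err):
        print(err)
        return True  # keep verifying instead of raising

    m = ManifestRecursiveLoader('/srv/repo/Manifest')
    ok = m.assert_directory_verifies('', fail_handler=log_and_continue)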
diff --git a/gemato/verify.py b/gemato/verify.py
index 8a90506..707a43f 100644
--- a/gemato/verify.py
+++ b/gemato/verify.py
@@ -1,6 +1,6 @@
 # gemato: File verification routines
 # vim:fileencoding=utf-8
-# (c) 2017 Michał Górny
+# (c) 2017-2020 Michał Górny
 # Licensed under the terms of 2-clause BSD license
 
 import contextlib
@@ -10,9 +10,12 @@ import io
 import os
 import stat
 
-import gemato.exceptions
-import gemato.hash
-import gemato.manifest
+from gemato.exceptions import (
+    ManifestCrossDevice,
+    ManifestInvalidPath,
+    )
+from gemato.hash import hash_file
+from gemato.manifest import manifest_hashes_to_hashlib
 
 
 def get_file_metadata(path, hashes):
@@ -37,7 +40,7 @@ def get_file_metadata(path, hashes):
 
     try:
         # we want O_NONBLOCK to avoid blocking when opening pipes
-        fd = os.open(path, os.O_RDONLY|os.O_NONBLOCK)
+        fd = os.open(path, os.O_RDONLY | os.O_NONBLOCK)
     except OSError as err:
         if err.errno == errno.ENOENT:
             exists = False
@@ -98,7 +101,7 @@ def get_file_metadata(path, hashes):
             yield st.st_mtime
 
             f = io.open(fd, 'rb')
-        except:
+        except Exception:
             if opened:
                 os.close(fd)
             raise
@@ -110,11 +113,10 @@ def get_file_metadata(path, hashes):
 
     # 5. checksums
     e_hashes = sorted(hashes)
-    hashes = list(gemato.manifest.manifest_hashes_to_hashlib(e_hashes))
+    hashes = list(manifest_hashes_to_hashlib(e_hashes))
     e_hashes.append('__size__')
     hashes.append('__size__')
-    checksums = gemato.hash.hash_file(f, hashes,
-            _apparent_size=st.st_size)
+    checksums = hash_file(f, hashes, _apparent_size=st.st_size)
 
     ret = {}
     for ek, k in zip(e_hashes, hashes):
@@ -175,7 +177,7 @@ def verify_path(path, e, expected_dev=None, last_mtime=None):
     # 2. check for xdev condition
     st_dev = next(g)
     if expected_dev is not None and st_dev != expected_dev:
-        raise gemato.exceptions.ManifestCrossDevice(path)
+        raise ManifestCrossDevice(path)
 
     # 3. verify whether the file is a regular file
     ifmt, ftype = next(g)
@@ -216,7 +218,7 @@ def verify_path(path, e, expected_dev=None, last_mtime=None):
 
 
 def update_entry_for_path(path, e, hashes=None, expected_dev=None,
-        last_mtime=None):
+                          last_mtime=None):
     """
     Update the data in entry @e to match the current state of file
     at path @path. Uses hashes listed in @hashes (using Manifest names),
@@ -248,19 +250,17 @@ def update_entry_for_path(path, e, hashes=None, expected_dev=None,
     # 1. verify whether the file existed in the first place
     exists = next(g)
     if not exists:
-        raise gemato.exceptions.ManifestInvalidPath(path,
-                ('__exists__', exists))
+        raise ManifestInvalidPath(path, ('__exists__', exists))
 
     # 2. check for xdev condition
     st_dev = next(g)
     if expected_dev is not None and st_dev != expected_dev:
-        raise gemato.exceptions.ManifestCrossDevice(path)
+        raise ManifestCrossDevice(path)
 
     # 3. verify whether the file is a regular file
     ifmt, ftype = next(g)
     if not stat.S_ISREG(ifmt):
-        raise gemato.exceptions.ManifestInvalidPath(path,
-                ('__type__', ftype))
+        raise ManifestInvalidPath(path, ('__type__', ftype))
 
     # 4. get the apparent file size
     st_size = next(g)
@@ -276,8 +276,9 @@ def update_entry_for_path(path, e, hashes=None, expected_dev=None,
     checksums = next(g)
     size = checksums.pop('__size__')
     if st_size != 0:
-        assert st_size == size, ('Apparent size (st_size = {}) and real size ({}) are different!'
-                .format(st_size, size))
+        assert st_size == size, (
+            f'Apparent size (st_size = {st_size}) and real size '
+            f'({size}) are different!')
 
     if e.size != size or e.checksums != checksums:
         e.size = size
diff --git a/tox.ini b/tox.ini
@@ -6,9 +6,11 @@ skipsdist = True
 
 [testenv:qa]
 deps =
+    pycodestyle
     pyflakes
 commands =
     pyflakes {posargs:gemato tests}
+    pycodestyle {posargs:gemato}
 
 [testenv]
 deps =
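With the tox.ini hunk, the qa environment runs pycodestyle alongside pyflakes, which is what motivates the pure-formatting changes throughout this commit. Closing illustration for the gemato.verify hunks above, pairing update_entry_for_path() with verify_path() as they appear in the diff (the file name is illustrative):

    from gemato.manifest import new_manifest_entry
    from gemato.verify import update_entry_for_path, verify_path

    # populate a fresh DATA entry for a file, then re-verify it
    e = new_manifest_entry('DATA', 'example.txt', 0, {})
    update_entry_for_path('example.txt', e, hashes=['SHA256', 'SHA512'])
    ok, diff = verify_path('example.txt', e)
    assert ok, diff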