summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gemato/recursiveloader.py33
-rw-r--r--tests/test_recursiveloader.py57
2 files changed, 87 insertions, 3 deletions
diff --git a/gemato/recursiveloader.py b/gemato/recursiveloader.py
index 8c994fb..aea47f5 100644
--- a/gemato/recursiveloader.py
+++ b/gemato/recursiveloader.py
@@ -1,6 +1,6 @@
# gemato: Recursive loader for Manifests
# vim:fileencoding=utf-8
-# (c) 2017-2020 Michał Górny
+# (c) 2017-2022 Michał Górny
# Licensed under the terms of 2-clause BSD license
import os.path
@@ -18,6 +18,7 @@ from gemato.exceptions import (
ManifestSymlinkLoop,
ManifestInvalidPath,
ManifestSyntaxError,
+ ManifestInsecureHashes,
)
from gemato.manifest import (
ManifestFile,
@@ -25,6 +26,7 @@ from gemato.manifest import (
ManifestEntryTIMESTAMP,
new_manifest_entry,
ManifestEntryMANIFEST,
+ is_hash_secure,
)
from gemato.profile import DefaultProfile
from gemato.util import (
@@ -188,6 +190,7 @@ class ManifestRecursiveLoader:
'compress_watermark',
'compress_format',
'profile',
+ 'require_secure_hashes',
# internal variables
'manifest_loader',
'top_level_manifest_filename',
@@ -211,6 +214,7 @@ class ManifestRecursiveLoader:
profile=DefaultProfile(),
max_jobs=None,
allow_xdev=True,
+ require_secure_hashes=False,
):
"""
Instantiate the loader for a Manifest tree starting at top-level
@@ -262,8 +266,15 @@ class ManifestRecursiveLoader:
across different filesystem. If it is false, gemato will raise
an exception upon crossing filesystem boundaries. It defaults
to false.
+
+ If @require_secure_hashes is True, only secure hashes can be used.
"""
+ if require_secure_hashes and hashes is not None:
+ insecure = list(filter(lambda x: not is_hash_secure(x), hashes))
+ if insecure:
+ raise ManifestInsecureHashes(insecure)
+
self.root_directory = os.path.dirname(top_manifest_path)
self.openpgp_env = openpgp_env
self.sign_openpgp = sign_openpgp
@@ -274,6 +285,7 @@ class ManifestRecursiveLoader:
self.compress_watermark = compress_watermark
self.compress_format = compress_format
self.max_jobs = max_jobs
+ self.require_secure_hashes = require_secure_hashes
self.profile.set_loader_options(self)
@@ -739,6 +751,11 @@ class ManifestRecursiveLoader:
if hashes is None:
hashes = self.hashes
+ if self.require_secure_hashes and hashes is not None:
+ insecure = list(filter(lambda x: not is_hash_secure(x), hashes))
+ if insecure:
+ raise ManifestInsecureHashes(insecure)
+
if sort is None:
sort = self.sort
if compress_watermark is None:
@@ -768,7 +785,8 @@ class ManifestRecursiveLoader:
os.path.join(self.root_directory, fullpath),
e,
hashes=hashes,
- expected_dev=self.manifest_device)
+ expected_dev=self.manifest_device,
+ require_secure_hashes=self.require_secure_hashes)
# do not remove it from self.updated_manifests
# immediately as we may have to deal with multiple
@@ -852,6 +870,10 @@ class ManifestRecursiveLoader:
had_entry = False
if hashes is None:
hashes = self.hashes
+ if self.require_secure_hashes and hashes is not None:
+ insecure = list(filter(lambda x: not is_hash_secure(x), hashes))
+ if insecure:
+ raise ManifestInsecureHashes(insecure)
self.load_manifests_for_path(path)
for mpath, relpath, m in self._iter_manifests_for_path(path):
@@ -883,7 +905,8 @@ class ManifestRecursiveLoader:
os.path.join(self.root_directory, fullpath),
e,
hashes=hashes,
- expected_dev=self.manifest_device)
+ expected_dev=self.manifest_device,
+ require_secure_hashes=self.require_secure_hashes)
except ManifestInvalidPath as err:
if err.detail[0] == '__exists__':
# file does not exist anymore, so remove
@@ -1119,6 +1142,10 @@ class ManifestRecursiveLoader:
if hashes is None:
hashes = self.hashes
assert hashes is not None
+ if self.require_secure_hashes:
+ insecure = list(filter(lambda x: not is_hash_secure(x), hashes))
+ if insecure:
+ raise ManifestInsecureHashes(insecure)
manifest_filenames = get_potential_compressed_names('Manifest')
diff --git a/tests/test_recursiveloader.py b/tests/test_recursiveloader.py
index 6bd66d4..f00deb9 100644
--- a/tests/test_recursiveloader.py
+++ b/tests/test_recursiveloader.py
@@ -4,6 +4,7 @@
# Licensed under the terms of 2-clause BSD license
import base64
+import contextlib
import datetime
import gzip
import itertools
@@ -21,6 +22,7 @@ from gemato.exceptions import (
ManifestCrossDevice,
ManifestSymlinkLoop,
ManifestNoSupportedHashes,
+ ManifestInsecureHashes,
)
from gemato.manifest import ManifestPathEntry, ManifestFileEntry
from gemato.recursiveloader import ManifestRecursiveLoader
@@ -2457,3 +2459,58 @@ def test_update_mtime(layout_factory, last_mtime, manifest_update):
expected = dict(layout.MANIFESTS)
expected.update(manifest_update)
assert output == expected
+
+
+@pytest.mark.parametrize(
+ "hashes_arg,insecure",
+ [("MD5", True),
+ ("SHA1", True),
+ ("SHA512", False),
+ ("SHA1 SHA512", True),
+ ])
+def test_insecure_hashes(layout_factory, hashes_arg, insecure):
+ layout = BasicTestLayout
+ tmp_path = layout_factory.create(layout)
+ ctx = (pytest.raises(ManifestInsecureHashes) if insecure
+ else contextlib.nullcontext())
+ with ctx:
+ ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST,
+ hashes=hashes_arg.split(),
+ allow_xdev=False,
+ require_secure_hashes=True)
+
+
+@pytest.mark.parametrize(
+ "hashes_arg,insecure",
+ [("MD5", True),
+ ("SHA1", True),
+ ("SHA512", False),
+ ("SHA1 SHA512", True),
+ ])
+@pytest.mark.parametrize(
+ "func,arg",
+ [(ManifestRecursiveLoader.update_entry_for_path, "sub/deeper/test"),
+ (ManifestRecursiveLoader.update_entries_for_directory, "sub/deeper"),
+ ])
+def test_insecure_hashes_update(layout_factory, hashes_arg, insecure, func,
+ arg):
+ layout = BasicTestLayout
+ tmp_path = layout_factory.create(layout)
+ m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST,
+ hashes=["SHA512"],
+ allow_xdev=False,
+ require_secure_hashes=True)
+ ctx = (pytest.raises(ManifestInsecureHashes) if insecure
+ else contextlib.nullcontext())
+ with ctx:
+ func(m, arg, hashes=hashes_arg.split())
+
+
+def test_insecure_hashes_update_no_arg(layout_factory):
+ layout = BasicTestLayout
+ tmp_path = layout_factory.create(layout)
+ m = ManifestRecursiveLoader(tmp_path / layout.TOP_MANIFEST,
+ allow_xdev=False,
+ require_secure_hashes=True)
+ with pytest.raises(ManifestInsecureHashes):
+ m.update_entry_for_path("sub/deeper/test")