summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichał Górny <mgorny@gentoo.org>2017-10-25 23:18:31 +0200
committerMichał Górny <mgorny@gentoo.org>2017-10-26 16:20:44 +0200
commit5460f2c432ec0baae1757917ff12e8a105bc4900 (patch)
tree8b6bbbedeee3eee315594bd5e0edfe83edf243b7
parentb26020978de77ad31b4c4885c38b857831d85635 (diff)
downloadgemato-5460f2c432ec0baae1757917ff12e8a105bc4900.tar.gz
find_top_level: Support compressed Manifests
-rw-r--r--gemato/find_top_level.py43
-rw-r--r--tests/test_find_top_level.py103
2 files changed, 127 insertions, 19 deletions
diff --git a/gemato/find_top_level.py b/gemato/find_top_level.py
index 41c5c66..c28cb39 100644
--- a/gemato/find_top_level.py
+++ b/gemato/find_top_level.py
@@ -4,10 +4,10 @@
# Licensed under the terms of 2-clause BSD license
import errno
-import io
import os
import os.path
+import gemato.compression
import gemato.manifest
@@ -35,26 +35,31 @@ def find_top_level_manifest(path='.'):
break
m_path = os.path.join(cur_path, 'Manifest')
- try:
- with io.open(m_path, 'r', encoding='utf8') as f:
- fst = os.fstat(f.fileno())
- if fst.st_dev != original_dev:
- break
+ for m_path in (gemato.compression
+ .get_potential_compressed_names(m_path)):
+ try:
+ with (gemato.compression
+ .open_potentially_compressed_path(m_path, 'r',
+ encoding='utf8')) as f:
+ fst = os.fstat(f.fileno())
+ if fst.st_dev != original_dev:
+ return last_found
- m.load(f)
- except IOError as e:
- if e.errno != errno.ENOENT:
- raise
- else:
- # check if the initial path is ignored
- relpath = os.path.relpath(path, cur_path)
- if relpath == '.':
- relpath = ''
- fe = m.find_path_entry(relpath)
- if isinstance(fe, gemato.manifest.ManifestEntryIGNORE):
- break
+ m.load(f)
+ except IOError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ else:
+ # check if the initial path is ignored
+ relpath = os.path.relpath(path, cur_path)
+ if relpath == '.':
+ relpath = ''
+ fe = m.find_path_entry(relpath)
+ if isinstance(fe, gemato.manifest.ManifestEntryIGNORE):
+ return last_found
- last_found = m_path
+ last_found = m_path
+ break
# check if we reached root directory
if st.st_dev == root_st.st_dev and st.st_ino == root_st.st_ino:
diff --git a/tests/test_find_top_level.py b/tests/test_find_top_level.py
index 5cb349d..deaee9c 100644
--- a/tests/test_find_top_level.py
+++ b/tests/test_find_top_level.py
@@ -3,6 +3,7 @@
# (c) 2017 Michał Górny
# Licensed under the terms of 2-clause BSD license
+import gzip
import os
import os.path
import unittest
@@ -195,3 +196,105 @@ class TestCrossDeviceManifest(TempDirTestCase):
self.assertIsNone(
gemato.find_top_level.find_top_level_manifest(
os.path.join(self.dir, 'sub')))
+
+
+class TestCompressedManifest(TempDirTestCase):
+ """
+ Test for finding compressed Manifest in a plain tree.
+ """
+
+ DIRS = ['suba', 'subb', 'subc', 'subc/sub']
+ FILES = {
+ 'subb/Manifest': u'',
+ }
+
+ def setUp(self):
+ super(TestCompressedManifest, self).setUp()
+ with gzip.GzipFile(os.path.join(self.dir, 'Manifest.gz'), 'wb'):
+ pass
+ with gzip.GzipFile(os.path.join(self.dir, 'subc/sub/Manifest.gz'), 'wb'):
+ pass
+
+ def tearDown(self):
+ os.unlink(os.path.join(self.dir, 'subc/sub/Manifest.gz'))
+ os.unlink(os.path.join(self.dir, 'Manifest.gz'))
+ super(TestCompressedManifest, self).tearDown()
+
+ def test_find_top_level_manifest(self):
+ self.assertEqual(
+ os.path.relpath(
+ gemato.find_top_level.find_top_level_manifest(self.dir),
+ self.dir),
+ 'Manifest.gz')
+
+ def test_find_top_level_manifest_from_empty_subdir(self):
+ self.assertEqual(
+ os.path.relpath(
+ gemato.find_top_level.find_top_level_manifest(
+ os.path.join(self.dir, 'suba')),
+ self.dir),
+ 'Manifest.gz')
+
+ def test_find_top_level_manifest_from_manifest_subdir(self):
+ self.assertEqual(
+ os.path.relpath(
+ gemato.find_top_level.find_top_level_manifest(
+ os.path.join(self.dir, 'subb')),
+ self.dir),
+ 'Manifest.gz')
+
+ def test_find_top_level_manifest_from_deep_manifest_subdir(self):
+ self.assertEqual(
+ os.path.relpath(
+ gemato.find_top_level.find_top_level_manifest(
+ os.path.join(self.dir, 'subc', 'sub')),
+ self.dir),
+ 'Manifest.gz')
+
+
+class TestCompressedManifestWithIgnore(TempDirTestCase):
+ DIRS = ['suba', 'subb', 'subc', 'subc/sub']
+ FILES = {
+ 'subb/Manifest': u'',
+ }
+
+ def setUp(self):
+ super(TestCompressedManifestWithIgnore, self).setUp()
+ with gzip.GzipFile(os.path.join(self.dir, 'Manifest.gz'), 'wb') as f:
+ f.write(b'IGNORE suba\n')
+ f.write(b'IGNORE subc\n')
+ with gzip.GzipFile(os.path.join(self.dir, 'subc/sub/Manifest.gz'), 'wb'):
+ pass
+
+ def tearDown(self):
+ os.unlink(os.path.join(self.dir, 'subc/sub/Manifest.gz'))
+ os.unlink(os.path.join(self.dir, 'Manifest.gz'))
+ super(TestCompressedManifestWithIgnore, self).tearDown()
+
+ def test_find_top_level_manifest(self):
+ self.assertEqual(
+ os.path.relpath(
+ gemato.find_top_level.find_top_level_manifest(self.dir),
+ self.dir),
+ 'Manifest.gz')
+
+ def test_find_top_level_manifest_from_ignored_empty_subdir(self):
+ self.assertIsNone(
+ gemato.find_top_level.find_top_level_manifest(
+ os.path.join(self.dir, 'suba')))
+
+ def test_find_top_level_manifest_from_manifest_subdir(self):
+ self.assertEqual(
+ os.path.relpath(
+ gemato.find_top_level.find_top_level_manifest(
+ os.path.join(self.dir, 'subb')),
+ self.dir),
+ 'Manifest.gz')
+
+ def test_find_top_level_manifest_from_deep_ignored_subdir(self):
+ self.assertEqual(
+ os.path.relpath(
+ gemato.find_top_level.find_top_level_manifest(
+ os.path.join(self.dir, 'subc', 'sub')),
+ self.dir),
+ 'subc/sub/Manifest.gz')