diff options
-rw-r--r-- | gemato/find_top_level.py | 43 | ||||
-rw-r--r-- | tests/test_find_top_level.py | 103 |
2 files changed, 127 insertions, 19 deletions
diff --git a/gemato/find_top_level.py b/gemato/find_top_level.py index 41c5c66..c28cb39 100644 --- a/gemato/find_top_level.py +++ b/gemato/find_top_level.py @@ -4,10 +4,10 @@ # Licensed under the terms of 2-clause BSD license import errno -import io import os import os.path +import gemato.compression import gemato.manifest @@ -35,26 +35,31 @@ def find_top_level_manifest(path='.'): break m_path = os.path.join(cur_path, 'Manifest') - try: - with io.open(m_path, 'r', encoding='utf8') as f: - fst = os.fstat(f.fileno()) - if fst.st_dev != original_dev: - break + for m_path in (gemato.compression + .get_potential_compressed_names(m_path)): + try: + with (gemato.compression + .open_potentially_compressed_path(m_path, 'r', + encoding='utf8')) as f: + fst = os.fstat(f.fileno()) + if fst.st_dev != original_dev: + return last_found - m.load(f) - except IOError as e: - if e.errno != errno.ENOENT: - raise - else: - # check if the initial path is ignored - relpath = os.path.relpath(path, cur_path) - if relpath == '.': - relpath = '' - fe = m.find_path_entry(relpath) - if isinstance(fe, gemato.manifest.ManifestEntryIGNORE): - break + m.load(f) + except IOError as e: + if e.errno != errno.ENOENT: + raise + else: + # check if the initial path is ignored + relpath = os.path.relpath(path, cur_path) + if relpath == '.': + relpath = '' + fe = m.find_path_entry(relpath) + if isinstance(fe, gemato.manifest.ManifestEntryIGNORE): + return last_found - last_found = m_path + last_found = m_path + break # check if we reached root directory if st.st_dev == root_st.st_dev and st.st_ino == root_st.st_ino: diff --git a/tests/test_find_top_level.py b/tests/test_find_top_level.py index 5cb349d..deaee9c 100644 --- a/tests/test_find_top_level.py +++ b/tests/test_find_top_level.py @@ -3,6 +3,7 @@ # (c) 2017 Michał Górny # Licensed under the terms of 2-clause BSD license +import gzip import os import os.path import unittest @@ -195,3 +196,105 @@ class TestCrossDeviceManifest(TempDirTestCase): self.assertIsNone( gemato.find_top_level.find_top_level_manifest( os.path.join(self.dir, 'sub'))) + + +class TestCompressedManifest(TempDirTestCase): + """ + Test for finding compressed Manifest in a plain tree. + """ + + DIRS = ['suba', 'subb', 'subc', 'subc/sub'] + FILES = { + 'subb/Manifest': u'', + } + + def setUp(self): + super(TestCompressedManifest, self).setUp() + with gzip.GzipFile(os.path.join(self.dir, 'Manifest.gz'), 'wb'): + pass + with gzip.GzipFile(os.path.join(self.dir, 'subc/sub/Manifest.gz'), 'wb'): + pass + + def tearDown(self): + os.unlink(os.path.join(self.dir, 'subc/sub/Manifest.gz')) + os.unlink(os.path.join(self.dir, 'Manifest.gz')) + super(TestCompressedManifest, self).tearDown() + + def test_find_top_level_manifest(self): + self.assertEqual( + os.path.relpath( + gemato.find_top_level.find_top_level_manifest(self.dir), + self.dir), + 'Manifest.gz') + + def test_find_top_level_manifest_from_empty_subdir(self): + self.assertEqual( + os.path.relpath( + gemato.find_top_level.find_top_level_manifest( + os.path.join(self.dir, 'suba')), + self.dir), + 'Manifest.gz') + + def test_find_top_level_manifest_from_manifest_subdir(self): + self.assertEqual( + os.path.relpath( + gemato.find_top_level.find_top_level_manifest( + os.path.join(self.dir, 'subb')), + self.dir), + 'Manifest.gz') + + def test_find_top_level_manifest_from_deep_manifest_subdir(self): + self.assertEqual( + os.path.relpath( + gemato.find_top_level.find_top_level_manifest( + os.path.join(self.dir, 'subc', 'sub')), + self.dir), + 'Manifest.gz') + + +class TestCompressedManifestWithIgnore(TempDirTestCase): + DIRS = ['suba', 'subb', 'subc', 'subc/sub'] + FILES = { + 'subb/Manifest': u'', + } + + def setUp(self): + super(TestCompressedManifestWithIgnore, self).setUp() + with gzip.GzipFile(os.path.join(self.dir, 'Manifest.gz'), 'wb') as f: + f.write(b'IGNORE suba\n') + f.write(b'IGNORE subc\n') + with gzip.GzipFile(os.path.join(self.dir, 'subc/sub/Manifest.gz'), 'wb'): + pass + + def tearDown(self): + os.unlink(os.path.join(self.dir, 'subc/sub/Manifest.gz')) + os.unlink(os.path.join(self.dir, 'Manifest.gz')) + super(TestCompressedManifestWithIgnore, self).tearDown() + + def test_find_top_level_manifest(self): + self.assertEqual( + os.path.relpath( + gemato.find_top_level.find_top_level_manifest(self.dir), + self.dir), + 'Manifest.gz') + + def test_find_top_level_manifest_from_ignored_empty_subdir(self): + self.assertIsNone( + gemato.find_top_level.find_top_level_manifest( + os.path.join(self.dir, 'suba'))) + + def test_find_top_level_manifest_from_manifest_subdir(self): + self.assertEqual( + os.path.relpath( + gemato.find_top_level.find_top_level_manifest( + os.path.join(self.dir, 'subb')), + self.dir), + 'Manifest.gz') + + def test_find_top_level_manifest_from_deep_ignored_subdir(self): + self.assertEqual( + os.path.relpath( + gemato.find_top_level.find_top_level_manifest( + os.path.join(self.dir, 'subc', 'sub')), + self.dir), + 'subc/sub/Manifest.gz') |