From e0a061955bba69aaf28022d405362c8a5e444ff6 Mon Sep 17 00:00:00 2001 From: Andrei Zeliankou Date: Wed, 5 May 2021 12:36:57 +0100 Subject: [PATCH] Tests: added tests for openat2() features. --- test/conftest.py | 2 + test/test_share_chroot.py | 108 ++++++++++++++++++++++++++++ test/test_share_mount.py | 142 +++++++++++++++++++++++++++++++++++++ test/test_share_symlink.py | 96 +++++++++++++++++++++++++ test/test_static.py | 8 --- test/unit/check/chroot.py | 32 +++++++++ 6 files changed, 380 insertions(+), 8 deletions(-) create mode 100644 test/test_share_chroot.py create mode 100644 test/test_share_mount.py create mode 100644 test/test_share_symlink.py create mode 100644 test/unit/check/chroot.py diff --git a/test/conftest.py b/test/conftest.py index 0eaf54be..a084953a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -15,6 +15,7 @@ from multiprocessing import Process import pytest +from unit.check.chroot import check_chroot from unit.check.go import check_go from unit.check.isolation import check_isolation from unit.check.node import check_node @@ -204,6 +205,7 @@ def pytest_sessionstart(session): k: v for k, v in option.available['modules'].items() if v is not None } + check_chroot() check_isolation() _clear_conf(unit['temp_dir'] + '/control.unit.sock') diff --git a/test/test_share_chroot.py b/test/test_share_chroot.py new file mode 100644 index 00000000..7e53d3f7 --- /dev/null +++ b/test/test_share_chroot.py @@ -0,0 +1,108 @@ +import os +from pathlib import Path + +import pytest + +from unit.applications.proto import TestApplicationProto + + +class TestShareChroot(TestApplicationProto): + prerequisites = {'features': ['chroot']} + + @pytest.fixture(autouse=True) + def setup_method_fixture(self, temp_dir): + os.makedirs(temp_dir + '/assets/dir') + with open(temp_dir + '/assets/index.html', 'w') as index, open( + temp_dir + '/assets/dir/file', 'w' + ) as file: + index.write('0123456789') + file.write('blah') + + test = Path(__file__) + self.test_path = '/' + test.parent.name + '/' + test.name + + self._load_conf( + { + "listeners": {"*:7080": {"pass": "routes"}}, + "routes": [{"action": {"share": temp_dir + "/assets"}}], + } + ) + + def test_share_chroot(self, temp_dir): + assert self.get(url='/dir/file')['status'] == 200, 'default chroot' + assert self.get(url='/index.html')['status'] == 200, 'default chroot 2' + + assert 'success' in self.conf( + { + "share": temp_dir + "/assets", + "chroot": temp_dir + "/assets/dir", + }, + 'routes/0/action', + ), 'configure chroot' + + assert self.get(url='/dir/file')['status'] == 200, 'chroot' + assert self.get(url='/index.html')['status'] == 403, 'chroot 403 2' + assert self.get(url='/file')['status'] == 403, 'chroot 403' + + def test_share_chroot_permission(self, temp_dir): + os.chmod(temp_dir + '/assets/dir', 0o100) + + assert 'success' in self.conf( + { + "share": temp_dir + "/assets", + "chroot": temp_dir + "/assets/dir", + }, + 'routes/0/action', + ), 'configure chroot' + + assert self.get(url='/dir/file')['status'] == 200, 'chroot' + + def test_share_chroot_empty(self, temp_dir): + assert 'success' in self.conf( + {"share": temp_dir + "/assets", "chroot": ""}, 'routes/0/action', + ), 'configure chroot empty absolute' + + assert ( + self.get(url='/dir/file')['status'] == 200 + ), 'chroot empty absolute' + + assert 'success' in self.conf( + {"share": ".", "chroot": ""}, 'routes/0/action', + ), 'configure chroot empty relative' + + assert ( + self.get(url=self.test_path)['status'] == 200 + ), 'chroot empty relative' + + def test_share_chroot_relative(self, is_su, temp_dir): + if is_su: + pytest.skip('does\'t work under root') + + assert 'success' in self.conf( + {"share": temp_dir + "/assets", "chroot": "."}, 'routes/0/action', + ), 'configure relative chroot' + + assert self.get(url='/dir/file')['status'] == 403, 'relative chroot' + + assert 'success' in self.conf( + {"share": "."}, 'routes/0/action', + ), 'configure relative share' + + assert self.get(url=self.test_path)['status'] == 200, 'relative share' + + assert 'success' in self.conf( + {"share": ".", "chroot": "."}, 'routes/0/action', + ), 'configure relative' + + assert self.get(url=self.test_path)['status'] == 200, 'relative' + + def test_share_chroot_invalid(self, temp_dir): + assert 'error' in self.conf( + {"share": temp_dir, "chroot": True}, 'routes/0/action', + ), 'configure chroot error' + assert 'error' in self.conf( + {"share": temp_dir, "symlinks": "True"}, 'routes/0/action', + ), 'configure symlink error' + assert 'error' in self.conf( + {"share": temp_dir, "mount": "True"}, 'routes/0/action', + ), 'configure mount error' diff --git a/test/test_share_mount.py b/test/test_share_mount.py new file mode 100644 index 00000000..f46e1279 --- /dev/null +++ b/test/test_share_mount.py @@ -0,0 +1,142 @@ +import os +import subprocess + +import pytest + +from unit.applications.proto import TestApplicationProto + + +class TestShareMount(TestApplicationProto): + prerequisites = {'features': ['chroot']} + + @pytest.fixture(autouse=True) + def setup_method_fixture(self, is_su, temp_dir): + if not is_su: + pytest.skip('requires root') + + os.makedirs(temp_dir + '/assets/dir/mount') + os.makedirs(temp_dir + '/assets/dir/dir') + os.makedirs(temp_dir + '/assets/mount') + with open(temp_dir + '/assets/index.html', 'w') as index, open( + temp_dir + '/assets/dir/dir/file', 'w' + ) as file, open(temp_dir + '/assets/mount/index.html', 'w') as mount: + index.write('index') + file.write('file') + mount.write('mount') + + try: + process = subprocess.Popen( + [ + "mount", + "--bind", + temp_dir + "/assets/mount", + temp_dir + "/assets/dir/mount", + ], + stderr=subprocess.STDOUT, + ) + + process.communicate() + + except KeyboardInterrupt: + raise + + except: + pytest.fail('Can\'t run mount process.') + + self._load_conf( + { + "listeners": {"*:7080": {"pass": "routes"}}, + "routes": [{"action": {"share": temp_dir + "/assets/dir"}}], + } + ) + + yield + + try: + process = subprocess.Popen( + ["umount", "--lazy", temp_dir + "/assets/dir/mount"], + stderr=subprocess.STDOUT, + ) + + process.communicate() + + except KeyboardInterrupt: + raise + + except: + pytest.fail('Can\'t run umount process.') + + def test_share_mount(self, temp_dir, skip_alert): + skip_alert(r'opening.*failed') + + resp = self.get(url='/mount/') + resp['status'] == 200 + resp['body'] == 'mount' + + assert 'success' in self.conf( + {"share": temp_dir + "/assets/dir", "traverse_mounts": False}, + 'routes/0/action', + ), 'configure mount disable' + + assert self.get(url='/mount/')['status'] == 403 + + assert 'success' in self.conf( + {"share": temp_dir + "/assets/dir", "traverse_mounts": True}, + 'routes/0/action', + ), 'configure mount enable' + + resp = self.get(url='/mount/') + resp['status'] == 200 + resp['body'] == 'mount' + + def test_share_mount_two_blocks(self, temp_dir, skip_alert): + skip_alert(r'opening.*failed') + + os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link') + + assert 'success' in self.conf( + [ + { + "match": {"method": "HEAD"}, + "action": { + "share": temp_dir + "/assets/dir", + "traverse_mounts": False, + }, + }, + { + "match": {"method": "GET"}, + "action": { + "share": temp_dir + "/assets/dir", + "traverse_mounts": True, + }, + }, + ], + 'routes', + ), 'configure two options' + + assert self.get(url='/mount/')['status'] == 200, 'block enabled' + assert self.head(url='/mount/')['status'] == 403, 'block disabled' + + def test_share_mount_chroot(self, temp_dir, skip_alert): + skip_alert(r'opening.*failed') + + assert 'success' in self.conf( + { + "share": temp_dir + "/assets/dir", + "chroot": temp_dir + "/assets", + }, + 'routes/0/action', + ), 'configure chroot mount default' + + self.get(url='/mount/')['status'] == 200, 'chroot' + + assert 'success' in self.conf( + { + "share": temp_dir + "/assets/dir", + "chroot": temp_dir + "/assets", + "traverse_mounts": False, + }, + 'routes/0/action', + ), 'configure chroot mount disable' + + self.get(url='/mount/')['status'] == 403, 'chroot mount' diff --git a/test/test_share_symlink.py b/test/test_share_symlink.py new file mode 100644 index 00000000..3970b605 --- /dev/null +++ b/test/test_share_symlink.py @@ -0,0 +1,96 @@ +import os + +import pytest + +from unit.applications.proto import TestApplicationProto + + +class TestShareSymlink(TestApplicationProto): + prerequisites = {'features': ['chroot']} + + @pytest.fixture(autouse=True) + def setup_method_fixture(self, temp_dir): + os.makedirs(temp_dir + '/assets/dir/dir') + with open(temp_dir + '/assets/index.html', 'w') as index, open( + temp_dir + '/assets/dir/file', 'w' + ) as file: + index.write('0123456789') + file.write('blah') + + self._load_conf( + { + "listeners": {"*:7080": {"pass": "routes"}}, + "routes": [{"action": {"share": temp_dir + "/assets"}}], + } + ) + + def test_share_symlink(self, temp_dir, skip_alert): + skip_alert(r'opening.*failed') + + os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link') + + assert self.get(url='/dir')['status'] == 301, 'dir' + assert self.get(url='/dir/file')['status'] == 200, 'file' + assert self.get(url='/link')['status'] == 301, 'symlink dir' + assert self.get(url='/link/file')['status'] == 200, 'symlink file' + + assert 'success' in self.conf( + {"share": temp_dir + "/assets", "follow_symlinks": False}, + 'routes/0/action', + ), 'configure symlink disable' + + assert self.get(url='/link/file')['status'] == 403, 'symlink disabled' + + assert 'success' in self.conf( + {"share": temp_dir + "/assets", "follow_symlinks": True}, + 'routes/0/action', + ), 'configure symlink enable' + + assert self.get(url='/link/file')['status'] == 200, 'symlink enabled' + + def test_share_symlink_two_blocks(self, temp_dir, skip_alert): + skip_alert(r'opening.*failed') + + os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link') + + assert 'success' in self.conf( + [ + { + "match": {"method": "HEAD"}, + "action": { + "share": temp_dir + "/assets", + "follow_symlinks": False, + }, + }, + { + "match": {"method": "GET"}, + "action": { + "share": temp_dir + "/assets", + "follow_symlinks": True, + }, + }, + ], + 'routes', + ), 'configure two options' + + assert self.get(url='/link/file')['status'] == 200, 'block enabled' + assert self.head(url='/link/file')['status'] == 403, 'block disabled' + + def test_share_symlink_chroot(self, temp_dir, skip_alert): + skip_alert(r'opening.*failed') + + os.symlink( + temp_dir + '/assets/dir/file', temp_dir + '/assets/dir/dir/link' + ) + + assert self.get(url='/dir/dir/link')['status'] == 200, 'default chroot' + + assert 'success' in self.conf( + { + "share": temp_dir + "/assets", + "chroot": temp_dir + "/assets/dir/dir", + }, + 'routes/0/action', + ), 'configure chroot' + + assert self.get(url='/dir/dir/link')['status'] == 404, 'chroot' diff --git a/test/test_static.py b/test/test_static.py index d8319292..669e265d 100644 --- a/test/test_static.py +++ b/test/test_static.py @@ -168,14 +168,6 @@ class TestStatic(TestApplicationProto): assert self.get(url='/fifo')['status'] == 404, 'fifo' - def test_static_symlink(self, temp_dir): - os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link') - - assert self.get(url='/dir')['status'] == 301, 'dir' - assert self.get(url='/dir/file')['status'] == 200, 'file' - assert self.get(url='/link')['status'] == 301, 'symlink dir' - assert self.get(url='/link/file')['status'] == 200, 'symlink file' - def test_static_method(self): resp = self.head() assert resp['status'] == 200, 'HEAD status' diff --git a/test/unit/check/chroot.py b/test/unit/check/chroot.py new file mode 100644 index 00000000..40b75058 --- /dev/null +++ b/test/unit/check/chroot.py @@ -0,0 +1,32 @@ +import json + +from unit.http import TestHTTP +from unit.option import option + +http = TestHTTP() + + +def check_chroot(): + available = option.available + + resp = http.put( + url='/config', + sock_type='unix', + addr=option.temp_dir + '/control.unit.sock', + body=json.dumps( + { + "listeners": {"*:7080": {"pass": "routes"}}, + "routes": [ + { + "action": { + "share": option.temp_dir, + "chroot": option.temp_dir, + } + } + ], + } + ), + ) + + if 'success' in resp['body']: + available['features']['chroot'] = True