Tests: added tests for openat2() features.

This commit is contained in:
Andrei Zeliankou 2021-05-05 12:36:57 +01:00
parent de631d8c36
commit e0a061955b
6 changed files with 380 additions and 8 deletions

View file

@ -15,6 +15,7 @@ from multiprocessing import Process
import pytest
from unit.check.chroot import check_chroot
from unit.check.go import check_go
from unit.check.isolation import check_isolation
from unit.check.node import check_node
@ -204,6 +205,7 @@ def pytest_sessionstart(session):
k: v for k, v in option.available['modules'].items() if v is not None
}
check_chroot()
check_isolation()
_clear_conf(unit['temp_dir'] + '/control.unit.sock')

108
test/test_share_chroot.py Normal file
View file

@ -0,0 +1,108 @@
import os
from pathlib import Path
import pytest
from unit.applications.proto import TestApplicationProto
class TestShareChroot(TestApplicationProto):
prerequisites = {'features': ['chroot']}
@pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir):
os.makedirs(temp_dir + '/assets/dir')
with open(temp_dir + '/assets/index.html', 'w') as index, open(
temp_dir + '/assets/dir/file', 'w'
) as file:
index.write('0123456789')
file.write('blah')
test = Path(__file__)
self.test_path = '/' + test.parent.name + '/' + test.name
self._load_conf(
{
"listeners": {"*:7080": {"pass": "routes"}},
"routes": [{"action": {"share": temp_dir + "/assets"}}],
}
)
def test_share_chroot(self, temp_dir):
assert self.get(url='/dir/file')['status'] == 200, 'default chroot'
assert self.get(url='/index.html')['status'] == 200, 'default chroot 2'
assert 'success' in self.conf(
{
"share": temp_dir + "/assets",
"chroot": temp_dir + "/assets/dir",
},
'routes/0/action',
), 'configure chroot'
assert self.get(url='/dir/file')['status'] == 200, 'chroot'
assert self.get(url='/index.html')['status'] == 403, 'chroot 403 2'
assert self.get(url='/file')['status'] == 403, 'chroot 403'
def test_share_chroot_permission(self, temp_dir):
os.chmod(temp_dir + '/assets/dir', 0o100)
assert 'success' in self.conf(
{
"share": temp_dir + "/assets",
"chroot": temp_dir + "/assets/dir",
},
'routes/0/action',
), 'configure chroot'
assert self.get(url='/dir/file')['status'] == 200, 'chroot'
def test_share_chroot_empty(self, temp_dir):
assert 'success' in self.conf(
{"share": temp_dir + "/assets", "chroot": ""}, 'routes/0/action',
), 'configure chroot empty absolute'
assert (
self.get(url='/dir/file')['status'] == 200
), 'chroot empty absolute'
assert 'success' in self.conf(
{"share": ".", "chroot": ""}, 'routes/0/action',
), 'configure chroot empty relative'
assert (
self.get(url=self.test_path)['status'] == 200
), 'chroot empty relative'
def test_share_chroot_relative(self, is_su, temp_dir):
if is_su:
pytest.skip('does\'t work under root')
assert 'success' in self.conf(
{"share": temp_dir + "/assets", "chroot": "."}, 'routes/0/action',
), 'configure relative chroot'
assert self.get(url='/dir/file')['status'] == 403, 'relative chroot'
assert 'success' in self.conf(
{"share": "."}, 'routes/0/action',
), 'configure relative share'
assert self.get(url=self.test_path)['status'] == 200, 'relative share'
assert 'success' in self.conf(
{"share": ".", "chroot": "."}, 'routes/0/action',
), 'configure relative'
assert self.get(url=self.test_path)['status'] == 200, 'relative'
def test_share_chroot_invalid(self, temp_dir):
assert 'error' in self.conf(
{"share": temp_dir, "chroot": True}, 'routes/0/action',
), 'configure chroot error'
assert 'error' in self.conf(
{"share": temp_dir, "symlinks": "True"}, 'routes/0/action',
), 'configure symlink error'
assert 'error' in self.conf(
{"share": temp_dir, "mount": "True"}, 'routes/0/action',
), 'configure mount error'

142
test/test_share_mount.py Normal file
View file

@ -0,0 +1,142 @@
import os
import subprocess
import pytest
from unit.applications.proto import TestApplicationProto
class TestShareMount(TestApplicationProto):
prerequisites = {'features': ['chroot']}
@pytest.fixture(autouse=True)
def setup_method_fixture(self, is_su, temp_dir):
if not is_su:
pytest.skip('requires root')
os.makedirs(temp_dir + '/assets/dir/mount')
os.makedirs(temp_dir + '/assets/dir/dir')
os.makedirs(temp_dir + '/assets/mount')
with open(temp_dir + '/assets/index.html', 'w') as index, open(
temp_dir + '/assets/dir/dir/file', 'w'
) as file, open(temp_dir + '/assets/mount/index.html', 'w') as mount:
index.write('index')
file.write('file')
mount.write('mount')
try:
process = subprocess.Popen(
[
"mount",
"--bind",
temp_dir + "/assets/mount",
temp_dir + "/assets/dir/mount",
],
stderr=subprocess.STDOUT,
)
process.communicate()
except KeyboardInterrupt:
raise
except:
pytest.fail('Can\'t run mount process.')
self._load_conf(
{
"listeners": {"*:7080": {"pass": "routes"}},
"routes": [{"action": {"share": temp_dir + "/assets/dir"}}],
}
)
yield
try:
process = subprocess.Popen(
["umount", "--lazy", temp_dir + "/assets/dir/mount"],
stderr=subprocess.STDOUT,
)
process.communicate()
except KeyboardInterrupt:
raise
except:
pytest.fail('Can\'t run umount process.')
def test_share_mount(self, temp_dir, skip_alert):
skip_alert(r'opening.*failed')
resp = self.get(url='/mount/')
resp['status'] == 200
resp['body'] == 'mount'
assert 'success' in self.conf(
{"share": temp_dir + "/assets/dir", "traverse_mounts": False},
'routes/0/action',
), 'configure mount disable'
assert self.get(url='/mount/')['status'] == 403
assert 'success' in self.conf(
{"share": temp_dir + "/assets/dir", "traverse_mounts": True},
'routes/0/action',
), 'configure mount enable'
resp = self.get(url='/mount/')
resp['status'] == 200
resp['body'] == 'mount'
def test_share_mount_two_blocks(self, temp_dir, skip_alert):
skip_alert(r'opening.*failed')
os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link')
assert 'success' in self.conf(
[
{
"match": {"method": "HEAD"},
"action": {
"share": temp_dir + "/assets/dir",
"traverse_mounts": False,
},
},
{
"match": {"method": "GET"},
"action": {
"share": temp_dir + "/assets/dir",
"traverse_mounts": True,
},
},
],
'routes',
), 'configure two options'
assert self.get(url='/mount/')['status'] == 200, 'block enabled'
assert self.head(url='/mount/')['status'] == 403, 'block disabled'
def test_share_mount_chroot(self, temp_dir, skip_alert):
skip_alert(r'opening.*failed')
assert 'success' in self.conf(
{
"share": temp_dir + "/assets/dir",
"chroot": temp_dir + "/assets",
},
'routes/0/action',
), 'configure chroot mount default'
self.get(url='/mount/')['status'] == 200, 'chroot'
assert 'success' in self.conf(
{
"share": temp_dir + "/assets/dir",
"chroot": temp_dir + "/assets",
"traverse_mounts": False,
},
'routes/0/action',
), 'configure chroot mount disable'
self.get(url='/mount/')['status'] == 403, 'chroot mount'

View file

@ -0,0 +1,96 @@
import os
import pytest
from unit.applications.proto import TestApplicationProto
class TestShareSymlink(TestApplicationProto):
prerequisites = {'features': ['chroot']}
@pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir):
os.makedirs(temp_dir + '/assets/dir/dir')
with open(temp_dir + '/assets/index.html', 'w') as index, open(
temp_dir + '/assets/dir/file', 'w'
) as file:
index.write('0123456789')
file.write('blah')
self._load_conf(
{
"listeners": {"*:7080": {"pass": "routes"}},
"routes": [{"action": {"share": temp_dir + "/assets"}}],
}
)
def test_share_symlink(self, temp_dir, skip_alert):
skip_alert(r'opening.*failed')
os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link')
assert self.get(url='/dir')['status'] == 301, 'dir'
assert self.get(url='/dir/file')['status'] == 200, 'file'
assert self.get(url='/link')['status'] == 301, 'symlink dir'
assert self.get(url='/link/file')['status'] == 200, 'symlink file'
assert 'success' in self.conf(
{"share": temp_dir + "/assets", "follow_symlinks": False},
'routes/0/action',
), 'configure symlink disable'
assert self.get(url='/link/file')['status'] == 403, 'symlink disabled'
assert 'success' in self.conf(
{"share": temp_dir + "/assets", "follow_symlinks": True},
'routes/0/action',
), 'configure symlink enable'
assert self.get(url='/link/file')['status'] == 200, 'symlink enabled'
def test_share_symlink_two_blocks(self, temp_dir, skip_alert):
skip_alert(r'opening.*failed')
os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link')
assert 'success' in self.conf(
[
{
"match": {"method": "HEAD"},
"action": {
"share": temp_dir + "/assets",
"follow_symlinks": False,
},
},
{
"match": {"method": "GET"},
"action": {
"share": temp_dir + "/assets",
"follow_symlinks": True,
},
},
],
'routes',
), 'configure two options'
assert self.get(url='/link/file')['status'] == 200, 'block enabled'
assert self.head(url='/link/file')['status'] == 403, 'block disabled'
def test_share_symlink_chroot(self, temp_dir, skip_alert):
skip_alert(r'opening.*failed')
os.symlink(
temp_dir + '/assets/dir/file', temp_dir + '/assets/dir/dir/link'
)
assert self.get(url='/dir/dir/link')['status'] == 200, 'default chroot'
assert 'success' in self.conf(
{
"share": temp_dir + "/assets",
"chroot": temp_dir + "/assets/dir/dir",
},
'routes/0/action',
), 'configure chroot'
assert self.get(url='/dir/dir/link')['status'] == 404, 'chroot'

View file

@ -168,14 +168,6 @@ class TestStatic(TestApplicationProto):
assert self.get(url='/fifo')['status'] == 404, 'fifo'
def test_static_symlink(self, temp_dir):
os.symlink(temp_dir + '/assets/dir', temp_dir + '/assets/link')
assert self.get(url='/dir')['status'] == 301, 'dir'
assert self.get(url='/dir/file')['status'] == 200, 'file'
assert self.get(url='/link')['status'] == 301, 'symlink dir'
assert self.get(url='/link/file')['status'] == 200, 'symlink file'
def test_static_method(self):
resp = self.head()
assert resp['status'] == 200, 'HEAD status'

32
test/unit/check/chroot.py Normal file
View file

@ -0,0 +1,32 @@
import json
from unit.http import TestHTTP
from unit.option import option
http = TestHTTP()
def check_chroot():
available = option.available
resp = http.put(
url='/config',
sock_type='unix',
addr=option.temp_dir + '/control.unit.sock',
body=json.dumps(
{
"listeners": {"*:7080": {"pass": "routes"}},
"routes": [
{
"action": {
"share": option.temp_dir,
"chroot": option.temp_dir,
}
}
],
}
),
)
if 'success' in resp['body']:
available['features']['chroot'] = True