unit/test/test_tls.py

599 lines
16 KiB
Python
Raw Normal View History

import io
2020-09-16 20:31:15 +00:00
import pytest
2018-09-20 13:34:34 +00:00
import re
import ssl
import subprocess
2020-05-15 03:20:56 +00:00
2019-03-28 15:43:13 +00:00
from unit.applications.tls import TestApplicationTLS
2020-09-16 20:31:15 +00:00
from conftest import skip_alert
2018-09-20 13:34:34 +00:00
2019-03-28 15:43:13 +00:00
class TestTLS(TestApplicationTLS):
prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
2018-09-20 13:34:34 +00:00
def findall(self, pattern):
2020-09-16 20:31:15 +00:00
with open(self.temp_dir + '/unit.log', 'r', errors='ignore') as f:
2018-09-20 13:34:34 +00:00
return re.findall(pattern, f.read())
def openssl_date_to_sec_epoch(self, date):
return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
def add_tls(self, application='empty', cert='default', port=7080):
2019-03-26 20:38:30 +00:00
self.conf(
{
"pass": "applications/" + application,
"tls": {"certificate": cert}
},
2019-03-26 20:38:30 +00:00
'listeners/*:' + str(port),
)
2018-09-20 13:34:34 +00:00
def remove_tls(self, application='empty', port=7080):
self.conf(
{"pass": "applications/" + application}, 'listeners/*:' + str(port)
)
2018-09-20 13:34:34 +00:00
def test_tls_listener_option_add(self):
self.load('empty')
self.certificate()
self.add_tls()
2020-09-16 20:31:15 +00:00
assert self.get_ssl()['status'] == 200, 'add listener option'
2018-09-20 13:34:34 +00:00
def test_tls_listener_option_remove(self):
self.load('empty')
self.certificate()
self.add_tls()
self.get_ssl()
self.remove_tls()
2020-09-16 20:31:15 +00:00
assert self.get()['status'] == 200, 'remove listener option'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_remove(self):
self.load('empty')
self.certificate()
2020-09-16 20:31:15 +00:00
assert 'success' in self.conf_delete(
'/certificates/default'
), 'remove certificate'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_remove_used(self):
self.load('empty')
self.certificate()
self.add_tls()
2020-09-16 20:31:15 +00:00
assert 'error' in self.conf_delete(
'/certificates/default'
), 'remove certificate'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_remove_nonexisting(self):
self.load('empty')
self.certificate()
self.add_tls()
2020-09-16 20:31:15 +00:00
assert 'error' in self.conf_delete(
'/certificates/blah'
), 'remove nonexistings certificate'
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
@pytest.mark.skip('not yet')
2018-09-20 13:34:34 +00:00
def test_tls_certificate_update(self):
self.load('empty')
self.certificate()
self.add_tls()
cert_old = self.get_server_certificate()
self.certificate()
2020-09-16 20:31:15 +00:00
assert cert_old != self.get_server_certificate(), 'update certificate'
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
@pytest.mark.skip('not yet')
2018-09-20 13:34:34 +00:00
def test_tls_certificate_key_incorrect(self):
self.load('empty')
self.certificate('first', False)
self.certificate('second', False)
2020-09-16 20:31:15 +00:00
assert 'error' in self.certificate_load(
'first', 'second'
), 'key incorrect'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_change(self):
self.load('empty')
self.certificate()
self.certificate('new')
self.add_tls()
cert_old = self.get_server_certificate()
self.add_tls(cert='new')
2020-09-16 20:31:15 +00:00
assert cert_old != self.get_server_certificate(), 'change certificate'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_key_rsa(self):
self.load('empty')
self.certificate()
2020-09-16 20:31:15 +00:00
assert (
self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
), 'certificate key rsa'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_key_ec(self):
self.load('empty')
self.openssl_conf()
2019-03-26 20:38:30 +00:00
subprocess.call(
[
'openssl',
'ecparam',
'-noout',
'-genkey',
2020-09-16 20:31:15 +00:00
'-out',
self.temp_dir + '/ec.key',
'-name',
'prime256v1',
],
stderr=subprocess.STDOUT,
2019-03-26 20:38:30 +00:00
)
subprocess.call(
[
'openssl',
'req',
'-x509',
'-new',
2020-09-16 20:31:15 +00:00
'-subj',
'/CN=ec/',
'-config',
self.temp_dir + '/openssl.conf',
'-key',
self.temp_dir + '/ec.key',
'-out',
self.temp_dir + '/ec.crt',
],
stderr=subprocess.STDOUT,
2019-03-26 20:38:30 +00:00
)
2018-09-20 13:34:34 +00:00
self.certificate_load('ec')
2020-09-16 20:31:15 +00:00
assert (
self.conf_get('/certificates/ec/key') == 'ECDH'
), 'certificate key ec'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_chain_options(self):
self.load('empty')
self.certificate()
chain = self.conf_get('/certificates/default/chain')
2020-09-16 20:31:15 +00:00
assert len(chain) == 1, 'certificate chain length'
2018-09-20 13:34:34 +00:00
cert = chain[0]
2020-09-16 20:31:15 +00:00
assert (
cert['subject']['common_name'] == 'default'
), 'certificate subject common name'
assert (
cert['issuer']['common_name'] == 'default'
), 'certificate issuer common name'
2019-03-26 20:38:30 +00:00
2020-09-16 20:31:15 +00:00
assert (
2019-03-26 20:38:30 +00:00
abs(
self.sec_epoch()
- self.openssl_date_to_sec_epoch(cert['validity']['since'])
2020-09-16 20:31:15 +00:00
)
< 5
), 'certificate validity since'
assert (
2019-03-26 20:38:30 +00:00
self.openssl_date_to_sec_epoch(cert['validity']['until'])
2020-09-16 20:31:15 +00:00
- self.openssl_date_to_sec_epoch(cert['validity']['since'])
== 2592000
), 'certificate validity until'
2018-09-20 13:34:34 +00:00
def test_tls_certificate_chain(self):
self.load('empty')
self.certificate('root', False)
2019-03-26 20:38:30 +00:00
subprocess.call(
[
'openssl',
'req',
'-new',
2020-09-16 20:31:15 +00:00
'-subj',
'/CN=int/',
'-config',
self.temp_dir + '/openssl.conf',
'-out',
self.temp_dir + '/int.csr',
'-keyout',
self.temp_dir + '/int.key',
],
stderr=subprocess.STDOUT,
2019-03-26 20:38:30 +00:00
)
subprocess.call(
[
'openssl',
'req',
'-new',
2020-09-16 20:31:15 +00:00
'-subj',
'/CN=end/',
'-config',
self.temp_dir + '/openssl.conf',
'-out',
self.temp_dir + '/end.csr',
'-keyout',
self.temp_dir + '/end.key',
],
stderr=subprocess.STDOUT,
2019-03-26 20:38:30 +00:00
)
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
with open(self.temp_dir + '/ca.conf', 'w') as f:
2019-03-26 20:38:30 +00:00
f.write(
"""[ ca ]
2018-09-20 13:34:34 +00:00
default_ca = myca
[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
2018-09-20 13:34:34 +00:00
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
[ myca_policy ]
commonName = supplied
[ myca_extensions ]
2019-03-26 20:38:30 +00:00
basicConstraints = critical,CA:TRUE"""
% {
2020-09-16 20:31:15 +00:00
'dir': self.temp_dir,
'database': self.temp_dir + '/certindex',
'certserial': self.temp_dir + '/certserial',
2019-03-26 20:38:30 +00:00
}
)
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
with open(self.temp_dir + '/certserial', 'w') as f:
2018-09-20 13:34:34 +00:00
f.write('1000')
2020-09-16 20:31:15 +00:00
with open(self.temp_dir + '/certindex', 'w') as f:
2018-09-20 13:34:34 +00:00
f.write('')
2019-03-26 20:38:30 +00:00
subprocess.call(
[
'openssl',
'ca',
'-batch',
2020-09-16 20:31:15 +00:00
'-subj',
'/CN=int/',
'-config',
self.temp_dir + '/ca.conf',
'-keyfile',
self.temp_dir + '/root.key',
'-cert',
self.temp_dir + '/root.crt',
'-in',
self.temp_dir + '/int.csr',
'-out',
self.temp_dir + '/int.crt',
],
stderr=subprocess.STDOUT,
2019-03-26 20:38:30 +00:00
)
subprocess.call(
[
'openssl',
'ca',
'-batch',
2020-09-16 20:31:15 +00:00
'-subj',
'/CN=end/',
'-config',
self.temp_dir + '/ca.conf',
'-keyfile',
self.temp_dir + '/int.key',
'-cert',
self.temp_dir + '/int.crt',
'-in',
self.temp_dir + '/end.csr',
'-out',
self.temp_dir + '/end.crt',
],
stderr=subprocess.STDOUT,
2019-03-26 20:38:30 +00:00
)
2020-09-16 20:31:15 +00:00
crt_path = self.temp_dir + '/end-int.crt'
end_path = self.temp_dir + '/end.crt'
int_path = self.temp_dir + '/int.crt'
2019-03-26 20:38:30 +00:00
2020-09-16 20:31:15 +00:00
with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
int_path, 'rb'
) as int:
2019-03-26 20:38:30 +00:00
crt.write(end.read() + int.read())
2018-09-20 13:34:34 +00:00
self.context = ssl.create_default_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_REQUIRED
2020-09-16 20:31:15 +00:00
self.context.load_verify_locations(self.temp_dir + '/root.crt')
2018-09-20 13:34:34 +00:00
# incomplete chain
2020-09-16 20:31:15 +00:00
assert 'success' in self.certificate_load(
'end', 'end'
), 'certificate chain end upload'
2018-09-20 13:34:34 +00:00
chain = self.conf_get('/certificates/end/chain')
2020-09-16 20:31:15 +00:00
assert len(chain) == 1, 'certificate chain end length'
assert (
chain[0]['subject']['common_name'] == 'end'
), 'certificate chain end subject common name'
assert (
chain[0]['issuer']['common_name'] == 'int'
), 'certificate chain end issuer common name'
2018-09-20 13:34:34 +00:00
self.add_tls(cert='end')
try:
resp = self.get_ssl()
except ssl.SSLError:
resp = None
2020-09-16 20:31:15 +00:00
assert resp == None, 'certificate chain incomplete chain'
2018-09-20 13:34:34 +00:00
# intermediate
2020-09-16 20:31:15 +00:00
assert 'success' in self.certificate_load(
'int', 'int'
), 'certificate chain int upload'
2018-09-20 13:34:34 +00:00
chain = self.conf_get('/certificates/int/chain')
2020-09-16 20:31:15 +00:00
assert len(chain) == 1, 'certificate chain int length'
assert (
chain[0]['subject']['common_name'] == 'int'
), 'certificate chain int subject common name'
assert (
chain[0]['issuer']['common_name'] == 'root'
), 'certificate chain int issuer common name'
2018-09-20 13:34:34 +00:00
self.add_tls(cert='int')
2020-09-16 20:31:15 +00:00
assert (
self.get_ssl()['status'] == 200
), 'certificate chain intermediate'
2018-09-20 13:34:34 +00:00
# intermediate server
2020-09-16 20:31:15 +00:00
assert 'success' in self.certificate_load(
'end-int', 'end'
), 'certificate chain end-int upload'
2018-09-20 13:34:34 +00:00
chain = self.conf_get('/certificates/end-int/chain')
2020-09-16 20:31:15 +00:00
assert len(chain) == 2, 'certificate chain end-int length'
assert (
chain[0]['subject']['common_name'] == 'end'
), 'certificate chain end-int int subject common name'
assert (
chain[0]['issuer']['common_name'] == 'int'
), 'certificate chain end-int int issuer common name'
assert (
chain[1]['subject']['common_name'] == 'int'
), 'certificate chain end-int end subject common name'
assert (
chain[1]['issuer']['common_name'] == 'root'
), 'certificate chain end-int end issuer common name'
2018-09-20 13:34:34 +00:00
self.add_tls(cert='end-int')
2020-09-16 20:31:15 +00:00
assert (
self.get_ssl()['status'] == 200
), 'certificate chain intermediate server'
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
@pytest.mark.skip('not yet')
2018-09-20 13:34:34 +00:00
def test_tls_reconfigure(self):
self.load('empty')
2020-09-16 20:31:15 +00:00
assert self.get()['status'] == 200, 'init'
2019-04-09 17:59:35 +00:00
2018-09-20 13:34:34 +00:00
self.certificate()
2019-03-26 20:38:30 +00:00
(resp, sock) = self.get(
headers={'Host': 'localhost', 'Connection': 'keep-alive'},
start=True,
2019-04-09 17:59:35 +00:00
read_timeout=1,
2019-03-26 20:38:30 +00:00
)
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
assert resp['status'] == 200, 'initial status'
2018-09-20 13:34:34 +00:00
self.add_tls()
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
2018-09-20 13:34:34 +00:00
def test_tls_keepalive(self):
self.load('mirror')
2020-09-16 20:31:15 +00:00
assert self.get()['status'] == 200, 'init'
2019-04-09 17:59:35 +00:00
2018-09-20 13:34:34 +00:00
self.certificate()
self.add_tls(application='mirror')
2019-03-26 20:38:30 +00:00
(resp, sock) = self.post_ssl(
headers={
'Host': 'localhost',
'Connection': 'keep-alive',
'Content-Type': 'text/html',
},
start=True,
body='0123456789',
2019-04-09 17:59:35 +00:00
read_timeout=1,
2019-03-26 20:38:30 +00:00
)
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
assert resp['body'] == '0123456789', 'keepalive 1'
2018-09-20 13:34:34 +00:00
2019-03-26 20:38:30 +00:00
resp = self.post_ssl(
headers={
'Host': 'localhost',
'Connection': 'close',
'Content-Type': 'text/html',
},
sock=sock,
body='0123456789',
)
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
assert resp['body'] == '0123456789', 'keepalive 2'
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
@pytest.mark.skip('not yet')
2018-09-20 13:34:34 +00:00
def test_tls_keepalive_certificate_remove(self):
self.load('empty')
2020-09-16 20:31:15 +00:00
assert self.get()['status'] == 200, 'init'
2019-04-09 17:59:35 +00:00
2018-09-20 13:34:34 +00:00
self.certificate()
self.add_tls()
2019-03-26 20:38:30 +00:00
(resp, sock) = self.get_ssl(
headers={'Host': 'localhost', 'Connection': 'keep-alive'},
start=True,
2019-04-09 17:59:35 +00:00
read_timeout=1,
2019-03-26 20:38:30 +00:00
)
2018-09-20 13:34:34 +00:00
self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
2018-09-20 13:34:34 +00:00
self.conf_delete('/certificates/default')
try:
2019-03-26 20:38:30 +00:00
resp = self.get_ssl(
headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
)
2018-09-20 13:34:34 +00:00
except:
resp = None
2020-09-16 20:31:15 +00:00
assert resp == None, 'keepalive remove certificate'
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
@pytest.mark.skip('not yet')
2018-09-20 13:34:34 +00:00
def test_tls_certificates_remove_all(self):
self.load('empty')
self.certificate()
2020-09-16 20:31:15 +00:00
assert 'success' in self.conf_delete(
'/certificates'
), 'remove all certificates'
2018-09-20 13:34:34 +00:00
def test_tls_application_respawn(self):
self.load('mirror')
self.certificate()
self.conf('1', 'applications/mirror/processes')
self.add_tls(application='mirror')
2020-04-14 01:35:04 +00:00
(_, sock) = self.post_ssl(
2019-03-26 20:38:30 +00:00
headers={
'Host': 'localhost',
'Connection': 'keep-alive',
'Content-Type': 'text/html',
},
start=True,
body='0123456789',
2019-04-09 17:59:35 +00:00
read_timeout=1,
2019-03-26 20:38:30 +00:00
)
2018-09-20 13:34:34 +00:00
app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
subprocess.call(['kill', '-9', app_id])
2020-09-16 20:31:15 +00:00
skip_alert(r'process %s exited on signal 9' % app_id)
2020-04-14 01:35:04 +00:00
2019-03-26 20:38:30 +00:00
self.wait_for_record(
re.compile(
' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
)
)
2018-09-20 13:34:34 +00:00
2019-03-26 20:38:30 +00:00
resp = self.post_ssl(
headers={
'Host': 'localhost',
'Connection': 'close',
'Content-Type': 'text/html',
},
sock=sock,
body='0123456789',
)
2018-09-20 13:34:34 +00:00
2020-09-16 20:31:15 +00:00
assert resp['status'] == 200, 'application respawn status'
assert resp['body'] == '0123456789', 'application respawn body'
2018-09-20 13:34:34 +00:00
def test_tls_url_scheme(self):
self.load('variables')
2020-09-16 20:31:15 +00:00
assert (
2019-03-26 20:38:30 +00:00
self.post(
headers={
'Host': 'localhost',
'Content-Type': 'text/html',
'Custom-Header': '',
'Connection': 'close',
}
2020-09-16 20:31:15 +00:00
)['headers']['Wsgi-Url-Scheme']
== 'http'
), 'url scheme http'
self.certificate()
self.add_tls(application='variables')
2020-09-16 20:31:15 +00:00
assert (
2019-03-26 20:38:30 +00:00
self.post_ssl(
headers={
'Host': 'localhost',
'Content-Type': 'text/html',
'Custom-Header': '',
'Connection': 'close',
}
2020-09-16 20:31:15 +00:00
)['headers']['Wsgi-Url-Scheme']
== 'https'
), 'url scheme https'
def test_tls_big_upload(self):
self.load('upload')
self.certificate()
self.add_tls(application='upload')
filename = 'test.txt'
data = '0123456789' * 9000
2020-09-16 20:31:15 +00:00
res = self.post_ssl(
body={
'file': {
'filename': filename,
'type': 'text/plain',
'data': io.StringIO(data),
}
}
2020-09-16 20:31:15 +00:00
)
assert res['status'] == 200, 'status ok'
assert res['body'] == filename + data