import socket import ssl import time import pytest from unit.applications.tls import TestApplicationTLS class TestReconfigureTLS(TestApplicationTLS): prerequisites = {'modules': {'openssl': 'any'}} @pytest.fixture(autouse=True) def setup_method_fixture(self): self.certificate() assert 'success' in self.conf( { "listeners": { "*:7080": { "pass": "routes", "tls": {"certificate": "default"}, } }, "routes": [{"action": {"return": 200}}], "applications": {}, } ), 'load application configuration' def create_socket(self): ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) ctx.verify_mode = ssl.CERT_NONE ctx.check_hostname = False s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ssl_sock = ctx.wrap_socket( s, server_hostname='localhost', do_handshake_on_connect=False ) ssl_sock.connect(('127.0.0.1', 7080)) return ssl_sock def clear_conf(self): assert 'success' in self.conf({"listeners": {}, "applications": {}}) @pytest.mark.skip('not yet') def test_reconfigure_tls_switch(self): assert 'success' in self.conf_delete('listeners/*:7080/tls') (_, sock) = self.get( headers={'Host': 'localhost', 'Connection': 'keep-alive'}, start=True, read_timeout=1, ) assert 'success' in self.conf( {"pass": "routes", "tls": {"certificate": "default"}}, 'listeners/*:7080', ) assert self.get(sock=sock)['status'] == 200, 'reconfigure' assert self.get_ssl()['status'] == 200, 'reconfigure tls' def test_reconfigure_tls(self): ssl_sock = self.create_socket() ssl_sock.sendall("""GET / HTTP/1.1\r\n""".encode()) self.clear_conf() ssl_sock.sendall( """Host: localhost\r\nConnection: close\r\n\r\n""".encode() ) assert ( self.recvall(ssl_sock).decode().startswith('HTTP/1.1 200 OK') ), 'finish request' def test_reconfigure_tls_2(self): ssl_sock = self.create_socket() # Waiting for connection completion. # Delay should be more than TCP_DEFER_ACCEPT. time.sleep(1.5) self.clear_conf() try: ssl_sock.do_handshake() except ssl.SSLError: ssl_sock.close() success = True if not success: pytest.fail('Connection is not closed.') def test_reconfigure_tls_3(self): ssl_sock = self.create_socket() ssl_sock.do_handshake() self.clear_conf() assert self.get(sock=ssl_sock)['status'] == 408, 'request timeout'