Introducing named port message handlers to avoid misprints.

This commit is contained in:
Max Romanov 2017-09-15 20:30:34 +03:00
parent 1449e27cb4
commit 838d9946ac
10 changed files with 134 additions and 146 deletions

View file

@ -99,20 +99,15 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state;
static const nxt_event_conn_state_t nxt_controller_conn_close_state;
nxt_port_handler_t nxt_controller_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_controller_process_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
nxt_port_data_handler,
nxt_port_remove_pid_handler,
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */
NULL, /* NXT_PORT_MSG_SOCKET */
NULL, /* NXT_PORT_MSG_MODULES */
NULL, /* NXT_PORT_MSG_CONF_STORE */
nxt_port_rpc_handler,
nxt_port_rpc_handler,
nxt_port_handlers_t nxt_controller_process_port_handlers = {
.quit = nxt_worker_process_quit_handler,
.new_port = nxt_controller_process_new_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_port_data_handler,
.remove_pid = nxt_port_remove_pid_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
};

View file

@ -19,6 +19,7 @@ typedef struct nxt_port_s nxt_port_t;
typedef struct nxt_task_s nxt_task_t;
typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
typedef struct nxt_port_handlers_s nxt_port_handlers_t;
typedef struct nxt_sig_event_s nxt_sig_event_t;
typedef struct nxt_runtime_s nxt_runtime_t;

View file

@ -231,20 +231,15 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
static nxt_port_handler_t nxt_main_process_port_handlers[] = {
NULL, /* NXT_PORT_MSG_QUIT */
NULL, /* NXT_PORT_MSG_NEW_PORT */
NULL, /* NXT_PORT_MSG_CHANGE_FILE */
NULL, /* NXT_PORT_MSG_MMAP */
nxt_port_main_data_handler,
NULL, /* NXT_PORT_MSG_REMOVE_PID */
nxt_port_ready_handler,
nxt_port_main_start_worker_handler,
nxt_main_port_socket_handler,
nxt_main_port_modules_handler,
nxt_main_port_conf_store_handler,
nxt_port_rpc_handler,
nxt_port_rpc_handler,
static nxt_port_handlers_t nxt_main_process_port_handlers = {
.data = nxt_port_main_data_handler,
.process_ready = nxt_port_process_ready_handler,
.start_worker = nxt_port_main_start_worker_handler,
.socket = nxt_main_port_socket_handler,
.modules = nxt_main_port_modules_handler,
.conf_store = nxt_main_port_conf_store_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
};
@ -278,7 +273,7 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
* A main process port. A write port is not closed
* since it should be inherited by worker processes.
*/
nxt_port_enable(task, port, nxt_main_process_port_handlers);
nxt_port_enable(task, port, &nxt_main_process_port_handlers);
process->ready = 1;
@ -363,7 +358,7 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
init->start = nxt_controller_start;
init->name = "controller";
init->user_cred = &rt->user_cred;
init->port_handlers = nxt_controller_process_port_handlers;
init->port_handlers = &nxt_controller_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_CONTROLLER;
init->data = &conf;
@ -393,7 +388,7 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
init->start = nxt_discovery_start;
init->name = "discovery";
init->user_cred = &rt->user_cred;
init->port_handlers = nxt_discovery_process_port_handlers;
init->port_handlers = &nxt_discovery_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_DISCOVERY;
init->data = rt;
@ -417,7 +412,7 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
init->start = nxt_router_start;
init->name = "router";
init->user_cred = &rt->user_cred;
init->port_handlers = nxt_router_process_port_handlers;
init->port_handlers = &nxt_router_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_ROUTER;
init->data = rt;
@ -479,7 +474,7 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
init->start = nxt_app_start;
init->name = (char *) title;
init->port_handlers = nxt_app_process_port_handlers;
init->port_handlers = &nxt_app_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_WORKER;
init->data = app_conf;

View file

@ -28,11 +28,10 @@ nxt_int_t nxt_router_start(nxt_task_t *task, void *data);
nxt_int_t nxt_discovery_start(nxt_task_t *task, void *data);
nxt_int_t nxt_app_start(nxt_task_t *task, void *data);
extern nxt_port_handler_t nxt_controller_process_port_handlers[];
extern nxt_port_handler_t nxt_worker_process_port_handlers[];
extern nxt_port_handler_t nxt_discovery_process_port_handlers[];
extern nxt_port_handler_t nxt_app_process_port_handlers[];
extern nxt_port_handler_t nxt_router_process_port_handlers[];
extern nxt_port_handlers_t nxt_controller_process_port_handlers;
extern nxt_port_handlers_t nxt_discovery_process_port_handlers;
extern nxt_port_handlers_t nxt_app_process_port_handlers;
extern nxt_port_handlers_t nxt_router_process_port_handlers;
extern const nxt_sig_event_t nxt_main_process_signals[];
extern const nxt_sig_event_t nxt_worker_process_signals[];

View file

@ -121,11 +121,11 @@ nxt_port_reset_next_id()
void
nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handler_t *handlers)
nxt_port_handlers_t *handlers)
{
port->pid = nxt_pid;
port->handler = nxt_port_handler;
port->data = handlers;
port->data = (nxt_port_handler_t *) (handlers);
nxt_port_read_enable(task, port);
}
@ -271,7 +271,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
nxt_process_t *process;

View file

@ -8,6 +8,40 @@
#define _NXT_PORT_H_INCLUDED_
struct nxt_port_handlers_s {
/* RPC responses. */
nxt_port_handler_t rpc_ready;
nxt_port_handler_t rpc_error;
/* Main process RPC requests. */
nxt_port_handler_t start_worker;
nxt_port_handler_t socket;
nxt_port_handler_t modules;
nxt_port_handler_t conf_store;
/* File descriptor exchange. */
nxt_port_handler_t change_file;
nxt_port_handler_t new_port;
nxt_port_handler_t mmap;
/* New process ready. */
nxt_port_handler_t process_ready;
/* Process exit/crash notification. */
nxt_port_handler_t remove_pid;
/* Stop process command. */
nxt_port_handler_t quit;
/* Various data. */
nxt_port_handler_t data;
};
#define nxt_port_handler_idx(name) \
( &((nxt_port_handlers_t *) 0)->name - (nxt_port_handler_t *) 0)
typedef enum {
NXT_PORT_MSG_LAST = 0x100,
NXT_PORT_MSG_CLOSE_FD = 0x200,
@ -15,39 +49,49 @@ typedef enum {
NXT_PORT_MSG_MASK = 0xFF,
_NXT_PORT_MSG_QUIT = 0,
_NXT_PORT_MSG_NEW_PORT,
_NXT_PORT_MSG_CHANGE_FILE,
_NXT_PORT_MSG_MMAP,
_NXT_PORT_MSG_DATA,
_NXT_PORT_MSG_REMOVE_PID,
_NXT_PORT_MSG_READY,
_NXT_PORT_MSG_START_WORKER,
_NXT_PORT_MSG_SOCKET,
_NXT_PORT_MSG_MODULES,
_NXT_PORT_MSG_CONF_STORE,
_NXT_PORT_MSG_RPC_READY,
_NXT_PORT_MSG_RPC_ERROR,
_NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready),
_NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error),
NXT_PORT_MSG_MAX,
_NXT_PORT_MSG_START_WORKER = nxt_port_handler_idx(start_worker),
_NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket),
_NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules),
_NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store),
_NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
_NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
_NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid),
_NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) /
sizeof(nxt_port_handler_t),
NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST |
NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_START_WORKER = _NXT_PORT_MSG_START_WORKER |
NXT_PORT_MSG_LAST,
NXT_PORT_MSG_SOCKET = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_MODULES = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_CONF_STORE = _NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST |
NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
NXT_PORT_MSG_PROCESS_READY = _NXT_PORT_MSG_PROCESS_READY |
NXT_PORT_MSG_LAST,
NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
} nxt_port_msg_type_t;
@ -169,7 +213,7 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
nxt_buf_t *b);
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handler_t *handlers);
nxt_port_handlers_t *handlers);
void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *port, uint32_t stream);
nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
@ -179,7 +223,7 @@ void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_change_log_file_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);

View file

@ -161,7 +161,7 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process)
nxt_port_enable(task, port, init->port_handlers);
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_READY,
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY,
-1, init->stream, 0, NULL);
if (nxt_slow_path(ret != NXT_OK)) {

View file

@ -30,7 +30,7 @@ struct nxt_process_init_s {
const char *name;
nxt_user_cred_t *user_cred;
nxt_port_handler_t *port_handlers;
nxt_port_handlers_t *port_handlers;
const nxt_sig_event_t *signals;
nxt_process_type_t type;

View file

@ -1783,21 +1783,9 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
}
static nxt_port_handler_t nxt_router_app_port_handlers[] = {
NULL, /* NXT_PORT_MSG_QUIT */
NULL, /* NXT_PORT_MSG_NEW_PORT */
NULL, /* NXT_PORT_MSG_CHANGE_FILE */
/* TODO: remove mmap_handler from app ports */
nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */
nxt_port_rpc_handler, /* NXT_PORT_MSG_DATA */
NULL, /* NXT_PORT_MSG_REMOVE_PID */
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */
NULL, /* NXT_PORT_MSG_SOCKET */
NULL, /* NXT_PORT_MSG_MODULES */
NULL, /* NXT_PORT_MSG_CONF_STORE */
nxt_port_rpc_handler,
nxt_port_rpc_handler,
static nxt_port_handlers_t nxt_router_app_port_handlers = {
.mmap = nxt_port_mmap_handler,
.data = nxt_port_rpc_handler,
};
@ -1844,7 +1832,7 @@ nxt_router_thread_start(void *data)
engine->port = port;
nxt_port_enable(task, port, nxt_router_app_port_handlers);
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
nxt_event_engine_start(engine);
}

View file

@ -20,71 +20,37 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
nxt_port_data_handler,
nxt_port_remove_pid_handler,
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */
NULL, /* NXT_PORT_MSG_SOCKET */
NULL, /* NXT_PORT_MSG_MODULES */
NULL, /* NXT_PORT_MSG_CONF_STORE */
nxt_port_rpc_handler,
nxt_port_rpc_handler,
nxt_port_handlers_t nxt_app_process_port_handlers = {
.quit = nxt_worker_process_quit_handler,
.new_port = nxt_port_new_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_port_app_data_handler,
.remove_pid = nxt_port_remove_pid_handler,
};
nxt_port_handler_t nxt_app_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
nxt_port_app_data_handler,
nxt_port_remove_pid_handler,
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */
NULL, /* NXT_PORT_MSG_SOCKET */
NULL, /* NXT_PORT_MSG_MODULES */
NULL, /* NXT_PORT_MSG_CONF_STORE */
nxt_port_rpc_handler,
nxt_port_rpc_handler,
nxt_port_handlers_t nxt_router_process_port_handlers = {
.quit = nxt_worker_process_quit_handler,
.new_port = nxt_router_new_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_router_conf_data_handler,
.remove_pid = nxt_router_remove_pid_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
};
nxt_port_handler_t nxt_router_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_router_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
nxt_router_conf_data_handler,
nxt_router_remove_pid_handler,
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */
NULL, /* NXT_PORT_MSG_SOCKET */
NULL, /* NXT_PORT_MSG_MODULES */
NULL, /* NXT_PORT_MSG_CONF_STORE */
nxt_port_rpc_handler,
nxt_port_rpc_handler,
};
nxt_port_handler_t nxt_discovery_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
nxt_port_data_handler,
nxt_port_remove_pid_handler,
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */
NULL, /* NXT_PORT_MSG_SOCKET */
NULL, /* NXT_PORT_MSG_MODULES */
NULL, /* NXT_PORT_MSG_CONF_STORE */
nxt_port_rpc_handler,
nxt_port_rpc_handler,
nxt_port_handlers_t nxt_discovery_process_port_handlers = {
.quit = nxt_worker_process_quit_handler,
.new_port = nxt_port_new_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_port_data_handler,
.remove_pid = nxt_port_remove_pid_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
};