Using port rpc in controller->router configuration update.

This commit is contained in:
Max Romanov 2017-08-02 13:20:57 +03:00
parent 82c0304ab8
commit bcf99f87e2
4 changed files with 30 additions and 46 deletions

View file

@ -776,8 +776,9 @@ nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
}
void
nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
size_t size;
nxt_buf_t *b;
@ -787,15 +788,14 @@ nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
b = msg->buf;
size = b->mem.free - b->mem.pos;
nxt_debug(task, "contoller data: %*s ...", size, b->mem.pos);
nxt_debug(task, "controller conf ready: %*s ...", size, b->mem.pos);
nxt_memzero(&resp, sizeof(nxt_controller_response_t));
req = nxt_controller_current_request;
nxt_controller_current_request = NULL;
if (size == 2 && nxt_memcmp(b->mem.pos, "OK", 2) == 0) {
if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
nxt_mp_destroy(nxt_controller_conf.pool);
nxt_controller_conf = req->conf;
@ -848,22 +848,36 @@ static nxt_int_t
nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
{
size_t size;
uint32_t stream;
nxt_int_t rc;
nxt_buf_t *b;
nxt_port_t *port;
nxt_port_t *router_port, *controller_port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
port = rt->port_by_type[NXT_PROCESS_ROUTER];
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
size = nxt_conf_json_length(conf, NULL);
b = nxt_port_mmap_get_buf(task, port, size);
b = nxt_port_mmap_get_buf(task, router_port, size);
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, 0,
0, b);
stream = nxt_port_rpc_register_handler(task, controller_port,
nxt_controller_conf_handler,
nxt_controller_conf_handler,
router_port->pid, NULL);
rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream);
}
return rc;
}

View file

@ -53,9 +53,7 @@ static void nxt_router_conf_success(nxt_task_t *task,
static void nxt_router_conf_error(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, size_t size);
static void nxt_router_conf_buf_completion(nxt_task_t *task, void *obj,
void *data);
nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
static void nxt_router_listen_sockets_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
@ -490,7 +488,7 @@ nxt_router_conf_success(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_debug(task, "temp conf count:%D", tmcf->count);
if (--tmcf->count == 0) {
nxt_router_conf_send(task, tmcf, (u_char *) "OK", 2);
nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
}
}
@ -526,40 +524,15 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_mp_destroy(tmcf->conf->mem_pool);
nxt_router_conf_send(task, tmcf, (u_char *) "ERROR", 5);
nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
}
static void
nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, size_t size)
nxt_port_msg_type_t type)
{
nxt_buf_t *b;
b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
if (nxt_slow_path(b == NULL)) {
return;
}
b->mem.free = nxt_cpymem(b->mem.free, start, size);
b->parent = tmcf->mem_pool;
b->completion_handler = nxt_router_conf_buf_completion;
nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA_LAST, -1,
tmcf->stream, 0, b);
}
static void
nxt_router_conf_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
/* nxt_router_temp_conf_t mem pool. */
mp = data;
nxt_mp_destroy(mp);
nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
}

View file

@ -135,9 +135,6 @@ nxt_port_t *nxt_runtime_port_first(nxt_runtime_t *rt,
/* STUB */
nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt);
void nxt_port_controller_data_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
nxt_str_t *nxt_current_directory(nxt_mp_t *mp);
nxt_listen_socket_t *nxt_runtime_listen_socket_add(nxt_runtime_t *rt,

View file

@ -27,7 +27,7 @@ nxt_port_handler_t nxt_controller_process_port_handlers[] = {
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
nxt_port_controller_data_handler,
nxt_port_data_handler,
nxt_port_remove_pid_handler,
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */