From f23f985899760fafd853e993d9023b1339f09533 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 2 Aug 2017 13:22:07 +0300 Subject: [PATCH] Runtime processes protected with mutex. --- src/nxt_master_process.c | 6 +- src/nxt_port.c | 34 ++--------- src/nxt_port.h | 2 - src/nxt_port_memory.c | 2 +- src/nxt_process.c | 2 +- src/nxt_process.h | 1 + src/nxt_runtime.c | 124 +++++++++++++++++++++++++++++---------- src/nxt_runtime.h | 3 +- 8 files changed, 106 insertions(+), 68 deletions(-) diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 2bc97c1c..41e659a4 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -406,7 +406,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, port = nxt_port_new(0, 0, init->type); if (nxt_slow_path(port == NULL)) { - nxt_runtime_process_destroy(rt, process); + nxt_runtime_process_remove(rt, process); return NXT_ERROR; } @@ -658,7 +658,9 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) if (!nxt_exiting) { nxt_runtime_process_each(rt, process) { - if (process->pid == nxt_pid) { + if (process->pid == nxt_pid || + process->pid == pid || + nxt_queue_is_empty(&process->ports)) { continue; } diff --git a/src/nxt_port.c b/src/nxt_port.c index 18dc4121..2e5b229b 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -106,28 +106,6 @@ nxt_port_enable(nxt_task_t *task, nxt_port_t *port, } -void -nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) -{ - nxt_port_t *port; - nxt_process_t *process; - - nxt_runtime_process_each(rt, process) - { - if (nxt_pid != process->pid) { - nxt_process_port_each(process, port) { - - (void) nxt_port_socket_write(task, port, type, - fd, stream, 0, b); - - } nxt_process_port_loop; - } - } - nxt_runtime_process_loop; -} - - static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -276,6 +254,8 @@ nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; + nxt_assert(nxt_runtime_is_master(rt)); + process = nxt_runtime_process_get(rt, msg->port_msg.pid); if (nxt_slow_path(process == NULL)) { return; @@ -290,9 +270,7 @@ nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "process %PI ready", msg->port_msg.pid); - if (nxt_runtime_is_master(rt)) { - nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); - } + nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); } @@ -310,7 +288,7 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - process = nxt_runtime_process_get(rt, msg->port_msg.pid); + process = nxt_runtime_process_find(rt, msg->port_msg.pid); if (nxt_slow_path(process == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI", msg->port_msg.pid); @@ -415,14 +393,14 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_runtime_t *rt; nxt_process_t *process; - nxt_debug(task, "port remove pid handler"); - buf = msg->buf; nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + nxt_debug(task, "port remove pid %PI handler", pid); + rt = task->thread->runtime; nxt_port_rpc_remove_peer(task, msg->port, pid); diff --git a/src/nxt_port.h b/src/nxt_port.h index 8a5c3390..a7f78b4c 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -160,8 +160,6 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, nxt_port_handler_t *handlers); -void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *port, uint32_t stream); nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index d21d263a..ec3227b1 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -379,7 +379,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; - process = nxt_runtime_process_get(task->thread->runtime, spid); + process = nxt_runtime_process_find(task->thread->runtime, spid); if (nxt_slow_path(process == NULL)) { return NULL; } diff --git a/src/nxt_process.c b/src/nxt_process.c index 9ab3eaf3..e6a6fa0c 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -577,7 +577,7 @@ nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) process->port_cleanups--; if (process->port_cleanups == 0) { - nxt_runtime_process_destroy(rt, process); + nxt_runtime_process_remove(rt, process); } } diff --git a/src/nxt_process.h b/src/nxt_process.h index aa3aa7a5..b76c4f1a 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -57,6 +57,7 @@ typedef struct { nxt_pid_t pid; nxt_queue_t ports; /* of nxt_port_t */ nxt_bool_t ready; + nxt_bool_t registered; nxt_uint_t port_cleanups; nxt_process_init_t *init; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 31829225..5f4ec6f4 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -48,6 +48,11 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, nxt_runtime_cont_t cont); #endif +static void nxt_runtime_process_destroy(nxt_runtime_t *rt, + nxt_process_t *process); +static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, + nxt_pid_t pid); + nxt_int_t nxt_runtime_create(nxt_task_t *task) @@ -71,6 +76,8 @@ nxt_runtime_create(nxt_task_t *task) task->thread->runtime = rt; rt->mem_pool = mp; + nxt_thread_mutex_create(&rt->processes_mutex); + rt->prefix = nxt_current_directory(mp); if (nxt_slow_path(rt->prefix == NULL)) { goto fail; @@ -557,6 +564,8 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) } nxt_runtime_process_loop; + nxt_thread_mutex_destroy(&rt->processes_mutex); + nxt_mp_destroy(rt->mem_pool); nxt_debug(task, "exit"); @@ -1488,10 +1497,11 @@ nxt_runtime_process_new(nxt_runtime_t *rt) } -void +static void nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) { nxt_assert(process->port_cleanups == 0); + nxt_assert(process->registered == 0); nxt_port_mmaps_destroy(process->incoming, 1); nxt_port_mmaps_destroy(process->outgoing, 1); @@ -1533,23 +1543,38 @@ static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { }; +nxt_inline void +nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) +{ + lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); + lhq->key.length = sizeof(*pid); + lhq->key.start = (u_char *) pid; + lhq->proto = &lvlhsh_processes_proto; +} + + nxt_process_t * nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) { + nxt_process_t *process; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); - lhq.key.length = sizeof(pid); - lhq.key.start = (u_char *) &pid; - lhq.proto = &lvlhsh_processes_proto; + process = NULL; + + nxt_runtime_process_lhq_pid(&lhq, &pid); + + nxt_thread_mutex_lock(&rt->processes_mutex); if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { - return lhq.value; + process = lhq.value; + + } else { + nxt_thread_log_debug("process %PI not found", pid); } - nxt_thread_log_debug("process %PI not found", pid); + nxt_thread_mutex_unlock(&rt->processes_mutex); - return NULL; + return process; } @@ -1559,13 +1584,14 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) nxt_process_t *process; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); - lhq.key.length = sizeof(pid); - lhq.key.start = (u_char *) &pid; - lhq.proto = &lvlhsh_processes_proto; + nxt_runtime_process_lhq_pid(&lhq, &pid); + + nxt_thread_mutex_lock(&rt->processes_mutex); if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { nxt_thread_log_debug("process %PI found", pid); + + nxt_thread_mutex_unlock(&rt->processes_mutex); return lhq.value; } @@ -1589,6 +1615,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) rt->nprocesses++; + process->registered = 1; + nxt_thread_log_debug("process %PI insert", pid); break; @@ -1597,6 +1625,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) break; } + nxt_thread_mutex_unlock(&rt->processes_mutex); + return process; } @@ -1607,14 +1637,16 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_t *port; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); - lhq.key.length = sizeof(process->pid); - lhq.key.start = (u_char *) &process->pid; - lhq.proto = &lvlhsh_processes_proto; + nxt_assert(process->registered == 0); + + nxt_runtime_process_lhq_pid(&lhq, &process->pid); + lhq.replace = 0; lhq.value = process; lhq.pool = rt->mem_pool; + nxt_thread_mutex_lock(&rt->processes_mutex); + switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { case NXT_OK: @@ -1632,37 +1664,70 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) } nxt_process_port_loop; + process->registered = 1; + + nxt_thread_log_debug("process %PI added", process->pid); break; default: + nxt_thread_log_debug("process %PI failed to add", process->pid); break; } + + nxt_thread_mutex_unlock(&rt->processes_mutex); } -void -nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) +static nxt_process_t * +nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) { - nxt_port_t *port; + nxt_process_t *process; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); - lhq.key.length = sizeof(process->pid); - lhq.key.start = (u_char *) &process->pid; - lhq.proto = &lvlhsh_processes_proto; - lhq.replace = 0; - lhq.value = process; + process = NULL; + + nxt_runtime_process_lhq_pid(&lhq, &pid); + lhq.pool = rt->mem_pool; + nxt_thread_mutex_lock(&rt->processes_mutex); + switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { case NXT_OK: rt->nprocesses--; - if (process->port_cleanups == 0) { - nxt_runtime_process_destroy(rt, process); + process = lhq.value; + + process->registered = 0; + + nxt_thread_log_debug("process %PI removed", pid); + break; + + default: + nxt_thread_log_debug("process %PI remove failed", pid); + break; + } + + nxt_thread_mutex_unlock(&rt->processes_mutex); + + return process; +} + + +void +nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_t *port; + + if (process->port_cleanups == 0) { + if (process->registered == 1) { + nxt_runtime_process_remove_pid(rt, process->pid); } + nxt_runtime_process_destroy(rt, process); + + } else { nxt_process_port_each(process, port) { nxt_runtime_port_remove(rt, port); @@ -1670,11 +1735,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_release(port); } nxt_process_port_loop; - - break; - - default: - break; } } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 8ee8560b..7922c2ef 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -37,6 +37,7 @@ struct nxt_runtime_s { nxt_process_t *mprocess; size_t nprocesses; + nxt_thread_mutex_t processes_mutex; nxt_lvlhsh_t processes; /* of nxt_process_t */ nxt_port_t *port_by_type[NXT_PROCESS_MAX]; @@ -101,8 +102,6 @@ nxt_runtime_is_master(nxt_runtime_t *rt) nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); -void nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process); - nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process);