Router: broadcasting the SHM_ACK message to all process ports.

This commit is contained in:
Max Romanov 2020-10-28 00:01:46 +03:00
parent 735bb2f127
commit ccee391ab2
3 changed files with 40 additions and 11 deletions

View file

@ -17,6 +17,10 @@
#include <nxt_port_memory_int.h> #include <nxt_port_memory_int.h>
static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
void *data);
nxt_inline void nxt_inline void
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{ {
@ -112,7 +116,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
u_char *p; u_char *p;
nxt_mp_t *mp; nxt_mp_t *mp;
nxt_buf_t *b, *next; nxt_buf_t *b, *next;
nxt_port_t *port;
nxt_process_t *process; nxt_process_t *process;
nxt_chunk_id_t c; nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr; nxt_port_mmap_header_t *hdr;
@ -171,14 +174,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{ {
process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid); process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);
if (process != NULL && !nxt_queue_is_empty(&process->ports)) { nxt_process_broadcast_shm_ack(task, process);
port = nxt_process_port_first(process);
if (port->type == NXT_PROCESS_APP) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
-1, 0, 0, NULL);
}
}
} }
release_buf: release_buf:
@ -976,3 +972,35 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
return m; return m;
} }
void
nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
{
nxt_port_t *port;
if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
{
return;
}
port = nxt_process_port_first(process);
if (port->type == NXT_PROCESS_APP) {
nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
}
}
static void
nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
{
nxt_process_t *process;
process = data;
nxt_queue_each(port, &process->ports, nxt_port_t, link) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
-1, 0, 0, NULL);
} nxt_queue_loop;
}

View file

@ -71,4 +71,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
nxt_int_t nxt_shm_open(nxt_task_t *task, size_t size); nxt_int_t nxt_shm_open(nxt_task_t *task, size_t size);
void nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process);
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */ #endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */

View file

@ -5389,8 +5389,7 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_thread_mutex_unlock(&process->incoming.mutex); nxt_thread_mutex_unlock(&process->incoming.mutex);
if (ack) { if (ack) {
(void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK, nxt_process_broadcast_shm_ack(task, process);
-1, 0, 0, NULL);
} }
} }