Router: introducing the PORT_ACK message.
The PORT_ACK message is the router's response to the application's NEW_PORT message. After receiving PORT_ACK, the application is safe to process requests using this port. This message avoids a racing condition when the application starts processing a request from the shared queue and sends REQ_HEADERS_ACK. The REQ_HEADERS_ACK message contains the application port ID as reply_port, which the router uses to send request data. When the application creates a new port, it immediately sends it to the main router thread. Because the request is processed outside the main thread, a racing condition can occur between the receipt of the new port in the main thread and the receipt of REQ_HEADERS_ACK in the worker router thread where the same port is specified as reply_port.
This commit is contained in:
parent
131b6a7ffa
commit
4cb8aeb31a
4 changed files with 39 additions and 5 deletions
|
@ -26,6 +26,7 @@ struct nxt_port_handlers_s {
|
||||||
nxt_port_handler_t change_file;
|
nxt_port_handler_t change_file;
|
||||||
nxt_port_handler_t new_port;
|
nxt_port_handler_t new_port;
|
||||||
nxt_port_handler_t get_port;
|
nxt_port_handler_t get_port;
|
||||||
|
nxt_port_handler_t port_ack;
|
||||||
nxt_port_handler_t mmap;
|
nxt_port_handler_t mmap;
|
||||||
nxt_port_handler_t get_mmap;
|
nxt_port_handler_t get_mmap;
|
||||||
|
|
||||||
|
@ -84,6 +85,7 @@ typedef enum {
|
||||||
_NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
|
_NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
|
||||||
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
|
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
|
||||||
_NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
|
_NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
|
||||||
|
_NXT_PORT_MSG_PORT_ACK = nxt_port_handler_idx(port_ack),
|
||||||
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
|
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
|
||||||
_NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
|
_NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
|
||||||
|
|
||||||
|
@ -120,6 +122,7 @@ typedef enum {
|
||||||
NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
|
NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
|
||||||
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
|
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
|
||||||
NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
|
NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
|
||||||
|
NXT_PORT_MSG_PORT_ACK = nxt_msg_last(_NXT_PORT_MSG_PORT_ACK),
|
||||||
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
|
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
|
||||||
| NXT_PORT_MSG_SYNC,
|
| NXT_PORT_MSG_SYNC,
|
||||||
NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
|
NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
|
||||||
|
|
|
@ -688,6 +688,8 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
|
|
||||||
port->app = app;
|
port->app = app;
|
||||||
port->main_app_port = main_app_port;
|
port->main_app_port = main_app_port;
|
||||||
|
|
||||||
|
nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,7 @@ static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
|
||||||
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
|
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
|
||||||
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
|
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
|
||||||
nxt_unit_recv_msg_t *recv_msg);
|
nxt_unit_recv_msg_t *recv_msg);
|
||||||
|
static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
|
||||||
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
|
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
|
||||||
nxt_unit_recv_msg_t *recv_msg);
|
nxt_unit_recv_msg_t *recv_msg);
|
||||||
static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
|
static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
|
||||||
|
@ -306,6 +307,7 @@ struct nxt_unit_ctx_impl_s {
|
||||||
nxt_queue_t free_rbuf;
|
nxt_queue_t free_rbuf;
|
||||||
|
|
||||||
int online;
|
int online;
|
||||||
|
int ready;
|
||||||
|
|
||||||
nxt_unit_mmap_buf_t ctx_buf[2];
|
nxt_unit_mmap_buf_t ctx_buf[2];
|
||||||
nxt_unit_read_buf_t ctx_read_buf;
|
nxt_unit_read_buf_t ctx_read_buf;
|
||||||
|
@ -624,6 +626,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
|
||||||
ctx_impl->use_count = 1;
|
ctx_impl->use_count = 1;
|
||||||
ctx_impl->wait_items = 0;
|
ctx_impl->wait_items = 0;
|
||||||
ctx_impl->online = 1;
|
ctx_impl->online = 1;
|
||||||
|
ctx_impl->ready = 0;
|
||||||
|
|
||||||
nxt_queue_init(&ctx_impl->free_req);
|
nxt_queue_init(&ctx_impl->free_req);
|
||||||
nxt_queue_init(&ctx_impl->free_ws);
|
nxt_queue_init(&ctx_impl->free_ws);
|
||||||
|
@ -996,6 +999,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
|
||||||
rc = nxt_unit_process_new_port(ctx, &recv_msg);
|
rc = nxt_unit_process_new_port(ctx, &recv_msg);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case _NXT_PORT_MSG_PORT_ACK:
|
||||||
|
rc = nxt_unit_ctx_ready(ctx);
|
||||||
|
break;
|
||||||
|
|
||||||
case _NXT_PORT_MSG_CHANGE_FILE:
|
case _NXT_PORT_MSG_CHANGE_FILE:
|
||||||
nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
|
nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
|
||||||
port_msg->stream, recv_msg.fd[0]);
|
port_msg->stream, recv_msg.fd[0]);
|
||||||
|
@ -1169,8 +1176,28 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
||||||
if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
|
if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
|
||||||
lib->shared_port = port;
|
lib->shared_port = port;
|
||||||
|
|
||||||
} else {
|
return nxt_unit_ctx_ready(ctx);
|
||||||
nxt_unit_port_release(port);
|
}
|
||||||
|
|
||||||
|
nxt_unit_port_release(port);
|
||||||
|
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
|
||||||
|
{
|
||||||
|
nxt_unit_impl_t *lib;
|
||||||
|
nxt_unit_ctx_impl_t *ctx_impl;
|
||||||
|
|
||||||
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
|
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||||
|
|
||||||
|
ctx_impl->ready = 1;
|
||||||
|
|
||||||
|
if (lib->callbacks.ready_handler) {
|
||||||
|
return lib->callbacks.ready_handler(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NXT_UNIT_OK;
|
return NXT_UNIT_OK;
|
||||||
|
@ -4495,17 +4522,17 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
|
||||||
nxt_unit_port_impl_t *port_impl;
|
nxt_unit_port_impl_t *port_impl;
|
||||||
struct pollfd fds[2];
|
struct pollfd fds[2];
|
||||||
|
|
||||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
|
||||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||||
|
|
||||||
if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
|
if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
|
||||||
|
|
||||||
return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
|
return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
|
port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
|
||||||
port);
|
port);
|
||||||
|
|
||||||
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
|
|
||||||
retry:
|
retry:
|
||||||
|
|
||||||
if (port_impl->from_socket == 0) {
|
if (port_impl->from_socket == 0) {
|
||||||
|
|
|
@ -154,6 +154,8 @@ struct nxt_unit_callbacks_s {
|
||||||
/* Receive data on port id. Optional. */
|
/* Receive data on port id. Optional. */
|
||||||
ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
|
ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
|
||||||
void *buf, size_t buf_size, void *oob, size_t oob_size);
|
void *buf, size_t buf_size, void *oob, size_t oob_size);
|
||||||
|
|
||||||
|
int (*ready_handler)(nxt_unit_ctx_t *);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue