Introducing the shared application port.

This is the port shared between all application processes which use it to pass
requests for processing.  Using it significantly simplifies the request
processing code in the router.  The drawback is 2 more file descriptors per each
configured application and more complex libunit message wait/read code.
This commit is contained in:
Max Romanov 2020-08-11 19:20:15 +03:00
parent 6e31d6cd39
commit 8359560612
12 changed files with 934 additions and 1269 deletions

View file

@ -44,7 +44,7 @@ nxt_cgo_run(uintptr_t handler)
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
rc = nxt_unit_run(ctx); rc = nxt_unit_run_ctx(ctx);
nxt_unit_done(ctx); nxt_unit_done(ctx);
@ -105,7 +105,7 @@ nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length)
static int static int
nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{ {
nxt_go_add_port(port->id.pid, port->id.id, nxt_go_add_port((uintptr_t) ctx, port->id.pid, port->id.id,
port->in_fd, port->out_fd); port->in_fd, port->out_fd);
port->in_fd = -1; port->in_fd = -1;
@ -203,6 +203,13 @@ nxt_cgo_request_done(uintptr_t req, int res)
} }
void
nxt_cgo_unit_run_shared(uintptr_t ctx)
{
nxt_unit_run_shared((nxt_unit_ctx_t *) ctx);
}
void void
nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) nxt_cgo_warn(uintptr_t msg, uint32_t msg_len)
{ {

View file

@ -35,6 +35,8 @@ int nxt_cgo_request_close(uintptr_t req);
void nxt_cgo_request_done(uintptr_t req, int res); void nxt_cgo_request_done(uintptr_t req, int res);
void nxt_cgo_unit_run_shared(uintptr_t ctx);
void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len); void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len);
#endif /* _NXT_CGO_LIB_H_INCLUDED_ */ #endif /* _NXT_CGO_LIB_H_INCLUDED_ */

View file

@ -93,7 +93,7 @@ func getUnixConn(fd int) *net.UnixConn {
} }
//export nxt_go_add_port //export nxt_go_add_port
func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) {
p := &port{ p := &port{
key: port_key{ key: port_key{
pid: int(pid), pid: int(pid),
@ -104,6 +104,12 @@ func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) {
} }
add_port(p) add_port(p)
if id == 65535 {
go func(ctx C.uintptr_t) {
C.nxt_cgo_unit_run_shared(ctx);
}(ctx)
}
} }
//export nxt_go_remove_port //export nxt_go_remove_port

View file

@ -20,7 +20,7 @@ napi_ref Unit::constructor_;
struct port_data_t { struct port_data_t {
nxt_unit_ctx_t *ctx; nxt_unit_ctx_t *ctx;
nxt_unit_port_id_t port_id; nxt_unit_port_t *port;
uv_poll_t poll; uv_poll_t poll;
}; };
@ -351,7 +351,11 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
static void static void
nxt_uv_read_callback(uv_poll_t *handle, int status, int events) nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{ {
nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); port_data_t *data;
data = (port_data_t *) handle->data;
nxt_unit_process_port_msg(data->ctx, data->port);
} }
@ -396,21 +400,14 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
port->data = data; port->data = data;
data->ctx = ctx; data->ctx = ctx;
data->port_id = port->id; data->port = port;
data->poll.data = ctx; data->poll.data = data;
} }
return NXT_UNIT_OK; return NXT_UNIT_OK;
} }
inline bool
operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
{
return p1.pid == p2.pid && p1.id == p2.id;
}
void void
Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
{ {
@ -419,10 +416,9 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
if (port->data != NULL) { if (port->data != NULL) {
data = (port_data_t *) port->data; data = (port_data_t *) port->data;
if (data->port_id == port->id) { if (data->port == port) {
uv_poll_stop(&data->poll); uv_poll_stop(&data->poll);
data->poll.data = data;
uv_close((uv_handle_t *) &data->poll, delete_port_data); uv_close((uv_handle_t *) &data->poll, delete_port_data);
} }
} }

View file

@ -33,15 +33,13 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
nxt_buf_t *out, *buf, **out_tail, *b, *next; nxt_buf_t *out, *buf, **out_tail, *b, *next;
nxt_int_t res; nxt_int_t res;
nxt_http_request_t *r; nxt_http_request_t *r;
nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data; nxt_request_rpc_data_t *req_rpc_data;
nxt_websocket_header_t *wsh; nxt_websocket_header_t *wsh;
r = obj; r = obj;
req_rpc_data = r->req_rpc_data;
if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL if (nxt_slow_path(req_rpc_data == NULL)) {
|| (req_app_link = req_rpc_data->req_app_link) == NULL))
{
nxt_debug(task, "websocket client frame for destroyed request"); nxt_debug(task, "websocket client frame for destroyed request");
return; return;
@ -69,8 +67,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
if (buf == NULL || buf_free_size == 0) { if (buf == NULL || buf_free_size == 0) {
buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
buf = nxt_port_mmap_get_buf(task, buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,
&req_app_link->app_port->process->outgoing,
buf_free_size); buf_free_size);
*out_tail = buf; *out_tail = buf;
@ -101,10 +98,10 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
b = next; b = next;
} }
res = nxt_port_socket_twrite(task, req_app_link->app_port, res = nxt_port_socket_twrite(task, req_rpc_data->app_port,
NXT_PORT_MSG_WEBSOCKET, -1, NXT_PORT_MSG_WEBSOCKET, -1,
req_app_link->stream, req_rpc_data->stream,
req_app_link->reply_port->id, out, NULL); task->thread->engine->port->id, out, NULL);
if (nxt_slow_path(res != NXT_OK)) { if (nxt_slow_path(res != NXT_OK)) {
// TODO: handle // TODO: handle
} }
@ -130,32 +127,27 @@ static void
nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
{ {
nxt_http_request_t *r; nxt_http_request_t *r;
nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data; nxt_request_rpc_data_t *req_rpc_data;
nxt_debug(task, "http websocket error handler"); nxt_debug(task, "http websocket error handler");
r = obj; r = obj;
req_rpc_data = r->req_rpc_data;
if ((req_rpc_data = r->req_rpc_data) == NULL) { if (req_rpc_data == NULL) {
nxt_debug(task, " req_rpc_data is NULL"); nxt_debug(task, " req_rpc_data is NULL");
goto close_handler; goto close_handler;
} }
if ((req_app_link = req_rpc_data->req_app_link) == NULL) { if (req_rpc_data->app_port == NULL) {
nxt_debug(task, " req_app_link is NULL");
goto close_handler;
}
if (req_app_link->app_port == NULL) {
nxt_debug(task, " app_port is NULL"); nxt_debug(task, " app_port is NULL");
goto close_handler; goto close_handler;
} }
(void) nxt_port_socket_twrite(task, req_app_link->app_port, (void) nxt_port_socket_twrite(task, req_rpc_data->app_port,
NXT_PORT_MSG_WEBSOCKET_LAST, NXT_PORT_MSG_WEBSOCKET_LAST,
-1, req_app_link->stream, -1, req_rpc_data->stream,
req_app_link->reply_port->id, NULL, NULL); task->thread->engine->port->id, NULL, NULL);
close_handler: close_handler:

View file

@ -67,8 +67,6 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages); nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex); nxt_thread_mutex_create(&port->write_mutex);
nxt_queue_init(&port->pending_requests);
nxt_queue_init(&port->active_websockets);
} else { } else {
nxt_mp_destroy(mp); nxt_mp_destroy(mp);

View file

@ -41,6 +41,7 @@ struct nxt_port_handlers_s {
/* Request headers. */ /* Request headers. */
nxt_port_handler_t req_headers; nxt_port_handler_t req_headers;
nxt_port_handler_t req_headers_ack;
/* Websocket frame. */ /* Websocket frame. */
nxt_port_handler_t websocket_frame; nxt_port_handler_t websocket_frame;
@ -89,6 +90,7 @@ typedef enum {
_NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
_NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers), _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
_NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
_NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame), _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
@ -113,7 +115,8 @@ typedef enum {
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT), NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT), NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP) NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
| NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, | NXT_PORT_MSG_SYNC,
NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED), NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY), NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
@ -193,6 +196,7 @@ struct nxt_port_s {
nxt_queue_link_t app_link; /* for nxt_app_t.ports */ nxt_queue_link_t app_link; /* for nxt_app_t.ports */
nxt_app_t *app; nxt_app_t *app;
nxt_port_t *main_app_port;
nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */ nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
nxt_msec_t idle_start; nxt_msec_t idle_start;
@ -205,11 +209,10 @@ struct nxt_port_s {
/* Maximum interleave of message parts. */ /* Maximum interleave of message parts. */
uint32_t max_share; uint32_t max_share;
uint32_t app_pending_responses;
uint32_t app_responses; uint32_t app_responses;
nxt_queue_t pending_requests;
nxt_queue_t active_websockets; uint32_t active_websockets;
uint32_t active_requests;
nxt_port_handler_t handler; nxt_port_handler_t handler;
nxt_port_handler_t *data; nxt_port_handler_t *data;

File diff suppressed because it is too large Load diff

View file

@ -103,16 +103,18 @@ typedef struct {
struct nxt_app_s { struct nxt_app_s {
nxt_thread_mutex_t mutex; /* Protects ports queue. */ nxt_thread_mutex_t mutex; /* Protects ports queue. */
nxt_queue_t ports; /* of nxt_port_t.app_link */ nxt_queue_t ports; /* of nxt_port_t.app_link */
nxt_lvlhsh_t port_hash; /* of nxt_port_t */
nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */ nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */
nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */ nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */
nxt_work_t adjust_idle_work; nxt_work_t adjust_idle_work;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_queue_t requests; /* of nxt_request_app_link_t */
nxt_queue_t pending; /* of nxt_request_app_link_t */
nxt_str_t name; nxt_str_t name;
uint32_t port_hash_count;
uint32_t active_requests;
uint32_t pending_processes; uint32_t pending_processes;
uint32_t processes; uint32_t processes;
uint32_t idle_processes; uint32_t idle_processes;
@ -120,7 +122,6 @@ struct nxt_app_s {
uint32_t max_processes; uint32_t max_processes;
uint32_t spare_processes; uint32_t spare_processes;
uint32_t max_pending_processes; uint32_t max_pending_processes;
uint32_t max_pending_responses;
uint32_t max_requests; uint32_t max_requests;
nxt_msec_t timeout; nxt_msec_t timeout;
@ -139,6 +140,9 @@ struct nxt_app_s {
nxt_atomic_t use_count; nxt_atomic_t use_count;
nxt_app_joint_t *joint; nxt_app_joint_t *joint;
nxt_port_t *shared_port;
nxt_port_mmaps_t outgoing;
}; };

View file

@ -9,14 +9,12 @@
typedef struct nxt_msg_info_s { typedef struct nxt_msg_info_s {
nxt_buf_t *buf; nxt_buf_t *buf;
nxt_fd_t body_fd;
nxt_port_mmap_tracking_t tracking; nxt_port_mmap_tracking_t tracking;
nxt_work_handler_t completion_handler; nxt_work_handler_t completion_handler;
} nxt_msg_info_t; } nxt_msg_info_t;
typedef struct nxt_request_app_link_s nxt_request_app_link_t;
typedef enum { typedef enum {
NXT_APR_NEW_PORT, NXT_APR_NEW_PORT,
NXT_APR_REQUEST_FAILED, NXT_APR_REQUEST_FAILED,
@ -35,38 +33,9 @@ typedef struct {
nxt_http_request_t *request; nxt_http_request_t *request;
nxt_msg_info_t msg_info; nxt_msg_info_t msg_info;
nxt_request_app_link_t *req_app_link;
nxt_bool_t rpc_cancel;
} nxt_request_rpc_data_t; } nxt_request_rpc_data_t;
struct nxt_request_app_link_s {
uint32_t stream;
nxt_atomic_t use_count;
nxt_port_t *app_port;
nxt_apr_action_t apr_action;
nxt_port_t *reply_port;
nxt_http_request_t *request;
nxt_msg_info_t msg_info;
nxt_request_rpc_data_t *req_rpc_data;
nxt_fd_t body_fd;
nxt_nsec_t res_time;
nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
/* for nxt_port_t.pending_requests */
nxt_queue_link_t link_port_pending;
nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
/* for nxt_port_t.active_websockets */
nxt_queue_link_t link_port_websockets;
nxt_mp_t *mem_pool;
nxt_work_t work;
int err_code;
const char *err_str;
};
#endif /* _NXT_ROUTER_REQUEST_H_INCLUDED_ */ #endif /* _NXT_ROUTER_REQUEST_H_INCLUDED_ */

View file

@ -38,8 +38,8 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data); nxt_unit_ctx_impl_t *ctx_impl, void *data);
nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
@ -58,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg); nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_port_id_t *port_id); nxt_unit_port_id_t *port_id);
static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg); nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@ -122,9 +123,12 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove); pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
nxt_unit_read_buf_t *rbuf); static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
@ -150,9 +154,8 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
const void *oob, size_t oob_size); const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size); const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx, static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_port_t *port, void *buf, size_t buf_size, nxt_unit_read_buf_t *rbuf);
void *oob, size_t oob_size);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port); nxt_unit_port_t *port);
@ -308,6 +311,7 @@ struct nxt_unit_impl_s {
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_port_t *router_port; nxt_unit_port_t *router_port;
nxt_unit_port_t *shared_port;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
@ -452,7 +456,7 @@ nxt_unit_init(nxt_unit_init_t *init)
fail: fail:
nxt_unit_ctx_release(&lib->main_ctx); nxt_unit_ctx_release(&lib->main_ctx.ctx);
return NULL; return NULL;
} }
@ -496,6 +500,7 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->use_count = 0; lib->use_count = 0;
lib->router_port = NULL; lib->router_port = NULL;
lib->shared_port = NULL;
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@ -570,16 +575,23 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_inline void nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{ {
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
nxt_atomic_fetch_add(&ctx_impl->use_count, 1); nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
} }
nxt_inline void nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{ {
long c; long c;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
@ -624,6 +636,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_port_release(lib->router_port); nxt_unit_port_release(lib->router_port);
} }
if (nxt_fast_path(lib->shared_port != NULL)) {
nxt_unit_port_release(lib->shared_port);
}
free(lib); free(lib);
} }
} }
@ -805,6 +821,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
recv_msg.incoming_buf = NULL; recv_msg.incoming_buf = NULL;
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
if (nxt_slow_path(rbuf->size == 0)) {
nxt_unit_debug(ctx, "read port closed");
nxt_unit_quit(ctx);
rc = NXT_UNIT_OK;
goto fail;
}
nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
goto fail; goto fail;
} }
@ -946,6 +971,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
nxt_unit_process_release(recv_msg.process); nxt_unit_process_release(recv_msg.process);
} }
if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
memset(rbuf->buf, 0xAC, rbuf->size);
#endif
nxt_unit_read_buf_release(ctx, rbuf);
}
return rc; return rc;
} }
@ -954,6 +986,7 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{ {
int nb; int nb;
nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port; nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg; nxt_port_msg_new_port_t *new_port_msg;
@ -978,6 +1011,15 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->stream, (int) new_port_msg->pid, recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd); (int) new_port_msg->id, recv_msg->fd);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (new_port_msg->id == (nxt_port_id_t) -1) {
nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
new_port.in_fd = recv_msg->fd;
new_port.out_fd = -1;
} else {
nb = 0; nb = 0;
if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
@ -993,6 +1035,9 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port.in_fd = -1; new_port.in_fd = -1;
new_port.out_fd = recv_msg->fd; new_port.out_fd = recv_msg->fd;
}
new_port.data = NULL; new_port.data = NULL;
recv_msg->fd = -1; recv_msg->fd = -1;
@ -1002,7 +1047,12 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
if (new_port_msg->id == (nxt_port_id_t) -1) {
lib->shared_port = port;
} else {
nxt_unit_port_release(port); nxt_unit_port_release(port);
}
return NXT_UNIT_OK; return NXT_UNIT_OK;
} }
@ -1102,6 +1152,11 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
} }
if (nxt_fast_path(res == NXT_UNIT_OK)) { if (nxt_fast_path(res == NXT_UNIT_OK)) {
res = nxt_unit_send_req_headers_ack(req);
if (nxt_slow_path(res != NXT_UNIT_OK)) {
return res;
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
lib->callbacks.request_handler(req); lib->callbacks.request_handler(req);
@ -1220,6 +1275,36 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
} }
static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
{
ssize_t res;
nxt_port_msg_t msg;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
memset(&msg, 0, sizeof(nxt_port_msg_t));
msg.stream = req_impl->stream;
msg.pid = lib->pid;
msg.reply_port = ctx_impl->read_port->id.id;
msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
res = nxt_unit_port_send(req->ctx, req->response_port,
&msg, sizeof(msg), NULL, 0);
if (nxt_slow_path(res != sizeof(msg))) {
return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
}
static int static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{ {
@ -3267,7 +3352,9 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
nxt_unit_read_buf(ctx, rbuf); memset(rbuf->oob, 0, sizeof(struct cmsghdr));
nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
nxt_unit_read_buf_release(ctx, rbuf); nxt_unit_read_buf_release(ctx, rbuf);
@ -4220,24 +4307,21 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
{ {
int rc; int rc;
nxt_unit_impl_t *lib; nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); nxt_unit_ctx_use(ctx);
nxt_unit_ctx_use(ctx_impl);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK; rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) { while (nxt_fast_path(lib->online)) {
rc = nxt_unit_run_once(ctx); rc = nxt_unit_run_once_impl(ctx);
if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break; break;
} }
} }
nxt_unit_ctx_release(ctx_impl); nxt_unit_ctx_release(ctx);
return rc; return rc;
} }
@ -4247,108 +4331,162 @@ int
nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{ {
int rc; int rc;
nxt_queue_link_t *link;
nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx);
rc = nxt_unit_run_once_impl(ctx);
nxt_unit_ctx_release(ctx);
return rc;
}
static int
nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_read_buf_t *rbuf; nxt_unit_read_buf_t *rbuf;
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
rc = nxt_unit_read_buf(ctx, rbuf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_read_buf_release(ctx, rbuf);
return rc;
}
rc = nxt_unit_process_msg(ctx, rbuf);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
rc = nxt_unit_process_pending_rbuf(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
nxt_unit_process_ready_req(ctx);
return rc;
}
static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
int res, err;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
struct pollfd fds[2];
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
nxt_unit_ctx_use(ctx_impl); memset(rbuf->oob, 0, sizeof(struct cmsghdr));
pthread_mutex_lock(&ctx_impl->mutex); if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
}
if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { retry:
next_pending: fds[0].fd = ctx_impl->read_port->in_fd;
fds[0].events = POLLIN;
fds[0].revents = 0;
link = nxt_queue_first(&ctx_impl->pending_rbuf); fds[1].fd = lib->shared_port->in_fd;
nxt_queue_remove(link); fds[1].events = POLLIN;
fds[1].revents = 0;
rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); res = poll(fds, 2, -1);
if (nxt_slow_path(res < 0)) {
err = errno;
pthread_mutex_unlock(&ctx_impl->mutex); if (err == EINTR) {
goto retry;
}
} else { nxt_unit_alert(ctx, "poll() failed: %s (%d)",
rbuf = nxt_unit_read_buf_get_impl(ctx_impl); strerror(err), err);
pthread_mutex_unlock(&ctx_impl->mutex); rbuf->size = -1;
if (nxt_slow_path(rbuf == NULL)) { return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
nxt_unit_ctx_release(ctx_impl); if ((fds[0].revents & POLLIN) != 0) {
return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
}
if ((fds[1].revents & POLLIN) != 0) {
return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
}
rbuf->size = -1;
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
nxt_unit_read_buf(ctx, rbuf);
static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_queue_t pending_rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
nxt_queue_init(&pending_rbuf);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
pthread_mutex_unlock(&ctx_impl->mutex);
return NXT_UNIT_OK;
} }
if (nxt_fast_path(rbuf->size > 0)) { nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
rc = nxt_unit_process_msg(ctx, rbuf); nxt_queue_init(&ctx_impl->pending_rbuf);
#if (NXT_DEBUG) pthread_mutex_unlock(&ctx_impl->mutex);
if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
memset(rbuf->buf, 0xAC, rbuf->size);
}
#endif
} else {
rc = NXT_UNIT_ERROR;
}
if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
rc = NXT_UNIT_OK; rc = NXT_UNIT_OK;
nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
} else { } else {
nxt_unit_read_buf_release(ctx, rbuf); nxt_unit_read_buf_release(ctx, rbuf);
} }
if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { } nxt_queue_loop;
rc = NXT_UNIT_OK;
}
if (nxt_fast_path(rc == NXT_UNIT_OK)) {
pthread_mutex_lock(&ctx_impl->mutex);
if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
goto next_pending;
}
pthread_mutex_unlock(&ctx_impl->mutex);
nxt_unit_process_ready_req(ctx_impl);
}
nxt_unit_ctx_release(ctx_impl);
return rc; return rc;
} }
static void static void
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
nxt_unit_ctx_impl_t *ctx_impl;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
memset(rbuf->oob, 0, sizeof(struct cmsghdr));
rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port,
rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
}
static void
nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
{ {
nxt_queue_t ready_req; nxt_queue_t ready_req;
nxt_unit_impl_t *lib; nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl; nxt_unit_request_info_impl_t *req_impl;
nxt_queue_init(&ready_req); nxt_queue_init(&ready_req);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex); pthread_mutex_lock(&ctx_impl->mutex);
if (nxt_queue_is_empty(&ctx_impl->ready_req)) { if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
@ -4367,20 +4505,121 @@ nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
{ {
lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
(void) nxt_unit_send_req_headers_ack(&req_impl->req);
lib->callbacks.request_handler(&req_impl->req); lib->callbacks.request_handler(&req_impl->req);
} nxt_queue_loop; } nxt_queue_loop;
} }
int
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
}
nxt_unit_ctx_release(ctx);
return rc;
}
int
nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
{
int rc;
nxt_unit_impl_t *lib;
nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
}
nxt_unit_ctx_release(ctx);
return rc;
}
int
nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
nxt_unit_ctx_use(ctx);
rc = nxt_unit_process_port_msg_impl(ctx, port);
nxt_unit_ctx_release(ctx);
return rc;
}
static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int rc;
nxt_unit_read_buf_t *rbuf;
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
memset(rbuf->oob, 0, sizeof(struct cmsghdr));
rc = nxt_unit_port_recv(ctx, port, rbuf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_read_buf_release(ctx, rbuf);
return rc;
}
rc = nxt_unit_process_msg(ctx, rbuf);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
rc = nxt_unit_process_pending_rbuf(ctx);
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
return NXT_UNIT_ERROR;
}
nxt_unit_process_ready_req(ctx);
return rc;
}
void void
nxt_unit_done(nxt_unit_ctx_t *ctx) nxt_unit_done(nxt_unit_ctx_t *ctx)
{ {
nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_release(ctx);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
nxt_unit_ctx_release(ctx_impl);
} }
@ -5056,12 +5295,11 @@ nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
} }
static ssize_t static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
void *buf, size_t buf_size, void *oob, size_t oob_size) nxt_unit_read_buf_t *rbuf)
{ {
int fd; int fd, err;
ssize_t res;
struct iovec iov[1]; struct iovec iov[1];
struct msghdr msg; struct msghdr msg;
nxt_unit_impl_t *lib; nxt_unit_impl_t *lib;
@ -5069,40 +5307,57 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (lib->callbacks.port_recv != NULL) { if (lib->callbacks.port_recv != NULL) {
return lib->callbacks.port_recv(ctx, port, rbuf->size = lib->callbacks.port_recv(ctx, port,
buf, buf_size, oob, oob_size); rbuf->buf, sizeof(rbuf->buf),
rbuf->oob, sizeof(rbuf->oob));
if (nxt_slow_path(rbuf->size < 0)) {
return NXT_UNIT_ERROR;
} }
iov[0].iov_base = buf; return NXT_UNIT_OK;
iov[0].iov_len = buf_size; }
iov[0].iov_base = rbuf->buf;
iov[0].iov_len = sizeof(rbuf->buf);
msg.msg_name = NULL; msg.msg_name = NULL;
msg.msg_namelen = 0; msg.msg_namelen = 0;
msg.msg_iov = iov; msg.msg_iov = iov;
msg.msg_iovlen = 1; msg.msg_iovlen = 1;
msg.msg_flags = 0; msg.msg_flags = 0;
msg.msg_control = oob; msg.msg_control = rbuf->oob;
msg.msg_controllen = oob_size; msg.msg_controllen = sizeof(rbuf->oob);
fd = port->in_fd; fd = port->in_fd;
retry: retry:
res = recvmsg(fd, &msg, 0); rbuf->size = recvmsg(fd, &msg, 0);
if (nxt_slow_path(res == -1)) { if (nxt_slow_path(rbuf->size == -1)) {
if (errno == EINTR) { err = errno;
if (err == EINTR) {
goto retry; goto retry;
} }
if (err == EAGAIN) {
nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
fd, strerror(errno), errno);
return NXT_UNIT_AGAIN;
}
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
fd, strerror(errno), errno); fd, strerror(errno), errno);
} else { return NXT_UNIT_ERROR;
nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
} }
return res; nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
return NXT_UNIT_OK;
} }

View file

@ -202,8 +202,21 @@ nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
*/ */
int nxt_unit_run(nxt_unit_ctx_t *); int nxt_unit_run(nxt_unit_ctx_t *);
int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
/*
* Receive and process one message, invoke configured callbacks.
*
* If application implements it's own event loop, each datagram received
* from port socket should be initially processed by unit. This function
* may invoke other application-defined callback for message processing.
*/
int nxt_unit_run_once(nxt_unit_ctx_t *ctx); int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
/* Destroy application library object. */ /* Destroy application library object. */
void nxt_unit_done(nxt_unit_ctx_t *); void nxt_unit_done(nxt_unit_ctx_t *);