Application-side message processing.
Usage on the router side: nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t parse_ctx; nxt_app_http_req_init(task, &parse_ctx); /* parse incoming request data */ if (nxt_app_http_req_parse(task, &parse_ctx, buf) == NXT_DONE) { /* choose app */ nxt_app = nxt_select_app(... &parse_ctx.r ...); /* find port */ wmsg.port = nxt_get_app_port(... nxt_app ...); wmsg.buf = &wmsg.write; /* fill write message buffer in shared mem */ nxt_app->prepare_msg(task, &parse_ctx.r, &wmsg); /* send message to app for processing */ nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, -1, 0, 0, wmsg.write); }
This commit is contained in:
parent
3b9aa27625
commit
e7a0634a71
6 changed files with 623 additions and 868 deletions
File diff suppressed because it is too large
Load diff
|
@ -1,5 +1,6 @@
|
|||
|
||||
/*
|
||||
* Copyright (C) Max Romanov
|
||||
* Copyright (C) Valentin V. Bartenev
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
@ -17,36 +18,124 @@ typedef struct {
|
|||
typedef struct {
|
||||
nxt_str_t method;
|
||||
nxt_str_t path;
|
||||
nxt_str_t path_no_query;
|
||||
nxt_str_t query;
|
||||
nxt_str_t version;
|
||||
nxt_uint_t fields_num;
|
||||
nxt_app_header_field_t *fields;
|
||||
|
||||
nxt_str_t *content_length;
|
||||
nxt_str_t *content_type;
|
||||
nxt_list_t *fields;
|
||||
|
||||
nxt_str_t cookie;
|
||||
nxt_str_t content_length;
|
||||
nxt_str_t content_type;
|
||||
nxt_str_t host;
|
||||
|
||||
off_t parsed_content_length;
|
||||
nxt_bool_t done;
|
||||
} nxt_app_request_header_t;
|
||||
|
||||
|
||||
typedef struct {
|
||||
nxt_event_engine_t *engine;
|
||||
nxt_mp_t *mem_pool;
|
||||
nxt_conn_t *event_conn;
|
||||
nxt_log_t *log;
|
||||
nxt_str_t preread;
|
||||
nxt_bool_t done;
|
||||
} nxt_app_request_body_t;
|
||||
|
||||
nxt_buf_t *output_buf;
|
||||
|
||||
typedef struct {
|
||||
nxt_app_request_header_t header;
|
||||
nxt_str_t body_preread;
|
||||
off_t body_rest;
|
||||
void *ctx;
|
||||
nxt_app_request_body_t body;
|
||||
} nxt_app_request_t;
|
||||
|
||||
|
||||
typedef struct {
|
||||
nxt_int_t (*init)(nxt_thread_t *thr);
|
||||
nxt_int_t (*start)(nxt_app_request_t *r);
|
||||
nxt_int_t (*header)(nxt_app_request_t *r,
|
||||
nxt_app_header_field_t *field);
|
||||
nxt_int_t (*run)(nxt_app_request_t *r);
|
||||
nxt_app_request_t r;
|
||||
nxt_http_request_parse_t parser;
|
||||
nxt_mp_t *mem_pool;
|
||||
} nxt_app_parse_ctx_t;
|
||||
|
||||
|
||||
nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
|
||||
|
||||
nxt_int_t nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
|
||||
nxt_buf_t *buf);
|
||||
|
||||
nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
|
||||
|
||||
nxt_int_t nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt);
|
||||
|
||||
|
||||
typedef struct nxt_app_wmsg_s nxt_app_wmsg_t;
|
||||
typedef struct nxt_app_rmsg_s nxt_app_rmsg_t;
|
||||
|
||||
struct nxt_app_wmsg_s {
|
||||
nxt_port_t *port; /* where prepared buf will be sent */
|
||||
nxt_buf_t *write;
|
||||
nxt_buf_t **buf;
|
||||
uint32_t stream;
|
||||
};
|
||||
|
||||
struct nxt_app_rmsg_s {
|
||||
nxt_buf_t *buf; /* current buffer to read */
|
||||
};
|
||||
|
||||
|
||||
nxt_inline u_char *
|
||||
nxt_app_msg_write_length(u_char *dst, size_t length);
|
||||
|
||||
/* TODO asynchronous mmap buffer assignment */
|
||||
u_char *nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg,
|
||||
size_t size);
|
||||
|
||||
nxt_int_t nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg,
|
||||
u_char *c, size_t size);
|
||||
|
||||
nxt_int_t nxt_app_msg_write_prefixed_upcase(nxt_task_t *task,
|
||||
nxt_app_wmsg_t *msg, const nxt_str_t *prefix, const nxt_str_t *v);
|
||||
|
||||
nxt_inline nxt_int_t
|
||||
nxt_app_msg_write_nvp_(nxt_task_t *task, nxt_app_wmsg_t *msg,
|
||||
u_char *n, size_t nsize, u_char *v, size_t vsize);
|
||||
|
||||
|
||||
#define nxt_app_msg_write_const(task, msg, c) \
|
||||
nxt_app_msg_write((task), (msg), (u_char *)(c), sizeof(c) - 1)
|
||||
|
||||
#define nxt_app_msg_write_str(task, msg, str) \
|
||||
nxt_app_msg_write((task), (msg), (str)->start, (str)->length)
|
||||
|
||||
#define nxt_app_msg_write_cstr(task, msg, c) \
|
||||
nxt_app_msg_write((task), (msg), (c), nxt_strlen(c))
|
||||
|
||||
#define nxt_app_msg_write_nvp(task, msg, n, v) \
|
||||
nxt_app_msg_write_nvp_((task), (msg), (u_char *)(n), sizeof(n) - 1, \
|
||||
(v)->start, (v)->length)
|
||||
|
||||
nxt_inline nxt_int_t nxt_app_msg_write_size(nxt_task_t *task,
|
||||
nxt_app_wmsg_t *msg, size_t size);
|
||||
|
||||
nxt_int_t nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg,
|
||||
nxt_bool_t last);
|
||||
|
||||
nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg,
|
||||
const u_char *c, size_t size);
|
||||
|
||||
nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg,
|
||||
nxt_str_t *str);
|
||||
|
||||
nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
|
||||
nxt_str_t *n, nxt_str_t *v);
|
||||
|
||||
nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
|
||||
size_t *size);
|
||||
|
||||
|
||||
typedef struct {
|
||||
nxt_int_t (*init)(nxt_task_t *task);
|
||||
nxt_int_t (*prepare_msg)(nxt_task_t *task,
|
||||
nxt_app_request_t *r,
|
||||
nxt_app_wmsg_t *wmsg);
|
||||
nxt_int_t (*run)(nxt_task_t *task,
|
||||
nxt_app_rmsg_t *rmsg,
|
||||
nxt_app_wmsg_t *wmsg);
|
||||
} nxt_application_module_t;
|
||||
|
||||
|
||||
|
@ -58,5 +147,74 @@ nxt_int_t nxt_app_http_read_body(nxt_app_request_t *r, u_char *data,
|
|||
size_t len);
|
||||
nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len);
|
||||
|
||||
nxt_inline u_char *
|
||||
nxt_app_msg_write_length(u_char *dst, size_t length)
|
||||
{
|
||||
if (length < 128) {
|
||||
*dst = length;
|
||||
dst++;
|
||||
} else {
|
||||
dst[0] = 0x80U | (length >> 24);
|
||||
dst[1] = 0xFFU & (length >> 16);
|
||||
dst[2] = 0xFFU & (length >> 8);
|
||||
dst[3] = 0xFFU & length;
|
||||
dst += 4;
|
||||
}
|
||||
|
||||
return dst;
|
||||
}
|
||||
|
||||
|
||||
nxt_inline nxt_int_t
|
||||
nxt_app_msg_write_nvp_(nxt_task_t *task, nxt_app_wmsg_t *msg,
|
||||
u_char *n, size_t nsize, u_char *v, size_t vsize)
|
||||
{
|
||||
nxt_int_t rc;
|
||||
|
||||
rc = nxt_app_msg_write(task, msg, n, nsize);
|
||||
if (nxt_slow_path(rc != NXT_OK)) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
return nxt_app_msg_write(task, msg, v, vsize);
|
||||
}
|
||||
|
||||
|
||||
nxt_inline nxt_int_t
|
||||
nxt_app_msg_write_size(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
|
||||
{
|
||||
u_char *dst;
|
||||
size_t dst_length;
|
||||
|
||||
dst_length = size < 128 ? 1 : 4;
|
||||
|
||||
dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
|
||||
if (nxt_slow_path(dst == NULL)) {
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_app_msg_write_length(dst, size);
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
|
||||
nxt_inline u_char *
|
||||
nxt_app_msg_read_length(u_char *src, size_t *length)
|
||||
{
|
||||
if (src[0] < 128) {
|
||||
*length = src[0];
|
||||
src++;
|
||||
} else {
|
||||
*length = ((src[0] & 0x7fU) << 24) +
|
||||
(src[1] << 16) +
|
||||
(src[2] << 8) +
|
||||
src[3];
|
||||
src += 4;
|
||||
}
|
||||
|
||||
return src;
|
||||
}
|
||||
|
||||
|
||||
#endif /* _NXT_APPLICATION_H_INCLIDED_ */
|
||||
|
|
|
@ -189,7 +189,7 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
|
|||
init->start = nxt_app_start;
|
||||
init->name = "worker process";
|
||||
init->user_cred = &rt->user_cred;
|
||||
init->port_handlers = nxt_worker_process_port_handlers;
|
||||
init->port_handlers = nxt_app_process_port_handlers;
|
||||
init->signals = nxt_worker_process_signals;
|
||||
init->type = NXT_PROCESS_WORKER;
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt);
|
|||
|
||||
|
||||
extern nxt_port_handler_t nxt_worker_process_port_handlers[];
|
||||
extern nxt_port_handler_t nxt_app_process_port_handlers[];
|
||||
extern const nxt_sig_event_t nxt_master_process_signals[];
|
||||
extern const nxt_sig_event_t nxt_worker_process_signals[];
|
||||
|
||||
|
|
|
@ -135,6 +135,8 @@ void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log,
|
|||
void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data);
|
||||
nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt);
|
||||
|
||||
void nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
||||
|
||||
|
||||
#define nxt_runtime_process_each(rt, process) \
|
||||
do { \
|
||||
|
|
|
@ -30,6 +30,15 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = {
|
|||
};
|
||||
|
||||
|
||||
nxt_port_handler_t nxt_app_process_port_handlers[] = {
|
||||
nxt_worker_process_quit_handler,
|
||||
nxt_port_new_port_handler,
|
||||
nxt_port_change_log_file_handler,
|
||||
nxt_port_mmap_handler,
|
||||
nxt_port_app_data_handler,
|
||||
};
|
||||
|
||||
|
||||
const nxt_sig_event_t nxt_worker_process_signals[] = {
|
||||
nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler),
|
||||
nxt_event_signal(SIGINT, nxt_worker_process_sigterm_handler),
|
||||
|
|
Loading…
Reference in a new issue