Port message fragmentation supported.
- Each sendmsg() transmits no more than port->max_size payload data. - Longer buffers are fragmented and send using multiple sendmsg() calls. - On receive side, buffers are connected in chain. - Number of handler calls is the same as number of nxt_port_socket_write() calls. - nxt_buf_make_plain() function introduced to make single plain buffer from the chain.
This commit is contained in:
parent
0faecee609
commit
00ecf713e3
9 changed files with 273 additions and 28 deletions
|
@ -298,3 +298,33 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
nxt_buf_t *
|
||||
nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src, size_t size)
|
||||
{
|
||||
nxt_buf_t *b, *i;
|
||||
|
||||
if (nxt_slow_path(size == 0)) {
|
||||
for (i = src; i != NULL; i = i->next) {
|
||||
size += nxt_buf_used_size(i);
|
||||
}
|
||||
}
|
||||
|
||||
b = nxt_buf_mem_alloc(mp, size, 0);
|
||||
|
||||
if (nxt_slow_path(b == NULL)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (i = src; i != NULL; i = i->next) {
|
||||
if (nxt_slow_path(nxt_buf_mem_free_size(&b->mem) <
|
||||
nxt_buf_used_size(i))) {
|
||||
break;
|
||||
}
|
||||
|
||||
b->mem.free = nxt_cpymem(b->mem.free, i->mem.pos, nxt_buf_used_size(i));
|
||||
}
|
||||
|
||||
return b;
|
||||
}
|
||||
|
|
|
@ -250,6 +250,19 @@ NXT_EXPORT nxt_buf_t *nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags);
|
|||
|
||||
NXT_EXPORT nxt_int_t nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data);
|
||||
|
||||
NXT_EXPORT nxt_buf_t *nxt_buf_make_plain(nxt_mp_t *mp, nxt_buf_t *src,
|
||||
size_t size);
|
||||
|
||||
nxt_inline nxt_buf_t *
|
||||
nxt_buf_chk_make_plain(nxt_mp_t *mp, nxt_buf_t *src, size_t size)
|
||||
{
|
||||
if (nxt_slow_path(src != NULL && src->next != NULL)) {
|
||||
return nxt_buf_make_plain(mp, src, size);
|
||||
}
|
||||
|
||||
return src;
|
||||
}
|
||||
|
||||
#define \
|
||||
nxt_buf_free(mp, b) \
|
||||
nxt_mp_free((mp), (b))
|
||||
|
|
|
@ -179,13 +179,21 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||
|
||||
ret = NXT_ERROR;
|
||||
|
||||
b = msg->buf;
|
||||
mp = nxt_mp_create(1024, 128, 256, 32);
|
||||
|
||||
if (nxt_slow_path(mp == NULL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size);
|
||||
|
||||
if (b == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos,
|
||||
b->mem.pos);
|
||||
|
||||
mp = nxt_mp_create(1024, 128, 256, 32);
|
||||
|
||||
nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t));
|
||||
|
||||
start = b->mem.pos;
|
||||
|
@ -821,6 +829,8 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||
b = msg->buf;
|
||||
sa = (nxt_sockaddr_t *) b->mem.pos;
|
||||
|
||||
/* TODO check b size and make plain */
|
||||
|
||||
out = NULL;
|
||||
|
||||
ls.socket = -1;
|
||||
|
@ -1037,14 +1047,20 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||
return;
|
||||
}
|
||||
|
||||
nxt_debug(task, "application languages: \"%*s\"",
|
||||
b->mem.free - b->mem.pos, b->mem.pos);
|
||||
|
||||
mp = nxt_mp_create(1024, 128, 256, 32);
|
||||
if (mp == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
b = nxt_buf_chk_make_plain(mp, b, msg->size);
|
||||
|
||||
if (b == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_debug(task, "application languages: \"%*s\"",
|
||||
b->mem.free - b->mem.pos, b->mem.pos);
|
||||
|
||||
conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
|
||||
if (conf == NULL) {
|
||||
goto fail;
|
||||
|
@ -1131,7 +1147,7 @@ nxt_app_lang_compare(const void *v1, const void *v2)
|
|||
static void
|
||||
nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
ssize_t n, size;
|
||||
ssize_t n, size, offset;
|
||||
nxt_buf_t *b;
|
||||
nxt_int_t ret;
|
||||
nxt_file_t file;
|
||||
|
@ -1150,16 +1166,20 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||
goto error;
|
||||
}
|
||||
|
||||
offset = 0;
|
||||
|
||||
for (b = msg->buf; b != NULL; b = b->next) {
|
||||
size = nxt_buf_mem_used_size(&b->mem);
|
||||
|
||||
n = nxt_file_write(&file, b->mem.pos, size, 0);
|
||||
n = nxt_file_write(&file, b->mem.pos, size, offset);
|
||||
|
||||
if (nxt_slow_path(n != size)) {
|
||||
nxt_file_close(task, &file);
|
||||
(void) nxt_file_delete(file.name);
|
||||
goto error;
|
||||
}
|
||||
|
||||
offset += n;
|
||||
}
|
||||
|
||||
nxt_file_close(task, &file);
|
||||
|
|
|
@ -248,6 +248,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||
|
||||
new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
|
||||
|
||||
/* TODO check b size and make plain */
|
||||
|
||||
nxt_debug(task, "new port %d received for process %PI:%d",
|
||||
msg->fd, new_port_msg->pid, new_port_msg->id);
|
||||
|
||||
|
|
|
@ -106,6 +106,9 @@ typedef struct {
|
|||
|
||||
/* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
|
||||
uint8_t mmap; /* 1 bit */
|
||||
|
||||
uint8_t nf;
|
||||
uint8_t mf;
|
||||
} nxt_port_msg_t;
|
||||
|
||||
|
||||
|
@ -171,6 +174,8 @@ struct nxt_port_s {
|
|||
nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
|
||||
nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
|
||||
|
||||
nxt_lvlhsh_t frags;
|
||||
|
||||
nxt_atomic_t use_count;
|
||||
|
||||
nxt_process_type_t type;
|
||||
|
|
|
@ -242,6 +242,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
|
|||
msg.port_msg.type = type & NXT_PORT_MSG_MASK;
|
||||
msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
|
||||
msg.port_msg.mmap = 0;
|
||||
msg.port_msg.nf = 0;
|
||||
msg.port_msg.mf = 0;
|
||||
|
||||
msg.work.data = NULL;
|
||||
|
||||
|
@ -324,11 +326,15 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
|||
sb.size = 0;
|
||||
sb.limit = port->max_size;
|
||||
|
||||
sb.limit_reached = 0;
|
||||
sb.nmax_reached = 0;
|
||||
|
||||
m = nxt_port_mmap_get_method(task, port, msg->buf);
|
||||
|
||||
if (m == NXT_PORT_METHOD_MMAP) {
|
||||
sb.limit = (1ULL << 31) - 1;
|
||||
sb.nmax = NXT_IOBUF_MAX * 10 - 1;
|
||||
sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
|
||||
port->max_size / PORT_MMAP_MIN_SIZE);
|
||||
}
|
||||
|
||||
nxt_sendbuf_mem_coalesce(task, &sb);
|
||||
|
@ -347,6 +353,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
|||
}
|
||||
|
||||
msg->port_msg.last |= sb.last;
|
||||
msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
|
||||
|
||||
n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
|
||||
|
||||
|
@ -368,12 +375,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
|||
m == NXT_PORT_METHOD_MMAP);
|
||||
|
||||
if (msg->buf != NULL) {
|
||||
nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
|
||||
msg->port_msg.stream);
|
||||
|
||||
/*
|
||||
* A file descriptor is sent only
|
||||
* in the first message of a stream.
|
||||
*/
|
||||
msg->fd = -1;
|
||||
msg->share += n;
|
||||
msg->port_msg.nf = 1;
|
||||
|
||||
if (msg->share >= port->max_share) {
|
||||
msg->share = 0;
|
||||
|
@ -534,12 +545,134 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
|
|||
}
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
|
||||
{
|
||||
nxt_port_recv_msg_t *fmsg;
|
||||
|
||||
fmsg = data;
|
||||
|
||||
if (lhq->key.length == sizeof(uint32_t)
|
||||
&& *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
|
||||
{
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
return NXT_DECLINED;
|
||||
}
|
||||
|
||||
|
||||
static void *
|
||||
nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
|
||||
{
|
||||
return nxt_mp_alloc(ctx, size);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_port_lvlhsh_frag_free(void *ctx, void *p)
|
||||
{
|
||||
return nxt_mp_free(ctx, p);
|
||||
}
|
||||
|
||||
|
||||
static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = {
|
||||
NXT_LVLHSH_DEFAULT,
|
||||
nxt_port_lvlhsh_frag_test,
|
||||
nxt_port_lvlhsh_frag_alloc,
|
||||
nxt_port_lvlhsh_frag_free,
|
||||
};
|
||||
|
||||
|
||||
static nxt_port_recv_msg_t *
|
||||
nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
|
||||
nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_int_t res;
|
||||
nxt_lvlhsh_query_t lhq;
|
||||
nxt_port_recv_msg_t *fmsg;
|
||||
|
||||
nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
|
||||
|
||||
fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
|
||||
|
||||
if (nxt_slow_path(fmsg == NULL)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
*fmsg = *msg;
|
||||
|
||||
lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
|
||||
lhq.key.length = sizeof(uint32_t);
|
||||
lhq.key.start = (u_char *) &fmsg->port_msg.stream;
|
||||
lhq.proto = &lvlhsh_frag_proto;
|
||||
lhq.replace = 0;
|
||||
lhq.value = fmsg;
|
||||
lhq.pool = port->mem_pool;
|
||||
|
||||
res = nxt_lvlhsh_insert(&port->frags, &lhq);
|
||||
|
||||
switch (res) {
|
||||
|
||||
case NXT_OK:
|
||||
return fmsg;
|
||||
|
||||
case NXT_DECLINED:
|
||||
nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
|
||||
fmsg->port_msg.stream);
|
||||
nxt_mp_free(port->mem_pool, fmsg);
|
||||
|
||||
return NULL;
|
||||
|
||||
default:
|
||||
nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
|
||||
fmsg->port_msg.stream);
|
||||
|
||||
nxt_mp_free(port->mem_pool, fmsg);
|
||||
|
||||
return NULL;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static nxt_port_recv_msg_t *
|
||||
nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
|
||||
nxt_bool_t last)
|
||||
{
|
||||
nxt_int_t res;
|
||||
nxt_lvlhsh_query_t lhq;
|
||||
|
||||
nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
|
||||
|
||||
lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
|
||||
lhq.key.length = sizeof(uint32_t);
|
||||
lhq.key.start = (u_char *) &stream;
|
||||
lhq.proto = &lvlhsh_frag_proto;
|
||||
lhq.pool = port->mem_pool;
|
||||
|
||||
res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
|
||||
nxt_lvlhsh_find(&port->frags, &lhq);
|
||||
|
||||
switch (res) {
|
||||
|
||||
case NXT_OK:
|
||||
return lhq.value;
|
||||
|
||||
default:
|
||||
nxt_log(task, NXT_LOG_WARN, "frag stream #%uD not found", stream);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
||||
nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_buf_t *b;
|
||||
nxt_buf_t *orig_b;
|
||||
nxt_buf_t *b, *orig_b;
|
||||
nxt_port_recv_msg_t *fmsg;
|
||||
|
||||
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
|
||||
nxt_log(task, NXT_LOG_CRIT,
|
||||
|
@ -558,7 +691,49 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
|||
b = msg->buf;
|
||||
}
|
||||
|
||||
port->handler(task, msg);
|
||||
if (nxt_slow_path(msg->port_msg.nf != 0)) {
|
||||
fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
|
||||
msg->port_msg.mf == 0);
|
||||
|
||||
if (nxt_slow_path(fmsg == NULL)) {
|
||||
nxt_assert(fmsg != NULL);
|
||||
}
|
||||
|
||||
nxt_buf_chain_add(&fmsg->buf, msg->buf);
|
||||
|
||||
fmsg->size += msg->size;
|
||||
|
||||
msg->buf = NULL;
|
||||
b = NULL;
|
||||
|
||||
if (nxt_fast_path(msg->port_msg.mf == 0)) {
|
||||
b = fmsg->buf;
|
||||
|
||||
port->handler(task, fmsg);
|
||||
|
||||
msg->buf = fmsg->buf;
|
||||
msg->fd = fmsg->fd;
|
||||
|
||||
nxt_mp_free(port->mem_pool, fmsg);
|
||||
}
|
||||
} else {
|
||||
if (nxt_slow_path(msg->port_msg.mf != 0)) {
|
||||
fmsg = nxt_port_frag_start(task, port, msg);
|
||||
|
||||
if (nxt_slow_path(fmsg == NULL)) {
|
||||
nxt_assert(fmsg != NULL);
|
||||
}
|
||||
|
||||
fmsg->port_msg.nf = 0;
|
||||
fmsg->port_msg.mf = 0;
|
||||
|
||||
msg->buf = NULL;
|
||||
msg->fd = -1;
|
||||
b = NULL;
|
||||
} else {
|
||||
port->handler(task, msg);
|
||||
}
|
||||
}
|
||||
|
||||
if (msg->port_msg.mmap && orig_b != b) {
|
||||
|
||||
|
|
|
@ -592,27 +592,19 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||
void
|
||||
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
size_t dump_size;
|
||||
nxt_int_t ret;
|
||||
nxt_buf_t *b;
|
||||
nxt_router_temp_conf_t *tmcf;
|
||||
|
||||
b = msg->buf;
|
||||
|
||||
dump_size = nxt_buf_used_size(b);
|
||||
|
||||
if (dump_size > 300) {
|
||||
dump_size = 300;
|
||||
}
|
||||
|
||||
nxt_debug(task, "router conf data (%z): %*s",
|
||||
msg->size, dump_size, b->mem.pos);
|
||||
|
||||
tmcf = nxt_router_temp_conf(task);
|
||||
if (nxt_slow_path(tmcf == NULL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size);
|
||||
|
||||
nxt_assert(b != NULL);
|
||||
|
||||
tmcf->conf->router = nxt_router;
|
||||
tmcf->stream = msg->port_msg.stream;
|
||||
tmcf->port = nxt_runtime_port_find(task->thread->runtime,
|
||||
|
@ -1442,8 +1434,12 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
|||
|
||||
rpc = data;
|
||||
sa = rpc->socket_conf->sockaddr;
|
||||
tmcf = rpc->temp_conf;
|
||||
|
||||
in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
|
||||
|
||||
nxt_assert(in != NULL);
|
||||
|
||||
in = msg->buf;
|
||||
p = in->mem.pos;
|
||||
|
||||
error = *p++;
|
||||
|
@ -1452,8 +1448,6 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
|||
+ sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
|
||||
+ sa->length + socket_errors[error].length + (in->mem.free - p);
|
||||
|
||||
tmcf = rpc->temp_conf;
|
||||
|
||||
out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
|
||||
if (nxt_slow_path(out == NULL)) {
|
||||
return;
|
||||
|
|
|
@ -111,7 +111,9 @@ nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
|
|||
if (total + size > sb->limit) {
|
||||
size = sb->limit - total;
|
||||
|
||||
if (size == 0) {
|
||||
sb->limit_reached = 1;
|
||||
|
||||
if (nxt_slow_path(size == 0)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -119,6 +121,8 @@ nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
|
|||
if (b->mem.pos != last) {
|
||||
|
||||
if (++n >= sb->nmax) {
|
||||
sb->nmax_reached = 1;
|
||||
|
||||
goto done;
|
||||
}
|
||||
|
||||
|
|
|
@ -58,6 +58,8 @@ typedef struct {
|
|||
uint32_t nmax;
|
||||
uint8_t sync; /* 1 bit */
|
||||
uint8_t last; /* 1 bit */
|
||||
uint8_t limit_reached;
|
||||
uint8_t nmax_reached;
|
||||
|
||||
size_t size;
|
||||
size_t limit;
|
||||
|
|
Loading…
Reference in a new issue