Store pointer to shared memory start in buf->parent.

nxt_port_mmap_t structures are stored in arrays, and it is unsafe to store
a pointer to an array element.

Shared memory structures and macros have been moved to a separate header
file so they can be used by the Go package.
This commit is contained in:
Max Romanov 2017-06-23 19:20:04 +03:00
parent b13cdb0faa
commit 0cd9521687
4 changed files with 394 additions and 233 deletions

View file

@ -20,6 +20,7 @@ NXT_LIB_DEPS=" \
src/nxt_port.h \
src/nxt_port_hash.h \
src/nxt_port_memory.h \
src/nxt_port_memory_int.h \
src/nxt_dyld.h \
src/nxt_thread.h \
src/nxt_thread_id.h \

View file

@ -1,4 +1,9 @@
/*
* Copyright (C) Max Romanov
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#if (NXT_HAVE_MEMFD_CREATE)
@ -9,89 +14,14 @@
#endif
#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
#define PORT_MMAP_HEADER_SIZE (1024 * 4)
#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + 1024 * 1024 * 10)
#define PORT_MMAP_CHUNK_COUNT \
( (PORT_MMAP_SIZE - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE )
typedef uint32_t nxt_chunk_id_t;
typedef nxt_atomic_uint_t nxt_free_map_t;
#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
#define FREE_MASK(nchunk) \
( 1ULL << ( (nchunk) % FREE_BITS ) )
#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
/* Mapped at the start of shared memory segment. */
struct nxt_port_mmap_header_s {
nxt_free_map_t free_map[MAX_FREE_IDX];
};
/*
* Element of nxt_process_t.incoming/outgoing, shared memory segment
* descriptor.
*/
struct nxt_port_mmap_s {
uint32_t id;
nxt_fd_t fd;
nxt_pid_t pid; /* For sanity check. */
union {
void *mem;
nxt_port_mmap_header_t *hdr;
} u;
};
/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
typedef struct {
uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */
nxt_chunk_id_t chunk_id; /* Mmap chunk index. */
uint32_t size; /* Payload data size. */
} nxt_port_mmap_msg_t;
static nxt_bool_t
nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c);
#define nxt_port_mmap_get_chunk_busy(hdr, c) \
((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
nxt_inline void
nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
nxt_inline void
nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
#define nxt_port_mmap_chunk_id(port_mmap, b) \
((((u_char *) (b) - (u_char *) (port_mmap->u.mem)) - \
PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE)
#define nxt_port_mmap_chunk_start(port_mmap, chunk) \
(((u_char *) (port_mmap->u.mem)) + PORT_MMAP_HEADER_SIZE + \
(chunk) * PORT_MMAP_CHUNK_SIZE)
#include <nxt_port_memory_int.h>
void
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
{
if (port_mmap->u.mem != NULL) {
nxt_mem_munmap(port_mmap->u.mem, PORT_MMAP_SIZE);
port_mmap->u.mem = NULL;
}
if (port_mmap->fd != -1) {
nxt_fd_close(port_mmap->fd);
port_mmap->fd = -1;
if (port_mmap->hdr != NULL) {
nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
port_mmap->hdr = NULL;
}
}
@ -103,7 +33,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_mp_t *mp;
nxt_buf_t *b;
nxt_chunk_id_t c;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
b = obj;
@ -120,8 +49,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
port_mmap = data;
hdr = port_mmap->u.hdr;
hdr = data;
if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
/*
@ -129,11 +57,11 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
* let's release rest (if any).
*/
p = b->mem.pos - 1;
c = nxt_port_mmap_chunk_id(port_mmap, p) + 1;
p = nxt_port_mmap_chunk_start(port_mmap, c);
c = nxt_port_mmap_chunk_id(hdr, p) + 1;
p = nxt_port_mmap_chunk_start(hdr, c);
} else {
p = b->mem.start;
c = nxt_port_mmap_chunk_id(port_mmap, p);
c = nxt_port_mmap_chunk_id(hdr, p);
}
while (p < b->mem.end) {
@ -147,61 +75,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
static nxt_bool_t
nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c)
{
int ffs;
size_t i;
nxt_free_map_t bits;
nxt_free_map_t *free_map;
free_map = port_mmap->u.hdr->free_map;
for (i = 0; i < MAX_FREE_IDX; i++) {
bits = free_map[i];
if (bits == 0) {
continue;
}
ffs = __builtin_ffsll(bits);
if (ffs != 0) {
*c = i * FREE_BITS + ffs - 1;
return 1;
}
}
return 0;
}
nxt_inline void
nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
{
nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
nxt_thread_log_debug("set_chunk_busy: hdr %p; b %D", hdr, c);
}
nxt_inline void
nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
{
nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
nxt_thread_log_debug("set_chunk_free: hdr %p; b %D", hdr, c);
}
nxt_port_mmap_t *
nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd)
{
struct stat mmap_stat;
nxt_port_mmap_t *port_mmap;
void *mem;
struct stat mmap_stat;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_debug(task, "got new mmap fd #%FD from process %PI",
fd, process->pid);
port_mmap = NULL;
hdr = NULL;
if (fstat(fd, &mmap_stat) == -1) {
nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
@ -216,44 +104,51 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
if (nxt_slow_path(process->incoming == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
return NULL;
goto fail;
}
port_mmap = nxt_array_zero_add(process->incoming);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
return NULL;
goto fail;
}
port_mmap->id = process->incoming->nelts - 1;
port_mmap->fd = -1;
port_mmap->pid = process->pid;
port_mmap->u.mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
port_mmap->u.mem = NULL;
port_mmap = NULL;
return NULL;
goto fail;
}
return port_mmap;
port_mmap->hdr = mem;
hdr = port_mmap->hdr;
if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
port_mmap->hdr->id, process->incoming->nelts - 1);
}
fail:
return hdr;
}
static void
nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_mp_t *mp;
nxt_port_mmap_t *port_mmap;
nxt_fd_t fd;
nxt_buf_t *b;
nxt_mp_t *mp;
b = obj;
mp = b->data;
port_mmap = data;
fd = (nxt_fd_t) (intptr_t) data;
#if (NXT_DEBUG)
if (nxt_slow_path(data != b->parent)) {
@ -263,24 +158,27 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
nxt_debug(task, "mmap fd %FD has been sent", port_mmap->fd);
nxt_debug(task, "mmap fd %FD has been sent", fd);
nxt_fd_close(port_mmap->fd);
port_mmap->fd = -1;
nxt_fd_close(fd);
nxt_buf_free(mp, b);
}
static nxt_port_mmap_t *
static nxt_port_mmap_header_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port)
{
void *mem;
u_char *p, name[64];
nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
port_mmap = NULL;
if (process->outgoing == NULL) {
process->outgoing = nxt_array_create(process->mem_pool, 1,
sizeof(nxt_port_mmap_t));
@ -300,34 +198,30 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL;
}
port_mmap->id = process->outgoing->nelts - 1;
port_mmap->pid = process->pid;
p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD",
nxt_pid, nxt_random(&nxt_random_data));
*p = '\0';
#if (NXT_HAVE_MEMFD_CREATE)
port_mmap->fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
if (nxt_slow_path(port_mmap->fd == -1)) {
if (nxt_slow_path(fd == -1)) {
nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
name, nxt_errno);
goto remove_fail;
}
nxt_debug(task, "memfd_create(%s): %FD", name, port_mmap->fd);
nxt_debug(task, "memfd_create(%s): %FD", name, fd);
#elif (NXT_HAVE_SHM_OPEN)
shm_unlink((char *) name); // just in case
port_mmap->fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR,
S_IRUSR | S_IWUSR);
fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
nxt_debug(task, "shm_open(%s): %FD", name, port_mmap->fd);
nxt_debug(task, "shm_open(%s): %FD", name, fd);
if (nxt_slow_path(port_mmap->fd == -1)) {
if (nxt_slow_path(fd == -1)) {
nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
goto remove_fail;
@ -339,20 +233,21 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
#endif
if (nxt_slow_path(ftruncate(port_mmap->fd, PORT_MMAP_SIZE) == -1)) {
if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
goto remove_fail;
}
port_mmap->u.mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED,
port_mmap->fd, 0);
mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
if (nxt_slow_path(mem == MAP_FAILED)) {
goto remove_fail;
}
port_mmap->hdr = mem;
b = nxt_buf_mem_alloc(port->mem_pool, 0, 0);
if (nxt_slow_path(b == NULL)) {
goto remove_fail;
@ -360,27 +255,32 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
b->completion_handler = nxt_port_mmap_send_fd_buf_completion;
b->data = port->mem_pool;
b->parent = port_mmap;
b->parent = (void *) (intptr_t) fd;
/* Init segment header. */
hdr = port_mmap->u.hdr;
hdr = port_mmap->hdr;
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
hdr->id = process->outgoing->nelts - 1;
hdr->pid = process->pid;
/* Mark first chunk as busy */
nxt_port_mmap_set_chunk_busy(hdr, 0);
/* Mark as busy chunk followed the last available chunk. */
nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
nxt_debug(task, "send mmap fd %FD to process %PI", port_mmap->fd,
nxt_debug(task, "send mmap fd %FD to process %PI", fd,
port->pid);
/* TODO handle error */
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, port_mmap->fd,
0, 0, b);
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, b);
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
port_mmap->id, nxt_pid, process->pid);
hdr->id, nxt_pid, process->pid);
return port_mmap;
return hdr;
remove_fail:
@ -390,24 +290,29 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
static nxt_port_mmap_t *
static nxt_port_mmap_header_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
size_t size)
{
nxt_array_t *outgoing;
nxt_process_t *process;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
nxt_array_t *outgoing;
nxt_process_t *process;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
nxt_port_mmap_header_t *hdr;
process = nxt_runtime_process_get(task->thread->runtime, port->pid);
process = port->process;
if (nxt_slow_path(process == NULL)) {
return NULL;
}
*c = 0;
port_mmap = NULL;
hdr = NULL;
if (process->outgoing == NULL) {
return nxt_port_new_port_mmap(task, process, port);
hdr = nxt_port_new_port_mmap(task, process, port);
goto unlock_return;
}
outgoing = process->outgoing;
@ -416,44 +321,51 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
while (port_mmap < end_port_mmap) {
if (nxt_port_mmap_get_free_chunk(port_mmap, c)) {
return port_mmap;
if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
hdr = port_mmap->hdr;
goto unlock_return;
}
port_mmap++;
}
/* TODO introduce port_mmap limit and release wait. */
return nxt_port_new_port_mmap(task, process, port);
hdr = nxt_port_new_port_mmap(task, process, port);
unlock_return:
return hdr;
}
static nxt_port_mmap_t *
static nxt_port_mmap_header_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
nxt_array_t *incoming;
nxt_process_t *process;
nxt_port_mmap_t *port_mmap;
nxt_array_t *incoming;
nxt_process_t *process;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
process = nxt_runtime_process_get(task->thread->runtime, spid);
if (nxt_slow_path(process == NULL)) {
return NULL;
}
hdr = NULL;
incoming = process->incoming;
if (nxt_slow_path(incoming == NULL)) {
/* TODO add warning */
return NULL;
if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
port_mmap = incoming->elts;
hdr = port_mmap[id].hdr;
} else {
nxt_log(task, NXT_LOG_WARN,
"failed to get incoming mmap #%d for process %PI", id, spid);
}
if (nxt_slow_path(incoming->nelts <= id)) {
/* TODO add warning */
return NULL;
}
port_mmap = incoming->elts;
return port_mmap + id;
return hdr;
}
@ -463,11 +375,15 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
size_t nchunks;
nxt_buf_t *b;
nxt_chunk_id_t c;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_debug(task, "request %z bytes shm buffer", size);
if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
nxt_debug(task, "requested size (%z bytes) too big", size);
return NULL;
}
b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
if (nxt_slow_path(b == NULL)) {
return NULL;
@ -479,22 +395,18 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_buf_set_port_mmap(b);
port_mmap = nxt_port_mmap_get(task, port, &c, size);
if (nxt_slow_path(port_mmap == NULL)) {
hdr = nxt_port_mmap_get(task, port, &c, size);
if (nxt_slow_path(hdr == NULL)) {
nxt_buf_free(port->mem_pool, b);
return NULL;
}
hdr = port_mmap->u.hdr;
b->parent = port_mmap;
b->mem.start = nxt_port_mmap_chunk_start(port_mmap, c);
b->parent = hdr;
b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
nxt_port_mmap_set_chunk_busy(hdr, c);
nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
@ -506,10 +418,9 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
nxt_port_mmap_set_chunk_busy(hdr, c);
b->mem.end += PORT_MMAP_CHUNK_SIZE;
c++;
@ -520,13 +431,79 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
}
nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
{
size_t nchunks;
nxt_chunk_id_t c, start;
nxt_port_mmap_header_t *hdr;
nxt_debug(task, "request increase %z bytes shm buffer", size);
if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
nxt_log(task, NXT_LOG_WARN,
"failed to increase, not a mmap buffer");
return NXT_ERROR;
}
if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) {
return NXT_OK;
}
hdr = b->parent;
start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
size -= nxt_buf_mem_free_size(&b->mem);
nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
}
c = start;
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
c++;
nchunks--;
}
if (nchunks != 0) {
c--;
while (c >= start) {
nxt_port_mmap_set_chunk_free(hdr, c);
c--;
}
nxt_debug(task, "failed to increase, %d chunks busy", nchunks);
return NXT_ERROR;
} else {
b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
return NXT_OK;
}
}
static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
{
size_t nchunks;
nxt_buf_t *b;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
if (nxt_slow_path(hdr == NULL)) {
return NULL;
}
b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
if (nxt_slow_path(b == NULL)) {
@ -539,23 +516,17 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nxt_buf_set_port_mmap(b);
port_mmap = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_buf_free(port->mem_pool, b);
return NULL;
}
nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
nchunks++;
}
b->mem.start = nxt_port_mmap_chunk_start(port_mmap, mmap_msg->chunk_id);
b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start + mmap_msg->size;
b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
b->parent = port_mmap;
b->parent = hdr;
return b;
}
@ -565,11 +536,11 @@ void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
{
size_t bsize;
nxt_buf_t *b, *bmem;
nxt_uint_t i;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_msg_t *mmap_msg;
size_t bsize;
nxt_buf_t *b, *bmem;
nxt_uint_t i;
nxt_port_mmap_msg_t *mmap_msg;
nxt_port_mmap_header_t *hdr;
nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
"via shared memory", sb->size, port->pid);
@ -598,10 +569,10 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
/* TODO clear b and exit */
}
port_mmap = (nxt_port_mmap_t *) bmem->parent;
hdr = bmem->parent;
mmap_msg->mmap_id = port_mmap->id;
mmap_msg->chunk_id = nxt_port_mmap_chunk_id(port_mmap, bmem->mem.pos);
mmap_msg->mmap_id = hdr->id;
mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
mmap_msg->size = sb->iobuf[i].iov_len;
nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
@ -660,8 +631,8 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
nxt_port_mmap_t *port_mmap;
nxt_port_method_t m;
nxt_port_method_t m;
nxt_port_mmap_header_t *hdr;
m = NXT_PORT_METHOD_ANY;
@ -672,7 +643,7 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
port_mmap = (nxt_port_mmap_t *) b->parent;
hdr = b->parent;
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
@ -682,10 +653,10 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
break;
}
if (port->pid != port_mmap->pid) {
if (port->pid != hdr->pid) {
nxt_log_error(NXT_LOG_ERR, task->log,
"send mmap buffer for %PI to %PI, "
"using plain mode", port_mmap->pid, port->pid);
"using plain mode", hdr->pid, port->pid);
m = NXT_PORT_METHOD_PLAIN;

View file

@ -1,7 +1,13 @@
/*
* Copyright (C) Max Romanov
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_PORT_MEMORY_H_INCLUDED_
#define _NXT_PORT_MEMORY_H_INCLUDED_
#define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t))
typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
@ -19,7 +25,10 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap);
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
nxt_port_mmap_t *
nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
size_t size);
nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd);
@ -47,4 +56,5 @@ typedef enum nxt_port_method_e nxt_port_method_t;
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */

179
src/nxt_port_memory_int.h Normal file
View file

@ -0,0 +1,179 @@
/*
* Copyright (C) Max Romanov
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_PORT_MEMORY_INT_H_INCLUDED_
#define _NXT_PORT_MEMORY_INT_H_INCLUDED_
#include <stdint.h>
#include <nxt_atomic.h>
#ifdef NXT_MMAP_TINY_CHUNK
#define PORT_MMAP_CHUNK_SIZE 16
#define PORT_MMAP_HEADER_SIZE 1024
#define PORT_MMAP_DATA_SIZE 1024
#else
#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
#define PORT_MMAP_HEADER_SIZE (1024 * 4)
#define PORT_MMAP_DATA_SIZE (1024 * 1024 * 10)
#endif
#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + PORT_MMAP_DATA_SIZE)
#define PORT_MMAP_CHUNK_COUNT (PORT_MMAP_DATA_SIZE / PORT_MMAP_CHUNK_SIZE)
typedef uint32_t nxt_chunk_id_t;
typedef nxt_atomic_uint_t nxt_free_map_t;
#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
#define FREE_MASK(nchunk) \
( 1ULL << ( (nchunk) % FREE_BITS ) )
#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
/* Mapped at the start of shared memory segment. */
struct nxt_port_mmap_header_s {
    uint32_t        id;        /* Index of this segment in the owner's
                                  incoming/outgoing mmap array. */
    nxt_pid_t       pid;       /* For sanity check. */
    nxt_free_map_t  free_map[MAX_FREE_IDX];  /* One bit per chunk;
                                                set bit == chunk is free. */
};
/*
 * Element of nxt_process_t.incoming/outgoing, shared memory segment
 * descriptor.
 */
struct nxt_port_mmap_s {
    nxt_port_mmap_header_t  *hdr;  /* Start of the mapped segment; the header
                                      lives at the very beginning of it. */
};
typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t;

/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
/* Fixed-width fields: this struct crosses process boundaries as-is. */
struct nxt_port_mmap_msg_s {
    uint32_t            mmap_id;    /* Mmap index in nxt_process_t.outgoing. */
    nxt_chunk_id_t      chunk_id;   /* Mmap chunk index. */
    uint32_t            size;       /* Payload data size. */
};
static nxt_bool_t
nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c);
#define nxt_port_mmap_get_chunk_busy(hdr, c) \
((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
nxt_inline void
nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
nxt_inline nxt_bool_t
nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
nxt_inline void
nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
/*
 * Translate a data pointer within the segment into its chunk index.
 * Chunk data begins PORT_MMAP_HEADER_SIZE bytes past the segment start.
 */
nxt_inline nxt_chunk_id_t
nxt_port_mmap_chunk_id(nxt_port_mmap_header_t *hdr, u_char *p)
{
    u_char  *data_start;

    data_start = ((u_char *) hdr) + PORT_MMAP_HEADER_SIZE;

    return (p - data_start) / PORT_MMAP_CHUNK_SIZE;
}
/*
 * Return the address of chunk c's data.  Inverse of
 * nxt_port_mmap_chunk_id(): data area starts PORT_MMAP_HEADER_SIZE
 * bytes into the segment, chunks are PORT_MMAP_CHUNK_SIZE apart.
 */
nxt_inline u_char *
nxt_port_mmap_chunk_start(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
{
    u_char  *data_start;

    data_start = ((u_char *) hdr) + PORT_MMAP_HEADER_SIZE;

    return data_start + c * PORT_MMAP_CHUNK_SIZE;
}
/*
 * Find a free chunk in the segment, atomically mark it busy and store
 * its index in *c.  Returns 1 on success, 0 when no free chunk could be
 * acquired.
 */
static nxt_bool_t
nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c)
{
    int             ffs;
    size_t          i;
    nxt_chunk_id_t  chunk;
    nxt_free_map_t  bits;
    nxt_free_map_t  *free_map;

    free_map = hdr->free_map;

    for (i = 0; i < MAX_FREE_IDX; i++) {

        /*
         * Re-read the bitmap word after every failed acquisition: when
         * nxt_port_mmap_chk_set_chunk_busy() loses a race with another
         * thread, other free bits may still remain in the same word.
         * (Previously the loop advanced to the next word and could miss
         * them.)  No livelock: a failed chk-set means somebody else
         * cleared that bit, so each retry observes strictly fewer set
         * bits in the word.
         */
        for ( ;; ) {
            bits = free_map[i];

            if (bits == 0) {
                break;
            }

            /* Lowest set bit == lowest-numbered free chunk in the word. */
            ffs = __builtin_ffsll(bits);

            chunk = i * FREE_BITS + ffs - 1;

            if (nxt_port_mmap_chk_set_chunk_busy(hdr, chunk)) {
                *c = chunk;
                return 1;
            }
        }
    }

    /* Every chunk in this segment is busy. */
    return 0;
}
/* Unconditionally mark chunk c busy: atomically clear its free-map bit. */
nxt_inline void
nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
{
    nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
}
/*
 * Atomically transition chunk c from free (bit set) to busy (bit clear).
 * Returns 1 if this caller cleared the bit; returns 0 if the chunk was
 * already busy or another thread won the race and cleared it first.
 */
nxt_inline nxt_bool_t
nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
{
    nxt_free_map_t  *f;
    nxt_free_map_t  free_val, busy_val;

    f = hdr->free_map + FREE_IDX(c);

    /* Retry while the chunk's bit is still observed as free. */
    while ( (*f & FREE_MASK(c)) != 0 ) {

        /* Expected word: current value with the chunk's bit forced set. */
        free_val = *f | FREE_MASK(c);

        /* Desired word: the same value with the chunk's bit cleared. */
        busy_val = free_val & ~FREE_MASK(c);

        if (nxt_atomic_cmp_set(f, free_val, busy_val) != 0) {
            return 1;
        }
    }

    return 0;
}
/* Release chunk c: atomically set its free-map bit back to free. */
nxt_inline void
nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
{
    nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
}
#endif /* _NXT_PORT_MEMORY_INT_H_INCLUDED_ */