Event engine timers refactoring.

This commit is contained in:
Igor Sysoev 2017-01-31 22:26:50 +03:00
parent bb87fa11ca
commit 18281ee37e
12 changed files with 216 additions and 241 deletions

View file

@ -1136,7 +1136,7 @@ nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data)
c->socket.write = NXT_EVENT_BLOCKED;
if (c->write_state->autoreset_timer) {
nxt_timer_disable(&c->write_timer);
nxt_timer_disable(task->thread->engine, &c->write_timer);
}
nxt_event_conn_io_handle(task->thread, c->write_work_queue,

View file

@ -206,7 +206,6 @@ nxt_event_conn_timer_init(ev, c, wq) \
(ev)->work_queue = (wq); \
(ev)->log = &(c)->log; \
(ev)->precision = NXT_TIMER_DEFAULT_PRECISION; \
nxt_timer_ident((ev), (c)->socket.fd); \
} while (0)

View file

@ -66,8 +66,6 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
cls->timer.handler = nxt_event_conn_listen_timer_handler;
cls->timer.log = &nxt_main_log;
nxt_timer_ident(&cls->timer, cls->socket.fd);
cls->task.thread = task->thread;
cls->task.log = &nxt_main_log;
cls->task.ident = nxt_task_next_ident();
@ -202,9 +200,6 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
{
nxt_event_conn_t *next;
nxt_timer_ident(&c->read_timer, c->socket.fd);
nxt_timer_ident(&c->write_timer, c->socket.fd);
/* This allocation cannot fail. */
(void) nxt_sockaddr_text(c->mem_pool, c->remote, 0);

View file

@ -124,8 +124,6 @@ nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c)
#endif
c->socket.fd = s;
nxt_timer_ident(&c->read_timer, s);
nxt_timer_ident(&c->write_timer, s);
c->socket.task = task;
c->read_timer.task = task;
@ -156,7 +154,7 @@ nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data)
nxt_event_fd_block_write(task->thread->engine, &c->socket);
if (c->write_state->autoreset_timer) {
nxt_timer_disable(&c->write_timer);
nxt_timer_disable(task->thread->engine, &c->write_timer);
}
err = 0;

View file

@ -84,10 +84,6 @@ nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
c->blocked = 1;
if (c->write_timer.state != NXT_TIMER_DISABLED) {
c->write_timer.state = NXT_TIMER_BLOCKED;
}
nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
return;
}
@ -210,10 +206,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
}
if (sent != 0 && c->write_state->autoreset_timer) {
nxt_timer_disable(&c->write_timer);
} else if (c->write_timer.state == NXT_TIMER_BLOCKED) {
c->write_timer.state = NXT_TIMER_ACTIVE;
nxt_timer_disable(task->thread->engine, &c->write_timer);
}
if (c->socket.error == 0

View file

@ -86,7 +86,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
if (n != NXT_AGAIN) {
nxt_event_fd_block_read(engine, &c->socket);
nxt_timer_disable(&c->read_timer);
nxt_timer_disable(engine, &c->read_timer);
if (n == 0) {
handler = state->close_handler;
@ -125,7 +125,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
nxt_event_fd_block_read(engine, &c->socket);
if (state->autoreset_timer) {
nxt_timer_disable(&c->read_timer);
nxt_timer_disable(engine, &c->read_timer);
}
handler = state->ready_handler;

View file

@ -90,7 +90,7 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
if (sent != 0) {
if (c->write_state->autoreset_timer) {
nxt_timer_disable(&c->write_timer);
nxt_timer_disable(engine, &c->write_timer);
}
}

View file

@ -525,17 +525,8 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
engine->event->poll(&engine->task, engine->event_set, timeout);
/*
* Look up expired timers only if a new zero timer has been
* just added before the event poll or if the event poll slept
* at least 1 millisecond, because all old eligible timers were
* processed in the previous iterations.
*/
now = nxt_thread_monotonic_time(thr) / 1000000;
if (timeout == 0 || now != engine->timers.now) {
nxt_timer_expire(thr, now);
}
nxt_timer_expire(engine, now);
}
}

View file

@ -934,7 +934,7 @@ nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
c->socket.write = NXT_EVENT_BLOCKED;
if (c->write_state->autoreset_timer) {
nxt_timer_disable(&c->write_timer);
nxt_timer_disable(task->thread->engine, &c->write_timer);
}
nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,

View file

@ -226,7 +226,6 @@ nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle)
*/
cycle->timer.handler = nxt_master_stop_previous_worker_processes;
cycle->timer.log = &nxt_main_log;
nxt_timer_ident(&cycle->timer, -1);
cycle->timer.work_queue = &thr->engine->fast_work_queue;

View file

@ -8,26 +8,23 @@
/*
* Timer operations are batched to improve instruction and data
* cache locality of rbtree operations.
* Timer operations are batched in the changes array to improve instruction
* and data cache locality of rbtree operations.
*
* nxt_timer_add() adds a timer to the changes array to add or to
* modify the timer. The changes are processed by nxt_timer_find().
* nxt_timer_add() adds or modify a timer.
*
* nxt_timer_disable() disables a timer. The disabled timer may
* however present in rbtree for a long time and may be eventually removed
* by nxt_timer_find() or nxt_timer_expire().
* nxt_timer_disable() disables a timer.
*
* nxt_timer_delete() removes a timer at once from both the rbtree and
* the changes array and should be used only if the timer memory must be freed.
* nxt_timer_delete() deletes a timer. It returns 1 if there are pending
* changes in the changes array or 0 otherwise.
*/
static intptr_t nxt_timer_rbtree_compare(nxt_rbtree_node_t *node1,
nxt_rbtree_node_t *node2);
static void nxt_timer_change(nxt_timers_t *timers, nxt_timer_t *timer,
nxt_msec_t time);
static void nxt_commit_timer_changes(nxt_timers_t *timers);
static void nxt_timer_drop_changes(nxt_timers_t *timers, nxt_timer_t *timer);
static void nxt_timer_change(nxt_event_engine_t *engine, nxt_timer_t *timer,
nxt_timer_operation_t change, nxt_msec_t time);
static void nxt_timer_changes_commit(nxt_event_engine_t *engine);
static void nxt_timer_handler(nxt_task_t *task, void *obj, void *data);
nxt_int_t
@ -61,7 +58,7 @@ nxt_timer_rbtree_compare(nxt_rbtree_node_t *node1, nxt_rbtree_node_t *node2)
* This signed comparison takes into account that overflow.
*/
/* timer1->time < timer2->time */
return nxt_msec_diff(timer1->time, timer2->time);
return nxt_msec_diff(timer1->time , timer2->time);
}
@ -74,152 +71,165 @@ nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer,
time = engine->timers.now + timeout;
if (nxt_timer_is_in_tree(timer)) {
if (timer->state != NXT_TIMER_CHANGING) {
diff = nxt_msec_diff(time, timer->time);
if (nxt_timer_is_in_tree(timer)) {
/*
* Use the previous timer if difference between it and the
* new timer is less than required precision milliseconds:
* this decreases rbtree operations for fast connections.
*/
diff = nxt_msec_diff(time, timer->time);
/*
* Use the previous timer if difference between it and the
* new timer is less than required precision milliseconds: this
* decreases number of rbtree operations for fast connections.
*/
if (nxt_abs(diff) < timer->precision) {
nxt_debug(timer->task, "timer previous: %M:%d",
time, timer->state);
if (nxt_abs(diff) < timer->precision) {
nxt_log_debug(timer->log, "timer previous: %D: %d:%M",
timer->ident, timer->state, time);
if (timer->state == NXT_TIMER_DISABLED) {
timer->state = NXT_TIMER_ACTIVE;
timer->state = NXT_TIMER_WAITING;
return;
}
return;
}
nxt_log_debug(timer->log, "timer change: %D: %d:%M",
timer->ident, timer->state, timer->time);
} else {
/*
* The timer's time is updated here just to log a correct
* value by debug logging in nxt_timer_disable().
* It could be updated only in nxt_commit_timer_changes()
* just before nxt_rbtree_insert().
*/
timer->time = time;
nxt_log_debug(timer->log, "timer add: %D: %M:%M",
timer->ident, timeout, time);
}
nxt_timer_change(&engine->timers, timer, time);
nxt_debug(timer->task, "timer add: %M:%d %M:%M",
timer->time, timer->state, timeout, time);
nxt_timer_change(engine, timer, NXT_TIMER_ADD, time);
}
void
nxt_timer_disable(nxt_event_engine_t *engine, nxt_timer_t *timer)
{
nxt_debug(timer->task, "timer disable: %M:%d", timer->time, timer->state);
if (timer->state != NXT_TIMER_CHANGING) {
timer->state = NXT_TIMER_DISABLED;
} else {
nxt_timer_change(engine, timer, NXT_TIMER_DISABLE, 0);
}
}
nxt_bool_t
nxt_timer_delete(nxt_event_engine_t *engine, nxt_timer_t *timer)
{
nxt_bool_t pending;
if (nxt_timer_is_in_tree(timer) || timer->state == NXT_TIMER_CHANGING) {
nxt_debug(timer->task, "timer delete: %M:%d",
timer->time, timer->state);
nxt_timer_change(engine, timer, NXT_TIMER_DELETE, 0);
return 1;
}
pending = (timer->state == NXT_TIMER_ENQUEUED);
timer->state = NXT_TIMER_DISABLED;
return pending;
}
static void
nxt_timer_change(nxt_timers_t *timers, nxt_timer_t *timer, nxt_msec_t time)
nxt_timer_change(nxt_event_engine_t *engine, nxt_timer_t *timer,
nxt_timer_operation_t change, nxt_msec_t time)
{
nxt_timers_t *timers;
nxt_timer_change_t *ch;
timers = &engine->timers;
if (timers->nchanges >= timers->mchanges) {
nxt_commit_timer_changes(timers);
nxt_timer_changes_commit(engine);
}
timer->state = NXT_TIMER_ACTIVE;
nxt_debug(timer->task, "timer change: %M:%d", time, change);
timer->state = NXT_TIMER_CHANGING;
ch = &timers->changes[timers->nchanges];
ch->change = change;
ch->time = time;
ch->timer = timer;
timers->nchanges++;
}
#if (NXT_DEBUG)
void
nxt_timer_disable(nxt_timer_t *timer)
{
nxt_debug(timer->task, "timer disable: %D: %d:%M",
timer->ident, timer->state, timer->time);
timer->state = NXT_TIMER_DISABLED;
}
#endif
void
nxt_timer_delete(nxt_event_engine_t *engine, nxt_timer_t *timer)
{
if (nxt_timer_is_in_tree(timer)) {
nxt_log_debug(timer->log, "timer delete: %D: %d:%M",
timer->ident, timer->state, timer->time);
nxt_rbtree_delete(&engine->timers.tree, &timer->node);
nxt_timer_in_tree_clear(timer);
timer->state = NXT_TIMER_DISABLED;
}
nxt_timer_drop_changes(&engine->timers, timer);
}
static void
nxt_timer_drop_changes(nxt_timers_t *timers, nxt_timer_t *timer)
{
nxt_timer_change_t *dst, *src, *end;
dst = timers->changes;
end = dst + timers->nchanges;
for (src = dst; src < end; src++) {
if (src->timer == timer) {
continue;
}
if (dst != src) {
*dst = *src;
}
dst++;
}
timers->nchanges -= end - dst;
}
static void
nxt_commit_timer_changes(nxt_timers_t *timers)
nxt_timer_changes_commit(nxt_event_engine_t *engine)
{
int32_t diff;
nxt_timer_t *timer;
nxt_timers_t *timers;
nxt_timer_state_t state;
nxt_timer_change_t *ch, *end;
nxt_thread_log_debug("timers changes: %ui", timers->nchanges);
timers = &engine->timers;
nxt_debug(&engine->task, "timers changes: %ui", timers->nchanges);
ch = timers->changes;
end = ch + timers->nchanges;
while (ch < end) {
state = NXT_TIMER_DISABLED;
timer = ch->timer;
if (timer->state != NXT_TIMER_DISABLED) {
switch (ch->change) {
case NXT_TIMER_ADD:
if (nxt_timer_is_in_tree(timer)) {
nxt_log_debug(timer->log, "timer delete: %D: %d:%M",
timer->ident, timer->state, timer->time);
diff = nxt_msec_diff(ch->time, timer->time);
/* See the comment in nxt_timer_add(). */
if (nxt_abs(diff) < timer->precision) {
nxt_debug(timer->task, "timer rbtree previous: %M:%d",
ch->time, timer->state);
state = NXT_TIMER_WAITING;
break;
}
nxt_debug(timer->task, "timer rbtree delete: %M:%d",
timer->time, timer->state);
nxt_rbtree_delete(&timers->tree, &timer->node);
timer->time = ch->time;
}
nxt_log_debug(timer->log, "timer add: %D: %M",
timer->ident, timer->time);
timer->time = ch->time;
nxt_debug(timer->task, "timer rbtree insert: %M", timer->time);
nxt_rbtree_insert(&timers->tree, &timer->node);
nxt_timer_in_tree_set(timer);
state = NXT_TIMER_WAITING;
break;
case NXT_TIMER_DELETE:
if (nxt_timer_is_in_tree(timer)) {
nxt_debug(timer->task, "timer rbtree delete: %M:%d",
timer->time, timer->state);
nxt_rbtree_delete(&timers->tree, &timer->node);
}
break;
case NXT_TIMER_DISABLE:
break;
}
timer->state = state;
ch++;
}
@ -232,55 +242,72 @@ nxt_timer_find(nxt_event_engine_t *engine)
{
int32_t time;
nxt_timer_t *timer;
nxt_timers_t *timers;
nxt_rbtree_t *tree;
nxt_rbtree_node_t *node, *next;
if (engine->timers.nchanges != 0) {
nxt_commit_timer_changes(&engine->timers);
timers = &engine->timers;
if (timers->nchanges != 0) {
nxt_timer_changes_commit(engine);
}
for (node = nxt_rbtree_min(&engine->timers.tree);
nxt_rbtree_is_there_successor(&engine->timers.tree, node);
tree = &timers->tree;
for (node = nxt_rbtree_min(tree);
nxt_rbtree_is_there_successor(tree, node);
node = next)
{
next = nxt_rbtree_node_successor(&engine->timers.tree, node);
next = nxt_rbtree_node_successor(tree, node);
timer = (nxt_timer_t *) node;
/*
* Disabled timers are not deleted here since the minimum active
* timer may be larger than a disabled timer, but event poll may
* return much earlier and the disabled timer can be reactivated.
*/
if (timer->state != NXT_TIMER_DISABLED) {
time = timer->time;
timers->minimum = time;
if (timer->state == NXT_TIMER_BLOCKED) {
nxt_log_debug(timer->log, "timer blocked: %D: %M",
timer->ident, timer->time);
continue;
}
nxt_debug(timer->task, "timer found minimum: %M:%M",
time, timers->now);
time = nxt_msec_diff(timer->time, engine->timers.now);
time = nxt_msec_diff(time, timers->now);
return (nxt_msec_t) nxt_max(time, 0);
}
/* Delete disabled timer. */
nxt_log_debug(timer->log, "timer delete: %D: 0:%M",
timer->ident, timer->time);
nxt_rbtree_delete(&engine->timers.tree, &timer->node);
nxt_timer_in_tree_clear(timer);
}
/* Set minimum time one day ahead. */
timers->minimum = timers->now + 24 * 60 * 60 * 1000;
return NXT_INFINITE_MSEC;
}
void
nxt_timer_expire(nxt_thread_t *thr, nxt_msec_t now)
nxt_timer_expire(nxt_event_engine_t *engine, nxt_msec_t now)
{
nxt_timer_t *timer;
nxt_timers_t *timers;
nxt_rbtree_t *tree;
nxt_rbtree_node_t *node, *next;
thr->engine->timers.now = now;
tree = &thr->engine->timers.tree;
timers = &engine->timers;
timers->now = now;
nxt_debug(&engine->task, "timer expire minimum: %M:%M",
timers->minimum, now);
/* timers->minimum > now */
if (nxt_msec_diff(timers->minimum , now) > 0) {
return;
}
tree = &timers->tree;
for (node = nxt_rbtree_min(tree);
nxt_rbtree_is_there_successor(tree, node);
@ -289,29 +316,38 @@ nxt_timer_expire(nxt_thread_t *thr, nxt_msec_t now)
timer = (nxt_timer_t *) node;
/* timer->time > now */
if (nxt_msec_diff(timer->time, now) > 0) {
if (nxt_msec_diff(timer->time , now) > 0) {
return;
}
next = nxt_rbtree_node_successor(tree, node);
if (timer->state == NXT_TIMER_BLOCKED) {
nxt_log_debug(timer->log, "timer blocked: %D: %M",
timer->ident, timer->time);
continue;
}
nxt_log_debug(timer->log, "timer delete: %D: %d:%M",
timer->ident, timer->state, timer->time);
nxt_debug(timer->task, "timer expire delete: %M:%d",
timer->time, timer->state);
nxt_rbtree_delete(tree, &timer->node);
nxt_timer_in_tree_clear(timer);
if (timer->state != NXT_TIMER_DISABLED) {
timer->state = NXT_TIMER_DISABLED;
timer->state = NXT_TIMER_ENQUEUED;
nxt_work_queue_add(timer->work_queue, timer->handler, timer->task,
timer, NULL);
nxt_work_queue_add(timer->work_queue, nxt_timer_handler,
timer->task, timer, NULL);
}
}
}
static void
nxt_timer_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_timer_t *timer;
timer = obj;
if (timer->state == NXT_TIMER_ENQUEUED) {
timer->state = NXT_TIMER_DISABLED;
timer->handler(task, timer, NULL);
}
}

View file

@ -13,21 +13,19 @@
//#define NXT_TIMER_DEFAULT_PRECISION 1
#if (NXT_DEBUG)
#define NXT_TIMER { NXT_RBTREE_NODE_INIT, 0, 0, 0, \
NULL, NULL, NULL, NULL, -1 }
#else
#define NXT_TIMER { NXT_RBTREE_NODE_INIT, 0, 0, 0, \
NULL, NULL, NULL, NULL }
#endif
typedef enum {
NXT_TIMER_DISABLED = 0,
NXT_TIMER_CHANGING,
NXT_TIMER_WAITING,
NXT_TIMER_ENQUEUED,
} nxt_timer_state_t;
typedef struct {
/* The rbtree node must be the first field. */
NXT_RBTREE_NODE (node);
uint8_t state;
nxt_timer_state_t state:8;
uint8_t precision;
nxt_msec_t time;
@ -36,13 +34,22 @@ typedef struct {
nxt_task_t *task;
nxt_log_t *log;
#if (NXT_DEBUG)
int32_t ident;
#endif
} nxt_timer_t;
#define NXT_TIMER { NXT_RBTREE_NODE_INIT, NXT_TIMER_DISABLED, \
0, 0, NULL, NULL, NULL, NULL }
typedef enum {
NXT_TIMER_ADD = 0,
NXT_TIMER_DISABLE,
NXT_TIMER_DELETE,
} nxt_timer_operation_t;
typedef struct {
nxt_timer_operation_t change:8;
nxt_msec_t time;
nxt_timer_t *timer;
} nxt_timer_change_t;
@ -53,6 +60,7 @@ typedef struct {
/* An overflown milliseconds counter. */
nxt_msec_t now;
nxt_msec_t minimum;
nxt_uint_t mchanges;
nxt_uint_t nchanges;
@ -80,60 +88,16 @@ typedef struct {
(timer)->node.parent = NULL
#define NXT_TIMER_DISABLED 0
#define NXT_TIMER_BLOCKED 1
#define NXT_TIMER_ACTIVE 2
#if (NXT_DEBUG)
#define nxt_timer_ident(timer, val) \
(timer)->ident = (val)
#else
#define nxt_timer_ident(timer, val)
#endif
nxt_inline nxt_timer_t *
nxt_timer_create(int32_t ident)
{
nxt_timer_t *timer;
timer = nxt_zalloc(sizeof(nxt_timer_t));
if (timer == NULL) {
return NULL;
}
timer->precision = NXT_TIMER_DEFAULT_PRECISION;
#if (NXT_DEBUG)
timer->ident = ident;
#endif
return timer;
}
nxt_int_t nxt_timers_init(nxt_timers_t *timers, nxt_uint_t mchanges);
nxt_msec_t nxt_timer_find(nxt_event_engine_t *engine);
void nxt_timer_expire(nxt_event_engine_t *engine, nxt_msec_t now);
NXT_EXPORT void nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer,
nxt_msec_t timeout);
NXT_EXPORT void nxt_timer_delete(nxt_event_engine_t *engine,
NXT_EXPORT void nxt_timer_disable(nxt_event_engine_t *engine,
nxt_timer_t *timer);
NXT_EXPORT nxt_bool_t nxt_timer_delete(nxt_event_engine_t *engine,
nxt_timer_t *timer);
nxt_msec_t nxt_timer_find(nxt_event_engine_t *engine);
void nxt_timer_expire(nxt_thread_t *thr, nxt_msec_t now);
#if (NXT_DEBUG)
NXT_EXPORT void nxt_timer_disable(nxt_timer_t *timer);
#else
#define nxt_timer_disable(timer) \
(timer)->state = NXT_TIMER_DISABLED
#endif
#endif /* _NXT_TIMER_H_INCLUDED_ */