Libunit: processing single port message.

This partially reverts the optimisation introduced in 1d84b9e4b459 to avoid an
unpredictable block in nxt_unit_process_port_msg().  Under high load, this
function may never return control to its caller, and the external event loop
(in Node.js and Python asyncio) won't be able to process other scheduled
events.

To reproduce the issue, two request processing types are needed: 'fast' and
'furious'.  The 'fast' one simply returns a small response, while the 'furious'
schedules asynchronous calls to external resources.  Thus, if Unit is subjected
to a large amount of 'fast' requests, the 'furious' request processing freezes
until the high load ends.

The issue was found by Wu Jian Ping (@wujjpp) during Node.js stream
implementation discussion and relates to PR #502 on GitHub.
This commit is contained in:
Max Romanov 2020-12-29 19:01:24 +03:00
parent d3d6864bdc
commit d65a66f9d8
3 changed files with 161 additions and 69 deletions

View file

@@ -13,15 +13,29 @@
#include <nxt_unit_websocket.h>
static void delete_port_data(uv_handle_t* handle);
napi_ref Unit::constructor_;
/*
 * Per-port state bridging a Unit port's file descriptor into the external
 * libuv event loop.  process_port_msg() handles ONE message per wakeup and
 * re-schedules itself through a zero-timeout uv timer, so the loop can run
 * other events in between (this is the point of commit d65a66f9d8).
 *
 * Fix: the pre-change declarations of ctx/port/poll had been left in place
 * ahead of the new member list (diff residue), declaring those members
 * twice; the stale trio is removed here.
 */
struct port_data_t {
    port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);

    void process_port_msg();
    void stop();

    template<typename T>
    static port_data_t *get(T *handle);

    static void read_callback(uv_poll_t *handle, int status, int events);
    static void timer_callback(uv_timer_t *handle);
    static void delete_data(uv_handle_t* handle);

    nxt_unit_ctx_t   *ctx;
    nxt_unit_port_t  *port;

    uv_poll_t        poll;
    /* Lazily initialized; type stays UV_UNKNOWN_HANDLE until first use. */
    uv_timer_t       timer;

    /* One reference per open uv handle; dropped in delete_data(). */
    int              ref_count;

    bool             scheduled;
    bool             stopped;
};
@@ -33,6 +47,106 @@ struct req_data_t {
};
/*
 * Bind the per-port state to its Unit context and port.  The uv timer is
 * deliberately NOT initialized here: its type is set to the
 * UV_UNKNOWN_HANDLE sentinel, and process_port_msg() calls uv_timer_init()
 * lazily on the first successfully processed message.
 */
port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
    ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
{
    /* Sentinel tested in process_port_msg() and stop(). */
    timer.type = UV_UNKNOWN_HANDLE;
}
/*
 * Process exactly ONE message from the port, then, if it succeeded,
 * schedule a zero-timeout uv timer to continue on the next loop
 * iteration.  Handling one message per callback keeps the external
 * event loop (Node.js) responsive under high load instead of looping
 * inside libunit until the queue drains.
 */
void
port_data_t::process_port_msg()
{
    int  rc, err;

    rc = nxt_unit_process_port_msg(ctx, port);

    if (rc != NXT_UNIT_OK) {
        /* Nothing processed (or error): no need to re-schedule. */
        return;
    }

    if (timer.type == UV_UNKNOWN_HANDLE) {
        /* First successful message: lazily create the rescheduling timer
         * on the same loop that drives the poll handle. */
        err = uv_timer_init(poll.loop, &timer);
        if (err < 0) {
            /* Fix: the failing call is uv_timer_init(), not uv.poll --
             * the original message was copy-pasted from the poll path. */
            nxt_unit_warn(ctx, "Failed to init uv.timer");
            return;
        }

        /* The timer handle holds a reference, released in delete_data(). */
        ref_count++;
        timer.data = this;
    }

    if (!scheduled && !stopped) {
        /* Zero timeout/repeat: fire once, on the next loop iteration. */
        uv_timer_start(&timer, timer_callback, 0, 0);

        scheduled = true;
    }
}
/*
 * Tear down the per-port handles.  Marks the state stopped (so a pending
 * timer callback becomes a no-op) and closes the poll handle; the timer
 * is closed only if it was ever lazily initialized.  delete_data() runs
 * as each handle's close callback and frees the object on the last one.
 */
void
port_data_t::stop()
{
    stopped = true;

    uv_poll_stop(&poll);
    uv_close((uv_handle_t *) &poll, delete_data);

    if (timer.type != UV_UNKNOWN_HANDLE) {
        uv_timer_stop(&timer);
        uv_close((uv_handle_t *) &timer, delete_data);
    }
}
/*
 * Recover the owning port_data_t from a libuv handle's user-data pointer;
 * works for uv_poll_t, uv_timer_t and uv_handle_t alike (poll.data and
 * timer.data are both set to the owning object).
 */
template<typename T>
port_data_t *
port_data_t::get(T *handle)
{
    return (port_data_t *) handle->data;
}
/*
 * uv_poll callback: the port's fd became readable -- process one message.
 */
void
port_data_t::read_callback(uv_poll_t *handle, int status, int events)
{
    port_data_t  *data;

    data = get(handle);

    data->process_port_msg();
}
/*
 * Zero-timeout timer callback: continue processing messages scheduled by
 * process_port_msg(), unless the port was stopped in the meantime.
 */
void
port_data_t::timer_callback(uv_timer_t *handle)
{
    port_data_t  *data = get(handle);

    data->scheduled = false;

    if (!data->stopped) {
        data->process_port_msg();
    }
}
/*
 * uv_close() callback shared by the poll and timer handles: each closed
 * handle drops one reference; the last drop frees the per-port state.
 */
void
port_data_t::delete_data(uv_handle_t* handle)
{
    port_data_t  *owner = get(handle);

    if (--owner->ref_count <= 0) {
        delete owner;
    }
}
Unit::Unit(napi_env env, napi_value jsthis):
nxt_napi(env),
wrapper_(wrap(jsthis, this, destroy)),
@@ -353,59 +467,50 @@ Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
}
/*
 * Pre-change uv_poll read callback (removed by this commit in favor of
 * port_data_t::read_callback): processes port messages directly.
 */
static void
nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{
    port_data_t  *d = (port_data_t *) handle->data;

    nxt_unit_process_port_msg(d->ctx, d->port);
}
/*
 * Register a Unit port's read fd with the Node.js (uv) event loop.
 *
 * NOTE(review): this span is stripped-diff residue -- the pre- and
 * post-change versions of several statements appear back to back (the
 * declaration list, the 'obj' assignment, the port_data_t allocation and
 * the uv_poll_start() call).  Only one of each pair belongs in a
 * compilable file.  The function's tail (after the if block) is outside
 * the visible hunk.
 */
int
Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int err;
    Unit *obj;
    uv_loop_t *loop;
    port_data_t *data;
    napi_status status;
    /* duplicate declarations (other side of the diff) */
    int err;
    Unit *obj;
    uv_loop_t *loop;
    port_data_t *data;
    napi_status status;

    if (port->in_fd != -1) {
        obj = reinterpret_cast<Unit *>(ctx->unit->data);

        /* The fd must be non-blocking so the poll callback never stalls. */
        if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
            nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
                port->in_fd, strerror(errno), errno);
            return -1;
        }

        /* duplicate 'obj' assignment (other side of the diff) */
        obj = reinterpret_cast<Unit *>(ctx->unit->data);

        /* Use the uv loop that drives this N-API addon. */
        status = napi_get_uv_event_loop(obj->env(), &loop);
        if (status != napi_ok) {
            nxt_unit_warn(ctx, "Failed to get uv.loop");
            return NXT_UNIT_ERROR;
        }

        data = new port_data_t;
        /* duplicate allocation (other side of the diff) */
        data = new port_data_t(ctx, port);

        err = uv_poll_init(loop, &data->poll, port->in_fd);
        if (err < 0) {
            nxt_unit_warn(ctx, "Failed to init uv.poll");
            delete data;
            return NXT_UNIT_ERROR;
        }

        err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
        /* duplicate uv_poll_start() call (other side of the diff) */
        err = uv_poll_start(&data->poll, UV_READABLE,
            port_data_t::read_callback);
        if (err < 0) {
            nxt_unit_warn(ctx, "Failed to start uv.poll");
            delete data;
            return NXT_UNIT_ERROR;
        }

        port->data = data;

        data->ctx = ctx;
        data->port = port;
        /* the open poll handle keeps a reference; see delete_data() */
        data->ref_count++;
        data->poll.data = data;
    }
@@ -421,26 +526,11 @@ Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
/* Interior of Unit::remove_port() -- the signature is outside this hunk.
 *
 * NOTE(review): stripped-diff residue -- the two uv calls below are the
 * pre-change teardown path; data->stop() is their replacement, which also
 * closes the lazily created timer.  Only one of the two paths belongs in
 * a compilable file. */
if (port->data != NULL) {
    data = (port_data_t *) port->data;

    if (data->port == port) {
        /* old teardown (other side of the diff) */
        uv_poll_stop(&data->poll);
        uv_close((uv_handle_t *) &data->poll, delete_port_data);
    }
    /* replacement teardown */
    data->stop();
}
}
/*
 * Pre-change uv_close() callback (removed by this commit in favor of
 * port_data_t::delete_data): frees the per-port state unconditionally.
 */
static void
delete_port_data(uv_handle_t* handle)
{
    delete (port_data_t *) handle->data;
}
void
Unit::quit_cb(nxt_unit_ctx_t *ctx)
{

View file

@@ -5016,7 +5016,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc;
nxt_unit_impl_t *lib;
nxt_unit_read_buf_t *rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
@@ -5024,9 +5023,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
}
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
retry:
if (port == lib->shared_port) {
rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
@@ -5052,15 +5048,6 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_unit_process_ready_req(ctx);
if (ctx_impl->online) {
rbuf = nxt_unit_read_buf_get(ctx);
if (nxt_slow_path(rbuf == NULL)) {
return NXT_UNIT_ERROR;
}
goto retry;
}
return rc;
}

View file

@@ -1131,11 +1131,12 @@ nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx)
/*
 * Python-callable entry: process one message from a Unit port.  Expects
 * two long arguments -- the nxt_unit_ctx_t and nxt_unit_port_t pointers.
 *
 * NOTE(review): stripped-diff residue -- the old and new versions of the
 * declarations and of several statements appear back to back, and a hunk
 * header splits the body (lines between the hunks, including the
 * argument-count check's 'if', are not visible here).  Only one side of
 * each duplicated pair is real code.
 */
static PyObject *
nxt_py_asgi_port_read(PyObject *self, PyObject *args)
{
    int rc;
    PyObject *arg;
    Py_ssize_t n;
    nxt_unit_ctx_t *ctx;
    nxt_unit_port_t *port;
    /* replacement declarations (other side of the diff) */
    int rc;
    PyObject *arg0, *arg1, *res;
    Py_ssize_t n;
    nxt_unit_ctx_t *ctx;
    nxt_unit_port_t *port;
    nxt_py_asgi_ctx_data_t *ctx_data;

    n = PyTuple_GET_SIZE(args);
@ -1147,31 +1148,45 @@ nxt_py_asgi_port_read(PyObject *self, PyObject *args)
    /* NOTE(review): the 'if' guarding this return is outside the hunks. */
    return PyErr_Format(PyExc_TypeError, "invalid number of arguments");
    }

    arg = PyTuple_GET_ITEM(args, 0);
    if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
    /* replacement first-argument extraction (other side of the diff) */
    arg0 = PyTuple_GET_ITEM(args, 0);
    if (nxt_slow_path(arg0 == NULL || PyLong_Check(arg0) == 0)) {
        return PyErr_Format(PyExc_TypeError,
            "the first argument is not a long");
    }

    ctx = PyLong_AsVoidPtr(arg);
    /* replacement (other side of the diff) */
    ctx = PyLong_AsVoidPtr(arg0);

    arg = PyTuple_GET_ITEM(args, 1);
    if (nxt_slow_path(arg == NULL || PyLong_Check(arg) == 0)) {
    /* replacement second-argument extraction (other side of the diff) */
    arg1 = PyTuple_GET_ITEM(args, 1);
    if (nxt_slow_path(arg1 == NULL || PyLong_Check(arg1) == 0)) {
        return PyErr_Format(PyExc_TypeError,
            "the second argument is not a long");
    }

    port = PyLong_AsVoidPtr(arg);
    nxt_unit_debug(ctx, "asgi_port_read %p %p", ctx, port);
    /* replacement (other side of the diff) */
    port = PyLong_AsVoidPtr(arg1);

    /* Process exactly one port message per call. */
    rc = nxt_unit_process_port_msg(ctx, port);

    nxt_unit_debug(ctx, "asgi_port_read(%p,%p): %d", ctx, port, rc);

    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return PyErr_Format(PyExc_RuntimeError,
            "error processing port %d message", port->id.id);
    }

    if (rc == NXT_UNIT_OK) {
        /* A message was handled, so more may be pending: re-schedule via
         * loop.call_soon() so other asyncio events can run in between.
         * (presumably nxt_py_port_read re-enters this reader -- verify
         * against its registration site) */
        ctx_data = ctx->data;

        res = PyObject_CallFunctionObjArgs(ctx_data->loop_call_soon,
            nxt_py_port_read,
            arg0, arg1, NULL);
        if (nxt_slow_path(res == NULL)) {
            nxt_unit_alert(ctx, "Python failed to call 'loop.call_soon'");
            nxt_python_print_exception();
        }

        Py_XDECREF(res);
    }

    Py_RETURN_NONE;
}