Node.js: implementing output message drain using SHM_ACK feature.
ServerResponse.write() method tries to write data buffer using libunit and stores buffers to write in a Server-wide output queue, which is processed in response to SHM_ACK message from router. As a side effect 'drain' event implemented and socket.writable flag reflect current state.
This commit is contained in:
parent
df7caf4650
commit
763bdff401
3 changed files with 188 additions and 26 deletions
|
@ -227,6 +227,7 @@ ServerResponse.prototype._write = unit_lib.response_write;
|
|||
|
||||
ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
|
||||
var contentLength = 0;
|
||||
var res, o;
|
||||
|
||||
this._sendHeaders();
|
||||
|
||||
|
@ -247,11 +248,32 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
|
|||
if (typeof chunk === 'string') {
|
||||
contentLength = Buffer.byteLength(chunk, encoding);
|
||||
|
||||
if (contentLength > unit_lib.buf_min) {
|
||||
chunk = Buffer.from(chunk, encoding);
|
||||
|
||||
contentLength = chunk.length;
|
||||
}
|
||||
|
||||
} else {
|
||||
contentLength = chunk.length;
|
||||
}
|
||||
|
||||
this._write(chunk, contentLength);
|
||||
if (this.server._output.length > 0 || !this.socket.writable) {
|
||||
o = new BufferedOutput(this, 0, chunk, encoding, callback);
|
||||
this.server._output.push(o);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
res = this._write(chunk, 0, contentLength);
|
||||
if (res < contentLength) {
|
||||
this.socket.writable = false;
|
||||
|
||||
o = new BufferedOutput(this, res, chunk, encoding, callback);
|
||||
this.server._output.push(o);
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (typeof callback === 'function') {
|
||||
|
@ -265,30 +287,49 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
|
|||
* the event loop. All callbacks passed to process.nextTick()
|
||||
* will be resolved before the event loop continues.
|
||||
*/
|
||||
process.nextTick(function () {
|
||||
callback(this);
|
||||
}.bind(this));
|
||||
process.nextTick(callback);
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
ServerResponse.prototype.write = function write(chunk, encoding, callback) {
|
||||
if (this.finished) {
|
||||
throw new Error("Write after end");
|
||||
if (typeof encoding === 'function') {
|
||||
callback = encoding;
|
||||
encoding = null;
|
||||
}
|
||||
|
||||
this._writeBody(chunk, encoding, callback);
|
||||
var err = new Error("Write after end");
|
||||
process.nextTick(() => {
|
||||
this.emit('error', err);
|
||||
|
||||
return true;
|
||||
if (typeof callback === 'function') {
|
||||
callback(err);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return this._writeBody(chunk, encoding, callback);
|
||||
};
|
||||
|
||||
ServerResponse.prototype._end = unit_lib.response_end;
|
||||
|
||||
ServerResponse.prototype.end = function end(chunk, encoding, callback) {
|
||||
if (!this.finished) {
|
||||
this._writeBody(chunk, encoding, callback);
|
||||
if (typeof encoding === 'function') {
|
||||
callback = encoding;
|
||||
encoding = null;
|
||||
}
|
||||
|
||||
this._writeBody(chunk, encoding, () => {
|
||||
this._end();
|
||||
|
||||
if (typeof callback === 'function') {
|
||||
callback();
|
||||
}
|
||||
});
|
||||
|
||||
this.finished = true;
|
||||
}
|
||||
|
||||
|
@ -393,6 +434,9 @@ function Server(requestListener) {
|
|||
this._upgradeListenerCount--;
|
||||
}
|
||||
});
|
||||
|
||||
this._output = [];
|
||||
this._drain_resp = new Set();
|
||||
}
|
||||
|
||||
util.inherits(Server, EventEmitter);
|
||||
|
@ -429,6 +473,63 @@ Server.prototype.emit_close = function () {
|
|||
this.emit('close');
|
||||
};
|
||||
|
||||
Server.prototype.emit_drain = function () {
|
||||
var res, o, l;
|
||||
|
||||
if (this._output.length <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (this._output.length > 0) {
|
||||
o = this._output[0];
|
||||
|
||||
if (typeof o.chunk === 'string') {
|
||||
l = Buffer.byteLength(o.chunk, o.encoding);
|
||||
|
||||
} else {
|
||||
l = o.chunk.length;
|
||||
}
|
||||
|
||||
res = o.resp._write(o.chunk, o.offset, l);
|
||||
|
||||
o.offset += res;
|
||||
if (o.offset < l) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._drain_resp.add(o.resp);
|
||||
|
||||
if (typeof o.callback === 'function') {
|
||||
process.nextTick(o.callback);
|
||||
}
|
||||
|
||||
this._output.shift();
|
||||
}
|
||||
|
||||
for (var resp of this._drain_resp) {
|
||||
|
||||
if (resp.socket.writable) {
|
||||
continue;
|
||||
}
|
||||
|
||||
resp.socket.writable = true;
|
||||
|
||||
process.nextTick(() => {
|
||||
resp.emit("drain");
|
||||
});
|
||||
}
|
||||
|
||||
this._drain_resp.clear();
|
||||
};
|
||||
|
||||
function BufferedOutput(resp, offset, chunk, encoding, callback) {
|
||||
this.resp = resp;
|
||||
this.offset = offset;
|
||||
this.chunk = chunk;
|
||||
this.encoding = encoding;
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
function connectionListener(socket) {
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,8 @@ Unit::init(napi_env env, napi_value exports)
|
|||
websocket_send_frame);
|
||||
napi.set_named_property(exports, "websocket_set_sock",
|
||||
websocket_set_sock);
|
||||
napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
|
||||
napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
|
||||
|
||||
} catch (exception &e) {
|
||||
napi.throw_error(e);
|
||||
|
@ -148,6 +150,7 @@ Unit::create_server(napi_env env, napi_callback_info info)
|
|||
unit_init.callbacks.request_handler = request_handler_cb;
|
||||
unit_init.callbacks.websocket_handler = websocket_handler_cb;
|
||||
unit_init.callbacks.close_handler = close_handler_cb;
|
||||
unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb;
|
||||
unit_init.callbacks.add_port = add_port;
|
||||
unit_init.callbacks.remove_port = remove_port;
|
||||
unit_init.callbacks.quit = quit_cb;
|
||||
|
@ -308,6 +311,40 @@ Unit::close_handler(nxt_unit_request_info_t *req)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
Unit *obj;
|
||||
|
||||
obj = reinterpret_cast<Unit *>(ctx->unit->data);
|
||||
|
||||
obj->shm_ack_handler(ctx);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
napi_value server_obj, emit_drain;
|
||||
|
||||
try {
|
||||
nxt_handle_scope scope(env());
|
||||
|
||||
server_obj = get_server_object();
|
||||
|
||||
emit_drain = get_named_property(server_obj, "emit_drain");
|
||||
|
||||
nxt_async_context async_context(env(), "shm_ack_handler");
|
||||
nxt_callback_scope async_scope(async_context);
|
||||
|
||||
make_callback(async_context, server_obj, emit_drain);
|
||||
|
||||
} catch (exception &e) {
|
||||
nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
|
||||
{
|
||||
|
@ -748,47 +785,68 @@ Unit::response_write(napi_env env, napi_callback_info info)
|
|||
int ret;
|
||||
void *ptr;
|
||||
size_t argc, have_buf_len;
|
||||
uint32_t buf_len;
|
||||
ssize_t res_len;
|
||||
uint32_t buf_start, buf_len;
|
||||
nxt_napi napi(env);
|
||||
napi_value this_arg;
|
||||
nxt_unit_buf_t *buf;
|
||||
napi_valuetype buf_type;
|
||||
nxt_unit_request_info_t *req;
|
||||
napi_value argv[2];
|
||||
napi_value argv[3];
|
||||
|
||||
argc = 2;
|
||||
argc = 3;
|
||||
|
||||
try {
|
||||
this_arg = napi.get_cb_info(info, argc, argv);
|
||||
if (argc != 2) {
|
||||
if (argc != 3) {
|
||||
throw exception("Wrong args count. Expected: "
|
||||
"chunk, chunk length");
|
||||
"chunk, start, length");
|
||||
}
|
||||
|
||||
req = napi.get_request_info(this_arg);
|
||||
buf_type = napi.type_of(argv[0]);
|
||||
buf_len = napi.get_value_uint32(argv[1]) + 1;
|
||||
|
||||
buf = nxt_unit_response_buf_alloc(req, buf_len);
|
||||
if (buf == NULL) {
|
||||
throw exception("Failed to allocate response buffer");
|
||||
}
|
||||
buf_start = napi.get_value_uint32(argv[1]);
|
||||
buf_len = napi.get_value_uint32(argv[2]) + 1;
|
||||
|
||||
if (buf_type == napi_string) {
|
||||
/* TODO: will work only for utf8 content-type */
|
||||
|
||||
have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
|
||||
buf_len);
|
||||
if (req->response_buf != NULL
|
||||
&& (req->response_buf->end - req->response_buf->free)
|
||||
>= buf_len)
|
||||
{
|
||||
buf = req->response_buf;
|
||||
|
||||
} else {
|
||||
ptr = napi.get_buffer_info(argv[0], have_buf_len);
|
||||
|
||||
memcpy(buf->free, ptr, have_buf_len);
|
||||
buf = nxt_unit_response_buf_alloc(req, buf_len);
|
||||
if (buf == NULL) {
|
||||
throw exception("Failed to allocate response buffer");
|
||||
}
|
||||
}
|
||||
|
||||
have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
|
||||
buf_len);
|
||||
|
||||
buf->free += have_buf_len;
|
||||
|
||||
ret = nxt_unit_buf_send(buf);
|
||||
if (ret == NXT_UNIT_OK) {
|
||||
res_len = have_buf_len;
|
||||
}
|
||||
|
||||
} else {
|
||||
ptr = napi.get_buffer_info(argv[0], have_buf_len);
|
||||
|
||||
if (buf_start > 0) {
|
||||
ptr = ((uint8_t *) ptr) + buf_start;
|
||||
have_buf_len -= buf_start;
|
||||
}
|
||||
|
||||
res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
|
||||
|
||||
ret = res_len < 0 ? -res_len : NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
if (ret != NXT_UNIT_OK) {
|
||||
throw exception("Failed to send body buf");
|
||||
}
|
||||
|
@ -797,7 +855,7 @@ Unit::response_write(napi_env env, napi_callback_info info)
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
return this_arg;
|
||||
return napi.create((int64_t) res_len);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -36,6 +36,9 @@ class Unit : public nxt_napi {
|
|||
static void close_handler_cb(nxt_unit_request_info_t *req);
|
||||
void close_handler(nxt_unit_request_info_t *req);
|
||||
|
||||
static void shm_ack_handler_cb(nxt_unit_ctx_t *ctx);
|
||||
void shm_ack_handler(nxt_unit_ctx_t *ctx);
|
||||
|
||||
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
|
||||
static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
|
||||
|
||||
|
|
Loading…
Reference in a new issue