Round robin upstream added.

This commit is contained in:
Igor Sysoev 2020-03-06 18:28:54 +03:00
parent 794248090a
commit 7935ea4543
11 changed files with 654 additions and 34 deletions

View file

@ -69,6 +69,7 @@ NXT_LIB_SRCS=" \
src/nxt_job_resolve.c \
src/nxt_sockaddr.c \
src/nxt_listen_socket.c \
src/nxt_upstream.c \
src/nxt_upstream_round_robin.c \
src/nxt_http_parse.c \
src/nxt_app_log.c \

View file

@ -110,6 +110,12 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt,
nxt_str_t *name, nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_server(nxt_conf_validation_t *vldt,
nxt_str_t *name, nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
@ -226,6 +232,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_root_members[] = {
&nxt_conf_vldt_object_iterator,
(void *) &nxt_conf_vldt_app },
{ nxt_string("upstreams"),
NXT_CONF_VLDT_OBJECT,
&nxt_conf_vldt_object_iterator,
(void *) &nxt_conf_vldt_upstream },
{ nxt_string("access_log"),
NXT_CONF_VLDT_STRING,
NULL,
@ -682,6 +693,26 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
};
static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_members[] = {
{ nxt_string("servers"),
NXT_CONF_VLDT_OBJECT,
&nxt_conf_vldt_object_iterator,
(void *) &nxt_conf_vldt_server },
NXT_CONF_VLDT_END
};
static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_server_members[] = {
{ nxt_string("weight"),
NXT_CONF_VLDT_INTEGER,
&nxt_conf_vldt_server_weight,
NULL },
NXT_CONF_VLDT_END
};
nxt_int_t
nxt_conf_validate(nxt_conf_validation_t *vldt)
{
@ -1017,6 +1048,27 @@ nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
return NXT_OK;
}
if (nxt_str_eq(&first, "upstreams", 9)) {
if (second.length == 0) {
goto error;
}
value = nxt_conf_get_object_member(vldt->conf, &first, NULL);
if (nxt_slow_path(value == NULL)) {
goto error;
}
value = nxt_conf_get_object_member(value, &second, NULL);
if (nxt_slow_path(value == NULL)) {
goto error;
}
return NXT_OK;
}
if (nxt_str_eq(&first, "routes", 6)) {
value = nxt_conf_get_object_member(vldt->conf, &first, NULL);
@ -1901,3 +1953,81 @@ nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value)
return NXT_OK;
}
static nxt_int_t
nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt, nxt_str_t *name,
nxt_conf_value_t *value)
{
nxt_int_t ret;
nxt_conf_value_t *conf;
static nxt_str_t servers = nxt_string("servers");
ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
if (ret != NXT_OK) {
return ret;
}
ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_upstream_members);
if (ret != NXT_OK) {
return ret;
}
conf = nxt_conf_get_object_member(value, &servers, NULL);
if (conf == NULL) {
return nxt_conf_vldt_error(vldt, "The \"%V\" upstream must contain "
"\"servers\" object value.", name);
}
return NXT_OK;
}
static nxt_int_t
nxt_conf_vldt_server(nxt_conf_validation_t *vldt, nxt_str_t *name,
nxt_conf_value_t *value)
{
nxt_int_t ret;
nxt_sockaddr_t *sa;
ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
if (ret != NXT_OK) {
return ret;
}
sa = nxt_sockaddr_parse(vldt->pool, name);
if (sa == NULL) {
return nxt_conf_vldt_error(vldt, "The \"%V\" is not valid "
"server address.", name);
}
return nxt_conf_vldt_object(vldt, value,
nxt_conf_vldt_upstream_server_members);
}
static nxt_int_t
nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data)
{
int64_t int_value;
int_value = nxt_conf_get_integer(value);
if (int_value <= 0) {
return nxt_conf_vldt_error(vldt, "The \"weight\" number must be "
"greater than 0.");
}
if (int_value > NXT_INT32_T_MAX) {
return nxt_conf_vldt_error(vldt, "The \"weight\" number must "
"not exceed %d.", NXT_INT32_T_MAX);
}
return NXT_OK;
}

View file

@ -6,6 +6,7 @@
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_upstream.h>
#include <nxt_h1proto.h>
#include <nxt_websocket.h>
#include <nxt_websocket_header.h>
@ -2004,7 +2005,7 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
c->read_timer.task = task;
c->write_timer.task = task;
c->socket.data = peer;
c->remote = peer->sockaddr;
c->remote = peer->server->sockaddr;
c->socket.write_ready = 1;
c->write_state = &nxt_h1p_peer_connect_state;

View file

@ -106,10 +106,12 @@ typedef struct {
} nxt_http_response_t;
typedef struct nxt_upstream_server_s nxt_upstream_server_t;
typedef struct {
nxt_http_proto_t proto;
nxt_http_request_t *request;
nxt_sockaddr_t *sockaddr;
nxt_upstream_server_t *server;
nxt_list_t *fields;
nxt_buf_t *body;
nxt_off_t remainder;
@ -178,7 +180,6 @@ struct nxt_http_request_s {
typedef struct nxt_http_route_s nxt_http_route_t;
typedef struct nxt_http_upstream_s nxt_http_upstream_t;
struct nxt_http_action_s {
@ -187,9 +188,10 @@ struct nxt_http_action_s {
nxt_http_action_t *action);
union {
nxt_http_route_t *route;
nxt_http_upstream_t *upstream;
nxt_app_t *application;
nxt_http_action_t *fallback;
nxt_upstream_t *upstream;
uint32_t upstream_number;
} u;
nxt_str_t name;
@ -275,6 +277,11 @@ nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task,
void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes);
void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action);
nxt_int_t nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_conf_value_t *conf);
nxt_int_t nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf,
nxt_upstream_t ***upstream_joint);
nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash);
@ -285,6 +292,11 @@ nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash,
nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
void nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name,
nxt_http_action_t *action);
nxt_http_action_t *nxt_upstream_proxy_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_upstream_t *upstream);
nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action);
nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field,

View file

@ -6,23 +6,21 @@
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_upstream.h>
typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task,
nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
struct nxt_http_upstream_s {
uint32_t current;
uint32_t n;
uint8_t protocol;
nxt_http_upstream_connect_t connect;
nxt_sockaddr_t *sockaddr[1];
struct nxt_upstream_proxy_s {
nxt_sockaddr_t *sockaddr;
uint8_t protocol;
};
static void nxt_http_upstream_connect(nxt_task_t *task,
nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
static void nxt_http_proxy_server_get(nxt_task_t *task,
nxt_upstream_server_t *us);
static void nxt_http_proxy_upstream_ready(nxt_task_t *task,
nxt_upstream_server_t *us);
static void nxt_http_proxy_upstream_error(nxt_task_t *task,
nxt_upstream_server_t *us);
static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
@ -43,12 +41,24 @@ static const nxt_http_request_state_t nxt_http_proxy_header_read_state;
static const nxt_http_request_state_t nxt_http_proxy_read_state;
static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = {
.get = nxt_http_proxy_server_get,
};
static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = {
.ready = nxt_http_proxy_upstream_ready,
.error = nxt_http_proxy_upstream_error,
};
nxt_int_t
nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
{
nxt_str_t name;
nxt_sockaddr_t *sa;
nxt_http_upstream_t *upstream;
nxt_str_t name;
nxt_sockaddr_t *sa;
nxt_upstream_t *up;
nxt_upstream_proxy_t *proxy;
sa = NULL;
name = action->name;
@ -66,18 +76,25 @@ nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
}
if (sa != NULL) {
upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t));
if (nxt_slow_path(upstream == NULL)) {
up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
if (nxt_slow_path(up == NULL)) {
return NXT_ERROR;
}
upstream->current = 0;
upstream->n = 1;
upstream->protocol = NXT_HTTP_PROTO_H1;
upstream->connect = nxt_http_upstream_connect;
upstream->sockaddr[0] = sa;
up->name.length = sa->length;
up->name.start = nxt_sockaddr_start(sa);
up->proto = &nxt_upstream_simple_proto;
action->u.upstream = upstream;
proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t));
if (nxt_slow_path(proxy == NULL)) {
return NXT_ERROR;
}
proxy->sockaddr = sa;
proxy->protocol = NXT_HTTP_PROTO_H1;
up->type.proxy = proxy;
action->u.upstream = up;
action->handler = nxt_http_proxy_handler;
}
@ -89,7 +106,22 @@ static nxt_http_action_t *
nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_action_t *action)
{
nxt_http_peer_t *peer;
return nxt_upstream_proxy_handler(task, r, action->u.upstream);
}
nxt_http_action_t *
nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_upstream_t *upstream)
{
nxt_http_peer_t *peer;
nxt_upstream_server_t *us;
us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t));
if (nxt_slow_path(us == NULL)) {
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
return NULL;
}
peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
if (nxt_slow_path(peer == NULL)) {
@ -102,18 +134,39 @@ nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_mp_retain(r->mem_pool);
action->u.upstream->connect(task, action->u.upstream, peer);
us->state = &nxt_upstream_proxy_state;
us->peer.http = peer;
peer->server = us;
us->upstream = upstream;
upstream->proto->get(task, us);
return NULL;
}
static void
nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
nxt_http_peer_t *peer)
nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
{
peer->protocol = upstream->protocol;
peer->sockaddr = upstream->sockaddr[0];
nxt_upstream_proxy_t *proxy;
proxy = us->upstream->type.proxy;
us->sockaddr = proxy->sockaddr;
us->protocol = proxy->protocol;
us->state->ready(task, us);
}
static void
nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us)
{
nxt_http_peer_t *peer;
peer = us->peer.http;
peer->protocol = us->protocol;
peer->request->state = &nxt_http_proxy_header_send_state;
@ -121,6 +174,19 @@ nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
}
static void
nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us)
{
nxt_http_request_t *r;
r = us->peer.http->request;
nxt_mp_release(r->mem_pool);
nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY);
}
static const nxt_http_request_state_t nxt_http_proxy_header_send_state
nxt_aligned(64) =
{

View file

@ -1114,6 +1114,12 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_router_listener_application(tmcf, &name, action);
nxt_router_app_use(task, action->u.application, 1);
} else if (nxt_str_start(&name, "upstreams/", 10)) {
name.length -= 10;
name.start += 10;
nxt_upstream_find(tmcf->router_conf->upstreams, &name, action);
} else if (nxt_str_start(&name, "routes", 6)) {
if (name.length == 6) {

View file

@ -1634,6 +1634,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf->routes = routes;
}
ret = nxt_upstreams_create(task, tmcf, conf);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
http = nxt_conf_get_path(conf, &http_path);
#if 0
if (http == NULL) {
@ -2526,6 +2531,7 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler)
{
nxt_int_t ret;
nxt_joint_job_t *job;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
@ -2559,6 +2565,11 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
job->work.data = joint;
ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
joint->count = 1;
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);

View file

@ -18,6 +18,8 @@ typedef struct nxt_http_request_s nxt_http_request_t;
typedef struct nxt_http_action_s nxt_http_action_t;
typedef struct nxt_http_routes_s nxt_http_routes_t;
typedef struct nxt_upstream_s nxt_upstream_t;
typedef struct nxt_upstreams_s nxt_upstreams_t;
typedef struct nxt_router_access_log_s nxt_router_access_log_t;
@ -43,6 +45,7 @@ typedef struct {
nxt_router_t *router;
nxt_http_routes_t *routes;
nxt_upstreams_t *upstreams;
nxt_lvlhsh_t mtypes_hash;
@ -196,6 +199,8 @@ typedef struct {
nxt_event_engine_t *engine;
nxt_socket_conf_t *socket_conf;
nxt_upstream_t **upstreams;
/* Modules configuraitons. */
} nxt_socket_conf_joint_t;

View file

@ -4,4 +4,137 @@
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_upstream.h>
static nxt_http_action_t *nxt_upstream_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
nxt_int_t
nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_conf_value_t *conf)
{
size_t size;
uint32_t i, n, next;
nxt_mp_t *mp;
nxt_int_t ret;
nxt_str_t name, *string;
nxt_upstreams_t *upstreams;
nxt_conf_value_t *upstreams_conf, *upcf;
static nxt_str_t upstreams_name = nxt_string("upstreams");
upstreams_conf = nxt_conf_get_object_member(conf, &upstreams_name, NULL);
if (upstreams_conf == NULL) {
return NXT_OK;
}
n = nxt_conf_object_members_count(upstreams_conf);
if (n == 0) {
return NXT_OK;
}
mp = tmcf->router_conf->mem_pool;
size = sizeof(nxt_upstreams_t) + n * sizeof(nxt_upstream_t);
upstreams = nxt_mp_zalloc(mp, size);
if (nxt_slow_path(upstreams == NULL)) {
return NXT_ERROR;
}
upstreams->items = n;
next = 0;
for (i = 0; i < n; i++) {
upcf = nxt_conf_next_object_member(upstreams_conf, &name, &next);
string = nxt_str_dup(mp, &upstreams->upstream[i].name, &name);
if (nxt_slow_path(string == NULL)) {
return NXT_ERROR;
}
ret = nxt_upstream_round_robin_create(task, tmcf, upcf,
&upstreams->upstream[i]);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
}
tmcf->router_conf->upstreams = upstreams;
return NXT_OK;
}
void
nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name,
nxt_http_action_t *action)
{
uint32_t i, n;
nxt_upstream_t *upstream;
upstream = &upstreams->upstream[0];
n = upstreams->items;
for (i = 0; i < n; i++) {
if (nxt_strstr_eq(&upstream[i].name, name)) {
action->u.upstream_number = i;
action->handler = nxt_upstream_handler;
return;
}
}
}
nxt_int_t
nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf,
nxt_upstream_t ***upstream_joint)
{
uint32_t i, n;
nxt_upstream_t *u, **up;
nxt_upstreams_t *upstreams;
nxt_router_conf_t *router_conf;
router_conf = tmcf->router_conf;
upstreams = router_conf->upstreams;
if (upstreams == NULL) {
*upstream_joint = NULL;
return NXT_OK;
}
n = upstreams->items;
up = nxt_mp_zalloc(router_conf->mem_pool, n * sizeof(nxt_upstream_t *));
if (nxt_slow_path(up == NULL)) {
return NXT_ERROR;
}
u = &upstreams->upstream[0];
for (i = 0; i < n; i++) {
up[i] = u[i].proto->joint_create(tmcf, &u[i]);
if (nxt_slow_path(up[i] == NULL)) {
return NXT_ERROR;
}
}
*upstream_joint = up;
return NXT_OK;
}
static nxt_http_action_t *
nxt_upstream_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_action_t *action)
{
return nxt_upstream_proxy_handler(task, r,
r->conf->upstreams[action->u.upstream_number]);
}

View file

@ -8,4 +8,74 @@
#define _NXT_UPSTREAM_H_INCLUDED_
typedef struct nxt_upstream_proxy_s nxt_upstream_proxy_t;
typedef struct nxt_upstream_round_robin_s nxt_upstream_round_robin_t;
typedef struct nxt_upstream_round_robin_server_s
nxt_upstream_round_robin_server_t;
typedef void (*nxt_upstream_peer_ready_t)(nxt_task_t *task,
nxt_upstream_server_t *us);
typedef void (*nxt_upstream_peer_error_t)(nxt_task_t *task,
nxt_upstream_server_t *us);
typedef struct {
nxt_upstream_peer_ready_t ready;
nxt_upstream_peer_error_t error;
} nxt_upstream_peer_state_t;
typedef nxt_upstream_t *(*nxt_upstream_joint_create_t)(
nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
typedef void (*nxt_upstream_server_get_t)(nxt_task_t *task,
nxt_upstream_server_t *us);
typedef struct {
nxt_upstream_joint_create_t joint_create;
nxt_upstream_server_get_t get;
} nxt_upstream_server_proto_t;
struct nxt_upstream_s {
const nxt_upstream_server_proto_t *proto;
union {
nxt_upstream_proxy_t *proxy;
nxt_upstream_round_robin_t *round_robin;
} type;
nxt_str_t name;
};
struct nxt_upstreams_s {
uint32_t items;
nxt_upstream_t upstream[0];
};
struct nxt_upstream_server_s {
nxt_sockaddr_t *sockaddr;
const nxt_upstream_peer_state_t *state;
nxt_upstream_t *upstream;
uint8_t protocol;
union {
nxt_upstream_round_robin_server_t *round_robin;
} server;
union {
nxt_http_peer_t *http;
} peer;
};
nxt_int_t nxt_upstream_round_robin_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *upstream_conf,
nxt_upstream_t *upstream);
#endif /* _NXT_UPSTREAM_H_INCLUDED_ */

View file

@ -4,3 +4,188 @@
* Copyright (C) NGINX, Inc.
*/
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_upstream.h>
struct nxt_upstream_round_robin_server_s {
nxt_sockaddr_t *sockaddr;
int32_t current_weight;
int32_t effective_weight;
int32_t weight;
uint8_t protocol;
};
struct nxt_upstream_round_robin_s {
uint32_t items;
nxt_upstream_round_robin_server_t server[0];
};
static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
nxt_upstream_server_t *us);
static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
.joint_create = nxt_upstream_round_robin_joint_create,
.get = nxt_upstream_round_robin_server_get,
};
static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = {
{
nxt_string("weight"),
NXT_CONF_MAP_INT32,
offsetof(nxt_upstream_round_robin_server_t, weight),
},
};
nxt_int_t
nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
{
size_t size;
uint32_t i, n, next;
nxt_mp_t *mp;
nxt_str_t name;
nxt_sockaddr_t *sa;
nxt_conf_value_t *servers_conf, *srvcf;
nxt_upstream_round_robin_t *urr;
static nxt_str_t servers = nxt_string("servers");
mp = tmcf->router_conf->mem_pool;
servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
n = nxt_conf_object_members_count(servers_conf);
size = sizeof(nxt_upstream_round_robin_t)
+ n * sizeof(nxt_upstream_round_robin_server_t);
urr = nxt_mp_zalloc(mp, size);
if (nxt_slow_path(urr == NULL)) {
return NXT_ERROR;
}
urr->items = n;
next = 0;
for (i = 0; i < n; i++) {
srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
sa = nxt_sockaddr_parse(mp, &name);
if (nxt_slow_path(sa == NULL)) {
return NXT_ERROR;
}
sa->type = SOCK_STREAM;
urr->server[i].sockaddr = sa;
urr->server[i].weight = 1;
urr->server[i].protocol = NXT_HTTP_PROTO_H1;
nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf,
nxt_nitems(nxt_upstream_round_robin_server_conf),
&urr->server[i]);
urr->server[i].effective_weight = urr->server[i].weight;
}
upstream->proto = &nxt_upstream_round_robin_proto;
upstream->type.round_robin = urr;
return NXT_OK;
}
static nxt_upstream_t *
nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
nxt_upstream_t *upstream)
{
size_t size;
uint32_t i, n;
nxt_mp_t *mp;
nxt_upstream_t *u;
nxt_upstream_round_robin_t *urr, *urrcf;
mp = tmcf->router_conf->mem_pool;
u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
if (nxt_slow_path(u == NULL)) {
return NULL;
}
*u = *upstream;
urrcf = upstream->type.round_robin;
size = sizeof(nxt_upstream_round_robin_t)
+ urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
urr = nxt_mp_alloc(mp, size);
if (nxt_slow_path(urr == NULL)) {
return NULL;
}
u->type.round_robin = urr;
n = urrcf->items;
urr->items = n;
for (i = 0; i < n; i++) {
urr->server[i] = urrcf->server[i];
}
return u;
}
static void
nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
{
int32_t total;
uint32_t i, n;
nxt_upstream_round_robin_t *round_robin;
nxt_upstream_round_robin_server_t *s, *best;
best = NULL;
total = 0;
round_robin = us->upstream->type.round_robin;
s = round_robin->server;
n = round_robin->items;
for (i = 0; i < n; i++) {
s[i].current_weight += s[i].effective_weight;
total += s[i].effective_weight;
if (s[i].effective_weight < s[i].weight) {
s[i].effective_weight++;
}
if (best == NULL || s[i].current_weight > best->current_weight) {
best = &s[i];
}
}
if (best == NULL) {
us->state->error(task, us);
return;
}
best->current_weight -= total;
us->sockaddr = best->sockaddr;
us->protocol = best->protocol;
us->server.round_robin = best;
us->state->ready(task, us);
}