Commit 0f9ed40b authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Share connection among different patterns if address set are same

parent 4bb88b35
......@@ -666,7 +666,7 @@ void ClientHandler::pool_downstream_connection(
<< " in group " << group;
}
auto &dconn_pool = group->dconn_pool;
auto &dconn_pool = group->shared_addr->dconn_pool;
dconn_pool.add_downstream_connection(std::move(dconn));
}
......@@ -675,7 +675,8 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn) {
CLOG(INFO, this) << "Removing downstream connection DCONN:" << dconn
<< " from pool";
}
auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool;
auto &dconn_pool =
dconn->get_downstream_addr_group()->shared_addr->dconn_pool;
dconn_pool.remove_downstream_connection(dconn);
}
......@@ -722,7 +723,9 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
}
auto &group = worker_->get_downstream_addr_groups()[group_idx];
auto &dconn_pool = group.dconn_pool;
auto &shared_addr = group.shared_addr;
auto &dconn_pool = shared_addr->dconn_pool;
auto dconn = dconn_pool.pop_downstream_connection();
if (!dconn) {
......@@ -731,25 +734,27 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
<< " Create new one";
}
if (group.proto == PROTO_HTTP2) {
if (group.http2_freelist.empty() ||
group.http2_freelist.size() < group.addrs.size()) {
if (shared_addr->proto == PROTO_HTTP2) {
auto &http2_freelist = shared_addr->http2_freelist;
if (http2_freelist.empty() ||
http2_freelist.size() < shared_addr->addrs.size()) {
if (LOG_ENABLED(INFO)) {
if (group.http2_freelist.empty()) {
if (http2_freelist.empty()) {
CLOG(INFO, this)
<< "http2_freelist is empty; create new Http2Session";
} else {
CLOG(INFO, this) << "Create new Http2Session; current "
<< group.http2_freelist.size() << ", min "
<< group.addrs.size();
<< http2_freelist.size() << ", min "
<< shared_addr->addrs.size();
}
}
auto session = make_unique<Http2Session>(
conn_.loop, worker_->get_cl_ssl_ctx(), worker_, &group);
group.http2_freelist.append(session.release());
http2_freelist.append(session.release());
}
auto http2session = group.http2_freelist.head;
auto http2session = http2_freelist.head;
if (http2session->max_concurrency_reached(1)) {
if (LOG_ENABLED(INFO)) {
......@@ -757,7 +762,7 @@ ClientHandler::get_downstream_connection(Downstream *downstream) {
<< http2session
<< "). Remove Http2Session from http2_freelist";
}
group.http2_freelist.remove(http2session);
http2_freelist.remove(http2session);
}
dconn = make_unique<Http2DownstreamConnection>(http2session);
......
......@@ -208,7 +208,7 @@ Http2Session::~Http2Session() {
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Removed from http2_freelist";
}
group_->http2_freelist.remove(this);
group_->shared_addr->http2_freelist.remove(this);
}
}
......@@ -280,7 +280,8 @@ int Http2Session::disconnect(bool hard) {
int Http2Session::initiate_connection() {
int rv = 0;
auto &addrs = group_->addrs;
auto &shared_addr = group_->shared_addr;
auto &addrs = shared_addr->addrs;
auto worker_blocker = worker_->get_connect_blocker();
if (state_ == DISCONNECTED) {
......@@ -292,7 +293,7 @@ int Http2Session::initiate_connection() {
return -1;
}
auto &next_downstream = group_->next;
auto &next_downstream = shared_addr->next;
auto end = next_downstream;
for (;;) {
......@@ -643,7 +644,7 @@ void Http2Session::remove_downstream_connection(
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to Http2Session freelist";
}
group_->http2_freelist.append(this);
group_->shared_addr->http2_freelist.append(this);
}
}
......@@ -1242,6 +1243,10 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0;
}
case NGHTTP2_GOAWAY:
SSLOG(WARN, http2session)
<< "GOAWAY received; error_code=" << frame->goaway.error_code;
return 0;
default:
return 0;
}
......@@ -2056,9 +2061,11 @@ int Http2Session::handle_downstream_push_promise_complete(
size_t Http2Session::get_num_dconns() const { return dconns_.size(); }
bool Http2Session::in_freelist() const {
auto &shared_addr = group_->shared_addr;
auto &http2_freelist = shared_addr->http2_freelist;
return dlnext != nullptr || dlprev != nullptr ||
group_->http2_freelist.head == this ||
group_->http2_freelist.tail == this;
http2_freelist.head == this || http2_freelist.tail == this;
}
bool Http2Session::max_concurrency_reached(size_t extra) const {
......
......@@ -158,8 +158,9 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
conn_.set_ssl(ssl);
}
auto &addrs = group_->addrs;
auto &next_downstream = group_->next;
auto &shared_addr = group_->shared_addr;
auto &addrs = shared_addr->addrs;
auto &next_downstream = shared_addr->next;
auto end = next_downstream;
for (;;) {
auto &addr = addrs[next_downstream];
......@@ -511,7 +512,8 @@ void idle_readcb(struct ev_loop *loop, ev_io *w, int revents) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection EOF";
}
auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool;
auto &dconn_pool =
dconn->get_downstream_addr_group()->shared_addr->dconn_pool;
dconn_pool.remove_downstream_connection(dconn);
// dconn was deleted
}
......@@ -524,7 +526,8 @@ void idle_timeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection timeout";
}
auto &dconn_pool = dconn->get_downstream_addr_group()->dconn_pool;
auto &dconn_pool =
dconn->get_downstream_addr_group()->shared_addr->dconn_pool;
dconn_pool.remove_downstream_connection(dconn);
// dconn was deleted
}
......
......@@ -62,6 +62,28 @@ void mcpool_clear_cb(struct ev_loop *loop, ev_timer *w, int revents) {
}
} // namespace
namespace {
bool match_shared_downstream_addr(
const std::shared_ptr<SharedDownstreamAddr> &lhs,
const std::shared_ptr<SharedDownstreamAddr> &rhs) {
if (lhs->addrs.size() != rhs->addrs.size() || lhs->proto != rhs->proto) {
return false;
}
for (auto &a : lhs->addrs) {
if (std::find_if(std::begin(rhs->addrs), std::end(rhs->addrs),
[&a](const DownstreamAddr &b) {
return a.host == b.host && a.port == b.port &&
a.host_unix == b.host_unix;
}) == std::end(rhs->addrs)) {
return false;
}
}
return true;
}
} // namespace
namespace {
std::random_device rd;
} // namespace
......@@ -103,12 +125,14 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
auto &dst = downstream_addr_groups_[i];
dst.pattern = src.pattern;
dst.addrs.resize(src.addrs.size());
dst.proto = src.proto;
auto shared_addr = std::make_shared<SharedDownstreamAddr>();
shared_addr->addrs.resize(src.addrs.size());
shared_addr->proto = src.proto;
for (size_t j = 0; j < src.addrs.size(); ++j) {
auto &src_addr = src.addrs[j];
auto &dst_addr = dst.addrs[j];
auto &dst_addr = shared_addr->addrs[j];
dst_addr.addr = src_addr.addr;
dst_addr.host = src_addr.host;
......@@ -118,6 +142,21 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
dst_addr.connect_blocker = make_unique<ConnectBlocker>(randgen_, loop_);
}
// share the connection if patterns have the same set of backend
// addresses.
auto end = std::begin(downstream_addr_groups_) + i;
auto it = std::find_if(std::begin(downstream_addr_groups_), end,
[&shared_addr](const DownstreamAddrGroup &group) {
return match_shared_downstream_addr(
group.shared_addr, shared_addr);
});
if (it == end) {
dst.shared_addr = shared_addr;
} else {
dst.shared_addr = (*it).shared_addr;
}
}
}
......
......@@ -83,8 +83,7 @@ struct DownstreamAddr {
TLSSessionCache tls_session_cache;
};
struct DownstreamAddrGroup {
ImmutableString pattern;
struct SharedDownstreamAddr {
std::vector<DownstreamAddr> addrs;
// Application protocol used in this group
shrpx_proto proto;
......@@ -101,6 +100,11 @@ struct DownstreamAddrGroup {
size_t next;
};
struct DownstreamAddrGroup {
ImmutableString pattern;
std::shared_ptr<SharedDownstreamAddr> shared_addr;
};
struct WorkerStat {
size_t num_connections;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment