Commit c9a4f293 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: ConnectBlocker per backend address

parent 61579ad2
......@@ -389,7 +389,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
pinned_http2sessions_(
get_config()->conn.downstream.proto == PROTO_HTTP2
? make_unique<std::vector<ssize_t>>(
get_config()->conn.downstream.addr_groups.size(), -1)
worker->get_downstream_addr_groups().size(), -1)
: nullptr),
ipaddr_(ipaddr),
port_(port),
......@@ -664,8 +664,8 @@ std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection(Downstream *downstream) {
size_t group;
auto &downstreamconf = get_config()->conn.downstream;
auto &groups = downstreamconf.addr_groups;
auto catch_all = downstreamconf.addr_group_catch_all;
auto &groups = worker_->get_downstream_addr_groups();
const auto &req = downstream->request();
......@@ -746,10 +746,6 @@ MemchunkPool *ClientHandler::get_mcpool() { return worker_->get_mcpool(); }
SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
ConnectBlocker *ClientHandler::get_connect_blocker() const {
return worker_->get_connect_blocker();
}
void ClientHandler::direct_http2_upgrade() {
upstream_ = make_unique<Http2Upstream>(this);
alpn_ = NGHTTP2_CLEARTEXT_PROTO_VERSION_ID;
......
......@@ -99,7 +99,6 @@ public:
get_downstream_connection(Downstream *downstream);
MemchunkPool *get_mcpool();
SSL *get_ssl() const;
ConnectBlocker *get_connect_blocker() const;
// Call this function when HTTP/2 connection header is received at
// the start of the connection.
void direct_http2_upgrade();
......
......@@ -59,6 +59,7 @@ using namespace nghttp2;
namespace shrpx {
struct LogFragment;
class ConnectBlocker;
namespace ssl {
......@@ -294,6 +295,7 @@ struct DownstreamAddr {
// socket path.
ImmutableString host;
ImmutableString hostport;
ConnectBlocker *connect_blocker;
// backend port. 0 if |host_unix| is true.
uint16_t port;
// true if |host| contains UNIX domain socket path.
......
......@@ -26,20 +26,16 @@
namespace shrpx {
namespace {
const ev_tstamp INITIAL_SLEEP = 2.;
} // namespace
namespace {
void connect_blocker_cb(struct ev_loop *loop, ev_timer *w, int revents) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "unblock downstream connection";
LOG(INFO) << "Unblock";
}
}
} // namespace
ConnectBlocker::ConnectBlocker(struct ev_loop *loop)
: loop_(loop), sleep_(INITIAL_SLEEP) {
ConnectBlocker::ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop)
: gen_(gen), loop_(loop), fail_count_(0) {
ev_timer_init(&timer_, connect_blocker_cb, 0., 0.);
}
......@@ -47,18 +43,27 @@ ConnectBlocker::~ConnectBlocker() { ev_timer_stop(loop_, &timer_); }
bool ConnectBlocker::blocked() const { return ev_is_active(&timer_); }
void ConnectBlocker::on_success() { sleep_ = INITIAL_SLEEP; }
void ConnectBlocker::on_success() { fail_count_ = 0; }
namespace {
constexpr size_t MAX_BACKOFF_EXP = 10;
} // namespace
void ConnectBlocker::on_failure() {
if (ev_is_active(&timer_)) {
return;
}
sleep_ = std::min(128., sleep_ * 2);
++fail_count_;
auto max_backoff = (1 << std::min(MAX_BACKOFF_EXP, fail_count_)) - 1;
auto dist = std::uniform_int_distribution<>(0, max_backoff);
auto backoff = dist(gen_);
LOG(WARN) << "connect failure, start sleeping " << sleep_;
LOG(WARN) << "Could not connect " << fail_count_
<< " times in a row; sleep for " << backoff << " seconds";
ev_timer_set(&timer_, sleep_, 0.);
ev_timer_set(&timer_, backoff, 0.);
ev_timer_start(loop_, &timer_);
}
......
......@@ -27,13 +27,15 @@
#include "shrpx.h"
#include <random>
#include <ev.h>
namespace shrpx {
class ConnectBlocker {
public:
ConnectBlocker(struct ev_loop *loop);
ConnectBlocker(std::mt19937 &gen, struct ev_loop *loop);
~ConnectBlocker();
// Returns true if making connection is not allowed.
......@@ -41,14 +43,18 @@ public:
// Call this function if connect operation succeeded. This will
// reset sleep_ to minimum value.
void on_success();
// Call this function if connect operation failed. This will start
// timer and blocks connection establishment for sleep_ seconds.
// Call this function if connect operations failed. This will start
// timer and blocks connection establishment with exponential
// backoff.
void on_failure();
private:
std::mt19937 gen_;
ev_timer timer_;
struct ev_loop *loop_;
ev_tstamp sleep_;
// The number of consecutive connection failure. Reset to 0 on
// success.
size_t fail_count_;
};
} // namespace
......
......@@ -98,6 +98,9 @@ int Http2DownstreamConnection::attach_downstream(Downstream *downstream) {
http2session_->add_downstream_connection(this);
if (http2session_->get_state() == Http2Session::DISCONNECTED) {
http2session_->signal_write();
if (http2session_->get_state() == Http2Session::DISCONNECTED) {
return -1;
}
}
downstream_ = downstream;
......
......@@ -147,8 +147,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
ConnectBlocker *connect_blocker, Worker *worker,
size_t group, size_t idx)
Worker *worker, size_t group, size_t idx)
: conn_(loop, -1, nullptr, worker->get_mcpool(),
get_config()->conn.downstream.timeout.write,
get_config()->conn.downstream.timeout.read, {}, {}, writecb, readcb,
......@@ -156,7 +155,6 @@ Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
get_config()->tls.dyn_rec.idle_timeout),
wb_(worker->get_mcpool()),
worker_(worker),
connect_blocker_(connect_blocker),
ssl_ctx_(ssl_ctx),
addr_(nullptr),
session_(nullptr),
......@@ -254,30 +252,49 @@ int Http2Session::disconnect(bool hard) {
int Http2Session::initiate_connection() {
int rv = 0;
auto &addrs = get_config()->conn.downstream.addr_groups[group_].addrs;
auto &groups = worker_->get_downstream_addr_groups();
auto &addrs = groups[group_].addrs;
if (state_ == DISCONNECTED) {
if (connect_blocker_->blocked()) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this)
<< "Downstream connection was blocked by connect_blocker";
auto &next_downstream = worker_->get_dgrp(group_)->next;
auto end = next_downstream;
for (;;) {
auto &addr = addrs[next_downstream];
if (++next_downstream >= addrs.size()) {
next_downstream = 0;
}
return -1;
}
auto &next_downstream = worker_->get_dgrp(group_)->next;
addr_ = &addrs[next_downstream];
auto &connect_blocker = addr.connect_blocker;
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Using downstream address idx=" << next_downstream
<< " out of " << addrs.size();
}
if (connect_blocker->blocked()) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Backend server "
<< (addr.host_unix ? addr.host : addr.hostport)
<< " was not available temporarily";
}
if (end == next_downstream) {
return -1;
}
continue;
}
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Using downstream address idx=" << next_downstream
<< " out of " << addrs.size();
}
addr_ = &addr;
if (++next_downstream >= addrs.size()) {
next_downstream = 0;
break;
}
}
auto &connect_blocker = addr_->connect_blocker;
const auto &proxy = get_config()->downstream_http_proxy;
if (!proxy.host.empty() && state_ == DISCONNECTED) {
if (LOG_ENABLED(INFO)) {
......@@ -288,7 +305,7 @@ int Http2Session::initiate_connection() {
conn_.fd = util::create_nonblock_socket(proxy.addr.su.storage.ss_family);
if (conn_.fd == -1) {
connect_blocker_->on_failure();
connect_blocker->on_failure();
return -1;
}
......@@ -296,7 +313,7 @@ int Http2Session::initiate_connection() {
if (rv != 0 && errno != EINPROGRESS) {
SSLOG(ERROR, this) << "Failed to connect to the proxy " << proxy.host
<< ":" << proxy.port;
connect_blocker_->on_failure();
connect_blocker->on_failure();
return -1;
}
......@@ -356,7 +373,7 @@ int Http2Session::initiate_connection() {
conn_.fd =
util::create_nonblock_socket(addr_->addr.su.storage.ss_family);
if (conn_.fd == -1) {
connect_blocker_->on_failure();
connect_blocker->on_failure();
return -1;
}
......@@ -365,7 +382,7 @@ int Http2Session::initiate_connection() {
const_cast<sockaddr *>(&addr_->addr.su.sa),
addr_->addr.len);
if (rv != 0 && errno != EINPROGRESS) {
connect_blocker_->on_failure();
connect_blocker->on_failure();
return -1;
}
......@@ -383,14 +400,14 @@ int Http2Session::initiate_connection() {
util::create_nonblock_socket(addr_->addr.su.storage.ss_family);
if (conn_.fd == -1) {
connect_blocker_->on_failure();
connect_blocker->on_failure();
return -1;
}
rv = connect(conn_.fd, const_cast<sockaddr *>(&addr_->addr.su.sa),
addr_->addr.len);
if (rv != 0 && errno != EINPROGRESS) {
connect_blocker_->on_failure();
connect_blocker->on_failure();
return -1;
}
......@@ -1615,11 +1632,15 @@ int Http2Session::read_noop(const uint8_t *data, size_t datalen) { return 0; }
int Http2Session::write_noop() { return 0; }
int Http2Session::connected() {
auto &connect_blocker = addr_->connect_blocker;
if (!util::check_socket_connected(conn_.fd)) {
connect_blocker->on_failure();
return -1;
}
connect_blocker_->on_success();
connect_blocker->on_success();
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Connection established";
......
......@@ -48,7 +48,6 @@ namespace shrpx {
class Http2DownstreamConnection;
class Worker;
class ConnectBlocker;
struct StreamData {
StreamData *dlnext, *dlprev;
......@@ -57,9 +56,8 @@ struct StreamData {
class Http2Session {
public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
ConnectBlocker *connect_blocker, Worker *worker, size_t group,
size_t idx);
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker,
size_t group, size_t idx);
~Http2Session();
// If hard is true, all pending requests are abandoned and
......@@ -203,7 +201,6 @@ private:
// Used to parse the response from HTTP proxy
std::unique_ptr<http_parser> proxy_htp_;
Worker *worker_;
ConnectBlocker *connect_blocker_;
// NULL if no TLS is configured
SSL_CTX *ssl_ctx_;
// Address of remote endpoint
......
......@@ -147,16 +147,6 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
auto &downstreamconf = get_config()->conn.downstream;
if (conn_.fd == -1) {
auto connect_blocker = client_handler_->get_connect_blocker();
if (connect_blocker->blocked()) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this)
<< "Downstream connection was blocked by connect_blocker";
}
return -1;
}
if (ssl_ctx_) {
auto ssl = ssl::create_ssl(ssl_ctx_);
if (!ssl) {
......@@ -168,7 +158,8 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
auto &next_downstream = worker_->get_dgrp(group_)->next;
auto end = next_downstream;
auto &addrs = downstreamconf.addr_groups[group_].addrs;
auto &groups = worker_->get_downstream_addr_groups();
auto &addrs = groups[group_].addrs;
for (;;) {
auto &addr = addrs[next_downstream];
......@@ -176,6 +167,22 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
next_downstream = 0;
}
auto &connect_blocker = addr.connect_blocker;
if (connect_blocker->blocked()) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this) << "Backend server "
<< (addr.host_unix ? addr.host : addr.hostport)
<< " was not available temporarily";
}
if (end == next_downstream) {
return SHRPX_ERR_NETWORK;
}
continue;
}
conn_.fd = util::create_nonblock_socket(addr.addr.su.storage.ss_family);
if (conn_.fd == -1) {
......@@ -1012,7 +1019,7 @@ int HttpDownstreamConnection::process_input(const uint8_t *data,
}
int HttpDownstreamConnection::connected() {
auto connect_blocker = client_handler_->get_connect_blocker();
auto connect_blocker = addr_->connect_blocker;
if (!util::check_socket_connected(conn_.fd)) {
conn_.wlimit.stopw();
......@@ -1021,6 +1028,8 @@ int HttpDownstreamConnection::connected() {
DLOG(INFO, this) << "downstream connect failed";
}
connect_blocker->on_failure();
downstream_->set_request_state(Downstream::CONNECT_FAIL);
return -1;
......
......@@ -80,7 +80,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
cl_ssl_ctx_(cl_ssl_ctx),
cert_tree_(cert_tree),
ticket_keys_(ticket_keys),
connect_blocker_(make_unique<ConnectBlocker>(loop_)),
downstream_addr_groups_(get_config()->conn.downstream.addr_groups),
graceful_shutdown_(false) {
ev_async_init(&w_, eventcb);
w_.data = this;
......@@ -109,17 +109,29 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
m = downstreamconf.addr_groups[group].addrs.size();
}
for (size_t idx = 0; idx < m; ++idx) {
dgrp.http2sessions.push_back(make_unique<Http2Session>(
loop_, cl_ssl_ctx, connect_blocker_.get(), this, group, idx));
dgrp.http2sessions.push_back(
make_unique<Http2Session>(loop_, cl_ssl_ctx, this, group, idx));
}
++group;
}
}
for (auto &group : downstream_addr_groups_) {
for (auto &addr : group.addrs) {
addr.connect_blocker = new ConnectBlocker(randgen_, loop_);
}
}
}
Worker::~Worker() {
ev_async_stop(loop_, &w_);
ev_timer_stop(loop_, &mcpool_clear_timer_);
for (auto &group : downstream_addr_groups_) {
for (auto &addr : group.addrs) {
delete addr.connect_blocker;
}
}
}
void Worker::schedule_clear_mcpool() {
......@@ -259,10 +271,6 @@ Http2Session *Worker::next_http2_session(size_t group) {
return res;
}
ConnectBlocker *Worker::get_connect_blocker() const {
return connect_blocker_.get();
}
struct ev_loop *Worker::get_loop() const {
return loop_;
}
......@@ -361,4 +369,8 @@ SSL_SESSION *Worker::reuse_client_tls_session(const Address *addr) {
return d2i_SSL_SESSION(nullptr, &p, ent.session_data.size());
}
std::vector<DownstreamAddrGroup> &Worker::get_downstream_addr_groups() {
return downstream_addr_groups_;
}
} // namespace shrpx
......@@ -131,7 +131,6 @@ public:
WorkerStat *get_worker_stat();
DownstreamConnectionPool *get_dconn_pool();
Http2Session *next_http2_session(size_t group);
ConnectBlocker *get_connect_blocker() const;
struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const;
SSL_CTX *get_cl_ssl_ctx() const;
......@@ -164,6 +163,8 @@ public:
// found associated to |addr|, nullptr will be returned.
SSL_SESSION *reuse_client_tls_session(const Address *addr);
std::vector<DownstreamAddrGroup> &get_downstream_addr_groups();
private:
#ifndef NOTHREADS
std::future<void> fut_;
......@@ -196,7 +197,7 @@ private:
ssl::CertLookupTree *cert_tree_;
std::shared_ptr<TicketKeys> ticket_keys_;
std::unique_ptr<ConnectBlocker> connect_blocker_;
std::vector<DownstreamAddrGroup> downstream_addr_groups_;
bool graceful_shutdown_;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment