Commit 0e8419ac authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Add backend-connections-per-frontend option

This option limits the number of backend connections per frontend.
This is meaningful for the combination of HTTP/2 and SPDY frontend and
HTTP/1 backend.
parent da08ba5d
......@@ -796,6 +796,7 @@ void fill_default_config()
mod_config()->no_location_rewrite = false;
mod_config()->argc = 0;
mod_config()->argv = nullptr;
mod_config()->max_downstream_connections = 100;
}
} // namespace
......@@ -891,6 +892,13 @@ Performance:
Set maximum number of simultaneous connections
frontend accepts. Setting 0 means unlimited.
Default: 0
--backend-connections-per-frontend=<NUM>
Set maximum number of backend simultaneous
connections per frontend. This option is
meaningful when the combination of HTTP/2 or SPDY
frontend and HTTP/1 backend is used.
Default: )"
<< get_config()->max_downstream_connections << R"(
Timeout:
--frontend-http2-read-timeout=<SEC>
......@@ -1246,6 +1254,7 @@ int main(int argc, char **argv)
{"stream-read-timeout", required_argument, &flag, 60},
{"stream-write-timeout", required_argument, &flag, 61},
{"no-location-rewrite", no_argument, &flag, 62},
{"backend-connections-per-frontend", required_argument, &flag, 63},
{nullptr, 0, nullptr, 0 }
};
......@@ -1518,6 +1527,11 @@ int main(int argc, char **argv)
// --no-location-rewrite
cmdcfgs.emplace_back(SHRPX_OPT_NO_LOCATION_REWRITE, "yes");
break;
case 63:
// --backend-connections-per-frontend
cmdcfgs.emplace_back(SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND,
optarg);
break;
default:
break;
}
......
......@@ -125,6 +125,8 @@ const char SHRPX_OPT_ADD_RESPONSE_HEADER[] = "add-response-header";
const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[] =
"worker-frontend-connections";
const char SHRPX_OPT_NO_LOCATION_REWRITE[] = "no-location-rewrite";
const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[] =
"backend-connections-per-frontend";
namespace {
Config *config = nullptr;
......@@ -840,6 +842,22 @@ int parse_config(const char *opt, const char *optarg)
return 0;
}
if(util::strieq(opt, SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND)) {
errno = 0;
auto n = strtoul(optarg, nullptr, 10);
if(errno != 0 || n < 1) {
LOG(ERROR) << "backend-connections-per-frontend: "
<< "specify the integer more than or equal to 1";
return -1;
}
mod_config()->max_downstream_connections = n;
return 0;
}
if(util::strieq(opt, "conf")) {
LOG(WARNING) << "conf is ignored";
......
......@@ -115,6 +115,7 @@ extern const char SHRPX_OPT_ALTSVC[];
extern const char SHRPX_OPT_ADD_RESPONSE_HEADER[];
extern const char SHRPX_OPT_WORKER_FRONTEND_CONNECTIONS[];
extern const char SHRPX_OPT_NO_LOCATION_REWRITE[];
extern const char SHRPX_OPT_BACKEND_CONNECTIONS_PER_FRONTEND[];
union sockaddr_union {
sockaddr sa;
......@@ -217,6 +218,7 @@ struct Config {
size_t http2_downstream_window_bits;
size_t http2_upstream_connection_window_bits;
size_t http2_downstream_connection_window_bits;
size_t max_downstream_connections;
// actual size of downstream_http_proxy_addr
size_t downstream_http_proxy_addrlen;
size_t read_rate;
......
......@@ -35,29 +35,87 @@ DownstreamQueue::DownstreamQueue()
DownstreamQueue::~DownstreamQueue()
{
for(auto& kv : downstreams_) {
for(auto& kv : pending_downstreams_) {
delete kv.second;
}
for(auto& kv : active_downstreams_) {
delete kv.second;
}
for(auto& kv : failure_downstreams_) {
delete kv.second;
}
}
void DownstreamQueue::add_pending(Downstream *downstream)
{
pending_downstreams_[downstream->get_stream_id()] = downstream;
}
void DownstreamQueue::add_failure(Downstream *downstream)
{
failure_downstreams_[downstream->get_stream_id()] = downstream;
}
void DownstreamQueue::add(Downstream *downstream)
void DownstreamQueue::add_active(Downstream *downstream)
{
downstreams_[downstream->get_stream_id()] = downstream;
active_downstreams_[downstream->get_stream_id()] = downstream;
}
void DownstreamQueue::remove(Downstream *downstream)
{
downstreams_.erase(downstream->get_stream_id());
pending_downstreams_.erase(downstream->get_stream_id());
active_downstreams_.erase(downstream->get_stream_id());
failure_downstreams_.erase(downstream->get_stream_id());
}
Downstream* DownstreamQueue::find(int32_t stream_id)
{
auto kv = downstreams_.find(stream_id);
if(kv == std::end(downstreams_)) {
return nullptr;
} else {
auto kv = pending_downstreams_.find(stream_id);
if(kv != std::end(pending_downstreams_)) {
return (*kv).second;
}
kv = active_downstreams_.find(stream_id);
if(kv != std::end(active_downstreams_)) {
return (*kv).second;
}
kv = failure_downstreams_.find(stream_id);
if(kv != std::end(failure_downstreams_)) {
return (*kv).second;
}
return nullptr;
}
Downstream* DownstreamQueue::pop_pending()
{
auto i = std::begin(pending_downstreams_);
if(i == std::end(pending_downstreams_)) {
return nullptr;
}
auto downstream = (*i).second;
pending_downstreams_.erase(i);
return downstream;
}
size_t DownstreamQueue::num_active() const
{
return active_downstreams_.size();
}
bool DownstreamQueue::pending_empty() const
{
return pending_downstreams_.empty();
}
} // namespace shrpx
......@@ -39,11 +39,30 @@ class DownstreamQueue {
public:
DownstreamQueue();
~DownstreamQueue();
void add(Downstream *downstream);
void add_pending(Downstream *downstream);
void add_failure(Downstream *downstream);
void add_active(Downstream *downstream);
// Removes |downstream| from either pending_downstreams_,
// active_downstreams_ or failure_downstreams_.
void remove(Downstream *downstream);
// Finds Downstream object denoted by |stream_id| either in
// pending_downstreams_, active_downstreams_ or
// failure_downstreams_.
Downstream* find(int32_t stream_id);
// Returns the number of active Downstream objects.
size_t num_active() const;
// Returns true if pending_downstreams_ is empty.
bool pending_empty() const;
// Pops first Downstream object in pending_downstreams_ and returns
// it.
Downstream* pop_pending();
private:
std::map<int32_t, Downstream*> downstreams_;
// Downstream objects, not processed yet
std::map<int32_t, Downstream*> pending_downstreams_;
// Downstream objects in use, consuming downstream concurrency limit
std::map<int32_t, Downstream*> active_downstreams_;
// Downstream objects, failed to connect to downstream server
std::map<int32_t, Downstream*> failure_downstreams_;
};
} // namespace shrpx
......
......@@ -295,8 +295,6 @@ int on_request_headers(Http2Upstream *upstream,
nghttp2_session *session,
const nghttp2_frame *frame)
{
int rv;
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
return 0;
}
......@@ -370,37 +368,66 @@ int on_request_headers(Http2Upstream *upstream,
downstream->inspect_http2_request();
auto dconn = upstream->get_client_handler()->get_downstream_connection();
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
downstream->disable_upstream_rtimer();
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
upstream->maintain_downstream_concurrency();
return 0;
}
} // namespace
void Http2Upstream::maintain_downstream_concurrency()
{
while(get_config()->max_downstream_connections >
downstream_queue_.num_active()) {
auto downstream = downstream_queue_.pop_pending();
if(!downstream) {
break;
}
initiate_downstream(downstream);
}
}
void Http2Upstream::initiate_downstream(Downstream *downstream)
{
int rv;
auto dconn = handler_->get_downstream_connection();
rv = dconn->attach_downstream(downstream);
if(rv != 0) {
downstream_queue_.add_failure(downstream);
// downstream connection fails, send error page
if(upstream->error_reply(downstream, 503) != 0) {
upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
if(error_reply(downstream, 503) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
downstream->set_request_state(Downstream::CONNECT_FAIL);
return 0;
return;
}
rv = downstream->push_request_headers();
if(rv != 0) {
if(upstream->error_reply(downstream, 503) != 0) {
upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
downstream_queue_.add_failure(downstream);
return 0;
if(error_reply(downstream, 503) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
downstream->disable_upstream_rtimer();
downstream->set_request_state(Downstream::MSG_COMPLETE);
return;
}
return 0;
}
downstream_queue_.add_active(downstream);
} // namespace
return;
}
namespace {
int on_frame_recv_callback
......@@ -1143,12 +1170,14 @@ bufferevent_event_cb Http2Upstream::get_downstream_eventcb()
void Http2Upstream::add_downstream(Downstream *downstream)
{
downstream_queue_.add(downstream);
downstream_queue_.add_pending(downstream);
}
void Http2Upstream::remove_downstream(Downstream *downstream)
{
downstream_queue_.remove(downstream);
maintain_downstream_concurrency();
}
Downstream* Http2Upstream::find_downstream(int32_t stream_id)
......
......@@ -82,6 +82,8 @@ public:
int consume(int32_t stream_id, size_t len);
void log_response_headers(Downstream *downstream,
const std::vector<nghttp2_nv>& nva) const;
void maintain_downstream_concurrency();
void initiate_downstream(Downstream *downstream);
private:
DownstreamQueue downstream_queue_;
std::unique_ptr<HttpsUpstream> pre_upstream_;
......
......@@ -230,24 +230,14 @@ void on_ctrl_recv_callback
<< "\n" << ss.str();
}
auto dconn = upstream->get_client_handler()->get_downstream_connection();
int rv = dconn->attach_downstream(downstream);
if(rv != 0) {
// If downstream connection fails, issue RST_STREAM.
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
return;
}
rv = downstream->push_request_headers();
if(rv != 0) {
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
return;
}
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(frame->syn_stream.hd.flags & SPDYLAY_CTRL_FLAG_FIN) {
downstream->disable_upstream_rtimer();
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
upstream->maintain_downstream_concurrency();
break;
}
default:
......@@ -256,6 +246,43 @@ void on_ctrl_recv_callback
}
} // namespace
void SpdyUpstream::maintain_downstream_concurrency()
{
while(get_config()->max_downstream_connections >
downstream_queue_.num_active()) {
auto downstream = downstream_queue_.pop_pending();
if(!downstream) {
break;
}
initiate_downstream(downstream);
}
}
void SpdyUpstream::initiate_downstream(Downstream *downstream)
{
auto dconn = handler_->get_downstream_connection();
int rv = dconn->attach_downstream(downstream);
if(rv != 0) {
downstream_queue_.add_failure(downstream);
// If downstream connection fails, issue RST_STREAM.
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
return;
}
rv = downstream->push_request_headers();
if(rv != 0) {
downstream_queue_.add_failure(downstream);
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
return;
}
downstream_queue_.add_active(downstream);
}
namespace {
void on_data_chunk_recv_callback(spdylay_session *session,
uint8_t flags, int32_t stream_id,
......@@ -876,12 +903,14 @@ bufferevent_event_cb SpdyUpstream::get_downstream_eventcb()
void SpdyUpstream::add_downstream(Downstream *downstream)
{
downstream_queue_.add(downstream);
downstream_queue_.add_pending(downstream);
}
void SpdyUpstream::remove_downstream(Downstream *downstream)
{
downstream_queue_.remove(downstream);
maintain_downstream_concurrency();
}
Downstream* SpdyUpstream::find_downstream(int32_t stream_id)
......
......@@ -74,6 +74,9 @@ public:
int handle_ign_data_chunk(size_t len);
void maintain_downstream_concurrency();
void initiate_downstream(Downstream *downstream);
nghttp2::util::EvbufferBuffer sendbuf;
private:
DownstreamQueue downstream_queue_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment