Commit 446de923 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Support multiple HTTP/2 session per worker

Currently, we use same number of HTTP/2 sessions per worker with given
backend addresses.  New option to specify the number of HTTP/2 session
per worker will follow.
parent c5860fc6
......@@ -918,6 +918,7 @@ void fill_default_config() {
mod_config()->no_server_push = false;
mod_config()->host_unix = false;
mod_config()->http2_downstream_connchk = false;
mod_config()->http2_downstream_connections_per_worker = 0;
}
} // namespace
......@@ -2064,6 +2065,11 @@ int main(int argc, char **argv) {
}
}
if (get_config()->http2_downstream_connections_per_worker == 0) {
mod_config()->http2_downstream_connections_per_worker =
get_config()->downstream_addrs.size();
}
if (get_config()->rlimit_nofile) {
struct rlimit lim = {static_cast<rlim_t>(get_config()->rlimit_nofile),
static_cast<rlim_t>(get_config()->rlimit_nofile)};
......
......@@ -576,6 +576,9 @@ void ClientHandler::set_should_close_after_write(bool f) {
void ClientHandler::pool_downstream_connection(
std::unique_ptr<DownstreamConnection> dconn) {
if (!dconn->poolable()) {
return;
}
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get();
}
......@@ -605,7 +608,7 @@ ClientHandler::get_downstream_connection() {
}
auto dconn_pool = worker_->get_dconn_pool();
auto http2session = worker_->get_http2_session();
auto http2session = worker_->next_http2_session();
if (http2session) {
dconn = make_unique<Http2DownstreamConnection>(dconn_pool, http2session);
......@@ -628,12 +631,8 @@ ClientHandler::get_downstream_connection() {
SSL *ClientHandler::get_ssl() const { return conn_.tls.ssl; }
Http2Session *ClientHandler::get_http2_session() const {
return worker_->get_http2_session();
}
ConnectBlocker *ClientHandler::get_http1_connect_blocker() const {
return worker_->get_http1_connect_blocker();
ConnectBlocker *ClientHandler::get_connect_blocker() const {
return worker_->get_connect_blocker();
}
void ClientHandler::direct_http2_upgrade() {
......
......@@ -93,8 +93,7 @@ public:
void remove_downstream_connection(DownstreamConnection *dconn);
std::unique_ptr<DownstreamConnection> get_downstream_connection();
SSL *get_ssl() const;
Http2Session *get_http2_session() const;
ConnectBlocker *get_http1_connect_blocker() const;
ConnectBlocker *get_connect_blocker() const;
// Call this function when HTTP/2 connection header is received at
// the start of the connection.
void direct_http2_upgrade();
......
......@@ -260,6 +260,7 @@ struct Config {
size_t http2_downstream_window_bits;
size_t http2_upstream_connection_window_bits;
size_t http2_downstream_connection_window_bits;
size_t http2_downstream_connections_per_worker;
size_t downstream_connections_per_host;
size_t downstream_connections_per_frontend;
// actual size of downstream_http_proxy_addr
......
......@@ -58,6 +58,9 @@ public:
virtual void on_upstream_change(Upstream *uptream) = 0;
virtual int on_priority_change(int32_t pri) = 0;
// true if this object is poolable.
virtual bool poolable() const = 0;
void set_client_handler(ClientHandler *client_handler);
ClientHandler *get_client_handler();
Downstream *get_downstream();
......
......@@ -62,6 +62,10 @@ public:
virtual void on_upstream_change(Upstream *upstream) {}
virtual int on_priority_change(int32_t pri);
// This object is not poolable because we dont' have facility to
// migrate to another Http2Session object.
virtual bool poolable() const { return false; }
int send();
void attach_stream_data(StreamData *sd);
......
......@@ -40,6 +40,7 @@
#include "shrpx_ssl.h"
#include "shrpx_http.h"
#include "shrpx_worker.h"
#include "shrpx_connect_blocker.h"
#include "http2.h"
#include "util.h"
#include "base64.h"
......@@ -140,14 +141,14 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
} // namespace
Http2Session::Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
Worker *worker)
ConnectBlocker *connect_blocker, Worker *worker)
: conn_(loop, -1, nullptr, get_config()->downstream_write_timeout,
get_config()->downstream_read_timeout, 0, 0, 0, 0, writecb, readcb,
timeoutcb, this),
worker_(worker), ssl_ctx_(ssl_ctx), session_(nullptr),
data_pending_(nullptr), data_pendinglen_(0), addr_idx_(0),
state_(DISCONNECTED), connection_check_state_(CONNECTION_CHECK_NONE),
flow_control_(false) {
worker_(worker), connect_blocker_(connect_blocker), ssl_ctx_(ssl_ctx),
session_(nullptr), data_pending_(nullptr), data_pendinglen_(0),
addr_idx_(0), state_(DISCONNECTED),
connection_check_state_(CONNECTION_CHECK_NONE), flow_control_(false) {
read_ = write_ = &Http2Session::noop;
on_read_ = on_write_ = &Http2Session::noop;
......@@ -237,6 +238,14 @@ int Http2Session::initiate_connection() {
int rv = 0;
if (state_ == DISCONNECTED) {
if (connect_blocker_->blocked()) {
if (LOG_ENABLED(INFO)) {
DCLOG(INFO, this)
<< "Downstream connection was blocked by connect_blocker";
}
return -1;
}
auto worker_stat = worker_->get_worker_stat();
addr_idx_ = worker_stat->next_downstream;
++worker_stat->next_downstream;
......@@ -261,6 +270,7 @@ int Http2Session::initiate_connection() {
get_config()->downstream_http_proxy_addr.storage.ss_family);
if (conn_.fd == -1) {
connect_blocker_->on_failure();
return -1;
}
......@@ -270,6 +280,7 @@ int Http2Session::initiate_connection() {
SSLOG(ERROR, this) << "Failed to connect to the proxy "
<< get_config()->downstream_http_proxy_host.get()
<< ":" << get_config()->downstream_http_proxy_port;
connect_blocker_->on_failure();
return -1;
}
......@@ -329,6 +340,7 @@ int Http2Session::initiate_connection() {
conn_.fd = util::create_nonblock_socket(
downstream_addr.addr.storage.ss_family);
if (conn_.fd == -1) {
connect_blocker_->on_failure();
return -1;
}
......@@ -337,6 +349,7 @@ int Http2Session::initiate_connection() {
const_cast<sockaddr *>(&downstream_addr.addr.sa),
downstream_addr.addrlen);
if (rv != 0 && errno != EINPROGRESS) {
connect_blocker_->on_failure();
return -1;
}
......@@ -358,12 +371,14 @@ int Http2Session::initiate_connection() {
downstream_addr.addr.storage.ss_family);
if (conn_.fd == -1) {
connect_blocker_->on_failure();
return -1;
}
rv = connect(conn_.fd, const_cast<sockaddr *>(&downstream_addr.addr.sa),
downstream_addr.addrlen);
if (rv != 0 && errno != EINPROGRESS) {
connect_blocker_->on_failure();
return -1;
}
......@@ -1377,7 +1392,9 @@ void Http2Session::signal_write() {
LOG(INFO) << "Start connecting to backend server";
}
if (initiate_connection() != 0) {
SSLOG(FATAL, this) << "Could not initiate backend connection";
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Could not initiate backend connection";
}
disconnect(true);
}
break;
......@@ -1513,6 +1530,8 @@ int Http2Session::connected() {
return -1;
}
connect_blocker_->on_success();
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Connection established";
}
......
......@@ -47,6 +47,7 @@ namespace shrpx {
class Http2DownstreamConnection;
class Worker;
class ConnectBlocker;
struct StreamData {
Http2DownstreamConnection *dconn;
......@@ -54,7 +55,8 @@ struct StreamData {
class Http2Session {
public:
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx, Worker *worker);
Http2Session(struct ev_loop *loop, SSL_CTX *ssl_ctx,
ConnectBlocker *connect_blocker, Worker *worker);
~Http2Session();
int check_cert();
......@@ -192,6 +194,7 @@ private:
// Used to parse the response from HTTP proxy
std::unique_ptr<http_parser> proxy_htp_;
Worker *worker_;
ConnectBlocker *connect_blocker_;
// NULL if no TLS is configured
SSL_CTX *ssl_ctx_;
nghttp2_session *session_;
......
......@@ -1200,8 +1200,6 @@ Downstream *Http2Upstream::find_downstream(int32_t stream_id) {
return downstream_queue_.find(stream_id);
}
nghttp2_session *Http2Upstream::get_http2_session() { return session_; }
// WARNING: Never call directly or indirectly nghttp2_session_send or
// nghttp2_session_recv. These calls may delete downstream.
int Http2Upstream::on_downstream_header_complete(Downstream *downstream) {
......
......@@ -64,8 +64,6 @@ public:
void remove_downstream(Downstream *downstream);
Downstream *find_downstream(int32_t stream_id);
nghttp2_session *get_http2_session();
int rst_stream(Downstream *downstream, uint32_t error_code);
int terminate_session(uint32_t error_code);
int error_reply(Downstream *downstream, unsigned int status_code);
......
......@@ -130,7 +130,7 @@ int HttpDownstreamConnection::attach_downstream(Downstream *downstream) {
}
if (conn_.fd == -1) {
auto connect_blocker = client_handler_->get_http1_connect_blocker();
auto connect_blocker = client_handler_->get_connect_blocker();
if (connect_blocker->blocked()) {
if (LOG_ENABLED(INFO)) {
......@@ -768,7 +768,7 @@ int HttpDownstreamConnection::on_write() {
}
int HttpDownstreamConnection::on_connect() {
auto connect_blocker = client_handler_->get_http1_connect_blocker();
auto connect_blocker = client_handler_->get_connect_blocker();
if (!util::check_socket_connected(conn_.fd)) {
conn_.wlimit.stopw();
......
......@@ -59,6 +59,8 @@ public:
virtual void on_upstream_change(Upstream *upstream);
virtual int on_priority_change(int32_t pri) { return 0; }
virtual bool poolable() const { return true; }
int on_connect();
void signal_write();
......
......@@ -821,8 +821,6 @@ Downstream *SpdyUpstream::find_downstream(int32_t stream_id) {
return downstream_queue_.find(stream_id);
}
spdylay_session *SpdyUpstream::get_http2_session() { return session_; }
// WARNING: Never call directly or indirectly spdylay_session_send or
// spdylay_session_recv. These calls may delete downstream.
int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) {
......
......@@ -60,8 +60,6 @@ public:
void remove_downstream(Downstream *downstream);
Downstream *find_downstream(int32_t stream_id);
spdylay_session *get_http2_session();
int rst_stream(Downstream *downstream, int status_code);
int error_reply(Downstream *downstream, unsigned int status_code);
......
......@@ -51,17 +51,20 @@ void eventcb(struct ev_loop *loop, ev_async *w, int revents) {
Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
ssl::CertLookupTree *cert_tree,
const std::shared_ptr<TicketKeys> &ticket_keys)
: loop_(loop), sv_ssl_ctx_(sv_ssl_ctx), cl_ssl_ctx_(cl_ssl_ctx),
cert_tree_(cert_tree), ticket_keys_(ticket_keys),
: next_http2session_(0), loop_(loop), sv_ssl_ctx_(sv_ssl_ctx),
cl_ssl_ctx_(cl_ssl_ctx), cert_tree_(cert_tree), ticket_keys_(ticket_keys),
connect_blocker_(make_unique<ConnectBlocker>(loop_)),
graceful_shutdown_(false) {
ev_async_init(&w_, eventcb);
w_.data = this;
ev_async_start(loop_, &w_);
if (get_config()->downstream_proto == PROTO_HTTP2) {
http2session_ = make_unique<Http2Session>(loop_, cl_ssl_ctx, this);
} else {
http1_connect_blocker_ = make_unique<ConnectBlocker>(loop_);
auto n = get_config()->http2_downstream_connections_per_worker;
for (; n > 0; --n) {
http2sessions_.push_back(make_unique<Http2Session>(
loop_, cl_ssl_ctx, connect_blocker_.get(), this));
}
}
}
......@@ -185,10 +188,22 @@ WorkerStat *Worker::get_worker_stat() { return &worker_stat_; }
DownstreamConnectionPool *Worker::get_dconn_pool() { return &dconn_pool_; }
Http2Session *Worker::get_http2_session() const { return http2session_.get(); }
Http2Session *Worker::next_http2_session() {
if (http2sessions_.empty()) {
return nullptr;
}
auto res = http2sessions_[next_http2session_].get();
++next_http2session_;
if (next_http2session_ >= http2sessions_.size()) {
next_http2session_ = 0;
}
return res;
}
ConnectBlocker *Worker::get_http1_connect_blocker() const {
return http1_connect_blocker_.get();
ConnectBlocker *Worker::get_connect_blocker() const {
return connect_blocker_.get();
}
struct ev_loop *Worker::get_loop() const {
......
......@@ -29,6 +29,7 @@
#include <mutex>
#include <deque>
#include <vector>
#include <thread>
#ifndef NOTHREADS
#include <future>
......@@ -94,8 +95,8 @@ public:
void set_ticket_keys(std::shared_ptr<TicketKeys> ticket_keys);
WorkerStat *get_worker_stat();
DownstreamConnectionPool *get_dconn_pool();
Http2Session *get_http2_session() const;
ConnectBlocker *get_http1_connect_blocker() const;
Http2Session *next_http2_session();
ConnectBlocker *get_connect_blocker() const;
struct ev_loop *get_loop() const;
SSL_CTX *get_sv_ssl_ctx() const;
SSL_CTX *get_cl_ssl_ctx() const;
......@@ -104,6 +105,8 @@ public:
bool get_graceful_shutdown() const;
private:
std::vector<std::unique_ptr<Http2Session>> http2sessions_;
size_t next_http2session_;
#ifndef NOTHREADS
std::future<void> fut_;
#endif // NOTHREADS
......@@ -121,8 +124,7 @@ private:
ssl::CertLookupTree *cert_tree_;
std::shared_ptr<TicketKeys> ticket_keys_;
std::unique_ptr<Http2Session> http2session_;
std::unique_ptr<ConnectBlocker> http1_connect_blocker_;
std::unique_ptr<ConnectBlocker> connect_blocker_;
bool graceful_shutdown_;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment