Commit 20de4327 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Pool http downstream connection per thread

parent 325bb011
......@@ -116,7 +116,8 @@ NGHTTPX_SRCS = \
shrpx_thread_event_receiver.cc shrpx_thread_event_receiver.h \
shrpx_worker.cc shrpx_worker.h \
shrpx_worker_config.cc shrpx_worker_config.h \
shrpx_connect_blocker.cc shrpx_connect_blocker.h
shrpx_connect_blocker.cc shrpx_connect_blocker.h \
shrpx_downstream_connection_pool.cc shrpx_downstream_connection_pool.h
if HAVE_SPDYLAY
NGHTTPX_SRCS += shrpx_spdy_upstream.cc shrpx_spdy_upstream.h
......
......@@ -36,6 +36,7 @@
#include "shrpx_ssl.h"
#include "shrpx_worker.h"
#include "shrpx_worker_config.h"
#include "shrpx_downstream_connection_pool.h"
#ifdef HAVE_SPDYLAY
#include "shrpx_spdy_upstream.h"
#endif // HAVE_SPDYLAY
......@@ -230,8 +231,10 @@ ClientHandler::ClientHandler(bufferevent *bev,
bufferevent_rate_limit_group *rate_limit_group,
int fd, SSL *ssl,
const char *ipaddr,
WorkerStat *worker_stat)
WorkerStat *worker_stat,
DownstreamConnectionPool *dconn_pool)
: ipaddr_(ipaddr),
dconn_pool_(dconn_pool),
bev_(bev),
http2session_(nullptr),
ssl_(ssl),
......@@ -308,9 +311,6 @@ ClientHandler::~ClientHandler()
shutdown(fd_, SHUT_WR);
close(fd_);
for(auto dconn : dconn_pool_) {
delete dconn;
}
if(LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Deleted";
}
......@@ -477,7 +477,8 @@ void ClientHandler::pool_downstream_connection
if(LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Pooling downstream connection DCONN:" << dconn.get();
}
dconn_pool_.insert(dconn.release());
dconn->set_client_handler(nullptr);
dconn_pool_->add_downstream_connection(std::move(dconn));
}
void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn)
......@@ -486,27 +487,32 @@ void ClientHandler::remove_downstream_connection(DownstreamConnection *dconn)
CLOG(INFO, this) << "Removing downstream connection DCONN:" << dconn
<< " from pool";
}
dconn_pool_.erase(dconn);
delete dconn;
dconn_pool_->remove_downstream_connection(dconn);
}
std::unique_ptr<DownstreamConnection>
ClientHandler::get_downstream_connection()
{
if(dconn_pool_.empty()) {
auto dconn = dconn_pool_->pop_downstream_connection();
if(!dconn) {
if(LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Downstream connection pool is empty."
<< " Create new one";
}
if(http2session_) {
return util::make_unique<Http2DownstreamConnection>(this);
dconn = util::make_unique<Http2DownstreamConnection>
(dconn_pool_, http2session_);
} else {
return util::make_unique<HttpDownstreamConnection>(this);
dconn = util::make_unique<HttpDownstreamConnection>(dconn_pool_);
}
dconn->set_client_handler(this);
return dconn;
}
auto dconn = std::unique_ptr<DownstreamConnection>(*std::begin(dconn_pool_));
dconn_pool_.erase(dconn.get());
dconn->set_client_handler(this);
if(LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "Reuse downstream connection DCONN:" << dconn.get()
<< " from pool";
......
......@@ -27,7 +27,6 @@
#include "shrpx.h"
#include <set>
#include <memory>
#include <event.h>
......@@ -42,6 +41,7 @@ class DownstreamConnection;
class Http2Session;
class HttpsUpstream;
class ConnectBlocker;
class DownstreamConnectionPool;
struct WorkerStat;
class ClientHandler {
......@@ -49,7 +49,8 @@ public:
ClientHandler(bufferevent *bev,
bufferevent_rate_limit_group *rate_limit_group,
int fd, SSL *ssl, const char *ipaddr,
WorkerStat *worker_stat);
WorkerStat *worker_stat,
DownstreamConnectionPool *dconn_pool);
~ClientHandler();
int on_read();
int on_event();
......@@ -91,9 +92,9 @@ public:
void set_tls_renegotiation(bool f);
bool get_tls_renegotiation() const;
private:
std::set<DownstreamConnection*> dconn_pool_;
std::unique_ptr<Upstream> upstream_;
std::string ipaddr_;
DownstreamConnectionPool *dconn_pool_;
bufferevent *bev_;
// Shared HTTP2 session for each thread. NULL if backend is not
// HTTP2. Not deleted by this object.
......
......@@ -26,17 +26,25 @@
#include "shrpx_client_handler.h"
#include "shrpx_downstream.h"
#include "shrpx_downstream_connection_pool.h"
namespace shrpx {
DownstreamConnection::DownstreamConnection(ClientHandler *client_handler)
: client_handler_(client_handler),
DownstreamConnection::DownstreamConnection
(DownstreamConnectionPool *dconn_pool)
: dconn_pool_(dconn_pool),
client_handler_(nullptr),
downstream_(nullptr)
{}
DownstreamConnection::~DownstreamConnection()
{}
void DownstreamConnection::set_client_handler(ClientHandler *handler)
{
client_handler_ = handler;
}
ClientHandler* DownstreamConnection::get_client_handler()
{
return client_handler_;
......@@ -47,4 +55,9 @@ Downstream* DownstreamConnection::get_downstream()
return downstream_;
}
DownstreamConnectionPool* DownstreamConnection::get_dconn_pool() const
{
return dconn_pool_;
}
} // namespace shrpx
......@@ -34,10 +34,11 @@ namespace shrpx {
class ClientHandler;
class Upstream;
class Downstream;
class DownstreamConnectionPool;
class DownstreamConnection {
public:
DownstreamConnection(ClientHandler *client_handler);
DownstreamConnection(DownstreamConnectionPool *dconn_pool);
virtual ~DownstreamConnection();
virtual int attach_downstream(Downstream *downstream) = 0;
virtual void detach_downstream(Downstream *downstream) = 0;
......@@ -59,9 +60,12 @@ public:
virtual void on_upstream_change(Upstream *uptream) = 0;
virtual int on_priority_change(int32_t pri) = 0;
void set_client_handler(ClientHandler *client_handler);
ClientHandler* get_client_handler();
Downstream* get_downstream();
DownstreamConnectionPool* get_dconn_pool() const;
protected:
DownstreamConnectionPool *dconn_pool_;
ClientHandler *client_handler_;
Downstream *downstream_;
};
......
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "shrpx_downstream_connection_pool.h"
#include "shrpx_downstream_connection.h"
namespace shrpx {
DownstreamConnectionPool::DownstreamConnectionPool()
{}
DownstreamConnectionPool::~DownstreamConnectionPool()
{
for(auto dconn : pool_) {
delete dconn;
}
}
void DownstreamConnectionPool::add_downstream_connection
(std::unique_ptr<DownstreamConnection> dconn)
{
pool_.insert(dconn.release());
}
std::unique_ptr<DownstreamConnection>
DownstreamConnectionPool::pop_downstream_connection()
{
if(pool_.empty()) {
return nullptr;
}
auto dconn = std::unique_ptr<DownstreamConnection>(*std::begin(pool_));
pool_.erase(std::begin(pool_));
return dconn;
}
void DownstreamConnectionPool::remove_downstream_connection
(DownstreamConnection *dconn)
{
pool_.erase(dconn);
delete dconn;
}
} // namespace shrpx
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2014 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef SHRPX_DOWNSTREAM_CONNECTION_POOL_H
#define SHRPX_DOWNSTREAM_CONNECTION_POOL_H
#include "shrpx.h"
#include <memory>
#include <set>
namespace shrpx {
class DownstreamConnection;
class DownstreamConnectionPool {
public:
DownstreamConnectionPool();
~DownstreamConnectionPool();
void add_downstream_connection(std::unique_ptr<DownstreamConnection> dconn);
std::unique_ptr<DownstreamConnection> pop_downstream_connection();
void remove_downstream_connection(DownstreamConnection *dconn);
private:
std::set<DownstreamConnection*> pool_;
};
} // namespace shrpx
#endif // SHRPX_DOWNSTREAM_CONNECTION_POOL_H
......@@ -48,9 +48,9 @@ using namespace nghttp2;
namespace shrpx {
Http2DownstreamConnection::Http2DownstreamConnection
(ClientHandler *client_handler)
: DownstreamConnection(client_handler),
http2session_(client_handler->get_http2_session()),
(DownstreamConnectionPool *dconn_pool, Http2Session *http2session)
: DownstreamConnection(dconn_pool),
http2session_(http2session),
request_body_buf_(nullptr),
sd_(nullptr)
{}
......
......@@ -37,10 +37,12 @@ namespace shrpx {
struct StreamData;
class Http2Session;
class DownstreamConnectionPool;
class Http2DownstreamConnection : public DownstreamConnection {
public:
Http2DownstreamConnection(ClientHandler *client_handler);
Http2DownstreamConnection(DownstreamConnectionPool *dconn_pool,
Http2Session *http2session);
virtual ~Http2DownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
......
......@@ -32,6 +32,7 @@
#include "shrpx_http.h"
#include "shrpx_worker_config.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_downstream_connection_pool.h"
#include "http2.h"
#include "util.h"
#include "libevent_util.h"
......@@ -45,8 +46,8 @@ const size_t OUTBUF_MAX_THRES = 64*1024;
} // namespace
HttpDownstreamConnection::HttpDownstreamConnection
(ClientHandler *client_handler)
: DownstreamConnection(client_handler),
(DownstreamConnectionPool *dconn_pool)
: DownstreamConnection(dconn_pool),
bev_(nullptr),
ioctrl_(nullptr),
response_htp_{0}
......@@ -332,6 +333,16 @@ int HttpDownstreamConnection::end_upload_data()
return 0;
}
namespace {
void idle_readcb(bufferevent *bev, void *arg)
{
auto dconn = static_cast<HttpDownstreamConnection*>(arg);
auto dconn_pool = dconn->get_dconn_pool();
dconn_pool->remove_downstream_connection(dconn);
// dconn was deleted
}
} // namespace
namespace {
// Gets called when DownstreamConnection is pooled in ClientHandler.
void idle_eventcb(bufferevent *bev, short events, void *arg)
......@@ -339,13 +350,10 @@ void idle_eventcb(bufferevent *bev, short events, void *arg)
auto dconn = static_cast<HttpDownstreamConnection*>(arg);
if(events & BEV_EVENT_CONNECTED) {
// Downstream was detached before connection established?
// This may be safe to be left.
if(LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection connected?";
}
return;
}
if(events & BEV_EVENT_EOF) {
} else if(events & BEV_EVENT_EOF) {
if(LOG_ENABLED(INFO)) {
DCLOG(INFO, dconn) << "Idle connection EOF";
}
......@@ -358,8 +366,8 @@ void idle_eventcb(bufferevent *bev, short events, void *arg)
DCLOG(INFO, dconn) << "Idle connection network error";
}
}
auto client_handler = dconn->get_client_handler();
client_handler->remove_downstream_connection(dconn);
auto dconn_pool = dconn->get_dconn_pool();
dconn_pool->remove_downstream_connection(dconn);
// dconn was deleted
}
} // namespace
......@@ -372,7 +380,7 @@ void HttpDownstreamConnection::detach_downstream(Downstream *downstream)
downstream_ = nullptr;
ioctrl_.force_resume_read();
util::bev_enable_unless(bev_, EV_READ);
bufferevent_setcb(bev_, 0, 0, idle_eventcb, this);
bufferevent_setcb(bev_, idle_readcb, nullptr, idle_eventcb, this);
// On idle state, just enable read timeout. Normally idle downstream
// connection will get EOF from the downstream server and closed.
bufferevent_set_timeouts(bev_,
......
......@@ -37,9 +37,11 @@
namespace shrpx {
class DownstreamConnectionPool;
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(ClientHandler *client_handler);
HttpDownstreamConnection(DownstreamConnectionPool *dconn_pool);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
virtual void detach_downstream(Downstream *downstream);
......
......@@ -39,6 +39,7 @@
#include "shrpx_config.h"
#include "shrpx_http2_session.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_downstream_connection.h"
#include "util.h"
#include "libevent_util.h"
......@@ -230,7 +231,8 @@ int ListenHandler::accept_connection(evutil_socket_t fd,
auto client = ssl::accept_connection(evbase_, rate_limit_group_,
sv_ssl_ctx_, fd, addr, addrlen,
worker_stat_.get());
worker_stat_.get(),
&dconn_pool_);
if(!client) {
LLOG(ERROR, this) << "ClientHandler creation failed";
......
......@@ -42,6 +42,8 @@
#include <event2/bufferevent.h>
#include <event2/listener.h>
#include "shrpx_downstream_connection_pool.h"
namespace shrpx {
struct WorkerInfo {
......@@ -81,6 +83,7 @@ public:
void join_worker();
void notify_worker_shutdown();
private:
DownstreamConnectionPool dconn_pool_;
std::vector<std::unique_ptr<WorkerInfo>> workers_;
event_base *evbase_;
// The frontend server SSL_CTX
......
......@@ -49,6 +49,7 @@
#include "shrpx_client_handler.h"
#include "shrpx_config.h"
#include "shrpx_worker.h"
#include "shrpx_downstream_connection_pool.h"
#include "util.h"
#include "ssl.h"
......@@ -463,7 +464,8 @@ ClientHandler* accept_connection
SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen,
WorkerStat *worker_stat)
WorkerStat *worker_stat,
DownstreamConnectionPool *dconn_pool)
{
char host[NI_MAXHOST];
int rv;
......@@ -513,7 +515,8 @@ ClientHandler* accept_connection
return nullptr;
}
return new ClientHandler(bev, rate_limit_group, fd, ssl, host, worker_stat);
return new ClientHandler(bev, rate_limit_group, fd, ssl, host, worker_stat,
dconn_pool);
}
namespace {
......
......@@ -38,6 +38,7 @@ namespace shrpx {
class ClientHandler;
struct WorkerStat;
class DownstreamConnectionPool;
namespace ssl {
......@@ -52,7 +53,8 @@ ClientHandler* accept_connection
SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen,
WorkerStat *worker_stat);
WorkerStat *worker_stat,
DownstreamConnectionPool *dconn_pool);
int check_cert(SSL *ssl);
......
......@@ -123,7 +123,8 @@ void ThreadEventReceiver::on_read(bufferevent *bev)
wev.client_fd,
&wev.client_addr.sa,
wev.client_addrlen,
worker_stat_.get());
worker_stat_.get(),
&dconn_pool_);
if(client_handler) {
client_handler->set_http2_session(http2session_);
client_handler->set_http1_connect_blocker(http1_connect_blocker_);
......
......@@ -34,6 +34,7 @@
#include <event2/bufferevent.h>
#include "shrpx_config.h"
#include "shrpx_downstream_connection_pool.h"
namespace shrpx {
......@@ -66,6 +67,7 @@ public:
~ThreadEventReceiver();
void on_read(bufferevent *bev);
private:
DownstreamConnectionPool dconn_pool_;
event_base *evbase_;
SSL_CTX *ssl_ctx_;
// Shared HTTP2 session for each thread. NULL if not client
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment