Commit d074cb61 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Add rate limit per worker thread

The existing options --{read,write}-{rate,burst} are per connection.
The new options --worker-{read,write}-{rate,burst} are per worker
thread, which is overall rate limit of all connections worker handles.
parent 54dab500
......@@ -429,6 +429,10 @@ void fill_default_config()
mod_config()->read_burst = 4*1024*1024;
mod_config()->write_rate = 0;
mod_config()->write_burst = 0;
mod_config()->worker_read_rate = 0;
mod_config()->worker_read_burst = 0;
mod_config()->worker_write_rate = 0;
mod_config()->worker_write_burst = 0;
mod_config()->npn_list = nullptr;
mod_config()->verify_client = false;
mod_config()->verify_client_cacert = nullptr;
......@@ -529,6 +533,30 @@ void print_help(std::ostream& out)
<< " write burst size is unlimited.\n"
<< " Default: "
<< get_config()->write_burst << "\n"
<< " --worker-read-rate=<RATE>\n"
<< " Set maximum average read rate on frontend\n"
<< " connection per worker. Setting 0 to this\n"
<< " option means read rate is unlimited.\n"
<< " Default: "
<< get_config()->worker_read_rate << "\n"
<< " --worker-read-burst=<SIZE>\n"
<< " Set maximum read burst size on frontend\n"
<< " connection per worker. Setting 0 to this\n"
<< " option means read burst size is unlimited.\n"
<< " Default: "
<< get_config()->worker_read_burst << "\n"
<< " --worker-write-rate=<RATE>\n"
<< " Set maximum average write rate on frontend\n"
<< " connection per worker. Setting 0 to this\n"
<< " option means write rate is unlimited.\n"
<< " Default: "
<< get_config()->worker_write_rate << "\n"
<< " --worker-write-burst=<SIZE>\n"
<< " Set maximum write burst size on frontend\n"
<< " connection per worker. Setting 0 to this\n"
<< " option means write burst size is unlimited.\n"
<< " Default: "
<< get_config()->worker_write_burst << "\n"
<< "\n"
<< "Timeout:\n"
<< " --frontend-http2-read-timeout=<SEC>\n"
......@@ -828,6 +856,10 @@ int main(int argc, char **argv)
{"backend-http2-connection-window-bits", required_argument, &flag, 47},
{"tls-proto-list", required_argument, &flag, 48},
{"padding", required_argument, &flag, 49},
{"worker-read-rate", required_argument, &flag, 50},
{"worker-read-burst", required_argument, &flag, 51},
{"worker-write-rate", required_argument, &flag, 52},
{"worker-write-burst", required_argument, &flag, 53},
{nullptr, 0, nullptr, 0 }
};
......@@ -1068,6 +1100,22 @@ int main(int argc, char **argv)
// --padding
cmdcfgs.emplace_back(SHRPX_OPT_PADDING, optarg);
break;
case 50:
// --worker-read-rate
cmdcfgs.emplace_back(SHRPX_OPT_WORKER_READ_RATE, optarg);
break;
case 51:
// --worker-read-burst
cmdcfgs.emplace_back(SHRPX_OPT_WORKER_READ_BURST, optarg);
break;
case 52:
// --worker-write-rate
cmdcfgs.emplace_back(SHRPX_OPT_WORKER_WRITE_RATE, optarg);
break;
case 53:
// --worker-write-burst
cmdcfgs.emplace_back(SHRPX_OPT_WORKER_WRITE_BURST, optarg);
break;
default:
break;
}
......@@ -1222,6 +1270,13 @@ int main(int argc, char **argv)
get_rate_limit(get_config()->write_burst),
nullptr);
mod_config()->worker_rate_limit_cfg = ev_token_bucket_cfg_new
(get_rate_limit(get_config()->worker_read_rate),
get_rate_limit(get_config()->worker_read_burst),
get_rate_limit(get_config()->worker_write_rate),
get_rate_limit(get_config()->worker_write_burst),
nullptr);
if(get_config()->upstream_frame_debug) {
// To make it sync to logging
set_output(stderr);
......
......@@ -246,7 +246,9 @@ void tls_raw_writecb(evbuffer *buffer, const evbuffer_cb_info *info, void *arg)
}
} // namespace
ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
ClientHandler::ClientHandler(bufferevent *bev,
bufferevent_rate_limit_group *rate_limit_group,
int fd, SSL *ssl,
const char *ipaddr)
: ipaddr_(ipaddr),
bev_(bev),
......@@ -259,10 +261,23 @@ ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
tls_renegotiation_(false)
{
int rv;
rv = bufferevent_set_rate_limit(bev_, get_config()->rate_limit_cfg);
auto rate_limit_bev = bufferevent_get_underlying(bev_);
if(!rate_limit_bev) {
rate_limit_bev = bev_;
}
rv = bufferevent_set_rate_limit(rate_limit_bev,
get_config()->rate_limit_cfg);
if(rv == -1) {
CLOG(FATAL, this) << "bufferevent_set_rate_limit() failed";
}
rv = bufferevent_add_to_rate_limit_group(rate_limit_bev, rate_limit_group);
if(rv == -1) {
CLOG(FATAL, this) << "bufferevent_add_to_rate_limit_group() failed";
}
bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
set_upstream_timeouts(&get_config()->upstream_read_timeout,
......@@ -293,7 +308,15 @@ ClientHandler::~ClientHandler()
SSL_set_shutdown(ssl_, SSL_RECEIVED_SHUTDOWN);
SSL_shutdown(ssl_);
}
auto underlying = bufferevent_get_underlying(bev_);
if(underlying) {
bufferevent_remove_from_rate_limit_group(underlying);
} else {
bufferevent_remove_from_rate_limit_group(bev_);
}
bufferevent_disable(bev_, EV_READ | EV_WRITE);
bufferevent_free(bev_);
if(ssl_) {
......
......@@ -31,6 +31,8 @@
#include <memory>
#include <event.h>
#include <event2/bufferevent.h>
#include <openssl/ssl.h>
namespace shrpx {
......@@ -42,7 +44,9 @@ class HttpsUpstream;
class ClientHandler {
public:
ClientHandler(bufferevent *bev, int fd, SSL *ssl, const char *ipaddr);
ClientHandler(bufferevent *bev,
bufferevent_rate_limit_group *rate_limit_group,
int fd, SSL *ssl, const char *ipaddr);
~ClientHandler();
int on_read();
int on_event();
......
......@@ -104,6 +104,10 @@ const char SHRPX_OPT_READ_RATE[] = "read-rate";
const char SHRPX_OPT_READ_BURST[] = "read-burst";
const char SHRPX_OPT_WRITE_RATE[] = "write-rate";
const char SHRPX_OPT_WRITE_BURST[] = "write-burst";
const char SHRPX_OPT_WORKER_READ_RATE[] = "worker-read-rate";
const char SHRPX_OPT_WORKER_READ_BURST[] = "worker-read-burst";
const char SHRPX_OPT_WORKER_WRITE_RATE[] = "worker-write-rate";
const char SHRPX_OPT_WORKER_WRITE_BURST[] = "worker-write-burst";
const char SHRPX_OPT_NPN_LIST[] = "npn-list";
const char SHRPX_OPT_TLS_PROTO_LIST[] = "tls-proto-list";
const char SHRPX_OPT_VERIFY_CLIENT[] = "verify-client";
......@@ -452,6 +456,14 @@ int parse_config(const char *opt, const char *optarg)
mod_config()->write_rate = strtoul(optarg, nullptr, 10);
} else if(util::strieq(opt, SHRPX_OPT_WRITE_BURST)) {
mod_config()->write_burst = strtoul(optarg, nullptr, 10);
} else if(util::strieq(opt, SHRPX_OPT_WORKER_READ_RATE)) {
mod_config()->worker_read_rate = strtoul(optarg, nullptr, 10);
} else if(util::strieq(opt, SHRPX_OPT_WORKER_READ_BURST)) {
mod_config()->worker_read_burst = strtoul(optarg, nullptr, 10);
} else if(util::strieq(opt, SHRPX_OPT_WORKER_WRITE_RATE)) {
mod_config()->worker_write_rate = strtoul(optarg, nullptr, 10);
} else if(util::strieq(opt, SHRPX_OPT_WORKER_WRITE_BURST)) {
mod_config()->worker_write_burst = strtoul(optarg, nullptr, 10);
} else if(util::strieq(opt, SHRPX_OPT_NPN_LIST)) {
delete [] mod_config()->npn_list;
mod_config()->npn_list = parse_config_str_list(&mod_config()->npn_list_len,
......
......@@ -93,6 +93,10 @@ extern const char SHRPX_OPT_READ_RATE[];
extern const char SHRPX_OPT_READ_BURST[];
extern const char SHRPX_OPT_WRITE_RATE[];
extern const char SHRPX_OPT_WRITE_BURST[];
extern const char SHRPX_OPT_WORKER_READ_RATE[];
extern const char SHRPX_OPT_WORKER_READ_BURST[];
extern const char SHRPX_OPT_WORKER_WRITE_RATE[];
extern const char SHRPX_OPT_WORKER_WRITE_BURST[];
extern const char SHRPX_OPT_NPN_LIST[];
extern const char SHRPX_OPT_TLS_PROTO_LIST[];
extern const char SHRPX_OPT_VERIFY_CLIENT[];
......@@ -148,8 +152,10 @@ struct Config {
char *downstream_http_proxy_userinfo;
// host in http proxy URI
char *downstream_http_proxy_host;
// Rate limit configuration
// Rate limit configuration per connection
ev_token_bucket_cfg *rate_limit_cfg;
// Rate limit configuration per worker (thread)
ev_token_bucket_cfg *worker_rate_limit_cfg;
// list of supported NPN/ALPN protocol strings in the order of
// preference. The each element of this list is a NULL-terminated
// string.
......@@ -177,6 +183,10 @@ struct Config {
size_t read_burst;
size_t write_rate;
size_t write_burst;
size_t worker_read_rate;
size_t worker_read_burst;
size_t worker_write_rate;
size_t worker_write_burst;
// The number of elements in npn_list
size_t npn_list_len;
// The number of elements in tls_proto_list
......
......@@ -48,12 +48,16 @@ ListenHandler::ListenHandler(event_base *evbase, SSL_CTX *sv_ssl_ctx,
cl_ssl_ctx_(cl_ssl_ctx),
workers_(nullptr),
http2session_(nullptr),
rate_limit_group_(bufferevent_rate_limit_group_new
(evbase, get_config()->worker_rate_limit_cfg)),
num_worker_(0),
worker_round_robin_cnt_(0)
{}
ListenHandler::~ListenHandler()
{}
{
bufferevent_rate_limit_group_free(rate_limit_group_);
}
void ListenHandler::create_worker_thread(size_t num)
{
......@@ -106,26 +110,26 @@ int ListenHandler::accept_connection(evutil_socket_t fd,
LLOG(INFO, this) << "Accepted connection. fd=" << fd;
}
if(num_worker_ == 0) {
auto client = ssl::accept_connection(evbase_, sv_ssl_ctx_,
fd, addr, addrlen);
auto client = ssl::accept_connection(evbase_, rate_limit_group_,
sv_ssl_ctx_, fd, addr, addrlen);
if(!client) {
LLOG(ERROR, this) << "ClientHandler creation failed";
return 0;
}
client->set_http2_session(http2session_);
} else {
size_t idx = worker_round_robin_cnt_ % num_worker_;
++worker_round_robin_cnt_;
WorkerEvent wev;
memset(&wev, 0, sizeof(wev));
wev.client_fd = fd;
memcpy(&wev.client_addr, addr, addrlen);
wev.client_addrlen = addrlen;
auto output = bufferevent_get_output(workers_[idx].bev);
if(evbuffer_add(output, &wev, sizeof(wev)) != 0) {
LLOG(FATAL, this) << "evbuffer_add() failed";
return -1;
}
return 0;
}
size_t idx = worker_round_robin_cnt_ % num_worker_;
++worker_round_robin_cnt_;
WorkerEvent wev;
memset(&wev, 0, sizeof(wev));
wev.client_fd = fd;
memcpy(&wev.client_addr, addr, addrlen);
wev.client_addrlen = addrlen;
auto output = bufferevent_get_output(workers_[idx].bev);
if(evbuffer_add(output, &wev, sizeof(wev)) != 0) {
LLOG(FATAL, this) << "evbuffer_add() failed";
return -1;
}
return 0;
}
......
......@@ -33,6 +33,7 @@
#include <openssl/ssl.h>
#include <event.h>
#include <event2/bufferevent.h>
namespace shrpx {
......@@ -63,6 +64,7 @@ private:
// Shared backend HTTP2 session. NULL if multi-threaded. In
// multi-threaded case, see shrpx_worker.cc.
Http2Session *http2session_;
bufferevent_rate_limit_group *rate_limit_group_;
size_t num_worker_;
unsigned int worker_round_robin_cnt_;
};
......
......@@ -428,9 +428,12 @@ SSL_CTX* create_ssl_client_context()
return ssl_ctx;
}
ClientHandler* accept_connection(event_base *evbase, SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen)
ClientHandler* accept_connection
(event_base *evbase,
bufferevent_rate_limit_group *rate_limit_group,
SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen)
{
char host[NI_MAXHOST];
int rv;
......@@ -475,7 +478,7 @@ ClientHandler* accept_connection(event_base *evbase, SSL_CTX *ssl_ctx,
}
return nullptr;
}
return new ClientHandler(bev, fd, ssl, host);
return new ClientHandler(bev, rate_limit_group, fd, ssl, host);
} else {
LOG(ERROR) << "getnameinfo() failed: " << gai_strerror(rv);
return nullptr;
......
......@@ -45,9 +45,12 @@ SSL_CTX* create_ssl_context(const char *private_key_file,
SSL_CTX* create_ssl_client_context();
ClientHandler* accept_connection(event_base *evbase, SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen);
ClientHandler* accept_connection
(event_base *evbase,
bufferevent_rate_limit_group *rate_limit_group,
SSL_CTX *ssl_ctx,
evutil_socket_t fd,
sockaddr *addr, int addrlen);
bool numeric_host(const char *hostname);
......
......@@ -33,14 +33,20 @@
namespace shrpx {
ThreadEventReceiver::ThreadEventReceiver(SSL_CTX *ssl_ctx,
ThreadEventReceiver::ThreadEventReceiver(event_base *evbase,
SSL_CTX *ssl_ctx,
Http2Session *http2session)
: ssl_ctx_(ssl_ctx),
http2session_(http2session)
: evbase_(evbase),
ssl_ctx_(ssl_ctx),
http2session_(http2session),
rate_limit_group_(bufferevent_rate_limit_group_new
(evbase_, get_config()->worker_rate_limit_cfg))
{}
ThreadEventReceiver::~ThreadEventReceiver()
{}
{
bufferevent_rate_limit_group_free(rate_limit_group_);
}
void ThreadEventReceiver::on_read(bufferevent *bev)
{
......@@ -62,12 +68,14 @@ void ThreadEventReceiver::on_read(bufferevent *bev)
<< ", addrlen=" << wev.client_addrlen;
}
auto evbase = bufferevent_get_base(bev);
auto client_handler = ssl::accept_connection(evbase, ssl_ctx_,
auto client_handler = ssl::accept_connection(evbase, rate_limit_group_,
ssl_ctx_,
wev.client_fd,
&wev.client_addr.sa,
wev.client_addrlen);
if(client_handler) {
client_handler->set_http2_session(http2session_);
if(LOG_ENABLED(INFO)) {
TLOG(INFO, this) << "CLIENT_HANDLER:" << client_handler << " created";
}
......
......@@ -45,14 +45,17 @@ struct WorkerEvent {
class ThreadEventReceiver {
public:
ThreadEventReceiver(SSL_CTX *ssl_ctx, Http2Session *http2session);
ThreadEventReceiver(event_base *evbase, SSL_CTX *ssl_ctx,
Http2Session *http2session);
~ThreadEventReceiver();
void on_read(bufferevent *bev);
private:
event_base *evbase_;
SSL_CTX *ssl_ctx_;
// Shared HTTP2 session for each thread. NULL if not client
// mode. Not deleted by this object.
Http2Session *http2session_;
bufferevent_rate_limit_group *rate_limit_group_;
};
} // namespace shrpx
......
......@@ -96,7 +96,8 @@ void Worker::run()
DIE();
}
}
auto receiver = util::make_unique<ThreadEventReceiver>(sv_ssl_ctx_,
auto receiver = util::make_unique<ThreadEventReceiver>(evbase.get(),
sv_ssl_ctx_,
http2session.get());
bufferevent_enable(bev.get(), EV_READ);
bufferevent_setcb(bev.get(), readcb, nullptr, eventcb, receiver.get());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment