Commit 1ff9de4c authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Backend address selection with weight

parent 9b6ced66
......@@ -163,6 +163,7 @@ if(ENABLE_APP)
memchunk_test.cc
template_test.cc
base64_test.cc
priority_queue_test.cc
)
add_executable(nghttpx-unittest EXCLUDE_FROM_ALL
${NGHTTPX_UNITTEST_SOURCES}
......
......@@ -140,7 +140,8 @@ NGHTTPX_SRCS = \
shrpx_dual_dns_resolver.cc shrpx_dual_dns_resolver.h \
shrpx_dns_tracker.cc shrpx_dns_tracker.h \
buffer.h memchunk.h template.h allocator.h \
xsi_strerror.c xsi_strerror.h
xsi_strerror.c xsi_strerror.h \
priority_queue.h
if HAVE_MRUBY
NGHTTPX_SRCS += \
......@@ -186,7 +187,8 @@ nghttpx_unittest_SOURCES = shrpx-unittest.cc \
buffer_test.cc buffer_test.h \
memchunk_test.cc memchunk_test.h \
template_test.cc template_test.h \
base64_test.cc base64_test.h
base64_test.cc base64_test.h \
priority_queue_test.cc priority_queue_test.h
nghttpx_unittest_CPPFLAGS = ${AM_CPPFLAGS} \
-DNGHTTP2_SRC_DIR=\"$(top_srcdir)/src\"
nghttpx_unittest_LDADD = libnghttpx.a ${LDADD} @CUNIT_LIBS@ @TESTLDADD@
......
......@@ -187,6 +187,14 @@ template <typename Memchunk> struct Memchunks {
size_t append(const ImmutableString &s) {
return append(s.c_str(), s.size());
}
size_t copy(Memchunks &dest) {
auto m = head;
while (m) {
dest.append(m->pos, m->len());
m = m->next;
}
return len;
}
size_t remove(void *dest, size_t count) {
if (!tail || count == 0) {
return 0;
......
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef PRIORITY_QUEUE_H
#define PRIORITY_QUEUE_H
#include "nghttp2_config.h"
#include <cassert>
#include <functional>
#include <utility>
#include <vector>
namespace nghttp2 {
template <typename KeyType, typename ValueType, typename Compare = std::less<>>
class PriorityQueue {
public:
const ValueType &top() const;
const KeyType &key_top() const;
void push(const KeyType &key, const ValueType &value);
void push(KeyType &&key, ValueType &&value);
template <typename... Args> void emplace(Args &&... args);
void pop();
bool empty() const;
size_t size() const;
private:
void bubble_up(size_t idx);
void bubble_down(size_t idx);
std::vector<std::pair<KeyType, ValueType>> c_;
Compare comp_;
};
template <typename KeyType, typename ValueType, typename Compare>
const ValueType &PriorityQueue<KeyType, ValueType, Compare>::top() const {
assert(!c_.empty());
return c_[0].second;
}
template <typename KeyType, typename ValueType, typename Compare>
const KeyType &PriorityQueue<KeyType, ValueType, Compare>::key_top() const {
assert(!c_.empty());
return c_[0].first;
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::push(const KeyType &key,
const ValueType &value) {
c_.push_back(std::pair<KeyType, ValueType>{key, value});
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::push(KeyType &&key,
ValueType &&value) {
c_.push_back(std::pair<KeyType, ValueType>{std::move(key), std::move(value)});
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
template <typename... Args>
void PriorityQueue<KeyType, ValueType, Compare>::emplace(Args &&... args) {
c_.emplace_back(std::forward<Args>(args)...);
bubble_up(c_.size() - 1);
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::pop() {
assert(!c_.empty());
c_[0] = std::move(c_.back());
c_.resize(c_.size() - 1);
bubble_down(0);
}
template <typename KeyType, typename ValueType, typename Compare>
bool PriorityQueue<KeyType, ValueType, Compare>::empty() const {
return c_.empty();
}
template <typename KeyType, typename ValueType, typename Compare>
size_t PriorityQueue<KeyType, ValueType, Compare>::size() const {
return c_.size();
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::bubble_up(size_t idx) {
using std::swap;
while (idx != 0) {
auto parent = (idx - 1) / 2;
if (!comp_(c_[idx].first, c_[parent].first)) {
return;
}
swap(c_[idx], c_[parent]);
idx = parent;
}
}
template <typename KeyType, typename ValueType, typename Compare>
void PriorityQueue<KeyType, ValueType, Compare>::bubble_down(size_t idx) {
using std::swap;
for (;;) {
auto j = idx * 2 + 1;
auto minidx = idx;
for (auto i = 0; i < 2; ++i, ++j) {
if (j >= c_.size()) {
break;
}
if (comp_(c_[j].first, c_[minidx].first)) {
minidx = j;
}
}
if (minidx == idx) {
return;
}
swap(c_[idx], c_[minidx]);
idx = minidx;
}
}
} // namespace nghttp2
#endif // PRIORITY_QUEUE_H
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "priority_queue_test.h"
#include <string>
#include <CUnit/CUnit.h>
#include "priority_queue.h"
namespace nghttp2 {
void test_priority_queue_push(void) {
PriorityQueue<std::string, int64_t> pq;
CU_ASSERT(pq.empty());
CU_ASSERT(0 == pq.size());
pq.push("foo", 1);
CU_ASSERT(!pq.empty());
CU_ASSERT(1 == pq.size());
auto top = pq.top();
CU_ASSERT(1 == top);
pq.emplace("bar", 2);
top = pq.top();
CU_ASSERT(2 == top);
pq.push("baz", 3);
top = pq.top();
CU_ASSERT(2 == top);
pq.push("C", 4);
CU_ASSERT(4 == pq.size());
top = pq.top();
CU_ASSERT(4 == top);
pq.pop();
CU_ASSERT(3 == pq.size());
top = pq.top();
CU_ASSERT(2 == top);
pq.pop();
top = pq.top();
CU_ASSERT(3 == top);
pq.pop();
top = pq.top();
CU_ASSERT(1 == top);
pq.pop();
CU_ASSERT(pq.empty());
CU_ASSERT(0 == pq.size());
}
} // namespace nghttp2
/*
* nghttp2 - HTTP/2 C Library
*
* Copyright (c) 2019 Tatsuhiro Tsujikawa
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
* LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef PRIORITY_QUEUE_TEST_H
#define PRIORITY_QUEUE_TEST_H
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
namespace nghttp2 {
void test_priority_queue_push(void);
} // namespace nghttp2
#endif // PRIORITY_QUEUE_TEST_H
......@@ -45,6 +45,7 @@
#include "shrpx_config.h"
#include "tls.h"
#include "shrpx_router_test.h"
#include "priority_queue_test.h"
#include "shrpx_log.h"
static int init_suite1(void) { return 0; }
......@@ -217,7 +218,9 @@ int main(int argc, char *argv[]) {
!CU_add_test(pSuite, "template_string_ref",
nghttp2::test_template_string_ref) ||
!CU_add_test(pSuite, "base64_encode", nghttp2::test_base64_encode) ||
!CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode)) {
!CU_add_test(pSuite, "base64_decode", nghttp2::test_base64_decode) ||
!CU_add_test(pSuite, "priority_queue_push",
nghttp2::test_priority_queue_push)) {
CU_cleanup_registry();
return CU_get_error();
}
......
......@@ -1738,13 +1738,13 @@ Connections:
"sni=<SNI_HOST>", "fall=<N>", "rise=<N>",
"affinity=<METHOD>", "dns", "redirect-if-not-tls",
"upgrade-scheme", "mruby=<PATH>",
"read-timeout=<DURATION>", and
"write-timeout=<DURATION>". The parameter consists of
keyword, and optionally followed by "=" and value. For
example, the parameter "proto=h2" consists of the
keyword "proto" and value "h2". The parameter "tls"
consists of the keyword "tls" without value. Each
parameter is described as follows.
"read-timeout=<DURATION>", "write-timeout=<DURATION>",
"group=<GROUP>", "group-weight=<N>", and "weight=<N>".
The parameter consists of keyword, and optionally
followed by "=" and value. For example, the parameter
"proto=h2" consists of the keyword "proto" and value
"h2". The parameter "tls" consists of the keyword "tls"
without value. Each parameter is described as follows.
The backend application protocol can be specified using
optional "proto" parameter, and in the form of
......@@ -1850,6 +1850,31 @@ Connections:
pattern, --backend-read-timeout and
--backend-write-timeout are used.
"group=<GROUP>" parameter specifies the name of group
this backend address belongs to. By default, it belongs
to the unnamed default group. The name of group is
unique per pattern. "group-weight=<N>" parameter
specifies the weight of the group. The higher weight
gets more frequently selected by the load balancing
algorithm. <N> must be [1, 256] inclusive. The weight
8 has 4 times more weight than 2. <N> must be the same
for all addresses which share the same <GROUP>. If
"group-weight" is omitted in an address, but the other
address which belongs to the same group specifies
"group-weight", its weight is used. If no
"group-weight" is specified for all addresses, the
weight of a group becomes 1. "group" and "group-weight"
are ignored if session affinity is enabled.
"weight=<N>" parameter specifies the weight of the
backend address inside a group which this address
belongs to. The higher weight gets more frequently
selected by the load balancing algorithm. <N> must be
[1, 256] inclusive. The weight 8 has 4 times more
weight than weight 2. If this parameter is omitted,
weight becomes 1. "weight" is ignored if session
affinity is enabled.
Since ";" and ":" are used as delimiter, <PATTERN> must
not contain these characters. Since ";" has special
meaning in shell, the option value must be quoted.
......
This diff is collapsed.
......@@ -99,14 +99,14 @@ public:
void pool_downstream_connection(std::unique_ptr<DownstreamConnection> dconn);
void remove_downstream_connection(DownstreamConnection *dconn);
DownstreamAddr *get_downstream_addr(int &err, DownstreamAddrGroup *group,
Downstream *downstream);
// Returns DownstreamConnection object based on request path. This
// function returns non-null DownstreamConnection, and assigns 0 to
// |err| if it succeeds, or returns nullptr, and assigns negative
// error code to |err|. If |pref_proto| is not PROTO_NONE, choose
// backend whose protocol is |pref_proto|.
// error code to |err|.
std::unique_ptr<DownstreamConnection>
get_downstream_connection(int &err, Downstream *downstream,
Proto pref_proto = Proto::NONE);
get_downstream_connection(int &err, Downstream *downstream);
MemchunkPool *get_mcpool();
SSL *get_ssl() const;
// Call this function when HTTP/2 connection header is received at
......@@ -150,10 +150,8 @@ public:
StringRef get_forwarded_for() const;
Http2Session *
select_http2_session(const std::shared_ptr<DownstreamAddrGroup> &group);
Http2Session *select_http2_session_with_affinity(
const std::shared_ptr<DownstreamAddrGroup> &group, DownstreamAddr *addr);
get_http2_session(const std::shared_ptr<DownstreamAddrGroup> &group,
DownstreamAddr *addr);
// Returns an affinity cookie value for |downstream|. |cookie_name|
// is used to inspect cookie header field in request header fields.
......
......@@ -47,6 +47,7 @@
#include <cerrno>
#include <limits>
#include <fstream>
#include <unordered_map>
#include <nghttp2/nghttp2.h>
......@@ -813,11 +814,14 @@ int parse_upstream_params(UpstreamParams &out, const StringRef &src_params) {
struct DownstreamParams {
StringRef sni;
StringRef mruby;
StringRef group;
AffinityConfig affinity;
ev_tstamp read_timeout;
ev_tstamp write_timeout;
size_t fall;
size_t rise;
uint32_t weight;
uint32_t group_weight;
Proto proto;
bool tls;
bool dns;
......@@ -960,6 +964,43 @@ int parse_downstream_params(DownstreamParams &out,
StringRef{first + str_size("write-timeout="), end}) == -1) {
return -1;
}
} else if (util::istarts_with_l(param, "weight=")) {
auto valstr = StringRef{first + str_size("weight="), end};
if (valstr.empty()) {
LOG(ERROR)
<< "backend: weight: non-negative integer [1, 256] is expected";
return -1;
}
auto n = util::parse_uint(valstr);
if (n < 1 || n > 256) {
LOG(ERROR)
<< "backend: weight: non-negative integer [1, 256] is expected";
return -1;
}
out.weight = n;
} else if (util::istarts_with_l(param, "group=")) {
auto valstr = StringRef{first + str_size("group="), end};
if (valstr.empty()) {
LOG(ERROR) << "backend: group: empty string is not allowed";
return -1;
}
out.group = valstr;
} else if (util::istarts_with_l(param, "group-weight=")) {
auto valstr = StringRef{first + str_size("group-weight="), end};
if (valstr.empty()) {
LOG(ERROR) << "backend: group-weight: non-negative integer [1, 256] is "
"expected";
return -1;
}
auto n = util::parse_uint(valstr);
if (n < 1 || n > 256) {
LOG(ERROR) << "backend: group-weight: non-negative integer [1, 256] is "
"expected";
return -1;
}
out.group_weight = n;
} else if (!param.empty()) {
LOG(ERROR) << "backend: " << param << ": unknown keyword";
return -1;
......@@ -996,6 +1037,7 @@ int parse_mapping(Config *config, DownstreamAddrConfig &addr,
DownstreamParams params{};
params.proto = Proto::HTTP1;
params.weight = 1;
if (parse_downstream_params(params, src_params) != 0) {
return -1;
......@@ -1015,6 +1057,9 @@ int parse_mapping(Config *config, DownstreamAddrConfig &addr,
addr.fall = params.fall;
addr.rise = params.rise;
addr.weight = params.weight;
addr.group = make_string_ref(downstreamconf.balloc, params.group);
addr.group_weight = params.group_weight;
addr.proto = params.proto;
addr.tls = params.tls;
addr.sni = make_string_ref(downstreamconf.balloc, params.sni);
......@@ -4025,7 +4070,17 @@ int configure_downstream_group(Config *config, bool http2_proxy,
auto resolve_flags = numeric_addr_only ? AI_NUMERICHOST | AI_NUMERICSERV : 0;
for (auto &g : addr_groups) {
std::unordered_map<StringRef, uint32_t> wgchk;
for (auto &addr : g.addrs) {
if (addr.group_weight) {
auto it = wgchk.find(addr.group);
if (it == std::end(wgchk)) {
wgchk.emplace(addr.group, addr.group_weight);
} else if ((*it).second != addr.group_weight) {
LOG(FATAL) << "backend: inconsistent group-weight for a single group";
return -1;
}
}
if (addr.host_unix) {
// for AF_UNIX socket, we use "localhost" as host for backend
......@@ -4078,6 +4133,17 @@ int configure_downstream_group(Config *config, bool http2_proxy,
}
}
for (auto &addr : g.addrs) {
if (addr.group_weight == 0) {
auto it = wgchk.find(addr.group);
if (it == std::end(wgchk)) {
addr.group_weight = 1;
} else {
addr.group_weight = (*it).second;
}
}
}
if (g.affinity.type != SessionAffinity::NONE) {
size_t idx = 0;
for (auto &addr : g.addrs) {
......
......@@ -468,8 +468,15 @@ struct DownstreamAddrConfig {
StringRef hostport;
// hostname sent as SNI field
StringRef sni;
// name of group which this address belongs to.
StringRef group;
size_t fall;
size_t rise;
// weight of this address inside a weight group. Its range is [1,
// 256], inclusive.
uint32_t weight;
// weight of the weight group. Its range is [1, 256], inclusive.
uint32_t group_weight;
// Application protocol used in this group
Proto proto;
// backend port. 0 if |host_unix| is true.
......
......@@ -128,8 +128,16 @@ void ConnectBlocker::online() {
bool ConnectBlocker::in_offline() const { return offline_; }
void ConnectBlocker::call_block_func() { block_func_(); }
void ConnectBlocker::call_block_func() {
if (block_func_) {
block_func_();
}
}
void ConnectBlocker::call_unblock_func() { unblock_func_(); }
void ConnectBlocker::call_unblock_func() {
if (unblock_func_) {
unblock_func_();
}
}
} // namespace shrpx
......@@ -2318,22 +2318,6 @@ Http2Session::get_downstream_addr_group() const {
return group_;
}
void Http2Session::add_to_avail_freelist() {
if (freelist_zone_ != FreelistZone::NONE) {
return;
}
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Append to http2_avail_freelist, group="
<< group_.get() << ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}
freelist_zone_ = FreelistZone::AVAIL;
group_->shared_addr->http2_avail_freelist.append(this);
addr_->in_avail = true;
}
void Http2Session::add_to_extra_freelist() {
if (freelist_zone_ != FreelistZone::NONE) {
return;
......@@ -2353,15 +2337,6 @@ void Http2Session::remove_from_freelist() {
switch (freelist_zone_) {
case FreelistZone::NONE:
return;
case FreelistZone::AVAIL:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Remove from http2_avail_freelist, group=" << group_
<< ", freelist.size="
<< group_->shared_addr->http2_avail_freelist.size();
}
group_->shared_addr->http2_avail_freelist.remove(this);
addr_->in_avail = false;
break;
case FreelistZone::EXTRA:
if (LOG_ENABLED(INFO)) {
SSLOG(INFO, this) << "Remove from http2_extra_freelist, addr=" << addr_
......
......@@ -61,9 +61,6 @@ struct StreamData {
enum class FreelistZone {
// Http2Session object is not linked in any freelist.
NONE,
// Http2Session object is linked in group scope
// http2_avail_freelist.
AVAIL,
// Http2Session object is linked in address scope
// http2_extra_freelist.
EXTRA,
......
......@@ -456,38 +456,33 @@ void Http2Upstream::start_downstream(Downstream *downstream) {
void Http2Upstream::initiate_downstream(Downstream *downstream) {
int rv;
auto dconn = handler_->get_downstream_connection(rv, downstream);
if (!dconn) {
if (rv == SHRPX_ERR_TLS_REQUIRED) {
rv = redirect_to_https(downstream);
} else {
rv = error_reply(downstream, 502);
}
if (rv != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
DownstreamConnection *dconn_ptr;
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
downstream_queue_.mark_failure(downstream);
for (;;) {
auto dconn = handler_->get_downstream_connection(rv, downstream);
if (!dconn) {
if (rv == SHRPX_ERR_TLS_REQUIRED) {
rv = redirect_to_https(downstream);
} else {
rv = error_reply(downstream, 502);
}
if (rv != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
return;
}
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
downstream_queue_.mark_failure(downstream);
return;
}
#ifdef HAVE_MRUBY
auto dconn_ptr = dconn.get();
dconn_ptr = dconn.get();
#endif // HAVE_MRUBY
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
// downstream connection fails, send error page
if (error_reply(downstream, 502) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv == 0) {
break;
}
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
downstream_queue_.mark_failure(downstream);
return;
}
#ifdef HAVE_MRUBY
......@@ -2076,14 +2071,16 @@ int Http2Upstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
// downstream connection is clean; we can retry with new
// downstream connection.
dconn = handler_->get_downstream_connection(rv, downstream);
if (!dconn) {
goto fail;
}
for (;;) {
auto dconn = handler_->get_downstream_connection(rv, downstream);
if (!dconn) {
goto fail;
}
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
goto fail;
rv = downstream->attach_downstream_connection(std::move(dconn));
if (rv == 0) {
break;
}
}
rv = downstream->push_request_headers();
......
This diff is collapsed.
......@@ -44,7 +44,7 @@ struct DNSQuery;
class HttpDownstreamConnection : public DownstreamConnection {
public:
HttpDownstreamConnection(const std::shared_ptr<DownstreamAddrGroup> &group,
size_t initial_addr_idx, struct ev_loop *loop,
DownstreamAddr *addr, struct ev_loop *loop,
Worker *worker);
virtual ~HttpDownstreamConnection();
virtual int attach_downstream(Downstream *downstream);
......@@ -71,7 +71,7 @@ public:
int initiate_connection();
int write_reuse_first();
int write_first();
int read_clear();
int write_clear();
int read_tls();
......@@ -110,14 +110,12 @@ private:
std::unique_ptr<DNSQuery> dns_query_;
IOControl ioctrl_;
http_parser response_htp_;
// Index to backend address. If client affinity is enabled, it is
// the index to affinity_hash. Otherwise, it is 0, and not used.
size_t initial_addr_idx_;
// true if first write of reused connection succeeded. For
// convenience, this is initialized as true.
bool reuse_first_write_done_;
// true if first write succeeded.
bool first_write_done_;
// true if this object can be reused
bool reusable_;
// true if request header is written to request buffer.
bool request_header_written_;
};
} // namespace shrpx
......
......@@ -426,24 +426,26 @@ int htp_hdrs_completecb(http_parser *htp) {
return 0;
}
auto dconn = handler->get_downstream_connection(rv, downstream);
DownstreamConnection *dconn_ptr;
if (!dconn) {
if (rv == SHRPX_ERR_TLS_REQUIRED) {
upstream->redirect_to_https(downstream);
}
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
for (;;) {
auto dconn = handler->get_downstream_connection(rv, downstream);
return -1;
}
if (!dconn) {
if (rv == SHRPX_ERR_TLS_REQUIRED) {
upstream->redirect_to_https(downstream);
}
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
return -1;
}
#ifdef HAVE_MRUBY
auto dconn_ptr = dconn.get();
dconn_ptr = dconn.get();
#endif // HAVE_MRUBY
if (downstream->attach_downstream_connection(std::move(dconn)) != 0) {
downstream->set_request_state(DownstreamState::CONNECT_FAIL);
return -1;
if (downstream->attach_downstream_connection(std::move(dconn)) == 0) {
break;
}
}
#ifdef HAVE_MRUBY
......@@ -1427,14 +1429,16 @@ int HttpsUpstream::on_downstream_reset(Downstream *downstream, bool no_retry) {
goto fail;
}
dconn = handler_->get_downstream_connection(rv, downstream_.get());
if (!dconn) {
goto fail;
}
for (;;) {
auto dconn = handler_->get_downstream_connection(rv, downstream_.get());
if (!dconn) {
goto fail;
}
rv = downstream_->attach_downstream_connection(std::move(dconn));
if (rv != 0) {
goto fail;
rv = downstream_->attach_downstream_connection(std::move(dconn));
if (rv == 0) {
break;
}
}
rv = downstream_->push_request_headers();
......
......@@ -75,8 +75,9 @@ DownstreamAddrGroup::~DownstreamAddrGroup() {}
// DownstreamKey is used to index SharedDownstreamAddr in order to
// find the same configuration.
using DownstreamKey =
std::tuple<std::vector<std::tuple<StringRef, StringRef, size_t, size_t,
Proto, uint16_t, bool, bool, bool, bool>>,
std::tuple<std::vector<std::tuple<StringRef, StringRef, StringRef, size_t,
size_t, Proto, uint32_t, uint32_t,
uint32_t, bool, bool, bool, bool>>,
bool, SessionAffinity, StringRef, StringRef,
SessionAffinityCookieSecure, int64_t, int64_t>;
......@@ -91,14 +92,17 @@ DownstreamKey create_downstream_key(
for (auto &a : shared_addr->addrs) {
std::get<0>(*p) = a.host;
std::get<1>(*p) = a.sni;
std::get<2>(*p) = a.fall;
std::get<3>(*p) = a.rise;
std::get<4>(*p) = a.proto;
std::get<5>(*p) = a.port;
std::get<6>(*p) = a.host_unix;
std::get<7>(*p) = a.tls;
std::get<8>(*p) = a.dns;
std::get<9>(*p) = a.upgrade_scheme;
std::get<2>(*p) = a.group;
std::get<3>(*p) = a.fall;
std::get<4>(*p) = a.rise;
std::get<5>(*p) = a.proto;
std::get<6>(*p) = a.port;
std::get<7>(*p) = a.weight;
std::get<8>(*p) = a.group_weight;
std::get<9>(*p) = a.host_unix;
std::get<10>(*p) = a.tls;
std::get<11>(*p) = a.dns;
std::get<12>(*p) = a.upgrade_scheme;
++p;
}
std::sort(std::begin(addrs), std::end(addrs));
......@@ -134,7 +138,7 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
conn_handler_(conn_handler),
ticket_keys_(ticket_keys),
connect_blocker_(
std::make_unique<ConnectBlocker>(randgen_, loop_, []() {}, []() {})),
std::make_unique<ConnectBlocker>(randgen_, loop_, nullptr, nullptr)),
graceful_shutdown_(false) {
ev_async_init(&w_, eventcb);
w_.data = this;
......@@ -158,6 +162,39 @@ Worker::Worker(struct ev_loop *loop, SSL_CTX *sv_ssl_ctx, SSL_CTX *cl_ssl_ctx,
replace_downstream_config(std::move(downstreamconf));
}
namespace {
void ensure_enqueue_addr(PriorityQueue<DownstreamAddrKey, WeightGroup *,
DownstreamAddrKeyLess> &wgpq,
WeightGroup *wg, DownstreamAddr *addr) {
uint32_t cycle;
if (!wg->pq.empty()) {
auto key = wg->pq.key_top();
cycle = key.first;
} else {
cycle = 0;
}
addr->cycle = cycle;
addr->pending_penalty = 0;
wg->pq.emplace(DownstreamAddrKey{addr->cycle, addr->seq}, addr);
addr->queued = true;
if (!wg->queued) {
if (!wgpq.empty()) {
auto key = wgpq.key_top();
cycle = key.first;
} else {
cycle = 0;
}
wg->cycle = cycle;
wg->pending_penalty = 0;
wgpq.emplace(DownstreamAddrKey{wg->cycle, wg->seq}, wg);
wg->queued = true;
}
}
} // namespace
void Worker::replace_downstream_config(
std::shared_ptr<DownstreamConfig> downstreamconf) {
for (auto &g : downstream_addr_groups_) {
......@@ -222,9 +259,6 @@ void Worker::replace_downstream_config(
shared_addr->timeout.read = src.timeout.read;
shared_addr->timeout.write = src.timeout.write;
size_t num_http1 = 0;
size_t num_http2 = 0;
for (size_t j = 0; j < src.addrs.size(); ++j) {
auto &src_addr = src.addrs[j];
auto &dst_addr = shared_addr->addrs[j];
......@@ -235,6 +269,9 @@ void Worker::replace_downstream_config(
make_string_ref(shared_addr->balloc, src_addr.hostport);
dst_addr.port = src_addr.port;
dst_addr.host_unix = src_addr.host_unix;
dst_addr.weight = src_addr.weight;
dst_addr.group = make_string_ref(shared_addr->balloc, src_addr.group);
dst_addr.group_weight = src_addr.group_weight;
dst_addr.proto = src_addr.proto;
dst_addr.tls = src_addr.tls;
dst_addr.sni = make_string_ref(shared_addr->balloc, src_addr.sni);
......@@ -246,41 +283,17 @@ void Worker::replace_downstream_config(
auto shared_addr_ptr = shared_addr.get();
dst_addr.connect_blocker = std::make_unique<ConnectBlocker>(
randgen_, loop_,
[shared_addr_ptr, &dst_addr]() {
switch (dst_addr.proto) {
case Proto::HTTP1:
--shared_addr_ptr->http1_pri.weight;
break;
case Proto::HTTP2:
--shared_addr_ptr->http2_pri.weight;
break;
default:
assert(0);
}
},
[shared_addr_ptr, &dst_addr]() {
switch (dst_addr.proto) {
case Proto::HTTP1:
++shared_addr_ptr->http1_pri.weight;
break;
case Proto::HTTP2:
++shared_addr_ptr->http2_pri.weight;
break;
default:
assert(0);
randgen_, loop_, nullptr, [shared_addr_ptr, &dst_addr]() {
if (!dst_addr.queued) {
if (!dst_addr.wg) {
return;
}
ensure_enqueue_addr(shared_addr_ptr->pq, dst_addr.wg, &dst_addr);
}
});
dst_addr.live_check = std::make_unique<LiveCheck>(
loop_, cl_ssl_ctx_, this, &dst_addr, randgen_);
if (dst_addr.proto == Proto::HTTP2) {
++num_http2;
} else {
assert(dst_addr.proto == Proto::HTTP1);
++num_http1;
}
}
// share the connection if patterns have the same set of backend
......@@ -290,19 +303,47 @@ void Worker::replace_downstream_config(
auto it = addr_groups_indexer.find(dkey);
if (it == std::end(addr_groups_indexer)) {
if (LOG_ENABLED(INFO)) {
LOG(INFO) << "number of http/1.1 backend: " << num_http1
<< ", number of h2 backend: " << num_http2;
}
shared_addr->http1_pri.weight = num_http1;
shared_addr->http2_pri.weight = num_http2;
std::shuffle(std::begin(shared_addr->addrs), std::end(shared_addr->addrs),
randgen_);
size_t seq = 0;
for (auto &addr : shared_addr->addrs) {
addr.dconn_pool = std::make_unique<DownstreamConnectionPool>();
addr.seq = seq++;
}
if (shared_addr->affinity.type == SessionAffinity::NONE) {
std::map<StringRef, WeightGroup *> wgs;
size_t num_wgs = 0;
for (auto &addr : shared_addr->addrs) {
if (wgs.find(addr.group) == std::end(wgs)) {
++num_wgs;
wgs.emplace(addr.group, nullptr);
}
}
shared_addr->wgs = std::vector<WeightGroup>(num_wgs);
for (auto &addr : shared_addr->addrs) {
auto &wg = wgs[addr.group];
if (wg == nullptr) {
wg = &shared_addr->wgs[--num_wgs];
wg->seq = num_wgs;
}
wg->weight = addr.group_weight;
wg->pq.emplace(DownstreamAddrKey{0, addr.seq}, &addr);
addr.queued = true;
addr.wg = wg;
}
assert(num_wgs == 0);
for (auto &kv : wgs) {
shared_addr->pq.emplace(DownstreamAddrKey{0, kv.second->seq},
kv.second);
kv.second->queued = true;
}
}
dst->shared_addr = shared_addr;
......
......@@ -50,6 +50,7 @@
#include "shrpx_connect_blocker.h"
#include "shrpx_dns_tracker.h"
#include "allocator.h"
#include "priority_queue.h"
using namespace nghttp2;
......@@ -73,6 +74,8 @@ namespace tls {
class CertLookupTree;
} // namespace tls
struct WeightGroup;
struct DownstreamAddr {
Address addr;
// backend address. If |host_unix| is true, this is UNIX domain
......@@ -96,21 +99,33 @@ struct DownstreamAddr {
size_t rise;
// Client side TLS session cache
tls::TLSSessionCache tls_session_cache;
// Http2Session object created for this address. This list chains
// all Http2Session objects that is not in group scope
// http2_avail_freelist, and is not reached in maximum concurrency.
//
// If session affinity is enabled, http2_avail_freelist is not used,
// and this list is solely used.
// List of Http2Session which is not fully utilized (i.e., the
// server advertised maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully
// utilize TCP connection.
DList<Http2Session> http2_extra_freelist;
// true if Http2Session for this address is in group scope
// SharedDownstreamAddr.http2_avail_freelist
bool in_avail;
WeightGroup *wg;
// total number of streams created in HTTP/2 connections for this
// address.
size_t num_dconn;
// the sequence number of this address to randomize the order access
// threads.
size_t seq;
// Application protocol used in this backend
Proto proto;
// cycle is used to prioritize this address. Lower value takes
// higher priority.
uint32_t cycle;
// penalty which is applied to the next cycle calculation.
uint32_t pending_penalty;
// Weight of this address inside a weight group. Its range is [1,
// 256], inclusive.
uint32_t weight;
// name of group which this address belongs to.
StringRef group;
// Weight of the weight group which this address belongs to. Its
// range is [1, 256], inclusive.
uint32_t group_weight;
// true if TLS is used in this backend
bool tls;
// true if dynamic DNS is enabled
......@@ -119,28 +134,39 @@ struct DownstreamAddr {
// variant (e.g., "https") when forwarding request to a backend
// connected by TLS connection.
bool upgrade_scheme;
// true if this address is queued.
bool queued;
};
// Simplified weighted fair queuing. Actually we don't use queue here
// since we have just 2 items. This is the same algorithm used in
// stream priority, but ignores remainder.
struct WeightedPri {
// current cycle of this item. The lesser cycle has higher
// priority. This is unsigned 32 bit integer, so it may overflow.
// But with the same theory described in stream priority, it is no
// problem.
uint32_t cycle;
// weight, larger weight means more frequent use.
constexpr uint32_t MAX_DOWNSTREAM_ADDR_WEIGHT = 256;
using DownstreamAddrKey = std::pair<uint32_t, size_t>;
struct DownstreamAddrKeyLess {
bool operator()(const DownstreamAddrKey &lhs,
const DownstreamAddrKey &rhs) const {
auto d = rhs.first - lhs.first;
if (d == 0) {
return lhs.second < rhs.second;
}
return d <= MAX_DOWNSTREAM_ADDR_WEIGHT;
}
};
struct WeightGroup {
PriorityQueue<DownstreamAddrKey, DownstreamAddr *, DownstreamAddrKeyLess> pq;
size_t seq;
uint32_t weight;
uint32_t cycle;
uint32_t pending_penalty;
// true if this object is queued.
bool queued;
};
struct SharedDownstreamAddr {
SharedDownstreamAddr()
: balloc(1024, 1024),
affinity{SessionAffinity::NONE},
next{0},
http1_pri{},
http2_pri{},
redirect_if_not_tls{false} {}
SharedDownstreamAddr(const SharedDownstreamAddr &) = delete;
......@@ -150,31 +176,13 @@ struct SharedDownstreamAddr {
BlockAllocator balloc;
std::vector<DownstreamAddr> addrs;
std::vector<WeightGroup> wgs;
PriorityQueue<DownstreamAddrKey, WeightGroup *, DownstreamAddrKeyLess> pq;
// Bunch of session affinity hash. Only used if affinity ==
// SessionAffinity::IP.
std::vector<AffinityHash> affinity_hash;
// List of Http2Session which is not fully utilized (i.e., the
// server advertised maximum concurrency is not reached). We will
// coalesce as much stream as possible in one Http2Session to fully
// utilize TCP connection.
//
// If session affinity is enabled, this list is not used. Per
// address http2_extra_freelist is used instead.
//
// TODO Verify that this approach performs better in performance
// wise.
DList<Http2Session> http2_avail_freelist;
// Configuration for session affinity
AffinityConfig affinity;
// Next http/1.1 downstream address index in addrs.
size_t next;
// http1_pri and http2_pri are used to which protocols are used
// between HTTP/1.1 or HTTP/2 if they both are available in
// backends. They are choosed proportional to the number available
// backend. Usually, if http1_pri.cycle < http2_pri.cycle, choose
// HTTP/1.1. Otherwise, choose HTTP/2.
WeightedPri http1_pri;
WeightedPri http2_pri;
// Session affinity
// true if this group requires that client connection must be TLS,
// and the request must be redirected to https URI.
......
......@@ -530,4 +530,19 @@ inline int run_app(std::function<int(int, char **)> app, int argc,
} // namespace nghttp2
namespace std {
template <> struct hash<nghttp2::StringRef> {
std::size_t operator()(const nghttp2::StringRef &s) const noexcept {
// 32 bit FNV-1a:
// https://tools.ietf.org/html/draft-eastlake-fnv-16#section-6.1.1
uint32_t h = 2166136261u;
for (auto c : s) {
h ^= static_cast<uint8_t>(c);
h += (h << 1) + (h << 4) + (h << 7) + (h << 8) + (h << 24);
}
return h;
}
};
} // namespace std
#endif // TEMPLATE_H
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment