Commit d46e50b1 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Refactor DownstreamQueue to avoid expensive std::map

parent 0f87cedc
......@@ -33,6 +33,7 @@
#include "shrpx_config.h"
#include "shrpx_error.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_downstream_queue.h"
#include "util.h"
#include "http2.h"
......@@ -106,24 +107,27 @@ void downstream_wtimeoutcb(struct ev_loop *loop, ev_timer *w, int revents) {
// upstream could be nullptr for unittests
Downstream::Downstream(Upstream *upstream, int32_t stream_id, int32_t priority)
: request_start_time_(std::chrono::high_resolution_clock::now()),
: dlnext(nullptr), dlprev(nullptr),
request_start_time_(std::chrono::high_resolution_clock::now()),
request_buf_(upstream ? upstream->get_mcpool() : nullptr),
response_buf_(upstream ? upstream->get_mcpool() : nullptr),
request_bodylen_(0), response_bodylen_(0), response_sent_bodylen_(0),
request_content_length_(-1), response_content_length_(-1),
upstream_(upstream), request_headers_sum_(0), response_headers_sum_(0),
request_datalen_(0), response_datalen_(0), num_retry_(0),
stream_id_(stream_id), priority_(priority), downstream_stream_id_(-1),
upstream_(upstream), blocked_link_(nullptr), request_headers_sum_(0),
response_headers_sum_(0), request_datalen_(0), response_datalen_(0),
num_retry_(0), stream_id_(stream_id), priority_(priority),
downstream_stream_id_(-1),
response_rst_stream_error_code_(NGHTTP2_NO_ERROR),
request_state_(INITIAL), request_major_(1), request_minor_(1),
response_state_(INITIAL), response_http_status_(0), response_major_(1),
response_minor_(1), upgrade_request_(false), upgraded_(false),
http2_upgrade_seen_(false), chunked_request_(false),
request_connection_close_(false), request_header_key_prev_(false),
request_trailer_key_prev_(false), request_http2_expect_body_(false),
chunked_response_(false), response_connection_close_(false),
response_header_key_prev_(false), response_trailer_key_prev_(false),
expect_final_response_(false), request_pending_(false) {
response_minor_(1), dispatch_state_(DISPATCH_NONE),
upgrade_request_(false), upgraded_(false), http2_upgrade_seen_(false),
chunked_request_(false), request_connection_close_(false),
request_header_key_prev_(false), request_trailer_key_prev_(false),
request_http2_expect_body_(false), chunked_response_(false),
response_connection_close_(false), response_header_key_prev_(false),
response_trailer_key_prev_(false), expect_final_response_(false),
request_pending_(false) {
ev_timer_init(&upstream_rtimer_, &upstream_rtimeoutcb, 0.,
get_config()->stream_read_timeout);
......@@ -148,6 +152,10 @@ Downstream::~Downstream() {
DLOG(INFO, this) << "Deleting";
}
if (blocked_link_) {
detach_blocked_link(blocked_link_);
}
// check nullptr for unittest
if (upstream_) {
auto loop = upstream_->get_client_handler()->get_loop();
......@@ -1176,4 +1184,23 @@ bool Downstream::request_submission_ready() const {
request_pending_ && response_state_ == Downstream::INITIAL;
}
int Downstream::get_dispatch_state() const { return dispatch_state_; }
void Downstream::set_dispatch_state(int s) { dispatch_state_ = s; }
void Downstream::attach_blocked_link(BlockedLink *l) {
assert(!blocked_link_);
l->downstream = this;
blocked_link_ = l;
}
void Downstream::detach_blocked_link(BlockedLink *l) {
assert(blocked_link_);
assert(l->downstream == this);
l->downstream = nullptr;
blocked_link_ = nullptr;
}
} // namespace shrpx
......@@ -48,6 +48,7 @@ namespace shrpx {
class Upstream;
class DownstreamConnection;
struct BlockedLink;
class Downstream {
public:
......@@ -319,11 +320,27 @@ public:
// true if retry attempt should not be done.
bool no_more_retry() const;
int get_dispatch_state() const;
void set_dispatch_state(int s);
void attach_blocked_link(BlockedLink *l);
void detach_blocked_link(BlockedLink *l);
enum {
EVENT_ERROR = 0x1,
EVENT_TIMEOUT = 0x2,
};
enum {
DISPATCH_NONE,
DISPATCH_PENDING,
DISPATCH_BLOCKED,
DISPATCH_ACTIVE,
DISPATCH_FAILURE,
};
Downstream *dlnext, *dlprev;
private:
Headers request_headers_;
Headers response_headers_;
......@@ -370,6 +387,9 @@ private:
Upstream *upstream_;
std::unique_ptr<DownstreamConnection> dconn_;
// only used by HTTP/2 or SPDY upstream
BlockedLink *blocked_link_;
size_t request_headers_sum_;
size_t response_headers_sum_;
......@@ -396,6 +416,9 @@ private:
int response_major_;
int response_minor_;
// only used by HTTP/2 or SPDY upstream
int dispatch_state_;
http2::HeaderIndex request_hdidx_;
http2::HeaderIndex response_hdidx_;
......
......@@ -39,16 +39,21 @@ DownstreamQueue::DownstreamQueue(size_t conn_max_per_host, bool unified_host)
: conn_max_per_host),
unified_host_(unified_host) {}
DownstreamQueue::~DownstreamQueue() {}
DownstreamQueue::~DownstreamQueue() {
dlist_delete_all(downstreams_);
for (auto &p : host_entries_) {
auto &ent = p.second;
dlist_delete_all(ent.blocked);
}
}
void DownstreamQueue::add_pending(std::unique_ptr<Downstream> downstream) {
auto stream_id = downstream->get_stream_id();
pending_downstreams_[stream_id] = std::move(downstream);
downstream->set_dispatch_state(Downstream::DISPATCH_PENDING);
downstreams_.append(downstream.release());
}
void DownstreamQueue::add_failure(std::unique_ptr<Downstream> downstream) {
auto stream_id = downstream->get_stream_id();
failure_downstreams_[stream_id] = std::move(downstream);
void DownstreamQueue::mark_failure(Downstream *downstream) {
downstream->set_dispatch_state(Downstream::DISPATCH_FAILURE);
}
DownstreamQueue::HostEntry &
......@@ -76,19 +81,21 @@ DownstreamQueue::make_host_key(Downstream *downstream) const {
return make_host_key(downstream->get_request_http2_authority());
}
void DownstreamQueue::add_active(std::unique_ptr<Downstream> downstream) {
auto &ent = find_host_entry(make_host_key(downstream.get()));
void DownstreamQueue::mark_active(Downstream *downstream) {
auto &ent = find_host_entry(make_host_key(downstream));
++ent.num_active;
auto stream_id = downstream->get_stream_id();
active_downstreams_[stream_id] = std::move(downstream);
downstream->set_dispatch_state(Downstream::DISPATCH_ACTIVE);
}
void DownstreamQueue::add_blocked(std::unique_ptr<Downstream> downstream) {
auto &ent = find_host_entry(make_host_key(downstream.get()));
auto stream_id = downstream->get_stream_id();
ent.blocked.insert(stream_id);
blocked_downstreams_[stream_id] = std::move(downstream);
void DownstreamQueue::mark_blocked(Downstream *downstream) {
auto &ent = find_host_entry(make_host_key(downstream));
downstream->set_dispatch_state(Downstream::DISPATCH_BLOCKED);
auto link = new BlockedLink{};
downstream->attach_blocked_link(link);
ent.blocked.append(link);
}
bool DownstreamQueue::can_activate(const std::string &host) const {
......@@ -100,16 +107,6 @@ bool DownstreamQueue::can_activate(const std::string &host) const {
return ent.num_active < conn_max_per_host_;
}
namespace {
std::unique_ptr<Downstream>
pop_downstream(DownstreamQueue::DownstreamMap::iterator i,
DownstreamQueue::DownstreamMap &downstreams) {
auto downstream = std::move((*i).second);
downstreams.erase(i);
return downstream;
}
} // namespace
namespace {
bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent,
DownstreamQueue::HostEntryMap &host_entries,
......@@ -122,21 +119,19 @@ bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent,
}
} // namespace
std::unique_ptr<Downstream> DownstreamQueue::pop_pending(int32_t stream_id) {
auto itr = pending_downstreams_.find(stream_id);
if (itr == std::end(pending_downstreams_)) {
Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream) {
// Delete downstream when this function returns.
auto delptr = std::unique_ptr<Downstream>(downstream);
if (downstream->get_dispatch_state() != Downstream::DISPATCH_ACTIVE) {
assert(downstream->get_dispatch_state() != Downstream::DISPATCH_NONE);
downstreams_.remove(downstream);
return nullptr;
}
return pop_downstream(itr, pending_downstreams_);
}
std::unique_ptr<Downstream>
DownstreamQueue::remove_and_pop_blocked(int32_t stream_id) {
auto kv = active_downstreams_.find(stream_id);
downstreams_.remove(downstream);
if (kv != std::end(active_downstreams_)) {
auto downstream = pop_downstream(kv, active_downstreams_);
auto &host = make_host_key(downstream.get());
auto &host = make_host_key(downstream);
auto &ent = find_host_entry(host);
--ent.num_active;
......@@ -144,84 +139,29 @@ DownstreamQueue::remove_and_pop_blocked(int32_t stream_id) {
return nullptr;
}
if (ent.blocked.empty() || ent.num_active >= conn_max_per_host_) {
if (ent.num_active >= conn_max_per_host_) {
return nullptr;
}
auto next_stream_id = *std::begin(ent.blocked);
ent.blocked.erase(std::begin(ent.blocked));
auto itr = blocked_downstreams_.find(next_stream_id);
assert(itr != std::end(blocked_downstreams_));
auto next_downstream = pop_downstream(itr, blocked_downstreams_);
remove_host_entry_if_empty(ent, host_entries_, host);
return next_downstream;
for (auto link = ent.blocked.head; link;) {
auto next = link->dlnext;
if (!link->downstream) {
ent.blocked.remove(link);
link = next;
continue;
}
kv = blocked_downstreams_.find(stream_id);
if (kv != std::end(blocked_downstreams_)) {
auto downstream = pop_downstream(kv, blocked_downstreams_);
auto &host = make_host_key(downstream.get());
auto &ent = find_host_entry(host);
ent.blocked.erase(stream_id);
auto next_downstream = link->downstream;
next_downstream->detach_blocked_link(link);
ent.blocked.remove(link);
delete link;
remove_host_entry_if_empty(ent, host_entries_, host);
return nullptr;
}
kv = pending_downstreams_.find(stream_id);
if (kv != std::end(pending_downstreams_)) {
pop_downstream(kv, pending_downstreams_);
return nullptr;
}
kv = failure_downstreams_.find(stream_id);
if (kv != std::end(failure_downstreams_)) {
pop_downstream(kv, failure_downstreams_);
return nullptr;
}
return nullptr;
}
Downstream *DownstreamQueue::find(int32_t stream_id) {
auto kv = active_downstreams_.find(stream_id);
if (kv != std::end(active_downstreams_)) {
return (*kv).second.get();
}
kv = blocked_downstreams_.find(stream_id);
if (kv != std::end(blocked_downstreams_)) {
return (*kv).second.get();
}
kv = pending_downstreams_.find(stream_id);
if (kv != std::end(pending_downstreams_)) {
return (*kv).second.get();
}
kv = failure_downstreams_.find(stream_id);
if (kv != std::end(failure_downstreams_)) {
return (*kv).second.get();
return next_downstream;
}
return nullptr;
}
const DownstreamQueue::DownstreamMap &
DownstreamQueue::get_active_downstreams() const {
return active_downstreams_;
Downstream *DownstreamQueue::get_downstreams() const {
return downstreams_.head;
}
} // namespace shrpx
......@@ -33,17 +33,27 @@
#include <set>
#include <memory>
#include "template.h"
using namespace nghttp2;
namespace shrpx {
class Downstream;
// Link entry in HostEntry.blocked and downstream because downstream
// could be deleted in anytime and we'd like to find Downstream in
// O(1). Downstream has field to link back to this object.
struct BlockedLink {
Downstream *downstream;
BlockedLink *dlnext, *dlprev;
};
class DownstreamQueue {
public:
typedef std::map<int32_t, std::unique_ptr<Downstream>> DownstreamMap;
struct HostEntry {
// Set of stream ID that blocked by conn_max_per_host_.
std::set<int32_t> blocked;
DList<BlockedLink> blocked;
// The number of connections currently made to this host.
size_t num_active;
HostEntry();
......@@ -54,52 +64,38 @@ public:
// conn_max_per_host == 0 means no limit for downstream connection.
DownstreamQueue(size_t conn_max_per_host = 0, bool unified_host = true);
~DownstreamQueue();
// Add |downstream| to this queue. This is entry point for
// Downstream object.
void add_pending(std::unique_ptr<Downstream> downstream);
void add_failure(std::unique_ptr<Downstream> downstream);
// Adds |downstream| to active_downstreams_, which means that
// downstream connection has been started.
void add_active(std::unique_ptr<Downstream> downstream);
// Adds |downstream| to blocked_downstreams_, which means that
// download connection was blocked because conn_max_per_host_ limit.
void add_blocked(std::unique_ptr<Downstream> downstream);
// Set |downstream| to failure state, which means that downstream
// failed to connect to backend.
void mark_failure(Downstream *downstream);
// Set |downstream| to active state, which means that downstream
// connection has started.
void mark_active(Downstream *downstream);
// Set |downstream| to blocked state, which means that download
// connection was blocked because conn_max_per_host_ limit.
void mark_blocked(Downstream *downstream);
// Returns true if we can make downstream connection to given
// |host|.
bool can_activate(const std::string &host) const;
// Removes pending Downstream object whose stream ID is |stream_id|
// from pending_downstreams_ and returns it.
std::unique_ptr<Downstream> pop_pending(int32_t stream_id);
// Removes Downstream object whose stream ID is |stream_id| from
// either pending_downstreams_, active_downstreams_,
// blocked_downstreams_ or failure_downstreams_. If a Downstream
// object is removed from active_downstreams_, this function may
// return Downstream object with the same target host in
// blocked_downstreams_ if its connection is now not blocked by
// conn_max_per_host_ limit.
std::unique_ptr<Downstream> remove_and_pop_blocked(int32_t stream_id);
// Finds Downstream object denoted by |stream_id| either in
// pending_downstreams_, active_downstreams_, blocked_downstreams_
// or failure_downstreams_.
Downstream *find(int32_t stream_id);
const DownstreamMap &get_active_downstreams() const;
// Removes and frees |downstream| object. If |downstream| is in
// Downstream::DISPATCH_ACTIVE, this function may return Downstream
// object with the same target host in Downstream::DISPATCH_BLOCKED
// if its connection is now not blocked by conn_max_per_host_ limit.
Downstream *remove_and_get_blocked(Downstream *downstream);
Downstream *get_downstreams() const;
HostEntry &find_host_entry(const std::string &host);
const std::string &make_host_key(const std::string &host) const;
const std::string &make_host_key(Downstream *downstream) const;
// Maximum number of concurrent connections to the same host.
size_t conn_max_per_host_;
private:
// Per target host structure to keep track of the number of
// connections to the same host.
std::map<std::string, HostEntry> host_entries_;
// Downstream objects, not processed yet
DownstreamMap pending_downstreams_;
// Downstream objects, failed to connect to downstream server
DownstreamMap failure_downstreams_;
// Downstream objects, downstream connection started
DownstreamMap active_downstreams_;
// Downstream objects, blocked by conn_max_per_host_
DownstreamMap blocked_downstreams_;
DList<Downstream> downstreams_;
// Maximum number of concurrent connections to the same host.
size_t conn_max_per_host_;
// true if downstream host is treated as the same. Used for reverse
// proxying.
bool unified_host_;
......
......@@ -55,7 +55,8 @@ int on_stream_close_callback(nghttp2_session *session, int32_t stream_id,
<< " is being closed";
}
auto downstream = upstream->find_downstream(stream_id);
auto downstream = static_cast<Downstream *>(
nghttp2_session_get_stream_user_data(session, stream_id));
if (!downstream) {
return 0;
......@@ -161,7 +162,9 @@ int Http2Upstream::upgrade_upstream(HttpsUpstream *http) {
downstream->set_request_http2_authority(authority);
downstream->set_request_http2_scheme(scheme);
downstream_queue_.add_active(std::move(downstream));
auto ptr = downstream.get();
downstream_queue_.add_pending(std::move(downstream));
downstream_queue_.mark_active(ptr);
if (LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Connection upgraded to HTTP/2";
......@@ -191,7 +194,8 @@ int on_header_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0;
}
auto upstream = static_cast<Http2Upstream *>(user_data);
auto downstream = upstream->find_downstream(frame->hd.stream_id);
auto downstream = static_cast<Downstream *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!downstream) {
return 0;
}
......@@ -254,6 +258,8 @@ int on_begin_headers_callback(nghttp2_session *session,
// TODO Use priority 0 for now
auto downstream = make_unique<Downstream>(upstream, frame->hd.stream_id, 0);
nghttp2_session_set_stream_user_data(session, frame->hd.stream_id,
downstream.get());
downstream->reset_upstream_rtimer();
......@@ -268,9 +274,8 @@ int on_begin_headers_callback(nghttp2_session *session,
}
} // namespace
namespace {
int on_request_headers(Http2Upstream *upstream, Downstream *downstream,
nghttp2_session *session, const nghttp2_frame *frame) {
int Http2Upstream::on_request_headers(Downstream *downstream,
const nghttp2_frame *frame) {
if (downstream->get_response_state() == Downstream::MSG_COMPLETE) {
return 0;
}
......@@ -282,7 +287,7 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream,
for (auto &nv : nva) {
ss << TTY_HTTP_HD << nv.name << TTY_RST << ": " << nv.value << "\n";
}
ULOG(INFO, upstream) << "HTTP request headers. stream_id="
ULOG(INFO, this) << "HTTP request headers. stream_id="
<< downstream->get_stream_id() << "\n" << ss.str();
}
......@@ -299,7 +304,7 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream,
// For HTTP/2 proxy, we request :authority.
if (method->value != "CONNECT" && get_config()->http2_proxy && !authority) {
upstream->rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
rst_stream(downstream, NGHTTP2_PROTOCOL_ERROR);
return 0;
}
......@@ -321,57 +326,51 @@ int on_request_headers(Http2Upstream *upstream, Downstream *downstream,
downstream->set_request_state(Downstream::MSG_COMPLETE);
}
upstream->start_downstream(downstream);
start_downstream(downstream);
return 0;
}
} // namespace
void Http2Upstream::start_downstream(Downstream *downstream) {
auto next_downstream =
downstream_queue_.pop_pending(downstream->get_stream_id());
assert(next_downstream);
if (downstream_queue_.can_activate(
downstream->get_request_http2_authority())) {
initiate_downstream(std::move(next_downstream));
initiate_downstream(downstream);
return;
}
downstream_queue_.add_blocked(std::move(next_downstream));
downstream_queue_.mark_blocked(downstream);
}
void
Http2Upstream::initiate_downstream(std::unique_ptr<Downstream> downstream) {
void Http2Upstream::initiate_downstream(Downstream *downstream) {
int rv;
rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection());
if (rv != 0) {
// downstream connection fails, send error page
if (error_reply(downstream.get(), 503) != 0) {
rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR);
if (error_reply(downstream, 503) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
downstream->set_request_state(Downstream::CONNECT_FAIL);
downstream_queue_.add_failure(std::move(downstream));
downstream_queue_.mark_failure(downstream);
return;
}
rv = downstream->push_request_headers();
if (rv != 0) {
if (error_reply(downstream.get(), 503) != 0) {
rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR);
if (error_reply(downstream, 503) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
downstream_queue_.add_failure(std::move(downstream));
downstream_queue_.mark_failure(downstream);
return;
}
downstream_queue_.add_active(std::move(downstream));
downstream_queue_.mark_active(downstream);
return;
}
......@@ -386,7 +385,8 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
switch (frame->hd.type) {
case NGHTTP2_DATA: {
auto downstream = upstream->find_downstream(frame->hd.stream_id);
auto downstream = static_cast<Downstream *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!downstream) {
return 0;
}
......@@ -401,7 +401,8 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
return 0;
}
case NGHTTP2_HEADERS: {
auto downstream = upstream->find_downstream(frame->hd.stream_id);
auto downstream = static_cast<Downstream *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (!downstream) {
return 0;
}
......@@ -409,7 +410,7 @@ int on_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame,
if (frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
downstream->reset_upstream_rtimer();
return on_request_headers(upstream, downstream, session, frame);
return upstream->on_request_headers(downstream, frame);
}
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
......@@ -449,7 +450,8 @@ int on_data_chunk_recv_callback(nghttp2_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t len, void *user_data) {
auto upstream = static_cast<Http2Upstream *>(user_data);
auto downstream = upstream->find_downstream(stream_id);
auto downstream = static_cast<Downstream *>(
nghttp2_session_get_stream_user_data(session, stream_id));
if (!downstream || !downstream->get_downstream_connection()) {
if (upstream->consume(stream_id, len) != 0) {
......@@ -566,7 +568,8 @@ int on_frame_not_send_callback(nghttp2_session *session,
lib_error_code != NGHTTP2_ERR_STREAM_CLOSED &&
lib_error_code != NGHTTP2_ERR_STREAM_CLOSING) {
// To avoid stream hanging around, issue RST_STREAM.
auto downstream = upstream->find_downstream(frame->hd.stream_id);
auto downstream = static_cast<Downstream *>(
nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if (downstream) {
upstream->rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
}
......@@ -1188,18 +1191,16 @@ void Http2Upstream::remove_downstream(Downstream *downstream) {
handler_->write_accesslog(downstream);
}
auto next_downstream =
downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id());
nghttp2_session_set_stream_user_data(session_, downstream->get_stream_id(),
nullptr);
auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
if (next_downstream) {
initiate_downstream(std::move(next_downstream));
initiate_downstream(next_downstream);
}
}
Downstream *Http2Upstream::find_downstream(int32_t stream_id) {
return downstream_queue_.find(stream_id);
}
// WARNING: Never call directly or indirectly nghttp2_session_send or
// nghttp2_session_recv. These calls may delete downstream.
int Http2Upstream::on_downstream_header_complete(Downstream *downstream) {
......@@ -1447,9 +1448,10 @@ int Http2Upstream::on_timeout(Downstream *downstream) {
}
void Http2Upstream::on_handler_delete() {
for (auto &ent : downstream_queue_.get_active_downstreams()) {
if (ent.second->accesslog_ready()) {
handler_->write_accesslog(ent.second.get());
for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
if (d->get_dispatch_state() == Downstream::DISPATCH_ACTIVE &&
d->accesslog_ready()) {
handler_->write_accesslog(d);
}
}
}
......@@ -1457,8 +1459,12 @@ void Http2Upstream::on_handler_delete() {
int Http2Upstream::on_downstream_reset(bool no_retry) {
int rv;
for (auto &ent : downstream_queue_.get_active_downstreams()) {
auto downstream = ent.second.get();
for (auto downstream = downstream_queue_.get_downstreams(); downstream;
downstream = downstream->dlnext) {
if (downstream->get_dispatch_state() != Downstream::DISPATCH_ACTIVE) {
continue;
}
if (!downstream->request_submission_ready()) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
downstream->pop_downstream_connection();
......
......@@ -62,7 +62,6 @@ public:
void add_pending_downstream(std::unique_ptr<Downstream> downstream);
void remove_downstream(Downstream *downstream);
Downstream *find_downstream(int32_t stream_id);
int rst_stream(Downstream *downstream, uint32_t error_code);
int terminate_session(uint32_t error_code);
......@@ -93,7 +92,7 @@ public:
void log_response_headers(Downstream *downstream,
const std::vector<nghttp2_nv> &nva) const;
void start_downstream(Downstream *downstream);
void initiate_downstream(std::unique_ptr<Downstream> downstream);
void initiate_downstream(Downstream *downstream);
void submit_goaway();
void check_shutdown();
......@@ -101,6 +100,8 @@ public:
int prepare_push_promise(Downstream *downstream);
int submit_push_promise(const std::string &path, Downstream *downstream);
int on_request_headers(Downstream *downstream, const nghttp2_frame *frame);
private:
// must be put before downstream_queue_
std::unique_ptr<HttpsUpstream> pre_upstream_;
......
......@@ -92,7 +92,8 @@ void on_stream_close_callback(spdylay_session *session, int32_t stream_id,
ULOG(INFO, upstream) << "Stream stream_id=" << stream_id
<< " is being closed";
}
auto downstream = upstream->find_downstream(stream_id);
auto downstream = static_cast<Downstream *>(
spdylay_session_get_stream_user_data(session, stream_id));
if (!downstream) {
return;
}
......@@ -223,41 +224,37 @@ void on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type,
} // namespace
void SpdyUpstream::start_downstream(Downstream *downstream) {
auto next_downstream =
downstream_queue_.pop_pending(downstream->get_stream_id());
assert(next_downstream);
if (downstream_queue_.can_activate(
downstream->get_request_http2_authority())) {
initiate_downstream(std::move(next_downstream));
initiate_downstream(downstream);
return;
}
downstream_queue_.add_blocked(std::move(next_downstream));
downstream_queue_.mark_blocked(downstream);
}
void SpdyUpstream::initiate_downstream(std::unique_ptr<Downstream> downstream) {
void SpdyUpstream::initiate_downstream(Downstream *downstream) {
int rv = downstream->attach_downstream_connection(
handler_->get_downstream_connection());
if (rv != 0) {
// If downstream connection fails, issue RST_STREAM.
rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR);
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
downstream_queue_.add_failure(std::move(downstream));
downstream_queue_.mark_failure(downstream);
return;
}
rv = downstream->push_request_headers();
if (rv != 0) {
rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR);
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream_queue_.add_failure(std::move(downstream));
downstream_queue_.mark_failure(downstream);
return;
}
downstream_queue_.add_active(std::move(downstream));
downstream_queue_.mark_active(downstream);
}
namespace {
......@@ -265,7 +262,8 @@ void on_data_chunk_recv_callback(spdylay_session *session, uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t len, void *user_data) {
auto upstream = static_cast<SpdyUpstream *>(user_data);
auto downstream = upstream->find_downstream(stream_id);
auto downstream = static_cast<Downstream *>(
spdylay_session_get_stream_user_data(session, stream_id));
if (!downstream) {
upstream->consume(stream_id, len);
......@@ -323,7 +321,8 @@ namespace {
void on_data_recv_callback(spdylay_session *session, uint8_t flags,
int32_t stream_id, int32_t length, void *user_data) {
auto upstream = static_cast<SpdyUpstream *>(user_data);
auto downstream = upstream->find_downstream(stream_id);
auto downstream = static_cast<Downstream *>(
spdylay_session_get_stream_user_data(session, stream_id));
if (downstream && (flags & SPDYLAY_DATA_FLAG_FIN)) {
if (!downstream->validate_request_bodylen()) {
upstream->rst_stream(downstream, SPDYLAY_PROTOCOL_ERROR);
......@@ -351,7 +350,9 @@ void on_ctrl_not_send_callback(spdylay_session *session,
error_code != SPDYLAY_ERR_STREAM_CLOSING) {
// To avoid stream hanging around, issue RST_STREAM.
auto stream_id = frame->syn_reply.stream_id;
auto downstream = upstream->find_downstream(stream_id);
// TODO Could be always nullptr
auto downstream = static_cast<Downstream *>(
spdylay_session_get_stream_user_data(session, stream_id));
if (downstream) {
upstream->rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
}
......@@ -797,6 +798,7 @@ int SpdyUpstream::error_reply(Downstream *downstream,
Downstream *SpdyUpstream::add_pending_downstream(int32_t stream_id,
int32_t priority) {
auto downstream = make_unique<Downstream>(this, stream_id, priority);
spdylay_session_set_stream_user_data(session_, stream_id, downstream.get());
auto res = downstream.get();
downstream_queue_.add_pending(std::move(downstream));
......@@ -809,18 +811,16 @@ void SpdyUpstream::remove_downstream(Downstream *downstream) {
handler_->write_accesslog(downstream);
}
auto next_downstream =
downstream_queue_.remove_and_pop_blocked(downstream->get_stream_id());
spdylay_session_set_stream_user_data(session_, downstream->get_stream_id(),
nullptr);
auto next_downstream = downstream_queue_.remove_and_get_blocked(downstream);
if (next_downstream) {
initiate_downstream(std::move(next_downstream));
initiate_downstream(next_downstream);
}
}
Downstream *SpdyUpstream::find_downstream(int32_t stream_id) {
return downstream_queue_.find(stream_id);
}
// WARNING: Never call directly or indirectly spdylay_session_send or
// spdylay_session_recv. These calls may delete downstream.
int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) {
......@@ -1026,9 +1026,10 @@ int SpdyUpstream::on_timeout(Downstream *downstream) {
}
void SpdyUpstream::on_handler_delete() {
for (auto &ent : downstream_queue_.get_active_downstreams()) {
if (ent.second->accesslog_ready()) {
handler_->write_accesslog(ent.second.get());
for (auto d = downstream_queue_.get_downstreams(); d; d = d->dlnext) {
if (d->get_dispatch_state() == Downstream::DISPATCH_ACTIVE &&
d->accesslog_ready()) {
handler_->write_accesslog(d);
}
}
}
......@@ -1036,8 +1037,12 @@ void SpdyUpstream::on_handler_delete() {
int SpdyUpstream::on_downstream_reset(bool no_retry) {
int rv;
for (auto &ent : downstream_queue_.get_active_downstreams()) {
auto downstream = ent.second.get();
for (auto downstream = downstream_queue_.get_downstreams(); downstream;
downstream = downstream->dlnext) {
if (downstream->get_dispatch_state() != Downstream::DISPATCH_ACTIVE) {
continue;
}
if (!downstream->request_submission_ready()) {
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
downstream->pop_downstream_connection();
......
......@@ -58,7 +58,6 @@ public:
virtual int downstream_error(DownstreamConnection *dconn, int events);
Downstream *add_pending_downstream(int32_t stream_id, int32_t priority);
void remove_downstream(Downstream *downstream);
Downstream *find_downstream(int32_t stream_id);
int rst_stream(Downstream *downstream, int status_code);
int error_reply(Downstream *downstream, unsigned int status_code);
......@@ -82,7 +81,7 @@ public:
int consume(int32_t stream_id, size_t len);
void start_downstream(Downstream *downstream);
void initiate_downstream(std::unique_ptr<Downstream> downstream);
void initiate_downstream(Downstream *downstream);
private:
// must be put before downstream_queue_
......
......@@ -132,9 +132,19 @@ template <typename T> struct DList {
t->dlprev = t->dlnext = nullptr;
}
bool empty() const { return head == nullptr; }
T *head, *tail;
};
template <typename T> void dlist_delete_all(DList<T> &dl) {
for (auto e = dl.head; e;) {
auto next = e->dlnext;
delete e;
e = next;
}
}
} // namespace nghttp2
#endif // TEMPLATE_H
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment