Commit 273d9f4f authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Use std::unique_ptr for Downstream object

parent 500c5eea
......@@ -38,7 +38,7 @@
namespace shrpx {
Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
Downstream::Downstream(Upstream *upstream, int32_t stream_id, int32_t priority)
: request_bodylen_(0),
response_bodylen_(0),
upstream_(upstream),
......
......@@ -49,7 +49,7 @@ class DownstreamConnection;
class Downstream {
public:
Downstream(Upstream *upstream, int stream_id, int priority);
Downstream(Upstream *upstream, int32_t stream_id, int32_t priority);
~Downstream();
void reset_upstream(Upstream *upstream);
Upstream* get_upstream() const;
......
......@@ -34,40 +34,53 @@ DownstreamQueue::DownstreamQueue()
{}
DownstreamQueue::~DownstreamQueue()
{
for(auto& kv : pending_downstreams_) {
delete kv.second;
}
for(auto& kv : active_downstreams_) {
delete kv.second;
}
for(auto& kv : failure_downstreams_) {
delete kv.second;
}
}
{}
void DownstreamQueue::add_pending(Downstream *downstream)
void DownstreamQueue::add_pending(std::unique_ptr<Downstream> downstream)
{
pending_downstreams_[downstream->get_stream_id()] = downstream;
auto stream_id = downstream->get_stream_id();
pending_downstreams_[stream_id] = std::move(downstream);
}
void DownstreamQueue::add_failure(Downstream *downstream)
void DownstreamQueue::add_failure(std::unique_ptr<Downstream> downstream)
{
failure_downstreams_[downstream->get_stream_id()] = downstream;
auto stream_id = downstream->get_stream_id();
failure_downstreams_[stream_id] = std::move(downstream);
}
void DownstreamQueue::add_active(Downstream *downstream)
void DownstreamQueue::add_active(std::unique_ptr<Downstream> downstream)
{
active_downstreams_[downstream->get_stream_id()] = downstream;
auto stream_id = downstream->get_stream_id();
active_downstreams_[stream_id] = std::move(downstream);
}
void DownstreamQueue::remove(Downstream *downstream)
std::unique_ptr<Downstream> DownstreamQueue::remove(int32_t stream_id)
{
pending_downstreams_.erase(downstream->get_stream_id());
active_downstreams_.erase(downstream->get_stream_id());
failure_downstreams_.erase(downstream->get_stream_id());
auto kv = pending_downstreams_.find(stream_id);
if(kv != std::end(pending_downstreams_)) {
auto downstream = std::move((*kv).second);
pending_downstreams_.erase(kv);
return downstream;
}
kv = active_downstreams_.find(stream_id);
if(kv != std::end(active_downstreams_)) {
auto downstream = std::move((*kv).second);
active_downstreams_.erase(kv);
return downstream;
}
kv = failure_downstreams_.find(stream_id);
if(kv != std::end(failure_downstreams_)) {
auto downstream = std::move((*kv).second);
failure_downstreams_.erase(kv);
return downstream;
}
return nullptr;
}
Downstream* DownstreamQueue::find(int32_t stream_id)
......@@ -75,25 +88,25 @@ Downstream* DownstreamQueue::find(int32_t stream_id)
auto kv = pending_downstreams_.find(stream_id);
if(kv != std::end(pending_downstreams_)) {
return (*kv).second;
return (*kv).second.get();
}
kv = active_downstreams_.find(stream_id);
if(kv != std::end(active_downstreams_)) {
return (*kv).second;
return (*kv).second.get();
}
kv = failure_downstreams_.find(stream_id);
if(kv != std::end(failure_downstreams_)) {
return (*kv).second;
return (*kv).second.get();
}
return nullptr;
}
Downstream* DownstreamQueue::pop_pending()
std::unique_ptr<Downstream> DownstreamQueue::pop_pending()
{
auto i = std::begin(pending_downstreams_);
......@@ -101,7 +114,7 @@ Downstream* DownstreamQueue::pop_pending()
return nullptr;
}
auto downstream = (*i).second;
auto downstream = std::move((*i).second);
pending_downstreams_.erase(i);
......
......@@ -30,6 +30,7 @@
#include <stdint.h>
#include <map>
#include <memory>
namespace shrpx {
......@@ -39,12 +40,13 @@ class DownstreamQueue {
public:
DownstreamQueue();
~DownstreamQueue();
void add_pending(Downstream *downstream);
void add_failure(Downstream *downstream);
void add_active(Downstream *downstream);
void add_pending(std::unique_ptr<Downstream> downstream);
void add_failure(std::unique_ptr<Downstream> downstream);
void add_active(std::unique_ptr<Downstream> downstream);
// Removes |downstream| from either pending_downstreams_,
// active_downstreams_ or failure_downstreams_.
void remove(Downstream *downstream);
// active_downstreams_ or failure_downstreams_ and returns it
// wrapped in std::unique_ptr.
std::unique_ptr<Downstream> remove(int32_t stream_id);
// Finds Downstream object denoted by |stream_id| either in
// pending_downstreams_, active_downstreams_ or
// failure_downstreams_.
......@@ -55,14 +57,14 @@ public:
bool pending_empty() const;
// Pops first Downstream object in pending_downstreams_ and returns
// it.
Downstream* pop_pending();
std::unique_ptr<Downstream> pop_pending();
private:
// Downstream objects, not processed yet
std::map<int32_t, Downstream*> pending_downstreams_;
std::map<int32_t, std::unique_ptr<Downstream>> pending_downstreams_;
// Downstream objects in use, consuming downstream concurrency limit
std::map<int32_t, Downstream*> active_downstreams_;
std::map<int32_t, std::unique_ptr<Downstream>> active_downstreams_;
// Downstream objects, failed to connect to downstream server
std::map<int32_t, Downstream*> failure_downstreams_;
std::map<int32_t, std::unique_ptr<Downstream>> failure_downstreams_;
};
} // namespace shrpx
......
......@@ -73,8 +73,7 @@ int on_stream_close_callback
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
upstream->remove_downstream(downstream);
delete downstream;
// downstream was deleted
return 0;
}
......@@ -94,8 +93,7 @@ int on_stream_close_callback
}
upstream->remove_downstream(downstream);
delete downstream;
// downstream was deleted
return 0;
}
......@@ -105,7 +103,8 @@ int on_stream_close_callback
// If shrpx_downstream::push_request_headers() failed, the
// error is handled here.
upstream->remove_downstream(downstream);
delete downstream;
// downstream was deleted
// How to test this case? Request sufficient large download
// and make client send RST_STREAM after it gets first DATA
// frame chunk.
......@@ -117,9 +116,8 @@ int on_stream_close_callback
int Http2Upstream::upgrade_upstream(HttpsUpstream *http)
{
int rv;
auto downstream = http->get_downstream();
auto http2_settings = downstream->get_http2_settings();
auto http2_settings = http->get_downstream()->get_http2_settings();
util::to_base64(http2_settings);
auto settings_payload = base64::decode(std::begin(http2_settings),
......@@ -136,16 +134,17 @@ int Http2Upstream::upgrade_upstream(HttpsUpstream *http)
return -1;
}
pre_upstream_.reset(http);
http->pop_downstream();
auto downstream = http->pop_downstream();
downstream->reset_upstream(this);
downstream->set_stream_id(1);
downstream_queue_.add_active(downstream);
downstream->init_upstream_timer();
downstream->reset_upstream_rtimer();
downstream->init_response_body_buf();
downstream->set_stream_id(1);
downstream->set_priority(0);
downstream_queue_.add_active(std::move(downstream));
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Connection upgraded to HTTP/2";
}
......@@ -272,11 +271,9 @@ int on_begin_headers_callback(nghttp2_session *session,
}
// TODO Use priority 0 for now
auto downstream = new Downstream(upstream,
frame->hd.stream_id,
0);
auto downstream = util::make_unique<Downstream>
(upstream, frame->hd.stream_id, 0);
upstream->add_pending_downstream(downstream);
downstream->init_upstream_timer();
downstream->reset_upstream_rtimer();
downstream->init_response_body_buf();
......@@ -286,6 +283,8 @@ int on_begin_headers_callback(nghttp2_session *session,
downstream->set_request_major(2);
downstream->set_request_minor(0);
upstream->add_pending_downstream(std::move(downstream));
return 0;
}
} // namespace
......@@ -392,40 +391,41 @@ void Http2Upstream::maintain_downstream_concurrency()
break;
}
initiate_downstream(downstream);
initiate_downstream(std::move(downstream));
}
}
void Http2Upstream::initiate_downstream(Downstream *downstream)
void Http2Upstream::initiate_downstream(std::unique_ptr<Downstream> downstream)
{
int rv;
auto dconn = handler_->get_downstream_connection();
rv = dconn->attach_downstream(downstream);
rv = dconn->attach_downstream(downstream.get());
if(rv != 0) {
downstream_queue_.add_failure(downstream);
// downstream connection fails, send error page
if(error_reply(downstream, 503) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
if(error_reply(downstream.get(), 503) != 0) {
rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR);
}
downstream->set_request_state(Downstream::CONNECT_FAIL);
downstream_queue_.add_failure(std::move(downstream));
return;
}
rv = downstream->push_request_headers();
if(rv != 0) {
downstream_queue_.add_failure(downstream);
if(error_reply(downstream, 503) != 0) {
rst_stream(downstream, NGHTTP2_INTERNAL_ERROR);
if(error_reply(downstream.get(), 503) != 0) {
rst_stream(downstream.get(), NGHTTP2_INTERNAL_ERROR);
}
downstream_queue_.add_failure(std::move(downstream));
return;
}
downstream_queue_.add_active(downstream);
downstream_queue_.add_active(std::move(downstream));
return;
}
......@@ -837,7 +837,8 @@ void downstream_readcb(bufferevent *bev, void *ptr)
// because there is no consumer now. Downstream connection is also
// closed in this case.
upstream->remove_downstream(downstream);
delete downstream;
// downstream was deleted
return;
}
......@@ -925,7 +926,7 @@ void downstream_eventcb(bufferevent *bev, short events, void *ptr)
// If stream was closed already, we don't need to send reply at
// the first place. We can delete downstream.
upstream->remove_downstream(downstream);
delete downstream;
// downstream was deleted
return;
}
......@@ -981,7 +982,7 @@ void downstream_eventcb(bufferevent *bev, short events, void *ptr)
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
upstream->remove_downstream(downstream);
delete downstream;
// downstream was deleted
return;
}
......@@ -1169,14 +1170,15 @@ bufferevent_event_cb Http2Upstream::get_downstream_eventcb()
return downstream_eventcb;
}
void Http2Upstream::add_pending_downstream(Downstream *downstream)
void Http2Upstream::add_pending_downstream
(std::unique_ptr<Downstream> downstream)
{
downstream_queue_.add_pending(downstream);
downstream_queue_.add_pending(std::move(downstream));
}
void Http2Upstream::remove_downstream(Downstream *downstream)
{
downstream_queue_.remove(downstream);
downstream_queue_.remove(downstream->get_stream_id());
maintain_downstream_concurrency();
}
......
......@@ -54,7 +54,7 @@ public:
virtual bufferevent_data_cb get_downstream_readcb();
virtual bufferevent_data_cb get_downstream_writecb();
virtual bufferevent_event_cb get_downstream_eventcb();
void add_pending_downstream(Downstream *downstream);
void add_pending_downstream(std::unique_ptr<Downstream> downstream);
void remove_downstream(Downstream *downstream);
Downstream* find_downstream(int32_t stream_id);
......@@ -83,7 +83,7 @@ public:
void log_response_headers(Downstream *downstream,
const std::vector<nghttp2_nv>& nva) const;
void maintain_downstream_concurrency();
void initiate_downstream(Downstream *downstream);
void initiate_downstream(std::unique_ptr<Downstream> downstream);
private:
DownstreamQueue downstream_queue_;
std::unique_ptr<HttpsUpstream> pre_upstream_;
......
......@@ -50,7 +50,6 @@ const size_t OUTBUF_MAX_THRES = 16*1024;
HttpsUpstream::HttpsUpstream(ClientHandler *handler)
: handler_(handler),
current_header_length_(0),
downstream_(nullptr),
ioctrl_(handler->get_bev())
{
http_parser_init(&htp_, HTTP_REQUEST);
......@@ -58,9 +57,7 @@ HttpsUpstream::HttpsUpstream(ClientHandler *handler)
}
HttpsUpstream::~HttpsUpstream()
{
delete downstream_;
}
{}
void HttpsUpstream::reset_current_header_length()
{
......@@ -76,8 +73,7 @@ int htp_msg_begin(http_parser *htp)
}
upstream->reset_current_header_length();
// TODO specify 0 as priority for now
auto downstream = new Downstream(upstream, 0, 0);
upstream->attach_downstream(downstream);
upstream->attach_downstream(util::make_unique<Downstream>(upstream, 0, 0));
return 0;
}
} // namespace
......@@ -723,28 +719,25 @@ bufferevent_event_cb HttpsUpstream::get_downstream_eventcb()
return https_downstream_eventcb;
}
void HttpsUpstream::attach_downstream(Downstream *downstream)
void HttpsUpstream::attach_downstream(std::unique_ptr<Downstream> downstream)
{
assert(!downstream_);
downstream_ = downstream;
downstream_ = std::move(downstream);
}
void HttpsUpstream::delete_downstream()
{
delete downstream_;
downstream_ = 0;
downstream_.reset();
}
Downstream* HttpsUpstream::get_downstream() const
{
return downstream_;
return downstream_.get();
}
Downstream* HttpsUpstream::pop_downstream()
std::unique_ptr<Downstream> HttpsUpstream::pop_downstream()
{
auto downstream = downstream_;
downstream_ = nullptr;
return downstream;
return std::unique_ptr<Downstream>(downstream_.release());
}
int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)
......
......@@ -29,6 +29,8 @@
#include <stdint.h>
#include <memory>
#include "http-parser/http_parser.h"
#include "shrpx_upstream.h"
......@@ -50,10 +52,10 @@ public:
virtual bufferevent_data_cb get_downstream_readcb();
virtual bufferevent_data_cb get_downstream_writecb();
virtual bufferevent_event_cb get_downstream_eventcb();
void attach_downstream(Downstream *downstream);
void attach_downstream(std::unique_ptr<Downstream> downstream);
void delete_downstream();
Downstream* get_downstream() const;
Downstream* pop_downstream();
std::unique_ptr<Downstream> pop_downstream();
int error_reply(unsigned int status_code);
virtual void pause_read(IOCtrlReason reason);
......@@ -70,7 +72,7 @@ private:
ClientHandler *handler_;
http_parser htp_;
size_t current_header_length_;
Downstream *downstream_;
std::unique_ptr<Downstream> downstream_;
IOControl ioctrl_;
};
......
......@@ -109,7 +109,8 @@ void on_stream_close_callback
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
upstream->remove_downstream(downstream);
delete downstream;
// downstrea was deleted
return;
}
......@@ -125,7 +126,8 @@ void on_stream_close_callback
}
}
upstream->remove_downstream(downstream);
delete downstream;
// downstrea was deleted
return;
}
......@@ -134,7 +136,8 @@ void on_stream_close_callback
// If shrpx_downstream::push_request_headers() failed, the
// error is handled here.
upstream->remove_downstream(downstream);
delete downstream;
// downstrea was deleted
// How to test this case? Request sufficient large download
// and make client send RST_STREAM after it gets first DATA
// frame chunk.
......@@ -153,10 +156,10 @@ void on_ctrl_recv_callback
ULOG(INFO, upstream) << "Received upstream SYN_STREAM stream_id="
<< frame->syn_stream.stream_id;
}
auto downstream = new Downstream(upstream,
frame->syn_stream.stream_id,
frame->syn_stream.pri);
upstream->add_downstream(downstream);
auto downstream = upstream->add_pending_downstream
(frame->syn_stream.stream_id, frame->syn_stream.pri);
downstream->init_upstream_timer();
downstream->reset_upstream_rtimer();
downstream->init_response_body_buf();
......@@ -256,31 +259,33 @@ void SpdyUpstream::maintain_downstream_concurrency()
break;
}
initiate_downstream(downstream);
initiate_downstream(std::move(downstream));
}
}
void SpdyUpstream::initiate_downstream(Downstream *downstream)
void SpdyUpstream::initiate_downstream(std::unique_ptr<Downstream> downstream)
{
auto dconn = handler_->get_downstream_connection();
int rv = dconn->attach_downstream(downstream);
int rv = dconn->attach_downstream(downstream.get());
if(rv != 0) {
downstream_queue_.add_failure(downstream);
// If downstream connection fails, issue RST_STREAM.
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR);
downstream->set_request_state(Downstream::CONNECT_FAIL);
downstream_queue_.add_failure(std::move(downstream));
return;
}
rv = downstream->push_request_headers();
if(rv != 0) {
downstream_queue_.add_failure(downstream);
rst_stream(downstream.get(), SPDYLAY_INTERNAL_ERROR);
downstream_queue_.add_failure(std::move(downstream));
rst_stream(downstream, SPDYLAY_INTERNAL_ERROR);
return;
}
downstream_queue_.add_active(downstream);
downstream_queue_.add_active(std::move(downstream));
}
namespace {
......@@ -566,7 +571,8 @@ void spdy_downstream_readcb(bufferevent *bev, void *ptr)
// because there is no consumer now. Downstream connection is also
// closed in this case.
upstream->remove_downstream(downstream);
delete downstream;
// downstrea was deleted
return;
}
......@@ -654,7 +660,8 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
// If stream was closed already, we don't need to send reply at
// the first place. We can delete downstream.
upstream->remove_downstream(downstream);
delete downstream;
// downstrea was deleted
return;
}
......@@ -709,7 +716,8 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
}
if(downstream->get_request_state() == Downstream::STREAM_CLOSED) {
upstream->remove_downstream(downstream);
delete downstream;
// downstrea was deleted
return;
}
......@@ -900,14 +908,20 @@ bufferevent_event_cb SpdyUpstream::get_downstream_eventcb()
return spdy_downstream_eventcb;
}
void SpdyUpstream::add_downstream(Downstream *downstream)
Downstream* SpdyUpstream::add_pending_downstream
(int32_t stream_id, int32_t priority)
{
downstream_queue_.add_pending(downstream);
auto downstream = util::make_unique<Downstream>(this, stream_id, priority);
auto res = downstream.get();
downstream_queue_.add_pending(std::move(downstream));
return res;
}
void SpdyUpstream::remove_downstream(Downstream *downstream)
{
downstream_queue_.remove(downstream);
downstream_queue_.remove(downstream->get_stream_id());
maintain_downstream_concurrency();
}
......
......@@ -27,6 +27,8 @@
#include "shrpx.h"
#include <memory>
#include <spdylay/spdylay.h>
#include "shrpx_upstream.h"
......@@ -52,7 +54,7 @@ public:
virtual bufferevent_data_cb get_downstream_readcb();
virtual bufferevent_data_cb get_downstream_writecb();
virtual bufferevent_event_cb get_downstream_eventcb();
void add_downstream(Downstream *downstream);
Downstream* add_pending_downstream(int32_t stream_id, int32_t priority);
void remove_downstream(Downstream *downstream);
Downstream* find_downstream(int32_t stream_id);
......@@ -75,7 +77,7 @@ public:
int handle_ign_data_chunk(size_t len);
void maintain_downstream_concurrency();
void initiate_downstream(Downstream *downstream);
void initiate_downstream(std::unique_ptr<Downstream> downstream);
nghttp2::util::EvbufferBuffer sendbuf;
private:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment