Commit 59ff0b2f authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Propagate upstream priority change to downstream

parent 68b5204f
......@@ -121,7 +121,10 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg)
// At this point, input buffer is already filled with some
// bytes. The read callback is not called until new data
// come. So consume input buffer here.
handler->get_upstream()->on_read();
if(handler->get_upstream()->on_read() != 0) {
delete handler;
return;
}
}
}
}
......
......@@ -616,6 +616,15 @@ int Downstream::on_read()
return dconn_->on_read();
}
int Downstream::change_priority(int32_t pri)
{
if(!dconn_) {
DLOG(INFO, this) << "dconn_ is NULL";
return -1;
}
return dconn_->on_priority_change(pri);
}
void Downstream::set_response_state(int state)
{
response_state_ = state;
......
......@@ -197,6 +197,9 @@ public:
// connection.
int on_read();
// Change the priority of downstream
int change_priority(int32_t pri);
static const size_t OUTPUT_UPPER_THRES = 64*1024;
private:
Headers request_headers_;
......
......@@ -56,6 +56,7 @@ public:
virtual int on_write() = 0;
virtual void on_upstream_change(Upstream *uptream) = 0;
virtual int on_priority_change(int32_t pri) = 0;
ClientHandler* get_client_handler();
Downstream* get_downstream();
......
......@@ -518,4 +518,23 @@ bool Http2DownstreamConnection::get_output_buffer_full()
}
}
int Http2DownstreamConnection::on_priority_change(int32_t pri)
{
int rv;
if(downstream_->get_priorty() == pri) {
return 0;
}
downstream_->set_priority(pri);
if(http2session_->get_state() != Http2Session::CONNECTED) {
return 0;
}
rv = http2session_->submit_priority(this, pri);
if(rv != 0) {
DLOG(FATAL, this) << "nghttp2_submit_priority() failed";
return -1;
}
http2session_->notify();
return 0;
}
} // namespace shrpx
......@@ -59,6 +59,7 @@ public:
virtual int on_write();
virtual void on_upstream_change(Upstream *upstream) {}
virtual int on_priority_change(int32_t pri);
int send();
......
......@@ -629,6 +629,25 @@ int Http2Session::submit_window_update(Http2DownstreamConnection *dconn,
return 0;
}
int Http2Session::submit_priority(Http2DownstreamConnection *dconn,
int32_t pri)
{
assert(state_ == CONNECTED);
if(!dconn) {
return 0;
}
int rv;
rv = nghttp2_submit_priority(session_, NGHTTP2_FLAG_NONE,
dconn->get_downstream()->
get_downstream_stream_id(), pri);
if(rv < NGHTTP2_ERR_FATAL) {
SSLOG(FATAL, this) << "nghttp2_submit_priority() failed: "
<< nghttp2_strerror(rv);
return -1;
}
return 0;
}
nghttp2_session* Http2Session::get_session() const
{
return session_;
......
......@@ -74,6 +74,8 @@ public:
// |dconn|.
int submit_window_update(Http2DownstreamConnection *dconn, int32_t amount);
int submit_priority(Http2DownstreamConnection *dconn, int32_t pri);
int terminate_session(nghttp2_error_code error_code);
nghttp2_session* get_session() const;
......
......@@ -373,6 +373,17 @@ int on_frame_recv_callback
downstream->init_response_body_buf();
break;
}
case NGHTTP2_PRIORITY: {
auto downstream = upstream->find_downstream(frame->hd.stream_id);
if(!downstream) {
break;
}
rv = downstream->change_priority(frame->priority.pri);
if(rv != 0) {
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
break;
}
case NGHTTP2_SETTINGS:
if((frame->hd.flags & NGHTTP2_FLAG_ACK) == 0) {
break;
......
......@@ -58,6 +58,10 @@ public:
virtual int on_write();
virtual void on_upstream_change(Upstream *upstream);
virtual int on_priority_change(int32_t pri)
{
return 0;
}
bufferevent* get_bev();
private:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment