Commit 22c88af1 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Resume deferred DATA after complete DATA frame arrived on backend

If SPDY or HTTP/2 ustream is used and HTTP/2 downstream is used, only
call {spdylay,nghttp2}_resume_data when complete DATA frame was read
in backend to avoid to transmit too small DATA frame to the upstream.
parent 5d80d18f
...@@ -960,8 +960,32 @@ namespace { ...@@ -960,8 +960,32 @@ namespace {
int on_frame_recv_callback int on_frame_recv_callback
(nghttp2_session *session, const nghttp2_frame *frame, void *user_data) (nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
{ {
int rv;
auto http2session = static_cast<Http2Session*>(user_data); auto http2session = static_cast<Http2Session*>(user_data);
switch(frame->hd.type) { switch(frame->hd.type) {
case NGHTTP2_DATA: {
auto sd = static_cast<StreamData*>
(nghttp2_session_get_stream_user_data(session, frame->hd.stream_id));
if(!sd || !sd->dconn) {
break;
}
auto downstream = sd->dconn->get_downstream();
if(!downstream ||
downstream->get_downstream_stream_id() != frame->hd.stream_id) {
break;
}
auto upstream = downstream->get_upstream();
rv = upstream->on_downstream_body(downstream, nullptr, 0, true);
if(rv != 0) {
http2session->submit_rst_stream(frame->hd.stream_id,
NGHTTP2_INTERNAL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET);
}
call_downstream_readcb(http2session, downstream);
break;
}
case NGHTTP2_HEADERS: case NGHTTP2_HEADERS:
return on_response_headers(http2session, session, frame); return on_response_headers(http2session, session, frame);
case NGHTTP2_RST_STREAM: { case NGHTTP2_RST_STREAM: {
...@@ -1039,7 +1063,7 @@ int on_data_chunk_recv_callback(nghttp2_session *session, ...@@ -1039,7 +1063,7 @@ int on_data_chunk_recv_callback(nghttp2_session *session,
} }
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
rv = upstream->on_downstream_body(downstream, data, len); rv = upstream->on_downstream_body(downstream, data, len, false);
if(rv != 0) { if(rv != 0) {
http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR); http2session->submit_rst_stream(stream_id, NGHTTP2_INTERNAL_ERROR);
downstream->set_response_state(Downstream::MSG_RESET); downstream->set_response_state(Downstream::MSG_RESET);
......
...@@ -905,11 +905,13 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, ...@@ -905,11 +905,13 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
auto handler = upstream->get_client_handler(); auto handler = upstream->get_client_handler();
auto body = downstream->get_response_body_buf(); auto body = downstream->get_response_body_buf();
assert(body); assert(body);
int nread = evbuffer_remove(body, buf, length); int nread = evbuffer_remove(body, buf, length);
if(nread == -1) { if(nread == -1) {
ULOG(FATAL, upstream) << "evbuffer_remove() failed"; ULOG(FATAL, upstream) << "evbuffer_remove() failed";
return NGHTTP2_ERR_CALLBACK_FAILURE; return NGHTTP2_ERR_CALLBACK_FAILURE;
} }
if(nread == 0 && if(nread == 0 &&
downstream->get_response_state() == Downstream::MSG_COMPLETE) { downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgraded()) { if(!downstream->get_upgraded()) {
...@@ -1094,7 +1096,8 @@ int Http2Upstream::on_downstream_header_complete(Downstream *downstream) ...@@ -1094,7 +1096,8 @@ int Http2Upstream::on_downstream_header_complete(Downstream *downstream)
// WARNING: Never call directly or indirectly nghttp2_session_send or // WARNING: Never call directly or indirectly nghttp2_session_send or
// nghttp2_session_recv. These calls may delete downstream. // nghttp2_session_recv. These calls may delete downstream.
int Http2Upstream::on_downstream_body(Downstream *downstream, int Http2Upstream::on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len) const uint8_t *data, size_t len,
bool flush)
{ {
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
auto handler = upstream->get_client_handler(); auto handler = upstream->get_client_handler();
...@@ -1104,11 +1107,18 @@ int Http2Upstream::on_downstream_body(Downstream *downstream, ...@@ -1104,11 +1107,18 @@ int Http2Upstream::on_downstream_body(Downstream *downstream,
ULOG(FATAL, this) << "evbuffer_add() failed"; ULOG(FATAL, this) << "evbuffer_add() failed";
return -1; return -1;
} }
if(flush) {
nghttp2_session_resume_data(session_, downstream->get_stream_id()); nghttp2_session_resume_data(session_, downstream->get_stream_id());
}
auto outbuflen = handler->get_outbuf_length() + auto outbuflen = handler->get_outbuf_length() +
evbuffer_get_length(body); evbuffer_get_length(body);
if(outbuflen > OUTBUF_MAX_THRES) { if(outbuflen > OUTBUF_MAX_THRES) {
if(!flush) {
nghttp2_session_resume_data(session_, downstream->get_stream_id());
}
downstream->pause_read(SHRPX_NO_BUFFER); downstream->pause_read(SHRPX_NO_BUFFER);
} }
......
...@@ -69,7 +69,7 @@ public: ...@@ -69,7 +69,7 @@ public:
virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len); const uint8_t *data, size_t len, bool flush);
virtual int on_downstream_body_complete(Downstream *downstream); virtual int on_downstream_body_complete(Downstream *downstream);
bool get_flow_control() const; bool get_flow_control() const;
......
...@@ -468,7 +468,7 @@ int htp_bodycb(http_parser *htp, const char *data, size_t len) ...@@ -468,7 +468,7 @@ int htp_bodycb(http_parser *htp, const char *data, size_t len)
{ {
auto downstream = static_cast<Downstream*>(htp->data); auto downstream = static_cast<Downstream*>(htp->data);
return downstream->get_upstream()->on_downstream_body return downstream->get_upstream()->on_downstream_body
(downstream, reinterpret_cast<const uint8_t*>(data), len); (downstream, reinterpret_cast<const uint8_t*>(data), len, true);
} }
} // namespace } // namespace
...@@ -507,7 +507,7 @@ int HttpDownstreamConnection::on_read() ...@@ -507,7 +507,7 @@ int HttpDownstreamConnection::on_read()
// For upgraded connection, just pass data to the upstream. // For upgraded connection, just pass data to the upstream.
int rv; int rv;
rv = downstream_->get_upstream()->on_downstream_body rv = downstream_->get_upstream()->on_downstream_body
(downstream_, reinterpret_cast<const uint8_t*>(mem), inputlen); (downstream_, reinterpret_cast<const uint8_t*>(mem), inputlen, true);
if(rv != 0) { if(rv != 0) {
return rv; return rv;
} }
......
...@@ -734,7 +734,8 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) ...@@ -734,7 +734,8 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)
} }
int HttpsUpstream::on_downstream_body(Downstream *downstream, int HttpsUpstream::on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len) const uint8_t *data, size_t len,
bool flush)
{ {
int rv; int rv;
if(len == 0) { if(len == 0) {
......
...@@ -60,7 +60,7 @@ public: ...@@ -60,7 +60,7 @@ public:
virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len); const uint8_t *data, size_t len, bool flush);
virtual int on_downstream_body_complete(Downstream *downstream); virtual int on_downstream_body_complete(Downstream *downstream);
void reset_current_header_length(); void reset_current_header_length();
......
...@@ -919,7 +919,8 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream) ...@@ -919,7 +919,8 @@ int SpdyUpstream::on_downstream_header_complete(Downstream *downstream)
// WARNING: Never call directly or indirectly spdylay_session_send or // WARNING: Never call directly or indirectly spdylay_session_send or
// spdylay_session_recv. These calls may delete downstream. // spdylay_session_recv. These calls may delete downstream.
int SpdyUpstream::on_downstream_body(Downstream *downstream, int SpdyUpstream::on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len) const uint8_t *data, size_t len,
bool flush)
{ {
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
auto body = downstream->get_response_body_buf(); auto body = downstream->get_response_body_buf();
...@@ -928,11 +929,18 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream, ...@@ -928,11 +929,18 @@ int SpdyUpstream::on_downstream_body(Downstream *downstream,
ULOG(FATAL, this) << "evbuffer_add() failed"; ULOG(FATAL, this) << "evbuffer_add() failed";
return -1; return -1;
} }
if(flush) {
spdylay_session_resume_data(session_, downstream->get_stream_id()); spdylay_session_resume_data(session_, downstream->get_stream_id());
}
auto outbuflen = upstream->get_client_handler()->get_outbuf_length() + auto outbuflen = upstream->get_client_handler()->get_outbuf_length() +
evbuffer_get_length(body); evbuffer_get_length(body);
if(outbuflen > OUTBUF_MAX_THRES) { if(outbuflen > OUTBUF_MAX_THRES) {
if(!flush) {
spdylay_session_resume_data(session_, downstream->get_stream_id());
}
downstream->pause_read(SHRPX_NO_BUFFER); downstream->pause_read(SHRPX_NO_BUFFER);
} }
......
...@@ -64,7 +64,7 @@ public: ...@@ -64,7 +64,7 @@ public:
virtual int on_downstream_header_complete(Downstream *downstream); virtual int on_downstream_header_complete(Downstream *downstream);
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len); const uint8_t *data, size_t len, bool flush);
virtual int on_downstream_body_complete(Downstream *downstream); virtual int on_downstream_body_complete(Downstream *downstream);
bool get_flow_control() const; bool get_flow_control() const;
......
...@@ -49,7 +49,8 @@ public: ...@@ -49,7 +49,8 @@ public:
virtual int on_downstream_header_complete(Downstream *downstream) = 0; virtual int on_downstream_header_complete(Downstream *downstream) = 0;
virtual int on_downstream_body(Downstream *downstream, virtual int on_downstream_body(Downstream *downstream,
const uint8_t *data, size_t len) = 0; const uint8_t *data, size_t len,
bool flush) = 0;
virtual int on_downstream_body_complete(Downstream *downstream) = 0; virtual int on_downstream_body_complete(Downstream *downstream) = 0;
virtual void pause_read(IOCtrlReason reason) = 0; virtual void pause_read(IOCtrlReason reason) = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment