Commit 924b1bd6 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

Use unmodified http-parser

Handle HTTP Upgrade and CONNECT explicitly
parent bd64619c
...@@ -1606,14 +1606,9 @@ size_t http_parser_execute (http_parser *parser, ...@@ -1606,14 +1606,9 @@ size_t http_parser_execute (http_parser *parser,
/* Exit, the rest of the connect is in a different protocol. */ /* Exit, the rest of the connect is in a different protocol. */
if (parser->upgrade) { if (parser->upgrade) {
/* We want to use http_parser for tunneling connection parser->state = NEW_MESSAGE();
transparently */ CALLBACK_NOTIFY(message_complete);
/* Read body until EOF */ return (p - data) + 1;
parser->state = s_body_identity_eof;
break;
/* parser->state = NEW_MESSAGE(); */
/* CALLBACK_NOTIFY(message_complete); */
/* return (p - data) + 1; */
} }
if (parser->flags & F_SKIPBODY) { if (parser->flags & F_SKIPBODY) {
......
...@@ -43,6 +43,8 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority) ...@@ -43,6 +43,8 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
stream_id_(stream_id), stream_id_(stream_id),
priority_(priority), priority_(priority),
downstream_stream_id_(-1), downstream_stream_id_(-1),
upgrade_request_(false),
upgraded_(false),
request_state_(INITIAL), request_state_(INITIAL),
request_major_(1), request_major_(1),
request_minor_(1), request_minor_(1),
...@@ -474,10 +476,44 @@ void Downstream::set_recv_window_size(int32_t new_size) ...@@ -474,10 +476,44 @@ void Downstream::set_recv_window_size(int32_t new_size)
recv_window_size_ = new_size; recv_window_size_ = new_size;
} }
bool Downstream::tunnel_established() const void Downstream::check_upgrade_fulfilled()
{ {
return request_method_ == "CONNECT" && if(request_method_ == "CONNECT") {
200 <= response_http_status_ && response_http_status_ < 300; upgraded_ = 200 <= response_http_status_ && response_http_status_ < 300;
} else {
// TODO Do more strict checking for upgrade headers
for(auto& hd : request_headers_) {
if(util::strieq("upgrade", hd.first.c_str())) {
upgraded_ = true;
break;
}
}
}
}
bool Downstream::get_upgraded() const
{
return upgraded_;
}
void Downstream::check_upgrade_request()
{
if(request_method_ == "CONNECT") {
upgrade_request_ = true;
} else {
// TODO Do more strict checking for upgrade headers
for(auto& hd : request_headers_) {
if(util::strieq("upgrade", hd.first.c_str())) {
upgrade_request_ = true;
break;
}
}
}
}
bool Downstream::get_upgrade_request() const
{
return upgrade_request_;
} }
void Downstream::set_downstream_stream_id(int32_t stream_id) void Downstream::set_downstream_stream_id(int32_t stream_id)
......
...@@ -68,8 +68,16 @@ public: ...@@ -68,8 +68,16 @@ public:
int32_t get_recv_window_size() const; int32_t get_recv_window_size() const;
void inc_recv_window_size(int32_t amount); void inc_recv_window_size(int32_t amount);
void set_recv_window_size(int32_t new_size); void set_recv_window_size(int32_t new_size);
// Returns true if tunnel connection has been established. // Returns true if upgrade (HTTP Upgrade or CONNECT) is succeeded.
bool tunnel_established() const; void check_upgrade_fulfilled();
// Checks request headers whether the request is upgrade request or
// not.
void check_upgrade_request();
// Returns true if the request is upgrade.
bool get_upgrade_request() const;
// Returns true if the upgrade is succeded as a result of the call
// check_upgrade_fulfilled().
bool get_upgraded() const;
// downstream request API // downstream request API
const Headers& get_request_headers() const; const Headers& get_request_headers() const;
void add_request_header(const std::string& name, const std::string& value); void add_request_header(const std::string& name, const std::string& value);
...@@ -145,6 +153,11 @@ private: ...@@ -145,6 +153,11 @@ private:
int priority_; int priority_;
// stream ID in backend connection // stream ID in backend connection
int32_t downstream_stream_id_; int32_t downstream_stream_id_;
// true if the request contains upgrade token (HTTP Upgrade or
// CONNECT)
bool upgrade_request_;
// true if the connection is upgraded (HTTP Upgrade or CONNECT)
bool upgraded_;
int request_state_; int request_state_;
std::string request_method_; std::string request_method_;
......
...@@ -108,7 +108,7 @@ void on_stream_close_callback ...@@ -108,7 +108,7 @@ void on_stream_close_callback
downstream->set_request_state(Downstream::STREAM_CLOSED); downstream->set_request_state(Downstream::STREAM_CLOSED);
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
// At this point, downstream response was read // At this point, downstream response was read
if(!downstream->tunnel_established() && if(!downstream->get_upgraded() &&
!downstream->get_response_connection_close()) { !downstream->get_response_connection_close()) {
// Keep-alive // Keep-alive
DownstreamConnection *dconn; DownstreamConnection *dconn;
...@@ -192,6 +192,7 @@ void on_frame_recv_callback ...@@ -192,6 +192,7 @@ void on_frame_recv_callback
} }
downstream->add_request_header("host", host); downstream->add_request_header("host", host);
downstream->check_upgrade_request();
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
std::stringstream ss; std::stringstream ss;
...@@ -588,7 +589,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) ...@@ -588,7 +589,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
} else { } else {
DCLOG(INFO, dconn) << "Timeout"; DCLOG(INFO, dconn) << "Timeout";
} }
if(downstream->tunnel_established()) { if(downstream->get_upgraded()) {
DCLOG(INFO, dconn) << "Note: this is tunnel connection"; DCLOG(INFO, dconn) << "Note: this is tunnel connection";
} }
} }
...@@ -678,7 +679,7 @@ ssize_t spdy_data_read_callback(nghttp2_session *session, ...@@ -678,7 +679,7 @@ ssize_t spdy_data_read_callback(nghttp2_session *session,
int nread = evbuffer_remove(body, buf, length); int nread = evbuffer_remove(body, buf, length);
if(nread == 0 && if(nread == 0 &&
downstream->get_response_state() == Downstream::MSG_COMPLETE) { downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(!downstream->tunnel_established()) { if(!downstream->get_upgraded()) {
*eof = 1; *eof = 1;
} else { } else {
// For tunneling, issue RST_STREAM to finish the stream. // For tunneling, issue RST_STREAM to finish the stream.
......
...@@ -356,13 +356,26 @@ int htp_hdrs_completecb(http_parser *htp) ...@@ -356,13 +356,26 @@ int htp_hdrs_completecb(http_parser *htp)
downstream->set_response_minor(htp->http_minor); downstream->set_response_minor(htp->http_minor);
downstream->set_response_connection_close(!http_should_keep_alive(htp)); downstream->set_response_connection_close(!http_should_keep_alive(htp));
downstream->set_response_state(Downstream::HEADER_COMPLETE); downstream->set_response_state(Downstream::HEADER_COMPLETE);
if(downstream->tunnel_established()) { downstream->check_upgrade_fulfilled();
if(downstream->get_upgraded()) {
downstream->set_response_connection_close(true); downstream->set_response_connection_close(true);
} }
if(downstream->get_upstream()->on_downstream_header_complete(downstream) if(downstream->get_upstream()->on_downstream_header_complete(downstream)
!= 0) { != 0) {
return -1; return -1;
} }
if(downstream->get_upgraded()) {
// Upgrade complete, read until EOF in both ends
downstream->get_upstream()->resume_read(SHRPX_MSG_BLOCK, downstream);
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(LOG_ENABLED(INFO)) {
LOG(INFO) << "HTTP upgrade success. stream_id="
<< downstream->get_stream_id();
}
}
unsigned int status = downstream->get_response_http_status(); unsigned int status = downstream->get_response_http_status();
// Ignore the response body. HEAD response may contain // Ignore the response body. HEAD response may contain
// Content-Length or Transfer-Encoding: chunked. Some server send // Content-Length or Transfer-Encoding: chunked. Some server send
...@@ -441,11 +454,19 @@ http_parser_settings htp_hooks = { ...@@ -441,11 +454,19 @@ http_parser_settings htp_hooks = {
int HttpDownstreamConnection::on_read() int HttpDownstreamConnection::on_read()
{ {
evbuffer *input = bufferevent_get_input(bev_); evbuffer *input = bufferevent_get_input(bev_);
size_t inputlen = evbuffer_get_length(input);
unsigned char *mem = evbuffer_pullup(input, -1); unsigned char *mem = evbuffer_pullup(input, -1);
if(downstream_->get_upgraded()) {
// For upgraded connection, just pass data to the upstream.
int rv;
rv = downstream_->get_upstream()->on_downstream_body
(downstream_, reinterpret_cast<const uint8_t*>(mem), inputlen);
evbuffer_drain(input, inputlen);
return rv;
}
size_t nread = http_parser_execute(response_htp_, &htp_hooks, size_t nread = http_parser_execute(response_htp_, &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
evbuffer_get_length(input)); inputlen);
evbuffer_drain(input, nread); evbuffer_drain(input, nread);
http_errno htperr = HTTP_PARSER_ERRNO(response_htp_); http_errno htperr = HTTP_PARSER_ERRNO(response_htp_);
......
...@@ -142,6 +142,7 @@ int htp_hdrs_completecb(http_parser *htp) ...@@ -142,6 +142,7 @@ int htp_hdrs_completecb(http_parser *htp)
downstream->set_request_connection_close(!http_should_keep_alive(htp)); downstream->set_request_connection_close(!http_should_keep_alive(htp));
downstream->check_upgrade_request();
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
std::stringstream ss; std::stringstream ss;
ss << downstream->get_request_method() << " " ss << downstream->get_request_method() << " "
...@@ -258,20 +259,40 @@ int HttpsUpstream::on_read() ...@@ -258,20 +259,40 @@ int HttpsUpstream::on_read()
{ {
bufferevent *bev = handler_->get_bev(); bufferevent *bev = handler_->get_bev();
evbuffer *input = bufferevent_get_input(bev); evbuffer *input = bufferevent_get_input(bev);
size_t inputlen = evbuffer_get_length(input);
unsigned char *mem = evbuffer_pullup(input, -1); unsigned char *mem = evbuffer_pullup(input, -1);
if(evbuffer_get_length(input) == 0) { if(inputlen == 0) {
return 0;
}
auto downstream = get_downstream();
// downstream can be nullptr here, because it is initialized in the
// callback chain called by http_parser_execute()
if(downstream && downstream->get_upgraded()) {
int rv = downstream->push_upload_data_chunk
(reinterpret_cast<const uint8_t*>(mem), inputlen);
evbuffer_drain(input, inputlen);
if(rv != 0) {
return -1;
}
if(downstream->get_output_buffer_full()) {
if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream output buffer is full";
}
pause_read(SHRPX_NO_BUFFER);
}
return 0; return 0;
} }
size_t nread = http_parser_execute(htp_, &htp_hooks, size_t nread = http_parser_execute(htp_, &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
evbuffer_get_length(input)); inputlen);
evbuffer_drain(input, nread); evbuffer_drain(input, nread);
// Well, actually header length + some body bytes // Well, actually header length + some body bytes
current_header_length_ += nread; current_header_length_ += nread;
Downstream *downstream = get_downstream(); // Get downstream again because it may be initialized in http parser
// execution
downstream = get_downstream();
http_errno htperr = HTTP_PARSER_ERRNO(htp_); http_errno htperr = HTTP_PARSER_ERRNO(htp_);
if(htperr == HPE_PAUSED) { if(htperr == HPE_PAUSED) {
...@@ -403,7 +424,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr) ...@@ -403,7 +424,7 @@ void https_downstream_readcb(bufferevent *bev, void *ptr)
return; return;
} }
} }
} else if(downstream->tunnel_established()) { } else if(downstream->get_upgraded()) {
// This path is effectively only taken for SPDY downstream // This path is effectively only taken for SPDY downstream
// because only SPDY downstream sets response_state to // because only SPDY downstream sets response_state to
// MSG_COMPLETE and this function. For HTTP downstream, EOF // MSG_COMPLETE and this function. For HTTP downstream, EOF
...@@ -660,7 +681,7 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream) ...@@ -660,7 +681,7 @@ int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)
} else if(connection_upgrade) { } else if(connection_upgrade) {
hdrs += "Connection: upgrade\r\n"; hdrs += "Connection: upgrade\r\n";
} }
} else { } else if(!downstream->get_upgraded()) {
hdrs += "Connection: close\r\n"; hdrs += "Connection: close\r\n";
} }
if(!get_config()->no_via) { if(!get_config()->no_via) {
......
...@@ -180,7 +180,9 @@ ssize_t spdy_data_read_callback(nghttp2_session *session, ...@@ -180,7 +180,9 @@ ssize_t spdy_data_read_callback(nghttp2_session *session,
nread = evbuffer_remove(body, buf, length); nread = evbuffer_remove(body, buf, length);
if(nread == 0) { if(nread == 0) {
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgrade_request()) {
*eof = 1; *eof = 1;
}
break; break;
} else { } else {
// This is important because it will handle flow control // This is important because it will handle flow control
......
...@@ -814,8 +814,16 @@ void on_frame_recv_callback ...@@ -814,8 +814,16 @@ void on_frame_recv_callback
auto upstream = downstream->get_upstream(); auto upstream = downstream->get_upstream();
downstream->set_response_state(Downstream::HEADER_COMPLETE); downstream->set_response_state(Downstream::HEADER_COMPLETE);
if(downstream->tunnel_established()) { downstream->check_upgrade_fulfilled();
if(downstream->get_upgraded()) {
downstream->set_response_connection_close(true); downstream->set_response_connection_close(true);
// On upgrade sucess, both ends can send data
upstream->resume_read(SHRPX_MSG_BLOCK, downstream);
downstream->set_request_state(Downstream::HEADER_COMPLETE);
if(LOG_ENABLED(INFO)) {
SSLOG(INFO, spdy) << "HTTP upgrade success. stream_id="
<< frame->hd.stream_id;
}
} }
rv = upstream->on_downstream_header_complete(downstream); rv = upstream->on_downstream_header_complete(downstream);
if(rv != 0) { if(rv != 0) {
...@@ -833,7 +841,7 @@ void on_frame_recv_callback ...@@ -833,7 +841,7 @@ void on_frame_recv_callback
auto downstream = sd->dconn->get_downstream(); auto downstream = sd->dconn->get_downstream();
if(downstream && if(downstream &&
downstream->get_downstream_stream_id() == frame->hd.stream_id) { downstream->get_downstream_stream_id() == frame->hd.stream_id) {
if(downstream->tunnel_established() && if(downstream->get_upgraded() &&
downstream->get_response_state() == Downstream::HEADER_COMPLETE) { downstream->get_response_state() == Downstream::HEADER_COMPLETE) {
// For tunneled connection, we has to submit RST_STREAM to // For tunneled connection, we has to submit RST_STREAM to
// upstream *after* whole response body is sent. We just set // upstream *after* whole response body is sent. We just set
......
...@@ -110,7 +110,7 @@ void on_stream_close_callback ...@@ -110,7 +110,7 @@ void on_stream_close_callback
downstream->set_request_state(Downstream::STREAM_CLOSED); downstream->set_request_state(Downstream::STREAM_CLOSED);
if(downstream->get_response_state() == Downstream::MSG_COMPLETE) { if(downstream->get_response_state() == Downstream::MSG_COMPLETE) {
// At this point, downstream response was read // At this point, downstream response was read
if(!downstream->tunnel_established() && if(!downstream->get_upgraded() &&
!downstream->get_response_connection_close()) { !downstream->get_response_connection_close()) {
// Keep-alive // Keep-alive
DownstreamConnection *dconn; DownstreamConnection *dconn;
...@@ -194,6 +194,7 @@ void on_ctrl_recv_callback ...@@ -194,6 +194,7 @@ void on_ctrl_recv_callback
} }
downstream->add_request_header("host", host); downstream->add_request_header("host", host);
downstream->check_upgrade_request();
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
std::stringstream ss; std::stringstream ss;
...@@ -592,7 +593,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr) ...@@ -592,7 +593,7 @@ void spdy_downstream_eventcb(bufferevent *bev, short events, void *ptr)
} else { } else {
DCLOG(INFO, dconn) << "Timeout"; DCLOG(INFO, dconn) << "Timeout";
} }
if(downstream->tunnel_established()) { if(downstream->get_upgraded()) {
DCLOG(INFO, dconn) << "Note: this is tunnel connection"; DCLOG(INFO, dconn) << "Note: this is tunnel connection";
} }
} }
...@@ -680,7 +681,7 @@ ssize_t spdy_data_read_callback(spdylay_session *session, ...@@ -680,7 +681,7 @@ ssize_t spdy_data_read_callback(spdylay_session *session,
int nread = evbuffer_remove(body, buf, length); int nread = evbuffer_remove(body, buf, length);
if(nread == 0 && if(nread == 0 &&
downstream->get_response_state() == Downstream::MSG_COMPLETE) { downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(!downstream->tunnel_established()) { if(!downstream->get_upgraded()) {
*eof = 1; *eof = 1;
} else { } else {
// For tunneling, issue RST_STREAM to finish the stream. // For tunneling, issue RST_STREAM to finish the stream.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment