Commit f620655d authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttp, nghttpx: Add HTTP Upgrade from HTTP/1.1 to HTTP/2.0

nghttpx does not perform upgrade if the request has request body.
parent ea5a1b60
......@@ -65,6 +65,7 @@
#include "app_helper.h"
#include "HtmlParser.h"
#include "util.h"
#include "base64.h"
#ifndef O_BINARY
# define O_BINARY (0)
......@@ -81,6 +82,7 @@ struct Config {
bool no_tls;
bool no_connection_flow_control;
bool no_stream_flow_control;
bool upgrade;
int multiply;
// milliseconds
int timeout;
......@@ -99,6 +101,7 @@ struct Config {
no_tls(false),
no_connection_flow_control(false),
no_stream_flow_control(false),
upgrade(false),
multiply(1),
timeout(-1),
window_bits(-1),
......@@ -319,10 +322,39 @@ struct SessionStat {
Config config;
namespace {
size_t populate_settings(nghttp2_settings_entry *iv)
{
size_t niv = 2;
iv[0].settings_id = NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS;
iv[0].value = 100;
iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
if(config.window_bits != -1) {
iv[1].value = 1 << config.window_bits;
} else {
iv[1].value = NGHTTP2_INITIAL_WINDOW_SIZE;
}
if(config.no_connection_flow_control && config.no_stream_flow_control) {
iv[niv].settings_id = NGHTTP2_SETTINGS_FLOW_CONTROL_OPTIONS;
iv[niv].value = 1;
++niv;
}
return niv;
}
} // namespace
namespace {
void eventcb(bufferevent *bev, short events, void *ptr);
} // namespace
namespace {
extern http_parser_settings htp_hooks;
} // namespace
namespace {
void upgrade_readcb(bufferevent *bev, void *ptr);
} // namespace
namespace {
void readcb(bufferevent *bev, void *ptr);
} // namespace
......@@ -339,6 +371,11 @@ void submit_request(HttpClient *client,
Request *req);
} // namespace
namespace {
void check_stream_id(nghttp2_session *session, int32_t stream_id,
void *user_data);
} // namespace
enum client_state {
STATE_IDLE,
STATE_CONNECTED
......@@ -363,6 +400,18 @@ struct HttpClient {
size_t complete;
std::string hostport;
SessionStat stat;
// Used for parse the HTTP upgrade response from server
http_parser *htp;
// true if the response message of HTTP Upgrade request is fully
// received. It is not relevant the upgrade succeeds, or not.
bool upgrade_response_complete;
// The HTTP status code of the response message of HTTP Upgrade.
unsigned int upgrade_response_status_code;
// SETTINGS payload sent as token68 in HTTP Upgrade
uint8_t settings_payload[16];
// The length of settings_payload
size_t settings_payloadlen;
HttpClient(const nghttp2_session_callbacks* callbacks,
event_base *evbase, SSL_CTX *ssl_ctx)
: session(nullptr),
......@@ -373,12 +422,16 @@ struct HttpClient {
ssl(nullptr),
bev(nullptr),
state(STATE_IDLE),
complete(0)
complete(0),
htp(nullptr),
upgrade_response_complete(false),
upgrade_response_status_code(0)
{}
~HttpClient()
{
disconnect();
delete htp;
}
int initiate_connection(const std::string& host, uint16_t port)
......@@ -406,8 +459,6 @@ struct HttpClient {
std::cerr << ERR_error_string(ERR_get_error(), 0) << std::endl;
return -1;
}
// If state_ == PROXY_CONNECTED, we has connected to the proxy
// using fd_ and tunnel has been established.
bev = bufferevent_openssl_socket_new(evbase, -1, ssl,
BUFFEREVENT_SSL_CONNECTING,
BEV_OPT_DEFER_CALLBACKS);
......@@ -422,7 +473,14 @@ struct HttpClient {
return -1;
}
bufferevent_enable(bev, EV_READ);
if(config.upgrade) {
htp = new http_parser();
http_parser_init(htp, HTTP_RESPONSE);
htp->data = this;
bufferevent_setcb(bev, upgrade_readcb, nullptr, eventcb, this);
} else {
bufferevent_setcb(bev, readcb, writecb, eventcb, this);
}
if(config.timeout != -1) {
timeval tv = { config.timeout, 0 };
bufferevent_set_timeouts(bev, &tv, &tv);
......@@ -453,34 +511,136 @@ struct HttpClient {
}
}
int on_upgrade_connect()
{
record_handshake_time();
assert(!reqvec.empty());
nghttp2_settings_entry iv[16];
size_t niv = populate_settings(iv);
assert(sizeof(settings_payload) >= 8*niv);
settings_payloadlen =
nghttp2_pack_settings_payload(settings_payload, iv, niv);
auto token68 = base64::encode(&settings_payload[0],
&settings_payload[settings_payloadlen]);
util::to_token68(token68);
std::string req;
if(reqvec[0]->data_prd) {
// If the request contains upload data, use OPTIONS * to upgrade
req = "OPTIONS *";
} else {
req = "GET ";
req += reqvec[0]->make_reqpath();
}
req += " HTTP/1.1\r\n"
"Host: ";
req += hostport;
req += "\r\n";
req += "Connection: Upgrade, HTTP2-Settings\r\n"
"Upgrade: " NGHTTP2_PROTO_VERSION_ID "\r\n"
"HTTP2-Settings: ";
req += token68;
req += "\r\n";
req += "Accept: */*\r\n"
"User-Agent: nghttp2/" NGHTTP2_VERSION "\r\n"
"\r\n";
bufferevent_write(bev, req.c_str(), req.size());
if(config.verbose) {
print_timer();
std::cout << " HTTP Upgrade request\n"
<< req << std::endl;
}
return 0;
}
int on_upgrade_read()
{
int rv;
auto input = bufferevent_get_input(bev);
auto inputlen = evbuffer_get_length(input);
if(inputlen == 0) {
return 0;
}
auto mem = evbuffer_pullup(input, -1);
auto nread = http_parser_execute(htp, &htp_hooks,
reinterpret_cast<const char*>(mem),
inputlen);
if(config.verbose) {
std::cout.write(reinterpret_cast<const char*>(mem), nread);
}
evbuffer_drain(input, nread);
auto htperr = HTTP_PARSER_ERRNO(htp);
if(htperr == HPE_OK) {
if(upgrade_response_complete) {
if(config.verbose) {
std::cout << std::endl;
}
if(upgrade_response_status_code == 101) {
if(config.verbose) {
print_timer();
std::cout << " HTTP Upgrade success" << std::endl;
}
bufferevent_setcb(bev, readcb, writecb, eventcb, this);
rv = on_connect();
if(rv != 0) {
return rv;
}
} else {
std::cerr << "HTTP Upgrade failed" << std::endl;
return -1;
}
}
} else {
std::cerr << "Failed to parse HTTP Upgrade response header: "
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr) << std::endl;
return -1;
}
return 0;
}
int on_connect()
{
int rv;
if(!config.upgrade) {
record_handshake_time();
}
rv = nghttp2_session_client_new(&session, callbacks, this);
if(rv != 0) {
return -1;
}
if(config.upgrade) {
// Adjust stream user-data depending on the existence of upload
// data
Request *stream_user_data = nullptr;
if(!reqvec[0]->data_prd) {
stream_user_data = reqvec[0].get();
}
rv = nghttp2_session_upgrade(session, settings_payload,
settings_payloadlen, stream_user_data);
if(rv != 0) {
std::cerr << "nghttp2_session_upgrade() returned error: "
<< nghttp2_strerror(rv) << std::endl;
return -1;
}
if(stream_user_data) {
check_stream_id(session, 1, this);
}
}
// Send connection header here
bufferevent_write(bev, NGHTTP2_CLIENT_CONNECTION_HEADER,
NGHTTP2_CLIENT_CONNECTION_HEADER_LEN);
nghttp2_settings_entry iv[2];
size_t niv = 0;
if(config.window_bits != -1) {
iv[niv].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
iv[niv].value = 1 << config.window_bits;
++niv;
}
if(config.no_connection_flow_control && config.no_stream_flow_control) {
iv[niv].settings_id = NGHTTP2_SETTINGS_FLOW_CONTROL_OPTIONS;
iv[niv].value = 1;
++niv;
}
// If upgrade succeeds, the SETTINGS value sent with
// HTTP2-Settings header field has already been submitted to
// session object.
if(!config.upgrade) {
nghttp2_settings_entry iv[16];
auto niv = populate_settings(iv);
rv = nghttp2_submit_settings(session, iv, niv);
if(rv != 0) {
return -1;
}
}
if(config.no_connection_flow_control && !config.no_stream_flow_control) {
rv = nghttp2_submit_window_update(session, NGHTTP2_FLAG_END_FLOW_CONTROL,
0, 0);
......@@ -488,8 +648,11 @@ struct HttpClient {
return -1;
}
}
for(auto& req : reqvec) {
submit_request(this, config.headers, req.get());
// Adjust first request depending on the existence of the upload
// data
for(auto i = std::begin(reqvec)+(config.upgrade && !reqvec[0]->data_prd);
i != std::end(reqvec); ++i) {
submit_request(this, config.headers, (*i).get());
}
return on_write();
}
......@@ -612,6 +775,48 @@ struct HttpClient {
}
};
namespace {
int htp_msg_begincb(http_parser *htp)
{
if(config.verbose) {
print_timer();
std::cout << " HTTP Upgrade response" << std::endl;
}
return 0;
}
} // namespace
namespace {
int htp_status_completecb(http_parser *htp)
{
auto client = reinterpret_cast<HttpClient*>(htp->data);
client->upgrade_response_status_code = htp->status_code;
return 0;
}
} // namespace
namespace {
int htp_msg_completecb(http_parser *htp)
{
auto client = reinterpret_cast<HttpClient*>(htp->data);
client->upgrade_response_complete = true;
return 0;
}
} // namespace
namespace {
http_parser_settings htp_hooks = {
htp_msg_begincb, /*http_cb on_message_begin;*/
nullptr, /*http_data_cb on_url;*/
htp_status_completecb, /*http_cb on_status_complete */
nullptr, /*http_data_cb on_header_field;*/
nullptr, /*http_data_cb on_header_value;*/
nullptr, /*http_cb on_headers_complete;*/
nullptr, /*http_data_cb on_body;*/
htp_msg_completecb /*http_cb on_message_complete;*/
};
} // namespace
namespace {
void submit_request(HttpClient *client,
const std::map<std::string, std::string>& headers,
......@@ -756,28 +961,30 @@ void on_data_chunk_recv_callback
}
}
void check_stream_id(nghttp2_session *session, nghttp2_frame *frame,
namespace {
void check_stream_id(nghttp2_session *session, int32_t stream_id,
void *user_data)
{
HttpClient *client = get_session(user_data);
int32_t stream_id = frame->hd.stream_id;
Request *req = (Request*)nghttp2_session_get_stream_user_data(session,
stream_id);
client->streams[stream_id] = req;
req->record_syn_stream_time();
if(!config.no_connection_flow_control && config.no_stream_flow_control) {
nghttp2_submit_window_update(session,
NGHTTP2_FLAG_END_FLOW_CONTROL,
stream_id, 0);
}
}
} // namespace
void on_frame_send_callback2
(nghttp2_session *session, nghttp2_frame *frame, void *user_data)
{
if(frame->hd.type == NGHTTP2_HEADERS &&
frame->headers.cat == NGHTTP2_HCAT_REQUEST) {
check_stream_id(session, frame, user_data);
if(!config.no_connection_flow_control && config.no_stream_flow_control) {
nghttp2_submit_window_update(session,
NGHTTP2_FLAG_END_FLOW_CONTROL,
frame->hd.stream_id, 0);
}
check_stream_id(session, frame->hd.stream_id, user_data);
}
if(config.verbose) {
on_frame_send_callback(session, frame, user_data);
......@@ -826,9 +1033,12 @@ void on_frame_recv_callback2
frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
auto req = (Request*)nghttp2_session_get_stream_user_data
(session, frame->hd.stream_id);
assert(req);
// If this is the HTTP Upgrade with OPTIONS method to avoid POST,
// req is nullptr.
if(req) {
req->record_syn_reply_time();
}
}
check_response_header(session, frame, user_data);
if(config.verbose) {
on_frame_recv_callback(session, frame, user_data);
......@@ -916,6 +1126,18 @@ int client_select_next_proto_cb(SSL* ssl,
}
} // namespace
namespace {
void upgrade_readcb(bufferevent *bev, void *ptr)
{
int rv;
auto client = reinterpret_cast<HttpClient*>(ptr);
rv = client->on_upgrade_read();
if(rv != 0) {
client->disconnect();
}
}
} // namespace
namespace {
void readcb(bufferevent *bev, void *ptr)
{
......@@ -946,12 +1168,10 @@ void writecb(bufferevent *bev, void *ptr)
namespace {
void eventcb(bufferevent *bev, short events, void *ptr)
{
int rv;
HttpClient *client = reinterpret_cast<HttpClient*>(ptr);
if(events & BEV_EVENT_CONNECTED) {
client->state = STATE_CONNECTED;
// TODO Check NPN result and fail fast?
client->on_connect();
/* Send connection header here */
int fd = bufferevent_getfd(bev);
int val = 1;
if(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
......@@ -959,6 +1179,16 @@ void eventcb(bufferevent *bev, short events, void *ptr)
std::cerr << "Setting option TCP_NODELAY failed: errno="
<< errno << std::endl;
}
if(config.upgrade) {
rv = client->on_upgrade_connect();
} else {
// TODO Check NPN result and fail fast?
rv = client->on_connect();
}
if(rv != 0) {
client->disconnect();
return;
}
} else if(events & BEV_EVENT_EOF) {
std::cerr << "EOF" << std::endl;
client->disconnect();
......@@ -1174,7 +1404,7 @@ int run(char **uris, int n)
void print_usage(std::ostream& out)
{
out << "Usage: nghttp [-FOafnsv] [-t <SECONDS>] [-w <WINDOW_BITS>] [--cert=<CERT>]\n"
out << "Usage: nghttp [-FOafnsuv] [-t <SECONDS>] [-w <WINDOW_BITS>] [--cert=<CERT>]\n"
<< " [--key=<KEY>] [--no-tls] [-d <FILE>] [-m <N>] <URI>..."
<< std::endl;
}
......@@ -1215,6 +1445,10 @@ void print_help(std::ostream& out)
<< " Disables connection level flow control.\n"
<< " -f, --no-stream-flow-control\n"
<< " Disables stream level flow control.\n"
<< " -u, --upgrade Perform HTTP Upgrade for HTTP/2.0. This\n"
<< " option is ignored if --no-tls is not given.\n"
<< " If -d is used, the HTTP upgrade request is\n"
<< " performed with OPTIONS method.\n"
<< std::endl;
}
......@@ -1239,10 +1473,11 @@ int main(int argc, char **argv)
{"multiply", required_argument, 0, 'm' },
{"no-connection-flow-control", no_argument, 0, 'F'},
{"no-stream-flow-control", no_argument, 0, 'f'},
{"upgrade", no_argument, 0, 'u'},
{0, 0, 0, 0 }
};
int option_index = 0;
int c = getopt_long(argc, argv, "FOad:fm:nhH:vst:w:", long_options,
int c = getopt_long(argc, argv, "FOad:fm:nhH:vst:uw:", long_options,
&option_index);
if(c == -1) {
break;
......@@ -1269,6 +1504,9 @@ int main(int argc, char **argv)
case 't':
config.timeout = atoi(optarg) * 1000;
break;
case 'u':
config.upgrade = true;
break;
case 'w': {
errno = 0;
unsigned long int n = strtoul(optarg, 0, 10);
......
......@@ -366,8 +366,7 @@ void fill_default_config()
mod_config()->downstream_addrlen = 0;
mod_config()->num_worker = 1;
mod_config()->spdy_max_concurrent_streams =
NGHTTP2_INITIAL_MAX_CONCURRENT_STREAMS;
mod_config()->spdy_max_concurrent_streams = 100;
mod_config()->add_x_forwarded_for = false;
mod_config()->no_via = false;
mod_config()->accesslog = false;
......
......@@ -122,13 +122,14 @@ void upstream_eventcb(bufferevent *bev, short events, void *arg)
} // namespace
namespace {
void upstream_connhd_readcb(bufferevent *bev, void *arg)
void upstream_http2_connhd_readcb(bufferevent *bev, void *arg)
{
// This callback assumes upstream is Http2Upstream.
uint8_t data[NGHTTP2_CLIENT_CONNECTION_HEADER_LEN];
auto handler = reinterpret_cast<ClientHandler*>(arg);
size_t leftlen = handler->get_left_connhd_len();
auto leftlen = handler->get_left_connhd_len();
auto input = bufferevent_get_input(bev);
int readlen = evbuffer_remove(input, data, leftlen);
auto readlen = evbuffer_remove(input, data, leftlen);
if(readlen == -1) {
delete handler;
return;
......@@ -136,6 +137,10 @@ void upstream_connhd_readcb(bufferevent *bev, void *arg)
if(memcmp(NGHTTP2_CLIENT_CONNECTION_HEADER +
NGHTTP2_CLIENT_CONNECTION_HEADER_LEN - leftlen,
data, readlen) != 0) {
// There is no downgrade path here. Just drop the connection.
if(LOG_ENABLED(INFO)) {
CLOG(INFO, handler) << "invalid client connection header";
}
delete handler;
return;
}
......@@ -153,6 +158,56 @@ void upstream_connhd_readcb(bufferevent *bev, void *arg)
}
} // namespace
namespace {
void upstream_http1_connhd_readcb(bufferevent *bev, void *arg)
{
// This callback assumes upstream is HttpsUpstream.
uint8_t data[NGHTTP2_CLIENT_CONNECTION_HEADER_LEN];
auto handler = reinterpret_cast<ClientHandler*>(arg);
auto leftlen = handler->get_left_connhd_len();
auto input = bufferevent_get_input(bev);
auto readlen = evbuffer_copyout(input, data, leftlen);
if(readlen == -1) {
delete handler;
return;
}
if(memcmp(NGHTTP2_CLIENT_CONNECTION_HEADER +
NGHTTP2_CLIENT_CONNECTION_HEADER_LEN - leftlen,
data, readlen) != 0) {
if(LOG_ENABLED(INFO)) {
CLOG(INFO, handler) << "This is HTTP/1.1 connection, "
<< "but may be upgraded to HTTP/2.0 later.";
}
// Reset header length for later HTTP/2.0 upgrade
handler->set_left_connhd_len(NGHTTP2_CLIENT_CONNECTION_HEADER_LEN);
if(handler->on_read() != 0) {
delete handler;
return;
}
return;
}
if(evbuffer_drain(input, readlen) == -1) {
delete handler;
return;
}
leftlen -= readlen;
handler->set_left_connhd_len(leftlen);
if(leftlen == 0) {
if(LOG_ENABLED(INFO)) {
CLOG(INFO, handler) << "direct HTTP/2.0 connection";
}
handler->direct_http2_upgrade();
handler->set_bev_cb(upstream_readcb, upstream_writecb, upstream_eventcb);
// Run on_read to process data left in buffer since they are not
// notified further
if(handler->on_read() != 0) {
delete handler;
return;
}
}
}
} // namespace
ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
const char *ipaddr)
: bev_(bev),
......@@ -171,15 +226,11 @@ ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
if(ssl_) {
set_bev_cb(nullptr, upstream_writecb, upstream_eventcb);
} else {
if(get_config()->client_mode) {
// Client mode
// For non-TLS version, first create HttpsUpstream. It may be
// upgraded to HTTP/2.0 through HTTP Upgrade or direct HTTP/2.0
// connection.
upstream_ = new HttpsUpstream(this);
set_bev_cb(upstream_readcb, upstream_writecb, upstream_eventcb);
} else {
// no-TLS SPDY
upstream_ = new Http2Upstream(this);
set_bev_cb(upstream_connhd_readcb, upstream_writecb, upstream_eventcb);
}
set_bev_cb(upstream_http1_connhd_readcb, nullptr, upstream_eventcb);
}
}
......@@ -249,7 +300,8 @@ int ClientHandler::validate_next_proto()
CLOG(INFO, this) << "The negotiated next protocol: " << proto;
}
if(proto == NGHTTP2_PROTO_VERSION_ID) {
set_bev_cb(upstream_connhd_readcb, upstream_writecb, upstream_eventcb);
set_bev_cb(upstream_http2_connhd_readcb, upstream_writecb,
upstream_eventcb);
upstream_ = new Http2Upstream(this);
return 0;
} else {
......@@ -369,4 +421,38 @@ void ClientHandler::set_left_connhd_len(size_t left)
left_connhd_len_ = left;
}
void ClientHandler::direct_http2_upgrade()
{
delete upstream_;
upstream_= new Http2Upstream(this);
set_bev_cb(upstream_readcb, upstream_writecb, upstream_eventcb);
}
int ClientHandler::perform_http2_upgrade(HttpsUpstream *http)
{
int rv;
auto upstream = new Http2Upstream(this);
if(upstream->upgrade_upstream(http) != 0) {
delete upstream;
return -1;
}
upstream_ = upstream;
set_bev_cb(upstream_http2_connhd_readcb, upstream_writecb, upstream_eventcb);
static char res[] = "HTTP/1.1 101 Switching Protocols\r\n"
"Connection: Upgrade\r\n"
"Upgrade: HTTP/2.0\r\n"
"\r\n";
rv = bufferevent_write(bev_, res, sizeof(res) - 1);
if(rv != 0) {
CLOG(FATAL, this) << "bufferevent_write() faild";
return -1;
}
return 0;
}
bool ClientHandler::get_http2_upgrade_allowed() const
{
return !ssl_;
}
} // namespace shrpx
......@@ -37,6 +37,7 @@ namespace shrpx {
class Upstream;
class DownstreamConnection;
class SpdySession;
class HttpsUpstream;
class ClientHandler {
public:
......@@ -65,6 +66,14 @@ public:
SpdySession* get_spdy_session() const;
size_t get_left_connhd_len() const;
void set_left_connhd_len(size_t left);
// Call this function when HTTP/2.0 connection header is received at
// the start of the connection.
void direct_http2_upgrade();
// Performs HTTP/2.0 Upgrade from the connection managed by
// |http|. If this function fails, the connection must be
// terminated. This function returns 0 if it succeeds, or -1.
int perform_http2_upgrade(HttpsUpstream *http);
bool get_http2_upgrade_allowed() const;
private:
bufferevent *bev_;
int fd_;
......
......@@ -52,6 +52,7 @@ Downstream::Downstream(Upstream *upstream, int stream_id, int priority)
request_connection_close_(false),
request_expect_100_continue_(false),
request_header_key_prev_(false),
request_bodylen_(0),
response_state_(INITIAL),
response_http_status_(0),
response_major_(1),
......@@ -242,11 +243,24 @@ int Downstream::get_request_minor() const
return request_minor_;
}
void Downstream::reset_upstream(Upstream* upstream)
{
upstream_ = upstream;
if(dconn_) {
dconn_->on_upstream_change(upstream);
}
}
Upstream* Downstream::get_upstream() const
{
return upstream_;
}
void Downstream::set_stream_id(int32_t stream_id)
{
stream_id_ = stream_id;
}
int32_t Downstream::get_stream_id() const
{
return stream_id_;
......@@ -306,6 +320,7 @@ int Downstream::push_upload_data_chunk(const uint8_t *data, size_t datalen)
DLOG(WARNING, this) << "dconn_ is NULL";
return 0;
}
request_bodylen_ += datalen;
return dconn_->push_upload_data_chunk(data, datalen);
}
......@@ -482,6 +497,7 @@ void Downstream::check_upgrade_fulfilled()
upgraded_ = 200 <= response_http_status_ && response_http_status_ < 300;
} else {
// TODO Do more strict checking for upgrade headers
if(response_http_status_ == 101) {
for(auto& hd : request_headers_) {
if(util::strieq("upgrade", hd.first.c_str())) {
upgraded_ = true;
......@@ -489,6 +505,7 @@ void Downstream::check_upgrade_fulfilled()
}
}
}
}
}
bool Downstream::get_upgraded() const
......@@ -516,6 +533,27 @@ bool Downstream::get_upgrade_request() const
return upgrade_request_;
}
bool Downstream::http2_upgrade_request() const
{
if(request_bodylen_ != 0) {
return false;
}
bool upgrade_seen = false;
bool http2_settings_seen = false;
for(auto& hd : request_headers_) {
// For now just check NGHTTP2_PROTO_VERSION_ID in Upgrade header
// field and existence of HTTP2-Settings header field.
if(util::strieq(hd.first.c_str(), "upgrade")) {
if(util::strieq(hd.second.c_str(), NGHTTP2_PROTO_VERSION_ID)) {
upgrade_seen = true;
}
} else if(util::strieq(hd.first.c_str(), "http2-settings")) {
http2_settings_seen = true;
}
}
return upgrade_seen && http2_settings_seen;
}
void Downstream::set_downstream_stream_id(int32_t stream_id)
{
downstream_stream_id_ = stream_id;
......
......@@ -50,7 +50,9 @@ class Downstream {
public:
Downstream(Upstream *upstream, int stream_id, int priority);
~Downstream();
void reset_upstream(Upstream *upstream);
Upstream* get_upstream() const;
void set_stream_id(int32_t stream_id);
int32_t get_stream_id() const;
void set_priority(int pri);
void pause_read(IOCtrlReason reason);
......@@ -78,6 +80,8 @@ public:
// Returns true if the upgrade is succeded as a result of the call
// check_upgrade_fulfilled().
bool get_upgraded() const;
// Returns true if the request is HTTP Upgrade for HTTP/2.0
bool http2_upgrade_request() const;
// downstream request API
const Headers& get_request_headers() const;
void add_request_header(const std::string& name, const std::string& value);
......@@ -169,6 +173,8 @@ private:
bool request_expect_100_continue_;
Headers request_headers_;
bool request_header_key_prev_;
// the length of request body
int64_t request_bodylen_;
int response_state_;
unsigned int response_http_status_;
......
......@@ -32,6 +32,7 @@
namespace shrpx {
class ClientHandler;
class Upstream;
class Downstream;
class DownstreamConnection {
......@@ -54,6 +55,8 @@ public:
virtual int on_read() = 0;
virtual int on_write() = 0;
virtual void on_upstream_change(Upstream *uptream) = 0;
ClientHandler* get_client_handler();
Downstream* get_downstream();
protected:
......
......@@ -30,12 +30,14 @@
#include <sstream>
#include "shrpx_client_handler.h"
#include "shrpx_https_upstream.h"
#include "shrpx_downstream.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_config.h"
#include "shrpx_http.h"
#include "shrpx_accesslog.h"
#include "util.h"
#include "base64.h"
using namespace nghttp2;
......@@ -135,6 +137,40 @@ void on_stream_close_callback
}
} // namespace
int Http2Upstream::upgrade_upstream(HttpsUpstream *http)
{
int rv;
std::string settings_payload;
auto downstream = http->get_downstream();
for(auto& hd : downstream->get_request_headers()) {
if(util::strieq(hd.first.c_str(), "http2-settings")) {
auto val = hd.second;
util::to_base64(val);
settings_payload = base64::decode(std::begin(val), std::end(val));
break;
}
}
rv = nghttp2_session_upgrade
(session_,
reinterpret_cast<const uint8_t*>(settings_payload.c_str()),
settings_payload.size(),
nullptr);
if(rv != 0) {
ULOG(WARNING, this) << "nghttp2_session_upgrade() returned error: "
<< nghttp2_strerror(rv);
return -1;
}
pre_upstream_ = http;
http->pop_downstream();
downstream->reset_upstream(this);
add_downstream(downstream);
downstream->init_response_body_buf();
downstream->set_stream_id(1);
downstream->set_priority(0);
return 0;
}
namespace {
void on_frame_recv_callback
(nghttp2_session *session, nghttp2_frame *frame, void *user_data)
......@@ -335,7 +371,8 @@ nghttp2_error_code infer_upstream_rst_stream_error_code
Http2Upstream::Http2Upstream(ClientHandler *handler)
: handler_(handler),
session_(nullptr)
session_(nullptr),
pre_upstream_(nullptr)
{
//handler->set_bev_cb(spdy_readcb, 0, spdy_eventcb);
handler->set_upstream_timeouts(&get_config()->spdy_upstream_read_timeout,
......@@ -380,13 +417,12 @@ Http2Upstream::Http2Upstream(ClientHandler *handler)
rv = nghttp2_submit_window_update(session_, NGHTTP2_FLAG_END_FLOW_CONTROL,
0, 0);
assert(rv == 0);
send();
}
Http2Upstream::~Http2Upstream()
{
nghttp2_session_del(session_);
delete pre_upstream_;
}
int Http2Upstream::on_read()
......
......@@ -35,6 +35,7 @@
namespace shrpx {
class ClientHandler;
class HttpsUpstream;
class Http2Upstream : public Upstream {
public:
......@@ -68,12 +69,17 @@ public:
bool get_flow_control() const;
int32_t get_initial_window_size() const;
// Perform HTTP/2.0 upgrade from |upstream|. On success, this object
// takes ownership of the |upstream|. This function returns 0 if it
// succeeds, or -1.
int upgrade_upstream(HttpsUpstream *upstream);
private:
ClientHandler *handler_;
nghttp2_session *session_;
bool flow_control_;
int32_t initial_window_size_;
DownstreamQueue downstream_queue_;
HttpsUpstream *pre_upstream_;
};
} // namespace shrpx
......
......@@ -125,27 +125,37 @@ int HttpDownstreamConnection::push_request_headers()
for(Headers::const_iterator i = request_headers.begin();
i != request_headers.end(); ++i) {
if(util::strieq((*i).first.c_str(), "connection")) {
if(util::strifind((*i).second.c_str(), "upgrade")) {
// nghttpx handles HTTP/2.0 upgrade and does not relay it to the
// downstream.
if(util::strifind((*i).second.c_str(), "upgrade") &&
!util::strifind((*i).second.c_str(), "http2-settings")) {
connection_upgrade = (*i).second;
}
continue;
} else if(util::strieq((*i).first.c_str(), "upgrade")) {
// nghttpx handles HTTP/2.0 upgrade and does not relay it to the
// downstream.
if(util::strieq((*i).second.c_str(), NGHTTP2_PROTO_VERSION_ID)) {
continue;
}
} else if(util::strieq((*i).first.c_str(), "x-forwarded-proto") ||
util::strieq((*i).first.c_str(), "keep-alive") ||
util::strieq((*i).first.c_str(), "proxy-connection")) {
util::strieq((*i).first.c_str(), "proxy-connection") ||
util::strieq((*i).first.c_str(), "http2-settings")) {
continue;
}
if(!get_config()->no_via && util::strieq((*i).first.c_str(), "via")) {
} else if(util::strieq((*i).first.c_str(), "via")) {
if(!get_config()->no_via) {
via_value = (*i).second;
continue;
}
if(util::strieq((*i).first.c_str(), "x-forwarded-for")) {
} else if(util::strieq((*i).first.c_str(), "x-forwarded-for")) {
xff_value = (*i).second;
continue;
}
if(util::strieq((*i).first.c_str(), "expect") &&
util::strifind((*i).second.c_str(), "100-continue")) {
} else if(util::strieq((*i).first.c_str(), "expect")) {
if(util::strifind((*i).second.c_str(), "100-continue")) {
continue;
}
}
hdrs += (*i).first;
http::capitalize(hdrs, hdrs.size()-(*i).first.size());
hdrs += ": ";
......@@ -493,4 +503,12 @@ int HttpDownstreamConnection::on_write()
return 0;
}
void HttpDownstreamConnection::on_upstream_change(Upstream *upstream)
{
bufferevent_setcb(bev_,
upstream->get_downstream_readcb(),
upstream->get_downstream_writecb(),
upstream->get_downstream_eventcb(), this);
}
} // namespace shrpx
......@@ -57,6 +57,8 @@ public:
virtual int on_read();
virtual int on_write();
virtual void on_upstream_change(Upstream *upstream);
bufferevent* get_bev();
private:
bufferevent *bev_;
......
......@@ -293,11 +293,11 @@ int HttpsUpstream::on_read()
// Get downstream again because it may be initialized in http parser
// execution
downstream = get_downstream();
auto handler = get_client_handler();
http_errno htperr = HTTP_PARSER_ERRNO(htp_);
if(htperr == HPE_PAUSED) {
if(downstream->get_request_state() == Downstream::CONNECT_FAIL) {
get_client_handler()->set_should_close_after_write(true);
handler->set_should_close_after_write(true);
// Following paues_read is needed to avoid reading next data.
pause_read(SHRPX_MSG_BLOCK);
if(error_reply(503) != 0) {
......@@ -311,6 +311,13 @@ int HttpsUpstream::on_read()
assert(downstream->get_response_state() == Downstream::MSG_COMPLETE);
delete_downstream();
} else {
if(handler->get_http2_upgrade_allowed() &&
downstream->http2_upgrade_request()) {
if(handler->perform_http2_upgrade(this) != 0) {
return -1;
}
return 0;
}
pause_read(SHRPX_MSG_BLOCK);
}
}
......@@ -322,7 +329,7 @@ int HttpsUpstream::on_read()
ULOG(WARNING, this) << "Request Header too long:"
<< current_header_length_
<< " bytes";
get_client_handler()->set_should_close_after_write(true);
handler->set_should_close_after_write(true);
pause_read(SHRPX_MSG_BLOCK);
if(error_reply(400) != 0) {
return -1;
......@@ -340,7 +347,7 @@ int HttpsUpstream::on_read()
<< "(" << http_errno_name(htperr) << ") "
<< http_errno_description(htperr);
}
get_client_handler()->set_should_close_after_write(true);
handler->set_should_close_after_write(true);
pause_read(SHRPX_MSG_BLOCK);
if(error_reply(400) != 0) {
return -1;
......@@ -635,6 +642,13 @@ Downstream* HttpsUpstream::get_downstream() const
return downstream_;
}
Downstream* HttpsUpstream::pop_downstream()
{
auto downstream = downstream_;
downstream_ = nullptr;
return downstream;
}
int HttpsUpstream::on_downstream_header_complete(Downstream *downstream)
{
if(LOG_ENABLED(INFO)) {
......
......@@ -52,6 +52,7 @@ public:
void attach_downstream(Downstream *downstream);
void delete_downstream();
Downstream* get_downstream() const;
Downstream* pop_downstream();
int error_reply(int status_code);
virtual void pause_read(IOCtrlReason reason);
......
......@@ -285,30 +285,41 @@ int SpdyDownstreamConnection::push_request_headers()
chunked_encoding = true;
}
// Ignore transfer-encoding
continue;
} else if(util::strieq((*i).first.c_str(), "upgrade")) {
// nghttpx handles HTTP/2.0 upgrade and does not relay it to the
// downstream.
if(util::strieq((*i).second.c_str(), NGHTTP2_PROTO_VERSION_ID)) {
continue;
}
} else if(util::strieq((*i).first.c_str(), "x-forwarded-proto") ||
util::strieq((*i).first.c_str(), "keep-alive") ||
util::strieq((*i).first.c_str(), "connection") ||
util:: strieq((*i).first.c_str(), "proxy-connection")) {
util::strieq((*i).first.c_str(), "proxy-connection") ||
util::strieq((*i).first.c_str(), "http2-settings")) {
// These are ignored
continue;
} else if(!get_config()->no_via &&
util::strieq((*i).first.c_str(), "via")) {
via_value = (*i).second;
continue;
} else if(util::strieq((*i).first.c_str(), "x-forwarded-for")) {
xff_value = (*i).second;
continue;
} else if(util::strieq((*i).first.c_str(), "expect") &&
util::strifind((*i).second.c_str(), "100-continue")) {
// Ignore
continue;
} else if(util::strieq((*i).first.c_str(), "host")) {
nv[hdidx++] = ":host";
nv[hdidx++] = (*i).second.c_str();
} else {
if(util::strieq((*i).first.c_str(), "content-length")) {
continue;
} else if(util::strieq((*i).first.c_str(), "content-length")) {
content_length = true;
}
nv[hdidx++] = (*i).first.c_str();
nv[hdidx++] = (*i).second.c_str();
}
}
if(get_config()->add_x_forwarded_for) {
nv[hdidx++] = "x-forwarded-for";
......
......@@ -58,6 +58,8 @@ public:
virtual int on_read();
virtual int on_write();
virtual void on_upstream_change(Upstream *upstream) {}
int send();
int init_request_body_buf();
......
......@@ -214,6 +214,40 @@ std::string format_hex(const unsigned char *s, size_t len)
return res;
}
void to_token68(std::string& base64str)
{
for(auto i = std::begin(base64str); i != std::end(base64str); ++i) {
switch(*i) {
case '+':
*i = '-';
break;
case '/':
*i = '_';
break;
case '=':
base64str.erase(i, std::end(base64str));
return;
}
}
return;
}
void to_base64(std::string& token68str)
{
for(auto i = std::begin(token68str); i != std::end(token68str); ++i) {
switch(*i) {
case '-':
*i = '+';
break;
case '_':
*i = '/';
break;
}
}
token68str += std::string(4 - token68str.size() % 4, '=');
return;
}
} // namespace util
} // namespace nghttp2
......@@ -375,6 +375,9 @@ make_unique(size_t size)
return std::unique_ptr<T>(new typename std::remove_extent<T>::type[size]());
}
void to_token68(std::string& base64str);
void to_base64(std::string& token68str);
} // namespace util
} // namespace nghttp2
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment