Commit b38b233a authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

nghttpx: Check failure of evbuffer_* and bufferevent_* functions

parent 873d457d
...@@ -221,7 +221,11 @@ ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl, ...@@ -221,7 +221,11 @@ ClientHandler::ClientHandler(bufferevent *bev, int fd, SSL *ssl,
fd_(fd), fd_(fd),
should_close_after_write_(false) should_close_after_write_(false)
{ {
bufferevent_set_rate_limit(bev_, get_config()->rate_limit_cfg); int rv;
rv = bufferevent_set_rate_limit(bev_, get_config()->rate_limit_cfg);
if(rv == -1) {
CLOG(FATAL, this) << "bufferevent_set_rate_limit() failed";
}
bufferevent_enable(bev_, EV_READ | EV_WRITE); bufferevent_enable(bev_, EV_READ | EV_WRITE);
bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK); bufferevent_setwatermark(bev_, EV_READ, 0, SHRPX_READ_WARTER_MARK);
set_upstream_timeouts(&get_config()->upstream_read_timeout, set_upstream_timeouts(&get_config()->upstream_read_timeout,
......
...@@ -593,7 +593,7 @@ int Downstream::init_response_body_buf() ...@@ -593,7 +593,7 @@ int Downstream::init_response_body_buf()
{ {
if(!response_body_buf_) { if(!response_body_buf_) {
response_body_buf_ = evbuffer_new(); response_body_buf_ = evbuffer_new();
if(response_body_buf_ == 0) { if(response_body_buf_ == nullptr) {
DIE(); DIE();
} }
evbuffer_setcb(response_body_buf_, body_buf_cb, this); evbuffer_setcb(response_body_buf_, body_buf_cb, this);
......
...@@ -175,6 +175,10 @@ ssize_t http2_data_read_callback(nghttp2_session *session, ...@@ -175,6 +175,10 @@ ssize_t http2_data_read_callback(nghttp2_session *session,
int nread = 0; int nread = 0;
for(;;) { for(;;) {
nread = evbuffer_remove(body, buf, length); nread = evbuffer_remove(body, buf, length);
if(nread == -1) {
DCLOG(FATAL, dconn) << "evbuffer_remove() failed";
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
if(nread == 0) { if(nread == 0) {
if(downstream->get_request_state() == Downstream::MSG_COMPLETE) { if(downstream->get_request_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgrade_request() || if(!downstream->get_upgrade_request() ||
......
...@@ -538,7 +538,10 @@ int Http2Session::on_read_proxy() ...@@ -538,7 +538,10 @@ int Http2Session::on_read_proxy()
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
evbuffer_get_length(input)); evbuffer_get_length(input));
evbuffer_drain(input, nread); if(evbuffer_drain(input, nread) != 0) {
SSLOG(FATAL, this) << "evbuffer_drain() failed";
return -1;
}
auto htperr = HTTP_PARSER_ERRNO(proxy_htp_.get()); auto htperr = HTTP_PARSER_ERRNO(proxy_htp_.get());
if(htperr == HPE_OK) { if(htperr == HPE_OK) {
return 0; return 0;
...@@ -1182,8 +1185,12 @@ int Http2Session::on_connect() ...@@ -1182,8 +1185,12 @@ int Http2Session::on_connect()
} }
} }
bufferevent_write(bev_, NGHTTP2_CLIENT_CONNECTION_HEADER, rv = bufferevent_write(bev_, NGHTTP2_CLIENT_CONNECTION_HEADER,
NGHTTP2_CLIENT_CONNECTION_HEADER_LEN); NGHTTP2_CLIENT_CONNECTION_HEADER_LEN);
if(rv != 0) {
SSLOG(FATAL, this) << "bufferevent_write() failed";
return -1;
}
rv = send(); rv = send();
if(rv != 0) { if(rv != 0) {
...@@ -1252,14 +1259,18 @@ int Http2Session::send() ...@@ -1252,14 +1259,18 @@ int Http2Session::send()
void Http2Session::clear_notify() void Http2Session::clear_notify()
{ {
auto input = bufferevent_get_output(rdbev_); auto input = bufferevent_get_output(rdbev_);
evbuffer_drain(input, evbuffer_get_length(input)); if(evbuffer_drain(input, evbuffer_get_length(input)) != 0) {
SSLOG(FATAL, this) << "evbuffer_drain() failed";
}
notified_ = false; notified_ = false;
} }
void Http2Session::notify() void Http2Session::notify()
{ {
if(!notified_) { if(!notified_) {
bufferevent_write(wrbev_, "1", 1); if(bufferevent_write(wrbev_, "1", 1) != 0) {
SSLOG(FATAL, this) << "bufferevent_write failed";
}
notified_ = true; notified_ = true;
} }
} }
......
...@@ -832,17 +832,20 @@ ssize_t downstream_data_read_callback(nghttp2_session *session, ...@@ -832,17 +832,20 @@ ssize_t downstream_data_read_callback(nghttp2_session *session,
void *user_data) void *user_data)
{ {
auto downstream = reinterpret_cast<Downstream*>(source->ptr); auto downstream = reinterpret_cast<Downstream*>(source->ptr);
auto upstream = reinterpret_cast<Http2Upstream*>(downstream->get_upstream());
auto body = downstream->get_response_body_buf(); auto body = downstream->get_response_body_buf();
assert(body); assert(body);
int nread = evbuffer_remove(body, buf, length); int nread = evbuffer_remove(body, buf, length);
if(nread == -1) {
ULOG(FATAL, upstream) << "evbuffer_remove() failed";
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
if(nread == 0 && if(nread == 0 &&
downstream->get_response_state() == Downstream::MSG_COMPLETE) { downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgraded()) { if(!downstream->get_upgraded()) {
*eof = 1; *eof = 1;
} else { } else {
// For tunneling, issue RST_STREAM to finish the stream. // For tunneling, issue RST_STREAM to finish the stream.
auto upstream = reinterpret_cast<Http2Upstream*>
(downstream->get_upstream());
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
ULOG(INFO, upstream) << "RST_STREAM to tunneled stream stream_id=" ULOG(INFO, upstream) << "RST_STREAM to tunneled stream stream_id="
<< stream_id; << stream_id;
......
...@@ -484,14 +484,23 @@ int HttpDownstreamConnection::on_read() ...@@ -484,14 +484,23 @@ int HttpDownstreamConnection::on_read()
int rv; int rv;
rv = downstream_->get_upstream()->on_downstream_body rv = downstream_->get_upstream()->on_downstream_body
(downstream_, reinterpret_cast<const uint8_t*>(mem), inputlen); (downstream_, reinterpret_cast<const uint8_t*>(mem), inputlen);
evbuffer_drain(input, inputlen); if(rv != 0) {
return rv; return rv;
}
if(evbuffer_drain(input, inputlen) != 0) {
DCLOG(FATAL, this) << "evbuffer_drain() failed";
return -1;
}
return 0;
} }
size_t nread = http_parser_execute(&response_htp_, &htp_hooks, size_t nread = http_parser_execute(&response_htp_, &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
inputlen); inputlen);
evbuffer_drain(input, nread); if(evbuffer_drain(input, nread) != 0) {
DCLOG(FATAL, this) << "evbuffer_drain() failed";
return -1;
}
auto htperr = HTTP_PARSER_ERRNO(&response_htp_); auto htperr = HTTP_PARSER_ERRNO(&response_htp_);
if(htperr == HPE_OK) { if(htperr == HPE_OK) {
return 0; return 0;
......
...@@ -262,10 +262,13 @@ int HttpsUpstream::on_read() ...@@ -262,10 +262,13 @@ int HttpsUpstream::on_read()
if(downstream && downstream->get_upgraded()) { if(downstream && downstream->get_upgraded()) {
int rv = downstream->push_upload_data_chunk int rv = downstream->push_upload_data_chunk
(reinterpret_cast<const uint8_t*>(mem), inputlen); (reinterpret_cast<const uint8_t*>(mem), inputlen);
evbuffer_drain(input, inputlen);
if(rv != 0) { if(rv != 0) {
return -1; return -1;
} }
if(evbuffer_drain(input, inputlen) != 0) {
ULOG(FATAL, this) << "evbuffer_drain() failed";
return -1;
}
if(downstream->get_output_buffer_full()) { if(downstream->get_output_buffer_full()) {
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
ULOG(INFO, this) << "Downstream output buffer is full"; ULOG(INFO, this) << "Downstream output buffer is full";
...@@ -278,7 +281,10 @@ int HttpsUpstream::on_read() ...@@ -278,7 +281,10 @@ int HttpsUpstream::on_read()
size_t nread = http_parser_execute(&htp_, &htp_hooks, size_t nread = http_parser_execute(&htp_, &htp_hooks,
reinterpret_cast<const char*>(mem), reinterpret_cast<const char*>(mem),
inputlen); inputlen);
evbuffer_drain(input, nread); if(evbuffer_drain(input, nread) != 0) {
ULOG(FATAL, this) << "evbuffer_drain() failed";
return -1;
}
// Well, actually header length + some body bytes // Well, actually header length + some body bytes
current_header_length_ += nread; current_header_length_ += nread;
// Get downstream again because it may be initialized in http parser // Get downstream again because it may be initialized in http parser
......
...@@ -730,17 +730,20 @@ ssize_t spdy_data_read_callback(spdylay_session *session, ...@@ -730,17 +730,20 @@ ssize_t spdy_data_read_callback(spdylay_session *session,
void *user_data) void *user_data)
{ {
Downstream *downstream = reinterpret_cast<Downstream*>(source->ptr); Downstream *downstream = reinterpret_cast<Downstream*>(source->ptr);
auto upstream = reinterpret_cast<SpdyUpstream*>(downstream->get_upstream());
evbuffer *body = downstream->get_response_body_buf(); evbuffer *body = downstream->get_response_body_buf();
assert(body); assert(body);
int nread = evbuffer_remove(body, buf, length); int nread = evbuffer_remove(body, buf, length);
if(nread == -1) {
ULOG(FATAL, upstream) << "evbuffer_remove() failed";
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
if(nread == 0 && if(nread == 0 &&
downstream->get_response_state() == Downstream::MSG_COMPLETE) { downstream->get_response_state() == Downstream::MSG_COMPLETE) {
if(!downstream->get_upgraded()) { if(!downstream->get_upgraded()) {
*eof = 1; *eof = 1;
} else { } else {
// For tunneling, issue RST_STREAM to finish the stream. // For tunneling, issue RST_STREAM to finish the stream.
SpdyUpstream *upstream;
upstream = reinterpret_cast<SpdyUpstream*>(downstream->get_upstream());
if(LOG_ENABLED(INFO)) { if(LOG_ENABLED(INFO)) {
ULOG(INFO, upstream) << "RST_STREAM to tunneled stream stream_id=" ULOG(INFO, upstream) << "RST_STREAM to tunneled stream stream_id="
<< stream_id; << stream_id;
......
...@@ -48,6 +48,10 @@ void ThreadEventReceiver::on_read(bufferevent *bev) ...@@ -48,6 +48,10 @@ void ThreadEventReceiver::on_read(bufferevent *bev)
while(evbuffer_get_length(input) >= sizeof(WorkerEvent)) { while(evbuffer_get_length(input) >= sizeof(WorkerEvent)) {
WorkerEvent wev; WorkerEvent wev;
int nread = evbuffer_remove(input, &wev, sizeof(wev)); int nread = evbuffer_remove(input, &wev, sizeof(wev));
if(nread == -1) {
TLOG(FATAL, this) << "evbuffer_remove() failed";
continue;
}
if(nread != sizeof(wev)) { if(nread != sizeof(wev)) {
TLOG(FATAL, this) << "evbuffer_remove() removed fewer bytes. Expected:" TLOG(FATAL, this) << "evbuffer_remove() removed fewer bytes. Expected:"
<< sizeof(wev) << " Actual:" << nread; << sizeof(wev) << " Actual:" << nread;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment