Commit 27e161dc authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

src: Add EvbufferBuffer class to simplify the code base

parent 3ca4539f
...@@ -368,11 +368,11 @@ int Http2Handler::on_write() ...@@ -368,11 +368,11 @@ int Http2Handler::on_write()
{ {
int rv; int rv;
uint8_t buf[4096]; uint8_t buf[4096];
size_t buflen = 0;
auto output = bufferevent_get_output(bev_); auto output = bufferevent_get_output(bev_);
util::EvbufferBuffer evbbuf(output, buf, sizeof(buf));
for(;;) { for(;;) {
if(evbuffer_get_length(output) + buflen > if(evbuffer_get_length(output) + evbbuf.get_buflen() >
sessions_->get_config()->output_upper_thres) { sessions_->get_config()->output_upper_thres) {
break; break;
} }
...@@ -388,26 +388,13 @@ int Http2Handler::on_write() ...@@ -388,26 +388,13 @@ int Http2Handler::on_write()
if(datalen == 0) { if(datalen == 0) {
break; break;
} }
if(buflen + datalen > sizeof(buf)) { rv = evbbuf.add(data, datalen);
rv = evbuffer_add(output, buf, buflen); if(rv != 0) {
if(rv != 0) { std::cerr << "evbuffer_add() failed" << std::endl;
std::cerr << "evbuffer_add() failed" << std::endl; return -1;
return -1;
}
buflen = 0;
if(datalen > static_cast<ssize_t>(sizeof(buf))) {
rv = evbuffer_add(output, data, datalen);
if(rv != 0) {
std::cerr << "evbuffer_add() failed" << std::endl;
return -1;
}
continue;
}
} }
memcpy(buf + buflen, data, datalen);
buflen += datalen;
} }
rv = evbuffer_add(output, buf, buflen); rv = evbbuf.flush();
if(rv != 0) { if(rv != 0) {
std::cerr << "evbuffer_add() failed" << std::endl; std::cerr << "evbuffer_add() failed" << std::endl;
return -1; return -1;
......
...@@ -25,6 +25,9 @@ ...@@ -25,6 +25,9 @@
#include "h2load_http2_session.h" #include "h2load_http2_session.h"
#include "h2load.h" #include "h2load.h"
#include "util.h"
using namespace nghttp2;
namespace h2load { namespace h2load {
...@@ -163,8 +166,8 @@ int Http2Session::on_write() ...@@ -163,8 +166,8 @@ int Http2Session::on_write()
{ {
int rv; int rv;
uint8_t buf[4096]; uint8_t buf[4096];
size_t buflen = 0;
auto output = bufferevent_get_output(client_->bev); auto output = bufferevent_get_output(client_->bev);
util::EvbufferBuffer evbbuf(output, buf, sizeof(buf));
for(;;) { for(;;) {
const uint8_t *data; const uint8_t *data;
auto datalen = nghttp2_session_mem_send(session_, &data); auto datalen = nghttp2_session_mem_send(session_, &data);
...@@ -175,24 +178,13 @@ int Http2Session::on_write() ...@@ -175,24 +178,13 @@ int Http2Session::on_write()
if(datalen == 0) { if(datalen == 0) {
break; break;
} }
if(buflen + datalen > sizeof(buf)) { rv = evbbuf.add(data, datalen);
rv = evbuffer_add(output, buf, buflen); if(rv != 0) {
if(rv == -1) { return -1;
return -1;
}
buflen = 0;
if(datalen > static_cast<ssize_t>(sizeof(buf))) {
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
return -1;
}
}
} }
memcpy(buf + buflen, data, datalen);
buflen += datalen;
} }
rv = evbuffer_add(output, buf, buflen); rv = evbbuf.flush();
if(rv == -1) { if(rv != 0) {
return -1; return -1;
} }
if(nghttp2_session_want_read(session_) == 0 && if(nghttp2_session_want_read(session_) == 0 &&
......
...@@ -102,27 +102,11 @@ ssize_t send_callback(spdylay_session *session, ...@@ -102,27 +102,11 @@ ssize_t send_callback(spdylay_session *session,
{ {
auto client = static_cast<Client*>(user_data); auto client = static_cast<Client*>(user_data);
auto spdy_session = static_cast<SpdySession*>(client->session.get()); auto spdy_session = static_cast<SpdySession*>(client->session.get());
auto output = bufferevent_get_output(client->bev);
int rv; int rv;
if(spdy_session->sendbuflen + length > spdy_session->sendbufmax) { rv = spdy_session->sendbuf.add(data, length);
rv = evbuffer_add(output, spdy_session->sendbuf, spdy_session->sendbuflen); if(rv != 0) {
if(rv == -1) { return SPDYLAY_ERR_CALLBACK_FAILURE;
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
spdy_session->sendbuflen = 0;
if(length > spdy_session->sendbufmax) {
rv = evbuffer_add(output, data, length);
if(rv == -1) {
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
} else {
memcpy(spdy_session->sendbuf + spdy_session->sendbuflen, data, length);
spdy_session->sendbuflen += length;
}
} else {
memcpy(spdy_session->sendbuf + spdy_session->sendbuflen, data, length);
spdy_session->sendbuflen += length;
} }
return length; return length;
} }
...@@ -173,17 +157,15 @@ int SpdySession::on_write() ...@@ -173,17 +157,15 @@ int SpdySession::on_write()
int rv; int rv;
uint8_t buf[4096]; uint8_t buf[4096];
sendbuf = buf; sendbuf.reset(bufferevent_get_output(client_->bev), buf, sizeof(buf));
sendbuflen = 0;
sendbufmax = sizeof(buf);
rv = spdylay_session_send(session_); rv = spdylay_session_send(session_);
if(rv != 0) { if(rv != 0) {
return -1; return -1;
} }
rv = bufferevent_write(client_->bev, sendbuf, sendbuflen); rv = sendbuf.flush();
if(rv == -1) { if(rv != 0) {
return -1; return -1;
} }
......
...@@ -29,6 +29,8 @@ ...@@ -29,6 +29,8 @@
#include <spdylay/spdylay.h> #include <spdylay/spdylay.h>
#include "util.h"
namespace h2load { namespace h2load {
struct Client; struct Client;
...@@ -43,9 +45,7 @@ public: ...@@ -43,9 +45,7 @@ public:
virtual int on_write(); virtual int on_write();
virtual void terminate(); virtual void terminate();
uint8_t *sendbuf; nghttp2::util::EvbufferBuffer sendbuf;
size_t sendbuflen;
size_t sendbufmax;
private: private:
Client *client_; Client *client_;
spdylay_session *session_; spdylay_session *session_;
......
...@@ -704,10 +704,12 @@ struct HttpClient { ...@@ -704,10 +704,12 @@ struct HttpClient {
int on_write() int on_write()
{ {
int rv; int rv;
uint8_t buf[4096];
auto output = bufferevent_get_output(bev); auto output = bufferevent_get_output(bev);
util::EvbufferBuffer evbbuf(output, buf, sizeof(buf));
for(;;) { for(;;) {
if(evbuffer_get_length(output) > config.output_upper_thres) { if(evbuffer_get_length(output) + evbbuf.get_buflen() >
config.output_upper_thres) {
break; break;
} }
...@@ -722,12 +724,17 @@ struct HttpClient { ...@@ -722,12 +724,17 @@ struct HttpClient {
if(datalen == 0) { if(datalen == 0) {
break; break;
} }
rv = evbuffer_add(output, data, datalen); rv = evbbuf.add(data, datalen);
if(rv == -1) { if(rv != 0) {
std::cerr << "evbuffer_add() failed" << std::endl; std::cerr << "evbuffer_add() failed" << std::endl;
return -1; return -1;
} }
} }
rv = evbbuf.flush();
if(rv != 0) {
std::cerr << "evbuffer_add() failed" << std::endl;
return -1;
}
if(nghttp2_session_want_read(session) == 0 && if(nghttp2_session_want_read(session) == 0 &&
nghttp2_session_want_write(session) == 0 && nghttp2_session_want_write(session) == 0 &&
evbuffer_get_length(output) == 0) { evbuffer_get_length(output) == 0) {
......
...@@ -1261,11 +1261,12 @@ int Http2Session::send() ...@@ -1261,11 +1261,12 @@ int Http2Session::send()
{ {
int rv; int rv;
uint8_t buf[4096]; uint8_t buf[4096];
size_t buflen = 0;
auto output = bufferevent_get_output(bev_); auto output = bufferevent_get_output(bev_);
util::EvbufferBuffer evbbuf(output, buf, sizeof(buf));
for(;;) { for(;;) {
// Check buffer length and return WOULDBLOCK if it is large enough. // Check buffer length and return WOULDBLOCK if it is large enough.
if(evbuffer_get_length(output) + buflen > Http2Session::OUTBUF_MAX_THRES) { if(evbuffer_get_length(output) + evbbuf.get_buflen() >
Http2Session::OUTBUF_MAX_THRES) {
return NGHTTP2_ERR_WOULDBLOCK; return NGHTTP2_ERR_WOULDBLOCK;
} }
...@@ -1280,28 +1281,15 @@ int Http2Session::send() ...@@ -1280,28 +1281,15 @@ int Http2Session::send()
if(datalen == 0) { if(datalen == 0) {
break; break;
} }
if(buflen + datalen > sizeof(buf)) { rv = evbbuf.add(data, datalen);
rv = evbuffer_add(output, buf, buflen); if(rv != 0) {
if(rv == -1) { SSLOG(FATAL, this) << "evbuffer_add() failed";
SSLOG(FATAL, this) << "evbuffer_add() failed"; return -1;
return -1;
}
buflen = 0;
if(datalen > static_cast<ssize_t>(sizeof(buf))) {
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
SSLOG(FATAL, this) << "evbuffer_add() failed";
return -1;
}
continue;
}
} }
memcpy(buf + buflen, data, datalen);
buflen += datalen;
} }
rv = evbuffer_add(output, buf, buflen); rv = evbbuf.flush();
if(rv == -1) { if(rv != 0) {
SSLOG(FATAL, this) << "evbuffer_add() failed"; SSLOG(FATAL, this) << "evbuffer_add() failed";
return -1; return -1;
} }
......
...@@ -567,12 +567,13 @@ int Http2Upstream::send() ...@@ -567,12 +567,13 @@ int Http2Upstream::send()
{ {
int rv; int rv;
uint8_t buf[4096]; uint8_t buf[4096];
size_t buflen = 0;
auto bev = handler_->get_bev(); auto bev = handler_->get_bev();
auto output = bufferevent_get_output(bev); auto output = bufferevent_get_output(bev);
util::EvbufferBuffer evbbuf(output, buf, sizeof(buf));
for(;;) { for(;;) {
// Check buffer length and return WOULDBLOCK if it is large enough. // Check buffer length and return WOULDBLOCK if it is large enough.
if(handler_->get_outbuf_length() + buflen > OUTBUF_MAX_THRES) { if(handler_->get_outbuf_length() + evbbuf.get_buflen() >
OUTBUF_MAX_THRES) {
break; break;
} }
...@@ -587,28 +588,15 @@ int Http2Upstream::send() ...@@ -587,28 +588,15 @@ int Http2Upstream::send()
if(datalen == 0) { if(datalen == 0) {
break; break;
} }
if(buflen + datalen > sizeof(buf)) { rv = evbbuf.add(data, datalen);
rv = evbuffer_add(output, buf, buflen); if(rv != 0) {
if(rv == -1) { ULOG(FATAL, this) << "evbuffer_add() failed";
ULOG(FATAL, this) << "evbuffer_add() failed"; return -1;
return -1;
}
buflen = 0;
if(datalen > static_cast<ssize_t>(sizeof(buf))) {
rv = evbuffer_add(output, data, datalen);
if(rv == -1) {
ULOG(FATAL, this) << "evbuffer_add() failed";
return -1;
}
continue;
}
} }
memcpy(buf + buflen, data, datalen);
buflen += datalen;
} }
rv = evbuffer_add(output, buf, buflen); rv = evbbuf.flush();
if(rv == -1) { if(rv != 0) {
ULOG(FATAL, this) << "evbuffer_add() failed"; ULOG(FATAL, this) << "evbuffer_add() failed";
return -1; return -1;
} }
......
...@@ -56,34 +56,17 @@ ssize_t send_callback(spdylay_session *session, ...@@ -56,34 +56,17 @@ ssize_t send_callback(spdylay_session *session,
int rv; int rv;
auto upstream = static_cast<SpdyUpstream*>(user_data); auto upstream = static_cast<SpdyUpstream*>(user_data);
auto handler = upstream->get_client_handler(); auto handler = upstream->get_client_handler();
auto bev = handler->get_bev();
auto output = bufferevent_get_output(bev);
// Check buffer length and return WOULDBLOCK if it is large enough. // Check buffer length and return WOULDBLOCK if it is large enough.
if(handler->get_outbuf_length() + upstream->sendbuflen > OUTBUF_MAX_THRES) { if(handler->get_outbuf_length() + upstream->sendbuf.get_buflen() >
OUTBUF_MAX_THRES) {
return SPDYLAY_ERR_WOULDBLOCK; return SPDYLAY_ERR_WOULDBLOCK;
} }
if(upstream->sendbuflen + len > upstream->sendbufmax) { rv = upstream->sendbuf.add(data, len);
rv = evbuffer_add(output, upstream->sendbuf, upstream->sendbuflen); if(rv != 0) {
if(rv == -1) { ULOG(FATAL, upstream) << "evbuffer_add() failed";
ULOG(FATAL, upstream) << "evbuffer_add() failed"; return SPDYLAY_ERR_CALLBACK_FAILURE;
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
upstream->sendbuflen = 0;
if(len > upstream->sendbufmax) {
rv = evbuffer_add(output, data, len);
if(rv == -1) {
ULOG(FATAL, upstream) << "evbuffer_add() failed";
return SPDYLAY_ERR_CALLBACK_FAILURE;
}
} else {
memcpy(upstream->sendbuf + upstream->sendbuflen, data, len);
upstream->sendbuflen += len;
}
} else {
memcpy(upstream->sendbuf + upstream->sendbuflen, data, len);
upstream->sendbuflen += len;
} }
return len; return len;
} }
...@@ -478,9 +461,7 @@ int SpdyUpstream::send() ...@@ -478,9 +461,7 @@ int SpdyUpstream::send()
int rv = 0; int rv = 0;
uint8_t buf[4096]; uint8_t buf[4096];
sendbuf = buf; sendbuf.reset(bufferevent_get_output(handler_->get_bev()), buf, sizeof(buf));
sendbuflen = 0;
sendbufmax = sizeof(buf);
rv = spdylay_session_send(session_); rv = spdylay_session_send(session_);
if(rv != 0) { if(rv != 0) {
...@@ -488,8 +469,9 @@ int SpdyUpstream::send() ...@@ -488,8 +469,9 @@ int SpdyUpstream::send()
<< spdylay_strerror(rv); << spdylay_strerror(rv);
return rv; return rv;
} }
rv = bufferevent_write(handler_->get_bev(), sendbuf, sendbuflen);
if(rv == -1) { rv = sendbuf.flush();
if(rv != 0) {
ULOG(FATAL, this) << "evbuffer_add() failed"; ULOG(FATAL, this) << "evbuffer_add() failed";
return -1; return -1;
} }
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "shrpx_upstream.h" #include "shrpx_upstream.h"
#include "shrpx_downstream_queue.h" #include "shrpx_downstream_queue.h"
#include "util.h"
namespace shrpx { namespace shrpx {
...@@ -68,9 +69,7 @@ public: ...@@ -68,9 +69,7 @@ public:
bool get_flow_control() const; bool get_flow_control() const;
uint8_t *sendbuf; nghttp2::util::EvbufferBuffer sendbuf;
size_t sendbuflen;
size_t sendbufmax;
private: private:
DownstreamQueue downstream_queue_; DownstreamQueue downstream_queue_;
ClientHandler *handler_; ClientHandler *handler_;
......
...@@ -469,6 +469,66 @@ void write_uri_field(std::ostream& o, ...@@ -469,6 +469,66 @@ void write_uri_field(std::ostream& o,
} }
} }
EvbufferBuffer::EvbufferBuffer()
: evbuffer_(nullptr),
buf_(nullptr),
bufmax_(0),
buflen_(0)
{}
EvbufferBuffer::EvbufferBuffer(evbuffer *evbuffer, uint8_t *buf, size_t bufmax)
: evbuffer_(evbuffer),
buf_(buf),
bufmax_(bufmax),
buflen_(0)
{}
void EvbufferBuffer::reset(evbuffer *evbuffer, uint8_t *buf, size_t bufmax)
{
evbuffer_ = evbuffer;
buf_ = buf;
bufmax_ = bufmax;
buflen_ = 0;
}
int EvbufferBuffer::flush()
{
int rv;
rv = evbuffer_add(evbuffer_, buf_, buflen_);
if(rv == -1) {
return -1;
}
buflen_ = 0;
return 0;
}
int EvbufferBuffer::add(const uint8_t *data, size_t datalen)
{
int rv;
if(buflen_ + datalen > bufmax_) {
rv = evbuffer_add(evbuffer_, buf_, buflen_);
if(rv == -1) {
return -1;
}
buflen_ = 0;
if(datalen > bufmax_) {
rv = evbuffer_add(evbuffer_, data, datalen);
if(rv == -1) {
return -1;
}
return 0;
}
}
memcpy(buf_ + buflen_, data, datalen);
buflen_ += datalen;
return 0;
}
size_t EvbufferBuffer::get_buflen() const
{
return buflen_;
}
} // namespace util } // namespace util
} // namespace nghttp2 } // namespace nghttp2
...@@ -37,6 +37,8 @@ ...@@ -37,6 +37,8 @@
#include <sstream> #include <sstream>
#include <memory> #include <memory>
#include <event2/buffer.h>
#include "http-parser/http_parser.h" #include "http-parser/http_parser.h"
namespace nghttp2 { namespace nghttp2 {
...@@ -432,6 +434,21 @@ void write_uri_field(std::ostream& o, ...@@ -432,6 +434,21 @@ void write_uri_field(std::ostream& o,
const char *uri, const http_parser_url &u, const char *uri, const http_parser_url &u,
http_parser_url_fields field); http_parser_url_fields field);
class EvbufferBuffer {
public:
EvbufferBuffer();
EvbufferBuffer(evbuffer *evbuffer, uint8_t *buf, size_t bufmax);
void reset(evbuffer *evbuffer, uint8_t *buf, size_t bufmax);
int flush();
int add(const uint8_t *data, size_t datalen);
size_t get_buflen() const;
private:
evbuffer *evbuffer_;
uint8_t *buf_;
size_t bufmax_;
size_t buflen_;
};
} // namespace util } // namespace util
} // namespace nghttp2 } // namespace nghttp2
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment