Commit 3f97e6cd authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa Committed by GitHub

Merge pull request #776 from nghttp2/nghttpx-memchunkbuffer

nghttpx: Use Memchunk based read buffer for frontend connection
parents e8b25080 4fa150c4
......@@ -434,6 +434,102 @@ inline int limit_iovec(struct iovec *iov, int iovcnt, size_t max) {
return iovcnt;
}
// MemchunkBuffer is similar to Buffer, but it uses pooled Memchunk
// for its underlying buffer.
template <typename Memchunk> struct MemchunkBuffer {
MemchunkBuffer(Pool<Memchunk> *pool) : pool(pool), chunk(nullptr) {}
MemchunkBuffer(const MemchunkBuffer &) = delete;
MemchunkBuffer(MemchunkBuffer &&other) noexcept
: pool(other.pool), chunk(other.chunk) {
other.chunk = nullptr;
}
MemchunkBuffer &operator=(const MemchunkBuffer &) = delete;
MemchunkBuffer &operator=(MemchunkBuffer &&other) noexcept {
if (this == &other) {
return *this;
}
pool = other.pool;
chunk = other.chunk;
other.chunk = nullptr;
return *this;
}
~MemchunkBuffer() {
if (!pool || !chunk) {
return;
}
pool->recycle(chunk);
}
// Ensures that the underlying buffer is allocated.
void ensure_chunk() {
if (chunk) {
return;
}
chunk = pool->get();
}
// Releases the underlying buffer.
void release_chunk() {
if (!chunk) {
return;
}
pool->recycle(chunk);
chunk = nullptr;
}
// Returns true if the underlying buffer is allocated.
bool chunk_avail() const { return chunk != nullptr; }
// The functions below must be called after the underlying buffer is
// allocated (use ensure_chunk).
// MemchunkBuffer provides the same interface functions with Buffer.
// Since we has chunk as a member variable, pos and last are
// implemented as wrapper functions.
uint8_t *pos() const { return chunk->pos; }
uint8_t *last() const { return chunk->last; }
size_t rleft() const { return chunk->len(); }
size_t wleft() const { return chunk->left(); }
size_t write(const void *src, size_t count) {
count = std::min(count, wleft());
auto p = static_cast<const uint8_t *>(src);
chunk->last = std::copy_n(p, count, chunk->last);
return count;
}
size_t write(size_t count) {
count = std::min(count, wleft());
chunk->last += count;
return count;
}
size_t drain(size_t count) {
count = std::min(count, rleft());
chunk->pos += count;
return count;
}
size_t drain_reset(size_t count) {
count = std::min(count, rleft());
std::copy(chunk->pos + count, chunk->last, std::begin(chunk->buf));
chunk->last = std::begin(chunk->buf) + (chunk->last - (chunk->pos + count));
chunk->pos = std::begin(chunk->buf);
return count;
}
void reset() { chunk->reset(); }
uint8_t *begin() { return std::begin(chunk->buf); }
uint8_t &operator[](size_t n) { return chunk->buf[n]; }
const uint8_t &operator[](size_t n) const { return chunk->buf[n]; }
Pool<Memchunk> *pool;
Memchunk *chunk;
};
using DefaultMemchunkBuffer = MemchunkBuffer<Memchunk16K>;
} // namespace nghttp2
#endif // MEMCHUNK_H
......@@ -117,6 +117,7 @@ void writecb(struct ev_loop *loop, ev_io *w, int revents) {
int ClientHandler::noop() { return 0; }
int ClientHandler::read_clear() {
rb_.ensure_chunk();
for (;;) {
if (rb_.rleft() && on_read() != 0) {
return -1;
......@@ -132,9 +133,12 @@ int ClientHandler::read_clear() {
return 0;
}
auto nread = conn_.read_clear(rb_.last, rb_.wleft());
auto nread = conn_.read_clear(rb_.last(), rb_.wleft());
if (nread == 0) {
if (rb_.rleft() == 0) {
rb_.release_chunk();
}
return 0;
}
......@@ -209,6 +213,8 @@ int ClientHandler::tls_handshake() {
int ClientHandler::read_tls() {
ERR_clear_error();
rb_.ensure_chunk();
for (;;) {
// we should process buffered data first before we read EOF.
if (rb_.rleft() && on_read() != 0) {
......@@ -225,9 +231,12 @@ int ClientHandler::read_tls() {
return 0;
}
auto nread = conn_.read_tls(rb_.last, rb_.wleft());
auto nread = conn_.read_tls(rb_.last(), rb_.wleft());
if (nread == 0) {
if (rb_.rleft() == 0) {
rb_.release_chunk();
}
return 0;
}
......@@ -303,7 +312,7 @@ int ClientHandler::upstream_write() {
int ClientHandler::upstream_http2_connhd_read() {
auto nread = std::min(left_connhd_len_, rb_.rleft());
if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_,
rb_.pos, nread) != 0) {
rb_.pos(), nread) != 0) {
// There is no downgrade path here. Just drop the connection.
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "invalid client connection header";
......@@ -332,7 +341,7 @@ int ClientHandler::upstream_http2_connhd_read() {
int ClientHandler::upstream_http1_connhd_read() {
auto nread = std::min(left_connhd_len_, rb_.rleft());
if (memcmp(NGHTTP2_CLIENT_MAGIC + NGHTTP2_CLIENT_MAGIC_LEN - left_connhd_len_,
rb_.pos, nread) != 0) {
rb_.pos(), nread) != 0) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "This is HTTP/1.1 connection, "
<< "but may be upgraded to HTTP/2 later.";
......@@ -386,6 +395,7 @@ ClientHandler::ClientHandler(Worker *worker, int fd, SSL *ssl,
// so the required space is 64 + 48 + 16 + 48 + 16 + 16 + 16 +
// 32 + 8 + 8 * 8 = 328.
balloc_(512, 512),
rb_(worker->get_mcpool()),
conn_(worker->get_loop(), fd, ssl, worker->get_mcpool(),
get_config()->conn.upstream.timeout.write,
get_config()->conn.upstream.timeout.read,
......@@ -647,9 +657,11 @@ int ClientHandler::do_read() { return read_(*this); }
int ClientHandler::do_write() { return write_(*this); }
int ClientHandler::on_read() {
auto rv = on_read_(*this);
if (rv != 0) {
return rv;
if (rb_.chunk_avail()) {
auto rv = on_read_(*this);
if (rv != 0) {
return rv;
}
}
conn_.handle_tls_pending_read();
return 0;
......@@ -1325,7 +1337,7 @@ ssize_t parse_proxy_line_port(const uint8_t *first, const uint8_t *last) {
int ClientHandler::on_proxy_protocol_finish() {
if (conn_.tls.ssl) {
conn_.tls.rbuf.append(rb_.pos, rb_.rleft());
conn_.tls.rbuf.append(rb_.pos(), rb_.rleft());
rb_.reset();
}
......@@ -1346,7 +1358,7 @@ int ClientHandler::proxy_protocol_read() {
CLOG(INFO, this) << "PROXY-protocol: Started";
}
auto first = rb_.pos;
auto first = rb_.pos();
// NULL character really destroys functions which expects NULL
// terminated string. We won't expect it in PROXY protocol line, so
......@@ -1355,12 +1367,12 @@ int ClientHandler::proxy_protocol_read() {
constexpr size_t MAX_PROXY_LINELEN = 107;
auto bufend = rb_.pos + std::min(MAX_PROXY_LINELEN, rb_.rleft());
auto bufend = rb_.pos() + std::min(MAX_PROXY_LINELEN, rb_.rleft());
auto end =
std::find_first_of(rb_.pos, bufend, std::begin(chrs), std::end(chrs));
std::find_first_of(rb_.pos(), bufend, std::begin(chrs), std::end(chrs));
if (end == bufend || *end == '\0' || end == rb_.pos || *(end - 1) != '\r') {
if (end == bufend || *end == '\0' || end == rb_.pos() || *(end - 1) != '\r') {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: No ending CR LF sequence found";
}
......@@ -1371,14 +1383,14 @@ int ClientHandler::proxy_protocol_read() {
constexpr auto HEADER = StringRef::from_lit("PROXY ");
if (static_cast<size_t>(end - rb_.pos) < HEADER.size()) {
if (static_cast<size_t>(end - rb_.pos()) < HEADER.size()) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: PROXY version 1 ID not found";
}
return -1;
}
if (!util::streq(HEADER, StringRef{rb_.pos, HEADER.size()})) {
if (!util::streq(HEADER, StringRef{rb_.pos(), HEADER.size()})) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Bad PROXY protocol version 1 ID";
}
......@@ -1389,22 +1401,22 @@ int ClientHandler::proxy_protocol_read() {
int family;
if (rb_.pos[0] == 'T') {
if (end - rb_.pos < 5) {
if (rb_.pos()[0] == 'T') {
if (end - rb_.pos() < 5) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
}
return -1;
}
if (rb_.pos[1] != 'C' || rb_.pos[2] != 'P') {
if (rb_.pos()[1] != 'C' || rb_.pos()[2] != 'P') {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
}
return -1;
}
switch (rb_.pos[3]) {
switch (rb_.pos()[3]) {
case '4':
family = AF_INET;
break;
......@@ -1420,26 +1432,26 @@ int ClientHandler::proxy_protocol_read() {
rb_.drain(5);
} else {
if (end - rb_.pos < 7) {
if (end - rb_.pos() < 7) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: INET protocol family not found";
}
return -1;
}
if (!util::streq_l("UNKNOWN", rb_.pos, 7)) {
if (!util::streq_l("UNKNOWN", rb_.pos(), 7)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Unknown INET protocol family";
}
return -1;
}
rb_.drain(end + 2 - rb_.pos);
rb_.drain(end + 2 - rb_.pos());
return on_proxy_protocol_finish();
}
// source address
auto token_end = std::find(rb_.pos, end, ' ');
auto token_end = std::find(rb_.pos(), end, ' ');
if (token_end == end) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Source address not found";
......@@ -1448,20 +1460,20 @@ int ClientHandler::proxy_protocol_read() {
}
*token_end = '\0';
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos), family)) {
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source address";
}
return -1;
}
auto src_addr = rb_.pos;
auto src_addrlen = token_end - rb_.pos;
auto src_addr = rb_.pos();
auto src_addrlen = token_end - rb_.pos();
rb_.drain(token_end - rb_.pos + 1);
rb_.drain(token_end - rb_.pos() + 1);
// destination address
token_end = std::find(rb_.pos, end, ' ');
token_end = std::find(rb_.pos(), end, ' ');
if (token_end == end) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Destination address not found";
......@@ -1470,7 +1482,7 @@ int ClientHandler::proxy_protocol_read() {
}
*token_end = '\0';
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos), family)) {
if (!util::numeric_host(reinterpret_cast<const char *>(rb_.pos()), family)) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination address";
}
......@@ -1479,26 +1491,26 @@ int ClientHandler::proxy_protocol_read() {
// Currently we don't use destination address
rb_.drain(token_end - rb_.pos + 1);
rb_.drain(token_end - rb_.pos() + 1);
// source port
auto n = parse_proxy_line_port(rb_.pos, end);
if (n <= 0 || *(rb_.pos + n) != ' ') {
auto n = parse_proxy_line_port(rb_.pos(), end);
if (n <= 0 || *(rb_.pos() + n) != ' ') {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid source port";
}
return -1;
}
rb_.pos[n] = '\0';
auto src_port = rb_.pos;
rb_.pos()[n] = '\0';
auto src_port = rb_.pos();
auto src_portlen = n;
rb_.drain(n + 1);
// destination port
n = parse_proxy_line_port(rb_.pos, end);
if (n <= 0 || rb_.pos + n != end) {
n = parse_proxy_line_port(rb_.pos(), end);
if (n <= 0 || rb_.pos() + n != end) {
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Invalid destination port";
}
......@@ -1507,14 +1519,14 @@ int ClientHandler::proxy_protocol_read() {
// Currently we don't use destination port
rb_.drain(end + 2 - rb_.pos);
rb_.drain(end + 2 - rb_.pos());
ipaddr_ =
make_string_ref(balloc_, StringRef{src_addr, src_addr + src_addrlen});
port_ = make_string_ref(balloc_, StringRef{src_port, src_port + src_portlen});
if (LOG_ENABLED(INFO)) {
CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos - first)
CLOG(INFO, this) << "PROXY-protocol-v1: Finished, " << (rb_.pos() - first)
<< " bytes read";
}
......
......@@ -124,7 +124,7 @@ public:
int64_t body_bytes_sent);
Worker *get_worker() const;
using ReadBuf = Buffer<16_k>;
using ReadBuf = DefaultMemchunkBuffer;
ReadBuf *get_rb();
......@@ -171,6 +171,7 @@ private:
// sure that the allocations must be bounded, and not proportional
// to the number of requests.
BlockAllocator balloc_;
DefaultMemchunkBuffer rb_;
Connection conn_;
ev_timer reneg_shutdown_timer_;
std::unique_ptr<Upstream> upstream_;
......@@ -197,7 +198,6 @@ private:
bool should_close_after_write_;
// true if affinity_hash_ is computed
bool affinity_hash_computed_;
ReadBuf rb_;
};
} // namespace shrpx
......
......@@ -1055,7 +1055,7 @@ int Http2Upstream::on_read() {
auto rlimit = handler_->get_rlimit();
if (rb->rleft()) {
rv = nghttp2_session_mem_recv(session_, rb->pos, rb->rleft());
rv = nghttp2_session_mem_recv(session_, rb->pos(), rb->rleft());
if (rv < 0) {
if (rv != NGHTTP2_ERR_BAD_CLIENT_MAGIC) {
ULOG(ERROR, this) << "nghttp2_session_mem_recv() returned error: "
......
......@@ -532,7 +532,7 @@ int HttpsUpstream::on_read() {
// callback chain called by http_parser_execute()
if (downstream && downstream->get_upgraded()) {
auto rv = downstream->push_upload_data_chunk(rb->pos, rb->rleft());
auto rv = downstream->push_upload_data_chunk(rb->pos(), rb->rleft());
if (rv != 0) {
return -1;
......@@ -565,8 +565,9 @@ int HttpsUpstream::on_read() {
}
// http_parser_execute() does nothing once it entered error state.
auto nread = http_parser_execute(
&htp_, &htp_hooks, reinterpret_cast<const char *>(rb->pos), rb->rleft());
auto nread = http_parser_execute(&htp_, &htp_hooks,
reinterpret_cast<const char *>(rb->pos()),
rb->rleft());
rb->drain(nread);
rlimit->startw();
......
......@@ -105,7 +105,7 @@ ssize_t recv_callback(spdylay_session *session, uint8_t *buf, size_t len,
auto nread = std::min(rb->rleft(), len);
memcpy(buf, rb->pos, nread);
memcpy(buf, rb->pos(), nread);
rb->drain(nread);
rlimit->startw();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment