Commit 6a4d3877 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Switch getReadBuffers to be IOBufVecQueue::IoVecVec based

Summary:
Switch getReadBuffers to be IOBufVecQueue::IoVecVec based

(Note: this ignores all push blocking failures!)

Reviewed By: simpkins

Differential Revision: D28467957

fbshipit-source-id: 637bf07d516120cb2bab89c3cbc28f58b700748a
parent 2b7d6e1d
......@@ -76,7 +76,7 @@ class IOBufIovecBuilder {
// This is safe, because it means this is the last reference
// Anything trying to copy it is already undefined behavior.
if (refcount_.load(std::memory_order_acquire) > 1) {
uint32_t newcnt = refcount_.fetch_sub(1, std::memory_order_acq_rel) - 1;
size_t newcnt = refcount_.fetch_sub(1, std::memory_order_acq_rel) - 1;
if (newcnt > 0) {
return;
}
......
......@@ -2396,10 +2396,10 @@ void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
readCallback_->getReadBuffer(buf, buflen);
}
size_t AsyncSocket::prepareReadBuffers(struct iovec* iovs, size_t num) {
void AsyncSocket::prepareReadBuffers(IOBufIovecBuilder::IoVecVec& iovs) {
// no matter what, buffers should be prepared for non-ssl socket
CHECK(readCallback_);
return readCallback_->getReadBuffers(iovs, num);
readCallback_->getReadBuffers(iovs);
}
size_t AsyncSocket::handleErrMessages() noexcept {
......@@ -2608,12 +2608,12 @@ void AsyncSocket::handleRead() noexcept {
// Get the buffer(s) to read into.
void* buf = nullptr;
size_t buflen = 0, offset = 0, num = 0;
static constexpr size_t kNumIov = 16;
std::array<struct iovec, kNumIov> iovs;
IOBufIovecBuilder::IoVecVec iovs; // this can be an Asyncsocket member too
try {
if (readMode == AsyncReader::ReadCallback::ReadMode::ReadVec) {
num = prepareReadBuffers(iovs.data(), iovs.size());
prepareReadBuffers(iovs);
num = iovs.size();
VLOG(5) << "prepareReadBuffers() bufs=" << iovs.data()
<< ", num=" << num;
} else {
......
......@@ -27,6 +27,7 @@
#include <folly/SocketAddress.h>
#include <folly/detail/SocketFastOpen.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufIovecBuilder.h>
#include <folly/io/ShutdownSocketSet.h>
#include <folly/io/SocketOptionMap.h>
#include <folly/io/async/AsyncSocketException.h>
......@@ -1292,7 +1293,7 @@ class AsyncSocket : public AsyncTransport {
virtual void checkForImmediateRead() noexcept;
virtual void handleInitialReadWrite() noexcept;
virtual void prepareReadBuffer(void** buf, size_t* buflen);
virtual size_t prepareReadBuffers(struct iovec* iovs, size_t num);
virtual void prepareReadBuffers(IOBufIovecBuilder::IoVecVec& iovs);
virtual size_t handleErrMessages() noexcept;
virtual void handleRead() noexcept;
virtual void handleWrite() noexcept;
......
......@@ -21,6 +21,7 @@
#include <folly/Optional.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufIovecBuilder.h>
#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/async/AsyncTransportCertificate.h>
#include <folly/io/async/DelayedDestruction.h>
......@@ -206,7 +207,7 @@ class AsyncReader {
* returned different buffers, the ReadCallback is responsible for ensuring
* that they are not leaked.
*
* If getReadBuffera() throws an exception or returns a zero length array
* If getReadBuffers() throws an exception or returns a zero length array
* the ReadCallback will be uninstalled and its readError() method will be
* invoked.
*
......@@ -215,14 +216,10 @@ class AsyncReader {
* set a different read callback.)
*
* @param iovs getReadBuffers() will copy up to num iovec entries into
* iovs. iovs cannot be nullptr unless num is 0
* @param num number of iovec entries in the iovs array
* @return number of entried copied to the iovs array
* this is less than or equal to num
* iovs
*/
virtual size_t getReadBuffers(
FOLLY_MAYBE_UNUSED struct iovec* iovs, FOLLY_MAYBE_UNUSED size_t num) {
return 0;
virtual void getReadBuffers(IOBufIovecBuilder::IoVecVec& iovs) {
iovs.clear();
}
/**
......
......@@ -207,127 +207,11 @@ class ReadCallback : public folly::AsyncTransport::ReadCallback {
};
class ReadvCallback : public folly::AsyncTransport::ReadCallback {
private:
class IOBufVecQueue {
private:
struct RefCountMem {
explicit RefCountMem(size_t size) {
mem_ = ::malloc(size);
len_ = size;
}
~RefCountMem() { ::free(mem_); }
void* usableMem() const {
return reinterpret_cast<uint8_t*>(mem_) + used_;
}
size_t usableSize() const { return len_ - used_; }
void incUsedMem(size_t len) { used_ += len; }
static void freeMem(void* buf, void* userData) {
std::ignore = buf;
reinterpret_cast<RefCountMem*>(userData)->decRef();
}
void addRef() { ++count_; }
void decRef() {
if (--count_ == 0) {
delete this;
}
}
private:
std::atomic<size_t> count_{1};
void* mem_{nullptr};
size_t len_{0};
size_t used_{0};
};
public:
struct Options {
static constexpr size_t kBlockSize = 16 * 1024;
size_t blockSize_{kBlockSize};
};
IOBufVecQueue() = default;
explicit IOBufVecQueue(const Options& options) : options_(options) {}
~IOBufVecQueue() {
for (auto& buf : buffers_) {
buf->decRef();
}
}
static Options getBlockSizeOptions(size_t blockSize) {
Options options;
options.blockSize_ = blockSize;
return options;
}
size_t preallocate(size_t len, struct iovec* iovs, size_t num) {
size_t total = 0;
size_t i = 0;
for (; (i < num) && (total < len); ++i) {
if (i >= buffers_.size()) {
buffers_.push_back(new RefCountMem(options_.blockSize_));
}
iovs[i].iov_base = buffers_[i]->usableMem();
iovs[i].iov_len = buffers_[i]->usableSize();
total += buffers_[i]->usableSize();
}
return i;
}
std::unique_ptr<folly::IOBuf> postallocate(size_t len) {
std::unique_ptr<folly::IOBuf> ret, tmp;
while (len > 0) {
CHECK(!buffers_.empty());
auto* buf = buffers_.front();
auto size = buf->usableSize();
if (len >= size) {
// no need to inc the ref count since we're transferring ownership
tmp = folly::IOBuf::takeOwnership(
buf->usableMem(), size, RefCountMem::freeMem, buf);
buffers_.pop_front();
len -= size;
} else {
buf->addRef();
tmp = folly::IOBuf::takeOwnership(
buf->usableMem(), len, RefCountMem::freeMem, buf);
buf->incUsedMem(len);
len = 0;
}
CHECK(!tmp->isShared());
if (ret) {
ret->prependChain(std::move(tmp));
} else {
ret = std::move(tmp);
}
}
return ret;
}
private:
Options options_;
std::deque<RefCountMem*> buffers_;
};
public:
ReadvCallback(size_t bufferSize, size_t len)
: state_(STATE_WAITING),
exception_(folly::AsyncSocketException::UNKNOWN, "none"),
queue_(IOBufVecQueue::getBlockSizeOptions(bufferSize)),
queue_(folly::IOBufIovecBuilder::Options().setBlockSize(bufferSize)),
len_(len) {
setReadMode(folly::AsyncTransport::ReadCallback::ReadMode::ReadVec);
}
......@@ -341,12 +225,12 @@ class ReadvCallback : public folly::AsyncTransport::ReadCallback {
CHECK(false); // this should not be called
}
size_t getReadBuffers(struct iovec* iovs, size_t num) override {
return queue_.preallocate(len_, iovs, num);
void getReadBuffers(folly::IOBufIovecBuilder::IoVecVec& iovs) override {
queue_.allocateBuffers(iovs, len_);
}
void readDataAvailable(size_t len) noexcept override {
auto tmp = queue_.postallocate(len);
auto tmp = queue_.extractIOBufChain(len);
if (!buf_) {
buf_ = std::move(tmp);
} else {
......@@ -374,7 +258,7 @@ class ReadvCallback : public folly::AsyncTransport::ReadCallback {
private:
StateEnum state_;
folly::AsyncSocketException exception_;
IOBufVecQueue queue_;
folly::IOBufIovecBuilder queue_;
std::unique_ptr<folly::IOBuf> buf_;
const size_t len_;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment