Commit e31125bc authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Add support for AsyncSocket readv operations

Summary:
Add support for AsyncSocket readv operations

(Note: this ignores all push blocking failures!)

Reviewed By: danobi

Differential Revision: D27585275

fbshipit-source-id: d9a410a4c5d60af06c187e0abb5bd11b7054a6b9
parent 8b26cdbd
......@@ -2271,20 +2271,44 @@ void AsyncSocket::ioReady(uint16_t events) noexcept {
// Single-buffer read: describes the caller's buffer with a one-element iovec
// and hands it to performReadInternal(). The offset argument is not used.
AsyncSocket::ReadResult AsyncSocket::performRead(
    void** buf, size_t* buflen, size_t* /* offset */) {
  VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
          << ", buflen=" << *buflen;

  // The lone iovec entry mirrors the (pointer, length) pair we were given.
  struct iovec singleVec;
  singleVec.iov_len = *buflen;
  singleVec.iov_base = *buf;

  return performReadInternal(&singleVec, 1);
}
// Vectored read entry point: forwards the caller's iovec array and its
// element count unchanged to performReadInternal().
AsyncSocket::ReadResult AsyncSocket::performReadv(
struct iovec* iovs, size_t num) {
return performReadInternal(iovs, num);
}
AsyncSocket::ReadResult AsyncSocket::performReadInternal(
struct iovec* iovs, size_t num) {
VLOG(5) << "AsyncSocket::performReadInternal() this=" << this
<< ", iovs=" << iovs << ", num=" << num;
if (!num) {
return ReadResult(READ_ERROR);
}
if (preReceivedData_ && !preReceivedData_->empty()) {
VLOG(5) << "AsyncSocket::performRead() this=" << this
VLOG(5) << "AsyncSocket::performReadInternal() this=" << this
<< ", reading pre-received data";
io::Cursor cursor(preReceivedData_.get());
auto len = cursor.pullAtMost(*buf, *buflen);
ssize_t len = 0;
for (size_t i = 0; (i < num) && (!preReceivedData_->empty()); ++i) {
io::Cursor cursor(preReceivedData_.get());
auto ret = cursor.pullAtMost(iovs[i].iov_base, iovs[i].iov_len);
len += ret;
IOBufQueue queue;
queue.append(std::move(preReceivedData_));
queue.trimStart(len);
preReceivedData_ = queue.move();
IOBufQueue queue;
queue.append(std::move(preReceivedData_));
queue.trimStart(ret);
preReceivedData_ = queue.move();
}
appBytesReceived_ += len;
return ReadResult(len);
......@@ -2292,36 +2316,35 @@ AsyncSocket::ReadResult AsyncSocket::performRead(
ssize_t bytes = 0;
// No callback to read ancillary data was set
if (readAncillaryDataCallback_ == nullptr) {
bytes = netops_->recv(fd_, *buf, *buflen, MSG_DONTWAIT);
} else {
struct msghdr msg;
struct iovec iov;
struct msghdr msg;
// Ancillary data buffer and length
msg.msg_control =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().data();
msg.msg_controllen =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().size();
if (readAncillaryDataCallback_ == nullptr && num == 1) {
bytes = netops_->recv(fd_, iovs[0].iov_base, iovs[0].iov_len, MSG_DONTWAIT);
} else {
if (readAncillaryDataCallback_) {
// Ancillary data buffer and length
msg.msg_control =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().data();
msg.msg_controllen =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().size();
} else {
msg.msg_control = nullptr;
msg.msg_controllen = 0;
}
// Dest address info
msg.msg_name = nullptr;
msg.msg_namelen = 0;
// Array of data buffers (scatter/gather)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
// Data buffer pointer and length
iov.iov_base = *buf;
iov.iov_len = *buflen;
msg.msg_iov = iovs;
msg.msg_iovlen = num;
bytes = netops::recvmsg(fd_, &msg, 0);
}
if (bytes > 0) {
readAncillaryDataCallback_->ancillaryData(msg);
}
if (readAncillaryDataCallback_ && (bytes > 0)) {
readAncillaryDataCallback_->ancillaryData(msg);
}
if (bytes < 0) {
......@@ -2338,11 +2361,17 @@ AsyncSocket::ReadResult AsyncSocket::performRead(
}
void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
// No matter what, the buffer should be prepared for a non-ssl socket.
// A read callback must be installed; it fills in the buffer pointer and
// length through getReadBuffer().
CHECK(readCallback_);
readCallback_->getReadBuffer(buf, buflen);
}
size_t AsyncSocket::prepareReadBuffers(struct iovec* iovs, size_t num) {
// No matter what, the buffers should be prepared for a non-ssl socket.
// A read callback must be installed; the value returned is whatever its
// getReadBuffers() reports for the iovs array of capacity num.
CHECK(readCallback_);
return readCallback_->getReadBuffers(iovs, num);
}
size_t AsyncSocket::handleErrMessages() noexcept {
// This method has non-empty implementation only for platforms
// supporting per-socket error queues.
......@@ -2541,12 +2570,22 @@ void AsyncSocket::handleRead() noexcept {
size_t numReads = maxReadsPerEvent_ ? maxReadsPerEvent_ : size_t(-1);
EventBase* originalEventBase = eventBase_;
while (readCallback_ && eventBase_ == originalEventBase && numReads--) {
// Get the buffer to read into.
auto readMode = readCallback_->getReadMode();
// Get the buffer(s) to read into.
void* buf = nullptr;
size_t buflen = 0, offset = 0;
size_t buflen = 0, offset = 0, num = 0;
static constexpr size_t kNumIov = 16;
std::array<struct iovec, kNumIov> iovs;
try {
prepareReadBuffer(&buf, &buflen);
VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
if (readMode == AsyncReader::ReadCallback::ReadMode::ReadVec) {
num = prepareReadBuffers(iovs.data(), iovs.size());
VLOG(5) << "prepareReadBuffers() bufs=" << iovs.data()
<< ", num=" << num;
} else {
prepareReadBuffer(&buf, &buflen);
VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
}
} catch (const AsyncSocketException& ex) {
return failRead(__func__, ex);
} catch (const std::exception& ex) {
......@@ -2563,7 +2602,7 @@ void AsyncSocket::handleRead() noexcept {
"non-exception type");
return failRead(__func__, ex);
}
if (buf == nullptr || buflen == 0) {
if ((num == 0) && (buf == nullptr || buflen == 0)) {
AsyncSocketException ex(
AsyncSocketException::BAD_ARGS,
"ReadCallback::getReadBuffer() returned "
......@@ -2572,7 +2611,9 @@ void AsyncSocket::handleRead() noexcept {
}
// Perform the read
auto readResult = performRead(&buf, &buflen, &offset);
auto readResult = (readMode == AsyncReader::ReadCallback::ReadMode::ReadVec)
? performReadv(iovs.data(), num)
: performRead(&buf, &buflen, &offset);
auto bytesRead = readResult.readReturn;
VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
<< bytesRead << " bytes";
......
......@@ -1280,6 +1280,7 @@ class AsyncSocket : public AsyncTransport {
virtual void checkForImmediateRead() noexcept;
virtual void handleInitialReadWrite() noexcept;
virtual void prepareReadBuffer(void** buf, size_t* buflen);
virtual size_t prepareReadBuffers(struct iovec* iovs, size_t num);
virtual size_t handleErrMessages() noexcept;
virtual void handleRead() noexcept;
virtual void handleWrite() noexcept;
......@@ -1287,7 +1288,7 @@ class AsyncSocket : public AsyncTransport {
void timeoutExpired() noexcept;
/**
* Attempt to read from the socket.
* Attempt to read from the socket into a single buffer
*
* @param buf The buffer to read data into.
* @param buflen The length of the buffer.
......@@ -1296,6 +1297,16 @@ class AsyncSocket : public AsyncTransport {
*/
virtual ReadResult performRead(void** buf, size_t* buflen, size_t* offset);
/**
* Attempt to read from the socket into an iovec array
*
* @param iovs The iovec array to read data into.
* @param num The number of elements in the iovec array
*
* @return Returns a read result. See read result for details.
*/
virtual ReadResult performReadv(struct iovec* iovs, size_t num);
/**
* Populate an iovec array from an IOBuf and attempt to write it.
*
......@@ -1402,6 +1413,9 @@ class AsyncSocket : public AsyncTransport {
*/
bool updateEventRegistration(uint16_t enable, uint16_t disable);
// read methods
ReadResult performReadInternal(struct iovec* iovs, size_t num);
// Actually close the file descriptor and set it to -1 so we don't
// accidentally close it again.
void doClose();
......
......@@ -152,17 +152,28 @@ class AsyncReader {
public:
class ReadCallback {
public:
enum class ReadMode : uint8_t {
ReadBuffer = 0,
ReadVec = 1,
};
virtual ~ReadCallback() = default;
ReadMode getReadMode() const noexcept { return readMode_; }
void setReadMode(ReadMode readMode) noexcept { readMode_ = readMode; }
/**
* When data becomes available, getReadBuffer() will be invoked to get the
* buffer into which data should be read.
* When data becomes available, getReadBuffer()/getReadBuffers() will be
* invoked to get the buffer/buffers into which data should be read.
*
* This method allows the ReadCallback to delay buffer allocation until
* These methods allow the ReadCallback to delay buffer allocation until
* data becomes available. This allows applications to manage large
* numbers of idle connections, without having to maintain a separate read
* buffer for each idle connection.
*
*/
/**
* It is possible that in some cases, getReadBuffer() may be called
* multiple times before readDataAvailable() is invoked. In this case, the
* data will be written to the buffer returned from the most recent call to
......@@ -187,9 +198,37 @@ class AsyncReader {
*/
virtual void getReadBuffer(void** bufReturn, size_t* lenReturn) = 0;
/**
* It is possible that in some cases, getReadBuffers() may be called
* multiple times before readDataAvailable() is invoked. In this case, the
* data will be written to the buffers returned from the most recent call to
* getReadBuffers(). If the previous calls to getReadBuffers()
* returned different buffers, the ReadCallback is responsible for ensuring
* that they are not leaked.
*
* If getReadBuffers() throws an exception or returns a zero length array
* the ReadCallback will be uninstalled and its readErr() method will be
* invoked.
*
* getReadBuffers() is not allowed to change the transport state before it
* returns. (For example, it should never uninstall the read callback, or
* set a different read callback.)
*
* @param iovs getReadBuffers() will copy up to num iovec entries into
* iovs. iovs cannot be nullptr unless num is 0
* @param num number of iovec entries in the iovs array
* @return number of entries copied to the iovs array;
* this is less than or equal to num
*/
virtual size_t getReadBuffers(
FOLLY_MAYBE_UNUSED struct iovec* iovs, FOLLY_MAYBE_UNUSED size_t num) {
// Default implementation: leaves iovs untouched and reports zero entries.
return 0;
}
/**
* readDataAvailable() will be invoked when data has been successfully read
* into the buffer returned by the last call to getReadBuffer().
* into the buffer(s) returned by the last call to
* getReadBuffer()/getReadBuffers()
*
* The read callback remains installed after readDataAvailable() returns.
* It must be explicitly uninstalled to stop receiving read events.
......@@ -271,6 +310,9 @@ class AsyncReader {
* @param ex An exception describing the error that occurred.
*/
virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
protected:
ReadMode readMode_{ReadMode::ReadBuffer};
};
// Read methods that aren't part of AsyncTransport.
......
......@@ -206,6 +206,179 @@ class ReadCallback : public folly::AsyncTransport::ReadCallback {
const size_t maxBufferSz;
};
/**
 * Read callback used by tests to exercise the vectored (ReadVec) read path.
 *
 * Read buffers are handed out through getReadBuffers() as fixed-size,
 * ref-counted blocks. The bytes reported by readDataAvailable() are wrapped
 * in IOBufs that reference those blocks and are chained onto an internal
 * IOBuf so that verifyData() can compare everything received so far.
 */
class ReadvCallback : public folly::AsyncTransport::ReadCallback {
 private:
  /**
   * Queue of ref-counted memory blocks. preallocate() exposes the unused part
   * of the blocks as iovecs; postallocate() converts the bytes written into
   * them into an IOBuf chain.
   */
  class IOBufVecQueue {
   private:
    /**
     * A malloc'ed block with an intrusive reference count. The queue holds
     * one reference; every IOBuf created over a partially consumed block
     * holds another. The block deletes itself when the count drops to zero.
     */
    struct RefCountMem {
      explicit RefCountMem(size_t size) : mem_(::malloc(size)), len_(size) {
        // malloc(0) may legitimately return nullptr; any other nullptr is a
        // failed allocation that would otherwise show up as a wild write.
        CHECK(mem_ != nullptr || size == 0);
      }

      ~RefCountMem() { ::free(mem_); }

      // The block owns raw memory and is shared through its ref count only.
      RefCountMem(const RefCountMem&) = delete;
      RefCountMem& operator=(const RefCountMem&) = delete;

      // Start of the not yet consumed part of the block.
      void* usableMem() const { return static_cast<uint8_t*>(mem_) + used_; }

      // Number of not yet consumed bytes in the block.
      size_t usableSize() const { return len_ - used_; }

      // Marks the first len usable bytes as consumed.
      void incUsedMem(size_t len) { used_ += len; }

      // IOBuf free callback: drops the reference held by the IOBuf. userData
      // is the RefCountMem passed to IOBuf::takeOwnership().
      static void freeMem(void* buf, void* userData) {
        std::ignore = buf;
        static_cast<RefCountMem*>(userData)->decRef();
      }

      void addRef() { ++count_; }

      void decRef() {
        if (--count_ == 0) {
          delete this;
        }
      }

     private:
      std::atomic<size_t> count_{1};
      void* mem_{nullptr};
      size_t len_{0};
      size_t used_{0};
    };

   public:
    struct Options {
      static constexpr size_t kBlockSize = 16 * 1024;
      // Size in bytes of every block allocated by the queue.
      size_t blockSize_{kBlockSize};
    };

    IOBufVecQueue() = default;
    explicit IOBufVecQueue(const Options& options) : options_(options) {}

    // Releases the queue's own reference on every block it still tracks.
    ~IOBufVecQueue() {
      for (auto& buf : buffers_) {
        buf->decRef();
      }
    }

    // The queue holds exactly one reference per tracked block; a copy would
    // release each of them twice.
    IOBufVecQueue(const IOBufVecQueue&) = delete;
    IOBufVecQueue& operator=(const IOBufVecQueue&) = delete;

    static Options getBlockSizeOptions(size_t blockSize) {
      Options options;
      options.blockSize_ = blockSize;
      return options;
    }

    /**
     * Fills iovs with the usable regions of the queued blocks, allocating new
     * blocks as needed, until either num entries are filled or the regions
     * add up to at least len bytes.
     *
     * @param len  number of bytes the caller would like to have room for
     * @param iovs array receiving the buffer descriptions
     * @param num  capacity of iovs
     * @return number of entries written to iovs
     */
    size_t preallocate(size_t len, struct iovec* iovs, size_t num) {
      size_t total = 0;
      size_t i = 0;
      for (; (i < num) && (total < len); ++i) {
        if (i >= buffers_.size()) {
          buffers_.push_back(new RefCountMem(options_.blockSize_));
        }

        iovs[i].iov_base = buffers_[i]->usableMem();
        iovs[i].iov_len = buffers_[i]->usableSize();
        total += buffers_[i]->usableSize();
      }

      return i;
    }

    /**
     * Converts the first len bytes of the regions handed out by preallocate()
     * into an IOBuf chain. Fully consumed blocks leave the queue; a partially
     * consumed block stays at the front with its used offset advanced.
     *
     * @param len number of bytes that were written into the regions
     * @return the IOBuf chain, or nullptr when len is 0
     */
    std::unique_ptr<folly::IOBuf> postallocate(size_t len) {
      std::unique_ptr<folly::IOBuf> ret, tmp;
      while (len > 0) {
        CHECK(!buffers_.empty());
        auto* buf = buffers_.front();
        auto size = buf->usableSize();

        if (len >= size) {
          // no need to inc the ref count since we're transferring ownership
          tmp = folly::IOBuf::takeOwnership(
              buf->usableMem(), size, RefCountMem::freeMem, buf);

          buffers_.pop_front();
          len -= size;
        } else {
          // The block stays in the queue, so the IOBuf needs its own
          // reference.
          buf->addRef();
          tmp = folly::IOBuf::takeOwnership(
              buf->usableMem(), len, RefCountMem::freeMem, buf);

          buf->incUsedMem(len);
          len = 0;
        }

        CHECK(!tmp->isShared());

        if (ret) {
          ret->prependChain(std::move(tmp));
        } else {
          ret = std::move(tmp);
        }
      }

      return ret;
    }

   private:
    Options options_;
    std::deque<RefCountMem*> buffers_;
  };

 public:
  /**
   * @param bufferSize size in bytes of each block backing the read buffers
   * @param len        number of bytes to make room for on each
   *                   getReadBuffers() call
   */
  ReadvCallback(size_t bufferSize, size_t len)
      : state_(STATE_WAITING),
        exception_(folly::AsyncSocketException::UNKNOWN, "none"),
        queue_(IOBufVecQueue::getBlockSizeOptions(bufferSize)),
        len_(len) {
    setReadMode(folly::AsyncTransport::ReadCallback::ReadMode::ReadVec);
  }

  ~ReadvCallback() override = default;

  void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
    std::ignore = bufReturn;
    std::ignore = lenReturn;
    CHECK(false); // this should not be called
  }

  size_t getReadBuffers(struct iovec* iovs, size_t num) override {
    return queue_.preallocate(len_, iovs, num);
  }

  // Appends the len bytes that were just read to the data collected so far.
  void readDataAvailable(size_t len) noexcept override {
    auto tmp = queue_.postallocate(len);
    if (!tmp) {
      // Nothing was read (len == 0); chaining a null IOBuf would crash.
      return;
    }

    if (!buf_) {
      buf_ = std::move(tmp);
    } else {
      buf_->prependChain(std::move(tmp));
    }
  }

  // Drops all data collected so far.
  void reset() { buf_.reset(); }

  void readEOF() noexcept override { state_ = STATE_SUCCEEDED; }

  void readErr(const folly::AsyncSocketException& ex) noexcept override {
    state_ = STATE_FAILED;
    exception_ = ex;
  }

  // CHECK-fails unless the data collected so far is exactly equal to data.
  void verifyData(const std::string& data) const {
    CHECK(buf_);
    auto r = buf_->coalesce();
    std::string tmp;
    tmp.assign(reinterpret_cast<const char*>(r.begin()), r.end() - r.begin());
    CHECK_EQ(data, tmp);
  }

 private:
  StateEnum state_;
  folly::AsyncSocketException exception_;
  IOBufVecQueue queue_;
  std::unique_ptr<folly::IOBuf> buf_;
  const size_t len_;
};
class BufferCallback : public folly::AsyncTransport::BufferCallback {
public:
BufferCallback(folly::AsyncSocket* socket, size_t expectedBytes)
......
......@@ -474,6 +474,50 @@ TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
ASSERT_FALSE(socket->isClosedByPeer());
}
// Connects to a test server, has the server side send a payload and close,
// and verifies that the payload arrives intact through a ReadvCallback, i.e.
// through the vectored (getReadBuffers) read path.
TEST_P(AsyncSocketConnectTest, ConnectAndReadv) {
TestServer server;
// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
if (GetParam() == TFOState::ENABLED) {
socket->enableTFO();
}
ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 30);
// kBuffSize is the size of each block backing the read buffers and kLen the
// number of bytes the callback makes room for per getReadBuffers() call.
// Both are much smaller than kDataSize, so the payload is presumably
// delivered over several vectored reads.
static constexpr size_t kBuffSize = 10;
static constexpr size_t kLen = 40;
static constexpr size_t kDataSize = 128;
ReadvCallback rcb(kBuffSize, kLen);
socket->setReadCB(&rcb);
if (GetParam() == TFOState::ENABLED) {
// Trigger a connection
socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
}
// Even though we haven't looped yet, we should be able to accept
// the connection and send data to it.
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
std::string data(kDataSize, 'A');
acceptedSocket->write(
reinterpret_cast<unsigned char*>(data.data()), data.size());
acceptedSocket->flush();
acceptedSocket->close();
// Run the event loop; the connect result and the received data are only
// checked after it returns.
evb.loop();
ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
rcb.verifyData(data);
ASSERT_FALSE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
}
/**
* Test installing a read callback and then closing immediately before the
* connect attempt finishes.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment