Commit 2bee060d authored by Yuri Putivsky's avatar Yuri Putivsky Committed by Facebook Github Bot 0

folly AsyncPipeReader supports IOBuf

Summary: folly AsyncPipeReader takes a callback of type AsyncReader::ReadCallback. Now AsyncReader::ReadCallback class supports IOBuf as a buffer for transfer read bytes. Need to extend AsyncPipeReader class to support IOBuf as well

Reviewed By: yfeldblum

Differential Revision: D3650893

fbshipit-source-id: e2142341c8b8b0b2ef248c1f13a8caba9d50ba67
parent 832479bd
...@@ -62,36 +62,57 @@ void AsyncPipeReader::handlerReady(uint16_t events) noexcept { ...@@ -62,36 +62,57 @@ void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
assert(readCallback_ != nullptr); assert(readCallback_ != nullptr);
while (readCallback_) { while (readCallback_) {
// - What API does callback support?
const auto movable = readCallback_->isBufferMovable(); // noexcept
// Get the buffer to read into. // Get the buffer to read into.
void* buf = nullptr; void* buf = nullptr;
size_t buflen = 0; size_t buflen = 0;
try { std::unique_ptr<IOBuf> ioBuf;
readCallback_->getReadBuffer(&buf, &buflen);
} catch (const std::exception& ex) { if (movable) {
AsyncSocketException aex(AsyncSocketException::BAD_ARGS, ioBuf = IOBuf::create(readCallback_->maxBufferSize());
string("ReadCallback::getReadBuffer() " buf = ioBuf->writableBuffer();
"threw exception: ") + ex.what()); buflen = ioBuf->capacity();
failRead(aex); } else {
return; try {
} catch (...) { readCallback_->getReadBuffer(&buf, &buflen);
AsyncSocketException ex(AsyncSocketException::BAD_ARGS, } catch (const std::exception& ex) {
string("ReadCallback::getReadBuffer() " AsyncSocketException aex(
"threw non-exception type")); AsyncSocketException::BAD_ARGS,
failRead(ex); string("ReadCallback::getReadBuffer() "
return; "threw exception: ") +
} ex.what());
if (buf == nullptr || buflen == 0) { failRead(aex);
AsyncSocketException ex(AsyncSocketException::INVALID_STATE, return;
string("ReadCallback::getReadBuffer() " } catch (...) {
"returned empty buffer")); AsyncSocketException aex(
failRead(ex); AsyncSocketException::BAD_ARGS,
return; string("ReadCallback::getReadBuffer() "
"threw non-exception type"));
failRead(aex);
return;
}
if (buf == nullptr || buflen == 0) {
AsyncSocketException aex(
AsyncSocketException::INVALID_STATE,
string("ReadCallback::getReadBuffer() "
"returned empty buffer"));
failRead(aex);
return;
}
} }
// Perform the read // Perform the read
ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen); ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
if (bytesRead > 0) { if (bytesRead > 0) {
readCallback_->readDataAvailable(bytesRead); if (movable) {
ioBuf->append(bytesRead);
readCallback_->readBufferAvailable(std::move(ioBuf));
} else {
readCallback_->readDataAvailable(bytesRead);
}
// Fall through and continue around the loop if the read // Fall through and continue around the loop if the read
// completely filled the available buffer. // completely filled the available buffer.
// Note that readCallback_ may have been uninstalled or changed inside // Note that readCallback_ may have been uninstalled or changed inside
......
...@@ -471,6 +471,15 @@ class AsyncReader { ...@@ -471,6 +471,15 @@ class AsyncReader {
return false; return false;
} }
/**
* Suggested buffer size, allocated for read operations,
* if callback is movable and supports folly::IOBuf
*/
virtual size_t maxBufferSize() const {
return 64 * 1024; // 64K
}
/** /**
* readBufferAvailable() will be invoked when data has been successfully * readBufferAvailable() will be invoked when data has been successfully
* read. * read.
......
...@@ -27,6 +27,18 @@ namespace { ...@@ -27,6 +27,18 @@ namespace {
class TestReadCallback : public folly::AsyncReader::ReadCallback { class TestReadCallback : public folly::AsyncReader::ReadCallback {
public: public:
bool isBufferMovable() noexcept override {
return movable_;
}
void setMovable(bool movable) {
movable_ = movable;
}
void readBufferAvailable(
std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
readBuffer_.append(std::move(readBuf));
}
void readDataAvailable(size_t len) noexcept override { void readDataAvailable(size_t len) noexcept override {
readBuffer_.postallocate(len); readBuffer_.postallocate(len);
} }
...@@ -49,8 +61,15 @@ class TestReadCallback : public folly::AsyncReader::ReadCallback { ...@@ -49,8 +61,15 @@ class TestReadCallback : public folly::AsyncReader::ReadCallback {
return std::string((char *)buf->data(), buf->length()); return std::string((char *)buf->data(), buf->length());
} }
void reset() {
movable_ = false;
error_ = false;
readBuffer_.clear();
}
folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()}; folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
bool error_{false}; bool error_{false};
bool movable_{false};
}; };
class TestWriteCallback : public folly::AsyncWriter::WriteCallback { class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
...@@ -61,13 +80,23 @@ class TestWriteCallback : public folly::AsyncWriter::WriteCallback { ...@@ -61,13 +80,23 @@ class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
error_ = true; error_ = true;
} }
void reset() {
writes_ = 0;
error_ = false;
}
uint32_t writes_{0}; uint32_t writes_{0};
bool error_{false}; bool error_{false};
}; };
class AsyncPipeTest: public Test { class AsyncPipeTest: public Test {
public: public:
void SetUp() override { void reset(bool movable) {
reader_.reset();
readCallback_.reset();
writer_.reset();
writeCallback_.reset();
int rc = pipe(pipeFds_); int rc = pipe(pipeFds_);
EXPECT_EQ(rc, 0); EXPECT_EQ(rc, 0);
...@@ -77,6 +106,8 @@ class AsyncPipeTest: public Test { ...@@ -77,6 +106,8 @@ class AsyncPipeTest: public Test {
&eventBase_, pipeFds_[0]); &eventBase_, pipeFds_[0]);
writer_ = folly::AsyncPipeWriter::newWriter( writer_ = folly::AsyncPipeWriter::newWriter(
&eventBase_, pipeFds_[1]); &eventBase_, pipeFds_[1]);
readCallback_.setMovable(movable);
} }
protected: protected:
...@@ -97,46 +128,55 @@ std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) { ...@@ -97,46 +128,55 @@ std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
TEST_F(AsyncPipeTest, simple) { TEST_F(AsyncPipeTest, simple) {
reader_->setReadCB(&readCallback_); for (int pass = 0; pass < 2; ++pass) {
writer_->write(getBuf("hello"), &writeCallback_); reset(pass % 2 != 0);
writer_->closeOnEmpty(); reader_->setReadCB(&readCallback_);
eventBase_.loop(); writer_->write(getBuf("hello"), &writeCallback_);
EXPECT_EQ(readCallback_.getData(), "hello"); writer_->closeOnEmpty();
EXPECT_FALSE(readCallback_.error_); eventBase_.loop();
EXPECT_EQ(writeCallback_.writes_, 1); EXPECT_EQ(readCallback_.getData(), "hello");
EXPECT_FALSE(writeCallback_.error_); EXPECT_FALSE(readCallback_.error_);
EXPECT_EQ(writeCallback_.writes_, 1);
EXPECT_FALSE(writeCallback_.error_);
}
} }
TEST_F(AsyncPipeTest, blocked_writes) { TEST_F(AsyncPipeTest, blocked_writes) {
uint32_t writeAttempts = 0; for (int pass = 0; pass < 2; ++pass) {
do { reset(pass % 2 != 0);
++writeAttempts; uint32_t writeAttempts = 0;
writer_->write(getBuf("hello"), &writeCallback_); do {
} while (writeCallback_.writes_ == writeAttempts); ++writeAttempts;
// there is one blocked write writer_->write(getBuf("hello"), &writeCallback_);
writer_->closeOnEmpty(); } while (writeCallback_.writes_ == writeAttempts);
// there is one blocked write
reader_->setReadCB(&readCallback_); writer_->closeOnEmpty();
eventBase_.loop(); reader_->setReadCB(&readCallback_);
std::string expected;
for (uint32_t i = 0; i < writeAttempts; i++) { eventBase_.loop();
expected += "hello"; std::string expected;
for (uint32_t i = 0; i < writeAttempts; i++) {
expected += "hello";
}
EXPECT_EQ(readCallback_.getData(), expected);
EXPECT_FALSE(readCallback_.error_);
EXPECT_EQ(writeCallback_.writes_, writeAttempts);
EXPECT_FALSE(writeCallback_.error_);
} }
EXPECT_EQ(readCallback_.getData(), expected);
EXPECT_FALSE(readCallback_.error_);
EXPECT_EQ(writeCallback_.writes_, writeAttempts);
EXPECT_FALSE(writeCallback_.error_);
} }
TEST_F(AsyncPipeTest, writeOnClose) { TEST_F(AsyncPipeTest, writeOnClose) {
reader_->setReadCB(&readCallback_); for (int pass = 0; pass < 2; ++pass) {
writer_->write(getBuf("hello"), &writeCallback_); reset(pass % 2 != 0);
writer_->closeOnEmpty(); reader_->setReadCB(&readCallback_);
writer_->write(getBuf("hello"), &writeCallback_); writer_->write(getBuf("hello"), &writeCallback_);
eventBase_.loop(); writer_->closeOnEmpty();
EXPECT_EQ(readCallback_.getData(), "hello"); writer_->write(getBuf("hello"), &writeCallback_);
EXPECT_FALSE(readCallback_.error_); eventBase_.loop();
EXPECT_EQ(writeCallback_.writes_, 1); EXPECT_EQ(readCallback_.getData(), "hello");
EXPECT_TRUE(writeCallback_.error_); EXPECT_FALSE(readCallback_.error_);
EXPECT_EQ(writeCallback_.writes_, 1);
EXPECT_TRUE(writeCallback_.error_);
}
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment