Commit 8f45b8d5 authored by Tudor Bosman's avatar Tudor Bosman Committed by Jordan DeLong

AsyncIO in folly

Summary:
Interface extended and cleaned up.  Also, now
actually allows you to retrieve IO errors.  Also moved some useful functions
out of Subprocess.cpp into a separate header file.

Test Plan: async_io_test, subprocess_test

Reviewed By: philipp@fb.com

FB internal diff: D698412
parent 43300949
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_EXCEPTION_H_
#define FOLLY_EXCEPTION_H_
#include <errno.h>
#include <stdexcept>
#include <system_error>
#include "folly/Likely.h"
namespace folly {
// Helper to throw std::system_error
void throwSystemError(int err, const char* msg) __attribute__((noreturn));
inline void throwSystemError(int err, const char* msg) {
throw std::system_error(err, std::system_category(), msg);
}
// Helper to throw std::system_error from errno
void throwSystemError(const char* msg) __attribute__((noreturn));
inline void throwSystemError(const char* msg) {
throwSystemError(errno, msg);
}
// Check a Posix return code (0 on success, error number on error), throw
// on error.
inline void checkPosixError(int err, const char* msg) {
if (UNLIKELY(err != 0)) {
throwSystemError(err, msg);
}
}
// Check a Linux kernel-style return code (>= 0 on success, negative error
// number on error), throw on error.
inline void checkKernelError(ssize_t ret, const char* msg) {
if (UNLIKELY(ret < 0)) {
throwSystemError(-ret, msg);
}
}
// Check a traditional Uinx return code (-1 and sets errno on error), throw
// on error.
inline void checkUnixError(ssize_t ret, const char* msg) {
if (UNLIKELY(ret == -1)) {
throwSystemError(msg);
}
}
inline void checkUnixError(ssize_t ret, int savedErrno, const char* msg) {
if (UNLIKELY(ret == -1)) {
throwSystemError(savedErrno, msg);
}
}
} // namespace folly
#endif /* FOLLY_EXCEPTION_H_ */
......@@ -31,6 +31,7 @@
#include <glog/logging.h>
#include "folly/Conv.h"
#include "folly/Exception.h"
#include "folly/ScopeGuard.h"
#include "folly/String.h"
#include "folly/io/Cursor.h"
......@@ -101,39 +102,6 @@ std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
return d;
}
// Helper to throw std::system_error
void throwSystemError(int err, const char* msg) __attribute__((noreturn));
void throwSystemError(int err, const char* msg) {
throw std::system_error(err, std::system_category(), msg);
}
// Helper to throw std::system_error from errno
void throwSystemError(const char* msg) __attribute__((noreturn));
void throwSystemError(const char* msg) {
throwSystemError(errno, msg);
}
// Check a Posix return code (0 on success, error number on error), throw
// on error.
void checkPosixError(int err, const char* msg) {
if (err != 0) {
throwSystemError(err, msg);
}
}
// Check a traditional Uinx return code (-1 and sets errno on error), throw
// on error.
void checkUnixError(ssize_t ret, const char* msg) {
if (ret == -1) {
throwSystemError(msg);
}
}
void checkUnixError(ssize_t ret, int savedErrno, const char* msg) {
if (ret == -1) {
throwSystemError(savedErrno, msg);
}
}
// Check a wait() status, throw on non-successful
void checkStatus(ProcessReturnCode returnCode) {
if (returnCode.state() != ProcessReturnCode::EXITED ||
......
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "folly/experimental/io/AsyncIO.h"
#include <cerrno>
#include <glog/logging.h>
#include "folly/Exception.h"
#include "folly/Likely.h"
#include "folly/String.h"
#include "folly/eventfd.h"
namespace folly {
AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
: ctx_(0),
pending_(0),
capacity_(capacity),
pollFd_(-1) {
if (UNLIKELY(capacity_ == 0)) {
throw std::out_of_range("AsyncIO: capacity must not be 0");
}
completed_.reserve(capacity_);
if (pollMode == POLLABLE) {
pollFd_ = eventfd(0, EFD_NONBLOCK);
checkUnixError(pollFd_, "AsyncIO: eventfd creation failed");
}
}
AsyncIO::~AsyncIO() {
CHECK_EQ(pending_, 0);
if (ctx_) {
int rc = io_queue_release(ctx_);
CHECK_EQ(rc, 0) << "io_queue_release: " << errnoStr(-rc);
}
if (pollFd_ != -1) {
CHECK_ERR(close(pollFd_));
}
}
void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
iocb cb;
io_prep_pread(&cb, fd, buf, size, start);
submit(op, &cb);
}
void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
off_t start) {
pread(op, fd, range.begin(), range.size(), start);
}
void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
off_t start) {
iocb cb;
io_prep_preadv(&cb, fd, iov, iovcnt, start);
submit(op, &cb);
}
void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
off_t start) {
iocb cb;
io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
submit(op, &cb);
}
void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
off_t start) {
pwrite(op, fd, range.begin(), range.size(), start);
}
void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
off_t start) {
iocb cb;
io_prep_pwritev(&cb, fd, iov, iovcnt, start);
submit(op, &cb);
}
void AsyncIO::initializeContext() {
if (!ctx_) {
int rc = io_queue_init(capacity_, &ctx_);
// returns negative errno
checkKernelError(rc, "AsyncIO: io_queue_init failed");
DCHECK(ctx_);
}
}
void AsyncIO::submit(Op* op, iocb* cb) {
if (UNLIKELY(pending_ >= capacity_)) {
throw std::out_of_range("AsyncIO: too many pending requests");
}
if (UNLIKELY(op->state() != Op::UNINITIALIZED)) {
throw std::logic_error("AsyncIO: Invalid Op state in submit");
}
initializeContext(); // on demand
cb->data = op;
if (pollFd_ != -1) {
io_set_eventfd(cb, pollFd_);
}
int rc = io_submit(ctx_, 1, &cb);
checkKernelError(rc, "AsyncIO: io_submit failed");
DCHECK_EQ(rc, 1);
op->start();
++pending_;
}
Range<AsyncIO::Op**> AsyncIO::wait(size_t minRequests) {
if (UNLIKELY(!ctx_)) {
throw std::logic_error("AsyncIO: wait called with no requests");
}
if (UNLIKELY(pollFd_ != -1)) {
throw std::logic_error("AsyncIO: wait not allowed on pollable object");
}
return doWait(minRequests, pending_);
}
Range<AsyncIO::Op**> AsyncIO::pollCompleted() {
if (UNLIKELY(!ctx_)) {
throw std::logic_error("AsyncIO: pollCompleted called with no requests");
}
if (UNLIKELY(pollFd_ == -1)) {
throw std::logic_error(
"AsyncIO: pollCompleted not allowed on non-pollable object");
}
uint64_t numEvents;
// This sets the eventFd counter to 0, see
// http://www.kernel.org/doc/man-pages/online/pages/man2/eventfd.2.html
ssize_t rc;
do {
rc = ::read(pollFd_, &numEvents, 8);
} while (rc == -1 && errno == EINTR);
if (UNLIKELY(rc == -1 && errno == EAGAIN)) {
return Range<Op**>(); // nothing completed
}
checkUnixError(rc, "AsyncIO: read from event fd failed");
DCHECK_EQ(rc, 8);
DCHECK_GT(numEvents, 0);
DCHECK_LE(numEvents, pending_);
// Don't reap more than numEvents, as we've just reset the counter to 0.
return doWait(numEvents, numEvents);
}
Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
io_event events[pending_];
int count;
do {
// Wait forever
count = io_getevents(ctx_, minRequests, maxRequests, events, nullptr);
} while (count == -EINTR);
checkKernelError(count, "AsyncIO: io_getevents failed");
DCHECK_GE(count, minRequests); // the man page says so
DCHECK_LE(count, pending_);
completed_.clear();
if (count == 0) {
return folly::Range<Op**>();
}
for (size_t i = 0; i < count; ++i) {
Op* op = static_cast<Op*>(events[i].data);
DCHECK(op);
op->complete(events[i].res);
completed_.push_back(op);
}
pending_ -= count;
return folly::Range<Op**>(&completed_.front(), count);
}
AsyncIO::Op::Op()
: state_(UNINITIALIZED),
result_(-EINVAL) {
}
void AsyncIO::Op::reset() {
if (UNLIKELY(state_ == PENDING)) {
throw std::logic_error("AsyncIO: invalid state for reset");
}
state_ = UNINITIALIZED;
result_ = -EINVAL;
}
AsyncIO::Op::~Op() {
CHECK_NE(state_, PENDING);
}
void AsyncIO::Op::start() {
DCHECK_EQ(state_, UNINITIALIZED);
state_ = PENDING;
}
void AsyncIO::Op::complete(ssize_t result) {
DCHECK_EQ(state_, PENDING);
state_ = COMPLETED;
result_ = result;
onCompleted();
}
void AsyncIO::Op::onCompleted() { } // default: do nothing
ssize_t AsyncIO::Op::result() const {
if (UNLIKELY(state_ != COMPLETED)) {
throw std::logic_error("AsyncIO: Invalid Op state in result");
}
return result_;
}
CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }
CallbackOp::~CallbackOp() { }
CallbackOp* CallbackOp::make(Callback&& callback) {
// Ensure created on the heap
return new CallbackOp(std::move(callback));
}
void CallbackOp::onCompleted() {
callback_(result());
delete this;
}
} // namespace folly
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_IO_ASYNCIO_H_
#define FOLLY_IO_ASYNCIO_H_
#include <sys/types.h>
#include <sys/uio.h>
#include <libaio.h>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>
#include <boost/noncopyable.hpp>
#include "folly/Portability.h"
#include "folly/Range.h"
namespace folly {
/**
* C++ interface around Linux Async IO.
*/
class AsyncIO : private boost::noncopyable {
public:
enum PollMode {
NOT_POLLABLE,
POLLABLE
};
/**
* Create an AsyncIO context capacble of holding at most 'capacity' pending
* requests at the same time. As requests complete, others can be scheduled,
* as long as this limit is not exceeded.
*
* Note: the maximum number of allowed concurrent requests is controlled
* by the fs.aio-max-nr sysctl, the default value is usually 64K.
*
* If pollMode is POLLABLE, pollFd() will return a file descriptor that
* can be passed to poll / epoll / select and will become readable when
* any IOs on this AioReader have completed. If you do this, you must use
* pollCompleted() instead of wait() -- do not read from the pollFd()
* file descriptor directly.
*/
explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
~AsyncIO();
/**
* An Op represents a pending operation. You may inherit from Op (and
* override onCompleted) in order to be notified of completion (see
* CallbackOp below for an example), or you may use Op's methods directly.
*
* The Op must remain allocated until completion.
*/
class Op : private boost::noncopyable {
friend class AsyncIO;
public:
Op();
virtual ~Op();
// There would be a cancel() method here if Linux AIO actually implemented
// it. But let's not get your hopes up.
enum State {
UNINITIALIZED,
PENDING,
COMPLETED
};
/**
* Return the current operation state.
*/
State state() const { return state_; }
/**
* Reset the operation for reuse. It is an error to call reset() on
* an Op that is still pending.
*/
void reset();
/**
* Retrieve the result of this operation. Returns >=0 on success,
* -errno on failure (that is, using the Linux kernel error reporting
* conventions). Use checkKernelError (folly/Exception.h) on the result to
* throw a std::system_error in case of error instead.
*
* It is an error to call this if the Op hasn't yet started or is still
* pending.
*/
ssize_t result() const;
private:
void start();
void complete(ssize_t result);
virtual void onCompleted();
State state_;
ssize_t result_;
};
/**
* Initiate a read request.
*/
void pread(Op* op, int fd, void* buf, size_t size, off_t start);
void pread(Op* op, int fd, Range<unsigned char*> range, off_t start);
void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
/**
* Initiate a write request.
*/
void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start);
void pwrite(Op* op, int fd, Range<const unsigned char*> range, off_t start);
void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
/**
* Wait for at least minRequests to complete. Returns the requests that
* have completed; the returned range is valid until the next call to
* wait(). minRequests may be 0 to not block.
*/
Range<Op**> wait(size_t minRequests);
/**
* Return the number of pending requests.
*/
size_t pending() const { return pending_; }
/**
* Return the maximum number of requests that can be kept outstanding
* at any one time.
*/
size_t capacity() const { return capacity_; }
/**
* If POLLABLE, return a file descriptor that can be passed to poll / epoll
* and will become readable when any async IO operations have completed.
* If NOT_POLLABLE, return -1.
*/
int pollFd() const { return pollFd_; }
/**
* If POLLABLE, call instead of wait after the file descriptor returned
* by pollFd() became readable. The returned range is valid until the next
* call to pollCompleted().
*/
Range<Op**> pollCompleted();
private:
void initializeContext();
void submit(Op* op, iocb* cb);
Range<Op**> doWait(size_t minRequests, size_t maxRequests);
io_context_t ctx_;
size_t pending_;
size_t capacity_;
int pollFd_;
std::vector<Op*> completed_;
};
/**
* Implementation of AsyncIO::Op that calls a callback and then deletes
* itself.
*/
class CallbackOp : public AsyncIO::Op {
public:
typedef std::function<void(ssize_t)> Callback;
static CallbackOp* make(Callback&& callback);
private:
explicit CallbackOp(Callback&& callback);
~CallbackOp();
void onCompleted() FOLLY_OVERRIDE;
Callback callback_;
};
} // namespace folly
#endif /* FOLLY_IO_ASYNCIO_H_ */
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "folly/experimental/io/AsyncIO.h"
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <poll.h>
#include <cstdlib>
#include <cstdio>
#include <memory>
#include <random>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "folly/experimental/io/FsUtil.h"
#include "folly/ScopeGuard.h"
#include "folly/String.h"
namespace fs = folly::fs;
using folly::AsyncIO;
namespace {
constexpr size_t kAlignment = 512; // align reads to 512 B (for O_DIRECT)
struct TestSpec {
off_t start;
size_t size;
};
void waitUntilReadable(int fd) {
pollfd pfd;
pfd.fd = fd;
pfd.events = POLLIN;
int r;
do {
r = poll(&pfd, 1, -1); // wait forever
} while (r == -1 && errno == EINTR);
PCHECK(r == 1);
CHECK_EQ(pfd.revents, POLLIN); // no errors etc
}
folly::Range<AsyncIO::Op**> readerWait(AsyncIO* reader) {
int fd = reader->pollFd();
if (fd == -1) {
return reader->wait(1);
} else {
waitUntilReadable(fd);
return reader->pollCompleted();
}
}
// Temporary file that is NOT kept open but is deleted on exit.
// Generate random-looking but reproduceable data.
class TemporaryFile {
public:
explicit TemporaryFile(size_t size);
~TemporaryFile();
const fs::path path() const { return path_; }
private:
fs::path path_;
};
TemporaryFile::TemporaryFile(size_t size)
: path_(fs::temp_directory_path() / fs::unique_path()) {
CHECK_EQ(size % sizeof(uint32_t), 0);
size /= sizeof(uint32_t);
const uint32_t seed = 42;
std::mt19937 rnd(seed);
const size_t bufferSize = 1U << 16;
uint32_t buffer[bufferSize];
FILE* fp = ::fopen(path_.c_str(), "wb");
PCHECK(fp != nullptr);
while (size) {
size_t n = std::min(size, bufferSize);
for (size_t i = 0; i < n; ++i) {
buffer[i] = rnd();
}
size_t written = ::fwrite(buffer, sizeof(uint32_t), n, fp);
PCHECK(written == n);
size -= written;
}
PCHECK(::fclose(fp) == 0);
}
TemporaryFile::~TemporaryFile() {
try {
fs::remove(path_);
} catch (const fs::filesystem_error& e) {
LOG(ERROR) << "fs::remove: " << folly::exceptionStr(e);
}
}
TemporaryFile thisBinary(6 << 20); // 6MiB
void testReadsSerially(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
AsyncIO aioReader(1, pollMode);
AsyncIO::Op op;
int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
SCOPE_EXIT {
::close(fd);
};
for (int i = 0; i < specs.size(); i++) {
std::unique_ptr<char[]> buf(new char[specs[i].size]);
aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
EXPECT_EQ(aioReader.pending(), 1);
auto ops = readerWait(&aioReader);
EXPECT_EQ(1, ops.size());
EXPECT_TRUE(ops[0] == &op);
EXPECT_EQ(aioReader.pending(), 0);
ssize_t res = op.result();
EXPECT_LE(0, res) << folly::errnoStr(-res);
EXPECT_EQ(specs[i].size, res);
op.reset();
}
}
void testReadsParallel(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
AsyncIO aioReader(specs.size(), pollMode);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
std::vector<std::unique_ptr<char[]>> bufs(specs.size());
int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
SCOPE_EXIT {
::close(fd);
};
for (int i = 0; i < specs.size(); i++) {
bufs[i].reset(new char[specs[i].size]);
aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
specs[i].start);
}
std::vector<bool> pending(specs.size(), true);
size_t remaining = specs.size();
while (remaining != 0) {
EXPECT_EQ(remaining, aioReader.pending());
auto completed = readerWait(&aioReader);
size_t nrRead = completed.size();
EXPECT_NE(nrRead, 0);
remaining -= nrRead;
for (int i = 0; i < nrRead; i++) {
int id = completed[i] - ops.get();
EXPECT_GE(id, 0);
EXPECT_LT(id, specs.size());
EXPECT_TRUE(pending[id]);
pending[id] = false;
ssize_t res = ops[id].result();
EXPECT_LE(0, res) << folly::errnoStr(-res);
EXPECT_EQ(specs[id].size, res);
}
}
EXPECT_EQ(aioReader.pending(), 0);
for (int i = 0; i < pending.size(); i++) {
EXPECT_FALSE(pending[i]);
}
}
void testReads(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
testReadsSerially(specs, pollMode);
testReadsParallel(specs, pollMode);
}
} // anonymous namespace
TEST(AsyncIO, ZeroAsyncDataNotPollable) {
testReads({{0, 0}}, AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, ZeroAsyncDataPollable) {
testReads({{0, 0}}, AsyncIO::POLLABLE);
}
TEST(AsyncIO, SingleAsyncDataNotPollable) {
testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
testReads({{0, 512}}, AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, SingleAsyncDataPollable) {
testReads({{0, 512}}, AsyncIO::POLLABLE);
testReads({{0, 512}}, AsyncIO::POLLABLE);
}
TEST(AsyncIO, MultipleAsyncDataNotPollable) {
testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::NOT_POLLABLE);
testReads({
{0, 5*1024*1024},
{512, 5*1024*1024},
}, AsyncIO::NOT_POLLABLE);
testReads({
{512, 0},
{512, 512},
{512, 1024},
{512, 10*1024},
{512, 1024*1024},
}, AsyncIO::NOT_POLLABLE);
}
TEST(AsyncIO, MultipleAsyncDataPollable) {
testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
testReads({{512, 1024}, {512, 1024}, {512, 2048}}, AsyncIO::POLLABLE);
testReads({
{0, 5*1024*1024},
{512, 5*1024*1024},
}, AsyncIO::POLLABLE);
testReads({
{512, 0},
{512, 512},
{512, 1024},
{512, 10*1024},
{512, 1024*1024},
}, AsyncIO::POLLABLE);
}
TEST(AsyncIO, ManyAsyncDataNotPollable) {
{
std::vector<TestSpec> v;
for (int i = 0; i < 1000; i++) {
v.push_back({512 * i, 512});
}
testReads(v, AsyncIO::NOT_POLLABLE);
}
}
TEST(AsyncIO, ManyAsyncDataPollable) {
{
std::vector<TestSpec> v;
for (int i = 0; i < 1000; i++) {
v.push_back({512 * i, 512});
}
testReads(v, AsyncIO::POLLABLE);
}
}
TEST(AsyncIO, NonBlockingWait) {
AsyncIO aioReader(1, AsyncIO::NOT_POLLABLE);
AsyncIO::Op op;
int fd = ::open(thisBinary.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
SCOPE_EXIT {
::close(fd);
};
size_t size = 1024;
std::unique_ptr<char[]> buf(new char[size]);
aioReader.pread(&op, fd, buf.get(), size, 0);
EXPECT_EQ(aioReader.pending(), 1);
folly::Range<AsyncIO::Op**> completed;
while (completed.empty()) {
// poll without blocking until the read request completes.
completed = aioReader.wait(0);
}
EXPECT_EQ(completed.size(), 1);
EXPECT_TRUE(completed[0] == &op);
ssize_t res = op.result();
EXPECT_LE(0, res) << folly::errnoStr(-res);
EXPECT_EQ(size, res);
EXPECT_EQ(aioReader.pending(), 0);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment