Commit a8b4b5ea authored by Tudor Bosman, committed by Jordan DeLong

Rework folly::AsyncIO interface to make it easier for other classes to use Op

Summary:
AsyncIOOp no longer requires derivation in order to use completion callbacks; the
callback is now passed in (or set) directly on the op. This makes composition easier
(see AsyncIOQueue, added in this diff); a short usage sketch follows the commit
metadata below.

Test Plan: async_io_test, with a new test (testReadsQueued) added

Reviewed By: lucian@fb.com

FB internal diff: D709648
parent cf583f13
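
A minimal usage sketch of the reworked interface (illustrative only, not part of the diff; fd is assumed to be a file descriptor opened with O_DIRECT and buf a suitably aligned 512-byte buffer):

  folly::AsyncIO aio(10);                          // room for 10 concurrent requests
  folly::AsyncIO::Op op([](folly::AsyncIOOp* o) {
    // The completion callback is passed in; no subclassing required.
    CHECK_GE(o->result(), 0);
  });
  op.pread(fd, buf, 512, 0);   // the op prepares its own request...
  aio.submit(&op);             // ...and is then handed to the AsyncIO object
  aio.wait(1);                 // block until at least one op completes

Under the old interface, preparing and submitting were a single AsyncIO::pread() call, and completion notification required subclassing Op and overriding onCompleted().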
@@ -18,6 +18,7 @@
#include <cerrno>
#include <boost/intrusive/parent_from_member.hpp>
#include <glog/logging.h>
#include "folly/Exception.h"
@@ -27,6 +28,77 @@
namespace folly {
AsyncIOOp::AsyncIOOp(NotificationCallback cb)
: cb_(std::move(cb)),
state_(UNINITIALIZED),
result_(-EINVAL) {
memset(&iocb_, 0, sizeof(iocb_));
}
void AsyncIOOp::reset(NotificationCallback cb) {
CHECK_NE(state_, PENDING);
cb_ = std::move(cb);
state_ = UNINITIALIZED;
result_ = -EINVAL;
memset(&iocb_, 0, sizeof(iocb_));
}
AsyncIOOp::~AsyncIOOp() {
CHECK_NE(state_, PENDING);
}
void AsyncIOOp::start() {
DCHECK_EQ(state_, INITIALIZED);
state_ = PENDING;
}
void AsyncIOOp::complete(ssize_t result) {
DCHECK_EQ(state_, PENDING);
state_ = COMPLETED;
result_ = result;
if (cb_) {
cb_(this);
}
}
ssize_t AsyncIOOp::result() const {
CHECK_EQ(state_, COMPLETED);
return result_;
}
void AsyncIOOp::pread(int fd, void* buf, size_t size, off_t start) {
init();
io_prep_pread(&iocb_, fd, buf, size, start);
}
void AsyncIOOp::pread(int fd, Range<unsigned char*> range, off_t start) {
pread(fd, range.begin(), range.size(), start);
}
void AsyncIOOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
init();
io_prep_preadv(&iocb_, fd, iov, iovcnt, start);
}
void AsyncIOOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
init();
io_prep_pwrite(&iocb_, fd, const_cast<void*>(buf), size, start);
}
void AsyncIOOp::pwrite(int fd, Range<const unsigned char*> range, off_t start) {
pwrite(fd, range.begin(), range.size(), start);
}
void AsyncIOOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
init();
io_prep_pwritev(&iocb_, fd, iov, iovcnt, start);
}
void AsyncIOOp::init() {
CHECK_EQ(state_, UNINITIALIZED);
state_ = INITIALIZED;
}
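
Taken together, the checks above define the op lifecycle: UNINITIALIZED -> INITIALIZED (via pread/preadv/pwrite/pwritev, which call init()) -> PENDING (when AsyncIO submits it, via start()) -> COMPLETED (via complete()), with reset() returning a completed op to UNINITIALIZED for reuse.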
AsyncIO::AsyncIO(size_t capacity, PollMode pollMode)
: ctx_(0),
pending_(0),
@@ -51,43 +123,6 @@ AsyncIO::~AsyncIO() {
}
}
void AsyncIO::pread(Op* op, int fd, void* buf, size_t size, off_t start) {
iocb cb;
io_prep_pread(&cb, fd, buf, size, start);
submit(op, &cb);
}
void AsyncIO::pread(Op* op, int fd, Range<unsigned char*> range,
off_t start) {
pread(op, fd, range.begin(), range.size(), start);
}
void AsyncIO::preadv(Op* op, int fd, const iovec* iov, int iovcnt,
off_t start) {
iocb cb;
io_prep_preadv(&cb, fd, iov, iovcnt, start);
submit(op, &cb);
}
void AsyncIO::pwrite(Op* op, int fd, const void* buf, size_t size,
off_t start) {
iocb cb;
io_prep_pwrite(&cb, fd, const_cast<void*>(buf), size, start);
submit(op, &cb);
}
void AsyncIO::pwrite(Op* op, int fd, Range<const unsigned char*> range,
off_t start) {
pwrite(op, fd, range.begin(), range.size(), start);
}
void AsyncIO::pwritev(Op* op, int fd, const iovec* iov, int iovcnt,
off_t start) {
iocb cb;
io_prep_pwritev(&cb, fd, iov, iovcnt, start);
submit(op, &cb);
}
void AsyncIO::initializeContext() {
if (!ctx_) {
int rc = io_queue_init(capacity_, &ctx_);
@@ -97,11 +132,12 @@ void AsyncIO::initializeContext() {
}
}
-void AsyncIO::submit(Op* op, iocb* cb) {
-  CHECK_EQ(op->state(), Op::UNINITIALIZED);
+void AsyncIO::submit(Op* op) {
+  CHECK_EQ(op->state(), Op::INITIALIZED);
  CHECK_LT(pending_, capacity_) << "too many pending requests";
  initializeContext();  // on demand
-  cb->data = op;
+  iocb* cb = &op->iocb_;
+  cb->data = nullptr;  // unused
if (pollFd_ != -1) {
io_set_eventfd(cb, pollFd_);
}
@@ -158,62 +194,53 @@ Range<AsyncIO::Op**> AsyncIO::doWait(size_t minRequests, size_t maxRequests) {
}
  for (size_t i = 0; i < count; ++i) {
-    Op* op = static_cast<Op*>(events[i].data);
-    DCHECK(op);
+    DCHECK(events[i].obj);
+    Op* op = boost::intrusive::get_parent_from_member(
+        events[i].obj, &AsyncIOOp::iocb_);
+    --pending_;
    op->complete(events[i].res);
    completed_.push_back(op);
  }
-  pending_ -= count;
  return folly::Range<Op**>(&completed_.front(), count);
}
-AsyncIO::Op::Op()
-  : state_(UNINITIALIZED),
-    result_(-EINVAL) {
-}

-void AsyncIO::Op::reset() {
-  CHECK_NE(state_, PENDING);
-  state_ = UNINITIALIZED;
-  result_ = -EINVAL;
-}

-AsyncIO::Op::~Op() {
-  CHECK_NE(state_, PENDING);
-}

-void AsyncIO::Op::start() {
-  DCHECK_EQ(state_, UNINITIALIZED);
-  state_ = PENDING;
-}

-void AsyncIO::Op::complete(ssize_t result) {
-  DCHECK_EQ(state_, PENDING);
-  state_ = COMPLETED;
-  result_ = result;
-  onCompleted();
-}

-void AsyncIO::Op::onCompleted() { }  // default: do nothing

-ssize_t AsyncIO::Op::result() const {
-  CHECK_EQ(state_, COMPLETED);
-  return result_;
-}

-CallbackOp::CallbackOp(Callback&& callback) : callback_(std::move(callback)) { }

-CallbackOp::~CallbackOp() { }

-CallbackOp* CallbackOp::make(Callback&& callback) {
-  // Ensure created on the heap
-  return new CallbackOp(std::move(callback));
-}

-void CallbackOp::onCompleted() {
-  callback_(result());
-  delete this;
-}

+AsyncIOQueue::AsyncIOQueue(AsyncIO* asyncIO)
+  : asyncIO_(asyncIO) {
+}

+AsyncIOQueue::~AsyncIOQueue() {
+  CHECK_EQ(asyncIO_->pending(), 0);
+}

+void AsyncIOQueue::submit(AsyncIOOp* op) {
+  submit([op]() { return op; });
+}

+void AsyncIOQueue::submit(OpFactory op) {
+  queue_.push_back(op);
+  maybeDequeue();
+}

+void AsyncIOQueue::onCompleted(AsyncIOOp* op) {
+  maybeDequeue();
+}

+void AsyncIOQueue::maybeDequeue() {
+  while (!queue_.empty() && asyncIO_->pending() < asyncIO_->capacity()) {
+    auto& opFactory = queue_.front();
+    auto op = opFactory();
+    queue_.pop_front();

+    // Interpose our completion callback
+    auto& nextCb = op->notificationCallback();
+    op->setNotificationCallback([this, nextCb](AsyncIOOp* op) {
+      this->onCompleted(op);
+      if (nextCb) nextCb(op);
+    });

+    asyncIO_->submit(op);
+  }
+}
} // namespace folly
---- (next file: the AsyncIO header) ----
@@ -22,6 +22,7 @@
#include <libaio.h>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>
#include <vector>
@@ -33,11 +34,87 @@
namespace folly {
/**
* An AsyncIOOp represents a pending operation. You may set a notification
* callback or you may use this class's methods directly.
*
* The op must remain allocated until completion.
*/
class AsyncIOOp : private boost::noncopyable {
friend class AsyncIO;
public:
typedef std::function<void(AsyncIOOp*)> NotificationCallback;
explicit AsyncIOOp(NotificationCallback cb = NotificationCallback());
~AsyncIOOp();
// There would be a cancel() method here if Linux AIO actually implemented
// it. But let's not get your hopes up.
enum State {
UNINITIALIZED,
INITIALIZED,
PENDING,
COMPLETED
};
/**
* Initiate a read request.
*/
void pread(int fd, void* buf, size_t size, off_t start);
void pread(int fd, Range<unsigned char*> range, off_t start);
void preadv(int fd, const iovec* iov, int iovcnt, off_t start);
/**
* Initiate a write request.
*/
void pwrite(int fd, const void* buf, size_t size, off_t start);
void pwrite(int fd, Range<const unsigned char*> range, off_t start);
void pwritev(int fd, const iovec* iov, int iovcnt, off_t start);
/**
* Return the current operation state.
*/
State state() const { return state_; }
/**
* Reset the operation for reuse. It is an error to call reset() on
* an Op that is still pending.
*/
void reset(NotificationCallback cb = NotificationCallback());
void setNotificationCallback(NotificationCallback cb) { cb_ = std::move(cb); }
const NotificationCallback& notificationCallback() const { return cb_; }
/**
* Retrieve the result of this operation. Returns >=0 on success,
* -errno on failure (that is, using the Linux kernel error reporting
* conventions). Use checkKernelError (folly/Exception.h) on the result to
* throw a std::system_error in case of error instead.
*
* It is an error to call this if the Op hasn't yet started or is still
* pending.
*/
ssize_t result() const;
private:
void init();
void start();
void complete(ssize_t result);
NotificationCallback cb_;
iocb iocb_;
State state_;
ssize_t result_;
};
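
The result()/reset() contract documented above can be exercised like this (a hedged sketch; op, fd, and buf are assumed to exist and the op to have already completed):

  ssize_t res = op.result();                       // >= 0 on success, -errno on failure
  folly::checkKernelError(res, "pread failed");    // throws std::system_error on error

  // A completed op may be reused: reset() rearms it, optionally with a new callback.
  op.reset([](folly::AsyncIOOp* o) { /* handle the next completion */ });
  op.pread(fd, buf, 512, 4096);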
/**
* C++ interface around Linux Async IO.
*/
class AsyncIO : private boost::noncopyable {
public:
typedef AsyncIOOp Op;
enum PollMode {
NOT_POLLABLE,
POLLABLE
@@ -60,74 +137,6 @@ class AsyncIO : private boost::noncopyable {
explicit AsyncIO(size_t capacity, PollMode pollMode=NOT_POLLABLE);
~AsyncIO();
/**
* An Op represents a pending operation. You may inherit from Op (and
* override onCompleted) in order to be notified of completion (see
* CallbackOp below for an example), or you may use Op's methods directly.
*
* The Op must remain allocated until completion.
*/
class Op : private boost::noncopyable {
friend class AsyncIO;
public:
Op();
virtual ~Op();
// There would be a cancel() method here if Linux AIO actually implemented
// it. But let's not get your hopes up.
enum State {
UNINITIALIZED,
PENDING,
COMPLETED
};
/**
* Return the current operation state.
*/
State state() const { return state_; }
/**
* Reset the operation for reuse. It is an error to call reset() on
* an Op that is still pending.
*/
void reset();
/**
* Retrieve the result of this operation. Returns >=0 on success,
* -errno on failure (that is, using the Linux kernel error reporting
* conventions). Use checkKernelError (folly/Exception.h) on the result to
* throw a std::system_error in case of error instead.
*
* It is an error to call this if the Op hasn't yet started or is still
* pending.
*/
ssize_t result() const;
private:
void start();
void complete(ssize_t result);
virtual void onCompleted();
State state_;
ssize_t result_;
};
/**
* Initiate a read request.
*/
void pread(Op* op, int fd, void* buf, size_t size, off_t start);
void pread(Op* op, int fd, Range<unsigned char*> range, off_t start);
void preadv(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
/**
* Initiate a write request.
*/
void pwrite(Op* op, int fd, const void* buf, size_t size, off_t start);
void pwrite(Op* op, int fd, Range<const unsigned char*> range, off_t start);
void pwritev(Op* op, int fd, const iovec* iov, int iovcnt, off_t start);
/**
* Wait for at least minRequests to complete. Returns the requests that
* have completed; the returned range is valid until the next call to
@@ -160,9 +169,13 @@ class AsyncIO : private boost::noncopyable {
*/
Range<Op**> pollCompleted();
/**
* Submit an op for execution.
*/
void submit(Op* op);
private:
void initializeContext();
void submit(Op* op, iocb* cb);
Range<Op**> doWait(size_t minRequests, size_t maxRequests);
io_context_t ctx_;
@@ -173,20 +186,42 @@
};
-/**
- * Implementation of AsyncIO::Op that calls a callback and then deletes
- * itself.
- */
-class CallbackOp : public AsyncIO::Op {
- public:
-  typedef std::function<void(ssize_t)> Callback;
-  static CallbackOp* make(Callback&& callback);

- private:
-  explicit CallbackOp(Callback&& callback);
-  ~CallbackOp();
-  void onCompleted() FOLLY_OVERRIDE;

-  Callback callback_;
-};

+/**
+ * Wrapper around AsyncIO that allows you to schedule more requests than
+ * the AsyncIO's object capacity. Other requests are queued and processed
+ * in a FIFO order.
+ */
+class AsyncIOQueue {
+ public:
+  /**
+   * Create a queue, using the given AsyncIO object.
+   * The AsyncIO object may not be used by anything else until the
+   * queue is destroyed.
+   */
+  explicit AsyncIOQueue(AsyncIO* asyncIO);
+  ~AsyncIOQueue();

+  size_t queued() const { return queue_.size(); }

+  /**
+   * Submit an op to the AsyncIO queue. The op will be queued until
+   * the AsyncIO object has room.
+   */
+  void submit(AsyncIOOp* op);

+  /**
+   * Submit a delayed op to the AsyncIO queue; this allows you to postpone
+   * creation of the Op (which may require allocating memory, etc) until
+   * the AsyncIO object has room.
+   */
+  typedef std::function<AsyncIOOp*()> OpFactory;
+  void submit(OpFactory op);

+ private:
+  void onCompleted(AsyncIOOp* op);
+  void maybeDequeue();

+  AsyncIO* asyncIO_;
+  std::deque<OpFactory> queue_;
+};
} // namespace folly
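
A hedged sketch of how AsyncIOQueue and the OpFactory overload might be used (illustrative only; fd and bufs, an array of 100 suitably aligned buffers, are assumptions):

  folly::AsyncIO aio(4);                       // at most 4 requests in the kernel at once
  folly::AsyncIOQueue queue(&aio);
  std::unique_ptr<folly::AsyncIOOp[]> ops(new folly::AsyncIOOp[100]);
  for (size_t i = 0; i < 100; ++i) {
    // OpFactory form: preparing the op is deferred until the AsyncIO object has room.
    queue.submit([&ops, &bufs, fd, i]() {
      ops[i].pread(fd, bufs[i].get(), 4096, i * 4096);
      return &ops[i];
    });
  }
  size_t remaining = 100;
  while (remaining != 0) {
    remaining -= aio.wait(1).size();           // each completion lets the queue drain further
  }

The queue interposes its own notification callback on every op it dequeues (see maybeDequeue in the .cpp changes above), so each completion automatically triggers submission of the next queued op.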
---- (next file: the async IO test) ----
@@ -36,6 +36,7 @@
namespace fs = folly::fs;
using folly::AsyncIO;
using folly::AsyncIOQueue;
namespace {
@@ -116,6 +117,14 @@ TemporaryFile::~TemporaryFile() {
TemporaryFile tempFile(6 << 20); // 6MiB
typedef std::unique_ptr<char, void(*)(void*)> ManagedBuffer;
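// O_DIRECT requires block-aligned buffers; 512 bytes is assumed here to match the
// device's logical block size.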
ManagedBuffer allocateAligned(size_t size) {
void* buf;
int rc = posix_memalign(&buf, 512, size);
CHECK_EQ(rc, 0) << strerror(rc);
return ManagedBuffer(reinterpret_cast<char*>(buf), free);
}
void testReadsSerially(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
AsyncIO aioReader(1, pollMode);
@@ -127,8 +136,9 @@ void testReadsSerially(const std::vector<TestSpec>& specs,
};
for (int i = 0; i < specs.size(); i++) {
-    std::unique_ptr<char[]> buf(new char[specs[i].size]);
-    aioReader.pread(&op, fd, buf.get(), specs[i].size, specs[i].start);
+    auto buf = allocateAligned(specs[i].size);
+    op.pread(fd, buf.get(), specs[i].size, specs[i].start);
+    aioReader.submit(&op);
EXPECT_EQ(aioReader.pending(), 1);
auto ops = readerWait(&aioReader);
EXPECT_EQ(1, ops.size());
@@ -145,7 +155,7 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
AsyncIO aioReader(specs.size(), pollMode);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
-  std::vector<std::unique_ptr<char[]>> bufs(specs.size());
+  std::vector<ManagedBuffer> bufs;
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
@@ -153,9 +163,9 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
::close(fd);
};
for (int i = 0; i < specs.size(); i++) {
-    bufs[i].reset(new char[specs[i].size]);
-    aioReader.pread(&ops[i], fd, bufs[i].get(), specs[i].size,
-                    specs[i].start);
+    bufs.push_back(allocateAligned(specs[i].size));
+    ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
+    aioReader.submit(&ops[i]);
}
std::vector<bool> pending(specs.size(), true);
@@ -184,10 +194,63 @@ void testReadsParallel(const std::vector<TestSpec>& specs,
}
}
void testReadsQueued(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
size_t readerCapacity = std::max(specs.size() / 2, size_t(1));
AsyncIO aioReader(readerCapacity, pollMode);
AsyncIOQueue aioQueue(&aioReader);
std::unique_ptr<AsyncIO::Op[]> ops(new AsyncIO::Op[specs.size()]);
std::vector<ManagedBuffer> bufs;
int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
PCHECK(fd != -1);
SCOPE_EXIT {
::close(fd);
};
for (int i = 0; i < specs.size(); i++) {
bufs.push_back(allocateAligned(specs[i].size));
ops[i].pread(fd, bufs[i].get(), specs[i].size, specs[i].start);
aioQueue.submit(&ops[i]);
}
std::vector<bool> pending(specs.size(), true);
size_t remaining = specs.size();
while (remaining != 0) {
if (remaining >= readerCapacity) {
EXPECT_EQ(readerCapacity, aioReader.pending());
EXPECT_EQ(remaining - readerCapacity, aioQueue.queued());
} else {
EXPECT_EQ(remaining, aioReader.pending());
EXPECT_EQ(0, aioQueue.queued());
}
auto completed = readerWait(&aioReader);
size_t nrRead = completed.size();
EXPECT_NE(nrRead, 0);
remaining -= nrRead;
for (int i = 0; i < nrRead; i++) {
int id = completed[i] - ops.get();
EXPECT_GE(id, 0);
EXPECT_LT(id, specs.size());
EXPECT_TRUE(pending[id]);
pending[id] = false;
ssize_t res = ops[id].result();
EXPECT_LE(0, res) << folly::errnoStr(-res);
EXPECT_EQ(specs[id].size, res);
}
}
EXPECT_EQ(aioReader.pending(), 0);
EXPECT_EQ(aioQueue.queued(), 0);
for (int i = 0; i < pending.size(); i++) {
EXPECT_FALSE(pending[i]);
}
}
void testReads(const std::vector<TestSpec>& specs,
AsyncIO::PollMode pollMode) {
testReadsSerially(specs, pollMode);
testReadsParallel(specs, pollMode);
testReadsQueued(specs, pollMode);
}
} // anonymous namespace
@@ -275,8 +338,9 @@ TEST(AsyncIO, NonBlockingWait) {
::close(fd);
};
size_t size = 1024;
-  std::unique_ptr<char[]> buf(new char[size]);
-  aioReader.pread(&op, fd, buf.get(), size, 0);
+  auto buf = allocateAligned(size);
+  op.pread(fd, buf.get(), size, 0);
+  aioReader.submit(&op);
EXPECT_EQ(aioReader.pending(), 1);
folly::Range<AsyncIO::Op**> completed;
@@ -293,3 +357,4 @@ TEST(AsyncIO, NonBlockingWait) {
EXPECT_EQ(aioReader.pending(), 0);
}