Commit 42b8f101 authored by Kevin Vigor's avatar Kevin Vigor Committed by Facebook GitHub Bot

Add simple wrapper for AsyncIO that hides all the grunge.

Summary:
Add SimpleAsyncIO class. This creates an instance of an AsyncIO
and manages a completion thread and a pool of ops, so user does
not have to worry about any of these details.

Differential Revision: D23945993

fbshipit-source-id: 90f9b83ca3447a19e3bc4aaf2a2f196ee4ea614d
parent 19119237
...@@ -225,10 +225,12 @@ if (NOT ${LIBAIO_FOUND} AND NOT ${LIBURING_FOUND}) ...@@ -225,10 +225,12 @@ if (NOT ${LIBAIO_FOUND} AND NOT ${LIBURING_FOUND})
list(REMOVE_ITEM files list(REMOVE_ITEM files
${FOLLY_DIR}/experimental/io/AsyncBase.cpp ${FOLLY_DIR}/experimental/io/AsyncBase.cpp
${FOLLY_DIR}/experimental/io/PollIoBackend.cpp ${FOLLY_DIR}/experimental/io/PollIoBackend.cpp
${FOLLY_DIR}/experimental/io/SimpleAsyncIO.cpp
) )
list(REMOVE_ITEM hfiles list(REMOVE_ITEM hfiles
${FOLLY_DIR}/experimental/io/AsyncBase.h ${FOLLY_DIR}/experimental/io/AsyncBase.h
${FOLLY_DIR}/experimental/io/PollIoBackend.h ${FOLLY_DIR}/experimental/io/PollIoBackend.h
${FOLLY_DIR}/experimental/io/SimpleAsyncIO.h
) )
endif() endif()
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/io/SimpleAsyncIO.h>
#include <folly/String.h>
#if __has_include(<folly/experimental/io/AsyncIO.h>)
#define AIO_SUPPORTED
#include <folly/experimental/io/AsyncIO.h>
#endif
#if __has_include(<folly/experimental/io/IoUring.h>)
#define IOURING_SUPPORTED
#include <folly/experimental/io/IoUring.h>
#endif
#if !defined(AIO_SUPPORTED) && !defined(IOURING_SUPORTED)
#error "Cannot build without at least one of AsyncIO.h and IoUring.h"
#endif
#include <folly/portability/Sockets.h>
namespace folly {
SimpleAsyncIO::SimpleAsyncIO(Config cfg)
: maxRequests_(cfg.maxRequests_),
completionExecutor_(cfg.completionExecutor_),
terminating_(false) {
#if !defined(AIO_SUPPORTED)
if (cfg.mode_ == AIO) {
LOG(WARNING)
<< "aio mode requested but not available at link time, falling back to io_uring.";
cfg.setMode(IOURING);
}
#endif
#if !defined(IOURING_SUPPORTED)
if (cfg.mode_ == IOURING) {
LOG(WARNING)
<< "io_uring mode requested but not available at link time, falling back to aio.";
cfg.setMode(AIO);
}
#else
if (cfg.mode_ == IOURING && !IoUring::isAvailable()) {
#if defined(AIO_SUPPORTED)
LOG(WARNING)
<< "io_uring requested but not available at runtime, falling back to aio mode.";
cfg.setMode(AIO);
#else
LOG(FATAL)
<< "io_uring requested but not available at runtime, aio not available at link time, cannot proceed.";
#endif
}
#endif
switch (cfg.mode_) {
#if defined(AIO_SUPPORTED)
case AIO:
asyncIO_ = std::make_unique<AsyncIO>(maxRequests_, AsyncBase::POLLABLE);
opsFreeList_.withWLock(
[this](std::queue<std::unique_ptr<AsyncBaseOp>>& freeList) {
for (size_t i = 0; i < maxRequests_; ++i) {
freeList.push(std::make_unique<AsyncIOOp>());
}
});
break;
#endif
#ifdef IOURING_SUPPORTED
case IOURING:
asyncIO_ = std::make_unique<IoUring>(maxRequests_, AsyncBase::POLLABLE);
opsFreeList_.withWLock(
[this](std::queue<std::unique_ptr<AsyncBaseOp>>& freeList) {
for (size_t i = 0; i < maxRequests_; ++i) {
freeList.push(std::make_unique<IoUringOp>());
}
});
break;
#endif
default:
// Should never happen...
LOG(FATAL) << "Unavailable mode " << (int)cfg.mode_ << " requested.";
break;
}
if (cfg.evb_) {
initHandler(cfg.evb_, NetworkSocket::fromFd(asyncIO_->pollFd()));
} else {
evb_ = std::make_unique<ScopedEventBaseThread>();
initHandler(
evb_->getEventBase(), NetworkSocket::fromFd(asyncIO_->pollFd()));
}
registerHandler(EventHandler::READ | EventHandler::PERSIST);
}
SimpleAsyncIO::~SimpleAsyncIO() {
// stop accepting new IO.
opsFreeList_.withWLock(
[this](std::queue<std::unique_ptr<AsyncBaseOp>>& freeList) mutable {
terminating_ = true;
if (freeList.size() == maxRequests_) {
drainedBaton_.post();
}
});
drainedBaton_.wait();
unregisterHandler();
}
void SimpleAsyncIO::handlerReady(uint16_t events) noexcept {
if (events & EventHandler::READ) {
// All the work (including putting op back on free list) happens in the
// notificationCallback, so we can simply drop the ops returned from
// pollCompleted. But we must still call it or ops never complete.
while (asyncIO_->pollCompleted().size()) {
;
}
}
}
std::unique_ptr<AsyncBaseOp> SimpleAsyncIO::getOp() {
std::unique_ptr<AsyncBaseOp> rc;
opsFreeList_.withWLock(
[this, &rc](std::queue<std::unique_ptr<AsyncBaseOp>>& freeList) {
if (!freeList.empty() && !terminating_) {
rc = std::move(freeList.front());
freeList.pop();
rc->reset();
}
});
return rc;
}
void SimpleAsyncIO::putOp(std::unique_ptr<AsyncBaseOp>&& op) {
opsFreeList_.withWLock(
[this, op{std::move(op)}](
std::queue<std::unique_ptr<AsyncBaseOp>>& freeList) mutable {
freeList.push(std::move(op));
if (terminating_ && freeList.size() == maxRequests_) {
drainedBaton_.post();
}
});
}
void SimpleAsyncIO::submitOp(
Function<void(AsyncBaseOp*)> preparer,
SimpleAsyncIOCompletor completor) {
std::unique_ptr<AsyncBaseOp> opHolder = getOp();
if (!opHolder) {
completor(-EBUSY);
return;
}
// Grab a raw pointer to the op before we create the completion lambda,
// since we move the unique_ptr into the lambda and can no longer access
// it.
AsyncBaseOp* op = opHolder.get();
preparer(op);
op->setNotificationCallback(
[this, completor{std::move(completor)}, opHolder{std::move(opHolder)}](
AsyncBaseOp* op) mutable {
CHECK(op == opHolder.get());
int rc = op->result();
completionExecutor_->add(
[rc, completor{std::move(completor)}]() mutable { completor(rc); });
// NB: the moment we put the opHolder, the destructor might delete the
// current instance. So do not access any member variables after this
// point! Also, obviously, do not access op.
putOp(std::move(opHolder));
});
asyncIO_->submit(op);
}
void SimpleAsyncIO::pread(
int fd,
void* buf,
size_t size,
off_t start,
SimpleAsyncIOCompletor completor) {
submitOp(
[=](AsyncBaseOp* op) { op->pread(fd, buf, size, start); },
std::move(completor));
}
void SimpleAsyncIO::pwrite(
int fd,
const void* buf,
size_t size,
off_t start,
SimpleAsyncIOCompletor completor) {
submitOp(
[=](AsyncBaseOp* op) { op->pwrite(fd, buf, size, start); },
std::move(completor));
}
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Synchronized.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/experimental/io/AsyncBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/ScopedEventBaseThread.h>
#include <queue>
namespace folly {
class SimpleAsyncIO : public EventHandler {
public:
// SimpleAsyncIO is a wrapper around AsyncIO intended to hide all the details.
//
// Usage: just create an instance of SimpleAsyncIO and then issue IO with
// pread and pwrite, no other effort required. e.g.:
//
// auto tmpfile = folly::File::temporary();
// folly::SimpleAsyncIO aio;
// aio.pwrite(
// tmpfile.fd(),
// "hello world",
// /*size=*/11,
// /*offset=*/0,
// [](int rc) { LOG(INFO) << "Write completed with result " << rc; });
//
// IO is dispatched in the context of the calling thread; it may block briefly
// to obtain a lock on shared resources, but will *not* block for IO
// completion. If the IO queue is full (see maxRequests in Config, below),
// IO fails with -EBUSY.
//
// IO is completed on the executor specified in the config (global CPU
// executor by default).
//
// IO is completed by calling the callback function provided to pread/pwrite.
// The single parameter to the callback is either a negative errno or the
// number of bytes transferred.
//
// There is a "hidden" EventBase which polls for IO completion and dispatches
// completion events to the executor. You may specify an existing EventBase in
// the config (and you are then responsible for making sure the EventBase
// instance outlives the SimpleAsyncIO instance). If you do not specify one, a
// ScopedEventBaseThread instance will be created.
//
// Following structure defines the configuration of a SimpleAsyncIO instance,
// in case you need to override the (sensible) defaults.
//
// Typical usage is something like:
//
// SimpleAsyncIO io(SimpleAsyncIO::Config()
// .setMaxRequests(100)
// .setMode(SimpleAsyncIO::Mode::IOURING));
//
enum Mode { AIO, IOURING };
struct Config {
Config()
: maxRequests_(1000),
completionExecutor_(getKeepAliveToken(getCPUExecutor().get())),
mode_(AIO),
evb_(nullptr) {}
Config& setMaxRequests(size_t maxRequests) {
maxRequests_ = maxRequests;
return *this;
}
Config& setCompletionExecutor(Executor::KeepAlive<> completionExecutor) {
completionExecutor_ = completionExecutor;
return *this;
}
Config& setMode(Mode mode) {
mode_ = mode;
return *this;
}
Config& setEventBase(EventBase* evb) {
evb_ = evb;
return *this;
}
private:
size_t maxRequests_;
Executor::KeepAlive<> completionExecutor_;
Mode mode_;
EventBase* evb_;
friend class SimpleAsyncIO;
};
explicit SimpleAsyncIO(Config cfg = Config());
virtual ~SimpleAsyncIO() override;
using SimpleAsyncIOCompletor = Function<void(int rc)>;
void pread(
int fd,
void* buf,
size_t size,
off_t start,
SimpleAsyncIOCompletor completor);
void pwrite(
int fd,
const void* data,
size_t size,
off_t offset,
SimpleAsyncIOCompletor completor);
private:
std::unique_ptr<AsyncBaseOp> getOp();
void putOp(std::unique_ptr<AsyncBaseOp>&&);
void submitOp(
Function<void(AsyncBaseOp*)> preparer,
SimpleAsyncIOCompletor completor);
virtual void handlerReady(uint16_t events) noexcept override;
size_t maxRequests_;
Executor::KeepAlive<> completionExecutor_;
std::unique_ptr<AsyncBase> asyncIO_;
Synchronized<std::queue<std::unique_ptr<AsyncBaseOp>>> opsFreeList_;
std::unique_ptr<ScopedEventBaseThread> evb_;
bool terminating_;
Baton<> drainedBaton_;
};
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/io/SimpleAsyncIO.h>
#include <folly/File.h>
#include <folly/synchronization/Baton.h>
#include <glog/logging.h>
#include <folly/Random.h>
#include <folly/io/IOBuf.h>
#include <folly/portability/GTest.h>
#include <bitset>
using namespace folly;
class SimpleAsyncIOTest : public ::testing::TestWithParam<SimpleAsyncIO::Mode> {
public:
void SetUp() override {
config_.setMode(GetParam());
}
static std::string testTypeToString(
testing::TestParamInfo<SimpleAsyncIO::Mode> const& setting) {
switch (setting.param) {
case SimpleAsyncIO::Mode::AIO:
return "aio";
case SimpleAsyncIO::Mode::IOURING:
return "iouring";
}
}
protected:
SimpleAsyncIO::Config config_;
};
TEST_P(SimpleAsyncIOTest, WriteAndReadBack) {
auto tmpfile = File::temporary();
SimpleAsyncIO aio(config_);
Baton done;
int result;
const std::string data("Green Room Rockers");
aio.pwrite(
tmpfile.fd(), data.data(), data.size(), 0, [&done, &result](int rc) {
result = rc;
done.post();
});
ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
EXPECT_EQ(result, data.size());
std::array<uint8_t, 128> buffer;
done.reset();
aio.pread(
tmpfile.fd(), buffer.data(), buffer.size(), 0, [&done, &result](int rc) {
result = rc;
done.post();
});
ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
EXPECT_EQ(result, data.size());
EXPECT_EQ(memcmp(buffer.data(), data.data(), data.size()), 0);
}
std::string makeRandomBinaryString(size_t size) {
std::string content;
content.clear();
while (content.size() < size) {
content.append(std::bitset<8>(folly::Random::rand32()).to_string());
}
content.resize(size);
return content;
}
TEST_P(SimpleAsyncIOTest, ChainedReads) {
auto tmpfile = File::temporary();
int fd = tmpfile.fd();
Baton done;
static const size_t chunkSize = 128;
static const size_t numChunks = 1000;
std::vector<std::unique_ptr<IOBuf>> writeChunks;
std::vector<std::unique_ptr<IOBuf>> readChunks;
std::atomic<uint32_t> completed = 0;
for (size_t i = 0; i < numChunks; ++i) {
writeChunks.push_back(IOBuf::copyBuffer(makeRandomBinaryString(chunkSize)));
readChunks.push_back(IOBuf::create(chunkSize));
}
// allow for one read and one write for each chunk to be outstanding.
SimpleAsyncIO aio(config_.setMaxRequests(numChunks * 2));
for (size_t i = 0; i < numChunks; ++i) {
aio.pwrite(
fd,
writeChunks[i]->data(),
chunkSize,
i * chunkSize,
[fd, i, &readChunks, &aio, &done, &completed](int rc) {
ASSERT_EQ(rc, chunkSize);
aio.pread(
fd,
readChunks[i]->writableData(),
chunkSize,
i * chunkSize,
[=, &done, &completed](int rc) {
ASSERT_EQ(rc, chunkSize);
if (++completed == numChunks) {
done.post();
}
});
});
}
ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(60)));
for (size_t i = 0; i < numChunks; ++i) {
CHECK_EQ(
memcmp(writeChunks[i]->data(), readChunks[i]->data(), chunkSize), 0);
}
}
TEST_P(SimpleAsyncIOTest, DestroyWithPendingIO) {
auto tmpfile = File::temporary();
int fd = tmpfile.fd();
std::atomic<uint32_t> completed = 0;
static const size_t bufferSize = 128;
static const size_t numWrites = 100;
std::array<uint8_t, bufferSize> buffer;
memset(buffer.data(), 0, buffer.size());
// Slam out 100 writes and then destroy the SimpleAsyncIO instance
// without waiting for them to complete.
{
SimpleAsyncIO aio(config_);
for (size_t i = 0; i < numWrites; ++i) {
aio.pwrite(
fd, buffer.data(), bufferSize, i * bufferSize, [&completed](int rc) {
ASSERT_EQ(rc, bufferSize);
++completed;
});
}
}
// Destructor should have blocked until all IO was done.
ASSERT_EQ(completed, numWrites);
}
INSTANTIATE_TEST_CASE_P(
SimpleAsyncIOTests,
SimpleAsyncIOTest,
::testing::Values(
SimpleAsyncIO::Mode::AIO /*, SimpleAsyncIO::Mode::IOURING */),
SimpleAsyncIOTest::testTypeToString);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment