Commit db7a54ac authored by Dan Melnic, committed by Facebook GitHub Bot

Add support for async range submit

Summary: Add support for async range submit

Reviewed By: yfeldblum

Differential Revision: D23389493

fbshipit-source-id: 7171893a2eeccc20fcdd625e98f88d365027c930
parent 135f8252
......@@ -100,9 +100,9 @@ AsyncBase::~AsyncBase() {
}
}
void AsyncBase::decrementPending() {
void AsyncBase::decrementPending(size_t n) {
auto p =
pending_.fetch_add(static_cast<size_t>(-1), std::memory_order_acq_rel);
pending_.fetch_add(static_cast<size_t>(-n), std::memory_order_acq_rel);
DCHECK_GE(p, 1);
}
......@@ -128,6 +128,34 @@ void AsyncBase::submit(Op* op) {
op->start();
}
/**
 * Submit a range of ops for execution.
 *
 * Every op must be in the INITIALIZED state (checked). Throws
 * std::range_error when the pending count would exceed capacity_, and a
 * system error when the backend submit fails outright. Returns the number
 * of ops the backend actually accepted, which may be a prefix of the range.
 */
int AsyncBase::submit(Range<Op**> ops) {
  for (auto& op : ops) {
    CHECK_EQ(op->state(), Op::State::INITIALIZED);
  }
  initializeContext(); // on demand

  // We can increment past capacity, but we'll clean up after ourselves.
  // p is the previous pending count.
  auto p = pending_.fetch_add(ops.size(), std::memory_order_acq_rel);
  if (p >= capacity_) {
    decrementPending(ops.size());
    throw std::range_error("AsyncBase: too many pending requests");
  }
  int rc = submitRange(ops);
  if (rc < 0) {
    decrementPending(ops.size());
    throwSystemErrorExplicit(-rc, "AsyncBase: io_submit failed");
  }
  submitted_ += rc;
  // rc >= 0 here; the cast avoids a signed/unsigned comparison warning.
  DCHECK_LE(static_cast<size_t>(rc), ops.size());
  // NOTE(review): on partial submission (rc < ops.size()) the pending_
  // slots reserved for the unaccepted tail are not released here —
  // presumably the backend still owns/queues them; confirm they cannot
  // leak pending_ counts.
  for (int i = 0; i < rc; i++) {
    ops[i]->start();
  }
  return rc;
}
Range<AsyncBase::Op**> AsyncBase::wait(size_t minRequests) {
CHECK(isInit());
CHECK_EQ(pollFd_, -1) << "wait() only allowed on non-pollable object";
......
......@@ -234,6 +234,11 @@ class AsyncBase {
*/
void submit(Op* op);
/**
* Submit a range of ops for execution
*/
int submit(Range<Op**> ops);
protected:
void complete(Op* op, ssize_t result) {
op->complete(result);
......@@ -247,9 +252,10 @@ class AsyncBase {
return init_.load(std::memory_order_relaxed);
}
void decrementPending();
void decrementPending(size_t num = 1);
virtual void initializeContext() = 0;
virtual int submitOne(AsyncBase::Op* op) = 0;
virtual int submitRange(Range<AsyncBase::Op**> ops) = 0;
enum class WaitType { COMPLETE, CANCEL };
virtual Range<AsyncBase::Op**> doWait(
......
......@@ -199,6 +199,27 @@ int AsyncIO::submitOne(AsyncBase::Op* op) {
return io_submit(ctx_, 1, &cb);
}
// Batch-submit all AsyncIO ops in the range via a single io_submit(2) call.
// Returns io_submit's result (number accepted, or negative errno), or -1
// when the range contained no AsyncIO ops at all.
int AsyncIO::submitRange(Range<AsyncBase::Op**> ops) {
  // Gather the iocbs for every op in the range that is an AsyncIO op.
  std::vector<iocb*> vec;
  vec.reserve(ops.size());
  for (auto& op : ops) {
    AsyncIOOp* aop = op->getAsyncIOOp();
    if (!aop) {
      continue; // not an AsyncIO-backed op; skip it
    }
    iocb* cb = &aop->iocb_;
    cb->data = nullptr; // unused
    if (pollFd_ != -1) {
      // Pollable mode: ask the kernel to signal completions on the eventfd.
      io_set_eventfd(cb, pollFd_);
    }
    vec.push_back(cb);
  }
  // NOTE(review): -1 for an empty batch is surfaced by AsyncBase::submit as
  // a system error with errno 1 — confirm that is the intended reporting.
  return vec.size() ? io_submit(ctx_, vec.size(), vec.data()) : -1;
}
Range<AsyncBase::Op**> AsyncIO::doWait(
WaitType type,
size_t minRequests,
......
......@@ -81,6 +81,7 @@ class AsyncIO : public AsyncBase {
private:
void initializeContext() override;
int submitOne(AsyncBase::Op* op) override;
int submitRange(Range<AsyncBase::Op**> ops) override;
Range<AsyncBase::Op**> doWait(
WaitType type,
......
......@@ -237,6 +237,36 @@ int IoUring::submitOne(AsyncBase::Op* op) {
return io_uring_submit(&ioRing_);
}
// Batch-submit the range through io_uring, flushing at most maxSubmit_
// queued sqes per io_uring_submit call. Returns the number of sqes the
// kernel accepted, or -1 if none were accepted.
int IoUring::submitRange(Range<AsyncBase::Op**> ops) {
  size_t num = 0;  // sqes queued since the start of this call
  int total = 0;   // sqes the kernel has accepted so far
  // Serialize access to the ring's submission queue.
  SharedMutex::WriteHolder lk(submitMutex_);
  for (size_t i = 0; i < ops.size(); i++) {
    IoUringOp* iop = ops[i]->getIoUringOp();
    if (!iop) {
      continue; // not an io_uring-backed op; skip it
    }
    auto* sqe = io_uring_get_sqe(&ioRing_);
    if (!sqe) {
      // Submission queue exhausted.
      // NOTE(review): any sqes queued since the last flush stay in the SQ
      // unsubmitted here — presumably picked up by a later
      // io_uring_submit; confirm they are not stranded.
      break;
    }
    *sqe = iop->getSqe();
    ++num;
    // Flush every maxSubmit_ queued entries, and at the end of the range.
    if (num % maxSubmit_ == 0 || (i + 1 == ops.size())) {
      auto ret = io_uring_submit(&ioRing_);
      if (ret <= 0) {
        return total; // report what was accepted before the failure
      }
      total += ret;
    }
  }
  return total ? total : -1;
}
Range<AsyncBase::Op**> IoUring::doWait(
WaitType type,
size_t minRequests,
......
......@@ -99,6 +99,7 @@ class IoUring : public AsyncBase {
private:
void initializeContext() override;
int submitOne(AsyncBase::Op* op) override;
int submitRange(Range<AsyncBase::Op**> ops) override;
Range<AsyncBase::Op**> doWait(
WaitType type,
......
......@@ -45,6 +45,10 @@ namespace async_base_test_lib_detail {
constexpr size_t kODirectAlign = 4096; // align reads to 4096 B (for O_DIRECT)
constexpr size_t kDefaultFileSize = 6 << 20; // 6MiB
constexpr size_t kBatchNumEntries = 1024; // total ops per batch test
constexpr size_t kBatchSize = 192; // presumably ops per submit batch — TODO confirm against users
constexpr size_t kBatchBlockSize = 4096; // bytes per read (O_DIRECT-aligned)
struct TestSpec {
off_t start;
size_t size;
......@@ -474,6 +478,50 @@ REGISTER_TYPED_TEST_CASE_P(
NonBlockingWait,
Cancel);
// batch tests
// Typed test fixture: TypeParam is a default-constructible AsyncBase
// subclass sized for kBatchNumEntries ops.
template <typename T>
class AsyncBatchTest : public ::testing::Test {};
TYPED_TEST_CASE_P(AsyncBatchTest);

// Submits kBatchNumEntries reads as one range and verifies that every
// op's notification callback fires after wait().
TYPED_TEST_P(AsyncBatchTest, BatchRead) {
  TypeParam aioReader;
  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDONLY);
  // Some filesystems (e.g. tmpfs) reject O_DIRECT; skip rather than fail.
  SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: "
                    << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };

  using OpPtr = folly::AsyncBaseOp*;
  // Ops and the pointer array submit() expects, plus one aligned buffer
  // per op (O_DIRECT requires aligned reads).
  std::unique_ptr<typename TypeParam::Op[]> ops(
      new typename TypeParam::Op[kBatchNumEntries]);
  std::unique_ptr<OpPtr[]> opPtrs(new OpPtr[kBatchNumEntries]);
  std::vector<folly::test::async_base_test_lib_detail::TestUtil::ManagedBuffer>
      bufs;
  bufs.reserve(kBatchNumEntries);
  size_t completed = 0; // bumped by each op's completion callback
  for (size_t i = 0; i < kBatchNumEntries; i++) {
    bufs.push_back(
        folly::test::async_base_test_lib_detail::TestUtil::allocateAligned(
            kBatchBlockSize));
    auto& op = ops[i];
    opPtrs[i] = &op;
    op.setNotificationCallback([&](folly::AsyncBaseOp*) { ++completed; });
    op.pread(fd, bufs[i].get(), kBatchBlockSize, i * kBatchBlockSize);
  }
  // Exercise the new range-submit API with the whole batch at once.
  aioReader.submit(
      Range<AsyncBase::Op**>(opPtrs.get(), opPtrs.get() + kBatchNumEntries));
  aioReader.wait(kBatchNumEntries);
  CHECK_EQ(completed, kBatchNumEntries);
}
REGISTER_TYPED_TEST_CASE_P(AsyncBatchTest, BatchRead);
} // namespace async_base_test_lib_detail
} // namespace test
} // namespace folly
......@@ -23,6 +23,13 @@ namespace folly {
namespace test {
namespace async_base_test_lib_detail {
INSTANTIATE_TYPED_TEST_CASE_P(AsyncTest, AsyncTest, AsyncIO);
// AsyncIO sized for the batch tests; non-pollable, so tests use wait().
class BatchAsyncIO : public AsyncIO {
 public:
  BatchAsyncIO() : AsyncIO(kBatchNumEntries, folly::AsyncBase::NOT_POLLABLE) {}
};
INSTANTIATE_TYPED_TEST_CASE_P(AsyncBatchTest, AsyncBatchTest, BatchAsyncIO);
} // namespace async_base_test_lib_detail
} // namespace test
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/eventfd.h>
#include <folly/Benchmark.h>
#include <folly/FileUtil.h>
#include <folly/experimental/io/AsyncIO.h>
#include <folly/experimental/io/IoUring.h>
#include <folly/experimental/io/test/AsyncBaseTestLib.h>
#include <folly/experimental/io/test/IoTestTempFileUtil.h>
#include <folly/portability/GFlags.h>
namespace {
static constexpr size_t kBlockSize = 4096;
static constexpr size_t kNumBlocks = 4096;
// Returns the shared benchmark temp file, creating it (sized for all
// kNumBlocks blocks) on first use. `num` is only sanity-checked.
static folly::test::TemporaryFile& getTempFile(size_t num) {
  CHECK_LE(num, kNumBlocks);
  static auto tempFile =
      folly::test::TempFileUtil::getTempFile(kNumBlocks * kBlockSize);
  return tempFile;
}
// Per-benchmark fixture: holds the O_DIRECT fd, the aligned read buffers,
// and the op objects for one run. `completed` references the caller's
// counter, bumped by every op's notification callback.
template <typename OP>
struct BenchmarkData {
  BenchmarkData(size_t num, size_t& c) : numEntries(num), completed(c) {
    fd = ::open(getTempFile(num).path().c_str(), O_DIRECT | O_RDONLY);
    CHECK_GE(fd, 0);
    ops.reserve(numEntries);
    bufs.reserve(numEntries);
    for (size_t i = 0; i < numEntries; i++) {
      bufs.push_back(
          folly::test::async_base_test_lib_detail::TestUtil::allocateAligned(
              kBlockSize));
    }
  }

  ~BenchmarkData() {
    ::close(fd);
  }

  // Owns the fd; copying would double-close it (Rule of Five).
  BenchmarkData(const BenchmarkData&) = delete;
  BenchmarkData& operator=(const BenchmarkData&) = delete;

  // Recreate all ops for a fresh iteration, reusing the aligned buffers.
  void reset() {
    ops.clear();
    for (size_t i = 0; i < numEntries; i++) {
      ops.push_back(std::make_unique<OP>());
      auto& op = *ops.back();
      op.setNotificationCallback([&](folly::AsyncBaseOp*) { ++completed; });
      op.pread(fd, bufs[i].get(), kBlockSize, i * kBlockSize);
    }
  }

  std::vector<std::unique_ptr<folly::AsyncBase::Op>> ops;
  std::vector<folly::test::async_base_test_lib_detail::TestUtil::ManagedBuffer>
      bufs;
  size_t numEntries;
  size_t& completed;
  int fd{-1};
};
// Benchmark driver: submit numEntries reads in batches of batchSize and
// wait for all completions, iters times. If persist is true, a single
// TAsync instance is reused across iterations; otherwise one is created
// and destroyed per iteration, so setup/teardown cost is measured too.
template <typename TAsync>
void runTAsyncIOTest(
    unsigned int iters,
    size_t numEntries,
    size_t batchSize,
    bool persist) {
  folly::BenchmarkSuspender suspender; // exclude setup from timing
  std::vector<folly::AsyncBase::Op*> ops;
  ops.reserve(batchSize);
  std::unique_ptr<TAsync> aio(
      persist
          ? new TAsync(numEntries, folly::AsyncBase::NOT_POLLABLE, batchSize)
          : nullptr);
  size_t completed = 0;
  BenchmarkData<typename TAsync::Op> bmData(numEntries, completed);
  suspender.dismiss(); // start the clock
  for (unsigned iter = 0; iter < iters; iter++) {
    if (!persist) {
      // Fresh instance each iteration: measures construction cost too.
      aio.reset(
          new TAsync(numEntries, folly::AsyncBase::NOT_POLLABLE, batchSize));
    }
    completed = 0;
    bmData.reset();
    size_t num = 0; // ops accumulated toward the current batch
    for (size_t i = 0; i < numEntries; i++) {
      ops.push_back(bmData.ops[i].get());
      if (++num == batchSize) {
        num = 0;
        aio->submit(folly::Range(ops.data(), ops.data() + ops.size()));
        ops.clear();
      }
    }
    // Flush the final, possibly short, batch.
    if (num) {
      aio->submit(folly::Range(ops.data(), ops.data() + ops.size()));
      ops.clear();
    }
    aio->wait(numEntries);
    CHECK_EQ(completed, numEntries);
    if (!persist) {
      aio.reset();
    }
  }
  aio.reset();
  suspender.rehire(); // stop the clock before teardown
}
// AsyncIO flavor of the benchmark. AsyncIO has no batch-size constructor
// argument, so a local adapter matches the (capacity, pollMode, batchSize)
// shape runTAsyncIOTest instantiates and ignores the third argument.
void runAsyncIOTest(
    unsigned int iters,
    size_t numEntries,
    size_t batchSize,
    bool persist) {
  class AsyncIOAdapter : public folly::AsyncIO {
   public:
    AsyncIOAdapter(size_t capacity, PollMode pollMode, size_t /*batchSize*/)
        : folly::AsyncIO(capacity, pollMode) {}
  };
  runTAsyncIOTest<AsyncIOAdapter>(iters, numEntries, batchSize, persist);
}
// IoUring flavor of the benchmark. IoUring already takes a batch size,
// so the local adapter simply forwards all three constructor arguments.
void runIOUringTest(
    unsigned int iters,
    size_t numEntries,
    size_t batchSize,
    bool persist) {
  class IoUringAdapter : public folly::IoUring {
   public:
    IoUringAdapter(size_t capacity, PollMode pollMode, size_t maxBatch)
        : folly::IoUring(capacity, pollMode, maxBatch) {}
  };
  runTAsyncIOTest<IoUringAdapter>(iters, numEntries, batchSize, persist);
}
} // namespace
// AsyncIO (libaio) benchmarks: baseline is unbatched (batch size 1),
// non-persistent; the relative params vary batch size and persistence.
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(
    runAsyncIOTest,
    async_io_no_batching_no_persist,
    4096,
    1,
    false)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runAsyncIOTest,
    async_io_batching_64_no_persist,
    4096,
    64,
    false)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runAsyncIOTest,
    async_io_batching_256_no_persist,
    4096,
    256,
    false)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runAsyncIOTest,
    async_io_no_batching_persist,
    4096,
    1,
    true)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runAsyncIOTest,
    async_io_batching_64_persist,
    4096,
    64,
    true)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runAsyncIOTest,
    async_io_batching_256_persist,
    4096,
    256,
    true)
// io_uring benchmarks, same parameter grid, measured relative to the
// AsyncIO baseline above.
BENCHMARK_DRAW_LINE();
BENCHMARK_RELATIVE_NAMED_PARAM(
    runIOUringTest,
    io_uring_no_batching_no_persist,
    4096,
    1,
    false)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runIOUringTest,
    io_uring_batching_64_no_persist,
    4096,
    64,
    false)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runIOUringTest,
    io_uring_batching_256_no_persist,
    4096,
    256,
    false)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runIOUringTest,
    io_uring_no_batching_persist,
    4096,
    1,
    true)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runIOUringTest,
    io_uring_batching_64_persist,
    4096,
    64,
    true)
BENCHMARK_RELATIVE_NAMED_PARAM(
    runIOUringTest,
    io_uring_batching_256_persist,
    4096,
    256,
    true)
BENCHMARK_DRAW_LINE();
// Benchmark entry point.
int main(int argc, char** argv) {
  // Create the shared temp file up front so file creation isn't timed
  // inside any benchmark iteration.
  getTempFile(kNumBlocks);
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  folly::runBenchmarks();
  return 0; // explicit, per house style
}
/*
./io_benchmark --bm_min_iters=100
============================================================================
folly/experimental/io/test/IOBenchmark.cpp relative time/iter iters/s
============================================================================
----------------------------------------------------------------------------
runAsyncIOTest(async_io_no_batching_no_persist) 111.61ms 8.96
runAsyncIOTest(async_io_batching_64_no_persist) 99.89% 111.73ms 8.95
runAsyncIOTest(async_io_batching_256_no_persist 97.35% 114.65ms 8.72
runAsyncIOTest(async_io_no_batching_persist) 120.09% 92.94ms 10.76
runAsyncIOTest(async_io_batching_64_persist) 119.98% 93.03ms 10.75
runAsyncIOTest(async_io_batching_256_persist) 117.25% 95.19ms 10.51
----------------------------------------------------------------------------
runIOUringTest(io_uring_no_batching_no_persist) 120.59% 92.55ms 10.80
runIOUringTest(io_uring_batching_64_no_persist) 120.62% 92.53ms 10.81
runIOUringTest(io_uring_batching_256_no_persist 120.52% 92.61ms 10.80
runIOUringTest(io_uring_no_batching_persist) 116.74% 95.61ms 10.46
runIOUringTest(io_uring_batching_64_persist) 120.64% 92.52ms 10.81
runIOUringTest(io_uring_batching_256_persist) 120.31% 92.77ms 10.78
----------------------------------------------------------------------------
============================================================================
*/
......@@ -24,6 +24,15 @@ namespace folly {
namespace test {
namespace async_base_test_lib_detail {
INSTANTIATE_TYPED_TEST_CASE_P(AsyncTest, AsyncTest, IoUring);
// IoUring sized for the batch tests; flushes at most kMaxSubmit sqes per
// io_uring_submit call. Non-pollable, so tests use wait().
class BatchIoUring : public IoUring {
 public:
  static constexpr size_t kMaxSubmit = 64;
  BatchIoUring()
      : IoUring(kBatchNumEntries, folly::AsyncBase::NOT_POLLABLE, kMaxSubmit) {}
};
INSTANTIATE_TYPED_TEST_CASE_P(AsyncBatchTest, AsyncBatchTest, BatchIoUring);
} // namespace async_base_test_lib_detail
} // namespace test
} // namespace folly
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment