Commit 16d63941 authored by Dan Melnic, committed by Facebook GitHub Bot

Add io_uring registered buffers support

Summary: Add io_uring registered buffers support

Reviewed By: kevin-vigor

Differential Revision: D23421043

fbshipit-source-id: b9c6a4bbd7a6b6cc580b1e3f64320d9e59d04ab2
parent 6226f8a2
......@@ -67,6 +67,10 @@ class AsyncBaseOp {
pread(fd, range.begin(), range.size(), start);
}
virtual void preadv(int fd, const iovec* iov, int iovcnt, off_t start) = 0;
// Registered-buffer variant of pread(). The base implementation ignores
// buf_index and falls back to a plain pread(); subclasses that support
// fixed buffers (see IoUringOp::pread below) override this to pass the
// index of a buffer previously registered with the kernel.
virtual void
pread(int fd, void* buf, size_t size, off_t start, int /*buf_index*/) {
pread(fd, buf, size, start);
}
/**
* Initiate a write request.
......@@ -76,6 +80,10 @@ class AsyncBaseOp {
pwrite(fd, range.begin(), range.size(), start);
}
virtual void pwritev(int fd, const iovec* iov, int iovcnt, off_t start) = 0;
// Registered-buffer variant of pwrite(). The base implementation ignores
// buf_index and falls back to a plain pwrite(); subclasses that support
// fixed buffers (see IoUringOp::pwrite below) override this to pass the
// index of a buffer previously registered with the kernel.
virtual void
pwrite(int fd, const void* buf, size_t size, off_t start, int /*buf_index*/) {
pwrite(fd, buf, size, start);
}
// we support only these subclasses
virtual AsyncIOOp* getAsyncIOOp() = 0;
......
......@@ -135,6 +135,17 @@ void IoUringOp::preadv(int fd, const iovec* iov, int iovcnt, off_t start) {
io_uring_sqe_set_data(&sqe_, this);
}
// Initiate a fixed-buffer read. buf must point into a buffer previously
// registered via IoUring::register_buffers(), and buf_index is that
// buffer's position in the registered iovec array.
void IoUringOp::pread(
int fd,
void* buf,
size_t size,
off_t start,
int buf_index) {
init();
// Prep an IORING_OP_READ_FIXED sqe targeting registered buffer buf_index.
io_uring_prep_read_fixed(&sqe_, fd, buf, size, start, buf_index);
// Attach this op to the sqe so the completion side can map cqe -> op.
io_uring_sqe_set_data(&sqe_, this);
}
void IoUringOp::pwrite(int fd, const void* buf, size_t size, off_t start) {
init();
iov_[0].iov_base = const_cast<void*>(buf);
......@@ -149,6 +160,17 @@ void IoUringOp::pwritev(int fd, const iovec* iov, int iovcnt, off_t start) {
io_uring_sqe_set_data(&sqe_, this);
}
// Initiate a fixed-buffer write. buf must point into a buffer previously
// registered via IoUring::register_buffers(), and buf_index is that
// buffer's position in the registered iovec array.
void IoUringOp::pwrite(
int fd,
const void* buf,
size_t size,
off_t start,
int buf_index) {
init();
// Prep an IORING_OP_WRITE_FIXED sqe sourcing registered buffer buf_index.
io_uring_prep_write_fixed(&sqe_, fd, buf, size, start, buf_index);
// Attach this op to the sqe so the completion side can map cqe -> op.
io_uring_sqe_set_data(&sqe_, this);
}
void IoUringOp::toStream(std::ostream& os) const {
os << "{" << state_ << ", ";
......@@ -202,6 +224,23 @@ bool IoUring::isAvailable() {
return true;
}
// Register iovecs[0..nr_iovecs) as fixed buffers with the kernel so ops can
// use the pread/pwrite overloads that take a buf_index. Returns liburing's
// result: 0 on success, a negative errno-style value on failure.
// NOTE(review): callers appear limited to UIO_MAXIOV buffers per the
// benchmark comment elsewhere in this change - confirm against io_uring docs.
int IoUring::register_buffers(
const struct iovec* iovecs,
unsigned int nr_iovecs) {
// Lazily create the ring before touching it.
initializeContext();
// Exclusive lock so no submission races with buffer registration.
SharedMutex::WriteHolder lk(submitMutex_);
return io_uring_register_buffers(&ioRing_, iovecs, nr_iovecs);
}
// Drop all fixed buffers previously installed via register_buffers().
// Returns liburing's result: 0 on success, negative errno-style on failure.
int IoUring::unregister_buffers() {
// Lazily create the ring before touching it.
initializeContext();
// Exclusive lock so no submission races with buffer unregistration.
SharedMutex::WriteHolder lk(submitMutex_);
return io_uring_unregister_buffers(&ioRing_);
}
void IoUring::initializeContext() {
if (!init_.load(std::memory_order_acquire)) {
std::lock_guard<std::mutex> lock(initMutex_);
......
......@@ -44,12 +44,16 @@ class IoUringOp : public AsyncBaseOp {
*/
void pread(int fd, void* buf, size_t size, off_t start) override;
void preadv(int fd, const iovec* iov, int iovcnt, off_t start) override;
void pread(int fd, void* buf, size_t size, off_t start, int buf_index)
override;
/**
* Initiate a write request.
*/
void pwrite(int fd, const void* buf, size_t size, off_t start) override;
void pwritev(int fd, const iovec* iov, int iovcnt, off_t start) override;
void pwrite(int fd, const void* buf, size_t size, off_t start, int buf_index)
override;
void reset(NotificationCallback cb = NotificationCallback()) override;
......@@ -96,6 +100,10 @@ class IoUring : public AsyncBase {
static bool isAvailable();
int register_buffers(const struct iovec* iovecs, unsigned int nr_iovecs);
int unregister_buffers();
private:
void initializeContext() override;
int submitOne(AsyncBase::Op* op) override;
......
......@@ -27,7 +27,9 @@
namespace {
// Size of each benchmark I/O buffer / file block.
static constexpr size_t kBlockSize = 4096;
// we cannot register more than UIO_MAXIOV iovs
// we can create bigger buffers and split them
static constexpr size_t kNumBlocks = UIO_MAXIOV;
static folly::test::TemporaryFile& getTempFile(size_t num) {
CHECK_LE(num, kNumBlocks);
......@@ -56,13 +58,17 @@ struct BenchmarkData {
::close(fd);
}
void reset() {
// Rebuild the op list from scratch: one read op per block, each bumping
// `completed` when it finishes. When useRegisteredBuffers is set, issue the
// fixed-buffer pread overload, passing the buffer's registration index.
void reset(bool useRegisteredBuffers) {
  ops.clear();
  for (size_t idx = 0; idx < numEntries; idx++) {
    ops.push_back(std::make_unique<OP>());
    auto& readOp = *ops.back();
    readOp.setNotificationCallback([&](folly::AsyncBaseOp*) { ++completed; });
    if (useRegisteredBuffers) {
      // Buffer idx was registered at index idx by the caller.
      readOp.pread(fd, bufs[idx].get(), kBlockSize, idx * kBlockSize, idx);
    } else {
      readOp.pread(fd, bufs[idx].get(), kBlockSize, idx * kBlockSize);
    }
  }
}
......@@ -79,24 +85,31 @@ void runTAsyncIOTest(
unsigned int iters,
size_t numEntries,
size_t batchSize,
bool persist) {
bool persist,
bool useRegisteredBuffers) {
folly::BenchmarkSuspender suspender;
std::vector<folly::AsyncBase::Op*> ops;
ops.reserve(batchSize);
size_t completed = 0;
BenchmarkData<typename TAsync::Op> bmData(numEntries, completed);
std::unique_ptr<TAsync> aio(
persist
? new TAsync(numEntries, folly::AsyncBase::NOT_POLLABLE, batchSize)
: nullptr);
size_t completed = 0;
BenchmarkData<typename TAsync::Op> bmData(numEntries, completed);
if (aio) {
aio->register_buffers(bmData.bufs);
}
suspender.dismiss();
for (unsigned iter = 0; iter < iters; iter++) {
if (!persist) {
aio.reset(
new TAsync(numEntries, folly::AsyncBase::NOT_POLLABLE, batchSize));
if (useRegisteredBuffers) {
aio->register_buffers(bmData.bufs);
}
}
completed = 0;
bmData.reset();
bmData.reset(useRegisteredBuffers);
size_t num = 0;
for (size_t i = 0; i < numEntries; i++) {
ops.push_back(bmData.ops[i].get());
......@@ -129,98 +142,159 @@ void runAsyncIOTest(
public:
BatchAsyncIO(size_t capacity, PollMode pollMode, size_t /*unused*/)
: folly::AsyncIO(capacity, pollMode) {}
void register_buffers(
const std::vector<folly::test::async_base_test_lib_detail::TestUtil::
ManagedBuffer>&) {}
};
runTAsyncIOTest<BatchAsyncIO>(iters, numEntries, batchSize, persist);
runTAsyncIOTest<BatchAsyncIO>(iters, numEntries, batchSize, persist, false);
}
void runIOUringTest(
unsigned int iters,
size_t numEntries,
size_t batchSize,
bool persist) {
bool persist,
bool useRegisteredBuffers = false) {
class BatchIoUring : public folly::IoUring {
public:
BatchIoUring(size_t capacity, PollMode pollMode, size_t batchSize)
: folly::IoUring(capacity, pollMode, batchSize) {}
void register_buffers(
const std::vector<
folly::test::async_base_test_lib_detail::TestUtil::ManagedBuffer>&
bufs) {
std::vector<struct iovec> iovs(bufs.size());
for (size_t i = 0; i < bufs.size(); i++) {
iovs[i].iov_base = bufs[i].get();
iovs[i].iov_len = kBlockSize;
}
auto ret = folly::IoUring::register_buffers(iovs.data(), iovs.size());
CHECK_EQ(ret, 0);
}
};
runTAsyncIOTest<BatchIoUring>(iters, numEntries, batchSize, persist);
runTAsyncIOTest<BatchIoUring>(
iters, numEntries, batchSize, persist, useRegisteredBuffers);
}
} // namespace
BENCHMARK_DRAW_LINE();
BENCHMARK_NAMED_PARAM(
runAsyncIOTest,
async_io_no_batching_no_persist,
4096,
async_io_no_batching_no_per,
1024,
1,
false)
BENCHMARK_RELATIVE_NAMED_PARAM(
runAsyncIOTest,
async_io_batching_64_no_persist,
4096,
async_io_batching_64_no_per,
1024,
64,
false)
BENCHMARK_RELATIVE_NAMED_PARAM(
runAsyncIOTest,
async_io_batching_256_no_persist,
4096,
async_io_batching_256_no_per,
1024,
256,
false)
BENCHMARK_RELATIVE_NAMED_PARAM(
runAsyncIOTest,
async_io_no_batching_persist,
4096,
async_io_no_batching_per,
1024,
1,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runAsyncIOTest,
async_io_batching_64_persist,
4096,
async_io_batching_64_per,
1024,
64,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runAsyncIOTest,
async_io_batching_256_persist,
4096,
async_io_batching_256_per,
1024,
256,
true)
BENCHMARK_DRAW_LINE();
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_no_batching_no_persist,
4096,
io_uring_no_batching_no_per,
1024,
1,
false)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_64_no_persist,
4096,
io_uring_batching_64_no_per,
1024,
64,
false)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_256_no_persist,
4096,
io_uring_batching_256_no_per,
1024,
256,
false)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_no_batching_persist,
4096,
io_uring_no_batching_per,
1024,
1,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_64_per,
1024,
64,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_256_per,
1024,
256,
true)
BENCHMARK_DRAW_LINE();
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_no_batching_no_per_reg,
1024,
1,
false,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_64_persist,
4096,
io_uring_batching_64_no_per_reg,
1024,
64,
false,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_256_persist,
4096,
io_uring_batching_256_no_per_reg,
1024,
256,
false,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_no_batching_per_reg,
1024,
1,
true,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_64_per_reg,
1024,
64,
true,
true)
BENCHMARK_RELATIVE_NAMED_PARAM(
runIOUringTest,
io_uring_batching_256_per_reg,
1024,
256,
true,
true)
BENCHMARK_DRAW_LINE();
......@@ -236,19 +310,26 @@ int main(int argc, char** argv) {
folly/experimental/io/test/IOBenchmark.cpp relative time/iter iters/s
============================================================================
----------------------------------------------------------------------------
runAsyncIOTest(async_io_no_batching_no_persist) 111.61ms 8.96
runAsyncIOTest(async_io_batching_64_no_persist) 99.89% 111.73ms 8.95
runAsyncIOTest(async_io_batching_256_no_persist 97.35% 114.65ms 8.72
runAsyncIOTest(async_io_no_batching_persist) 120.09% 92.94ms 10.76
runAsyncIOTest(async_io_batching_64_persist) 119.98% 93.03ms 10.75
runAsyncIOTest(async_io_batching_256_persist) 117.25% 95.19ms 10.51
runAsyncIOTest(async_io_no_batching_no_per) 45.33ms 22.06
runAsyncIOTest(async_io_batching_64_no_per) 102.48% 44.24ms 22.61
runAsyncIOTest(async_io_batching_256_no_per) 94.30% 48.08ms 20.80
runAsyncIOTest(async_io_no_batching_per) 173.66% 26.11ms 38.31
runAsyncIOTest(async_io_batching_64_per) 179.94% 25.19ms 39.69
runAsyncIOTest(async_io_batching_256_per) 171.69% 26.40ms 37.87
----------------------------------------------------------------------------
runIOUringTest(io_uring_no_batching_no_per) 180.66% 25.09ms 39.85
runIOUringTest(io_uring_batching_64_no_per) 176.16% 25.74ms 38.86
runIOUringTest(io_uring_batching_256_no_per) 178.45% 25.40ms 39.36
runIOUringTest(io_uring_no_batching_per) 177.59% 25.53ms 39.17
runIOUringTest(io_uring_batching_64_per) 178.06% 25.46ms 39.28
runIOUringTest(io_uring_batching_256_per) 178.81% 25.35ms 39.44
----------------------------------------------------------------------------
runIOUringTest(io_uring_no_batching_no_persist) 120.59% 92.55ms 10.80
runIOUringTest(io_uring_batching_64_no_persist) 120.62% 92.53ms 10.81
runIOUringTest(io_uring_batching_256_no_persist 120.52% 92.61ms 10.80
runIOUringTest(io_uring_no_batching_persist) 116.74% 95.61ms 10.46
runIOUringTest(io_uring_batching_64_persist) 120.64% 92.52ms 10.81
runIOUringTest(io_uring_batching_256_persist) 120.31% 92.77ms 10.78
runIOUringTest(io_uring_no_batching_no_per_reg) 121.76% 37.23ms 26.86
runIOUringTest(io_uring_batching_64_no_per_reg) 119.86% 37.82ms 26.44
runIOUringTest(io_uring_batching_256_no_per_reg 127.17% 35.65ms 28.05
runIOUringTest(io_uring_no_batching_per_reg) 178.60% 25.38ms 39.39
runIOUringTest(io_uring_batching_64_per_reg) 179.33% 25.28ms 39.56
runIOUringTest(io_uring_batching_256_per_reg) 178.69% 25.37ms 39.42
----------------------------------------------------------------------------
============================================================================
*/
......@@ -33,6 +33,77 @@ class BatchIoUring : public IoUring {
};
INSTANTIATE_TYPED_TEST_CASE_P(AsyncBatchTest, AsyncBatchTest, BatchIoUring);
// Exercises io_uring registered (fixed) buffers end to end: register two
// buffers, write the file through one of them, then read the data back both
// through a regular buffer and through the other registered buffer, and
// verify all three buffers hold identical bytes.
TEST(IoUringTest, RegisteredBuffers) {
  constexpr size_t kNumEntries = 2;
  constexpr size_t kBufSize = 4096;
  IoUring ioUring(kNumEntries, folly::AsyncBase::NOT_POLLABLE);
  auto tempFile = folly::test::TempFileUtil::getTempFile(kDefaultFileSize);
  int fd = ::open(tempFile.path().c_str(), O_DIRECT | O_RDWR);
  SKIP_IF(fd == -1) << "Tempfile can't be opened with O_DIRECT: "
                    << folly::errnoStr(errno);
  SCOPE_EXIT {
    ::close(fd);
  };
  using TestUtil = folly::test::async_base_test_lib_detail::TestUtil;
  // Three O_DIRECT-aligned buffers: registered write source, plain read
  // destination, registered read destination.
  TestUtil::ManagedBuffer regFdWriteBuf = TestUtil::allocateAligned(kBufSize);
  TestUtil::ManagedBuffer readBuf = TestUtil::allocateAligned(kBufSize);
  TestUtil::ManagedBuffer regFdReadBuf = TestUtil::allocateAligned(kBufSize);
  // Distinct fill patterns so a missed transfer is detectable.
  ::memset(regFdWriteBuf.get(), '0', kBufSize);
  ::memset(readBuf.get(), 'A', kBufSize);
  ::memset(regFdReadBuf.get(), 'Z', kBufSize);
  // Register the write source as index 0 and the fixed read destination
  // as index 1.
  struct iovec iov[2];
  iov[0].iov_base = regFdWriteBuf.get();
  iov[0].iov_len = kBufSize;
  iov[1].iov_base = regFdReadBuf.get();
  iov[1].iov_len = kBufSize;
  CHECK_EQ(ioUring.register_buffers(iov, 2), 0);
  IoUring::Op regFdWriteOp, readOp, regFdReadOp;
  size_t completed = 0;
  auto countCompletion = [&](folly::AsyncBaseOp*) { ++completed; };
  regFdWriteOp.setNotificationCallback(countCompletion);
  regFdWriteOp.pwrite(fd, regFdWriteBuf.get(), kBufSize, 0, 0 /*buf_index*/);
  readOp.setNotificationCallback(countCompletion);
  readOp.pread(fd, readBuf.get(), kBufSize, 0);
  regFdReadOp.setNotificationCallback(countCompletion);
  regFdReadOp.pread(fd, regFdReadBuf.get(), kBufSize, 0, 1 /*buf_index*/);
  // Write through the registered buffer first.
  ioUring.submit(&regFdWriteOp);
  ioUring.wait(1);
  CHECK_EQ(completed, 1);
  CHECK_EQ(regFdWriteOp.result(), kBufSize);
  // Read the data back - both via a regular and a registered buffer.
  completed = 0;
  ioUring.submit(&readOp);
  ioUring.submit(&regFdReadOp);
  ioUring.wait(kNumEntries);
  CHECK_EQ(completed, kNumEntries);
  CHECK_EQ(readOp.result(), kBufSize);
  CHECK_EQ(regFdReadOp.result(), kBufSize);
  // Both reads must observe exactly the bytes that were written.
  CHECK_EQ(::memcmp(readBuf.get(), regFdWriteBuf.get(), kBufSize), 0);
  CHECK_EQ(::memcmp(regFdReadBuf.get(), regFdWriteBuf.get(), kBufSize), 0);
}
} // namespace async_base_test_lib_detail
} // namespace test
} // namespace folly
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment