Commit 67b7c2ad authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Add support for SQ poll groups - this allows sharing of the same SQ kernel poll thread

Summary:
Add support for SQ poll groups - this allows sharing of the same SQ kernel poll thread

(Note: this ignores all push blocking failures!)

Reviewed By: kevin-vigor

Differential Revision: D23769744

fbshipit-source-id: c5fd3d22fd1048321896d7445d7bc7e2863ad533
parent d586f9f8
...@@ -17,11 +17,164 @@ ...@@ -17,11 +17,164 @@
#include <folly/experimental/io/IoUringBackend.h> #include <folly/experimental/io/IoUringBackend.h>
#include <folly/Likely.h> #include <folly/Likely.h>
#include <folly/String.h> #include <folly/String.h>
#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <folly/portability/Sockets.h> #include <folly/portability/Sockets.h>
#include <folly/synchronization/CallOnce.h> #include <folly/synchronization/CallOnce.h>
#include <glog/logging.h> #include <glog/logging.h>
namespace {
class SQGroupInfoRegistry {
private:
// a group is a collection of io_uring instances
// that share up to numThreads SQ poll threads
struct SQGroupInfo {
struct SQSubGroupInfo {
folly::F14FastSet<int> fds;
size_t count{0};
void add(int fd) {
CHECK(fds.find(fd) == fds.end());
fds.insert(fd);
++count;
}
size_t remove(int fd) {
auto iter = fds.find(fd);
CHECK(iter != fds.end());
fds.erase(fd);
--count;
return count;
}
};
explicit SQGroupInfo(size_t num) : subGroups(num) {}
// returns the least loaded subgroup
SQSubGroupInfo* getNextSubgroup() {
size_t min_idx = 0;
for (size_t i = 0; i < subGroups.size(); i++) {
if (subGroups[i].count == 0) {
return &subGroups[i];
}
if (subGroups[i].count < subGroups[min_idx].count) {
min_idx = i;
}
}
return &subGroups[min_idx];
}
size_t add(int fd, SQSubGroupInfo* sg) {
CHECK(fdSgMap.find(fd) == fdSgMap.end());
fdSgMap.insert(std::make_pair(fd, sg));
sg->add(fd);
++count;
return count;
}
size_t remove(int fd) {
auto iter = fdSgMap.find(fd);
CHECK(fdSgMap.find(fd) != fdSgMap.end());
iter->second->remove(fd);
fdSgMap.erase(iter);
--count;
return count;
}
// file descriptor to sub group index map
folly::F14FastMap<int, SQSubGroupInfo*> fdSgMap;
// array of subgoups
std::vector<SQSubGroupInfo> subGroups;
// number of entries
size_t count{0};
};
using SQGroupInfoMap = folly::F14FastMap<std::string, SQGroupInfo>;
SQGroupInfoMap map_;
std::mutex mutex_;
public:
SQGroupInfoRegistry() = default;
~SQGroupInfoRegistry() = default;
using FDCreateFunc = folly::Function<int(struct io_uring_params&)>;
using FDCloseFunc = folly::Function<void()>;
size_t addTo(
const std::string& groupName,
size_t groupNumThreads,
FDCreateFunc& createFd,
struct io_uring_params& params) {
if (groupName.empty()) {
createFd(params);
return 0;
}
size_t ret = 0;
std::lock_guard g(mutex_);
SQGroupInfo::SQSubGroupInfo* sg = nullptr;
SQGroupInfo* info = nullptr;
auto iter = map_.find(groupName);
if (iter != map_.end()) {
info = &iter->second;
sg = info->getNextSubgroup();
// we're adding to a non empty subgroup
if (sg->count) {
params.wq_fd = *(sg->fds.begin());
params.flags |= IORING_SETUP_ATTACH_WQ;
}
}
auto fd = createFd(params);
if (fd >= 0) {
if (!info) {
SQGroupInfo gr(groupNumThreads);
info = &map_.insert(std::make_pair(groupName, std::move(gr)))
.first->second;
sg = info->getNextSubgroup();
}
ret = info->add(fd, sg);
}
return ret;
}
size_t removeFrom(const std::string& groupName, int fd, FDCloseFunc& func) {
if (groupName.empty()) {
func();
return 0;
}
size_t ret;
std::lock_guard g(mutex_);
func();
auto iter = map_.find(groupName);
CHECK(iter != map_.end());
// check for empty group
if ((ret = iter->second.remove(fd)) == 0) {
map_.erase(iter);
}
return ret;
}
};
static folly::Indestructible<SQGroupInfoRegistry> sSQGroupInfoRegistry;
} // namespace
namespace folly { namespace folly {
IoUringBackend::FdRegistry::FdRegistry(struct io_uring& ioRing, size_t n) IoUringBackend::FdRegistry::FdRegistry(struct io_uring& ioRing, size_t n)
: ioRing_(ioRing), files_(n, -1), inUse_(n), records_(n) {} : ioRing_(ioRing), files_(n, -1), inUse_(n), records_(n) {}
...@@ -108,16 +261,28 @@ IoUringBackend::IoUringBackend(Options options) ...@@ -108,16 +261,28 @@ IoUringBackend::IoUringBackend(Options options)
params_.sq_thread_cpu = options.sqCpu; params_.sq_thread_cpu = options.sqCpu;
} }
SQGroupInfoRegistry::FDCreateFunc func = [&](struct io_uring_params& params) {
// allocate entries both for poll add and cancel // allocate entries both for poll add and cancel
if (::io_uring_queue_init_params( if (::io_uring_queue_init_params(
2 * options_.maxSubmit, &ioRing_, &params_)) { 2 * options_.maxSubmit, &ioRing_, &params)) {
LOG(ERROR) << "io_uring_queue_init_params(" << 2 * options_.maxSubmit << "," LOG(ERROR) << "io_uring_queue_init_params(" << 2 * options_.maxSubmit
<< params_.cq_entries << ") " << "," << params.cq_entries << ") "
<< "failed errno = " << errno << ":\"" << folly::errnoStr(errno) << "failed errno = " << errno << ":\""
<< "\" " << this; << folly::errnoStr(errno) << "\" " << this;
throw NotAvailable("io_uring_queue_init error"); throw NotAvailable("io_uring_queue_init error");
} }
return ioRing_.ring_fd;
};
auto ret = sSQGroupInfoRegistry->addTo(
options_.sqGroupName, options_.sqGroupNumThreads, func, params_);
if (!options_.sqGroupName.empty()) {
LOG(INFO) << "Adding to SQ poll group \"" << options_.sqGroupName
<< "\" ret = " << ret << " fd = " << ioRing_.ring_fd;
}
numEntries_ *= 2; numEntries_ *= 2;
// timer entry // timer entry
...@@ -182,9 +347,20 @@ void IoUringBackend::cleanup() { ...@@ -182,9 +347,20 @@ void IoUringBackend::cleanup() {
signalReadEntry_.reset(); signalReadEntry_.reset();
freeList_.clear_and_dispose([](auto _) { delete _; }); freeList_.clear_and_dispose([](auto _) { delete _; });
int fd = ioRing_.ring_fd;
SQGroupInfoRegistry::FDCloseFunc func = [&]() {
// exit now // exit now
::io_uring_queue_exit(&ioRing_); ::io_uring_queue_exit(&ioRing_);
ioRing_.ring_fd = -1; ioRing_.ring_fd = -1;
};
auto ret = sSQGroupInfoRegistry->removeFrom(
options_.sqGroupName, ioRing_.ring_fd, func);
if (!options_.sqGroupName.empty()) {
LOG(INFO) << "Removing from SQ poll group \"" << options_.sqGroupName
<< "\" ret = " << ret << " fd = " << fd;
}
} }
} }
......
...@@ -87,12 +87,24 @@ class PollIoBackend : public EventBaseBackendBase { ...@@ -87,12 +87,24 @@ class PollIoBackend : public EventBaseBackendBase {
return *this; return *this;
} }
Options& setCQCpu(uint32_t v) { Options& setSQCpu(uint32_t v) {
sqCpu = v; sqCpu = v;
return *this; return *this;
} }
Options& setSQGroupName(const std::string& v) {
sqGroupName = v;
return *this;
}
Options& setSQGroupNumThreads(size_t v) {
sqGroupNumThreads = v;
return *this;
}
size_t capacity{0}; size_t capacity{0};
size_t maxSubmit{128}; size_t maxSubmit{128};
size_t maxGet{static_cast<size_t>(-1)}; size_t maxGet{static_cast<size_t>(-1)};
...@@ -102,6 +114,8 @@ class PollIoBackend : public EventBaseBackendBase { ...@@ -102,6 +114,8 @@ class PollIoBackend : public EventBaseBackendBase {
std::chrono::milliseconds sqIdle{0}; std::chrono::milliseconds sqIdle{0};
std::chrono::milliseconds cqIdle{0}; std::chrono::milliseconds cqIdle{0};
uint32_t sqCpu{0}; uint32_t sqCpu{0};
std::string sqGroupName;
size_t sqGroupNumThreads{1};
}; };
explicit PollIoBackend(Options options); explicit PollIoBackend(Options options);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment