Commit aa8c3c8f authored by Kevin Vigor's avatar Kevin Vigor Committed by Facebook GitHub Bot

allow specifying multiple CPUs in io_uring SQ_POLL mode.

Summary:
Existing API allows user to specify a single preferred CPU for SQ_POLL workers,
but it also allows specifying multiple workers.

Add API to allow specifying a set of CPUs, not just one.

The specified set of CPUs are used in round-robin fashion, so it is not strictly
necessary that the number of CPUs and the number of workers be the same.

#forceTDHashing

Reviewed By: yfeldblum

Differential Revision: D26380317

fbshipit-source-id: ecfd8722670ffc3e3dc246fae1bcb7f4d8e6f3e4
parent 12066f6a
......@@ -145,7 +145,11 @@ class SQGroupInfoRegistry {
}
};
explicit SQGroupInfo(size_t num) : subGroups(num) {}
SQGroupInfo(size_t num, std::set<uint32_t> const& cpus) : subGroups(num) {
for (const uint32_t cpu : cpus) {
nextCpu.emplace_back(cpu);
}
}
// returns the least loaded subgroup
SQSubGroupInfo* getNextSubgroup() {
......@@ -188,6 +192,9 @@ class SQGroupInfoRegistry {
std::vector<SQSubGroupInfo> subGroups;
// number of entries
size_t count{0};
// Set of CPUs we will bind threads to.
std::vector<uint32_t> nextCpu;
int nextCpuIndex{0};
};
using SQGroupInfoMap = folly::F14FastMap<std::string, SQGroupInfo>;
......@@ -205,13 +212,13 @@ class SQGroupInfoRegistry {
const std::string& groupName,
size_t groupNumThreads,
FDCreateFunc& createFd,
struct io_uring_params& params) {
struct io_uring_params& params,
std::set<uint32_t> const& cpus) {
if (groupName.empty()) {
createFd(params);
return 0;
}
size_t ret = 0;
std::lock_guard g(mutex_);
SQGroupInfo::SQSubGroupInfo* sg = nullptr;
......@@ -219,28 +226,34 @@ class SQGroupInfoRegistry {
auto iter = map_.find(groupName);
if (iter != map_.end()) {
info = &iter->second;
sg = info->getNextSubgroup();
} else {
// First use of this group.
SQGroupInfo gr(groupNumThreads, cpus);
info =
&map_.insert(std::make_pair(groupName, std::move(gr))).first->second;
}
sg = info->getNextSubgroup();
if (sg->count) {
// we're adding to a non empty subgroup
if (sg->count) {
params.wq_fd = *(sg->fds.begin());
params.flags |= IORING_SETUP_ATTACH_WQ;
params.wq_fd = *(sg->fds.begin());
params.flags |= IORING_SETUP_ATTACH_WQ;
} else {
// First use of this subgroup, pin thread to CPU if specified.
if (info->nextCpu.size()) {
uint32_t cpu = info->nextCpu[info->nextCpuIndex];
info->nextCpuIndex = (info->nextCpuIndex + 1) % info->nextCpu.size();
params.sq_thread_cpu = cpu;
params.flags |= IORING_SETUP_SQ_AFF;
}
}
auto fd = createFd(params);
if (fd >= 0) {
if (!info) {
SQGroupInfo gr(groupNumThreads);
info = &map_.insert(std::make_pair(groupName, std::move(gr)))
.first->second;
sg = info->getNextSubgroup();
}
ret = info->add(fd, sg);
if (fd < 0) {
return 0;
}
return ret;
return info->add(fd, sg);
}
size_t removeFrom(const std::string& groupName, int fd, FDCloseFunc& func) {
......@@ -402,7 +415,6 @@ IoUringBackend::IoUringBackend(Options options)
if (options.flags & Options::Flags::POLL_SQ) {
params_.flags |= IORING_SETUP_SQPOLL;
params_.sq_thread_idle = options.sqIdle.count();
params_.sq_thread_cpu = options.sqCpu;
}
SQGroupInfoRegistry::FDCreateFunc func = [&](struct io_uring_params& params) {
......@@ -438,7 +450,11 @@ IoUringBackend::IoUringBackend(Options options)
};
auto ret = sSQGroupInfoRegistry->addTo(
options_.sqGroupName, options_.sqGroupNumThreads, func, params_);
options_.sqGroupName,
options_.sqGroupNumThreads,
func,
params_,
options.sqCpus);
if (!options_.sqGroupName.empty()) {
LOG(INFO) << "Adding to SQ poll group \"" << options_.sqGroupName
......
......@@ -104,8 +104,22 @@ class IoUringBackend : public EventBaseBackendBase {
return *this;
}
// Set the CPU as preferred for submission queue poll thread.
//
// This only has effect if POLL_SQ flag is specified.
//
// Can call multiple times to specify multiple CPUs.
Options& setSQCpu(uint32_t v) {
sqCpu = v;
sqCpus.insert(v);
return *this;
}
// Set the preferred CPUs for submission queue poll thread(s).
//
// This only has effect if POLL_SQ flag is specified.
Options& setSQCpus(std::set<uint32_t> const& cpus) {
sqCpus.insert(cpus.begin(), cpus.end());
return *this;
}
......@@ -131,7 +145,7 @@ class IoUringBackend : public EventBaseBackendBase {
std::chrono::milliseconds sqIdle{0};
std::chrono::milliseconds cqIdle{0};
uint32_t sqCpu{0};
std::set<uint32_t> sqCpus;
std::string sqGroupName;
size_t sqGroupNumThreads{1};
};
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/io/IoUringBackend.h>
#include <folly/portability/GTest.h>
namespace folly::test {
class IoUringBackendMockTest : public ::testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
};
using CapturedParams = std::vector<struct io_uring_params>;
static CapturedParams gCapturedParams;
static int gNextFakeFd = 1;
extern "C" {
// Crappy implementation of io_uring_queue_init_params which just captures
// params.
int io_uring_queue_init_params(
unsigned /*entries*/, struct io_uring* ring, struct io_uring_params* p) {
gCapturedParams.push_back(*p);
ring->ring_fd = gNextFakeFd++;
return 0;
}
// Need this so destructor does not blow up.
void io_uring_queue_exit(struct io_uring* ring) {
ring->ring_fd = -1;
}
}
TEST(IoUringBackendMockTest, SetupPollNoGroup) {
IoUringBackend::Options options;
options.setFlags(IoUringBackend::Options::Flags::POLL_SQ);
IoUringBackend io(options);
EXPECT_EQ(gCapturedParams.size(), 1);
EXPECT_TRUE(gCapturedParams[0].flags & IORING_SETUP_CQSIZE);
EXPECT_TRUE(gCapturedParams[0].flags & IORING_SETUP_SQPOLL);
EXPECT_FALSE(gCapturedParams[0].flags & IORING_SETUP_SQ_AFF);
EXPECT_FALSE(gCapturedParams[0].flags & IORING_SETUP_ATTACH_WQ);
}
TEST(IoUringBackendMockTest, SetupPollWithGroup) {
IoUringBackend::Options options;
options.setFlags(IoUringBackend::Options::Flags::POLL_SQ)
.setSQGroupName("test group")
.setSQGroupNumThreads(1);
IoUringBackend io1(options);
IoUringBackend io2(options);
EXPECT_EQ(gCapturedParams.size(), 2);
// We set up one thread for the group, so the first call should be normal...
EXPECT_TRUE(gCapturedParams[0].flags & IORING_SETUP_SQPOLL);
EXPECT_FALSE(gCapturedParams[0].flags & IORING_SETUP_SQ_AFF);
EXPECT_FALSE(gCapturedParams[0].flags & IORING_SETUP_ATTACH_WQ);
// second call should have attached to existing fd 1.
EXPECT_TRUE(gCapturedParams[1].flags & IORING_SETUP_SQPOLL);
EXPECT_FALSE(gCapturedParams[1].flags & IORING_SETUP_SQ_AFF);
EXPECT_TRUE(gCapturedParams[1].flags & IORING_SETUP_ATTACH_WQ);
EXPECT_EQ(gCapturedParams[1].wq_fd, 1);
}
TEST(IoUringBackendMockTest, SetupPollWithGroupAndCpu) {
IoUringBackend::Options options;
options.setFlags(IoUringBackend::Options::Flags::POLL_SQ)
.setSQGroupName("test group")
.setSQGroupNumThreads(2)
.setSQCpu(666)
.setSQCpu(42);
IoUringBackend io1(options);
IoUringBackend io2(options);
IoUringBackend io3(options);
EXPECT_EQ(gCapturedParams.size(), 3);
// The first call should create a thread with CPU affinity set.
EXPECT_TRUE(gCapturedParams[0].flags & IORING_SETUP_SQPOLL);
EXPECT_TRUE(gCapturedParams[0].flags & IORING_SETUP_SQ_AFF);
EXPECT_FALSE(gCapturedParams[0].flags & IORING_SETUP_ATTACH_WQ);
// We don't know which CPU code will choose, but it better be one or the
// other.
EXPECT_TRUE(
gCapturedParams[0].sq_thread_cpu == 666 ||
gCapturedParams[0].sq_thread_cpu == 42);
// We set two threads, so second call should create a thread with other CPU.
EXPECT_TRUE(gCapturedParams[1].flags & IORING_SETUP_CQSIZE);
EXPECT_TRUE(gCapturedParams[1].flags & IORING_SETUP_SQPOLL);
EXPECT_TRUE(gCapturedParams[1].flags & IORING_SETUP_SQ_AFF);
EXPECT_FALSE(gCapturedParams[1].flags & IORING_SETUP_ATTACH_WQ);
// This one better choose the other CPU.
EXPECT_TRUE(
gCapturedParams[1].sq_thread_cpu == 666 ||
gCapturedParams[1].sq_thread_cpu == 42);
EXPECT_NE(gCapturedParams[0].sq_thread_cpu, gCapturedParams[1].sq_thread_cpu);
// And the third thread should have attached to an existing SQ and not
// specified an affinity.
EXPECT_TRUE(gCapturedParams[2].flags & IORING_SETUP_SQPOLL);
EXPECT_FALSE(gCapturedParams[2].flags & IORING_SETUP_SQ_AFF);
EXPECT_TRUE(gCapturedParams[2].flags & IORING_SETUP_ATTACH_WQ);
}
} // namespace folly::test
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment