Commit 4ab53568 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Add signal handling support for the PollIoBackend derived backends

Summary: Add signal handling support for the PollIoBackend derived backends

Reviewed By: kevin-vigor

Differential Revision: D20612200

fbshipit-source-id: 5a92e2b46a9fa102bfc02958058679bac6c22269
parent cf7ecf62
......@@ -112,20 +112,24 @@ IoUringBackend::IoUringBackend(
entries_.reset(new IoSqe[numEntries_]);
// timer entry
timerEntry_ = &entries_[0];
timerEntry_->backend_ = this;
timerEntry_->backendCb_ = PollIoBackend::processTimerIoCb;
// build the free list - first entry is the timer entry
for (size_t i = 2; i < numEntries_; i++) {
entries_[i - 1].next_ = &entries_[i];
entries_[i - 1].backend_ = this;
entries_[i - 1].backendCb_ = PollIoBackend::processPollIoCb;
// signal entry
signalReadEntry_ = &entries_[1];
signalReadEntry_->backend_ = this;
signalReadEntry_->backendCb_ = PollIoBackend::processSignalReadIoCb;
// build the free list - first 2 entres are reserved
for (size_t i = 2; i < numEntries_; ++i) {
entries_[i].next_ = (i == (numEntries_ - 1)) ? nullptr : &entries_[i + 1];
entries_[i].backend_ = this;
entries_[i].backendCb_ = PollIoBackend::processPollIoCb;
}
entries_[numEntries_ - 1].backend_ = this;
entries_[numEntries_ - 1].backendCb_ = PollIoBackend::processPollIoCb;
freeHead_ = &entries_[1];
freeHead_ = &entries_[2];
// we need to call the init before adding the timer fd
// so we avoid a deadlock - waiting for the queue to be drained
......@@ -137,7 +141,7 @@ IoUringBackend::IoUringBackend(
}
// add the timer fd
if (!addTimerFd()) {
if (!addTimerFd() || !addSignalFds()) {
cleanup();
entries_.reset();
throw NotAvailable("io_uring_submit error");
......
......@@ -14,12 +14,14 @@
* limitations under the License.
*/
#include <signal.h>
#include <sys/timerfd.h>
#include <atomic>
#include <folly/FileUtil.h>
#include <folly/Likely.h>
#include <folly/SpinLock.h>
#include <folly/experimental/io/PollIoBackend.h>
#include <folly/portability/Sockets.h>
#include <folly/synchronization/CallOnce.h>
......@@ -31,6 +33,85 @@ extern "C" FOLLY_ATTR_WEAK void eb_poll_loop_post_hook(
uint64_t call_time,
int ret);
namespace {
struct SignalRegistry {
struct SigInfo {
struct sigaction sa_ {};
size_t count_{0};
};
using SignalMap = std::map<int, SigInfo>;
constexpr SignalRegistry() {}
~SignalRegistry() {}
void notify(int sig);
void setNotifyFd(int sig, int fd);
// lock protecting the signal map
folly::MicroSpinLock mapLock_ = {0};
std::unique_ptr<SignalMap> map_;
std::atomic<int> notifyFd_{-1};
};
SignalRegistry sSignalRegistry;
static void __cdecl evSigHandler(int sig) {
sSignalRegistry.notify(sig);
}
void SignalRegistry::notify(int sig) {
// use try_lock in case somebody already has the lock
if (mapLock_.try_lock()) {
int fd = notifyFd_.load();
if (fd >= 0) {
uint8_t sigNum = static_cast<uint8_t>(sig);
::write(fd, &sigNum, 1);
}
mapLock_.unlock();
}
}
void SignalRegistry::setNotifyFd(int sig, int fd) {
folly::MSLGuard g(mapLock_);
if (fd >= 0) {
if (!map_) {
map_ = std::make_unique<SignalMap>();
}
// switch the fd
notifyFd_.store(fd);
auto iter = (*map_).find(sig);
if (iter != (*map_).end()) {
iter->second.count_++;
} else {
auto& entry = (*map_)[sig];
entry.count_ = 1;
struct sigaction sa = {};
sa.sa_handler = evSigHandler;
sa.sa_flags |= SA_RESTART;
::sigfillset(&sa.sa_mask);
if (::sigaction(sig, &sa, &entry.sa_) == -1) {
(*map_).erase(sig);
}
}
} else {
notifyFd_.store(fd);
if (map_) {
auto iter = (*map_).find(sig);
if ((iter != (*map_).end()) && (--iter->second.count_ == 0)) {
auto entry = iter->second;
(*map_).erase(iter);
// just restore
::sigaction(sig, &entry.sa_, nullptr);
}
}
}
}
} // namespace
namespace folly {
PollIoBackend::TimerEntry::TimerEntry(
Event* event,
......@@ -39,6 +120,26 @@ PollIoBackend::TimerEntry::TimerEntry(
setExpireTime(timeout);
}
PollIoBackend::SocketPair::SocketPair() {
if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds_.data())) {
throw std::runtime_error("socketpair error");
}
// set the sockets to non blocking mode
for (auto fd : fds_) {
auto flags = ::fcntl(fd, F_GETFL, 0);
::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
}
PollIoBackend::SocketPair::~SocketPair() {
for (auto fd : fds_) {
if (fd >= 0) {
::close(fd);
}
}
}
PollIoBackend::PollIoBackend(size_t capacity, size_t maxSubmit, size_t maxGet)
: capacity_(capacity),
numEntries_(capacity),
......@@ -61,6 +162,14 @@ bool PollIoBackend::addTimerFd() {
return (1 == submitOne(timerEntry_));
}
bool PollIoBackend::addSignalFds() {
auto* entry = allocSubmissionEntry(); // this can be nullptr
signalReadEntry_->prepPollAdd(
entry, signalFds_.readFd(), POLLIN, false /*registerFd*/);
return (1 == submitOne(signalReadEntry_));
}
void PollIoBackend::scheduleTimeout() {
if (!timerChanged_) {
return;
......@@ -176,6 +285,56 @@ size_t PollIoBackend::processTimers() {
return ret;
}
void PollIoBackend::addSignalEvent(Event& event) {
auto* ev = event.getEvent();
signals_[ev->ev_fd].insert(&event);
// we pass the write fd for notifications
sSignalRegistry.setNotifyFd(ev->ev_fd, signalFds_.writeFd());
}
void PollIoBackend::removeSignalEvent(Event& event) {
auto* ev = event.getEvent();
auto iter = signals_.find(ev->ev_fd);
if (iter != signals_.end()) {
sSignalRegistry.setNotifyFd(ev->ev_fd, -1);
}
}
size_t PollIoBackend::processSignals() {
size_t ret = 0;
static constexpr auto kNumEntries = NSIG * 2;
static_assert(
NSIG < 256, "Use a different data type to cover all the signal values");
std::array<bool, NSIG> processed{};
std::array<uint8_t, kNumEntries> signals;
ssize_t num =
folly::readNoInt(signalFds_.readFd(), signals.data(), signals.size());
for (ssize_t i = 0; i < num; i++) {
int signum = static_cast<int>(signals[i]);
if ((signum >= 0) && (signum < static_cast<int>(processed.size())) &&
!processed[signum]) {
processed[signum] = true;
auto iter = signals_.find(signum);
if (iter != signals_.end()) {
auto& set = iter->second;
for (auto& event : set) {
auto* ev = event->getEvent();
ev->ev_res = 0;
event_ref_flags(ev) |= EVLIST_ACTIVE;
(*event_ref_callback(ev))(
(int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
event_ref_flags(ev) &= ~EVLIST_ACTIVE;
}
}
}
}
// add the signal fd(s) back
addSignalFds();
return ret;
}
PollIoBackend::IoCb* PollIoBackend::allocIoCb() {
// try to allocate from the pool first
if (FOLLY_LIKELY(freeHead_ != nullptr)) {
......@@ -286,7 +445,7 @@ int PollIoBackend::eb_event_base_loop(int flags) {
submitList(submitList_, waitForEvents);
if (!numInsertedEvents_ && timers_.empty()) {
if (!numInsertedEvents_ && timers_.empty() && signals_.empty()) {
return 1;
}
......@@ -309,16 +468,26 @@ int PollIoBackend::eb_event_base_loop(int flags) {
processTimers_ = false;
}
size_t numProcessedSignals = 0;
if (processSignals_ && !loopBreak_) {
numProcessedSignals = processSignals();
processSignals_ = false;
}
if (!activeEvents_.empty() && !loopBreak_) {
processActiveEvents();
if (flags & EVLOOP_ONCE) {
done = true;
}
} else if (flags & EVLOOP_NONBLOCK) {
done = true;
if (signals_.empty()) {
done = true;
}
}
if (!done && numProcessedTimers && (flags & EVLOOP_ONCE)) {
if (!done && (numProcessedTimers || numProcessedSignals) &&
(flags & EVLOOP_ONCE)) {
done = true;
}
}
......@@ -343,8 +512,13 @@ int PollIoBackend::eb_event_add(Event& event, const struct timeval* timeout) {
return 0;
}
// TBD - signal later
if ((ev->ev_events & (EV_READ | EV_WRITE | EV_SIGNAL)) &&
if (ev->ev_events & EV_SIGNAL) {
event_ref_flags(ev) |= EVLIST_INSERTED;
addSignalEvent(event);
return 0;
}
if ((ev->ev_events & (EV_READ | EV_WRITE)) &&
!(event_ref_flags(ev) & (EVLIST_INSERTED | EVLIST_ACTIVE))) {
auto* iocb = allocIoCb();
CHECK(iocb);
......@@ -403,6 +577,12 @@ int PollIoBackend::eb_event_del(Event& event) {
return -1;
}
if (ev->ev_events & EV_SIGNAL) {
event_ref_flags(ev) &= ~(EVLIST_INSERTED | EVLIST_ACTIVE);
removeSignalEvent(event);
return 0;
}
auto* iocb = reinterpret_cast<IoCb*>(event.getUserData());
bool wasLinked = iocb->is_linked();
iocb->resetEvent();
......
......@@ -21,6 +21,7 @@
#include <chrono>
#include <map>
#include <set>
#include <vector>
#include <boost/intrusive/list.hpp>
......@@ -135,6 +136,27 @@ class PollIoBackend : public EventBaseBackendBase {
}
};
class SocketPair {
public:
SocketPair();
SocketPair(const SocketPair&) = delete;
SocketPair& operator=(const SocketPair&) = delete;
~SocketPair();
int readFd() const {
return fds_[1];
}
int writeFd() const {
return fds_[0];
}
private:
std::array<int, 2> fds_ = {-1, -1};
};
static FOLLY_ALWAYS_INLINE uint32_t getPollFlags(short events) {
uint32_t ret = 0;
if (events & EV_READ) {
......@@ -191,6 +213,22 @@ class PollIoBackend : public EventBaseBackendBase {
backend->setProcessTimers();
}
// signal handling
void addSignalEvent(Event& event);
void removeSignalEvent(Event& event);
bool addSignalFds();
size_t processSignals();
FOLLY_ALWAYS_INLINE void setProcessSignals() {
processSignals_ = true;
}
static void processSignalReadIoCb(
PollIoBackend* backend,
IoCb* /*unused*/,
int64_t /*unused*/) {
backend->setProcessSignals();
}
void processPollIo(IoCb* ioCb, int64_t res) noexcept;
IoCb* FOLLY_NULLABLE allocIoCb();
......@@ -215,6 +253,7 @@ class PollIoBackend : public EventBaseBackendBase {
size_t capacity_;
size_t numEntries_;
IoCb* timerEntry_{nullptr};
IoCb* signalReadEntry_{nullptr};
IoCb* freeHead_{nullptr};
// timer related
......@@ -227,6 +266,10 @@ class PollIoBackend : public EventBaseBackendBase {
std::map<Event*, std::chrono::time_point<std::chrono::steady_clock>>
eventToTimers_;
// signal related
SocketPair signalFds_;
std::map<int, std::set<Event*>> signals_;
// submit
size_t maxSubmit_;
IoCbList submitList_;
......@@ -238,6 +281,7 @@ class PollIoBackend : public EventBaseBackendBase {
bool loopBreak_{false};
bool shuttingDown_{false};
bool processTimers_{false};
bool processSignals_{false};
size_t numInsertedEvents_{0};
IoCbList activeEvents_;
// number of IoCb instances in use
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/io/IoUringBackend.h>
#include <folly/io/async/test/AsyncSignalHandlerTestLib.h>
namespace folly {
namespace test {
static constexpr size_t kCapacity = 16 * 1024;
static constexpr size_t kMaxSubmit = 128;
static constexpr size_t kMaxGet = static_cast<size_t>(-1);
struct IoUringBackendProvider {
static std::unique_ptr<folly::EventBaseBackendBase> getBackend() {
try {
return std::make_unique<folly::IoUringBackend>(
kCapacity, kMaxSubmit, kMaxGet, false /* useRegisteredFds */);
} catch (const IoUringBackend::NotAvailable&) {
return nullptr;
}
}
};
INSTANTIATE_TYPED_TEST_CASE_P(
AsyncSignalHandlerTest,
AsyncSignalHandlerTest,
IoUringBackendProvider);
} // namespace test
} // namespace folly
......@@ -14,53 +14,19 @@
* limitations under the License.
*/
#include <folly/io/async/AsyncSignalHandler.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/test/AsyncSignalHandlerTestLib.h>
#include <folly/portability/GTest.h>
using namespace folly;
namespace {
class TestSignalHandler : public AsyncSignalHandler {
public:
using AsyncSignalHandler::AsyncSignalHandler;
void signalReceived(int /* signum */) noexcept override {
called = true;
namespace folly {
namespace test {
struct DefaultBackendProvider {
static std::unique_ptr<folly::EventBaseBackendBase> getBackend() {
return folly::EventBase::getDefaultBackend();
}
bool called{false};
};
} // namespace
TEST(AsyncSignalHandler, basic) {
EventBase evb;
TestSignalHandler handler{&evb};
handler.registerSignalHandler(SIGUSR1);
kill(getpid(), SIGUSR1);
EXPECT_FALSE(handler.called);
evb.loopOnce(EVLOOP_NONBLOCK);
EXPECT_TRUE(handler.called);
}
TEST(AsyncSignalHandler, attachEventBase) {
TestSignalHandler handler{nullptr};
EXPECT_FALSE(handler.getEventBase());
EventBase evb;
handler.attachEventBase(&evb);
EXPECT_EQ(&evb, handler.getEventBase());
handler.registerSignalHandler(SIGUSR1);
kill(getpid(), SIGUSR1);
EXPECT_FALSE(handler.called);
evb.loopOnce(EVLOOP_NONBLOCK);
EXPECT_TRUE(handler.called);
handler.unregisterSignalHandler(SIGUSR1);
handler.detachEventBase();
EXPECT_FALSE(handler.getEventBase());
}
INSTANTIATE_TYPED_TEST_CASE_P(
AsyncSignalHandlerTest,
AsyncSignalHandlerTest,
DefaultBackendProvider);
} // namespace test
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/io/async/AsyncSignalHandler.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/test/Util.h>
#include <folly/portability/GTest.h>
#define FOLLY_SKIP_IF_NULLPTR_BACKEND(evb) \
auto backend = TypeParam::getBackend(); \
SKIP_IF(!backend) << "Backend not available"; \
EventBase evb(std::move(backend))
namespace folly {
namespace test {
class TestSignalHandler : public AsyncSignalHandler {
public:
using AsyncSignalHandler::AsyncSignalHandler;
void signalReceived(int /* signum */) noexcept override {
called = true;
}
bool called{false};
};
template <typename T>
class AsyncSignalHandlerTest : public ::testing::Test {
public:
AsyncSignalHandlerTest() = default;
};
TYPED_TEST_CASE_P(AsyncSignalHandlerTest);
TYPED_TEST_P(AsyncSignalHandlerTest, basic) {
FOLLY_SKIP_IF_NULLPTR_BACKEND(evb);
TestSignalHandler handler{&evb};
handler.registerSignalHandler(SIGUSR1);
kill(getpid(), SIGUSR1);
EXPECT_FALSE(handler.called);
evb.loopOnce(EVLOOP_NONBLOCK);
EXPECT_TRUE(handler.called);
}
TYPED_TEST_P(AsyncSignalHandlerTest, attachEventBase) {
TestSignalHandler handler{nullptr};
EXPECT_FALSE(handler.getEventBase());
FOLLY_SKIP_IF_NULLPTR_BACKEND(evb);
handler.attachEventBase(&evb);
EXPECT_EQ(&evb, handler.getEventBase());
handler.registerSignalHandler(SIGUSR1);
kill(getpid(), SIGUSR1);
EXPECT_FALSE(handler.called);
evb.loopOnce(EVLOOP_NONBLOCK);
EXPECT_TRUE(handler.called);
handler.unregisterSignalHandler(SIGUSR1);
handler.detachEventBase();
EXPECT_FALSE(handler.getEventBase());
}
REGISTER_TYPED_TEST_CASE_P(AsyncSignalHandlerTest, basic, attachEventBase);
} // namespace test
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment