Commit 8961aec7 authored by Nathan Bronson's avatar Nathan Bronson Committed by Andrii Grynenko

add per node mutex for emulated futex

Summary:
The emulated futex (not used on Linux) has an optimization to defer
notification until after the bucket mutex has been unlocked, to avoid
lock collisions between the waiter and waker.  That code will have a
use-after-free problem if the waiter's condition_variable has a spurious
wakeup, which is allowed by the spec.  That code also doesn't do
anything about contention between multiple waiters.

This diff adds a mutex to each wait node, relieving the waiters from
having to acquire the bucket lock on wakeup.  Rather than trying to
perform a racy late notification, we just make sure to release the node
lock immediately after calling notify_one, which seems to have the
desired effect.

Test Plan:
1) existing unit tests
2) new unit tests

Reviewed By: delong.j@fb.com

Subscribers: boseant, njormrod

FB internal diff: D1602360
parent bdde233c
......@@ -123,17 +123,24 @@ FutexResult nativeFutexWaitImpl(void* addr,
///////////////////////////////////////////////////////
// compatibility implementation using standard C++ API
// Our emulated futex uses 4096 lists of wait nodes. There are two levels
// of locking: a per-list mutex that controls access to the list and a
// per-node mutex, condvar, and bool that are used for the actual wakeups.
// The per-node mutex allows us to do precise wakeups without thundering
// herds.
struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> {
void* const addr_;
const uint32_t waitMask_;
bool hasTimeout_;
// tricky: hold both bucket and node mutex to write, either to read
bool signaled_;
std::mutex mutex_;
std::condition_variable cond_;
EmulatedFutexWaitNode(void* addr, uint32_t waitMask, bool hasTimeout)
EmulatedFutexWaitNode(void* addr, uint32_t waitMask)
: addr_(addr)
, waitMask_(waitMask)
, hasTimeout_(hasTimeout)
, signaled_(false)
{
}
......@@ -162,42 +169,24 @@ std::once_flag EmulatedFutexBucket::gBucketInit;
int emulatedFutexWake(void* addr, int count, uint32_t waitMask) {
auto& bucket = EmulatedFutexBucket::bucketFor(addr);
std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
int numAwoken = 0;
boost::intrusive::list<EmulatedFutexWaitNode> deferredWakeups;
{
std::unique_lock<std::mutex> lock(bucket.mutex_);
for (auto iter = bucket.waiters_.begin();
numAwoken < count && iter != bucket.waiters_.end(); ) {
auto current = iter;
auto& node = *iter++;
if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
// We unlink, but waiter destroys the node. We must signal timed
// waiters under the lock, to avoid a race where we release the lock,
// the waiter times out and deletes the node, and then we try to
// signal it. This problem doesn't exist for unbounded waiters,
// so for them we optimize their wakeup by releasing the lock first.
bucket.waiters_.erase(current);
if (node.hasTimeout_) {
node.signaled_ = true;
node.cond_.notify_one();
} else {
deferredWakeups.push_back(node);
}
++numAwoken;
}
for (auto iter = bucket.waiters_.begin();
numAwoken < count && iter != bucket.waiters_.end(); ) {
auto current = iter;
auto& node = *iter++;
if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
++numAwoken;
// we unlink, but waiter destroys the node
bucket.waiters_.erase(current);
std::unique_lock<std::mutex> nodeLock(node.mutex_);
node.signaled_ = true;
node.cond_.notify_one();
}
}
while (!deferredWakeups.empty()) {
auto& node = deferredWakeups.front();
deferredWakeups.pop_front();
node.signaled_ = true;
node.cond_.notify_one();
}
return numAwoken;
}
......@@ -207,30 +196,39 @@ FutexResult emulatedFutexWaitImpl(
time_point<system_clock>* absSystemTime,
time_point<steady_clock>* absSteadyTime,
uint32_t waitMask) {
bool hasTimeout = absSystemTime != nullptr || absSteadyTime != nullptr;
EmulatedFutexWaitNode node(addr, waitMask, hasTimeout);
auto& bucket = EmulatedFutexBucket::bucketFor(addr);
std::unique_lock<std::mutex> lock(bucket.mutex_);
EmulatedFutexWaitNode node(addr, waitMask);
uint32_t actual;
memcpy(&actual, addr, sizeof(uint32_t));
if (actual != expected) {
return FutexResult::VALUE_CHANGED;
}
{
std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
uint32_t actual;
memcpy(&actual, addr, sizeof(uint32_t));
if (actual != expected) {
return FutexResult::VALUE_CHANGED;
}
bucket.waiters_.push_back(node);
} // bucketLock scope
bucket.waiters_.push_back(node);
while (!node.signaled_) {
std::cv_status status = std::cv_status::no_timeout;
if (absSystemTime != nullptr) {
status = node.cond_.wait_until(lock, *absSystemTime);
} else if (absSteadyTime != nullptr) {
status = node.cond_.wait_until(lock, *absSteadyTime);
} else {
node.cond_.wait(lock);
std::cv_status status = std::cv_status::no_timeout;
{
std::unique_lock<std::mutex> nodeLock(node.mutex_);
while (!node.signaled_ && status != std::cv_status::timeout) {
if (absSystemTime != nullptr) {
status = node.cond_.wait_until(nodeLock, *absSystemTime);
} else if (absSteadyTime != nullptr) {
status = node.cond_.wait_until(nodeLock, *absSteadyTime);
} else {
node.cond_.wait(nodeLock);
}
}
} // nodeLock scope
if (status == std::cv_status::timeout) {
if (status == std::cv_status::timeout) {
// it's not really a timeout until we unlink the unsignaled node
std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
if (!node.signaled_) {
bucket.waiters_.erase(bucket.waiters_.iterator_to(node));
return FutexResult::TIMEDOUT;
}
......
......@@ -54,36 +54,36 @@ void run_basic_tests() {
DSched::join(thr);
}
template<template<typename> class Atom>
void run_wait_until_tests();
template <template<typename> class Atom, typename Clock>
void liveClockWaitUntilTests() {
Futex<Atom> f(0);
template <typename Clock>
void stdAtomicWaitUntilTests() {
Futex<std::atomic> f(0);
auto thrA = DSched::thread([&]{
while (true) {
typename Clock::time_point nowPlus2s = Clock::now() + seconds(2);
auto res = f.futexWaitUntil(0, nowPlus2s);
EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
if (res == FutexResult::AWOKEN) {
break;
for (int stress = 0; stress < 1000; ++stress) {
auto fp = &f; // workaround for t5336595
auto thrA = DSched::thread([fp,stress]{
while (true) {
auto deadline = Clock::now() + microseconds(1 << (stress % 20));
auto res = fp->futexWaitUntil(0, deadline);
EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
if (res == FutexResult::AWOKEN) {
break;
}
}
});
while (f.futexWake() != 1) {
std::this_thread::yield();
}
});
while (f.futexWake() != 1) {
std::this_thread::yield();
DSched::join(thrA);
}
DSched::join(thrA);
auto start = Clock::now();
EXPECT_EQ(f.futexWaitUntil(0, start + milliseconds(100)),
FutexResult::TIMEDOUT);
LOG(INFO) << "Futex wait timed out after waiting for "
<< duration_cast<milliseconds>(Clock::now() - start).count()
<< "ms";
<< "ms, should be ~100ms";
}
template <typename Clock>
......@@ -96,10 +96,10 @@ void deterministicAtomicWaitUntilTests() {
EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::INTERRUPTED);
}
template <>
void run_wait_until_tests<std::atomic>() {
stdAtomicWaitUntilTests<system_clock>();
stdAtomicWaitUntilTests<steady_clock>();
template<template<typename> class Atom>
void run_wait_until_tests() {
liveClockWaitUntilTests<Atom, system_clock>();
liveClockWaitUntilTests<Atom, steady_clock>();
}
template <>
......@@ -177,6 +177,11 @@ TEST(Futex, basic_live) {
run_wait_until_tests<std::atomic>();
}
TEST(Futex, basic_emulated) {
run_basic_tests<EmulatedFutexAtomic>();
run_wait_until_tests<EmulatedFutexAtomic>();
}
TEST(Futex, basic_deterministic) {
DSched sched(DSched::uniform(0));
run_basic_tests<DeterministicAtomic>();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment