Commit 2df6e3b0 authored by Doron Roberts-Kedes's avatar Doron Roberts-Kedes Committed by Facebook Github Bot

folly DeterministicSchedule: joinAll() as a workaround to thread local destructor issues

Summary:
There was a problem with DeterministicSchedule when threads issued shared access stores/loads to atomics during thread local destructors. Thread local destructors run after a thread terminates, and therefore after all of the DSched cleanup work in beforeThreadExit() is already complete.

The new DSched::joinAll() function works around this problem by blocking the parent until all child threads are blocked at the end of beforeThreadExit(). The parent then lets each child thread proceed one at a time through its thread local destructors with a paired sem_post and .join().

Reviewed By: djwatson

Differential Revision: D13381250

fbshipit-source-id: 80ab39b3f2f71ee69dfbaec9950dfc5908874353
parent 711c7a48
...@@ -333,8 +333,10 @@ struct BufferedAtomic { ...@@ -333,8 +333,10 @@ struct BufferedAtomic {
T doLoad(std::memory_order mo, bool rmw = false) const { T doLoad(std::memory_order mo, bool rmw = false) const {
// Static destructors that outlive DSched instance may load atomics // Static destructors that outlive DSched instance may load atomics
if (!DeterministicSchedule::isActive()) { if (!DeterministicSchedule::isActive()) {
auto prev = prevUnguardedAccess.exchange(std::this_thread::get_id()); if (!DeterministicSchedule::isCurrentThreadExiting()) {
CHECK(prev == std::thread::id() || prev == std::this_thread::get_id()); auto prev = prevUnguardedAccess.exchange(std::this_thread::get_id());
CHECK(prev == std::thread::id() || prev == std::this_thread::get_id());
}
return getBuf().loadDirect(); return getBuf().loadDirect();
} }
ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo(); ThreadInfo& threadInfo = DeterministicSchedule::getCurrentThreadInfo();
...@@ -345,8 +347,10 @@ struct BufferedAtomic { ...@@ -345,8 +347,10 @@ struct BufferedAtomic {
void doStore(T val, std::memory_order mo, bool rmw = false) { void doStore(T val, std::memory_order mo, bool rmw = false) {
// Static destructors that outlive DSched instance may store to atomics // Static destructors that outlive DSched instance may store to atomics
if (!DeterministicSchedule::isActive()) { if (!DeterministicSchedule::isActive()) {
auto prev = prevUnguardedAccess.exchange(std::this_thread::get_id()); if (!DeterministicSchedule::isCurrentThreadExiting()) {
CHECK(prev == std::thread::id() || prev == std::this_thread::get_id()); auto prev = prevUnguardedAccess.exchange(std::this_thread::get_id());
CHECK(prev == std::thread::id() || prev == std::this_thread::get_id());
}
getBuf().storeDirect(val); getBuf().storeDirect(val);
return; return;
} }
......
...@@ -204,16 +204,3 @@ TEST(BufferedAtomic, single_thread_unguarded_access) { ...@@ -204,16 +204,3 @@ TEST(BufferedAtomic, single_thread_unguarded_access) {
ASSERT_EQ(1, x.load()); ASSERT_EQ(1, x.load());
} }
TEST(BufferedAtomic, multiple_thread_unguarded_access) {
  auto* schedule = new DSched(DSched::uniform(0));
  DeterministicAtomicImpl<int, DeterministicSchedule, BufferedAtomic> x(0);
  delete schedule;
  // Mimic unguarded accesses that happen after the DSched instance is gone,
  // e.g. static construction/destruction or pthread_setspecific callbacks
  // touching a shared DeterministicAtomic once
  // DeterministicSchedule::beforeThreadAccess() has already run.
  ASSERT_EQ(0, x.load());
  std::thread accessor(
      [&x]() { ASSERT_DEATH(x.store(1), "prev == std::thread::id()"); });
  accessor.join();
}
...@@ -32,6 +32,7 @@ namespace test { ...@@ -32,6 +32,7 @@ namespace test {
FOLLY_TLS sem_t* DeterministicSchedule::tls_sem; FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched; FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
FOLLY_TLS bool DeterministicSchedule::tls_exiting;
FOLLY_TLS DSchedThreadId DeterministicSchedule::tls_threadId; FOLLY_TLS DSchedThreadId DeterministicSchedule::tls_threadId;
thread_local AuxAct DeterministicSchedule::tls_aux_act; thread_local AuxAct DeterministicSchedule::tls_aux_act;
AuxChk DeterministicSchedule::aux_chk; AuxChk DeterministicSchedule::aux_chk;
...@@ -125,6 +126,7 @@ DeterministicSchedule::DeterministicSchedule( ...@@ -125,6 +126,7 @@ DeterministicSchedule::DeterministicSchedule(
assert(tls_sched == nullptr); assert(tls_sched == nullptr);
assert(tls_aux_act == nullptr); assert(tls_aux_act == nullptr);
tls_exiting = false;
tls_sem = new sem_t; tls_sem = new sem_t;
sem_init(tls_sem, 0, 1); sem_init(tls_sem, 0, 1);
sems_.push_back(tls_sem); sems_.push_back(tls_sem);
...@@ -287,6 +289,7 @@ sem_t* DeterministicSchedule::beforeThreadCreate() { ...@@ -287,6 +289,7 @@ sem_t* DeterministicSchedule::beforeThreadCreate() {
void DeterministicSchedule::afterThreadCreate(sem_t* sem) { void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
assert(tls_sem == nullptr); assert(tls_sem == nullptr);
assert(tls_sched == nullptr); assert(tls_sched == nullptr);
tls_exiting = false;
tls_sem = sem; tls_sem = sem;
tls_sched = this; tls_sched = this;
bool started = false; bool started = false;
...@@ -317,32 +320,64 @@ void DeterministicSchedule::beforeThreadExit() { ...@@ -317,32 +320,64 @@ void DeterministicSchedule::beforeThreadExit() {
active_.erase(std::this_thread::get_id()); active_.erase(std::this_thread::get_id());
if (sems_.size() > 0) { if (sems_.size() > 0) {
FOLLY_TEST_DSCHED_VLOG("exiting"); FOLLY_TEST_DSCHED_VLOG("exiting");
/* Wait here so that parent thread can control when the thread
* enters the thread local destructors. */
exitingSems_[std::this_thread::get_id()] = tls_sem;
afterSharedAccess(); afterSharedAccess();
sem_wait(tls_sem);
} }
tls_sched = nullptr;
tls_aux_act = nullptr;
tls_exiting = true;
sem_destroy(tls_sem); sem_destroy(tls_sem);
delete tls_sem; delete tls_sem;
tls_sem = nullptr; tls_sem = nullptr;
tls_sched = nullptr;
tls_aux_act = nullptr;
} }
void DeterministicSchedule::join(std::thread& child) { void DeterministicSchedule::waitForBeforeThreadExit(std::thread& child) {
assert(tls_sched == this);
beforeSharedAccess();
assert(tls_sched->joins_.count(child.get_id()) == 0);
if (tls_sched->active_.count(child.get_id())) {
sem_t* sem = descheduleCurrentThread();
tls_sched->joins_.insert({child.get_id(), sem});
afterSharedAccess();
// Wait to be scheduled by exiting child thread
beforeSharedAccess();
assert(!tls_sched->active_.count(child.get_id()));
}
afterSharedAccess();
}
void DeterministicSchedule::joinAll(std::vector<std::thread>& children) {
  /* Wait until every child is parked at the end of beforeThreadExit(), then
   * release and join the children one at a time so that shared accesses
   * issued during thread local destructors remain serialized even though the
   * DSched bookkeeping for each child is already torn down. */
  auto sched = tls_sched;
  if (sched) {
    // Wait until all children are about to exit
    for (auto& child : children) {
      sched->waitForBeforeThreadExit(child);
    }
  }
  atomic_thread_fence(std::memory_order_seq_cst);
  /* Let each child thread proceed one at a time to protect
   * shared access during thread local destructors.*/
  for (auto& child : children) {
    if (sched) {
      // Unblock the child parked on its semaphore in beforeThreadExit().
      sem_post(sched->exitingSems_[child.get_id()]);
    }
    child.join();
  }
}
void DeterministicSchedule::join(std::thread& child) {
auto sched = tls_sched;
if (sched) {
sched->waitForBeforeThreadExit(child);
} }
atomic_thread_fence(std::memory_order_seq_cst); atomic_thread_fence(std::memory_order_seq_cst);
FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id()); FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
if (sched) {
sem_post(sched->exitingSems_[child.get_id()]);
}
child.join(); child.join();
} }
......
...@@ -224,6 +224,15 @@ class DeterministicSchedule : boost::noncopyable { ...@@ -224,6 +224,15 @@ class DeterministicSchedule : boost::noncopyable {
/** Calls child.join() as part of a deterministic schedule. */ /** Calls child.join() as part of a deterministic schedule. */
static void join(std::thread& child); static void join(std::thread& child);
/** Waits for each thread in children to reach the end of their
* thread function without allowing them to fully terminate. Then,
* allow one child at a time to fully terminate and join each one.
* This functionality is important to protect shared access that can
* take place after beforeThreadExit() has already been invoked,
* for example when executing thread local destructors.
*/
static void joinAll(std::vector<std::thread>& children);
/** Calls sem_post(sem) as part of a deterministic schedule. */ /** Calls sem_post(sem) as part of a deterministic schedule. */
static void post(sem_t* sem); static void post(sem_t* sem);
...@@ -260,6 +269,13 @@ class DeterministicSchedule : boost::noncopyable { ...@@ -260,6 +269,13 @@ class DeterministicSchedule : boost::noncopyable {
/** Remove the current thread's semaphore from sems_ */ /** Remove the current thread's semaphore from sems_ */
static sem_t* descheduleCurrentThread(); static sem_t* descheduleCurrentThread();
/** Returns true if the current thread has already completed
 * the thread function and set tls_exiting in beforeThreadExit(),
 * for example while executing thread local destructors. Used by
 * BufferedAtomic to skip the unguarded-access check for such threads. */
static bool isCurrentThreadExiting() {
return tls_exiting;
}
/** Add sem back into sems_ */ /** Add sem back into sems_ */
static void reschedule(sem_t* sem); static void reschedule(sem_t* sem);
...@@ -279,6 +295,7 @@ class DeterministicSchedule : boost::noncopyable { ...@@ -279,6 +295,7 @@ class DeterministicSchedule : boost::noncopyable {
private: private:
static FOLLY_TLS sem_t* tls_sem; static FOLLY_TLS sem_t* tls_sem;
static FOLLY_TLS DeterministicSchedule* tls_sched; static FOLLY_TLS DeterministicSchedule* tls_sched;
static FOLLY_TLS bool tls_exiting;
static FOLLY_TLS DSchedThreadId tls_threadId; static FOLLY_TLS DSchedThreadId tls_threadId;
static thread_local AuxAct tls_aux_act; static thread_local AuxAct tls_aux_act;
static AuxChk aux_chk; static AuxChk aux_chk;
...@@ -287,6 +304,7 @@ class DeterministicSchedule : boost::noncopyable { ...@@ -287,6 +304,7 @@ class DeterministicSchedule : boost::noncopyable {
std::vector<sem_t*> sems_; std::vector<sem_t*> sems_;
std::unordered_set<std::thread::id> active_; std::unordered_set<std::thread::id> active_;
std::unordered_map<std::thread::id, sem_t*> joins_; std::unordered_map<std::thread::id, sem_t*> joins_;
std::unordered_map<std::thread::id, sem_t*> exitingSems_;
std::vector<ThreadInfo> threadInfoMap_; std::vector<ThreadInfo> threadInfoMap_;
ThreadTimestamps seqCstFenceOrder_; ThreadTimestamps seqCstFenceOrder_;
...@@ -303,6 +321,7 @@ class DeterministicSchedule : boost::noncopyable { ...@@ -303,6 +321,7 @@ class DeterministicSchedule : boost::noncopyable {
sem_t* beforeThreadCreate(); sem_t* beforeThreadCreate();
void afterThreadCreate(sem_t*); void afterThreadCreate(sem_t*);
void beforeThreadExit(); void beforeThreadExit();
void waitForBeforeThreadExit(std::thread& child);
/** Calls user-defined auxiliary function (if any) */ /** Calls user-defined auxiliary function (if any) */
void callAux(bool); void callAux(bool);
}; };
......
...@@ -91,9 +91,7 @@ TEST(DeterministicSchedule, buggyAdd) { ...@@ -91,9 +91,7 @@ TEST(DeterministicSchedule, buggyAdd) {
} while (true); } while (true);
}); // thread lambda }); // thread lambda
} // for t } // for t
for (auto& t : threads) { DeterministicSchedule::joinAll(threads);
DeterministicSchedule::join(t);
}
if (!bug) { if (!bug) {
EXPECT_EQ(test.load(), baseline.load()); EXPECT_EQ(test.load(), baseline.load());
} else { } else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment