Commit 74502e3c authored by Dave Watson, committed by Facebook Github Bot

Fix ThreadLocal races

Summary:
I misread the ThreadLocal docs: thread destruction functions do *not* grab the accessAllThreads_ lock
unless you use *strict* mode, and even then it is only a read lock.

Easy enough to make the thread-destruction global bits atomic / use folly::Synchronized.
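To make the race concrete, here is a minimal sketch of the pattern this diff applies (illustrative names and a std::list stand-in, not the actual folly classes): per-thread state gets folded into global state from a thread-local destructor, and because that destructor does not hold the accessAllThreads_ lock, the globals must be independently thread-safe, so plain counters become std::atomic and the shared list head gets wrapped in folly::Synchronized.

  #include <atomic>
  #include <cstdint>
  #include <list>

  #include <folly/Synchronized.h>

  struct GlobalState {
    // Summed by readers at any time, so it must be atomic rather than a
    // plain int64_t guarded by a lock the destructor never takes.
    std::atomic<int64_t> orphanCount{0};
    // Dying threads splice their nodes in under Synchronized's own lock.
    folly::Synchronized<std::list<int>> ghead;
  };

  struct PerThreadState {
    GlobalState* parent;
    std::atomic<int64_t> count{0};
    std::list<int> nodes;

    ~PerThreadState() noexcept {
      // Runs at thread exit, potentially concurrent with readers of the globals.
      parent->orphanCount.fetch_add(
          count.load(std::memory_order_relaxed), std::memory_order_relaxed);
      auto locked = parent->ghead.wlock();
      locked->splice(locked->end(), nodes);
    }
  };

The ThreadCachedInts hunks below do the atomic half of this (plus an asymmetricLightBarrier so increments are published before decrements), and the ThreadCachedLists hunks do the folly::Synchronized half.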

Reviewed By: yfeldblum

Differential Revision: D6592905

fbshipit-source-id: 4ae600dff4c8c04751483a452ca7c07ef3f26380
parent 58399f25
@@ -36,9 +36,8 @@ namespace detail {
 template <typename Tag>
 class ThreadCachedInts {
-  // These are only accessed under the ThreadLocal lock.
-  int64_t orphan_inc_[2]{0, 0};
-  int64_t orphan_dec_[2]{0, 0};
+  std::atomic<int64_t> orphan_inc_[2];
+  std::atomic<int64_t> orphan_dec_[2];
   folly::detail::Futex<> waiting_;
   class Integer {
@@ -49,10 +48,16 @@ class ThreadCachedInts {
    std::atomic<int64_t> inc_[2];
    std::atomic<int64_t> dec_[2];
    ~Integer() noexcept {
-      ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed);
-      ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed);
-      ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed);
-      ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed);
+      // Increment counts must be set before decrement counts
+      ints_->orphan_inc_[0].fetch_add(
+          inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
+      ints_->orphan_inc_[1].fetch_add(
+          inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
+      folly::asymmetricLightBarrier(); // B
+      ints_->orphan_dec_[0].fetch_add(
+          dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
+      ints_->orphan_dec_[1].fetch_add(
+          dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
       ints_->waiting_.store(0, std::memory_order_release);
       ints_->waiting_.futexWake();
     }
@@ -99,7 +104,7 @@ class ThreadCachedInts {
   }
   int64_t readFull(uint8_t epoch) {
-    int64_t full = 0;
+    int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);
     // Matches A - ensure all threads have seen new value of version,
     // *and* that we see current values of counters in readFull()
@@ -125,8 +130,7 @@ class ThreadCachedInts {
     }
     // orphan is read behind accessAllThreads lock
-    auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch];
-    return res;
+    return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
   }
   void waitForZero(uint8_t phase) {
@@ -158,10 +162,10 @@ class ThreadCachedInts {
       int_cache_->inc_[0].store(0, std::memory_order_relaxed);
       int_cache_->inc_[1].store(0, std::memory_order_relaxed);
     }
-    orphan_inc_[0] = 0;
-    orphan_inc_[1] = 0;
-    orphan_dec_[0] = 0;
-    orphan_dec_[1] = 0;
+    orphan_inc_[0].store(0, std::memory_order_relaxed);
+    orphan_inc_[1].store(0, std::memory_order_relaxed);
+    orphan_dec_[0].store(0, std::memory_order_relaxed);
+    orphan_dec_[1].store(0, std::memory_order_relaxed);
   }
 };
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <folly/Function.h>
+#include <folly/Synchronized.h>
 #include <folly/ThreadLocal.h>
 #include <glog/logging.h>
@@ -81,7 +82,7 @@ class ThreadCachedLists : public ThreadCachedListsBase {
   // Push list to the global list.
   void pushGlobal(ListHead& list);
-  ListHead ghead_;
+  folly::Synchronized<ListHead> ghead_;
   struct TLHead : public AtomicListHead {
     ThreadCachedLists* parent_;
@@ -90,7 +91,7 @@ class ThreadCachedLists : public ThreadCachedListsBase {
     TLHead(ThreadCachedLists* parent) : parent_(parent) {}
     ~TLHead() {
-      parent_->ghead_.splice(*this);
+      parent_->ghead_->splice(*this);
     }
   };
@@ -146,7 +147,7 @@ void ThreadCachedLists<Tag>::collect(ListHead& list) {
     list.splice(thr);
   }
-  list.splice(ghead_);
+  list.splice(*ghead_.wlock());
 }
 template <typename Tag>
@@ -24,12 +24,11 @@
 #include <folly/Random.h>
 #include <folly/portability/GFlags.h>
 #include <folly/portability/GTest.h>
-#include <folly/synchronization/Baton.h>
 using namespace folly;
 DEFINE_int64(iters, 100000, "Number of iterations");
-DEFINE_int64(threads, 32, "Number of threads");
+DEFINE_uint64(threads, 32, "Number of threads");
 TEST(RcuTest, Basic) {
   auto foo = new int(2);
@@ -116,7 +115,7 @@ TEST(RcuTest, Stress) {
   for (uint i = 0; i < sz; i++) {
     ints[i].store(new int(0));
   }
-  for (int th = 0; th < FLAGS_threads; th++) {
+  for (unsigned th = 0; th < FLAGS_threads; th++) {
     threads.push_back(std::thread([&]() {
       for (int i = 0; i < FLAGS_iters / 100; i++) {
         rcu_reader g;
@@ -148,11 +147,16 @@ TEST(RcuTest, Stress) {
   }
   done = true;
   updater.join();
+  // Cleanup for asan
+  synchronize_rcu();
+  for (uint i = 0; i < sz; i++) {
+    delete ints[i].exchange(nullptr);
+  }
 }
 TEST(RcuTest, Synchronize) {
   std::vector<std::thread> threads;
-  for (int th = 0; th < FLAGS_threads; th++) {
+  for (unsigned th = 0; th < FLAGS_threads; th++) {
     threads.push_back(std::thread([&]() {
       for (int i = 0; i < 10; i++) {
         synchronize_rcu();
@@ -197,14 +201,11 @@ TEST(RcuTest, MoveReaderBetweenThreads) {
 }
 TEST(RcuTest, ForkTest) {
-  folly::Baton<> b;
   rcu_token epoch;
   std::thread t([&]() {
     epoch = rcu_default_domain()->lock_shared();
-    b.post();
   });
-  t.detach();
-  b.wait();
+  t.join();
   auto pid = fork();
   if (pid) {
     // parent
@@ -221,22 +222,21 @@
   }
 }
-TEST(RcuTest, CoreLocalList) {
+TEST(RcuTest, ThreadLocalList) {
   struct TTag;
   folly::detail::ThreadCachedLists<TTag> lists;
-  int numthreads = 32;
-  std::vector<std::thread> threads;
-  std::atomic<int> done{0};
-  for (int tr = 0; tr < numthreads; tr++) {
-    threads.push_back(std::thread([&]() {
+  std::vector<std::thread> threads{FLAGS_threads};
+  std::atomic<unsigned long> done{FLAGS_threads};
+  for (auto& tr : threads) {
+    tr = std::thread([&]() {
      for (int i = 0; i < FLAGS_iters; i++) {
        auto node = new folly::detail::ThreadCachedListsBase::Node;
        lists.push(node);
      }
-      done++;
-    }));
+      --done;
+    });
   }
-  while (done.load() != numthreads) {
+  while (done.load() > 0) {
     folly::detail::ThreadCachedLists<TTag>::ListHead list{};
     lists.collect(list);
     list.forEach([](folly::detail::ThreadCachedLists<TTag>::Node* node) {
@@ -246,6 +246,11 @@ TEST(RcuTest, CoreLocalList) {
   for (auto& thread : threads) {
     thread.join();
   }
+  // Run cleanup pass one more time to make ASAN happy
+  folly::detail::ThreadCachedLists<TTag>::ListHead list{};
+  lists.collect(list);
+  list.forEach(
+      [](folly::detail::ThreadCachedLists<TTag>::Node* node) { delete node; });
 }
 TEST(RcuTest, ThreadDeath) {