Commit b984cc9d authored by Robin Cheng's avatar Robin Cheng Committed by Facebook GitHub Bot

Make ThreadCachedInts TSAN-friendly.

Summary:
ThreadCachedInts uses asymmetric barriers to heavily optimize the reader side. TSAN does not support (symmetric or asymmetric) barriers. In this particular case, this diff replaces the memory order for accessing increments and decrements with memory_order_seq_cst only when running under TSAN, and that, I think, is a valid alternative to the barriers because it ensures a total order between all atomic accesses to increments and decrements (so that, when we read all decrements before all increments, we make sure that any writes to increments are read as long as decrements are read, which seems to be the purpose of the barriers).

Introduced additional tests to RcuTest and to AtomicReadMostlyMainPtrTest. The problem with existing tests is that this only triggers TSAN when the counter was already 0 when calling synchronize(), not when synchronize() had to invoke futexWait, because futexWait is itself a synchronization point.

Reviewed By: davidtgoldblatt

Differential Revision: D24029131

fbshipit-source-id: 57c7bf32061868ccd5a4e20154c8c7db6e4eeef5
parent bb70495b
......@@ -326,7 +326,7 @@ TEST(AtomicReadMostlyMainPtrStressTest, ReadOnly) {
}
}
TEST(AtomicReadMostlyMainPtrStressTest, ReadWrite) {
TEST(AtomicReadMostlyMainPtrStressTest, ReadWriteConsistency) {
const static int kReaders = 4;
const static int kWriters = 4;
// This gives a test that runs for about 4 seconds on my machine; that's about
......@@ -415,3 +415,46 @@ TEST(AtomicReadMostlyMainPtrStressTest, ReadWrite) {
// is highly unflakey.
EXPECT_GE(casFailures.load(), 10);
}
TEST(AtomicReadMostlyMainPtrStressTest, ReadWriteTsan) {
// Do purely read and writes - we don't do anything else, to avoid
// extra synchronization that interferes with TSAN.
AtomicReadMostlyMainPtr<int> ptr(std::make_shared<int>(0));
std::atomic<bool> stop{false};
const static int kReaders = 4;
const static int kWriters = 4;
std::vector<std::thread> readers(kReaders);
std::vector<std::thread> writers(kWriters);
for (auto& thread : readers) {
thread = std::thread([&] {
while (!stop.load(std::memory_order_relaxed)) {
ptr.load(std::memory_order_relaxed);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
break;
}
});
}
for (auto& thread : writers) {
thread = std::thread([&] {
while (!stop.load(std::memory_order_relaxed)) {
auto newValue = std::make_shared<int>(0);
ptr.store(newValue, std::memory_order_relaxed);
std::this_thread::sleep_for(std::chrono::milliseconds(15));
break;
}
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
stop.store(true, std::memory_order_relaxed);
for (auto& thread : readers) {
thread.join();
}
for (auto& thread : writers) {
thread.join();
}
}
......@@ -34,6 +34,11 @@ namespace folly {
namespace detail {
// Use memory_order_seq_cst for accesses to increments/decrements if we're
// running under TSAN, because TSAN ignores barriers completely.
constexpr std::memory_order kCounterMemoryOrder =
kIsSanitizeThread ? std::memory_order_seq_cst : std::memory_order_relaxed;
template <typename Tag>
class ThreadCachedInts {
std::atomic<int64_t> orphan_inc_[2] = {};
......@@ -51,14 +56,14 @@ class ThreadCachedInts {
~Integer() noexcept {
// Increment counts must be set before decrement counts
ints_->orphan_inc_[0].fetch_add(
inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
inc_[0].load(kCounterMemoryOrder), kCounterMemoryOrder);
ints_->orphan_inc_[1].fetch_add(
inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
inc_[1].load(kCounterMemoryOrder), kCounterMemoryOrder);
folly::asymmetricLightBarrier(); // B
ints_->orphan_dec_[0].fetch_add(
dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
dec_[0].load(kCounterMemoryOrder), kCounterMemoryOrder);
ints_->orphan_dec_[1].fetch_add(
dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
dec_[1].load(kCounterMemoryOrder), kCounterMemoryOrder);
ints_->waiting_.store(0, std::memory_order_release);
detail::futexWake(&ints_->waiting_);
// reset the cache_ on destructor so we can handle the delete/recreate
......@@ -84,7 +89,7 @@ class ThreadCachedInts {
auto& c = int_cache_->inc_[epoch];
auto val = c.load(std::memory_order_relaxed);
c.store(val + 1, std::memory_order_relaxed);
c.store(val + 1, kCounterMemoryOrder);
folly::asymmetricLightBarrier(); // A
}
......@@ -97,7 +102,7 @@ class ThreadCachedInts {
auto& c = int_cache_->dec_[epoch];
auto val = c.load(std::memory_order_relaxed);
c.store(val + 1, std::memory_order_relaxed);
c.store(val + 1, kCounterMemoryOrder);
folly::asymmetricLightBarrier(); // C
if (waiting_.load(std::memory_order_acquire)) {
......@@ -107,7 +112,7 @@ class ThreadCachedInts {
}
int64_t readFull(uint8_t epoch) {
int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);
int64_t full = -orphan_dec_[epoch].load(kCounterMemoryOrder);
// Matches A - ensure all threads have seen new value of version,
// *and* that we see current values of counters in readFull()
......@@ -119,7 +124,7 @@ class ThreadCachedInts {
// the callbacks.
folly::asymmetricHeavyBarrier();
for (auto& i : cs_.accessAllThreads()) {
full -= i.dec_[epoch].load(std::memory_order_relaxed);
full -= i.dec_[epoch].load(kCounterMemoryOrder);
}
// Matches B - ensure that all increments are seen if decrements
......@@ -129,11 +134,11 @@ class ThreadCachedInts {
auto accessor = cs_.accessAllThreads();
for (auto& i : accessor) {
full += i.inc_[epoch].load(std::memory_order_relaxed);
full += i.inc_[epoch].load(kCounterMemoryOrder);
}
// orphan is read behind accessAllThreads lock
return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
return full + orphan_inc_[epoch].load(kCounterMemoryOrder);
}
void waitForZero(uint8_t phase) {
......@@ -160,15 +165,15 @@ class ThreadCachedInts {
// touch orphan_ and clear out all counts.
void resetAfterFork() {
if (int_cache_) {
int_cache_->dec_[0].store(0, std::memory_order_relaxed);
int_cache_->dec_[1].store(0, std::memory_order_relaxed);
int_cache_->inc_[0].store(0, std::memory_order_relaxed);
int_cache_->inc_[1].store(0, std::memory_order_relaxed);
int_cache_->dec_[0].store(0, kCounterMemoryOrder);
int_cache_->dec_[1].store(0, kCounterMemoryOrder);
int_cache_->inc_[0].store(0, kCounterMemoryOrder);
int_cache_->inc_[1].store(0, kCounterMemoryOrder);
}
orphan_inc_[0].store(0, std::memory_order_relaxed);
orphan_inc_[1].store(0, std::memory_order_relaxed);
orphan_dec_[0].store(0, std::memory_order_relaxed);
orphan_dec_[1].store(0, std::memory_order_relaxed);
orphan_inc_[0].store(0, kCounterMemoryOrder);
orphan_inc_[1].store(0, kCounterMemoryOrder);
orphan_dec_[0].store(0, kCounterMemoryOrder);
orphan_dec_[1].store(0, kCounterMemoryOrder);
}
};
......
......@@ -289,3 +289,26 @@ TEST(RcuTest, RcuObjBase) {
synchronize_rcu();
EXPECT_TRUE(retired);
}
TEST(RcuTest, Tsan) {
int data = 0;
std::thread t1([&] {
auto epoch = rcu_default_domain()->lock_shared();
data = 1;
rcu_default_domain()->unlock_shared(std::move(epoch));
// Delay before exiting so the thread is still alive for TSAN detection.
std::this_thread::sleep_for(std::chrono::milliseconds(200));
});
std::thread t2([&] {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// This should establish a happens-before relationship between the earlier
// write (data = 1) and this write below (data = 2).
rcu_default_domain()->synchronize();
data = 2;
});
t1.join();
t2.join();
EXPECT_EQ(data, 2);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment