Commit 01336b86 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot

Be explicit about memory orders in Futures

Summary:
[Folly] Be explicit about memory orders in Futures.

This clarifies the basic purpose of each atomic variable - namely, whether it protects control flow (relaxed) v.s. state (typically acq/rel).

Reviewed By: davidtgoldblatt

Differential Revision: D8472791

fbshipit-source-id: 350ec9b38d7c36cfb6d9ae9f1abf758b1f2e97ba
parent dcbf6db1
......@@ -43,7 +43,7 @@ auto Barrier::allocateControlBlock() -> ControlBlock* {
if (!block) {
throw_exception<std::bad_alloc>();
}
block->valueAndReaderCount = 0;
std::atomic_init(&block->valueAndReaderCount, uint64_t(0));
auto p = promises(block);
uint32_t i = 0;
......
......@@ -450,7 +450,7 @@ FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) {
auto ctx = std::make_shared<Context>(std::move(e));
auto f = [ctx](Try<T>&& t) {
if (!ctx->token.exchange(true)) {
if (!ctx->token.exchange(true, std::memory_order_relaxed)) {
ctx->promise.setTry(std::move(t));
}
};
......@@ -476,7 +476,7 @@ FutureBase<T>::withinImplementation(Duration dur, E e, Timekeeper* tk) {
}
// "after" completed first, cancel "this"
lockedCtx->thisFuture.raise(FutureTimeout());
if (!lockedCtx->token.exchange(true)) {
if (!lockedCtx->token.exchange(true, std::memory_order_relaxed)) {
if (t.hasException()) {
lockedCtx->promise.setException(std::move(t.exception()));
} else {
......@@ -1394,7 +1394,7 @@ collect(InputIterator first, InputIterator last) {
finalResult.reserve(n);
}
~Context() {
if (!threw.exchange(true)) {
if (!threw.load(std::memory_order_relaxed)) {
// map Optional<T> -> T
std::transform(
result.begin(),
......@@ -1414,10 +1414,10 @@ collect(InputIterator first, InputIterator last) {
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) {
if (t.hasException()) {
if (!ctx->threw.exchange(true)) {
if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
ctx->p.setException(std::move(t.exception()));
}
} else if (!ctx->threw) {
} else if (!ctx->threw.load(std::memory_order_relaxed)) {
ctx->result[i] = std::move(t.value());
}
});
......@@ -1434,7 +1434,7 @@ Future<std::tuple<typename remove_cvref_t<Fs>::value_type...>> collect(
using Result = std::tuple<typename remove_cvref_t<Fs>::value_type...>;
struct Context {
~Context() {
if (!threw.exchange(true)) {
if (!threw.load(std::memory_order_relaxed)) {
p.setValue(unwrapTryTuple(std::move(results)));
}
}
......@@ -1448,10 +1448,10 @@ Future<std::tuple<typename remove_cvref_t<Fs>::value_type...>> collect(
[&](auto i, auto&& f) {
f.setCallback_([i, ctx](auto&& t) {
if (t.hasException()) {
if (!ctx->threw.exchange(true)) {
if (!ctx->threw.exchange(true, std::memory_order_relaxed)) {
ctx->p.setException(std::move(t.exception()));
}
} else if (!ctx->threw) {
} else if (!ctx->threw.load(std::memory_order_relaxed)) {
std::get<i.value>(ctx->results) = std::move(t);
}
});
......@@ -1475,13 +1475,13 @@ collectAny(InputIterator first, InputIterator last) {
struct Context {
Promise<std::pair<size_t, Try<T>>> p;
std::atomic<bool> done {false};
std::atomic<bool> done{false};
};
auto ctx = std::make_shared<Context>();
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) {
if (!ctx->done.exchange(true)) {
if (!ctx->done.exchange(true, std::memory_order_relaxed)) {
ctx->p.setValue(std::make_pair(i, std::move(t)));
}
});
......@@ -1511,9 +1511,12 @@ collectAnyWithoutException(InputIterator first, InputIterator last) {
auto ctx = std::make_shared<Context>(size_t(std::distance(first, last)));
for (size_t i = 0; first != last; ++first, ++i) {
first->setCallback_([i, ctx](Try<T>&& t) {
if (!t.hasException() && !ctx->done.exchange(true)) {
if (!t.hasException() &&
!ctx->done.exchange(true, std::memory_order_relaxed)) {
ctx->p.setValue(std::make_pair(i, std::move(t.value())));
} else if (++ctx->nFulfilled == ctx->nTotal) {
} else if (
ctx->nFulfilled.fetch_add(1, std::memory_order_relaxed) + 1 ==
ctx->nTotal) {
ctx->p.setException(t.exception());
}
});
......@@ -1653,7 +1656,7 @@ window(Executor* executor, Collection input, F func, size_t n) {
F func;
static void spawn(std::shared_ptr<WindowContext> ctx) {
size_t i = ctx->i++;
size_t i = ctx->i.fetch_add(1, std::memory_order_relaxed);
if (i < ctx->input.size()) {
auto fut = makeSemiFutureWith(
[&] { return ctx->func(std::move(ctx->input[i])); });
......@@ -2168,8 +2171,8 @@ Future<Unit> whileDo(P&& predicate, F&& thunk) {
template <class F>
Future<Unit> times(const int n, F&& thunk) {
return folly::whileDo(
[ n, count = std::make_unique<std::atomic<int>>(0) ]() mutable {
return count->fetch_add(1) < n;
[n, count = std::make_unique<std::atomic<int>>(0)]() mutable {
return count->fetch_add(1, std::memory_order_relaxed) < n;
},
std::forward<F>(thunk));
}
......
......@@ -500,8 +500,8 @@ class Core final {
// exactly two `CoreAndCallbackReference` objects, which call
// `derefCallback` and `detachOne` in their destructor. One will guard
// this scope, the other one will guard the lambda passed to the executor.
attached_ += 2;
callbackReferences_ += 2;
attached_.fetch_add(2, std::memory_order_relaxed);
callbackReferences_.fetch_add(2, std::memory_order_relaxed);
CoreAndCallbackReference guard_local_scope(this);
CoreAndCallbackReference guard_lambda(this);
try {
......@@ -536,7 +536,7 @@ class Core final {
callback_(std::move(result_));
}
} else {
attached_++;
attached_.fetch_add(1, std::memory_order_relaxed);
SCOPE_EXIT {
context_.~Context();
callback_.~Callback();
......@@ -548,7 +548,7 @@ class Core final {
}
void detachOne() noexcept {
auto a = attached_--;
auto a = attached_.fetch_sub(1, std::memory_order_acq_rel);
assert(a >= 1);
if (a == 1) {
delete this;
......@@ -556,7 +556,9 @@ class Core final {
}
void derefCallback() noexcept {
if (--callbackReferences_ == 0) {
auto c = callbackReferences_.fetch_sub(1, std::memory_order_acq_rel);
assert(c >= 1);
if (c == 1) {
context_.~Context();
callback_.~Callback();
}
......@@ -577,7 +579,7 @@ class Core final {
std::atomic<State> state_;
std::atomic<unsigned char> attached_;
std::atomic<unsigned char> callbackReferences_{0};
std::atomic<bool> interruptHandlerSet_ {false};
std::atomic<bool> interruptHandlerSet_{false};
SpinLock interruptLock_;
int8_t priority_ {-1};
Executor::KeepAlive<> executor_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment