Commit c4035d06 authored by Hans Fugal's avatar Hans Fugal Committed by Pavlo Kushnir

(wangle) fix a race condition in Core::maybeCallback

Summary:
`calledBack_` could be seen as true by both threads in this conditional. Classic rookie mistake. :-/

Test Plan: run unit tests

Reviewed By: darshan@fb.com

Subscribers: trunkagent, hannesr, net-systems@, fugalh, exa, njormrod, folly-diffs@

FB internal diff: D1661199

Tasks: 5542938, 5506504

Signature: t1:1661199:1415215840:fb69f56c8cf6f59beeca809724ce015b5260d9ad

Blame Revision: D1636487
parent 35654c7b
......@@ -193,18 +193,19 @@ class Core : protected FSM<State> {
private:
void maybeCallback() {
assert(ready());
if (!calledBack_ && isActive() && callback_) {
// TODO(5306911) we should probably try/catch
calledBack_ = true;
Executor* x = executor_;
RequestContext::setContext(context_);
if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
} else {
callback_(std::move(*result_));
if (isActive() && callback_) {
if (!calledBack_.exchange(true)) {
// TODO(5306911) we should probably try/catch
Executor* x = executor_;
RequestContext::setContext(context_);
if (x) {
MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
} else {
callback_(std::move(*result_));
}
}
}
}
......
......@@ -27,6 +27,7 @@
#include <folly/wangle/Executor.h>
#include <folly/wangle/Future.h>
#include <folly/wangle/ManualExecutor.h>
#include <folly/MPMCQueue.h>
#include <folly/io/async/Request.h>
......@@ -40,6 +41,43 @@ using std::vector;
#define EXPECT_TYPE(x, T) \
EXPECT_TRUE((std::is_same<decltype(x), T>::value))
/// Simple executor that does work in another thread
class ThreadExecutor : public Executor {
folly::MPMCQueue<Func> funcs;
std::atomic<bool> done {false};
std::thread worker;
folly::Baton<> baton;
void work() {
baton.post();
Func fn;
while (!done) {
while (!funcs.isEmpty()) {
funcs.blockingRead(fn);
fn();
}
}
}
public:
ThreadExecutor(size_t n = 1024)
: funcs(n), worker(std::bind(&ThreadExecutor::work, this)) {}
~ThreadExecutor() {
done = true;
funcs.write([]{});
worker.join();
}
void add(Func fn) override {
funcs.blockingWrite(std::move(fn));
}
void waitForStartup() {
baton.wait();
}
};
typedef WangleException eggs_t;
static eggs_t eggs("eggs");
......@@ -950,3 +988,30 @@ TEST(Future, context) {
// Fulfil the promise
p.setValue();
}
// This only fails about 1 in 1k times when the bug is present :(
TEST(Future, t5506504) {
ThreadExecutor x;
auto fn = [&x]{
auto promises = std::make_shared<vector<Promise<void>>>(4);
vector<Future<void>> futures;
for (auto& p : *promises) {
futures.emplace_back(
p.getFuture()
.via(&x)
.then([](Try<void>&&){}));
}
x.waitForStartup();
x.add([promises]{
for (auto& p : *promises) p.setValue();
});
return whenAll(futures.begin(), futures.end());
};
waitWithSemaphore(fn());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment