Commit 991a1d5e authored by James Sedgwick's avatar James Sedgwick Committed by woo

waitWithSemaphore -> Future<T>::wait()

Summary:
see task. also adjust all callsites to get() or wait() as appropriate. this is a lot better, especially in various tests

Facebook:

cc @rushix, @wez as this was quite nice for docstore and novak, and you might want to be aware of new conventions

Test Plan: futures unit, wait for contbuild

Reviewed By: hans@fb.com

Subscribers: trunkagent, fbcode-common-diffs@, hero-diffs@, cold-storage-diffs@, adamsyta, zhuohuang, darshan, micha, folly-diffs@, lins, tingy, hannesr, jsedgwick, wez, rushix

FB internal diff: D1785572

Tasks: 5940008

Signature: t1:1785572:1421866794:a879de4d0bc14e96c434f623ed2a74361e25f28c
parent 7660d85b
...@@ -643,65 +643,6 @@ whenN(InputIterator first, InputIterator last, size_t n) { ...@@ -643,65 +643,6 @@ whenN(InputIterator first, InputIterator last, size_t n) {
return ctx->p.getFuture(); return ctx->p.getFuture();
} }
template <typename T>
Future<T>
waitWithSemaphore(Future<T>&& f) {
Baton<> baton;
auto done = f.then([&](Try<T> &&t) {
baton.post();
return std::move(t.value());
});
baton.wait();
while (!done.isReady()) {
// There's a race here between the return here and the actual finishing of
// the future. f is completed, but the setup may not have finished on done
// after the baton has posted.
std::this_thread::yield();
}
return done;
}
template<>
inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
Baton<> baton;
auto done = f.then([&](Try<void> &&t) {
baton.post();
t.value();
});
baton.wait();
while (!done.isReady()) {
// There's a race here between the return here and the actual finishing of
// the future. f is completed, but the setup may not have finished on done
// after the baton has posted.
std::this_thread::yield();
}
return done;
}
template <typename T, class Dur>
Future<T>
waitWithSemaphore(Future<T>&& f, Dur timeout) {
auto baton = std::make_shared<Baton<>>();
auto done = f.then([baton](Try<T> &&t) {
baton->post();
return std::move(t.value());
});
baton->timed_wait(std::chrono::system_clock::now() + timeout);
return done;
}
template <class Dur>
Future<void>
waitWithSemaphore(Future<void>&& f, Dur timeout) {
auto baton = std::make_shared<Baton<>>();
auto done = f.then([baton](Try<void> &&t) {
baton->post();
t.value();
});
baton->timed_wait(std::chrono::system_clock::now() + timeout);
return done;
}
namespace { namespace {
template <class T> template <class T>
void getWaitHelper(Future<T>* f) { void getWaitHelper(Future<T>* f) {
...@@ -834,6 +775,34 @@ Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) { ...@@ -834,6 +775,34 @@ Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
}); });
} }
template <class T>
Future<T> Future<T>::wait() {
Baton<> baton;
auto done = then([&](Try<T> t) {
baton.post();
return makeFuture(std::move(t));
});
baton.wait();
while (!done.isReady()) {
// There's a race here between the return here and the actual finishing of
// the future. f is completed, but the setup may not have finished on done
// after the baton has posted.
std::this_thread::yield();
}
return done;
}
template <class T>
Future<T> Future<T>::wait(Duration dur) {
auto baton = std::make_shared<Baton<>>();
auto done = then([baton](Try<T> t) {
baton->post();
return makeFuture(std::move(t));
});
baton->timed_wait(std::chrono::system_clock::now() + dur);
return done;
}
} }
// I haven't included a Future<T&> specialization because I don't forsee us // I haven't included a Future<T&> specialization because I don't forsee us
......
...@@ -478,6 +478,15 @@ class Future { ...@@ -478,6 +478,15 @@ class Future {
/// now. The optional Timekeeper is as with futures::sleep(). /// now. The optional Timekeeper is as with futures::sleep().
Future<T> delayed(Duration, Timekeeper* = nullptr); Future<T> delayed(Duration, Timekeeper* = nullptr);
/// Block until this Future is complete. Returns a new Future containing the
/// result.
Future<T> wait();
/// Block until this Future is complete or until the given Duration passes.
/// Returns a new Future which either contains the result or is incomplete,
/// depending on whether the Duration passed.
Future<T> wait(Duration);
private: private:
typedef detail::Core<T>* corePtr; typedef detail::Core<T>* corePtr;
...@@ -608,24 +617,6 @@ Future<std::vector<std::pair< ...@@ -608,24 +617,6 @@ Future<std::vector<std::pair<
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>> Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
whenN(InputIterator first, InputIterator last, size_t n); whenN(InputIterator first, InputIterator last, size_t n);
/** Wait for the given future to complete on a semaphore. Returns a completed
* future containing the result.
*
* NB if the promise for the future would be fulfilled in the same thread that
* you call this, it will deadlock.
*/
template <class T>
Future<T> waitWithSemaphore(Future<T>&& f);
/** Wait for up to `timeout` for the given future to complete. Returns a future
* which may or may not be completed depending whether the given future
* completed in time
*
* Note: each call to this starts a (short-lived) thread and allocates memory.
*/
template <typename T, class Dur>
Future<T> waitWithSemaphore(Future<T>&& f, Dur timeout);
} // folly } // folly
#include <folly/futures/Future-inl.h> #include <folly/futures/Future-inl.h>
...@@ -873,31 +873,31 @@ TEST(Future, throwIfFailed) { ...@@ -873,31 +873,31 @@ TEST(Future, throwIfFailed) {
}); });
} }
TEST(Future, waitWithSemaphoreImmediate) { TEST(Future, waitImmediate) {
waitWithSemaphore(makeFuture()); makeFuture().wait();
auto done = waitWithSemaphore(makeFuture(42)).value(); auto done = makeFuture(42).wait().value();
EXPECT_EQ(42, done); EXPECT_EQ(42, done);
vector<int> v{1,2,3}; vector<int> v{1,2,3};
auto done_v = waitWithSemaphore(makeFuture(v)).value(); auto done_v = makeFuture(v).wait().value();
EXPECT_EQ(v.size(), done_v.size()); EXPECT_EQ(v.size(), done_v.size());
EXPECT_EQ(v, done_v); EXPECT_EQ(v, done_v);
vector<Future<void>> v_f; vector<Future<void>> v_f;
v_f.push_back(makeFuture()); v_f.push_back(makeFuture());
v_f.push_back(makeFuture()); v_f.push_back(makeFuture());
auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())).value(); auto done_v_f = whenAll(v_f.begin(), v_f.end()).wait().value();
EXPECT_EQ(2, done_v_f.size()); EXPECT_EQ(2, done_v_f.size());
vector<Future<bool>> v_fb; vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true)); v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false)); v_fb.push_back(makeFuture(false));
auto fut = whenAll(v_fb.begin(), v_fb.end()); auto fut = whenAll(v_fb.begin(), v_fb.end());
auto done_v_fb = std::move(waitWithSemaphore(std::move(fut)).value()); auto done_v_fb = std::move(fut.wait().value());
EXPECT_EQ(2, done_v_fb.size()); EXPECT_EQ(2, done_v_fb.size());
} }
TEST(Future, waitWithSemaphore) { TEST(Future, wait) {
Promise<int> p; Promise<int> p;
Future<int> f = p.getFuture(); Future<int> f = p.getFuture();
std::atomic<bool> flag{false}; std::atomic<bool> flag{false};
...@@ -910,7 +910,7 @@ TEST(Future, waitWithSemaphore) { ...@@ -910,7 +910,7 @@ TEST(Future, waitWithSemaphore) {
return t.value(); return t.value();
}); });
flag = true; flag = true;
result.store(waitWithSemaphore(std::move(n)).value()); result.store(n.wait().value());
}, },
std::move(f) std::move(f)
); );
...@@ -924,12 +924,11 @@ TEST(Future, waitWithSemaphore) { ...@@ -924,12 +924,11 @@ TEST(Future, waitWithSemaphore) {
EXPECT_EQ(result.load(), 42); EXPECT_EQ(result.load(), 42);
} }
TEST(Future, waitWithSemaphoreForTime) { TEST(Future, waitWithDuration) {
{ {
Promise<int> p; Promise<int> p;
Future<int> f = p.getFuture(); Future<int> f = p.getFuture();
auto t = waitWithSemaphore(std::move(f), auto t = f.wait(std::chrono::milliseconds(1));
std::chrono::microseconds(1));
EXPECT_FALSE(t.isReady()); EXPECT_FALSE(t.isReady());
p.setValue(1); p.setValue(1);
EXPECT_TRUE(t.isReady()); EXPECT_TRUE(t.isReady());
...@@ -938,8 +937,7 @@ TEST(Future, waitWithSemaphoreForTime) { ...@@ -938,8 +937,7 @@ TEST(Future, waitWithSemaphoreForTime) {
Promise<int> p; Promise<int> p;
Future<int> f = p.getFuture(); Future<int> f = p.getFuture();
p.setValue(1); p.setValue(1);
auto t = waitWithSemaphore(std::move(f), auto t = f.wait(std::chrono::milliseconds(1));
std::chrono::milliseconds(1));
EXPECT_TRUE(t.isReady()); EXPECT_TRUE(t.isReady());
} }
{ {
...@@ -947,8 +945,7 @@ TEST(Future, waitWithSemaphoreForTime) { ...@@ -947,8 +945,7 @@ TEST(Future, waitWithSemaphoreForTime) {
v_fb.push_back(makeFuture(true)); v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false)); v_fb.push_back(makeFuture(false));
auto f = whenAll(v_fb.begin(), v_fb.end()); auto f = whenAll(v_fb.begin(), v_fb.end());
auto t = waitWithSemaphore(std::move(f), auto t = f.wait(std::chrono::milliseconds(1));
std::chrono::milliseconds(1));
EXPECT_TRUE(t.isReady()); EXPECT_TRUE(t.isReady());
EXPECT_EQ(2, t.value().size()); EXPECT_EQ(2, t.value().size());
} }
...@@ -959,8 +956,7 @@ TEST(Future, waitWithSemaphoreForTime) { ...@@ -959,8 +956,7 @@ TEST(Future, waitWithSemaphoreForTime) {
v_fb.push_back(p1.getFuture()); v_fb.push_back(p1.getFuture());
v_fb.push_back(p2.getFuture()); v_fb.push_back(p2.getFuture());
auto f = whenAll(v_fb.begin(), v_fb.end()); auto f = whenAll(v_fb.begin(), v_fb.end());
auto t = waitWithSemaphore(std::move(f), auto t = f.wait(std::chrono::milliseconds(1));
std::chrono::milliseconds(1));
EXPECT_FALSE(t.isReady()); EXPECT_FALSE(t.isReady());
p1.setValue(true); p1.setValue(true);
EXPECT_FALSE(t.isReady()); EXPECT_FALSE(t.isReady());
...@@ -968,8 +964,7 @@ TEST(Future, waitWithSemaphoreForTime) { ...@@ -968,8 +964,7 @@ TEST(Future, waitWithSemaphoreForTime) {
EXPECT_TRUE(t.isReady()); EXPECT_TRUE(t.isReady());
} }
{ {
auto t = waitWithSemaphore(makeFuture(), auto t = makeFuture().wait(std::chrono::milliseconds(1));
std::chrono::milliseconds(1));
EXPECT_TRUE(t.isReady()); EXPECT_TRUE(t.isReady());
} }
} }
...@@ -1171,7 +1166,7 @@ TEST(Future, t5506504) { ...@@ -1171,7 +1166,7 @@ TEST(Future, t5506504) {
return whenAll(futures.begin(), futures.end()); return whenAll(futures.begin(), futures.end());
}; };
waitWithSemaphore(fn()); fn().wait();
} }
// Test of handling of a circular dependency. It's never recommended // Test of handling of a circular dependency. It's never recommended
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment