Commit a91af7b3 authored by Matt Dordal's avatar Matt Dordal Committed by Anton Likhtarov

Timed wait for futures

Summary:
It might be useful to be able to wait for some time (but not forever) on a
future, so this is a shot at doing that. It's a very heavyweight implementation, however.

Since the current interface for waitWithSemaphore doesn't really make sense if
the timeout fires, change it to return a Future<T>.

Test Plan: unit tests

Reviewed By: hans@fb.com

Subscribers: trunkagent, folly@lists, fugalh

FB internal diff: D1358230
parent 19ec26e1
......@@ -419,29 +419,67 @@ whenN(InputIterator first, InputIterator last, size_t n) {
return ctx->p.getFuture();
}
template <typename F>
typename F::value_type
waitWithSemaphore(F&& f) {
template <typename T>
Future<T>
waitWithSemaphore(Future<T>&& f) {
LifoSem sem;
Try<typename F::value_type> done;
f.then([&](Try<typename F::value_type> &&t) {
done = std::move(t);
auto done = f.then([&](Try<T> &&t) {
sem.post();
return std::move(t.value());
});
sem.wait();
return std::move(done.value());
return done;
}
inline void waitWithSemaphore(Future<void>&& f) {
template<>
inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
LifoSem sem;
Try<void> done;
f.then([&](Try<void> &&t) {
done = std::move(t);
auto done = f.then([&](Try<void> &&t) {
sem.post();
t.value();
});
sem.wait();
return done.value();
return done;
}
template <typename T, class Duration>
Future<T>
waitWithSemaphore(Future<T>&& f, Duration timeout) {
auto sem = std::make_shared<LifoSem>();
auto done = f.then([sem](Try<T> &&t) {
sem->post();
return std::move(t.value());
});
std::thread t([sem, timeout](){
std::this_thread::sleep_for(timeout);
sem->shutdown();
});
t.detach();
try {
sem->wait();
} catch (ShutdownSemError & ign) { }
return done;
}
template <class Duration>
Future<void>
waitWithSemaphore(Future<void>&& f, Duration timeout) {
auto sem = std::make_shared<LifoSem>();
auto done = f.then([sem](Try<void> &&t) {
sem->post();
t.value();
});
std::thread t([sem, timeout](){
std::this_thread::sleep_for(timeout);
sem->shutdown();
});
t.detach();
try {
sem->wait();
} catch (ShutdownSemError & ign) { }
return done;
}
}}
// I haven't included a Future<T&> specialization because I don't forsee us
......
......@@ -313,15 +313,23 @@ Future<std::vector<std::pair<
Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>>
whenN(InputIterator first, InputIterator last, size_t n);
/** Wait for the given future to complete on a semaphore. Returns the result of
* the given future.
/** Wait for the given future to complete on a semaphore. Returns a completed
* future containing the result.
*
* NB if the promise for the future would be fulfilled in the same thread that
* you call this, it will deadlock.
*/
template <class F>
typename F::value_type
waitWithSemaphore(F&& f);
template <class T>
Future<T> waitWithSemaphore(Future<T>&& f);
/** Wait for up to `timeout` for the given future to complete. Returns a future
* which may or may not be completed depending whether the given future
* completed in time
*
* Note: each call to this starts a (short-lived) thread and allocates memory.
*/
template <typename T, class Duration>
Future<T> waitWithSemaphore(Future<T>&& f, Duration timeout);
}} // folly::wangle
......
......@@ -632,25 +632,25 @@ TEST(Future, throwIfFailed) {
TEST(Future, waitWithSemaphoreImmediate) {
waitWithSemaphore(makeFuture());
auto done = waitWithSemaphore(makeFuture(42));
auto done = waitWithSemaphore(makeFuture(42)).value();
EXPECT_EQ(42, done);
vector<int> v{1,2,3};
auto done_v = waitWithSemaphore(makeFuture(v));
auto done_v = waitWithSemaphore(makeFuture(v)).value();
EXPECT_EQ(v.size(), done_v.size());
EXPECT_EQ(v, done_v);
vector<Future<void>> v_f;
v_f.push_back(makeFuture());
v_f.push_back(makeFuture());
auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end()));
auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())).value();
EXPECT_EQ(2, done_v_f.size());
vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false));
auto fut = whenAll(v_fb.begin(), v_fb.end());
auto done_v_fb = waitWithSemaphore(std::move(fut));
auto done_v_fb = std::move(waitWithSemaphore(std::move(fut)).value());
EXPECT_EQ(2, done_v_fb.size());
}
......@@ -667,7 +667,7 @@ TEST(Future, waitWithSemaphore) {
return t.value();
});
flag = true;
result.store(waitWithSemaphore(std::move(n)));
result.store(waitWithSemaphore(std::move(n)).value());
LOG(INFO) << result;
},
std::move(f)
......@@ -681,3 +681,63 @@ TEST(Future, waitWithSemaphore) {
EXPECT_EQ(id, std::this_thread::get_id());
EXPECT_EQ(result.load(), 42);
}
TEST(Future, waitWithSemaphoreForTime) {
{
Promise<int> p;
Future<int> f = p.getFuture();
auto t = waitWithSemaphore(std::move(f),
std::chrono::microseconds(1));
EXPECT_FALSE(t.isReady());
p.setValue(1);
EXPECT_TRUE(t.isReady());
}
{
Promise<int> p;
Future<int> f = p.getFuture();
p.setValue(1);
auto t = waitWithSemaphore(std::move(f),
std::chrono::milliseconds(1));
EXPECT_TRUE(t.isReady());
}
{
vector<Future<bool>> v_fb;
v_fb.push_back(makeFuture(true));
v_fb.push_back(makeFuture(false));
auto f = whenAll(v_fb.begin(), v_fb.end());
auto t = waitWithSemaphore(std::move(f),
std::chrono::milliseconds(1));
EXPECT_TRUE(t.isReady());
EXPECT_EQ(2, t.value().size());
}
{
vector<Future<bool>> v_fb;
Promise<bool> p1;
Promise<bool> p2;
v_fb.push_back(p1.getFuture());
v_fb.push_back(p2.getFuture());
auto f = whenAll(v_fb.begin(), v_fb.end());
auto t = waitWithSemaphore(std::move(f),
std::chrono::milliseconds(1));
EXPECT_FALSE(t.isReady());
p1.setValue(true);
EXPECT_FALSE(t.isReady());
p2.setValue(true);
EXPECT_TRUE(t.isReady());
}
{
Promise<int> p;
Future<int> f = p.getFuture();
auto begin = std::chrono::system_clock::now();
auto t = waitWithSemaphore(std::move(f),
std::chrono::milliseconds(1));
auto end = std::chrono::system_clock::now();
EXPECT_TRUE( end - begin < std::chrono::milliseconds(2));
EXPECT_FALSE(t.isReady());
}
{
auto t = waitWithSemaphore(makeFuture(),
std::chrono::milliseconds(1));
EXPECT_TRUE(t.isReady());
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment