Commit 4924344d authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Make timed_wait work for semi awaitables

Reviewed By: lewissbaker

Differential Revision: D15363390

fbshipit-source-id: 98c30928773992c5bb7f040e6f5afb8ee6fef713
parent 62257ded
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <folly/experimental/coro/Mutex.h> #include <folly/experimental/coro/Mutex.h>
#include <folly/experimental/coro/detail/Barrier.h> #include <folly/experimental/coro/detail/Barrier.h>
#include <folly/experimental/coro/detail/BarrierTask.h> #include <folly/experimental/coro/detail/BarrierTask.h>
#include <folly/experimental/coro/detail/Helpers.h>
namespace folly { namespace folly {
namespace coro { namespace coro {
...@@ -33,25 +34,6 @@ inline Unit getValueOrUnit(Try<void>&& value) { ...@@ -33,25 +34,6 @@ inline Unit getValueOrUnit(Try<void>&& value) {
return Unit{}; return Unit{};
} }
// Helper class that can be used to annotate Awaitable objects that will
// guarantee that they will be resumed on the correct executor so that
// when the object is awaited within a Task<T> it doesn't automatically
// wrap the Awaitable in something that forces a reschedule onto the
// executor.
template <typename Awaitable>
class UnsafeResumeInlineSemiAwaitable {
public:
explicit UnsafeResumeInlineSemiAwaitable(Awaitable&& awaitable) noexcept
: awaitable_(awaitable) {}
Awaitable&& viaIfAsync(folly::Executor::KeepAlive<>) && noexcept {
return static_cast<Awaitable&&>(awaitable_);
}
private:
Awaitable awaitable_;
};
template <typename SemiAwaitable, typename Result> template <typename SemiAwaitable, typename Result>
detail::BarrierTask makeCollectAllTask( detail::BarrierTask makeCollectAllTask(
folly::Executor* executor, folly::Executor* executor,
......
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <experimental/coroutine>
#include <type_traits>
#include <folly/Optional.h>
#include <folly/experimental/coro/Baton.h>
#include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/Traits.h>
#include <folly/experimental/coro/detail/Helpers.h>
#include <folly/futures/Future.h>
namespace folly {
namespace coro {
template <typename Awaitable>
Task<Optional<lift_unit_t<detail::decay_rvalue_reference_t<
detail::lift_lvalue_reference_t<semi_await_result_t<Awaitable>>>>>>
timed_wait(Awaitable awaitable, Duration duration) {
auto posted = std::make_shared<std::atomic<bool>>(false);
Baton baton;
Try<lift_unit_t<detail::decay_rvalue_reference_t<
detail::lift_lvalue_reference_t<semi_await_result_t<Awaitable>>>>>
result;
futures::sleep(duration)
.via(co_await co_current_executor)
.setCallback_([posted, &baton](auto&&, auto&&) {
if (!posted->exchange(true, std::memory_order_relaxed)) {
baton.post();
}
});
co_invoke(
[awaitable = std::move(
awaitable)]() mutable -> Task<semi_await_result_t<Awaitable>> {
co_return co_await std::move(awaitable);
})
.scheduleOn(co_await co_current_executor)
.start([posted, &baton, &result](auto&& r) {
if (!posted->exchange(true, std::memory_order_relaxed)) {
result = std::move(r);
baton.post();
}
});
co_await detail::UnsafeResumeInlineSemiAwaitable{get_awaiter(baton)};
if (!result.hasValue() && !result.hasException()) {
co_return folly::none;
}
co_return *result;
}
} // namespace coro
} // namespace folly
...@@ -16,14 +16,8 @@ ...@@ -16,14 +16,8 @@
#pragma once #pragma once
#include <experimental/coroutine> #include <experimental/coroutine>
#include <future>
#include <type_traits> #include <type_traits>
#include <folly/Optional.h>
#include <folly/experimental/coro/Traits.h>
#include <folly/experimental/coro/Wait.h>
#include <folly/futures/Future.h>
namespace folly { namespace folly {
namespace coro { namespace coro {
...@@ -58,103 +52,5 @@ class AwaitableReady<void> { ...@@ -58,103 +52,5 @@ class AwaitableReady<void> {
void await_suspend(std::experimental::coroutine_handle<>) noexcept {} void await_suspend(std::experimental::coroutine_handle<>) noexcept {}
void await_resume() noexcept {} void await_resume() noexcept {}
}; };
template <typename Awaitable>
class TimedWaitAwaitable {
public:
static_assert(
std::is_same<Awaitable, std::decay_t<Awaitable>>::value,
"Awaitable should be decayed.");
using await_resume_return_type = await_result_t<Awaitable>;
TimedWaitAwaitable(Awaitable&& awaitable, std::chrono::milliseconds duration)
: awaitable_(std::move(awaitable)), duration_(duration) {}
bool await_ready() {
return false;
}
FOLLY_CORO_AWAIT_SUSPEND_NONTRIVIAL_ATTRIBUTES bool await_suspend(
std::experimental::coroutine_handle<> ch) {
auto sharedState = std::make_shared<SharedState>(ch, storage_);
waitAndNotify(std::move(awaitable_), sharedState).detach();
futures::sleepUnsafe(duration_).thenValue(
[sharedState = std::move(sharedState)](Unit) {
sharedState->setTimeout();
});
return true;
}
Optional<await_resume_return_type> await_resume() {
if (!storage_.hasValue() && !storage_.hasException()) {
return folly::none;
}
return std::move(storage_).value();
}
private:
class SharedState {
public:
SharedState(
std::experimental::coroutine_handle<> ch,
Try<await_resume_return_type>& storage)
: ch_(std::move(ch)), storage_(storage) {}
void setValue(await_resume_return_type&& value) {
if (first_.exchange(true, std::memory_order_relaxed)) {
return;
}
assume(!storage_.hasValue() && !storage_.hasException());
tryEmplace(storage_, static_cast<await_resume_return_type&&>(value));
ch_();
}
void setException(exception_wrapper e) {
if (first_.exchange(true, std::memory_order_relaxed)) {
return;
}
assume(!storage_.hasValue() && !storage_.hasException());
storage_.emplaceException(std::move(e));
ch_();
}
void setTimeout() {
if (first_.exchange(true, std::memory_order_relaxed)) {
return;
}
ch_();
}
private:
std::atomic<bool> first_{false};
std::experimental::coroutine_handle<> ch_;
Try<await_resume_return_type>& storage_;
};
static Wait waitAndNotify(
Awaitable awaitable,
std::shared_ptr<SharedState> sharedState) {
try {
sharedState->setValue(co_await std::forward<Awaitable>(awaitable));
} catch (const std::exception& e) {
sharedState->setException(exception_wrapper(std::current_exception(), e));
} catch (...) {
sharedState->setException(exception_wrapper(std::current_exception()));
}
}
Awaitable awaitable_;
std::chrono::milliseconds duration_;
Try<await_resume_return_type> storage_;
};
template <typename Awaitable>
TimedWaitAwaitable<std::decay_t<Awaitable>> timed_wait(
Awaitable&& awaitable,
std::chrono::milliseconds duration) {
return TimedWaitAwaitable<std::decay_t<Awaitable>>(
std::forward<Awaitable>(awaitable), duration);
}
} // namespace coro } // namespace coro
} // namespace folly } // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Executor.h>
namespace folly {
namespace coro {
namespace detail {
// Helper class that can be used to annotate Awaitable objects that will
// guarantee that they will be resumed on the correct executor so that
// when the object is awaited within a Task<T> it doesn't automatically
// wrap the Awaitable in something that forces a reschedule onto the
// executor.
template <typename Awaitable>
class UnsafeResumeInlineSemiAwaitable {
public:
explicit UnsafeResumeInlineSemiAwaitable(Awaitable&& awaitable) noexcept
: awaitable_(awaitable) {}
Awaitable&& viaIfAsync(folly::Executor::KeepAlive<>) && noexcept {
return static_cast<Awaitable&&>(awaitable_);
}
private:
Awaitable awaitable_;
};
} // namespace detail
} // namespace coro
} // namespace folly
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <folly/executors/ManualExecutor.h> #include <folly/executors/ManualExecutor.h>
#include <folly/experimental/coro/BlockingWait.h> #include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Task.h> #include <folly/experimental/coro/Task.h>
#include <folly/experimental/coro/TimedWait.h>
#include <folly/experimental/coro/Utils.h> #include <folly/experimental/coro/Utils.h>
#include <folly/fibers/Semaphore.h> #include <folly/fibers/Semaphore.h>
#include <folly/io/async/ScopedEventBaseThread.h> #include <folly/io/async/ScopedEventBaseThread.h>
...@@ -252,7 +253,7 @@ TEST(Coro, CurrentExecutor) { ...@@ -252,7 +253,7 @@ TEST(Coro, CurrentExecutor) {
EXPECT_EQ(42, coro::blockingWait(std::move(task))); EXPECT_EQ(42, coro::blockingWait(std::move(task)));
} }
coro::Task<void> taskTimedWait() { coro::Task<void> taskTimedWaitFuture() {
auto ex = co_await coro::co_current_executor; auto ex = co_await coro::co_current_executor;
auto fastFuture = auto fastFuture =
futures::sleep(std::chrono::milliseconds{50}).via(ex).thenValue([](Unit) { futures::sleep(std::chrono::milliseconds{50}).via(ex).thenValue([](Unit) {
...@@ -296,8 +297,46 @@ coro::Task<void> taskTimedWait() { ...@@ -296,8 +297,46 @@ coro::Task<void> taskTimedWait() {
co_return; co_return;
} }
TEST(Coro, TimedWait) { TEST(Coro, TimedWaitFuture) {
coro::blockingWait(taskTimedWait()); coro::blockingWait(taskTimedWaitFuture());
}
coro::Task<void> taskTimedWaitTask() {
auto fastTask = []() -> coro::Task<int> {
co_await futures::sleep(std::chrono::milliseconds{50});
co_return 42;
}();
auto fastResult = co_await coro::timed_wait(
std::move(fastTask), std::chrono::milliseconds{100});
EXPECT_TRUE(fastResult);
EXPECT_EQ(42, *fastResult);
struct ExpectedException : public std::runtime_error {
ExpectedException() : std::runtime_error("ExpectedException") {}
};
auto throwingTask = []() -> coro::Task<void> {
co_await futures::sleep(std::chrono::milliseconds{50});
throw ExpectedException();
}();
EXPECT_THROW(
(void)co_await coro::timed_wait(
std::move(throwingTask), std::chrono::milliseconds{100}),
ExpectedException);
auto slowTask = []() -> coro::Task<int> {
co_await futures::sleep(std::chrono::milliseconds{200});
co_return 42;
}();
auto slowResult = co_await coro::timed_wait(
std::move(slowTask), std::chrono::milliseconds{100});
EXPECT_FALSE(slowResult);
co_return;
}
TEST(Coro, TimedWaitTask) {
coro::blockingWait(taskTimedWaitTask());
} }
template <int value> template <int value>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment