Commit 99e8fc1f authored by Abhinav Rai's avatar Abhinav Rai Committed by Facebook Github Bot

Add timeout support in ExecutorLoopController.

Summary:
Add timeout support to python executor. Executor uses a separate thread running a folly event base.
Whenever a timer operation is called, scheduleTimeout is executed in that event base.

Reviewed By: andriigrynenko

Differential Revision: D18201883

fbshipit-source-id: 337518bea52993461201d6706f438130673ef6ce
parent 60da8ef5
......@@ -15,12 +15,15 @@
*/
#pragma once
#include <folly/futures/Future.h>
namespace folly {
namespace fibers {
inline ExecutorLoopController::ExecutorLoopController(folly::Executor* executor)
: executor_(executor) {}
: executor_(executor),
timeoutManager_(executor_),
timer_(HHWheelTimer::newTimer(&timeoutManager_)) {}
inline ExecutorLoopController::~ExecutorLoopController() {}
......@@ -59,7 +62,7 @@ inline void ExecutorLoopController::scheduleThreadSafe() {
}
inline HHWheelTimer& ExecutorLoopController::timer() {
throw std::logic_error("Time schedule isn't supported by asyncio executor");
return *timer_;
}
} // namespace fibers
......
......@@ -25,6 +25,50 @@
namespace folly {
namespace fibers {
class ExecutorTimeoutManager : public TimeoutManager {
public:
explicit ExecutorTimeoutManager(folly::Executor* executor)
: executor_(executor) {}
ExecutorTimeoutManager(ExecutorTimeoutManager&&) = default;
ExecutorTimeoutManager& operator=(ExecutorTimeoutManager&&) = default;
ExecutorTimeoutManager(const ExecutorTimeoutManager&) = delete;
ExecutorTimeoutManager& operator=(const ExecutorTimeoutManager&) = delete;
void attachTimeoutManager(
AsyncTimeout* /* unused */,
InternalEnum /* unused */) final {}
void detachTimeoutManager(AsyncTimeout* /* unused */) final {
throw std::logic_error(
"detachTimeoutManager() call isn't supported by ExecutorTimeoutManager.");
}
bool scheduleTimeout(AsyncTimeout* obj, timeout_type timeout) final {
folly::futures::sleep(timeout).via(executor_).thenValue(
[obj](folly::Unit) { obj->timeoutExpired(); });
return true;
}
void cancelTimeout(AsyncTimeout* /* unused */) final {
throw std::logic_error(
"cancelTimeout() call isn't supported by ExecutorTimeoutManager.");
}
void bumpHandlingTime() final {
throw std::logic_error(
"bumpHandlingTime() call isn't supported by ExecutorTimeoutManager.");
}
bool isInTimeoutManagerThread() final {
throw std::logic_error(
"isInTimeoutManagerThread() call isn't supported by ExecutorTimeoutManager.");
}
private:
folly::Executor* executor_;
};
/**
* A fiber loop controller that works for arbitrary folly::Executor
*/
......@@ -41,6 +85,8 @@ class ExecutorLoopController : public fibers::LoopController {
folly::Executor* executor_;
Executor::KeepAlive<> executorKeepAlive_;
fibers::FiberManager* fm_{nullptr};
ExecutorTimeoutManager timeoutManager_;
HHWheelTimer::UniquePtr timer_;
void setFiberManager(fibers::FiberManager* fm) override;
void schedule() override;
......
......@@ -23,10 +23,12 @@
#include <folly/futures/Future.h>
#include <folly/Conv.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/fibers/AddTasks.h>
#include <folly/fibers/AtomicBatchDispatcher.h>
#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/ExecutorLoopController.h>
#include <folly/fibers/FiberManager.h>
#include <folly/fibers/FiberManagerMap.h>
#include <folly/fibers/GenericBaton.h>
......@@ -1473,6 +1475,28 @@ TEST(FiberManager, batonWaitTimeoutHandler) {
EXPECT_EQ(1, fibersRun);
}
TEST(FiberManager, batonWaitTimeoutHandlerExecutor) {
Baton baton2;
FiberManager manager(
std::make_unique<ExecutorLoopController>(folly::getCPUExecutor().get()));
auto task = [&](size_t timeout_ms) {
Baton baton;
auto start = std::chrono::steady_clock::now();
auto res = baton.try_wait_for(std::chrono::milliseconds(timeout_ms));
auto finish = std::chrono::steady_clock::now();
EXPECT_FALSE(res);
auto duration_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(finish - start)
.count();
EXPECT_GT(duration_ms, timeout_ms);
EXPECT_LT(duration_ms, timeout_ms + 100);
baton2.post();
};
manager.addTask([&]() { task(300); });
baton2.wait();
}
TEST(FiberManager, batonWaitTimeoutMany) {
FiberManager manager(std::make_unique<EventBaseLoopController>());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment