Commit a166df21 authored by Jon Maltiel Swenson's avatar Jon Maltiel Swenson Committed by facebook-github-bot-1

Activate server timeout after write success

Summary: Start server timeout after socket write succeeds in mcrouter.  Add neceessary Fibers logic to enable this behavior.

Reviewed By: pavlo-fb

Differential Revision: D2613344

fb-gh-sync-id: 1bc0fbe8b325a3e91cd010f89104b83ebf183679
parent 8cb94e9b
...@@ -15,7 +15,10 @@ ...@@ -15,7 +15,10 @@
*/ */
#include "Baton.h" #include "Baton.h"
#include <chrono>
#include <folly/detail/MemoryIdler.h> #include <folly/detail/MemoryIdler.h>
#include <folly/experimental/fibers/FiberManager.h>
namespace folly { namespace fibers { namespace folly { namespace fibers {
...@@ -23,6 +26,13 @@ void Baton::wait() { ...@@ -23,6 +26,13 @@ void Baton::wait() {
wait([](){}); wait([](){});
} }
void Baton::wait(TimeoutHandler& timeoutHandler) {
timeoutHandler.setBaton(this);
timeoutHandler.setFiberManager(FiberManager::getFiberManagerUnsafe());
wait();
timeoutHandler.cancelTimeout();
}
bool Baton::timed_wait(TimeoutController::Duration timeout) { bool Baton::timed_wait(TimeoutController::Duration timeout) {
return timed_wait(timeout, [](){}); return timed_wait(timeout, [](){});
} }
...@@ -153,4 +163,24 @@ void Baton::reset() { ...@@ -153,4 +163,24 @@ void Baton::reset() {
waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);; waitingFiber_.store(NO_WAITER, std::memory_order_relaxed);;
} }
void Baton::TimeoutHandler::scheduleTimeout(uint32_t timeoutMs) {
assert(fiberManager_ != nullptr);
assert(baton_ != nullptr);
if (timeoutMs > 0) {
timeoutPtr_ = fiberManager_->timeoutManager_->registerTimeout(
[baton = baton_]() {
if (!baton->try_wait()) {
baton->postHelper(TIMEOUT);
}
},
std::chrono::milliseconds(timeoutMs));
}
}
void Baton::TimeoutHandler::cancelTimeout() {
if (timeoutPtr_) {
fiberManager_->timeoutManager_->cancel(timeoutPtr_);
}
}
}} }}
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
namespace folly { namespace fibers { namespace folly { namespace fibers {
class Fiber; class Fiber;
class FiberManager;
/** /**
* @class Baton * @class Baton
...@@ -32,6 +33,8 @@ class Fiber; ...@@ -32,6 +33,8 @@ class Fiber;
*/ */
class Baton { class Baton {
public: public:
class TimeoutHandler;
Baton(); Baton();
~Baton() {} ~Baton() {}
...@@ -41,6 +44,15 @@ class Baton { ...@@ -41,6 +44,15 @@ class Baton {
*/ */
void wait(); void wait();
/**
* Put active fiber to sleep indefinitely. However, timeoutHandler may
* be used elsewhere on the same thread in order to schedule a wakeup
* for the active fiber. Users of timeoutHandler must be on the same thread
* as the active fiber and may only schedule one timeout, which must occur
* after the active fiber calls wait.
*/
void wait(TimeoutHandler& timeoutHandler);
/** /**
* Puts active fiber to sleep. Returns when post is called. * Puts active fiber to sleep. Returns when post is called.
* *
...@@ -98,6 +110,35 @@ class Baton { ...@@ -98,6 +110,35 @@ class Baton {
*/ */
void reset(); void reset();
/**
* Provides a way to schedule a wakeup for a wait()ing fiber.
* A TimeoutHandler must be passed to Baton::wait(TimeoutHandler&)
* before timeouts are scheduled/cancelled. It is only safe to use the
* TimeoutHandler on the same thread as the wait()ing fiber.
* scheduleTimeout() may only be called once prior to the end of the
* associated Baton's life.
*/
class TimeoutHandler {
public:
void scheduleTimeout(uint32_t timeoutMs);
void cancelTimeout();
private:
friend class Baton;
void setFiberManager(FiberManager* fiberManager) {
fiberManager_ = fiberManager;
}
void setBaton(Baton* baton) {
baton_ = baton;
}
FiberManager* fiberManager_{nullptr};
Baton* baton_{nullptr};
intptr_t timeoutPtr_{0};
};
private: private:
enum { enum {
/** /**
......
...@@ -1465,6 +1465,38 @@ TEST(FiberManager, resizePeriodically) { ...@@ -1465,6 +1465,38 @@ TEST(FiberManager, resizePeriodically) {
EXPECT_EQ(5, manager.fibersPoolSize()); EXPECT_EQ(5, manager.fibersPoolSize());
} }
TEST(FiberManager, batonWaitTimeoutHandler) {
FiberManager manager(folly::make_unique<EventBaseLoopController>());
folly::EventBase evb;
dynamic_cast<EventBaseLoopController&>(manager.loopController())
.attachEventBase(evb);
size_t fibersRun = 0;
Baton baton;
Baton::TimeoutHandler timeoutHandler;
manager.addTask([&]() {
baton.wait(timeoutHandler);
++fibersRun;
});
manager.loopUntilNoReady();
EXPECT_FALSE(baton.try_wait());
EXPECT_EQ(0, fibersRun);
timeoutHandler.scheduleTimeout(250);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_FALSE(baton.try_wait());
EXPECT_EQ(0, fibersRun);
evb.loopOnce();
manager.loopUntilNoReady();
EXPECT_EQ(1, fibersRun);
}
static size_t sNumAwaits; static size_t sNumAwaits;
void runBenchmark(size_t numAwaits, size_t toSend) { void runBenchmark(size_t numAwaits, size_t toSend) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment