Commit 9880573b authored by Dave Watson's avatar Dave Watson Committed by Facebook Github Bot

remove constant tick

Summary:
Precisely calculate the next tick. Currently this only calculates the tick based on the lowest level of the wheel timer, so it will still tick at least every WHEEL_SIZE intervals.

Currently the tick calculation is a linear scan over all the buckets, the next diff will optimize this.

Reviewed By: yfeldblum

Differential Revision: D3637096

fbshipit-source-id: 53dd596a2085c05c657cccbc7efba267bbd086a6
parent 7e7a7df7
...@@ -58,12 +58,7 @@ void HHWheelTimer::Callback::setScheduled(HHWheelTimer* wheel, ...@@ -58,12 +58,7 @@ void HHWheelTimer::Callback::setScheduled(HHWheelTimer* wheel,
wheel_ = wheel; wheel_ = wheel;
// Only update the now_ time if we're not in a timeout expired callback expiration_ = getCurTime() + timeout;
if (wheel_->count_ == 0 && !wheel_->processingCallbacksGuard_) {
wheel_->now_ = getCurTime();
}
expiration_ = wheel_->now_ + timeout;
} }
void HHWheelTimer::Callback::cancelTimeoutImpl() { void HHWheelTimer::Callback::cancelTimeoutImpl() {
...@@ -85,8 +80,10 @@ HHWheelTimer::HHWheelTimer( ...@@ -85,8 +80,10 @@ HHWheelTimer::HHWheelTimer(
: AsyncTimeout(timeoutMananger, internal), : AsyncTimeout(timeoutMananger, internal),
interval_(intervalMS), interval_(intervalMS),
defaultTimeout_(defaultTimeoutMS), defaultTimeout_(defaultTimeoutMS),
nextTick_(1), lastTick_(1),
expireTick_(1),
count_(0), count_(0),
startTime_(getCurTime()),
processingCallbacksGuard_(nullptr) {} processingCallbacksGuard_(nullptr) {}
HHWheelTimer::~HHWheelTimer() { HHWheelTimer::~HHWheelTimer() {
...@@ -108,12 +105,13 @@ HHWheelTimer::~HHWheelTimer() { ...@@ -108,12 +105,13 @@ HHWheelTimer::~HHWheelTimer() {
void HHWheelTimer::scheduleTimeoutImpl(Callback* callback, void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
int64_t due = timeToWheelTicks(timeout) + nextTick_; auto nextTick = calcNextTick();
int64_t diff = due - nextTick_; int64_t due = timeToWheelTicks(timeout) + nextTick;
int64_t diff = due - nextTick;
CallbackList* list; CallbackList* list;
if (diff < 0) { if (diff < 0) {
list = &buckets_[0][nextTick_ & WHEEL_MASK]; list = &buckets_[0][nextTick & WHEEL_MASK];
} else if (diff < WHEEL_SIZE) { } else if (diff < WHEEL_SIZE) {
list = &buckets_[0][due & WHEEL_MASK]; list = &buckets_[0][due & WHEEL_MASK];
} else if (diff < 1 << (2 * WHEEL_BITS)) { } else if (diff < 1 << (2 * WHEEL_BITS)) {
...@@ -124,7 +122,7 @@ void HHWheelTimer::scheduleTimeoutImpl(Callback* callback, ...@@ -124,7 +122,7 @@ void HHWheelTimer::scheduleTimeoutImpl(Callback* callback,
/* in largest slot */ /* in largest slot */
if (diff > LARGEST_SLOT) { if (diff > LARGEST_SLOT) {
diff = LARGEST_SLOT; diff = LARGEST_SLOT;
due = diff + nextTick_; due = diff + nextTick;
} }
list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK]; list = &buckets_[3][(due >> 3 * WHEEL_BITS) & WHEEL_MASK];
} }
...@@ -138,13 +136,18 @@ void HHWheelTimer::scheduleTimeout(Callback* callback, ...@@ -138,13 +136,18 @@ void HHWheelTimer::scheduleTimeout(Callback* callback,
callback->context_ = RequestContext::saveContext(); callback->context_ = RequestContext::saveContext();
if (count_ == 0 && !processingCallbacksGuard_) { uint64_t prev = count_;
this->AsyncTimeout::scheduleTimeout(interval_.count()); count_++;
}
callback->setScheduled(this, timeout); callback->setScheduled(this, timeout);
scheduleTimeoutImpl(callback, timeout); scheduleTimeoutImpl(callback, timeout);
count_++;
/* If we're calling callbacks, timer will be reset after all
* callbacks are called.
*/
if (!processingCallbacksGuard_) {
scheduleNextTimeout();
}
} }
void HHWheelTimer::scheduleTimeout(Callback* callback) { void HHWheelTimer::scheduleTimeout(Callback* callback) {
...@@ -159,7 +162,7 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) { ...@@ -159,7 +162,7 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) {
while (!cbs.empty()) { while (!cbs.empty()) {
auto* cb = &cbs.front(); auto* cb = &cbs.front();
cbs.pop_front(); cbs.pop_front();
scheduleTimeoutImpl(cb, cb->getTimeRemaining(now_)); scheduleTimeoutImpl(cb, cb->getTimeRemaining(getCurTime()));
} }
// If tick is zero, timeoutExpired will cascade the next bucket. // If tick is zero, timeoutExpired will cascade the next bucket.
...@@ -167,6 +170,8 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) { ...@@ -167,6 +170,8 @@ bool HHWheelTimer::cascadeTimers(int bucket, int tick) {
} }
void HHWheelTimer::timeoutExpired() noexcept { void HHWheelTimer::timeoutExpired() noexcept {
auto nextTick = calcNextTick();
// If the last smart pointer for "this" is reset inside the callback's // If the last smart pointer for "this" is reset inside the callback's
// timeoutExpired(), then the guard will detect that it is time to bail from // timeoutExpired(), then the guard will detect that it is time to bail from
// this method. // this method.
...@@ -185,21 +190,19 @@ void HHWheelTimer::timeoutExpired() noexcept { ...@@ -185,21 +190,19 @@ void HHWheelTimer::timeoutExpired() noexcept {
// timeoutExpired() can only be invoked directly from the event base loop. // timeoutExpired() can only be invoked directly from the event base loop.
// It should never be invoked recursively. // It should never be invoked recursively.
// //
milliseconds catchup = std::chrono::duration_cast<milliseconds>( lastTick_ = expireTick_;
std::chrono::steady_clock::now().time_since_epoch()); while (lastTick_ < nextTick) {
while (now_ < catchup) { int idx = lastTick_ & WHEEL_MASK;
now_ += interval_;
int idx = nextTick_ & WHEEL_MASK; if (idx == 0) {
if (0 == idx) {
// Cascade timers // Cascade timers
if (cascadeTimers(1, (nextTick_ >> WHEEL_BITS) & WHEEL_MASK) && if (cascadeTimers(1, (lastTick_ >> WHEEL_BITS) & WHEEL_MASK) &&
cascadeTimers(2, (nextTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) { cascadeTimers(2, (lastTick_ >> (2 * WHEEL_BITS)) & WHEEL_MASK)) {
cascadeTimers(3, (nextTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK); cascadeTimers(3, (lastTick_ >> (3 * WHEEL_BITS)) & WHEEL_MASK);
} }
} }
nextTick_++; lastTick_++;
CallbackList* cbs = &buckets_[0][idx]; CallbackList* cbs = &buckets_[0][idx];
while (!cbs->empty()) { while (!cbs->empty()) {
auto* cb = &cbs->front(); auto* cb = &cbs->front();
...@@ -223,9 +226,7 @@ void HHWheelTimer::timeoutExpired() noexcept { ...@@ -223,9 +226,7 @@ void HHWheelTimer::timeoutExpired() noexcept {
return; return;
} }
} }
if (count_ > 0) { scheduleNextTimeout();
this->AsyncTimeout::scheduleTimeout(interval_.count());
}
} }
size_t HHWheelTimer::cancelAll() { size_t HHWheelTimer::cancelAll() {
...@@ -262,4 +263,41 @@ size_t HHWheelTimer::cancelAll() { ...@@ -262,4 +263,41 @@ size_t HHWheelTimer::cancelAll() {
return count; return count;
} }
// Arm (or cancel) the underlying AsyncTimeout so it fires exactly at the
// next occupied slot of the lowest wheel, instead of ticking constantly.
void HHWheelTimer::scheduleNextTimeout() {
  auto nextTick = calcNextTick();
  // Ticks until the next wakeup; defaults to one when nextTick lands on a
  // slot-0 boundary (a cascade point, which must always be visited).
  long ticksToWakeup = 1;

  if (nextTick & WHEEL_MASK) {
    // Linear scan of the lowest wheel for the first occupied slot at or
    // after nextTick (the commit message notes this scan is optimized in a
    // later diff).
    long slot = nextTick & WHEEL_MASK;
    while (slot < WHEEL_SIZE && buckets_[0][slot].empty()) {
      slot++;
    }
    ticksToWakeup = slot - ((nextTick - 1) & WHEEL_MASK);
  }

  if (count_ > 0) {
    // Only (re)arm the timer when nothing is armed yet, or when the armed
    // deadline is later than the one just computed.
    if (!this->AsyncTimeout::isScheduled() ||
        (expireTick_ > ticksToWakeup + nextTick - 1)) {
      this->AsyncTimeout::scheduleTimeout(interval_ * ticksToWakeup);
      expireTick_ = ticksToWakeup + nextTick - 1;
    }
  } else {
    // No pending callbacks: nothing to wake up for.
    this->AsyncTimeout::cancelTimeout();
  }
}
// Compute the wheel tick corresponding to "now", expressed as the number of
// whole intervals elapsed since this wheel was constructed.
int64_t HHWheelTimer::calcNextTick() {
  auto elapsedTicks =
      (getCurTime().count() - startTime_.count()) / interval_.count();

  // Slow eventbases will have skew between the actual time and the
  // callback time.  To avoid racing the next scheduleNextTimeout()
  // call, always schedule new timeouts against the actual
  // timeoutExpired() time.
  if (processingCallbacksGuard_) {
    return lastTick_;
  }
  return elapsedTicks;
}
} // folly } // folly
...@@ -33,14 +33,6 @@ namespace folly { ...@@ -33,14 +33,6 @@ namespace folly {
/** /**
* Hashed Hierarchical Wheel Timer * Hashed Hierarchical Wheel Timer
* *
* Comparison:
* AsyncTimeout - a single timeout.
* HHWheelTimer - a set of efficient timeouts with different interval,
* but timeouts are not exact.
*
* All of the above are O(1) in insertion, tick update and cancel
* This implementation ticks once every 10ms.
* We model timers as the number of ticks until the next * We model timers as the number of ticks until the next
* due event. We allow 32-bits of space to track this * due event. We allow 32-bits of space to track this
* due interval, and break that into 4 regions of 8 bits. * due interval, and break that into 4 regions of 8 bits.
...@@ -53,8 +45,11 @@ namespace folly { ...@@ -53,8 +45,11 @@ namespace folly {
* into a different bucket. * into a different bucket.
* *
* This technique results in a very cheap mechanism for * This technique results in a very cheap mechanism for
* maintaining time and timers, provided that we can maintain * maintaining time and timers.
* a consistent rate of ticks. *
* Unlike the original timer wheel paper, this implementation does
* *not* tick constantly, and instead calculates the exact next wakeup
* time.
*/ */
class HHWheelTimer : private folly::AsyncTimeout, class HHWheelTimer : private folly::AsyncTimeout,
public folly::DelayedDestruction { public folly::DelayedDestruction {
...@@ -292,12 +287,22 @@ class HHWheelTimer : private folly::AsyncTimeout, ...@@ -292,12 +287,22 @@ class HHWheelTimer : private folly::AsyncTimeout,
} }
bool cascadeTimers(int bucket, int tick); bool cascadeTimers(int bucket, int tick);
int64_t nextTick_; int64_t lastTick_;
int64_t expireTick_;
uint64_t count_; uint64_t count_;
std::chrono::milliseconds now_; std::chrono::milliseconds startTime_;
int64_t calcNextTick();
void scheduleNextTimeout();
bool* processingCallbacksGuard_; bool* processingCallbacksGuard_;
CallbackList timeouts; // Timeouts queued to run CallbackList timeouts; // Timeouts queued to run
// Current steady-clock time truncated to milliseconds since the clock's
// epoch; monotonic, so safe for computing tick offsets.
std::chrono::milliseconds getCurTime() {
  using namespace std::chrono;
  return duration_cast<milliseconds>(steady_clock::now().time_since_epoch());
}
}; };
} // folly } // folly
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
#include <folly/Random.h>
#include <folly/io/async/EventBase.h> #include <folly/io/async/EventBase.h>
#include <folly/io/async/HHWheelTimer.h> #include <folly/io/async/HHWheelTimer.h>
#include <folly/io/async/test/UndelayedDestruction.h> #include <folly/io/async/test/UndelayedDestruction.h>
...@@ -272,3 +273,52 @@ TEST_F(HHWheelTimerTest, Level1) { ...@@ -272,3 +273,52 @@ TEST_F(HHWheelTimerTest, Level1) {
T_CHECK_TIMEOUT( T_CHECK_TIMEOUT(
start, t2.timestamps[0], milliseconds(300), milliseconds(256)); start, t2.timestamps[0], milliseconds(300), milliseconds(256));
} }
// Stress test: schedules 10k randomized timeouts against a 1ms wheel.
// Roughly 2/3 of them are cancelled (via runAfterDelay) before firing and
// must never run; the remainder reschedule themselves once when they fire.
// Verifies the "exact next wakeup" scheduling under heavy churn.
TEST_F(HHWheelTimerTest, Stress) {
  StackWheelTimer t(&eventBase, milliseconds(1));

  // timeoutcount grows as firing callbacks reschedule themselves (see the
  // else-branch lambda below), so the final EXPECT_EQ compares against the
  // grown total, not the initial 10000.
  long timeoutcount = 10000;
  TestTimeout timeouts[10000];
  long runtimeouts = 0;
  for (long i = 0; i < timeoutcount; i++) {
    long newtimeout = Random::rand32(1, 10000);
    if (Random::rand32(3)) {
      // ~2/3 of timers: schedule, then cancel via the eventbase before the
      // wheel deadline is reached.
      // NOTE: hhwheel timer runs before eventbase runAfterDelay,
      // so runAfterDelay cancelTimeout() must run at least one timerwheel
      // before scheduleTimeout, to ensure it runs first.
      newtimeout += 256;
      t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
      eventBase.runAfterDelay(
          [&, i]() {
            timeouts[i].fn = nullptr;
            timeouts[i].cancelTimeout();
            runtimeouts++;
            LOG(INFO) << "Ran " << runtimeouts << " timeouts, cancelled";
          },
          newtimeout - 256);
      // If this fires, cancellation failed: the wheel ran a cancelled timer.
      timeouts[i].fn = [&, i, newtimeout]() {
        LOG(INFO) << "FAIL:timer " << i << " still fired in " << newtimeout;
        EXPECT_FALSE(true);
      };
    } else {
      // ~1/3 of timers: let them fire, then reschedule themselves once with
      // a fresh random timeout (bumping timeoutcount to match).
      t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
      timeouts[i].fn = [&, i]() {
        timeoutcount++;
        long newtimeout = Random::rand32(1, 10000);
        t.scheduleTimeout(&timeouts[i], std::chrono::milliseconds(newtimeout));
        runtimeouts++;
        // Deliberate stall to skew the eventbase clock and exercise the
        // catchup path in calcNextTick()/timeoutExpired().
        /* sleep override */ usleep(1000);
        LOG(INFO) << "Ran " << runtimeouts << " timeouts of " << timeoutcount;
        // Second firing just counts; no further reschedule.
        timeouts[i].fn = [&, i]() {
          runtimeouts++;
          LOG(INFO) << "Ran " << runtimeouts << " timeouts of " << timeoutcount;
        };
      };
    }
  }
  LOG(INFO) << "RUNNING TEST";
  eventBase.loop();

  // Every scheduled (and not-cancelled-then-fired) timeout must have run.
  EXPECT_EQ(runtimeouts, timeoutcount);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment