Commit 0f00cc10 authored by Geoff Oakham's avatar Geoff Oakham Committed by Facebook GitHub Bot

TokenBucketStorage primitive extracted from BasicDynamicTokenBucket

Summary:
Extract the underlying primitive that powers BasicDynamicTokenBucket, as TokenBucketStroage. This abstraction speaks to "tokens" (instead of time) and allows for arbitrary borrowing.  It also handles thread safety.

This allows other bucket token based libraries to be created (eg. a Cooperative Scheduler) using the same primitive.

API compatibility has been maintained with the existing classes.

Reviewed By: yfeldblum, rohithmenon

Differential Revision: D29522354

fbshipit-source-id: 010bd9caf3f9c3726621e87d23e5c564773424b7
parent 7a18d182
...@@ -40,6 +40,193 @@ struct TokenBucketPolicyDefault { ...@@ -40,6 +40,193 @@ struct TokenBucketPolicyDefault {
using concurrent = std::true_type; using concurrent = std::true_type;
}; };
/**
* Thread-safe (atomic) token bucket primitive.
*
* This primitive can be used to implement a token bucket
* (http://en.wikipedia.org/wiki/Token_bucket). It handles
* the storage of the state in an atomic way, and presents
* an interface dealing with tokens, rate, burstSize and time.
*
* This primitive records the last time it was updated. This allows the
* token bucket to add tokens "just in time" when tokens are requested.
*
* @tparam Policy A policy.
*/
template <typename Policy = TokenBucketPolicyDefault>
class TokenBucketStorage {
template <typename T>
using Atom = typename Policy::template atom<T>;
using Align = typename Policy::align;
using Clock = typename Policy::clock; // do we need clock here?
using Concurrent = typename Policy::concurrent;
static_assert(Clock::is_steady, "clock must be steady"); // do we need clock?
public:
/**
* Constructor.
*
* @param zeroTime Initial time at which to consider the token bucket
* starting to fill. Defaults to 0, so by default token
* buckets are "full" after construction.
*/
explicit TokenBucketStorage(double zeroTime = 0) noexcept
: zeroTime_(zeroTime) {}
/**
* Copy constructor.
*
* Thread-safe. (Copy constructors of derived classes may not be thread-safe
* however.)
*/
TokenBucketStorage(const TokenBucketStorage& other) noexcept
: zeroTime_(other.zeroTime_.load(std::memory_order_relaxed)) {}
/**
* Copy-assignment operator.
*
* Warning: not thread safe for the object being assigned to (including
* self-assignment). Thread-safe for the other object.
*/
TokenBucketStorage& operator=(const TokenBucketStorage& other) noexcept {
zeroTime_.store(other.zeroTime(), std::memory_order_relaxed);
return *this;
}
/**
* Re-initialize token bucket.
*
* Thread-safe.
*
* @param zeroTime Initial time at which to consider the token bucket
* starting to fill. Defaults to 0, so by default token
* bucket is reset to "full".
*/
void reset(double zeroTime = 0) noexcept {
zeroTime_.store(zeroTime, std::memory_order_relaxed);
}
/**
* Returns the number of tokens currently available. This could be negative
* (if in debt); will be a most burstSize.
*
*
* Thread-safe (but returned values may immediately be outdated).
*/
double available(
double rate, double burstSize, double nowInSeconds) const noexcept {
assert(rate > 0);
assert(burstSize > 0);
double zt = this->zeroTime_.load(std::memory_order_relaxed);
return std::min((nowInSeconds - zt) * rate, burstSize);
}
/**
* Consume tokens at the given rate/burst/time.
*
* Consumption is actually done by the callback function: it's given a
* reference with the number of available tokens and returns the number
* consumed. Typically the return value would be between 0.0 and available,
* but there are no restrictions.
*
* Note: the callback may be called multiple times, so please no side-effects
*/
template <typename Callback>
double consume(
double rate,
double burstSize,
double nowInSeconds,
const Callback& callback) {
assert(rate > 0);
assert(burstSize > 0);
double zeroTimeOld;
double zeroTimeNew;
double consumed;
do {
zeroTimeOld = zeroTime();
double tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
consumed = callback(tokens);
double tokensNew = tokens - consumed;
if (consumed == 0.0) {
return consumed;
}
zeroTimeNew = nowInSeconds - tokensNew / rate;
} while (UNLIKELY(
!compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew)));
return consumed;
}
/**
* returns the time at which the bucket will have `target` tokens available.
*
* Caution: it doesn't make sense to ask about target > burstSize
*
* Eg.
* // time debt repaid
* bucket.timeWhenBucket(rate, 0);
*
* // time bucket is full
* bucket.timeWhenBucket(rate, burstSize);
*/
double timeWhenBucket(double rate, double target) {
return zeroTime() + target / rate;
}
/**
* Return extra tokens back to the bucket.
*
* Thread-safe.
*/
void returnTokens(double tokensToReturn, double rate) {
assert(rate > 0);
returnTokensImpl(tokensToReturn, rate);
}
private:
/**
* Adjust zeroTime based on rate and tokenCount and return the new value of
* zeroTime_. Note: Token count can be negative to move the zeroTime_
* into the future.
*/
double returnTokensImpl(double tokenCount, double rate) {
auto zeroTimeOld = zeroTime_.load(std::memory_order_relaxed);
double zeroTimeNew;
do {
zeroTimeNew = zeroTimeOld - tokenCount / rate;
} while (UNLIKELY(
!compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew)));
return zeroTimeNew;
}
static bool compare_exchange_weak_relaxed(
Atom<double>& atom, double& expected, double zeroTime) {
if (Concurrent::value) {
return atom.compare_exchange_weak(
expected, zeroTime, std::memory_order_relaxed);
} else {
return atom.store(zeroTime, std::memory_order_relaxed), true;
}
}
double zeroTime() const {
return this->zeroTime_.load(std::memory_order_relaxed);
}
static constexpr size_t AlignZeroTime =
constexpr_max(Align::value, alignof(Atom<double>));
alignas(AlignZeroTime) Atom<double> zeroTime_;
};
/** /**
* Thread-safe (atomic) token bucket implementation. * Thread-safe (atomic) token bucket implementation.
* *
...@@ -85,7 +272,7 @@ class BasicDynamicTokenBucket { ...@@ -85,7 +272,7 @@ class BasicDynamicTokenBucket {
* buckets are "full" after construction. * buckets are "full" after construction.
*/ */
explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept
: zeroTime_(zeroTime) {} : bucket_(zeroTime) {}
/** /**
* Copy constructor. * Copy constructor.
...@@ -94,21 +281,7 @@ class BasicDynamicTokenBucket { ...@@ -94,21 +281,7 @@ class BasicDynamicTokenBucket {
* however.) * however.)
*/ */
BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept
: zeroTime_(other.zeroTime_.load(std::memory_order_relaxed)) {} : bucket_(other.bucket_) {}
/**
* Copy-assignment operator.
*
* Warning: not thread safe for the object being assigned to (including
* self-assignment). Thread-safe for the other object.
*/
BasicDynamicTokenBucket& operator=(
const BasicDynamicTokenBucket& other) noexcept {
zeroTime_.store(
other.zeroTime_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
return *this;
}
/** /**
* Re-initialize token bucket. * Re-initialize token bucket.
...@@ -119,9 +292,7 @@ class BasicDynamicTokenBucket { ...@@ -119,9 +292,7 @@ class BasicDynamicTokenBucket {
* starting to fill. Defaults to 0, so by default token * starting to fill. Defaults to 0, so by default token
* bucket is reset to "full". * bucket is reset to "full".
*/ */
void reset(double zeroTime = 0) noexcept { void reset(double zeroTime = 0) noexcept { bucket_.reset(zeroTime); }
zeroTime_.store(zeroTime, std::memory_order_relaxed);
}
/** /**
* Returns the current time in seconds since Epoch. * Returns the current time in seconds since Epoch.
...@@ -155,18 +326,17 @@ class BasicDynamicTokenBucket { ...@@ -155,18 +326,17 @@ class BasicDynamicTokenBucket {
assert(rate > 0); assert(rate > 0);
assert(burstSize > 0); assert(burstSize > 0);
if (nowInSeconds <= zeroTime_.load(std::memory_order_relaxed)) { if (bucket_.available(rate, burstSize, nowInSeconds) < 0.0) {
return 0; return 0;
} }
return consumeImpl( double consumed = bucket_.consume(
rate, burstSize, nowInSeconds, [toConsume](double& tokens) { rate, burstSize, nowInSeconds, [toConsume](double available) {
if (tokens < toConsume) { return available < toConsume ? 0.0 : toConsume;
return false;
}
tokens -= toConsume;
return true;
}); });
assert(consumed == toConsume || consumed == 0.0);
return consumed == toConsume;
} }
/** /**
...@@ -192,28 +362,19 @@ class BasicDynamicTokenBucket { ...@@ -192,28 +362,19 @@ class BasicDynamicTokenBucket {
assert(rate > 0); assert(rate > 0);
assert(burstSize > 0); assert(burstSize > 0);
if (nowInSeconds <= zeroTime_.load(std::memory_order_relaxed)) { if (bucket_.available(rate, burstSize, nowInSeconds) <= 0.0) {
return 0; return 0;
} }
double consumed; double consumed = bucket_.consume(
consumeImpl( rate, burstSize, nowInSeconds, [toConsume](double available) {
rate, burstSize, nowInSeconds, [&consumed, toConsume](double& tokens) { return constexpr_min(available, toConsume);
if (tokens < toConsume) {
consumed = tokens;
tokens = 0.0;
} else {
consumed = toConsume;
tokens -= toConsume;
}
return true;
}); });
return consumed; return consumed;
} }
/** /**
* Return extra tokens back to the bucket. This will move the zeroTime_ * Return extra tokens back to the bucket.
* value back based on the rate.
* *
* Thread-safe. * Thread-safe.
*/ */
...@@ -221,13 +382,13 @@ class BasicDynamicTokenBucket { ...@@ -221,13 +382,13 @@ class BasicDynamicTokenBucket {
assert(rate > 0); assert(rate > 0);
assert(tokensToReturn > 0); assert(tokensToReturn > 0);
returnTokensImpl(tokensToReturn, rate); bucket_.returnTokens(tokensToReturn, rate);
} }
/** /**
* Like consumeOrDrain but the call will always satisfy the asked for count. * Like consumeOrDrain but the call will always satisfy the asked for count.
* It does so by borrowing tokens from the future (zeroTime_ will move * It does so by borrowing tokens from the future if the currently available
* forward) if the currently available count isn't sufficient. * count isn't sufficient.
* *
* Returns a folly::Optional<double>. The optional wont be set if the request * Returns a folly::Optional<double>. The optional wont be set if the request
* cannot be satisfied: only case is when it is larger than burstSize. The * cannot be satisfied: only case is when it is larger than burstSize. The
...@@ -262,8 +423,9 @@ class BasicDynamicTokenBucket { ...@@ -262,8 +423,9 @@ class BasicDynamicTokenBucket {
if (consumed > 0) { if (consumed > 0) {
toConsume -= consumed; toConsume -= consumed;
} else { } else {
double zeroTimeNew = returnTokensImpl(-toConsume, rate); bucket_.returnTokens(-toConsume, rate);
double napTime = std::max(0.0, zeroTimeNew - nowInSeconds); double debtPaid = bucket_.timeWhenBucket(rate, 0);
double napTime = std::max(0.0, debtPaid - nowInSeconds);
return napTime; return napTime;
} }
} }
...@@ -299,63 +461,11 @@ class BasicDynamicTokenBucket { ...@@ -299,63 +461,11 @@ class BasicDynamicTokenBucket {
double nowInSeconds = defaultClockNow()) const noexcept { double nowInSeconds = defaultClockNow()) const noexcept {
assert(rate > 0); assert(rate > 0);
assert(burstSize > 0); assert(burstSize > 0);
return std::max(0.0, bucket_.available(rate, burstSize, nowInSeconds));
double zt = this->zeroTime_.load(std::memory_order_relaxed);
if (nowInSeconds <= zt) {
return 0;
}
return std::min((nowInSeconds - zt) * rate, burstSize);
} }
private: private:
static bool compare_exchange_weak_relaxed( TokenBucketStorage<Policy> bucket_;
Atom<double>& atom, double& expected, double value) {
if (Concurrent::value) {
return atom.compare_exchange_weak(
expected, value, std::memory_order_relaxed);
} else {
return atom.store(value, std::memory_order_relaxed), true;
}
}
template <typename TCallback>
bool consumeImpl(
double rate,
double burstSize,
double nowInSeconds,
const TCallback& callback) {
auto zeroTimeOld = zeroTime_.load(std::memory_order_relaxed);
double zeroTimeNew;
do {
auto tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
if (!callback(tokens)) {
return false;
}
zeroTimeNew = nowInSeconds - tokens / rate;
} while (UNLIKELY(
!compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew)));
return true;
}
/**
* Adjust zeroTime based on rate and tokenCount and return the new value of
* zeroTime_. Note: Token count can be negative to move the zeroTime_ value
* into the future.
*/
double returnTokensImpl(double tokenCount, double rate) {
auto zeroTimeOld = zeroTime_.load(std::memory_order_relaxed);
double zeroTimeNew;
do {
zeroTimeNew = zeroTimeOld - tokenCount / rate;
} while (UNLIKELY(
!compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew)));
return zeroTimeNew;
}
static constexpr size_t AlignValue =
constexpr_max(Align::value, alignof(Atom<double>));
alignas(AlignValue) Atom<double> zeroTime_;
}; };
/** /**
...@@ -480,7 +590,7 @@ class BasicTokenBucket { ...@@ -480,7 +590,7 @@ class BasicTokenBucket {
} }
/** /**
* Returns extra token back to the bucket. * Returns extra token back to the bucket. Could be negative--it's all good.
*/ */
void returnTokens(double tokensToReturn) { void returnTokens(double tokensToReturn) {
return tokenBucket_.returnTokens(tokensToReturn, rate_); return tokenBucket_.returnTokens(tokensToReturn, rate_);
......
...@@ -35,6 +35,20 @@ TEST(TokenBucket, ReverseTime) { ...@@ -35,6 +35,20 @@ TEST(TokenBucket, ReverseTime) {
EXPECT_EQ(tokensBefore, tokenBucket.available()); EXPECT_EQ(tokensBefore, tokenBucket.available());
} }
TEST(TokenBucketTest, CtorAssign) {
BasicDynamicTokenBucket bucketA(100.0);
EXPECT_EQ(0, bucketA.available(10, 10, 90));
BasicDynamicTokenBucket bucketB(bucketA);
EXPECT_EQ(0, bucketB.available(10, 10, 90));
bucketA.reset(0.0);
EXPECT_EQ(10, bucketA.available(10, 10, 90));
bucketB = bucketA;
EXPECT_EQ(10, bucketB.available(10, 10, 90));
}
TEST_P(TokenBucketTest, sanity) { TEST_P(TokenBucketTest, sanity) {
std::pair<double, double> params = GetParam(); std::pair<double, double> params = GetParam();
double rate = params.first; double rate = params.first;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment