Commit df00b343 authored by Marc Celani's avatar Marc Celani Committed by Facebook Github Bot

Be able to take now as input

Summary: Take now as input to improve performance when updating many buffered stats at once

Reviewed By: yfeldblum

Differential Revision: D7906107

fbshipit-source-id: cf09bc2c2673b0a336628b3e6b31ee2d55dbb10c
parent daf3e6fc
...@@ -35,14 +35,15 @@ SimpleQuantileEstimator<ClockT>::SimpleQuantileEstimator() ...@@ -35,14 +35,15 @@ SimpleQuantileEstimator<ClockT>::SimpleQuantileEstimator()
template <typename ClockT> template <typename ClockT>
QuantileEstimates SimpleQuantileEstimator<ClockT>::estimateQuantiles( QuantileEstimates SimpleQuantileEstimator<ClockT>::estimateQuantiles(
Range<const double*> quantiles) { Range<const double*> quantiles,
auto digest = bufferedDigest_.get(); TimePoint now) {
auto digest = bufferedDigest_.get(now);
return detail::estimatesFromDigest(digest, quantiles); return detail::estimatesFromDigest(digest, quantiles);
} }
template <typename ClockT> template <typename ClockT>
void SimpleQuantileEstimator<ClockT>::addValue(double value) { void SimpleQuantileEstimator<ClockT>::addValue(double value, TimePoint now) {
bufferedDigest_.append(value); bufferedDigest_.append(value, now);
} }
template <typename ClockT> template <typename ClockT>
...@@ -53,15 +54,18 @@ SlidingWindowQuantileEstimator<ClockT>::SlidingWindowQuantileEstimator( ...@@ -53,15 +54,18 @@ SlidingWindowQuantileEstimator<ClockT>::SlidingWindowQuantileEstimator(
template <typename ClockT> template <typename ClockT>
QuantileEstimates SlidingWindowQuantileEstimator<ClockT>::estimateQuantiles( QuantileEstimates SlidingWindowQuantileEstimator<ClockT>::estimateQuantiles(
Range<const double*> quantiles) { Range<const double*> quantiles,
auto digests = bufferedSlidingWindow_.get(); TimePoint now) {
auto digests = bufferedSlidingWindow_.get(now);
auto digest = TDigest::merge(digests); auto digest = TDigest::merge(digests);
return detail::estimatesFromDigest(digest, quantiles); return detail::estimatesFromDigest(digest, quantiles);
} }
template <typename ClockT> template <typename ClockT>
void SlidingWindowQuantileEstimator<ClockT>::addValue(double value) { void SlidingWindowQuantileEstimator<ClockT>::addValue(
bufferedSlidingWindow_.append(value); double value,
TimePoint now) {
bufferedSlidingWindow_.append(value, now);
} }
} // namespace folly } // namespace folly
...@@ -30,25 +30,42 @@ struct QuantileEstimates { ...@@ -30,25 +30,42 @@ struct QuantileEstimates {
std::vector<std::pair<double, double>> quantiles; std::vector<std::pair<double, double>> quantiles;
}; };
template <typename ClockT>
class QuantileEstimator { class QuantileEstimator {
public: public:
using TimePoint = typename ClockT::time_point;
virtual ~QuantileEstimator() {} virtual ~QuantileEstimator() {}
QuantileEstimates estimateQuantiles(Range<const double*> quantiles) {
return estimateQuantiles(quantiles, ClockT::now());
}
virtual QuantileEstimates estimateQuantiles( virtual QuantileEstimates estimateQuantiles(
Range<const double*> quantiles) = 0; Range<const double*> quantiles,
virtual void addValue(double value) = 0; TimePoint now) = 0;
void addValue(double value) {
addValue(value, ClockT::now());
}
virtual void addValue(double value, TimePoint now) = 0;
}; };
/* /*
* A QuantileEstimator that buffers writes for 1 second. * A QuantileEstimator that buffers writes for 1 second.
*/ */
template <typename ClockT = std::chrono::steady_clock> template <typename ClockT = std::chrono::steady_clock>
class SimpleQuantileEstimator : public QuantileEstimator { class SimpleQuantileEstimator : public QuantileEstimator<ClockT> {
public: public:
using TimePoint = typename ClockT::time_point;
SimpleQuantileEstimator(); SimpleQuantileEstimator();
QuantileEstimates estimateQuantiles(Range<const double*> quantiles) override; QuantileEstimates estimateQuantiles(
void addValue(double value) override; Range<const double*> quantiles,
TimePoint now) override;
void addValue(double value, TimePoint now) override;
private: private:
detail::BufferedDigest<TDigest, ClockT> bufferedDigest_; detail::BufferedDigest<TDigest, ClockT> bufferedDigest_;
...@@ -59,14 +76,18 @@ class SimpleQuantileEstimator : public QuantileEstimator { ...@@ -59,14 +76,18 @@ class SimpleQuantileEstimator : public QuantileEstimator {
* constructor). Values are buffered for windowDuration. * constructor). Values are buffered for windowDuration.
*/ */
template <typename ClockT = std::chrono::steady_clock> template <typename ClockT = std::chrono::steady_clock>
class SlidingWindowQuantileEstimator : public QuantileEstimator { class SlidingWindowQuantileEstimator : public QuantileEstimator<ClockT> {
public: public:
using TimePoint = typename ClockT::time_point;
SlidingWindowQuantileEstimator( SlidingWindowQuantileEstimator(
std::chrono::seconds windowDuration, std::chrono::seconds windowDuration,
size_t nWindows = 60); size_t nWindows = 60);
QuantileEstimates estimateQuantiles(Range<const double*> quantiles) override; QuantileEstimates estimateQuantiles(
void addValue(double value) override; Range<const double*> quantiles,
TimePoint now) override;
void addValue(double value, TimePoint now) override;
private: private:
detail::BufferedSlidingWindow<TDigest, ClockT> bufferedSlidingWindow_; detail::BufferedSlidingWindow<TDigest, ClockT> bufferedSlidingWindow_;
......
...@@ -35,8 +35,7 @@ BufferedStat<DigestT, ClockT>::BufferedStat( ...@@ -35,8 +35,7 @@ BufferedStat<DigestT, ClockT>::BufferedStat(
} }
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
void BufferedStat<DigestT, ClockT>::append(double value) { void BufferedStat<DigestT, ClockT>::append(double value, TimePoint now) {
auto now = ClockT::now();
if (UNLIKELY(now > expiry_.load(std::memory_order_acquire).tp)) { if (UNLIKELY(now > expiry_.load(std::memory_order_acquire).tp)) {
std::unique_lock<SharedMutex> g(mutex_, std::try_to_lock_t()); std::unique_lock<SharedMutex> g(mutex_, std::try_to_lock_t());
if (g.owns_lock()) { if (g.owns_lock()) {
...@@ -47,8 +46,8 @@ void BufferedStat<DigestT, ClockT>::append(double value) { ...@@ -47,8 +46,8 @@ void BufferedStat<DigestT, ClockT>::append(double value) {
} }
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
typename ClockT::time_point BufferedStat<DigestT, ClockT>::roundUp( typename BufferedStat<DigestT, ClockT>::TimePoint
typename ClockT::time_point t) { BufferedStat<DigestT, ClockT>::roundUp(TimePoint t) {
auto remainder = t.time_since_epoch() % bufferDuration_; auto remainder = t.time_since_epoch() % bufferDuration_;
if (remainder.count() != 0) { if (remainder.count() != 0) {
return t + bufferDuration_ - remainder; return t + bufferDuration_ - remainder;
...@@ -57,8 +56,8 @@ typename ClockT::time_point BufferedStat<DigestT, ClockT>::roundUp( ...@@ -57,8 +56,8 @@ typename ClockT::time_point BufferedStat<DigestT, ClockT>::roundUp(
} }
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired() { std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired(
auto now = ClockT::now(); TimePoint now) {
std::unique_lock<SharedMutex> g(mutex_); std::unique_lock<SharedMutex> g(mutex_);
doUpdate(now, g); doUpdate(now, g);
return g; return g;
...@@ -66,7 +65,7 @@ std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired() { ...@@ -66,7 +65,7 @@ std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired() {
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
void BufferedStat<DigestT, ClockT>::doUpdate( void BufferedStat<DigestT, ClockT>::doUpdate(
typename ClockT::time_point now, TimePoint now,
const std::unique_lock<SharedMutex>& g) { const std::unique_lock<SharedMutex>& g) {
DCHECK(g.owns_lock()); DCHECK(g.owns_lock());
// Check that no other thread has performed the slide after the check // Check that no other thread has performed the slide after the check
...@@ -87,16 +86,16 @@ BufferedDigest<DigestT, ClockT>::BufferedDigest( ...@@ -87,16 +86,16 @@ BufferedDigest<DigestT, ClockT>::BufferedDigest(
digest_(digestSize) {} digest_(digestSize) {}
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
DigestT BufferedDigest<DigestT, ClockT>::get() { DigestT BufferedDigest<DigestT, ClockT>::get(TimePoint now) {
auto g = this->updateIfExpired(); auto g = this->updateIfExpired(now);
return digest_; return digest_;
} }
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
void BufferedDigest<DigestT, ClockT>::onNewDigest( void BufferedDigest<DigestT, ClockT>::onNewDigest(
DigestT digest, DigestT digest,
typename ClockT::time_point /*newExpiry*/, TimePoint /*newExpiry*/,
typename ClockT::time_point /*oldExpiry*/, TimePoint /*oldExpiry*/,
const std::unique_lock<SharedMutex>& /*g*/) { const std::unique_lock<SharedMutex>& /*g*/) {
std::array<DigestT, 2> a{{digest_, std::move(digest)}}; std::array<DigestT, 2> a{{digest_, std::move(digest)}};
digest_ = DigestT::merge(a); digest_ = DigestT::merge(a);
...@@ -112,10 +111,11 @@ BufferedSlidingWindow<DigestT, ClockT>::BufferedSlidingWindow( ...@@ -112,10 +111,11 @@ BufferedSlidingWindow<DigestT, ClockT>::BufferedSlidingWindow(
slidingWindow_([=]() { return DigestT(digestSize); }, nBuckets) {} slidingWindow_([=]() { return DigestT(digestSize); }, nBuckets) {}
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
std::vector<DigestT> BufferedSlidingWindow<DigestT, ClockT>::get() { std::vector<DigestT> BufferedSlidingWindow<DigestT, ClockT>::get(
TimePoint now) {
std::vector<DigestT> digests; std::vector<DigestT> digests;
{ {
auto g = this->updateIfExpired(); auto g = this->updateIfExpired(now);
digests = slidingWindow_.get(); digests = slidingWindow_.get();
} }
digests.erase( digests.erase(
...@@ -130,8 +130,8 @@ std::vector<DigestT> BufferedSlidingWindow<DigestT, ClockT>::get() { ...@@ -130,8 +130,8 @@ std::vector<DigestT> BufferedSlidingWindow<DigestT, ClockT>::get() {
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
void BufferedSlidingWindow<DigestT, ClockT>::onNewDigest( void BufferedSlidingWindow<DigestT, ClockT>::onNewDigest(
DigestT digest, DigestT digest,
typename ClockT::time_point newExpiry, TimePoint newExpiry,
typename ClockT::time_point oldExpiry, TimePoint oldExpiry,
const std::unique_lock<SharedMutex>& /*g*/) { const std::unique_lock<SharedMutex>& /*g*/) {
auto diff = newExpiry - oldExpiry; auto diff = newExpiry - oldExpiry;
slidingWindow_.slide(diff / this->bufferDuration_); slidingWindow_.slide(diff / this->bufferDuration_);
......
...@@ -31,6 +31,8 @@ namespace detail { ...@@ -31,6 +31,8 @@ namespace detail {
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
class BufferedStat { class BufferedStat {
public: public:
using TimePoint = typename ClockT::time_point;
BufferedStat() = delete; BufferedStat() = delete;
BufferedStat( BufferedStat(
...@@ -40,7 +42,7 @@ class BufferedStat { ...@@ -40,7 +42,7 @@ class BufferedStat {
virtual ~BufferedStat() {} virtual ~BufferedStat() {}
void append(double value); void append(double value, TimePoint now = ClockT::now());
protected: protected:
// https://www.mail-archive.com/llvm-bugs@lists.llvm.org/msg18280.html // https://www.mail-archive.com/llvm-bugs@lists.llvm.org/msg18280.html
...@@ -49,9 +51,9 @@ class BufferedStat { ...@@ -49,9 +51,9 @@ class BufferedStat {
public: public:
TimePointHolder() noexcept {} TimePointHolder() noexcept {}
TimePointHolder(typename ClockT::time_point t) : tp(t) {} TimePointHolder(TimePoint t) : tp(t) {}
typename ClockT::time_point tp; TimePoint tp;
}; };
const typename ClockT::duration bufferDuration_; const typename ClockT::duration bufferDuration_;
...@@ -60,20 +62,18 @@ class BufferedStat { ...@@ -60,20 +62,18 @@ class BufferedStat {
virtual void onNewDigest( virtual void onNewDigest(
DigestT digest, DigestT digest,
typename ClockT::time_point newExpiry, TimePoint newExpiry,
typename ClockT::time_point oldExpiry, TimePoint oldExpiry,
const std::unique_lock<SharedMutex>& g) = 0; const std::unique_lock<SharedMutex>& g) = 0;
std::unique_lock<SharedMutex> updateIfExpired(); std::unique_lock<SharedMutex> updateIfExpired(TimePoint now);
private: private:
DigestBuilder<DigestT> digestBuilder_; DigestBuilder<DigestT> digestBuilder_;
void doUpdate( void doUpdate(TimePoint now, const std::unique_lock<SharedMutex>& g);
typename ClockT::time_point now,
const std::unique_lock<SharedMutex>& g);
typename ClockT::time_point roundUp(typename ClockT::time_point t); TimePoint roundUp(TimePoint t);
}; };
/* /*
...@@ -82,17 +82,19 @@ class BufferedStat { ...@@ -82,17 +82,19 @@ class BufferedStat {
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
class BufferedDigest : public BufferedStat<DigestT, ClockT> { class BufferedDigest : public BufferedStat<DigestT, ClockT> {
public: public:
using TimePoint = typename ClockT::time_point;
BufferedDigest( BufferedDigest(
typename ClockT::duration bufferDuration, typename ClockT::duration bufferDuration,
size_t bufferSize, size_t bufferSize,
size_t digestSize); size_t digestSize);
DigestT get(); DigestT get(TimePoint now = ClockT::now());
void onNewDigest( void onNewDigest(
DigestT digest, DigestT digest,
typename ClockT::time_point newExpiry, TimePoint newExpiry,
typename ClockT::time_point oldExpiry, TimePoint oldExpiry,
const std::unique_lock<SharedMutex>& g) final; const std::unique_lock<SharedMutex>& g) final;
private: private:
...@@ -106,18 +108,20 @@ class BufferedDigest : public BufferedStat<DigestT, ClockT> { ...@@ -106,18 +108,20 @@ class BufferedDigest : public BufferedStat<DigestT, ClockT> {
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
class BufferedSlidingWindow : public BufferedStat<DigestT, ClockT> { class BufferedSlidingWindow : public BufferedStat<DigestT, ClockT> {
public: public:
using TimePoint = typename ClockT::time_point;
BufferedSlidingWindow( BufferedSlidingWindow(
size_t nBuckets, size_t nBuckets,
typename ClockT::duration bufferDuration, typename ClockT::duration bufferDuration,
size_t bufferSize, size_t bufferSize,
size_t digestSize); size_t digestSize);
std::vector<DigestT> get(); std::vector<DigestT> get(TimePoint now = ClockT::now());
void onNewDigest( void onNewDigest(
DigestT digest, DigestT digest,
typename ClockT::time_point newExpiry, TimePoint newExpiry,
typename ClockT::time_point oldExpiry, TimePoint oldExpiry,
const std::unique_lock<SharedMutex>& g) final; const std::unique_lock<SharedMutex>& g) final;
private: private:
......
...@@ -35,7 +35,8 @@ struct MockClock { ...@@ -35,7 +35,8 @@ struct MockClock {
MockClock::time_point MockClock::Now = MockClock::time_point{}; MockClock::time_point MockClock::Now = MockClock::time_point{};
class QuantileEstimatorTest class QuantileEstimatorTest
: public ::testing::TestWithParam<std::shared_ptr<QuantileEstimator>> {}; : public ::testing::TestWithParam<
std::shared_ptr<QuantileEstimator<MockClock>>> {};
TEST_P(QuantileEstimatorTest, EstimateQuantiles) { TEST_P(QuantileEstimatorTest, EstimateQuantiles) {
auto estimator = GetParam(); auto estimator = GetParam();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment