Commit 5379b434 authored by Sarang Masti's avatar Sarang Masti Committed by Facebook Github Bot

Support forcing a digest update in BufferedStat

Summary:
Provide an option to force digest updates in BufferedStat. This
is useful in tests when one can't easily mock the clock.

Reviewed By: yfeldblum

Differential Revision: D9732899

fbshipit-source-id: e6a9c01d97a156bdded58fc01d8389bbf9edbbf7
parent da7798c6
...@@ -43,8 +43,14 @@ class SimpleQuantileEstimator { ...@@ -43,8 +43,14 @@ class SimpleQuantileEstimator {
QuantileEstimates estimateQuantiles( QuantileEstimates estimateQuantiles(
Range<const double*> quantiles, Range<const double*> quantiles,
TimePoint now = ClockT::now()); TimePoint now = ClockT::now());
void addValue(double value, TimePoint now = ClockT::now()); void addValue(double value, TimePoint now = ClockT::now());
/// Flush buffered values
void flush() {
bufferedDigest_.flush();
}
private: private:
detail::BufferedDigest<TDigest, ClockT> bufferedDigest_; detail::BufferedDigest<TDigest, ClockT> bufferedDigest_;
}; };
...@@ -65,8 +71,14 @@ class SlidingWindowQuantileEstimator { ...@@ -65,8 +71,14 @@ class SlidingWindowQuantileEstimator {
QuantileEstimates estimateQuantiles( QuantileEstimates estimateQuantiles(
Range<const double*> quantiles, Range<const double*> quantiles,
TimePoint now = ClockT::now()); TimePoint now = ClockT::now());
void addValue(double value, TimePoint now = ClockT::now()); void addValue(double value, TimePoint now = ClockT::now());
/// Flush buffered values
void flush() {
bufferedSlidingWindow_.flush();
}
private: private:
detail::BufferedSlidingWindow<TDigest, ClockT> bufferedSlidingWindow_; detail::BufferedSlidingWindow<TDigest, ClockT> bufferedSlidingWindow_;
}; };
......
...@@ -39,7 +39,7 @@ void BufferedStat<DigestT, ClockT>::append(double value, TimePoint now) { ...@@ -39,7 +39,7 @@ void BufferedStat<DigestT, ClockT>::append(double value, TimePoint now) {
if (UNLIKELY(now > expiry_.load(std::memory_order_relaxed).tp)) { if (UNLIKELY(now > expiry_.load(std::memory_order_relaxed).tp)) {
std::unique_lock<SharedMutex> g(mutex_, std::try_to_lock_t()); std::unique_lock<SharedMutex> g(mutex_, std::try_to_lock_t());
if (g.owns_lock()) { if (g.owns_lock()) {
doUpdate(now, g); doUpdate(now, g, UpdateMode::OnExpiry);
} }
} }
digestBuilder_.append(value); digestBuilder_.append(value);
...@@ -59,18 +59,25 @@ template <typename DigestT, typename ClockT> ...@@ -59,18 +59,25 @@ template <typename DigestT, typename ClockT>
std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired( std::unique_lock<SharedMutex> BufferedStat<DigestT, ClockT>::updateIfExpired(
TimePoint now) { TimePoint now) {
std::unique_lock<SharedMutex> g(mutex_); std::unique_lock<SharedMutex> g(mutex_);
doUpdate(now, g); doUpdate(now, g, UpdateMode::OnExpiry);
return g; return g;
} }
template <typename DigestT, typename ClockT>
void BufferedStat<DigestT, ClockT>::flush() {
std::unique_lock<SharedMutex> g(mutex_);
doUpdate(ClockT::now(), g, UpdateMode::Now);
}
template <typename DigestT, typename ClockT> template <typename DigestT, typename ClockT>
void BufferedStat<DigestT, ClockT>::doUpdate( void BufferedStat<DigestT, ClockT>::doUpdate(
TimePoint now, TimePoint now,
const std::unique_lock<SharedMutex>& g) { const std::unique_lock<SharedMutex>& g,
UpdateMode updateMode) {
DCHECK(g.owns_lock()); DCHECK(g.owns_lock());
// Check that no other thread has performed the slide after the check // Check that no other thread has performed the slide after the check
auto oldExpiry = expiry_.load(std::memory_order_relaxed).tp; auto oldExpiry = expiry_.load(std::memory_order_relaxed).tp;
if (now > oldExpiry) { if (now > oldExpiry || updateMode == UpdateMode::Now) {
now = roundUp(now); now = roundUp(now);
expiry_.store(TimePointHolder(now), std::memory_order_relaxed); expiry_.store(TimePointHolder(now), std::memory_order_relaxed);
onNewDigest(digestBuilder_.build(), now, oldExpiry, g); onNewDigest(digestBuilder_.build(), now, oldExpiry, g);
...@@ -133,11 +140,16 @@ void BufferedSlidingWindow<DigestT, ClockT>::onNewDigest( ...@@ -133,11 +140,16 @@ void BufferedSlidingWindow<DigestT, ClockT>::onNewDigest(
TimePoint newExpiry, TimePoint newExpiry,
TimePoint oldExpiry, TimePoint oldExpiry,
const std::unique_lock<SharedMutex>& /*g*/) { const std::unique_lock<SharedMutex>& /*g*/) {
if (newExpiry > oldExpiry) {
auto diff = newExpiry - oldExpiry; auto diff = newExpiry - oldExpiry;
slidingWindow_.slide(diff / this->bufferDuration_); slidingWindow_.slide(diff / this->bufferDuration_);
diff -= this->bufferDuration_; diff -= this->bufferDuration_;
slidingWindow_.set(diff / this->bufferDuration_, std::move(digest)); slidingWindow_.set(diff / this->bufferDuration_, std::move(digest));
} else {
// just update current window
std::array<DigestT, 2> a{{slidingWindow_.front(), std::move(digest)}};
slidingWindow_.set(0 /* current window */, DigestT::merge(a));
}
} }
} // namespace detail } // namespace detail
......
...@@ -44,6 +44,8 @@ class BufferedStat { ...@@ -44,6 +44,8 @@ class BufferedStat {
void append(double value, TimePoint now = ClockT::now()); void append(double value, TimePoint now = ClockT::now());
void flush();
protected: protected:
// https://www.mail-archive.com/llvm-bugs@lists.llvm.org/msg18280.html // https://www.mail-archive.com/llvm-bugs@lists.llvm.org/msg18280.html
// Wrap the time point in something with a noexcept constructor. // Wrap the time point in something with a noexcept constructor.
...@@ -66,12 +68,27 @@ class BufferedStat { ...@@ -66,12 +68,27 @@ class BufferedStat {
TimePoint oldExpiry, TimePoint oldExpiry,
const std::unique_lock<SharedMutex>& g) = 0; const std::unique_lock<SharedMutex>& g) = 0;
// Update digest if now > expiry
std::unique_lock<SharedMutex> updateIfExpired(TimePoint now); std::unique_lock<SharedMutex> updateIfExpired(TimePoint now);
// Update digest unconditionally
std::unique_lock<SharedMutex> update();
private: private:
DigestBuilder<DigestT> digestBuilder_; DigestBuilder<DigestT> digestBuilder_;
void doUpdate(TimePoint now, const std::unique_lock<SharedMutex>& g); // Controls how digest updates happen in doUpdate
enum class UpdateMode {
OnExpiry,
Now,
};
// Update digest. If updateMode == UpdateMode::Now digest is updated
// unconditionally, else digest is updated only if expiry has passed.
void doUpdate(
TimePoint now,
const std::unique_lock<SharedMutex>& g,
UpdateMode updateMode);
TimePoint roundUp(TimePoint t); TimePoint roundUp(TimePoint t);
}; };
......
...@@ -50,6 +50,11 @@ std::vector<BucketT> SlidingWindow<BucketT>::get() const { ...@@ -50,6 +50,11 @@ std::vector<BucketT> SlidingWindow<BucketT>::get() const {
return buckets; return buckets;
} }
template <typename BucketT>
BucketT SlidingWindow<BucketT>::front() const {
return buckets_[curHead_];
}
template <typename BucketT> template <typename BucketT>
void SlidingWindow<BucketT>::set(size_t idx, BucketT bucket) { void SlidingWindow<BucketT>::set(size_t idx, BucketT bucket) {
if (idx < buckets_.size()) { if (idx < buckets_.size()) {
......
...@@ -40,6 +40,8 @@ class SlidingWindow { ...@@ -40,6 +40,8 @@ class SlidingWindow {
void set(size_t idx, BucketT bucket); void set(size_t idx, BucketT bucket);
BucketT front() const;
/* /*
* Slides the SlidingWindow by nBuckets, inserting new buckets using the * Slides the SlidingWindow by nBuckets, inserting new buckets using the
* Function given during construction. * Function given during construction.
......
...@@ -114,6 +114,43 @@ TEST_F(BufferedDigestTest, PartiallyPassedExpiry) { ...@@ -114,6 +114,43 @@ TEST_F(BufferedDigestTest, PartiallyPassedExpiry) {
EXPECT_EQ(2, values[2]); EXPECT_EQ(2, values[2]);
} }
TEST_F(BufferedDigestTest, ForceUpdate) {
bd->append(0);
bd->append(1);
bd->append(2);
// empty since we haven't passed expiry
auto digest = bd->get();
EXPECT_TRUE(digest.empty());
// force update
bd->flush();
digest = bd->get();
auto values = digest.getValues();
EXPECT_EQ(0, values[0]);
EXPECT_EQ(1, values[1]);
EXPECT_EQ(2, values[2]);
// append 3 and do a normal get; only the previously
// flushed values should show up and not 3 since we
// haven't passed expiry
bd->append(3);
digest = bd->get();
values = digest.getValues();
EXPECT_EQ(0, values[0]);
EXPECT_EQ(1, values[1]);
EXPECT_EQ(2, values[2]);
// pass expiry; 3 should now be visible
MockClock::Now += bufferDuration;
digest = bd->get();
values = digest.getValues();
EXPECT_EQ(0, values[0]);
EXPECT_EQ(1, values[1]);
EXPECT_EQ(2, values[2]);
EXPECT_EQ(3, values[3]);
}
class BufferedSlidingWindowTest : public ::testing::Test { class BufferedSlidingWindowTest : public ::testing::Test {
protected: protected:
std::unique_ptr<BufferedSlidingWindow<SimpleDigest, MockClock>> bsw; std::unique_ptr<BufferedSlidingWindow<SimpleDigest, MockClock>> bsw;
...@@ -154,6 +191,59 @@ TEST_F(BufferedSlidingWindowTest, PartiallyPassedExpiry) { ...@@ -154,6 +191,59 @@ TEST_F(BufferedSlidingWindowTest, PartiallyPassedExpiry) {
} }
} }
TEST_F(BufferedSlidingWindowTest, ForceUpdate) {
bsw->append(0);
bsw->append(1);
bsw->append(2);
// empty since we haven't passed expiry
auto digests = bsw->get();
EXPECT_EQ(0, digests.size());
// flush
bsw->flush();
digests = bsw->get();
EXPECT_EQ(1, digests.size());
EXPECT_EQ(3, digests[0].getValues().size());
for (double i = 0; i < 3; ++i) {
EXPECT_EQ(i, digests[0].getValues()[i]);
}
// append 3 and flush again; 3 will be merged with
// current window
bsw->append(3);
bsw->flush();
digests = bsw->get();
EXPECT_EQ(1, digests.size());
EXPECT_EQ(4, digests[0].getValues().size());
for (double i = 0; i < 4; ++i) {
EXPECT_EQ(i, digests[0].getValues()[i]);
}
// append 4 and do a regular get. previous values
// show up but not 4
bsw->append(4);
digests = bsw->get();
EXPECT_EQ(1, digests.size());
EXPECT_EQ(4, digests[0].getValues().size());
for (double i = 0; i < 4; ++i) {
EXPECT_EQ(i, digests[0].getValues()[i]);
}
// pass expiry
MockClock::Now += windowDuration;
digests = bsw->get();
EXPECT_EQ(2, digests.size());
EXPECT_EQ(1, digests[0].getValues().size());
EXPECT_EQ(4, digests[0].getValues().front());
EXPECT_EQ(4, digests[1].getValues().size());
for (double i = 0; i < 4; ++i) {
EXPECT_EQ(i, digests[1].getValues()[i]);
}
}
TEST_F(BufferedSlidingWindowTest, BufferingAfterSlide) { TEST_F(BufferedSlidingWindowTest, BufferingAfterSlide) {
MockClock::Now += std::chrono::milliseconds{1}; MockClock::Now += std::chrono::milliseconds{1};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment