Commit 45d40269 authored by Jon Maltiel Swenson's avatar Jon Maltiel Swenson Committed by Facebook GitHub Bot

Add utility to make an observer with jittered updates

Summary: This diff adds a utility `withJitter` that creates an observer proxying updates from the input observer, with the difference that each update will be propagated with a random lag.

Reviewed By: andriigrynenko

Differential Revision: D23795834

fbshipit-source-id: cdaee7460c3c346e70f6c8c78f18467fe8cd4e47
parent 588857cb
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>
#include <fmt/core.h>
#include <glog/logging.h>
#include <folly/DefaultKeepAliveExecutor.h>
#include <folly/Random.h>
#include <folly/Synchronized.h>
#include <folly/executors/GlobalExecutor.h>
#include <folly/experimental/observer/Observable.h>
#include <folly/experimental/observer/Observer.h>
#include <folly/futures/Future.h>
namespace folly {
namespace observer {
template <typename T>
Observer<T> withJitter(
Observer<T> observer,
std::chrono::milliseconds lag,
std::chrono::milliseconds jitter) {
class WithJitterObservable {
public:
using element_type = T;
WithJitterObservable(
Observer<T> observer,
std::chrono::milliseconds lag,
std::chrono::milliseconds jitter)
: observer_(std::move(observer)),
state_(std::make_shared<Synchronized<State, std::mutex>>(
State(observer_.getSnapshot().getShared()))),
lag_(lag),
jitter_(jitter) {}
std::shared_ptr<const T> get() {
return state_->lock()->laggingValue;
}
void subscribe(std::function<void()> callback) {
handle_ = observer_.addCallback([state = state_,
observer = observer_,
callback = std::move(callback),
lag = lag_,
jitter = jitter_](auto /* snapshot */) {
if (std::exchange(state->lock()->delayedRefreshPending, true)) {
return;
}
const auto sleepFor = lag - jitter +
std::chrono::milliseconds{Random::rand64(2 * jitter.count())};
auto* executor = dynamic_cast<DefaultKeepAliveExecutor*>(
getGlobalCPUExecutor().get());
CHECK(executor);
futures::sleep(sleepFor)
.via(executor->weakRef())
.thenValue([callback, observer, state](auto&&) mutable {
state->withLock([&](auto& s) {
s.laggingValue = observer.getSnapshot().getShared();
s.delayedRefreshPending = false;
});
callback();
});
});
}
void unsubscribe() {
handle_.cancel();
}
private:
struct State {
explicit State(std::shared_ptr<const T> value)
: laggingValue(std::move(value)) {}
std::shared_ptr<const T> laggingValue;
bool delayedRefreshPending{false};
};
Observer<T> observer_;
CallbackHandle handle_;
std::shared_ptr<Synchronized<State, std::mutex>> state_;
const std::chrono::milliseconds lag_;
const std::chrono::milliseconds jitter_;
};
if (lag == std::chrono::milliseconds::zero()) {
return observer;
}
if (jitter > lag) {
throw std::invalid_argument(
fmt::format("lag ({}) cannot be less than jitter ({})", lag, jitter));
}
return ObserverCreator<WithJitterObservable>(std::move(observer), lag, jitter)
.getObserver();
}
} // namespace observer
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <chrono>
#include <folly/experimental/observer/Observer.h>
namespace folly {
namespace observer {
/**
* The returned Observer will proxy updates from the input observer but will
* delay the propagation of each update by some duration between 0 ms and
* lag + jitter. In addition, if an update arrives while no preceding jittered
* updates are still in flight, then the delay applied to the latest update will
* be a uniformly random duration between lag - jitter and lag + jitter.
*/
template <typename T>
Observer<T> withJitter(
Observer<T> observer,
std::chrono::milliseconds lag,
std::chrono::milliseconds jitter);
} // namespace observer
} // namespace folly
#include <folly/experimental/observer/WithJitter-inl.h>
......@@ -18,6 +18,7 @@
#include <folly/Singleton.h>
#include <folly/experimental/observer/SimpleObservable.h>
#include <folly/experimental/observer/WithJitter.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
......@@ -568,3 +569,71 @@ TEST(Observer, Unwrap) {
folly::observer_detail::ObserverManager::waitForAllUpdates();
EXPECT_EQ(**observer, 4);
}
TEST(Observer, WithJitterMonotoneProgress) {
SimpleObservable<int> observable(0);
auto observer = observable.getObserver();
EXPECT_EQ(0, **observer);
auto laggingObserver = withJitter(
std::move(observer),
std::chrono::milliseconds{100},
std::chrono::milliseconds{100});
EXPECT_EQ(0, **laggingObserver);
// Updates should never propagate out of order. E.g., if update 1 arrives and
// is delayed by 100 milliseconds, followed immediately by the arrival of
// update 2 with 1 millisecond delay, then update 1 should never overwrite
// update 2.
for (int i = 1, lastSeen = 0; i <= 50; ++i) {
auto curr = **laggingObserver;
EXPECT_LE(lastSeen, curr);
lastSeen = curr;
observable.setValue(i);
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds{10});
}
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{2});
// The latest update is eventually propagated
EXPECT_EQ(50, **laggingObserver);
}
TEST(Observer, WithJitterActuallyInducesLag) {
SimpleObservable<int> observable(0);
auto observer = observable.getObserver();
EXPECT_EQ(0, **observer);
auto laggingObserver = withJitter(
observer, std::chrono::seconds{10}, std::chrono::milliseconds::zero());
EXPECT_EQ(0, **laggingObserver);
observable.setValue(42);
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
EXPECT_EQ(0, **laggingObserver);
}
TEST(Observer, WithJitterNoEarlyRefresh) {
SimpleObservable<int> observable(0);
auto base = observable.getObserver();
auto copy = makeObserver([base] { return **base; });
auto laggingObserver = withJitter(
base, std::chrono::seconds{10}, std::chrono::milliseconds::zero());
auto delta = makeObserver(
[copy, laggingObserver] { return **copy - **laggingObserver; });
EXPECT_EQ(0, **base);
EXPECT_EQ(0, **copy);
EXPECT_EQ(0, **laggingObserver);
EXPECT_EQ(0, **delta);
observable.setValue(42);
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds{1});
// Updates along the base -> copy -> delta path should not trigger an early
// refresh of laggingObserver
EXPECT_EQ(42, **base);
EXPECT_EQ(42, **copy);
EXPECT_EQ(0, **laggingObserver);
EXPECT_EQ(42, **delta);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment