Commit aa378653 authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

Change MergeChannel public interface

Summary:
The current MergeChannel public interface has a few problems that have become more apparent with usage.

1. The output receiver does not contain the key associated with the input receiver that geneated the value. This often means that the user must transform input receivers before adding them to the merge channel to include the key, which wastes memory.
2. There is no way for a consumer of the output receiver to know if an input receiver was closed without an exception.
3. If an input receiver was closed with an exception, the entire merge channel is closed.

This diff changes the public API to fix these problems. Instead of returning an output receiver that just has values, the output receiver contains MergeChannelEvent objects. Each such event object has the key of the corresponding input receiver associated with the event, along with the event contents. There are four types of events:

1. A value was received through an existing input receiver.
2. A new input receiver was added.
3. An existing input receiver was removed.
4. An existing input receiver was closed (with or without an exception).

Note that we could theoretically avoid genering events for 2 and 3 (as they correspond with calls that the user makes to add and remove), but there are cases where it is useful to have 2 and 3. This way, an event is always received when an existing input receiver is removed, regardless of whether it was removed explicitly (through a call to removeReceiver) or implicitly (if the input receiver was closed).

Reviewed By: aary

Differential Revision: D33033374

fbshipit-source-id: 34f74cba3f765c76a80d448647d733ad38a2195c
parent 017ba9c3
......@@ -70,6 +70,11 @@ void MergeChannel<KeyType, ValueType>::removeReceiver(KeyType key) {
processor_->removeReceiver(key);
}
template <typename KeyType, typename ValueType>
folly::F14FastSet<KeyType> MergeChannel<KeyType, ValueType>::getReceiverKeys() {
return processor_->getReceiverKeys();
}
template <typename KeyType, typename ValueType>
void MergeChannel<KeyType, ValueType>::close(
std::optional<folly::exception_wrapper> ex) && {
......@@ -88,6 +93,8 @@ class IMergeChannelProcessor : public IChannelCallback {
virtual void removeReceiver(KeyType key) = 0;
virtual folly::F14FastSet<KeyType> getReceiverKeys() = 0;
virtual void destroyHandle(CloseResult closeResult) = 0;
};
......@@ -130,13 +137,41 @@ class IMergeChannelProcessor : public IChannelCallback {
template <typename KeyType, typename ValueType>
class MergeChannelProcessor
: public IMergeChannelProcessor<KeyType, ValueType> {
private:
struct State {
explicit State(
ChannelBridgePtr<MergeChannelEvent<KeyType, ValueType>> _sender)
: sender(std::move(_sender)) {}
ChannelState getSenderState() {
return detail::getSenderState(sender.get());
}
// The output sender for the merge channel.
ChannelBridgePtr<MergeChannelEvent<KeyType, ValueType>> sender;
// A non-owning map from key to receiver.
folly::F14NodeMap<KeyType, ChannelBridge<ValueType>*> receiversByKey;
// The set of receivers that feed into this MergeChannel. This map "owns"
// its receivers. MergeChannelProcessor must free any receiver removed from
// this map.
folly::F14NodeMap<ChannelBridge<ValueType>*, const KeyType*> receivers;
// Whether or not the handle to the MergeChannel has been destroyed.
bool handleDestroyed{false};
};
using WLockedStatePtr = typename folly::Synchronized<State>::WLockedPtr;
public:
MergeChannelProcessor(
Sender<ValueType> sender,
Sender<MergeChannelEvent<KeyType, ValueType>> sender,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor)
: sender_(std::move(detail::senderGetBridge(sender))),
executor_(std::move(executor)) {
CHECK(sender_->senderWait(this));
: executor_(std::move(executor)),
state_(State(std::move(detail::senderGetBridge(sender)))) {
auto state = state_.wlock();
CHECK(state->sender->senderWait(this));
}
/**
......@@ -144,57 +179,75 @@ class MergeChannelProcessor
* removal.
*/
void addNewReceiver(KeyType key, Receiver<ValueType> receiver) {
executor_->add(
[=, key = std::move(key), receiver = std::move(receiver)]() mutable {
if (getSenderState() != ChannelState::Active) {
return;
}
auto [unbufferedReceiver, buffer] =
detail::receiverUnbuffer(std::move(receiver));
auto existingReceiverIt = receiversByKey_.find(key);
if (existingReceiverIt != receiversByKey_.end()) {
if (receivers_.contains(existingReceiverIt->second) &&
!existingReceiverIt->second->isReceiverCancelled()) {
// We already have a receiver with the given key. Trigger
// cancellation on that previous receiver.
existingReceiverIt->second->receiverCancel();
}
receiversByKey_.erase(existingReceiverIt);
}
receiversByKey_.insert(std::make_pair(key, unbufferedReceiver.get()));
auto* receiverPtr = unbufferedReceiver.get();
receivers_.insert(unbufferedReceiver.release());
processAllAvailableValues(receiverPtr, std::move(buffer));
});
auto state = state_.wlock();
if (state->getSenderState() != ChannelState::Active) {
return;
}
auto [unbufferedReceiver, buffer] =
detail::receiverUnbuffer(std::move(receiver));
auto existingReceiverIt = state->receiversByKey.find(key);
if (existingReceiverIt != state->receiversByKey.end()) {
CHECK(state->receivers.contains(existingReceiverIt->second));
if (!existingReceiverIt->second->isReceiverCancelled()) {
// We already have a receiver with the given key. Trigger cancellation
// on that previous receiver.
existingReceiverIt->second->receiverCancel();
}
auto keyToRemove = existingReceiverIt->first;
state->receivers[existingReceiverIt->second] = nullptr;
state->receiversByKey.erase(existingReceiverIt);
state->sender->senderPush(MergeChannelEvent<KeyType, ValueType>{
keyToRemove, MergeChannelReceiverRemoved{}});
}
auto [it, _] = state->receiversByKey.insert(
std::make_pair(key, unbufferedReceiver.get()));
auto* receiverPtr = unbufferedReceiver.get();
state->receivers.insert(
std::make_pair(unbufferedReceiver.release(), &it->first));
state->sender->senderPush(MergeChannelEvent<KeyType, ValueType>{
key, MergeChannelReceiverAdded{}});
processAllAvailableValues(state, receiverPtr, std::move(buffer));
}
/**
* Removes the receiver with the given key.
*/
void removeReceiver(KeyType key) {
executor_->add([=]() {
if (getSenderState() != ChannelState::Active) {
return;
}
auto receiverIt = receiversByKey_.find(key);
if (receiverIt == receiversByKey_.end()) {
return;
}
if (receivers_.contains(receiverIt->second) &&
!receiverIt->second->isReceiverCancelled()) {
receiverIt->second->receiverCancel();
}
receiversByKey_.erase(receiverIt);
});
auto state = state_.wlock();
if (state->getSenderState() != ChannelState::Active) {
return;
}
auto receiverIt = state->receiversByKey.find(key);
if (receiverIt == state->receiversByKey.end()) {
return;
}
CHECK(state->receivers.contains(receiverIt->second));
if (!receiverIt->second->isReceiverCancelled()) {
receiverIt->second->receiverCancel();
}
auto keyToRemove = receiverIt->first;
state->receivers[receiverIt->second] = nullptr;
state->receiversByKey.erase(receiverIt);
state->sender->senderPush(MergeChannelEvent<KeyType, ValueType>{
keyToRemove, MergeChannelReceiverRemoved{}});
}
folly::F14FastSet<KeyType> getReceiverKeys() {
auto state = state_.rlock();
auto receiverKeys = folly::F14FastSet<KeyType>();
receiverKeys.reserve(state->receiversByKey.size());
for (const auto& [key, _] : state->receiversByKey) {
receiverKeys.insert(key);
}
return receiverKeys;
}
/**
* Called when the user's MergeChannel object is destroyed.
*/
void destroyHandle(CloseResult closeResult) {
executor_->add([=, closeResult = std::move(closeResult)]() mutable {
processHandleDestroyed(std::move(closeResult));
});
auto state = state_.wlock();
processHandleDestroyed(state, std::move(closeResult));
}
/**
......@@ -203,17 +256,18 @@ class MergeChannelProcessor
*/
void consume(ChannelBridgeBase* bridge) override {
executor_->add([=]() {
if (bridge == sender_.get()) {
auto state = state_.wlock();
if (bridge == state->sender.get()) {
// The consumer of the output receiver has stopped consuming.
CHECK(getSenderState() != ChannelState::CancellationProcessed);
sender_->senderClose();
processSenderCancelled();
CHECK(state->getSenderState() != ChannelState::CancellationProcessed);
state->sender->senderClose();
processSenderCancelled(state);
} else {
// One or more values are now available from an input receiver.
auto* receiver = static_cast<ChannelBridge<ValueType>*>(bridge);
CHECK(
getReceiverState(receiver) != ChannelState::CancellationProcessed);
processAllAvailableValues(receiver);
processAllAvailableValues(state, receiver);
}
});
}
......@@ -224,19 +278,20 @@ class MergeChannelProcessor
*/
void canceled(ChannelBridgeBase* bridge) override {
executor_->add([=]() {
if (bridge == sender_.get()) {
auto state = state_.wlock();
if (bridge == state->sender.get()) {
// We previously cancelled the sender due to an input receiver closure
// with an exception (or the closure of all input receivers without an
// exception). Process the cancellation for the sender.
CHECK(getSenderState() == ChannelState::CancellationTriggered);
processSenderCancelled();
CHECK(state->getSenderState() == ChannelState::CancellationTriggered);
processSenderCancelled(state);
} else {
// We previously cancelled this input receiver, either because the
// consumer of the output receiver stopped consuming or because another
// input receiver received an exception. Process the cancellation for
// this input receiver.
auto* receiver = static_cast<ChannelBridge<ValueType>*>(bridge);
processReceiverCancelled(receiver, CloseResult());
processReceiverCancelled(state, receiver, CloseResult());
}
});
}
......@@ -251,12 +306,16 @@ class MergeChannelProcessor
* will process cancellation for the input receiver.
*/
void processAllAvailableValues(
WLockedStatePtr& state,
ChannelBridge<ValueType>* receiver,
std::optional<ReceiverQueue<ValueType>> buffer = std::nullopt) {
CHECK(state->receivers.contains(receiver));
const auto* key = state->receivers.at(receiver);
auto closeResult = receiver->isReceiverCancelled()
? CloseResult()
: (buffer.has_value() ? processValues(std::move(buffer.value()))
: std::nullopt);
: (buffer.has_value()
? processValues(state, std::move(buffer.value()), key)
: std::nullopt);
while (!closeResult.has_value()) {
if (receiver->receiverWait(this)) {
// There are no more values available right now. We will stop processing
......@@ -266,12 +325,12 @@ class MergeChannelProcessor
}
auto values = receiver->receiverGetValues();
CHECK(!values.empty());
closeResult = processValues(std::move(values));
closeResult = processValues(state, std::move(values), key);
}
if (closeResult.has_value()) {
// The receiver received a value indicating channel closure.
receiver->receiverCancel();
processReceiverCancelled(receiver, std::move(closeResult.value()));
processReceiverCancelled(state, receiver, std::move(closeResult.value()));
}
}
......@@ -280,14 +339,18 @@ class MergeChannelProcessor
* CloseResult if the given channel was closed, so the caller can stop
* attempting to process values from it.
*/
std::optional<CloseResult> processValues(ReceiverQueue<ValueType> values) {
std::optional<CloseResult> processValues(
WLockedStatePtr& state,
ReceiverQueue<ValueType> values,
const KeyType* key) {
while (!values.empty()) {
auto inputResult = std::move(values.front());
values.pop();
if (inputResult.hasValue()) {
// We have received a normal value from an input receiver. Write it to
// the output receiver.
sender_->senderPush(std::move(inputResult.value()));
state->sender->senderPush(MergeChannelEvent<KeyType, ValueType>{
*key, std::move(inputResult.value())});
} else {
// The input receiver was closed.
return inputResult.hasException()
......@@ -299,28 +362,29 @@ class MergeChannelProcessor
}
/**
* Processes the cancellation of an input receiver. If the cancellation was
* due to receipt of an exception, we will also trigger cancellation for the
* sender (and all other input receivers).
* Processes the cancellation of an input receiver.
*/
void processReceiverCancelled(
ChannelBridge<ValueType>* receiver, CloseResult closeResult) {
WLockedStatePtr& state,
ChannelBridge<ValueType>* receiver,
CloseResult closeResult) {
CHECK(getReceiverState(receiver) == ChannelState::CancellationTriggered);
receivers_.erase(receiver);
(ChannelBridgePtr<ValueType>(receiver));
if (closeResult.exception.has_value()) {
// We received an exception. We need to close the sender and all
// receivers.
if (getSenderState() == ChannelState::Active) {
sender_->senderClose(std::move(closeResult.exception.value()));
}
for (auto* otherReceiver : receivers_) {
if (getReceiverState(otherReceiver) == ChannelState::Active) {
otherReceiver->receiverCancel();
}
auto* key = state->receivers.at(receiver);
if (key != nullptr) {
auto keyToRemove = *key;
CHECK_EQ(state->receiversByKey.erase(keyToRemove), 1);
if (state->getSenderState() == ChannelState::Active) {
state->sender->senderPush(MergeChannelEvent<KeyType, ValueType>{
keyToRemove,
MergeChannelReceiverClosed{
closeResult.exception.has_value()
? std::move(closeResult.exception.value())
: folly::exception_wrapper()}});
}
}
maybeDelete();
state->receivers.erase(receiver);
(ChannelBridgePtr<ValueType>(receiver));
maybeDelete(state);
}
/**
......@@ -328,15 +392,15 @@ class MergeChannelProcessor
* the output receiver has stopped consuming). We will trigger cancellation
* for all input receivers not already cancelled.
*/
void processSenderCancelled() {
CHECK(getSenderState() == ChannelState::CancellationTriggered);
sender_ = nullptr;
for (auto* receiver : receivers_) {
void processSenderCancelled(WLockedStatePtr& state) {
CHECK(state->getSenderState() == ChannelState::CancellationTriggered);
state->sender = nullptr;
for (auto [receiver, _] : state->receivers) {
if (getReceiverState(receiver) == ChannelState::Active) {
receiver->receiverCancel();
}
}
maybeDelete();
maybeDelete(state);
}
/**
......@@ -344,22 +408,27 @@ class MergeChannelProcessor
* close the sender and trigger cancellation for all input receivers not
* already cancelled.
*/
void processHandleDestroyed(CloseResult closeResult) {
CHECK(!handleDestroyed_);
handleDestroyed_ = true;
if (getSenderState() == ChannelState::Active) {
void processHandleDestroyed(WLockedStatePtr& state, CloseResult closeResult) {
CHECK(!state->handleDestroyed);
state->handleDestroyed = true;
if (state->getSenderState() == ChannelState::Active) {
for (auto [key, receiver] : state->receiversByKey) {
state->receivers[receiver] = nullptr;
state->sender->senderPush(MergeChannelEvent<KeyType, ValueType>{
key, MergeChannelReceiverRemoved{}});
}
if (closeResult.exception.has_value()) {
sender_->senderClose(std::move(closeResult.exception.value()));
state->sender->senderClose(std::move(closeResult.exception.value()));
} else {
sender_->senderClose();
state->sender->senderClose();
}
}
for (auto* receiver : receivers_) {
for (auto [receiver, _] : state->receivers) {
if (getReceiverState(receiver) == ChannelState::Active) {
receiver->receiverCancel();
}
}
maybeDelete();
maybeDelete(state);
}
/**
......@@ -367,9 +436,10 @@ class MergeChannelProcessor
* sender and all input receivers, and if the user's MergeChannel object was
* destroyed.
*/
void maybeDelete() {
if (getSenderState() == ChannelState::CancellationProcessed &&
receivers_.empty() && handleDestroyed_) {
void maybeDelete(WLockedStatePtr& state) {
if (state->getSenderState() == ChannelState::CancellationProcessed &&
state->receivers.empty() && state->handleDestroyed) {
state.unlock();
delete this;
}
}
......@@ -378,30 +448,19 @@ class MergeChannelProcessor
return detail::getReceiverState(receiver);
}
ChannelState getSenderState() {
return detail::getSenderState(sender_.get());
}
ChannelBridgePtr<ValueType> sender_;
folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
bool handleDestroyed_{false};
// The set of receivers that feed into this MergeChannel. This set "owns" its
// receivers. MergeChannelProcessor must free any receiver removed from this
// set.
folly::F14FastSet<ChannelBridge<ValueType>*> receivers_;
// A non-owning map from key to receiver. If the receiver for a given key is
// not present in receivers_, it has been freed and must not be used.
folly::F14FastMap<KeyType, ChannelBridge<ValueType>*> receiversByKey_;
folly::Synchronized<State> state_;
};
} // namespace detail
template <typename KeyType, typename ValueType>
std::pair<Receiver<ValueType>, MergeChannel<KeyType, ValueType>>
std::pair<
Receiver<MergeChannelEvent<KeyType, ValueType>>,
MergeChannel<KeyType, ValueType>>
createMergeChannel(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor) {
auto [receiver, sender] = Channel<ValueType>::create();
auto [receiver, sender] =
Channel<MergeChannelEvent<KeyType, ValueType>>::create();
auto* processor = new detail::MergeChannelProcessor<KeyType, ValueType>(
std::move(sender), std::move(executor));
return std::make_pair(
......
......@@ -16,6 +16,7 @@
#pragma once
#include <folly/container/F14Set.h>
#include <folly/executors/SequencedExecutor.h>
#include <folly/experimental/channels/Channel.h>
......@@ -27,11 +28,29 @@ template <typename KeyType, typename ValueType>
class IMergeChannelProcessor;
}
struct MergeChannelReceiverAdded {};
struct MergeChannelReceiverRemoved {};
struct MergeChannelReceiverClosed {
folly::exception_wrapper exception;
};
template <typename KeyType, typename ValueType>
struct MergeChannelEvent {
using EventType = std::variant<
ValueType,
MergeChannelReceiverAdded,
MergeChannelReceiverRemoved,
MergeChannelReceiverClosed>;
KeyType key;
EventType event;
};
/**
* A merge channel allows one to merge multiple receivers into a single output
* receiver. The set of receivers being merged can be changed at runtime. Each
* receiver is added with a key that can be used to remove the receiver at a
* later point.
* A merge channel allows one to merge multiple receivers into a single
* output receiver. The set of receivers being merged can be changed at
* runtime. Each receiver is added with a key that can be used to remove
* the receiver at a later point.
*
* Example:
*
......@@ -81,6 +100,11 @@ class MergeChannel {
*/
void removeReceiver(KeyType key);
/**
* Returns a set of keys for receivers that are merged into this MergeChannel.
*/
folly::F14FastSet<KeyType> getReceiverKeys();
/**
* Closes the merge channel.
*/
......@@ -96,7 +120,9 @@ class MergeChannel {
* @param executor: The SequencedExecutor to use for merging values.
*/
template <typename KeyType, typename ValueType>
std::pair<Receiver<ValueType>, MergeChannel<KeyType, ValueType>>
std::pair<
Receiver<MergeChannelEvent<KeyType, ValueType>>,
MergeChannel<KeyType, ValueType>>
createMergeChannel(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor);
} // namespace channels
......
......@@ -25,8 +25,85 @@
namespace folly {
namespace channels {
using namespace std::string_literals;
using namespace testing;
template <typename T, typename U>
bool isMatch(
const std::string& type,
const T& actual,
const U& expected,
testing::MatchResultListener* resultListener) {
if (expected != actual) {
*resultListener << folly::sformat(
"{} mismatch: ({} != {})", type, actual, expected);
return false;
}
return true;
}
MATCHER_P(ReceiverAdded, expectedKey, "") {
return isMatch(
"Is receiver added event",
std::holds_alternative<MergeChannelReceiverAdded>(arg.event),
true,
result_listener) &&
isMatch("Key", arg.key, expectedKey, result_listener);
}
MATCHER_P(ReceiverRemoved, expectedKey, "") {
return isMatch(
"Is receiver removed event",
std::holds_alternative<MergeChannelReceiverRemoved>(arg.event),
true,
result_listener) &&
isMatch("Key", arg.key, expectedKey, result_listener);
}
MATCHER_P2(ValueReceived, expectedKey, expectedValue, "") {
return isMatch(
"Is value received event",
std::holds_alternative<int>(arg.event),
true,
result_listener) &&
isMatch("Key", arg.key, expectedKey, result_listener) &&
isMatch(
"Value", std::get<int>(arg.event), expectedValue, result_listener);
}
MATCHER_P(ReceiverClosed, expectedKey, "") {
return isMatch(
"Is receiver closed event",
std::holds_alternative<MergeChannelReceiverClosed>(arg.event),
true,
result_listener) &&
isMatch("Key", arg.key, expectedKey, result_listener) &&
isMatch(
"Exception present",
!!std::get<MergeChannelReceiverClosed>(arg.event).exception,
false,
result_listener);
}
MATCHER_P(ReceiverClosedRuntimeError, expectedKey, "") {
if (!std::holds_alternative<MergeChannelReceiverClosed>(arg.event)) {
*result_listener << "Event is not for a closed receiver";
return false;
}
return isMatch("Key", arg.key, expectedKey, result_listener) &&
isMatch(
"Exception present",
!!std::get<MergeChannelReceiverClosed>(arg.event).exception,
true,
result_listener) &&
isMatch(
"Runtime error present",
std::get<MergeChannelReceiverClosed>(arg.event)
.exception.template is_compatible_with<std::runtime_error>(),
true,
result_listener);
}
class MergeChannelFixture : public Test {
protected:
MergeChannelFixture() {}
......@@ -35,18 +112,27 @@ class MergeChannelFixture : public Test {
using TCallback = StrictMock<MockNextCallback<int>>;
ChannelCallbackHandle processValues(Receiver<int> receiver) {
ChannelCallbackHandle processValues(
Receiver<MergeChannelEvent<std::string, int>> receiver) {
return consumeChannelWithCallback(
std::move(receiver),
&executor_,
[=](folly::Try<int> resultTry) -> folly::coro::Task<bool> {
[=](folly::Try<MergeChannelEvent<std::string, int>> resultTry)
-> folly::coro::Task<bool> {
if (resultTry.hasValue()) {
std::visit(
[](const auto& eventType) {
LOG(INFO) << "Type: " << typeid(eventType).name();
},
resultTry.value().event);
}
onNext_(std::move(resultTry));
co_return true;
});
}
folly::ManualExecutor executor_;
StrictMock<MockNextCallback<int>> onNext_;
StrictMock<MockNextCallback<MergeChannelEvent<std::string, int>>> onNext_;
};
TEST_F(MergeChannelFixture, ReceiveValues_ReturnMergedValues) {
......@@ -56,18 +142,24 @@ TEST_F(MergeChannelFixture, ReceiveValues_ReturnMergedValues) {
auto [mergedReceiver, mergeChannel] =
createMergeChannel<std::string, int>(&executor_);
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onValue(5));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub3"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 3)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub3"s, 5)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub2"s)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub3"s)));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
sender1.write(1);
sender2.write(2);
executor_.drain();
......@@ -93,18 +185,24 @@ TEST_F(
auto [mergedReceiver, mergeChannel] =
createMergeChannel<std::string, int>(&executor_);
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2a));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onValue(5));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub2"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s))).RetiresOnSaturation();
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 3)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 5)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub2"s))).RetiresOnSaturation();
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2a));
executor_.drain();
sender1.write(1);
sender2a.write(2);
executor_.drain();
......@@ -129,18 +227,24 @@ TEST_F(
auto [mergedReceiver, mergeChannel] =
createMergeChannel<std::string, int>(&executor_);
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2a));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onValue(5));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ReceiverClosed("sub2"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s))).RetiresOnSaturation();
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 3)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 5)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub2"s)));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2a));
executor_.drain();
sender1.write(1);
sender2a.write(2);
executor_.drain();
......@@ -165,17 +269,21 @@ TEST_F(MergeChannelFixture, ReceiveValues_RemoveReceiver) {
auto [mergedReceiver, mergeChannel] =
createMergeChannel<std::string, int>(&executor_);
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub2"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 3)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub1"s)));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
sender1.write(1);
sender2.write(2);
executor_.drain();
......@@ -196,17 +304,21 @@ TEST_F(MergeChannelFixture, ReceiveValues_RemoveReceiver_AfterClose) {
auto [mergedReceiver, mergeChannel] =
createMergeChannel<std::string, int>(&executor_);
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ReceiverClosed("sub2"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 3)));
EXPECT_CALL(onNext_, onValue(ReceiverRemoved("sub1"s)));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
sender1.write(1);
sender2.write(2);
executor_.drain();
......@@ -229,20 +341,26 @@ TEST_F(MergeChannelFixture, OneInputClosed_ContinuesMerging) {
auto [mergedReceiver, mergeChannel] =
createMergeChannel<std::string, int>(&executor_);
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub3"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub3"s, 3)));
EXPECT_CALL(onNext_, onValue(ReceiverClosed("sub3"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 4)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 5)));
EXPECT_CALL(onNext_, onValue(ReceiverClosed("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverClosed("sub2"s)));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
mergeChannel.addNewReceiver("sub3", std::move(receiver3));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onValue(4));
EXPECT_CALL(onNext_, onValue(5));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
sender1.write(1);
sender2.write(2);
sender3.write(3);
......@@ -261,25 +379,33 @@ TEST_F(MergeChannelFixture, OneInputClosed_ContinuesMerging) {
executor_.drain();
}
TEST_F(MergeChannelFixture, OneInputThrows_OutputClosedWithException) {
TEST_F(MergeChannelFixture, OneInputThrows_ContinuesMerging) {
auto [receiver1, sender1] = Channel<int>::create();
auto [receiver2, sender2] = Channel<int>::create();
auto [receiver3, sender3] = Channel<int>::create();
auto [mergedReceiver, mergeChannel] =
createMergeChannel<std::string, int>(&executor_);
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub3"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub3"s, 3)));
EXPECT_CALL(onNext_, onValue(ReceiverClosedRuntimeError("sub3"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 4)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 5)));
EXPECT_CALL(onNext_, onValue(ReceiverClosed("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverClosed("sub2"s)));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
mergeChannel.addNewReceiver("sub3", std::move(receiver3));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onRuntimeError("std::runtime_error: Error"));
auto callbackHandle = processValues(std::move(mergedReceiver));
sender1.write(1);
sender2.write(2);
sender3.write(3);
......@@ -291,6 +417,9 @@ TEST_F(MergeChannelFixture, OneInputThrows_OutputClosedWithException) {
std::move(sender1).close();
std::move(sender2).close();
executor_.drain();
std::move(mergeChannel).close();
executor_.drain();
}
TEST_F(MergeChannelFixture, Cancelled) {
......@@ -305,9 +434,12 @@ TEST_F(MergeChannelFixture, Cancelled) {
mergeChannel.addNewReceiver("sub3", std::move(receiver3));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub1"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub2"s)));
EXPECT_CALL(onNext_, onValue(ReceiverAdded("sub3"s)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub1"s, 1)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub2"s, 2)));
EXPECT_CALL(onNext_, onValue(ValueReceived("sub3"s, 3)));
EXPECT_CALL(onNext_, onCancelled());
auto callbackHandle = processValues(std::move(mergedReceiver));
......@@ -350,11 +482,18 @@ class MergeChannelFixtureStress : public Test {
});
}
static std::unique_ptr<StressTestConsumer<ProducedValue>> makeConsumer() {
return std::make_unique<StressTestConsumer<ProducedValue>>(
static std::unique_ptr<
StressTestConsumer<MergeChannelEvent<int, ProducedValue>>>
makeConsumer() {
return std::make_unique<
StressTestConsumer<MergeChannelEvent<int, ProducedValue>>>(
ConsumptionMode::CallbackWithHandle,
[lastReceived = toVector(-1, -1, -1)](ProducedValue value) mutable {
EXPECT_EQ(value.value, ++lastReceived[value.producerIndex]);
[lastReceived = toVector(-1, -1, -1)](
MergeChannelEvent<int, ProducedValue> result) mutable {
if (std::holds_alternative<ProducedValue>(result.event)) {
auto value = std::get<ProducedValue>(result.event);
EXPECT_EQ(value.value, ++lastReceived[value.producerIndex]);
}
});
}
......@@ -367,7 +506,8 @@ class MergeChannelFixtureStress : public Test {
std::chrono::milliseconds{5000};
std::vector<std::unique_ptr<StressTestProducer<ProducedValue>>> producers_;
std::unique_ptr<StressTestConsumer<ProducedValue>> consumer_;
std::unique_ptr<StressTestConsumer<MergeChannelEvent<int, ProducedValue>>>
consumer_;
};
TEST_F(MergeChannelFixtureStress, HandleClosed) {
......@@ -396,40 +536,6 @@ TEST_F(MergeChannelFixtureStress, HandleClosed) {
EXPECT_EQ(consumer_->waitForClose().get(), CloseType::NoException);
}
TEST_F(MergeChannelFixtureStress, InputChannelReceivesException) {
folly::CPUThreadPoolExecutor mergeChannelExecutor(1);
auto [mergeReceiver, mergeChannel] = createMergeChannel<int, ProducedValue>(
folly::SerialExecutor::create(&mergeChannelExecutor));
consumer_->startConsuming(std::move(mergeReceiver));
auto [receiver0, sender0] = Channel<ProducedValue>::create();
producers_.at(0)->startProducing(
std::move(sender0), std::runtime_error("Error"));
mergeChannel.addNewReceiver(0 /* subscriptionId */, std::move(receiver0));
sleepFor(kTestTimeout / 4);
auto [receiver1, sender1] = Channel<ProducedValue>::create();
producers_.at(1)->startProducing(
std::move(sender1), std::runtime_error("Error"));
mergeChannel.addNewReceiver(1 /* subscriptionId */, std::move(receiver1));
sleepFor(kTestTimeout / 4);
mergeChannel.removeReceiver(0 /* subscriptionId */);
sleepFor(kTestTimeout / 4);
producers_.at(0)->stopProducing();
sleepFor(kTestTimeout / 4);
auto closeFuture = consumer_->waitForClose();
EXPECT_FALSE(closeFuture.isReady());
producers_.at(1)->stopProducing();
EXPECT_EQ(std::move(closeFuture).get(), CloseType::Exception);
}
TEST_F(MergeChannelFixtureStress, Cancelled) {
folly::CPUThreadPoolExecutor mergeChannelExecutor(1);
auto [mergeReceiver, mergeChannel] = createMergeChannel<int, ProducedValue>(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment