Commit ef8b5db6 authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

Add closeSubscribers to FanoutChannel

Summary: This diff adds a new function to FanoutChannel that closes all existing subscribers, without closing the FanoutChannel itself.

Reviewed By: aary

Differential Revision: D33035825

fbshipit-source-id: 6aa867344c280ad7a91bd8248e745aa2f27e7cc8
parent 9385c05a
......@@ -72,6 +72,13 @@ bool FanoutChannel<ValueType, ContextType>::anySubscribers() const {
return processor_->anySubscribers();
}
template <typename ValueType, typename ContextType>
void FanoutChannel<ValueType, ContextType>::closeSubscribers(
folly::exception_wrapper ex) {
processor_->closeSubscribers(
ex ? detail::CloseResult(std::move(ex)) : detail::CloseResult());
}
template <typename ValueType, typename ContextType>
void FanoutChannel<ValueType, ContextType>::close(
folly::exception_wrapper ex) && {
......@@ -91,6 +98,8 @@ class IFanoutChannelProcessor : public IChannelCallback {
virtual bool anySubscribers() = 0;
virtual void closeSubscribers(CloseResult closeResult) = 0;
virtual void destroyHandle(CloseResult closeResult) = 0;
};
......@@ -176,6 +185,17 @@ class FanoutChannelProcessor
return state->fanoutSender.subscribe(std::move(initialValues));
}
/**
* Closes all subscribers without closing the fanout channel.
*/
void closeSubscribers(CloseResult closeResult) {
auto state = state_.wlock();
std::move(state->fanoutSender)
.close(
closeResult.exception.has_value() ? closeResult.exception.value()
: folly::exception_wrapper());
}
/**
* This is called when the user's FanoutChannel object has been destroyed.
*/
......
......@@ -114,6 +114,13 @@ class FanoutChannel {
*/
bool anySubscribers() const;
/**
* Closes all subscribers, without closing the fanout channel. New subscribers
* can be added after this call.
*/
void closeSubscribers(
folly::exception_wrapper ex = folly::exception_wrapper());
/**
* Closes the fanout channel.
*/
......
......@@ -205,6 +205,45 @@ TEST_F(FanoutChannelFixture, ReceiversCancelled) {
EXPECT_FALSE(fanoutChannel.anySubscribers());
}
TEST_F(FanoutChannelFixture, SubscribersClosed) {
auto [inputReceiver, sender] = Channel<int>::create();
auto fanoutChannel =
createFanoutChannel(std::move(inputReceiver), &executor_);
auto [handle1, callback1] = processValues(fanoutChannel.subscribe());
auto [handle2, callback2] = processValues(fanoutChannel.subscribe());
executor_.drain();
EXPECT_TRUE(fanoutChannel.anySubscribers());
EXPECT_CALL(*callback1, onValue(1));
EXPECT_CALL(*callback2, onValue(1));
sender.write(1);
executor_.drain();
EXPECT_TRUE(fanoutChannel.anySubscribers());
EXPECT_CALL(*callback1, onClosed());
EXPECT_CALL(*callback2, onClosed());
fanoutChannel.closeSubscribers();
executor_.drain();
EXPECT_FALSE(fanoutChannel.anySubscribers());
auto [handle3, callback3] = processValues(fanoutChannel.subscribe());
executor_.drain();
EXPECT_TRUE(fanoutChannel.anySubscribers());
EXPECT_CALL(*callback3, onValue(2));
sender.write(2);
executor_.drain();
EXPECT_CALL(*callback3, onClosed());
std::move(fanoutChannel).close();
executor_.drain();
}
TEST_F(FanoutChannelFixture, VectorBool) {
auto [inputReceiver, sender] = Channel<bool>::create();
auto fanoutChannel =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment