Commit 99b9a899 authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

Fix crash in MergeChannel when removing subscription to closed receiver

Summary: We currently call closeReceiver() on a receiver in receiversBySubscriptionId_ that has already been removed from receivers_ (and freed). The fix is to check to see whether the receiver in receiversBySubscriptionId_ is still present in receivers_ before invoking any functions on it. If the receiver is not present in receivers_, that means it has already been closed, so we do not need to close it.

Reviewed By: aary

Differential Revision: D32233062

fbshipit-source-id: 0aa763b509897db7de970e12e2f01cffc1b49fc3
parent e9906e2a
...@@ -158,11 +158,11 @@ class MergeChannelProcessor ...@@ -158,11 +158,11 @@ class MergeChannelProcessor
auto [unbufferedReceiver, buffer] = auto [unbufferedReceiver, buffer] =
detail::receiverUnbuffer(std::move(receiver)); detail::receiverUnbuffer(std::move(receiver));
auto existingReceiverIt = receiversBySubscriptionId_.find(subscriptionId); auto existingReceiverIt = receiversBySubscriptionId_.find(subscriptionId);
if (existingReceiverIt != receiversBySubscriptionId_.end() && if (existingReceiverIt != receiversBySubscriptionId_.end()) {
receivers_.contains(existingReceiverIt->second)) { if (receivers_.contains(existingReceiverIt->second) &&
// We already have a receiver with the given subscription ID. Trigger !existingReceiverIt->second->isReceiverCancelled()) {
// cancellation on that previous receiver. // We already have a receiver with the given subscription ID. Trigger
if (!existingReceiverIt->second->isReceiverCancelled()) { // cancellation on that previous receiver.
existingReceiverIt->second->receiverCancel(); existingReceiverIt->second->receiverCancel();
} }
receiversBySubscriptionId_.erase(existingReceiverIt); receiversBySubscriptionId_.erase(existingReceiverIt);
...@@ -187,7 +187,8 @@ class MergeChannelProcessor ...@@ -187,7 +187,8 @@ class MergeChannelProcessor
if (receiverIt == receiversBySubscriptionId_.end()) { if (receiverIt == receiversBySubscriptionId_.end()) {
return; return;
} }
if (!receiverIt->second->isReceiverCancelled()) { if (receivers_.contains(receiverIt->second) &&
!receiverIt->second->isReceiverCancelled()) {
receiverIt->second->receiverCancel(); receiverIt->second->receiverCancel();
} }
receiversBySubscriptionId_.erase(receiverIt); receiversBySubscriptionId_.erase(receiverIt);
...@@ -390,10 +391,18 @@ class MergeChannelProcessor ...@@ -390,10 +391,18 @@ class MergeChannelProcessor
ChannelBridgePtr<TValue> sender_; ChannelBridgePtr<TValue> sender_;
folly::Executor::KeepAlive<folly::SequencedExecutor> executor_; folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
bool handleDestroyed_{false};
// The set of receivers that feed into this MergeChannel. This set "owns" its
// receivers. MergeChannelProcessor must free any receiver removed from this
// set.
folly::F14FastSet<ChannelBridge<TValue>*> receivers_; folly::F14FastSet<ChannelBridge<TValue>*> receivers_;
folly::F14FastMap<TSubscriptionId, ChannelBridge<TValue>*>
// A non-owning map from subscription ID to receiver. If the receiver for a
// given subscription ID is not present in receivers_, it has been freed and
// must not be used.
folly::F14FastMap<TSubscriptionId, ChannelBridge<TValue>*> //
receiversBySubscriptionId_; receiversBySubscriptionId_;
bool handleDestroyed_{false};
}; };
} // namespace detail } // namespace detail
......
...@@ -159,6 +159,69 @@ TEST_F( ...@@ -159,6 +159,69 @@ TEST_F(
executor_.drain(); executor_.drain();
} }
TEST_F(MergeChannelFixture, ReceiveValues_RemoveReceiver) {
auto [receiver1, sender1] = Channel<int>::create();
auto [receiver2, sender2] = Channel<int>::create();
auto [mergedReceiver, mergeChannel] =
createMergeChannel<int, std::string>(&executor_);
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
sender1.write(1);
sender2.write(2);
executor_.drain();
mergeChannel.removeReceiver("sub2");
sender1.write(3);
sender2.write(4);
executor_.drain();
std::move(mergeChannel).close();
executor_.drain();
}
TEST_F(MergeChannelFixture, ReceiveValues_RemoveReceiver_AfterClose) {
auto [receiver1, sender1] = Channel<int>::create();
auto [receiver2, sender2] = Channel<int>::create();
auto [mergedReceiver, mergeChannel] =
createMergeChannel<int, std::string>(&executor_);
mergeChannel.addNewReceiver("sub1", std::move(receiver1));
mergeChannel.addNewReceiver("sub2", std::move(receiver2));
executor_.drain();
EXPECT_CALL(onNext_, onValue(1));
EXPECT_CALL(onNext_, onValue(2));
EXPECT_CALL(onNext_, onValue(3));
EXPECT_CALL(onNext_, onClosed());
auto callbackHandle = processValues(std::move(mergedReceiver));
sender1.write(1);
sender2.write(2);
executor_.drain();
std::move(sender2).close();
executor_.drain();
mergeChannel.removeReceiver("sub2");
sender1.write(3);
executor_.drain();
std::move(mergeChannel).close();
executor_.drain();
}
TEST_F(MergeChannelFixture, OneInputClosed_ContinuesMerging) { TEST_F(MergeChannelFixture, OneInputClosed_ContinuesMerging) {
auto [receiver1, sender1] = Channel<int>::create(); auto [receiver1, sender1] = Channel<int>::create();
auto [receiver2, sender2] = Channel<int>::create(); auto [receiver2, sender2] = Channel<int>::create();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment