Commit c3bb66ae authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

Remove custom equality/hash function for ChannelBridge

Summary:
Currently, both Merge and MergeChannel use a folly::F14FastSet of channel bridge unique pointers. In order to look up items inside this hash set by raw pointer, a custom equality and hash function are provided. These functions just check for equality of the underlying raw pointers (and run std::hash on the raw pointers).

This has turned into a large performance bottleneck. For some reason, folly::F14FastSet's performance tanks when using custom equality/hash functions. The performance seems to get worse as more items are added. In tests I was running, a single lookup took over 5ms with a set of 200k items.

This diff removes the custom equality/hash functions for ChannelBridge. Merge and MergeChannel are changed to use hash sets of raw pointers. Lifetime is managed outside of the F14FastSet (by explicitly deleting the raw pointers after items are removed from the set).

Reviewed By: aary

Differential Revision: D31426627

fbshipit-source-id: 1ce7fd1b431704f6353b9ea3d689cee002dd9bdc
parent 24759fc8
...@@ -84,13 +84,13 @@ class MergeProcessor : public IChannelCallback { ...@@ -84,13 +84,13 @@ class MergeProcessor : public IChannelCallback {
.insert(std::make_pair( .insert(std::make_pair(
unbufferedInputReceiver.get(), std::move(buffer))) unbufferedInputReceiver.get(), std::move(buffer)))
.second); .second);
receivers_.insert(std::move(unbufferedInputReceiver)); receivers_.insert(unbufferedInputReceiver.release());
} }
for (auto& receiver : receivers_) { for (auto* receiver : receivers_) {
processAllAvailableValues( processAllAvailableValues(
receiver.get(), receiver,
!buffers.empty() !buffers.empty()
? std::make_optional(std::move(buffers.at(receiver.get()))) ? std::make_optional(std::move(buffers.at(receiver)))
: std::nullopt); : std::nullopt);
} }
}); });
...@@ -210,14 +210,15 @@ class MergeProcessor : public IChannelCallback { ...@@ -210,14 +210,15 @@ class MergeProcessor : public IChannelCallback {
ChannelBridge<TValue>* receiver, CloseResult closeResult) { ChannelBridge<TValue>* receiver, CloseResult closeResult) {
CHECK_EQ(getReceiverState(receiver), ChannelState::CancellationTriggered); CHECK_EQ(getReceiverState(receiver), ChannelState::CancellationTriggered);
receivers_.erase(receiver); receivers_.erase(receiver);
(ChannelBridgePtr<TValue>(receiver));
if (closeResult.exception.has_value()) { if (closeResult.exception.has_value()) {
// We received an exception. We need to close the sender and all other // We received an exception. We need to close the sender and all other
// receivers. // receivers.
if (getSenderState() == ChannelState::Active) { if (getSenderState() == ChannelState::Active) {
sender_->senderClose(std::move(closeResult.exception.value())); sender_->senderClose(std::move(closeResult.exception.value()));
} }
for (auto& otherReceiver : receivers_) { for (auto* otherReceiver : receivers_) {
if (getReceiverState(otherReceiver.get()) == ChannelState::Active) { if (getReceiverState(otherReceiver) == ChannelState::Active) {
otherReceiver->receiverCancel(); otherReceiver->receiverCancel();
} }
} }
...@@ -238,8 +239,8 @@ class MergeProcessor : public IChannelCallback { ...@@ -238,8 +239,8 @@ class MergeProcessor : public IChannelCallback {
void processSenderCancelled() { void processSenderCancelled() {
CHECK_EQ(getSenderState(), ChannelState::CancellationTriggered); CHECK_EQ(getSenderState(), ChannelState::CancellationTriggered);
sender_ = nullptr; sender_ = nullptr;
for (auto& receiver : receivers_) { for (auto* receiver : receivers_) {
if (getReceiverState(receiver.get()) == ChannelState::Active) { if (getReceiverState(receiver) == ChannelState::Active) {
receiver->receiverCancel(); receiver->receiverCancel();
} }
} }
...@@ -265,11 +266,7 @@ class MergeProcessor : public IChannelCallback { ...@@ -265,11 +266,7 @@ class MergeProcessor : public IChannelCallback {
return detail::getSenderState(sender_.get()); return detail::getSenderState(sender_.get());
} }
folly::F14FastSet< folly::F14FastSet<ChannelBridge<TValue>*> receivers_;
ChannelBridgePtr<TValue>,
ChannelBridgeHash<TValue>,
ChannelBridgeEqual<TValue>>
receivers_;
ChannelBridgePtr<TValue> sender_; ChannelBridgePtr<TValue> sender_;
folly::Executor::KeepAlive<folly::SequencedExecutor> executor_; folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
}; };
......
...@@ -170,7 +170,7 @@ class MergeChannelProcessor ...@@ -170,7 +170,7 @@ class MergeChannelProcessor
receiversBySubscriptionId_.insert( receiversBySubscriptionId_.insert(
std::make_pair(subscriptionId, unbufferedReceiver.get())); std::make_pair(subscriptionId, unbufferedReceiver.get()));
auto* receiverPtr = unbufferedReceiver.get(); auto* receiverPtr = unbufferedReceiver.get();
receivers_.insert(std::move(unbufferedReceiver)); receivers_.insert(unbufferedReceiver.release());
processAllAvailableValues(receiverPtr, std::move(buffer)); processAllAvailableValues(receiverPtr, std::move(buffer));
}); });
} }
...@@ -313,14 +313,15 @@ class MergeChannelProcessor ...@@ -313,14 +313,15 @@ class MergeChannelProcessor
ChannelBridge<TValue>* receiver, CloseResult closeResult) { ChannelBridge<TValue>* receiver, CloseResult closeResult) {
CHECK(getReceiverState(receiver) == ChannelState::CancellationTriggered); CHECK(getReceiverState(receiver) == ChannelState::CancellationTriggered);
receivers_.erase(receiver); receivers_.erase(receiver);
(ChannelBridgePtr<TValue>(receiver));
if (closeResult.exception.has_value()) { if (closeResult.exception.has_value()) {
// We received an exception. We need to close the sender and all // We received an exception. We need to close the sender and all
// receivers. // receivers.
if (getSenderState() == ChannelState::Active) { if (getSenderState() == ChannelState::Active) {
sender_->senderClose(std::move(closeResult.exception.value())); sender_->senderClose(std::move(closeResult.exception.value()));
} }
for (auto& otherReceiver : receivers_) { for (auto* otherReceiver : receivers_) {
if (getReceiverState(otherReceiver.get()) == ChannelState::Active) { if (getReceiverState(otherReceiver) == ChannelState::Active) {
otherReceiver->receiverCancel(); otherReceiver->receiverCancel();
} }
} }
...@@ -336,8 +337,8 @@ class MergeChannelProcessor ...@@ -336,8 +337,8 @@ class MergeChannelProcessor
void processSenderCancelled() { void processSenderCancelled() {
CHECK(getSenderState() == ChannelState::CancellationTriggered); CHECK(getSenderState() == ChannelState::CancellationTriggered);
sender_ = nullptr; sender_ = nullptr;
for (auto& receiver : receivers_) { for (auto* receiver : receivers_) {
if (getReceiverState(receiver.get()) == ChannelState::Active) { if (getReceiverState(receiver) == ChannelState::Active) {
receiver->receiverCancel(); receiver->receiverCancel();
} }
} }
...@@ -359,8 +360,8 @@ class MergeChannelProcessor ...@@ -359,8 +360,8 @@ class MergeChannelProcessor
sender_->senderClose(); sender_->senderClose();
} }
} }
for (auto& receiver : receivers_) { for (auto* receiver : receivers_) {
if (getReceiverState(receiver.get()) == ChannelState::Active) { if (getReceiverState(receiver) == ChannelState::Active) {
receiver->receiverCancel(); receiver->receiverCancel();
} }
} }
...@@ -389,11 +390,7 @@ class MergeChannelProcessor ...@@ -389,11 +390,7 @@ class MergeChannelProcessor
ChannelBridgePtr<TValue> sender_; ChannelBridgePtr<TValue> sender_;
folly::Executor::KeepAlive<folly::SequencedExecutor> executor_; folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
folly::F14FastSet< folly::F14FastSet<ChannelBridge<TValue>*> receivers_;
ChannelBridgePtr<TValue>,
ChannelBridgeHash<TValue>,
ChannelBridgeEqual<TValue>>
receivers_;
folly::F14FastMap<TSubscriptionId, ChannelBridge<TValue>*> folly::F14FastMap<TSubscriptionId, ChannelBridge<TValue>*>
receiversBySubscriptionId_; receiversBySubscriptionId_;
bool handleDestroyed_{false}; bool handleDestroyed_{false};
......
...@@ -137,42 +137,6 @@ class ChannelBridge : public ChannelBridgeBase { ...@@ -137,42 +137,6 @@ class ChannelBridge : public ChannelBridgeBase {
template <typename TValue> template <typename TValue>
using ChannelBridgePtr = typename ChannelBridge<TValue>::Ptr; using ChannelBridgePtr = typename ChannelBridge<TValue>::Ptr;
template <typename TValue>
struct ChannelBridgeHash {
using is_transparent = std::true_type;
bool operator()(ChannelBridgePtr<TValue> const& arg) const {
return std::hash<ChannelBridgePtr<TValue>>()(arg);
}
bool operator()(ChannelBridge<TValue>* const& arg) const {
return std::hash<ChannelBridge<TValue>*>()(arg);
}
};
template <typename TValue>
struct ChannelBridgeEqual {
using is_transparent = std::true_type;
bool operator()(
ChannelBridgePtr<TValue> const& lhs,
ChannelBridgePtr<TValue> const& rhs) const {
return lhs.get() == rhs.get();
}
bool operator()(
ChannelBridge<TValue>* const& lhs,
ChannelBridgePtr<TValue> const& rhs) const {
return lhs == rhs.get();
}
bool operator()(
ChannelBridgePtr<TValue> const& lhs,
ChannelBridge<TValue>* const& rhs) const {
return lhs.get() == rhs;
}
};
} // namespace detail } // namespace detail
} // namespace channels } // namespace channels
} // namespace folly } // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment