Commit c7e095e7 authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

FanoutChannel: Rename getNewReceiver/anyReceivers to subscribe/anySubscribers

Summary: This diff renames getNewReceiver/anyReceivers to subscribe/anySubscribers.

Reviewed By: aary

Differential Revision: D30889890

fbshipit-source-id: 8c3d8b1b1e930a703b5ce40e05ee129531af255e
parent 702374dc
...@@ -59,14 +59,14 @@ FanoutChannel<ValueType>::operator bool() const { ...@@ -59,14 +59,14 @@ FanoutChannel<ValueType>::operator bool() const {
} }
template <typename ValueType> template <typename ValueType>
Receiver<ValueType> FanoutChannel<ValueType>::getNewReceiver( Receiver<ValueType> FanoutChannel<ValueType>::subscribe(
folly::Function<std::vector<ValueType>()> getInitialValues) { folly::Function<std::vector<ValueType>()> getInitialValues) {
return processor_->getNewReceiver(std::move(getInitialValues)); return processor_->subscribe(std::move(getInitialValues));
} }
template <typename ValueType> template <typename ValueType>
bool FanoutChannel<ValueType>::anyReceivers() { bool FanoutChannel<ValueType>::anySubscribers() {
return processor_->anySenders(); return processor_->anySubscribers();
} }
template <typename ValueType> template <typename ValueType>
...@@ -81,10 +81,10 @@ namespace detail { ...@@ -81,10 +81,10 @@ namespace detail {
template <typename ValueType> template <typename ValueType>
class IFanoutChannelProcessor : public IChannelCallback { class IFanoutChannelProcessor : public IChannelCallback {
public: public:
virtual Receiver<ValueType> getNewReceiver( virtual Receiver<ValueType> subscribe(
folly::Function<std::vector<ValueType>()> getInitialValues) = 0; folly::Function<std::vector<ValueType>()> getInitialValues) = 0;
virtual bool anySenders() = 0; virtual bool anySubscribers() = 0;
virtual void destroyHandle(CloseResult closeResult) = 0; virtual void destroyHandle(CloseResult closeResult) = 0;
}; };
...@@ -149,7 +149,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<ValueType> { ...@@ -149,7 +149,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<ValueType> {
* to determine the set of initial values that will (only) go to the new input * to determine the set of initial values that will (only) go to the new input
* receiver. * receiver.
*/ */
Receiver<ValueType> getNewReceiver( Receiver<ValueType> subscribe(
folly::Function<std::vector<ValueType>()> getInitialValues) override { folly::Function<std::vector<ValueType>()> getInitialValues) override {
auto state = state_.wlock(); auto state = state_.wlock();
auto initialValues = auto initialValues =
...@@ -178,8 +178,8 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<ValueType> { ...@@ -178,8 +178,8 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<ValueType> {
/** /**
* Returns whether this fanout channel has any output receivers. * Returns whether this fanout channel has any output receivers.
*/ */
bool anySenders() override { bool anySubscribers() override {
return state_.wlock()->fanoutSender.anyReceivers(); return state_.wlock()->fanoutSender.anySubscribers();
} }
private: private:
......
...@@ -44,9 +44,9 @@ class IFanoutChannelProcessor; ...@@ -44,9 +44,9 @@ class IFanoutChannelProcessor;
* folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor(); * folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
* *
* auto fanoutChannel = createFanoutChannel(getReceiver(), getExecutor()); * auto fanoutChannel = createFanoutChannel(getReceiver(), getExecutor());
* auto receiver1 = fanoutChannel.newReceiver(); * auto receiver1 = fanoutChannel.subscribe();
* auto receiver2 = fanoutChannel.newReceiver(); * auto receiver2 = fanoutChannel.subscribe();
* auto receiver3 = fanoutChannel.newReceiver([]{ return {1, 2, 3}; }); * auto receiver3 = fanoutChannel.subscribe([]{ return {1, 2, 3}; });
*/ */
template <typename ValueType> template <typename ValueType>
class FanoutChannel { class FanoutChannel {
...@@ -72,13 +72,13 @@ class FanoutChannel { ...@@ -72,13 +72,13 @@ class FanoutChannel {
* receiver. Other functions on this class should not be called from within * receiver. Other functions on this class should not be called from within
* getInitialValues, or a deadlock will occur. * getInitialValues, or a deadlock will occur.
*/ */
Receiver<ValueType> getNewReceiver( Receiver<ValueType> subscribe(
folly::Function<std::vector<ValueType>()> getInitialValues = {}); folly::Function<std::vector<ValueType>()> getInitialValues = {});
/** /**
* Returns whether this fanout channel has any output receivers. * Returns whether this fanout channel has any output receivers.
*/ */
bool anyReceivers(); bool anySubscribers();
/** /**
* Closes the fanout channel. * Closes the fanout channel.
......
...@@ -49,7 +49,7 @@ Receiver<ValueType> FanoutSender<ValueType>::subscribe( ...@@ -49,7 +49,7 @@ Receiver<ValueType> FanoutSender<ValueType>::subscribe(
for (auto&& initialValue : initialValues) { for (auto&& initialValue : initialValues) {
newSender.write(std::move(initialValue)); newSender.write(std::move(initialValue));
} }
if (!anyReceivers()) { if (!anySubscribers()) {
// There are currently no output receivers. Store the new output receiver. // There are currently no output receivers. Store the new output receiver.
senders_.set(detail::senderGetBridge(newSender).release()); senders_.set(detail::senderGetBridge(newSender).release());
} else if (!hasSenderSet()) { } else if (!hasSenderSet()) {
...@@ -69,7 +69,7 @@ Receiver<ValueType> FanoutSender<ValueType>::subscribe( ...@@ -69,7 +69,7 @@ Receiver<ValueType> FanoutSender<ValueType>::subscribe(
} }
template <typename ValueType> template <typename ValueType>
bool FanoutSender<ValueType>::anyReceivers() { bool FanoutSender<ValueType>::anySubscribers() {
clearSendersWithClosedReceivers(); clearSendersWithClosedReceivers();
return hasSenderSet() || getSingleSender() != nullptr; return hasSenderSet() || getSingleSender() != nullptr;
} }
...@@ -78,7 +78,7 @@ template <typename ValueType> ...@@ -78,7 +78,7 @@ template <typename ValueType>
template <typename U> template <typename U>
void FanoutSender<ValueType>::write(U&& element) { void FanoutSender<ValueType>::write(U&& element) {
clearSendersWithClosedReceivers(); clearSendersWithClosedReceivers();
if (!anyReceivers()) { if (!anySubscribers()) {
// There are currently no output receivers to write to. // There are currently no output receivers to write to.
return; return;
} else if (!hasSenderSet()) { } else if (!hasSenderSet()) {
...@@ -96,7 +96,7 @@ void FanoutSender<ValueType>::write(U&& element) { ...@@ -96,7 +96,7 @@ void FanoutSender<ValueType>::write(U&& element) {
template <typename ValueType> template <typename ValueType>
void FanoutSender<ValueType>::close(folly::exception_wrapper ex) && { void FanoutSender<ValueType>::close(folly::exception_wrapper ex) && {
clearSendersWithClosedReceivers(); clearSendersWithClosedReceivers();
if (!anyReceivers()) { if (!anySubscribers()) {
// There are no output receivers to close. // There are no output receivers to close.
return; return;
} else if (!hasSenderSet()) { } else if (!hasSenderSet()) {
......
...@@ -61,7 +61,7 @@ class FanoutSender { ...@@ -61,7 +61,7 @@ class FanoutSender {
/** /**
* Returns whether this fanout sender has any active output receivers. * Returns whether this fanout sender has any active output receivers.
*/ */
bool anyReceivers(); bool anySubscribers();
/** /**
* Sends the given value to all corresponding receivers. * Sends the given value to all corresponding receivers.
......
...@@ -64,14 +64,14 @@ TEST_F(FanoutChannelFixture, ReceiveValue_FanoutBroadcastsValues) { ...@@ -64,14 +64,14 @@ TEST_F(FanoutChannelFixture, ReceiveValue_FanoutBroadcastsValues) {
auto fanoutChannel = auto fanoutChannel =
createFanoutChannel(std::move(inputReceiver), &executor_); createFanoutChannel(std::move(inputReceiver), &executor_);
EXPECT_FALSE(fanoutChannel.anyReceivers()); EXPECT_FALSE(fanoutChannel.anySubscribers());
auto [handle1, callback1] = processValues(fanoutChannel.getNewReceiver( auto [handle1, callback1] = processValues(fanoutChannel.subscribe(
[]() { return toVector(100); } /* getInitialValues */)); []() { return toVector(100); } /* getInitialValues */));
auto [handle2, callback2] = processValues(fanoutChannel.getNewReceiver( auto [handle2, callback2] = processValues(fanoutChannel.subscribe(
[]() { return toVector(200); } /* getInitialValues */)); []() { return toVector(200); } /* getInitialValues */));
EXPECT_TRUE(fanoutChannel.anyReceivers()); EXPECT_TRUE(fanoutChannel.anySubscribers());
EXPECT_CALL(*callback1, onValue(100)); EXPECT_CALL(*callback1, onValue(100));
EXPECT_CALL(*callback2, onValue(200)); EXPECT_CALL(*callback2, onValue(200));
executor_.drain(); executor_.drain();
...@@ -84,7 +84,7 @@ TEST_F(FanoutChannelFixture, ReceiveValue_FanoutBroadcastsValues) { ...@@ -84,7 +84,7 @@ TEST_F(FanoutChannelFixture, ReceiveValue_FanoutBroadcastsValues) {
sender.write(2); sender.write(2);
executor_.drain(); executor_.drain();
auto [handle3, callback3] = processValues(fanoutChannel.getNewReceiver( auto [handle3, callback3] = processValues(fanoutChannel.subscribe(
[]() { return toVector(300); } /* getInitialValues */)); []() { return toVector(300); } /* getInitialValues */));
EXPECT_CALL(*callback3, onValue(300)); EXPECT_CALL(*callback3, onValue(300));
...@@ -101,7 +101,7 @@ TEST_F(FanoutChannelFixture, ReceiveValue_FanoutBroadcastsValues) { ...@@ -101,7 +101,7 @@ TEST_F(FanoutChannelFixture, ReceiveValue_FanoutBroadcastsValues) {
EXPECT_CALL(*callback3, onClosed()); EXPECT_CALL(*callback3, onClosed());
executor_.drain(); executor_.drain();
EXPECT_FALSE(fanoutChannel.anyReceivers()); EXPECT_FALSE(fanoutChannel.anySubscribers());
} }
TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) { TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) {
...@@ -109,8 +109,8 @@ TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) { ...@@ -109,8 +109,8 @@ TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) {
auto fanoutChannel = auto fanoutChannel =
createFanoutChannel(std::move(inputReceiver), &executor_); createFanoutChannel(std::move(inputReceiver), &executor_);
auto [handle1, callback1] = processValues(fanoutChannel.getNewReceiver()); auto [handle1, callback1] = processValues(fanoutChannel.subscribe());
auto [handle2, callback2] = processValues(fanoutChannel.getNewReceiver()); auto [handle2, callback2] = processValues(fanoutChannel.subscribe());
EXPECT_CALL(*callback1, onValue(1)); EXPECT_CALL(*callback1, onValue(1));
EXPECT_CALL(*callback2, onValue(1)); EXPECT_CALL(*callback2, onValue(1));
...@@ -119,7 +119,7 @@ TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) { ...@@ -119,7 +119,7 @@ TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) {
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutChannel.anyReceivers()); EXPECT_TRUE(fanoutChannel.anySubscribers());
sender.write(1); sender.write(1);
executor_.drain(); executor_.drain();
...@@ -127,7 +127,7 @@ TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) { ...@@ -127,7 +127,7 @@ TEST_F(FanoutChannelFixture, InputClosed_AllOutputReceiversClose) {
std::move(sender).close(); std::move(sender).close();
executor_.drain(); executor_.drain();
EXPECT_FALSE(fanoutChannel.anyReceivers()); EXPECT_FALSE(fanoutChannel.anySubscribers());
} }
TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) { TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) {
...@@ -135,8 +135,8 @@ TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) { ...@@ -135,8 +135,8 @@ TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) {
auto fanoutChannel = auto fanoutChannel =
createFanoutChannel(std::move(inputReceiver), &executor_); createFanoutChannel(std::move(inputReceiver), &executor_);
auto [handle1, callback1] = processValues(fanoutChannel.getNewReceiver()); auto [handle1, callback1] = processValues(fanoutChannel.subscribe());
auto [handle2, callback2] = processValues(fanoutChannel.getNewReceiver()); auto [handle2, callback2] = processValues(fanoutChannel.subscribe());
EXPECT_CALL(*callback1, onValue(1)); EXPECT_CALL(*callback1, onValue(1));
EXPECT_CALL(*callback2, onValue(1)); EXPECT_CALL(*callback2, onValue(1));
...@@ -145,7 +145,7 @@ TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) { ...@@ -145,7 +145,7 @@ TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) {
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutChannel.anyReceivers()); EXPECT_TRUE(fanoutChannel.anySubscribers());
sender.write(1); sender.write(1);
executor_.drain(); executor_.drain();
...@@ -153,7 +153,7 @@ TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) { ...@@ -153,7 +153,7 @@ TEST_F(FanoutChannelFixture, InputThrows_AllOutputReceiversGetException) {
std::move(sender).close(std::runtime_error("Error")); std::move(sender).close(std::runtime_error("Error"));
executor_.drain(); executor_.drain();
EXPECT_FALSE(fanoutChannel.anyReceivers()); EXPECT_FALSE(fanoutChannel.anySubscribers());
} }
TEST_F(FanoutChannelFixture, ReceiversCancelled) { TEST_F(FanoutChannelFixture, ReceiversCancelled) {
...@@ -161,8 +161,8 @@ TEST_F(FanoutChannelFixture, ReceiversCancelled) { ...@@ -161,8 +161,8 @@ TEST_F(FanoutChannelFixture, ReceiversCancelled) {
auto fanoutChannel = auto fanoutChannel =
createFanoutChannel(std::move(inputReceiver), &executor_); createFanoutChannel(std::move(inputReceiver), &executor_);
auto [handle1, callback1] = processValues(fanoutChannel.getNewReceiver()); auto [handle1, callback1] = processValues(fanoutChannel.subscribe());
auto [handle2, callback2] = processValues(fanoutChannel.getNewReceiver()); auto [handle2, callback2] = processValues(fanoutChannel.subscribe());
EXPECT_CALL(*callback1, onValue(1)); EXPECT_CALL(*callback1, onValue(1));
EXPECT_CALL(*callback2, onValue(1)); EXPECT_CALL(*callback2, onValue(1));
...@@ -172,29 +172,29 @@ TEST_F(FanoutChannelFixture, ReceiversCancelled) { ...@@ -172,29 +172,29 @@ TEST_F(FanoutChannelFixture, ReceiversCancelled) {
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutChannel.anyReceivers()); EXPECT_TRUE(fanoutChannel.anySubscribers());
sender.write(1); sender.write(1);
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutChannel.anyReceivers()); EXPECT_TRUE(fanoutChannel.anySubscribers());
handle1.reset(); handle1.reset();
sender.write(2); sender.write(2);
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutChannel.anyReceivers()); EXPECT_TRUE(fanoutChannel.anySubscribers());
handle2.reset(); handle2.reset();
sender.write(3); sender.write(3);
executor_.drain(); executor_.drain();
EXPECT_FALSE(fanoutChannel.anyReceivers()); EXPECT_FALSE(fanoutChannel.anySubscribers());
std::move(sender).close(); std::move(sender).close();
executor_.drain(); executor_.drain();
EXPECT_FALSE(fanoutChannel.anyReceivers()); EXPECT_FALSE(fanoutChannel.anySubscribers());
} }
TEST_F(FanoutChannelFixture, VectorBool) { TEST_F(FanoutChannelFixture, VectorBool) {
...@@ -202,9 +202,9 @@ TEST_F(FanoutChannelFixture, VectorBool) { ...@@ -202,9 +202,9 @@ TEST_F(FanoutChannelFixture, VectorBool) {
auto fanoutChannel = auto fanoutChannel =
createFanoutChannel(std::move(inputReceiver), &executor_); createFanoutChannel(std::move(inputReceiver), &executor_);
auto [handle1, callback1] = processValues(fanoutChannel.getNewReceiver( auto [handle1, callback1] = processValues(fanoutChannel.subscribe(
[] { return toVector(true); } /* getInitialValues */)); [] { return toVector(true); } /* getInitialValues */));
auto [handle2, callback2] = processValues(fanoutChannel.getNewReceiver( auto [handle2, callback2] = processValues(fanoutChannel.subscribe(
[] { return toVector(false); } /* getInitialValues */)); [] { return toVector(false); } /* getInitialValues */));
EXPECT_CALL(*callback1, onValue(true)); EXPECT_CALL(*callback1, onValue(true));
...@@ -212,7 +212,7 @@ TEST_F(FanoutChannelFixture, VectorBool) { ...@@ -212,7 +212,7 @@ TEST_F(FanoutChannelFixture, VectorBool) {
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutChannel.anyReceivers()); EXPECT_TRUE(fanoutChannel.anySubscribers());
EXPECT_CALL(*callback1, onValue(true)); EXPECT_CALL(*callback1, onValue(true));
EXPECT_CALL(*callback2, onValue(true)); EXPECT_CALL(*callback2, onValue(true));
...@@ -229,7 +229,7 @@ TEST_F(FanoutChannelFixture, VectorBool) { ...@@ -229,7 +229,7 @@ TEST_F(FanoutChannelFixture, VectorBool) {
std::move(sender).close(); std::move(sender).close();
executor_.drain(); executor_.drain();
EXPECT_FALSE(fanoutChannel.anyReceivers()); EXPECT_FALSE(fanoutChannel.anySubscribers());
} }
class FanoutChannelFixtureStress : public Test { class FanoutChannelFixtureStress : public Test {
...@@ -276,12 +276,12 @@ TEST_F(FanoutChannelFixtureStress, HandleClosed) { ...@@ -276,12 +276,12 @@ TEST_F(FanoutChannelFixtureStress, HandleClosed) {
std::move(receiver), std::move(receiver),
folly::SerialExecutor::create(&fanoutChannelExecutor)); folly::SerialExecutor::create(&fanoutChannelExecutor));
consumers_.at(0)->startConsuming(fanoutChannel.getNewReceiver()); consumers_.at(0)->startConsuming(fanoutChannel.subscribe());
consumers_.at(1)->startConsuming(fanoutChannel.getNewReceiver()); consumers_.at(1)->startConsuming(fanoutChannel.subscribe());
sleepFor(kTestTimeout / 3); sleepFor(kTestTimeout / 3);
consumers_.at(2)->startConsuming(fanoutChannel.getNewReceiver()); consumers_.at(2)->startConsuming(fanoutChannel.subscribe());
sleepFor(kTestTimeout / 3); sleepFor(kTestTimeout / 3);
...@@ -304,12 +304,12 @@ TEST_F(FanoutChannelFixtureStress, InputChannelClosed) { ...@@ -304,12 +304,12 @@ TEST_F(FanoutChannelFixtureStress, InputChannelClosed) {
std::move(receiver), std::move(receiver),
folly::SerialExecutor::create(&fanoutChannelExecutor)); folly::SerialExecutor::create(&fanoutChannelExecutor));
consumers_.at(0)->startConsuming(fanoutChannel.getNewReceiver()); consumers_.at(0)->startConsuming(fanoutChannel.subscribe());
consumers_.at(1)->startConsuming(fanoutChannel.getNewReceiver()); consumers_.at(1)->startConsuming(fanoutChannel.subscribe());
sleepFor(kTestTimeout / 3); sleepFor(kTestTimeout / 3);
consumers_.at(2)->startConsuming(fanoutChannel.getNewReceiver()); consumers_.at(2)->startConsuming(fanoutChannel.subscribe());
sleepFor(kTestTimeout / 3); sleepFor(kTestTimeout / 3);
......
...@@ -74,7 +74,7 @@ TEST_F(FanoutSenderFixture, WriteValue_FanoutBroadcastsValues) { ...@@ -74,7 +74,7 @@ TEST_F(FanoutSenderFixture, WriteValue_FanoutBroadcastsValues) {
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutSender.anyReceivers()); EXPECT_TRUE(fanoutSender.anySubscribers());
fanoutSender.write(1); fanoutSender.write(1);
fanoutSender.write(2); fanoutSender.write(2);
...@@ -97,7 +97,7 @@ TEST_F(FanoutSenderFixture, InputThrows_AllOutputReceiversGetException) { ...@@ -97,7 +97,7 @@ TEST_F(FanoutSenderFixture, InputThrows_AllOutputReceiversGetException) {
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutSender.anyReceivers()); EXPECT_TRUE(fanoutSender.anySubscribers());
fanoutSender.write(1); fanoutSender.write(1);
executor_.drain(); executor_.drain();
...@@ -120,24 +120,24 @@ TEST_F(FanoutSenderFixture, ReceiversCancelled) { ...@@ -120,24 +120,24 @@ TEST_F(FanoutSenderFixture, ReceiversCancelled) {
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutSender.anyReceivers()); EXPECT_TRUE(fanoutSender.anySubscribers());
fanoutSender.write(1); fanoutSender.write(1);
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutSender.anyReceivers()); EXPECT_TRUE(fanoutSender.anySubscribers());
handle1.reset(); handle1.reset();
fanoutSender.write(2); fanoutSender.write(2);
executor_.drain(); executor_.drain();
EXPECT_TRUE(fanoutSender.anyReceivers()); EXPECT_TRUE(fanoutSender.anySubscribers());
handle2.reset(); handle2.reset();
fanoutSender.write(3); fanoutSender.write(3);
executor_.drain(); executor_.drain();
EXPECT_FALSE(fanoutSender.anyReceivers()); EXPECT_FALSE(fanoutSender.anySubscribers());
std::move(fanoutSender).close(); std::move(fanoutSender).close();
executor_.drain(); executor_.drain();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment