Commit bf1b6d05 authored by Andrew Smith's avatar Andrew Smith Committed by Facebook GitHub Bot

FanoutChannel: Change TValue to ValueType

Summary: This diff changes TValue to ValueType in FanoutChannel (adhering to the folly guideline of not prefixing template parameters with T).

Reviewed By: aary

Differential Revision: D30889892

fbshipit-source-id: 2ef79620289d554c9802124426d3c4ac70a06f12
parent 140e8ccd
...@@ -24,16 +24,16 @@ ...@@ -24,16 +24,16 @@
namespace folly { namespace folly {
namespace channels { namespace channels {
template <typename TValue> template <typename ValueType>
FanoutChannel<TValue>::FanoutChannel(TProcessor* processor) FanoutChannel<ValueType>::FanoutChannel(TProcessor* processor)
: processor_(processor) {} : processor_(processor) {}
template <typename TValue> template <typename ValueType>
FanoutChannel<TValue>::FanoutChannel(FanoutChannel&& other) noexcept FanoutChannel<ValueType>::FanoutChannel(FanoutChannel&& other) noexcept
: processor_(std::exchange(other.processor_, nullptr)) {} : processor_(std::exchange(other.processor_, nullptr)) {}
template <typename TValue> template <typename ValueType>
FanoutChannel<TValue>& FanoutChannel<TValue>::operator=( FanoutChannel<ValueType>& FanoutChannel<ValueType>::operator=(
FanoutChannel&& other) noexcept { FanoutChannel&& other) noexcept {
if (&other == this) { if (&other == this) {
return *this; return *this;
...@@ -45,31 +45,31 @@ FanoutChannel<TValue>& FanoutChannel<TValue>::operator=( ...@@ -45,31 +45,31 @@ FanoutChannel<TValue>& FanoutChannel<TValue>::operator=(
return *this; return *this;
} }
template <typename TValue> template <typename ValueType>
FanoutChannel<TValue>::~FanoutChannel() { FanoutChannel<ValueType>::~FanoutChannel() {
if (processor_ != nullptr) { if (processor_ != nullptr) {
std::move(*this).close(std::nullopt /* ex */); std::move(*this).close(std::nullopt /* ex */);
} }
} }
template <typename TValue> template <typename ValueType>
FanoutChannel<TValue>::operator bool() const { FanoutChannel<ValueType>::operator bool() const {
return processor_ != nullptr; return processor_ != nullptr;
} }
template <typename TValue> template <typename ValueType>
Receiver<TValue> FanoutChannel<TValue>::getNewReceiver( Receiver<ValueType> FanoutChannel<ValueType>::getNewReceiver(
folly::Function<std::vector<TValue>()> getInitialValues) { folly::Function<std::vector<ValueType>()> getInitialValues) {
return processor_->newReceiver(std::move(getInitialValues)); return processor_->newReceiver(std::move(getInitialValues));
} }
template <typename TValue> template <typename ValueType>
bool FanoutChannel<TValue>::anyReceivers() { bool FanoutChannel<ValueType>::anyReceivers() {
return processor_->anySenders(); return processor_->anySenders();
} }
template <typename TValue> template <typename ValueType>
void FanoutChannel<TValue>::close( void FanoutChannel<ValueType>::close(
std::optional<folly::exception_wrapper> ex) && { std::optional<folly::exception_wrapper> ex) && {
processor_->destroyHandle( processor_->destroyHandle(
ex.has_value() ? detail::CloseResult(std::move(ex.value())) ex.has_value() ? detail::CloseResult(std::move(ex.value()))
...@@ -79,11 +79,11 @@ void FanoutChannel<TValue>::close( ...@@ -79,11 +79,11 @@ void FanoutChannel<TValue>::close(
namespace detail { namespace detail {
template <typename TValue> template <typename ValueType>
class IFanoutChannelProcessor : public IChannelCallback { class IFanoutChannelProcessor : public IChannelCallback {
public: public:
virtual Receiver<TValue> newReceiver( virtual Receiver<ValueType> newReceiver(
folly::Function<std::vector<TValue>()> getInitialValues) = 0; folly::Function<std::vector<ValueType>()> getInitialValues) = 0;
virtual bool anySenders() = 0; virtual bool anySenders() = 0;
...@@ -116,8 +116,8 @@ class IFanoutChannelProcessor : public IChannelCallback { ...@@ -116,8 +116,8 @@ class IFanoutChannelProcessor : public IChannelCallback {
* This object will then be deleted once the input receiver and each remaining * This object will then be deleted once the input receiver and each remaining
* input receiver transitions to the CancellationProcessed state. * input receiver transitions to the CancellationProcessed state.
*/ */
template <typename TValue> template <typename ValueType>
class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { class FanoutChannelProcessor : public IFanoutChannelProcessor<ValueType> {
public: public:
explicit FanoutChannelProcessor( explicit FanoutChannelProcessor(
folly::Executor::KeepAlive<folly::SequencedExecutor> executor) folly::Executor::KeepAlive<folly::SequencedExecutor> executor)
...@@ -128,7 +128,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -128,7 +128,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
* *
* @param inputReceiver: The input receiver to fan out values from. * @param inputReceiver: The input receiver to fan out values from.
*/ */
void start(Receiver<TValue> inputReceiver) { void start(Receiver<ValueType> inputReceiver) {
executor_->add([=, inputReceiver = std::move(inputReceiver)]() mutable { executor_->add([=, inputReceiver = std::move(inputReceiver)]() mutable {
auto [unbufferedInputReceiver, buffer] = auto [unbufferedInputReceiver, buffer] =
detail::receiverUnbuffer(std::move(inputReceiver)); detail::receiverUnbuffer(std::move(inputReceiver));
...@@ -145,10 +145,10 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -145,10 +145,10 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
* to determine the set of initial values that will (only) go to the new input * to determine the set of initial values that will (only) go to the new input
* receiver. * receiver.
*/ */
Receiver<TValue> newReceiver( Receiver<ValueType> newReceiver(
folly::Function<std::vector<TValue>()> getInitialValues) override { folly::Function<std::vector<ValueType>()> getInitialValues) override {
numSendersPlusHandle_++; numSendersPlusHandle_++;
auto [receiver, sender] = Channel<TValue>::create(); auto [receiver, sender] = Channel<ValueType>::create();
executor_->add([=, executor_->add([=,
sender = std::move(senderGetBridge(sender)), sender = std::move(senderGetBridge(sender)),
getInitialValues = std::move(getInitialValues)]() mutable { getInitialValues = std::move(getInitialValues)]() mutable {
...@@ -190,7 +190,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -190,7 +190,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
return detail::getReceiverState(receiver_.get()); return detail::getReceiverState(receiver_.get());
} }
ChannelState getSenderState(ChannelBridge<TValue>* sender) { ChannelState getSenderState(ChannelBridge<ValueType>* sender) {
return detail::getSenderState(sender); return detail::getSenderState(sender);
} }
...@@ -216,7 +216,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -216,7 +216,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
processAllAvailableValues(); processAllAvailableValues();
} else { } else {
// The consumer of an output receiver has stopped consuming. // The consumer of an output receiver has stopped consuming.
auto* sender = static_cast<ChannelBridge<TValue>*>(bridge); auto* sender = static_cast<ChannelBridge<ValueType>*>(bridge);
CHECK_NE(getSenderState(sender), ChannelState::CancellationProcessed); CHECK_NE(getSenderState(sender), ChannelState::CancellationProcessed);
sender->senderClose(); sender->senderClose();
processSenderCancelled(sender); processSenderCancelled(sender);
...@@ -236,7 +236,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -236,7 +236,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
// We previously cancelled the sender due to the closure of the input // We previously cancelled the sender due to the closure of the input
// receiver or the destruction of the user's FanoutChannel object. // receiver or the destruction of the user's FanoutChannel object.
// Process the cancellation for the sender. // Process the cancellation for the sender.
auto* sender = static_cast<ChannelBridge<TValue>*>(bridge); auto* sender = static_cast<ChannelBridge<ValueType>*>(bridge);
CHECK_EQ(getSenderState(sender), ChannelState::CancellationTriggered); CHECK_EQ(getSenderState(sender), ChannelState::CancellationTriggered);
processSenderCancelled(sender); processSenderCancelled(sender);
} }
...@@ -252,7 +252,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -252,7 +252,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
* will process cancellation for the input receiver. * will process cancellation for the input receiver.
*/ */
void processAllAvailableValues( void processAllAvailableValues(
std::optional<ReceiverQueue<TValue>> buffer = std::nullopt) { std::optional<ReceiverQueue<ValueType>> buffer = std::nullopt) {
auto closeResult = receiver_->isReceiverCancelled() auto closeResult = receiver_->isReceiverCancelled()
? CloseResult() ? CloseResult()
: (buffer.has_value() ? processValues(std::move(buffer.value())) : (buffer.has_value() ? processValues(std::move(buffer.value()))
...@@ -280,7 +280,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -280,7 +280,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
* CloseResult if channel was closed, so the caller can stop attempting to * CloseResult if channel was closed, so the caller can stop attempting to
* process values from it. * process values from it.
*/ */
std::optional<CloseResult> processValues(ReceiverQueue<TValue> values) { std::optional<CloseResult> processValues(ReceiverQueue<ValueType> values) {
while (!values.empty()) { while (!values.empty()) {
auto inputResult = std::move(values.front()); auto inputResult = std::move(values.front());
values.pop(); values.pop();
...@@ -321,7 +321,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -321,7 +321,7 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
* Processes the cancellation of a sender (indicating that the consumer of * Processes the cancellation of a sender (indicating that the consumer of
* the corresponding output receiver has stopped consuming). * the corresponding output receiver has stopped consuming).
*/ */
void processSenderCancelled(ChannelBridge<TValue>* sender) { void processSenderCancelled(ChannelBridge<ValueType>* sender) {
CHECK_EQ(getSenderState(sender), ChannelState::CancellationTriggered); CHECK_EQ(getSenderState(sender), ChannelState::CancellationTriggered);
senders_.erase(sender); senders_.erase(sender);
numSendersPlusHandle_--; numSendersPlusHandle_--;
...@@ -361,25 +361,25 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> { ...@@ -361,25 +361,25 @@ class FanoutChannelProcessor : public IFanoutChannelProcessor<TValue> {
} }
} }
ChannelBridgePtr<TValue> receiver_; ChannelBridgePtr<ValueType> receiver_;
folly::Executor::KeepAlive<folly::SequencedExecutor> executor_; folly::Executor::KeepAlive<folly::SequencedExecutor> executor_;
folly::F14FastSet< folly::F14FastSet<
ChannelBridgePtr<TValue>, ChannelBridgePtr<ValueType>,
ChannelBridgeHash<TValue>, ChannelBridgeHash<ValueType>,
ChannelBridgeEqual<TValue>> ChannelBridgeEqual<ValueType>>
senders_; senders_;
std::atomic<size_t> numSendersPlusHandle_; std::atomic<size_t> numSendersPlusHandle_;
}; };
} // namespace detail } // namespace detail
template <typename TReceiver, typename TValue> template <typename ReceiverType, typename ValueType>
FanoutChannel<TValue> createFanoutChannel( FanoutChannel<ValueType> createFanoutChannel(
TReceiver inputReceiver, ReceiverType inputReceiver,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor) { folly::Executor::KeepAlive<folly::SequencedExecutor> executor) {
auto* processor = auto* processor =
new detail::FanoutChannelProcessor<TValue>(std::move(executor)); new detail::FanoutChannelProcessor<ValueType>(std::move(executor));
processor->start(std::move(inputReceiver)); processor->start(std::move(inputReceiver));
return FanoutChannel<TValue>(processor); return FanoutChannel<ValueType>(processor);
} }
} // namespace channels } // namespace channels
} // namespace folly } // namespace folly
...@@ -23,7 +23,7 @@ namespace folly { ...@@ -23,7 +23,7 @@ namespace folly {
namespace channels { namespace channels {
namespace detail { namespace detail {
template <typename TValue> template <typename ValueType>
class IFanoutChannelProcessor; class IFanoutChannelProcessor;
} }
...@@ -38,20 +38,20 @@ class IFanoutChannelProcessor; ...@@ -38,20 +38,20 @@ class IFanoutChannelProcessor;
* Example: * Example:
* *
* // Function that returns a receiver: * // Function that returns a receiver:
* Receiver<int> getInputReceiver(); * Receiver<int> getInpuReceiverType();
* *
* // Function that returns an executor * // Function that returns an executor
* folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor(); * folly::Executor::KeepAlive<folly::SequencedExecutor> getExecutor();
* *
* auto fanoutChannel = createFanoutChannel(getReceiver(), getExecutor()); * auto fanoutChannel = createFanoutChannel(geReceiverType(), getExecutor());
* auto receiver1 = fanoutChannel.newReceiver(); * auto receiver1 = fanoutChannel.newReceiver();
* auto receiver2 = fanoutChannel.newReceiver(); * auto receiver2 = fanoutChannel.newReceiver();
* auto receiver3 = fanoutChannel.newReceiver([]{ return {1, 2, 3}; }); * auto receiver3 = fanoutChannel.newReceiver([]{ return {1, 2, 3}; });
* std::move(fanoutChannel).close(); * std::move(fanoutChannel).close();
*/ */
template <typename TValue> template <typename ValueType>
class FanoutChannel { class FanoutChannel {
using TProcessor = detail::IFanoutChannelProcessor<TValue>; using TProcessor = detail::IFanoutChannelProcessor<ValueType>;
public: public:
explicit FanoutChannel(TProcessor* processor); explicit FanoutChannel(TProcessor* processor);
...@@ -70,8 +70,8 @@ class FanoutChannel { ...@@ -70,8 +70,8 @@ class FanoutChannel {
* to determine the set of initial values that will (only) go to the new input * to determine the set of initial values that will (only) go to the new input
* receiver. * receiver.
*/ */
Receiver<TValue> getNewReceiver( Receiver<ValueType> getNewReceiver(
folly::Function<std::vector<TValue>()> getInitialValues = {}); folly::Function<std::vector<ValueType>()> getInitialValues = {});
/** /**
* Returns whether this fanout channel has any output receivers. * Returns whether this fanout channel has any output receivers.
...@@ -90,9 +90,11 @@ class FanoutChannel { ...@@ -90,9 +90,11 @@ class FanoutChannel {
/** /**
* Creates a new fanout channel that fans out updates from an input receiver. * Creates a new fanout channel that fans out updates from an input receiver.
*/ */
template <typename TReceiver, typename TValue = typename TReceiver::ValueType> template <
FanoutChannel<TValue> createFanoutChannel( typename ReceiverType,
TReceiver inputReceiver, typename ValueType = typename ReceiverType::ValueType>
FanoutChannel<ValueType> createFanoutChannel(
ReceiverType inputReceiver,
folly::Executor::KeepAlive<folly::SequencedExecutor> executor); folly::Executor::KeepAlive<folly::SequencedExecutor> executor);
} // namespace channels } // namespace channels
} // namespace folly } // namespace folly
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment