Commit 45e22567 authored by James Sedgwick's avatar James Sedgwick Committed by Praveen Kumar Ramakrishnan

Split HandlerContext and Pipeline into inl headers

Summary:
Leave the important headers visible. Opens the door for decent inline docs.

Test Plan: unit

Reviewed By: hans@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2036799

Signature: t1:2036799:1430749004:db5e58655c27b96826549849722fe962b9ae3985
parent 8e154453
......@@ -278,8 +278,10 @@ nobase_follyinclude_HEADERS = \
wangle/channel/AsyncSocketHandler.h \
wangle/channel/Handler.h \
wangle/channel/HandlerContext.h \
wangle/channel/HandlerContext-inl.h \
wangle/channel/OutputBufferingHandler.h \
wangle/channel/Pipeline.h \
wangle/channel/Pipeline-inl.h \
wangle/channel/StaticPipeline.h \
wangle/concurrent/BlockingQueue.h \
wangle/concurrent/Codel.h \
......
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace wangle {
class PipelineContext {
public:
virtual ~PipelineContext() {}
virtual void attachPipeline() = 0;
virtual void detachPipeline() = 0;
virtual void attachTransport() = 0;
virtual void detachTransport() = 0;
template <class H, class HandlerContext>
void attachContext(H* handler, HandlerContext* ctx) {
if (++handler->attachCount_ == 1) {
handler->ctx_ = ctx;
} else {
handler->ctx_ = nullptr;
}
}
virtual void setNextIn(PipelineContext* ctx) = 0;
virtual void setNextOut(PipelineContext* ctx) = 0;
};
template <class In>
class InboundLink {
public:
virtual ~InboundLink() {}
virtual void read(In msg) = 0;
virtual void readEOF() = 0;
virtual void readException(exception_wrapper e) = 0;
};
template <class Out>
class OutboundLink {
public:
virtual ~OutboundLink() {}
virtual Future<void> write(Out msg) = 0;
virtual Future<void> close() = 0;
};
template <class P, class H, class Context>
class ContextImplBase : public PipelineContext {
public:
~ContextImplBase() {}
H* getHandler() {
return handler_.get();
}
void initialize(P* pipeline, std::shared_ptr<H> handler) {
pipeline_ = pipeline;
handler_ = std::move(handler);
}
// PipelineContext overrides
void attachPipeline() override {
if (!attached_) {
this->attachContext(handler_.get(), impl_);
handler_->attachPipeline(impl_);
attached_ = true;
}
}
void detachPipeline() override {
handler_->detachPipeline(impl_);
attached_ = false;
}
void attachTransport() override {
DestructorGuard dg(pipeline_);
handler_->attachTransport(impl_);
}
void detachTransport() override {
DestructorGuard dg(pipeline_);
handler_->detachTransport(impl_);
}
void setNextIn(PipelineContext* ctx) override {
auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
if (nextIn) {
nextIn_ = nextIn;
} else {
throw std::invalid_argument("inbound type mismatch");
}
}
void setNextOut(PipelineContext* ctx) override {
auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
if (nextOut) {
nextOut_ = nextOut;
} else {
throw std::invalid_argument("outbound type mismatch");
}
}
protected:
Context* impl_;
P* pipeline_;
std::shared_ptr<H> handler_;
InboundLink<typename H::rout>* nextIn_{nullptr};
OutboundLink<typename H::wout>* nextOut_{nullptr};
private:
bool attached_{false};
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class ContextImpl
: public HandlerContext<typename H::rout,
typename H::wout>,
public InboundLink<typename H::rin>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, HandlerContext<typename H::rout,
typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::BOTH;
explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
ContextImpl() {
this->impl_ = this;
}
~ContextImpl() {}
// HandlerContext overrides
void fireRead(Rout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->read(std::forward<Rout>(msg));
} else {
LOG(WARNING) << "read reached end of pipeline";
}
}
void fireReadEOF() override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readEOF();
} else {
LOG(WARNING) << "readEOF reached end of pipeline";
}
}
void fireReadException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readException(std::move(e));
} else {
LOG(WARNING) << "readException reached end of pipeline";
}
}
Future<void> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->write(std::forward<Wout>(msg));
} else {
LOG(WARNING) << "write reached end of pipeline";
return makeFuture();
}
}
Future<void> fireClose() override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->close();
} else {
LOG(WARNING) << "close reached end of pipeline";
return makeFuture();
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
void setWriteFlags(WriteFlags flags) override {
this->pipeline_->setWriteFlags(flags);
}
WriteFlags getWriteFlags() override {
return this->pipeline_->getWriteFlags();
}
void setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) override {
this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
}
std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
return this->pipeline_->getReadBufferSettings();
}
// InboundLink overrides
void read(Rin msg) override {
DestructorGuard dg(this->pipeline_);
this->handler_->read(this, std::forward<Rin>(msg));
}
void readEOF() override {
DestructorGuard dg(this->pipeline_);
this->handler_->readEOF(this);
}
void readException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
this->handler_->readException(this, std::move(e));
}
// OutboundLink overrides
Future<void> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
return this->handler_->write(this, std::forward<Win>(msg));
}
Future<void> close() override {
DestructorGuard dg(this->pipeline_);
return this->handler_->close(this);
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class InboundContextImpl
: public InboundHandlerContext<typename H::rout>,
public InboundLink<typename H::rin>,
public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::IN;
explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
InboundContextImpl() {
this->impl_ = this;
}
~InboundContextImpl() {}
// InboundHandlerContext overrides
void fireRead(Rout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->read(std::forward<Rout>(msg));
} else {
LOG(WARNING) << "read reached end of pipeline";
}
}
void fireReadEOF() override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readEOF();
} else {
LOG(WARNING) << "readEOF reached end of pipeline";
}
}
void fireReadException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readException(std::move(e));
} else {
LOG(WARNING) << "readException reached end of pipeline";
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
// InboundLink overrides
void read(Rin msg) override {
DestructorGuard dg(this->pipeline_);
this->handler_->read(this, std::forward<Rin>(msg));
}
void readEOF() override {
DestructorGuard dg(this->pipeline_);
this->handler_->readEOF(this);
}
void readException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
this->handler_->readException(this, std::move(e));
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class OutboundContextImpl
: public OutboundHandlerContext<typename H::wout>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::OUT;
explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
OutboundContextImpl() {
this->impl_ = this;
}
~OutboundContextImpl() {}
// OutboundHandlerContext overrides
Future<void> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->write(std::forward<Wout>(msg));
} else {
LOG(WARNING) << "write reached end of pipeline";
return makeFuture();
}
}
Future<void> fireClose() override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->close();
} else {
LOG(WARNING) << "close reached end of pipeline";
return makeFuture();
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
// OutboundLink overrides
Future<void> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
return this->handler_->write(this, std::forward<Win>(msg));
}
Future<void> close() override {
DestructorGuard dg(this->pipeline_);
return this->handler_->close(this);
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class Handler, class Pipeline>
struct ContextType {
typedef typename std::conditional<
Handler::dir == HandlerDir::BOTH,
ContextImpl<Pipeline, Handler>,
typename std::conditional<
Handler::dir == HandlerDir::IN,
InboundContextImpl<Pipeline, Handler>,
OutboundContextImpl<Pipeline, Handler>
>::type>::type
type;
};
}} // folly::wangle
......@@ -88,393 +88,6 @@ enum class HandlerDir {
BOTH
};
class PipelineContext {
public:
virtual ~PipelineContext() {}
virtual void attachPipeline() = 0;
virtual void detachPipeline() = 0;
virtual void attachTransport() = 0;
virtual void detachTransport() = 0;
template <class H, class HandlerContext>
void attachContext(H* handler, HandlerContext* ctx) {
if (++handler->attachCount_ == 1) {
handler->ctx_ = ctx;
} else {
handler->ctx_ = nullptr;
}
}
virtual void setNextIn(PipelineContext* ctx) = 0;
virtual void setNextOut(PipelineContext* ctx) = 0;
};
template <class In>
class InboundLink {
public:
virtual ~InboundLink() {}
virtual void read(In msg) = 0;
virtual void readEOF() = 0;
virtual void readException(exception_wrapper e) = 0;
};
template <class Out>
class OutboundLink {
public:
virtual ~OutboundLink() {}
virtual Future<void> write(Out msg) = 0;
virtual Future<void> close() = 0;
};
template <class P, class H, class Context>
class ContextImplBase : public PipelineContext {
public:
~ContextImplBase() {}
H* getHandler() {
return handler_.get();
}
void initialize(P* pipeline, std::shared_ptr<H> handler) {
pipeline_ = pipeline;
handler_ = std::move(handler);
}
// PipelineContext overrides
void attachPipeline() override {
if (!attached_) {
this->attachContext(handler_.get(), impl_);
handler_->attachPipeline(impl_);
attached_ = true;
}
}
void detachPipeline() override {
handler_->detachPipeline(impl_);
attached_ = false;
}
void attachTransport() override {
DestructorGuard dg(pipeline_);
handler_->attachTransport(impl_);
}
void detachTransport() override {
DestructorGuard dg(pipeline_);
handler_->detachTransport(impl_);
}
void setNextIn(PipelineContext* ctx) override {
auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
if (nextIn) {
nextIn_ = nextIn;
} else {
throw std::invalid_argument("inbound type mismatch");
}
}
void setNextOut(PipelineContext* ctx) override {
auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
if (nextOut) {
nextOut_ = nextOut;
} else {
throw std::invalid_argument("outbound type mismatch");
}
}
protected:
Context* impl_;
P* pipeline_;
std::shared_ptr<H> handler_;
InboundLink<typename H::rout>* nextIn_{nullptr};
OutboundLink<typename H::wout>* nextOut_{nullptr};
private:
bool attached_{false};
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class ContextImpl
: public HandlerContext<typename H::rout,
typename H::wout>,
public InboundLink<typename H::rin>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, HandlerContext<typename H::rout,
typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::BOTH;
explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
ContextImpl() {
this->impl_ = this;
}
~ContextImpl() {}
// HandlerContext overrides
void fireRead(Rout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->read(std::forward<Rout>(msg));
} else {
LOG(WARNING) << "read reached end of pipeline";
}
}
void fireReadEOF() override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readEOF();
} else {
LOG(WARNING) << "readEOF reached end of pipeline";
}
}
void fireReadException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readException(std::move(e));
} else {
LOG(WARNING) << "readException reached end of pipeline";
}
}
Future<void> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->write(std::forward<Wout>(msg));
} else {
LOG(WARNING) << "write reached end of pipeline";
return makeFuture();
}
}
Future<void> fireClose() override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->close();
} else {
LOG(WARNING) << "close reached end of pipeline";
return makeFuture();
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
void setWriteFlags(WriteFlags flags) override {
this->pipeline_->setWriteFlags(flags);
}
WriteFlags getWriteFlags() override {
return this->pipeline_->getWriteFlags();
}
void setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) override {
this->pipeline_->setReadBufferSettings(minAvailable, allocationSize);
}
std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
return this->pipeline_->getReadBufferSettings();
}
// InboundLink overrides
void read(Rin msg) override {
DestructorGuard dg(this->pipeline_);
this->handler_->read(this, std::forward<Rin>(msg));
}
void readEOF() override {
DestructorGuard dg(this->pipeline_);
this->handler_->readEOF(this);
}
void readException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
this->handler_->readException(this, std::move(e));
}
// OutboundLink overrides
Future<void> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
return this->handler_->write(this, std::forward<Win>(msg));
}
Future<void> close() override {
DestructorGuard dg(this->pipeline_);
return this->handler_->close(this);
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class InboundContextImpl
: public InboundHandlerContext<typename H::rout>,
public InboundLink<typename H::rin>,
public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::IN;
explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
InboundContextImpl() {
this->impl_ = this;
}
~InboundContextImpl() {}
// InboundHandlerContext overrides
void fireRead(Rout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->read(std::forward<Rout>(msg));
} else {
LOG(WARNING) << "read reached end of pipeline";
}
}
void fireReadEOF() override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readEOF();
} else {
LOG(WARNING) << "readEOF reached end of pipeline";
}
}
void fireReadException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readException(std::move(e));
} else {
LOG(WARNING) << "readException reached end of pipeline";
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
// InboundLink overrides
void read(Rin msg) override {
DestructorGuard dg(this->pipeline_);
this->handler_->read(this, std::forward<Rin>(msg));
}
void readEOF() override {
DestructorGuard dg(this->pipeline_);
this->handler_->readEOF(this);
}
void readException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
this->handler_->readException(this, std::move(e));
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class OutboundContextImpl
: public OutboundHandlerContext<typename H::wout>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::OUT;
explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
OutboundContextImpl() {
this->impl_ = this;
}
~OutboundContextImpl() {}
// OutboundHandlerContext overrides
Future<void> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->write(std::forward<Wout>(msg));
} else {
LOG(WARNING) << "write reached end of pipeline";
return makeFuture();
}
}
Future<void> fireClose() override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->close();
} else {
LOG(WARNING) << "close reached end of pipeline";
return makeFuture();
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
// OutboundLink overrides
Future<void> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
return this->handler_->write(this, std::forward<Win>(msg));
}
Future<void> close() override {
DestructorGuard dg(this->pipeline_);
return this->handler_->close(this);
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class Handler, class Pipeline>
struct ContextType {
typedef typename std::conditional<
Handler::dir == HandlerDir::BOTH,
ContextImpl<Pipeline, Handler>,
typename std::conditional<
Handler::dir == HandlerDir::IN,
InboundContextImpl<Pipeline, Handler>,
OutboundContextImpl<Pipeline, Handler>
>::type>::type
type;
};
}} // folly::wangle
}}
#include <folly/wangle/channel/HandlerContext-inl.h>
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <glog/logging.h>
namespace folly { namespace wangle {
template <class R, class W>
Pipeline<R, W>::Pipeline() : isStatic_(false) {}
template <class R, class W>
Pipeline<R, W>::Pipeline(bool isStatic) : isStatic_(isStatic) {
CHECK(isStatic_);
}
template <class R, class W>
Pipeline<R, W>::~Pipeline() {
if (!isStatic_) {
detachHandlers();
}
}
template <class R, class W>
std::shared_ptr<AsyncTransport> Pipeline<R, W>::getTransport() {
return transport_;
}
template <class R, class W>
void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
writeFlags_ = flags;
}
template <class R, class W>
WriteFlags Pipeline<R, W>::getWriteFlags() {
return writeFlags_;
}
template <class R, class W>
void Pipeline<R, W>::setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) {
readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
}
template <class R, class W>
std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
return readBufferSettings_;
}
template <class R, class W>
template <class T>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
Pipeline<R, W>::read(R msg) {
if (!front_) {
throw std::invalid_argument("read(): no inbound handler in Pipeline");
}
front_->read(std::forward<R>(msg));
}
template <class R, class W>
template <class T>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
Pipeline<R, W>::readEOF() {
if (!front_) {
throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
}
front_->readEOF();
}
template <class R, class W>
template <class T>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
Pipeline<R, W>::readException(exception_wrapper e) {
if (!front_) {
throw std::invalid_argument(
"readException(): no inbound handler in Pipeline");
}
front_->readException(std::move(e));
}
template <class R, class W>
template <class T>
typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
Pipeline<R, W>::write(W msg) {
if (!back_) {
throw std::invalid_argument("write(): no outbound handler in Pipeline");
}
return back_->write(std::forward<W>(msg));
}
template <class R, class W>
template <class T>
typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
Pipeline<R, W>::close() {
if (!back_) {
throw std::invalid_argument("close(): no outbound handler in Pipeline");
}
return back_->close();
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
return addBack(std::make_shared<H>(std::forward<H>(handler)));
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
return addBack(std::shared_ptr<H>(handler, [](H*){}));
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
return addFront(std::make_shared<H>(std::forward<H>(handler)));
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
return addFront(std::shared_ptr<H>(handler, [](H*){}));
}
template <class R, class W>
template <class H>
H* Pipeline<R, W>::getHandler(int i) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
CHECK(ctx);
return ctx->getHandler();
}
namespace detail {
template <class T>
inline void logWarningIfNotNothing(const std::string& warning) {
LOG(WARNING) << warning;
}
template <>
inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
// do nothing
}
} // detail
// TODO Have read/write/etc check that pipeline has been finalized
template <class R, class W>
void Pipeline<R, W>::finalize() {
if (!inCtxs_.empty()) {
front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
inCtxs_[i]->setNextIn(inCtxs_[i+1]);
}
}
if (!outCtxs_.empty()) {
back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
outCtxs_[i]->setNextOut(outCtxs_[i-1]);
}
}
if (!front_) {
detail::logWarningIfNotNothing<R>(
"No inbound handler in Pipeline, inbound operations will throw "
"std::invalid_argument");
}
if (!back_) {
detail::logWarningIfNotNothing<W>(
"No outbound handler in Pipeline, outbound operations will throw "
"std::invalid_argument");
}
for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
(*it)->attachPipeline();
}
}
template <class R, class W>
template <class H>
bool Pipeline<R, W>::setOwner(H* handler) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
for (auto& ctx : ctxs_) {
auto ctxImpl = dynamic_cast<Context*>(ctx.get());
if (ctxImpl && ctxImpl->getHandler() == handler) {
owner_ = ctx;
return true;
}
}
return false;
}
template <class R, class W>
void Pipeline<R, W>::attachTransport(
std::shared_ptr<AsyncTransport> transport) {
transport_ = std::move(transport);
for (auto& ctx : ctxs_) {
ctx->attachTransport();
}
}
template <class R, class W>
void Pipeline<R, W>::detachTransport() {
transport_ = nullptr;
for (auto& ctx : ctxs_) {
ctx->detachTransport();
}
}
template <class R, class W>
template <class Context>
void Pipeline<R, W>::addContextFront(Context* ctx) {
addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
}
template <class R, class W>
void Pipeline<R, W>::detachHandlers() {
for (auto& ctx : ctxs_) {
if (ctx != owner_) {
ctx->detachPipeline();
}
}
}
template <class R, class W>
template <class Context>
Pipeline<R, W>& Pipeline<R, W>::addHelper(
std::shared_ptr<Context>&& ctx,
bool front) {
ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
}
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
}
return *this;
}
}} // folly::wangle
......@@ -22,27 +22,11 @@
#include <folly/io/async/DelayedDestruction.h>
#include <folly/ExceptionWrapper.h>
#include <folly/Memory.h>
#include <glog/logging.h>
namespace folly { namespace wangle {
// See Pipeline docblock for purpose
struct Nothing{};
namespace detail {
template <class T>
inline void logWarningIfNotNothing(const std::string& warning) {
LOG(WARNING) << warning;
}
template <>
inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
// do nothing
}
} // detail
/*
* R is the inbound type, i.e. inbound calls start with pipeline.read(R)
* W is the outbound type, i.e. outbound calls start with pipeline.write(W)
......@@ -54,214 +38,82 @@ inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
template <class R, class W = Nothing>
class Pipeline : public DelayedDestruction {
public:
Pipeline() : isStatic_(false) {}
~Pipeline() {
if (!isStatic_) {
detachHandlers();
}
}
Pipeline();
~Pipeline();
std::shared_ptr<AsyncTransport> getTransport() {
return transport_;
}
std::shared_ptr<AsyncTransport> getTransport();
void setWriteFlags(WriteFlags flags) {
writeFlags_ = flags;
}
void setWriteFlags(WriteFlags flags);
WriteFlags getWriteFlags();
WriteFlags getWriteFlags() {
return writeFlags_;
}
void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize) {
readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
}
std::pair<uint64_t, uint64_t> getReadBufferSettings() {
return readBufferSettings_;
}
void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
std::pair<uint64_t, uint64_t> getReadBufferSettings();
template <class T = R>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
read(R msg) {
if (!front_) {
throw std::invalid_argument("read(): no inbound handler in Pipeline");
}
front_->read(std::forward<R>(msg));
}
read(R msg);
template <class T = R>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
readEOF() {
if (!front_) {
throw std::invalid_argument("readEOF(): no inbound handler in Pipeline");
}
front_->readEOF();
}
readEOF();
template <class T = R>
typename std::enable_if<!std::is_same<T, Nothing>::value>::type
readException(exception_wrapper e) {
if (!front_) {
throw std::invalid_argument(
"readException(): no inbound handler in Pipeline");
}
front_->readException(std::move(e));
}
readException(exception_wrapper e);
template <class T = W>
typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
write(W msg) {
if (!back_) {
throw std::invalid_argument("write(): no outbound handler in Pipeline");
}
return back_->write(std::forward<W>(msg));
}
write(W msg);
template <class T = W>
typename std::enable_if<!std::is_same<T, Nothing>::value, Future<void>>::type
close() {
if (!back_) {
throw std::invalid_argument("close(): no outbound handler in Pipeline");
}
return back_->close();
}
close();
template <class H>
Pipeline& addBack(std::shared_ptr<H> handler) {
typedef typename ContextType<H, Pipeline>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
}
Pipeline& addBack(std::shared_ptr<H> handler);
template <class H>
Pipeline& addBack(H&& handler) {
return addBack(std::make_shared<H>(std::forward<H>(handler)));
}
Pipeline& addBack(H&& handler);
template <class H>
Pipeline& addBack(H* handler) {
return addBack(std::shared_ptr<H>(handler, [](H*){}));
}
Pipeline& addBack(H* handler);
template <class H>
Pipeline& addFront(std::shared_ptr<H> handler) {
typedef typename ContextType<H, Pipeline>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
}
Pipeline& addFront(std::shared_ptr<H> handler);
template <class H>
Pipeline& addFront(H&& handler) {
return addFront(std::make_shared<H>(std::forward<H>(handler)));
}
Pipeline& addFront(H&& handler);
template <class H>
Pipeline& addFront(H* handler) {
return addFront(std::shared_ptr<H>(handler, [](H*){}));
}
Pipeline& addFront(H* handler);
template <class H>
H* getHandler(int i) {
typedef typename ContextType<H, Pipeline>::type Context;
auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
CHECK(ctx);
return ctx->getHandler();
}
// TODO Have read/write/etc check that pipeline has been finalized
void finalize() {
if (!inCtxs_.empty()) {
front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
inCtxs_[i]->setNextIn(inCtxs_[i+1]);
}
}
if (!outCtxs_.empty()) {
back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
outCtxs_[i]->setNextOut(outCtxs_[i-1]);
}
}
if (!front_) {
detail::logWarningIfNotNothing<R>(
"No inbound handler in Pipeline, inbound operations will throw "
"std::invalid_argument");
}
if (!back_) {
detail::logWarningIfNotNothing<W>(
"No outbound handler in Pipeline, outbound operations will throw "
"std::invalid_argument");
}
for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
(*it)->attachPipeline();
}
}
H* getHandler(int i);
void finalize();
// If one of the handlers owns the pipeline itself, use setOwner to ensure
// that the pipeline doesn't try to detach the handler during destruction,
// lest destruction ordering issues occur.
// See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
template <class H>
bool setOwner(H* handler) {
typedef typename ContextType<H, Pipeline>::type Context;
for (auto& ctx : ctxs_) {
auto ctxImpl = dynamic_cast<Context*>(ctx.get());
if (ctxImpl && ctxImpl->getHandler() == handler) {
owner_ = ctx;
return true;
}
}
return false;
}
void attachTransport(
std::shared_ptr<AsyncTransport> transport) {
transport_ = std::move(transport);
for (auto& ctx : ctxs_) {
ctx->attachTransport();
}
}
void detachTransport() {
transport_ = nullptr;
for (auto& ctx : ctxs_) {
ctx->detachTransport();
}
}
bool setOwner(H* handler);
void attachTransport(std::shared_ptr<AsyncTransport> transport);
void detachTransport();
protected:
explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
CHECK(isStatic_);
}
explicit Pipeline(bool isStatic);
template <class Context>
void addContextFront(Context* ctx) {
addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
}
void detachHandlers() {
for (auto& ctx : ctxs_) {
if (ctx != owner_) {
ctx->detachPipeline();
}
}
}
void addContextFront(Context* ctx);
void detachHandlers();
private:
template <class Context>
Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front) {
ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
}
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
}
return *this;
}
Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
std::shared_ptr<AsyncTransport> transport_;
WriteFlags writeFlags_{WriteFlags::NONE};
......@@ -290,3 +142,5 @@ class PipelineFactory {
};
}
#include <folly/wangle/channel/Pipeline-inl.h>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment