Commit 555213ca authored by James Sedgwick's avatar James Sedgwick Committed by Praveen Kumar Ramakrishnan

inbound/outbound handlers

Summary:
Much less copypasta this time around. I wonder if the getters and setters for write flags and read buffer settings are necessary in the new handler types, or even if they belong in the bidirectional handler

I'm all ears for more suggestions on reducing copypasta

I'm going to reorg the code (inl headers etc) in a subsequent diff once this is in - easier to review this way

Test Plan: existing unit, thinking about tests for these changes

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2026522

Tasks: 6836580

Signature: t1:2026522:1430346145:bd7f7770eddce0470e2ac72440fc001cf128df08
parent dded9096
......@@ -51,6 +51,8 @@ class HandlerBase {
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
public:
static const HandlerDir dir = HandlerDir::BOTH;
typedef Rin rin;
typedef Rout rout;
typedef Win win;
......@@ -99,6 +101,47 @@ class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
*/
};
struct Unit{};
template <class Rin, class Rout = Rin>
class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
public:
static const HandlerDir dir = HandlerDir::IN;
typedef Rin rin;
typedef Rout rout;
typedef Unit win;
typedef Unit wout;
typedef InboundHandlerContext<Rout> Context;
virtual ~InboundHandler() {}
virtual void read(Context* ctx, Rin msg) = 0;
virtual void readEOF(Context* ctx) {
ctx->fireReadEOF();
}
virtual void readException(Context* ctx, exception_wrapper e) {
ctx->fireReadException(std::move(e));
}
};
template <class Win, class Wout = Win>
class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> {
public:
static const HandlerDir dir = HandlerDir::OUT;
typedef Unit rin;
typedef Unit rout;
typedef Win win;
typedef Wout wout;
typedef OutboundHandlerContext<Wout> Context;
virtual ~OutboundHandler() {}
virtual Future<void> write(Context* ctx, Win msg) = 0;
virtual Future<void> close(Context* ctx) {
return ctx->fireClose();
}
};
template <class R, class W = R>
class HandlerAdapter : public Handler<R, R, W, W> {
public:
......@@ -116,4 +159,10 @@ class HandlerAdapter : public Handler<R, R, W, W> {
typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
BytesToBytesHandler;
typedef InboundHandler<IOBufQueue&>
InboundBytesToBytesHandler;
typedef OutboundHandler<std::unique_ptr<IOBuf>>
OutboundBytesToBytesHandler;
}}
......@@ -55,6 +55,39 @@ class HandlerContext {
*/
};
template <class In>
class InboundHandlerContext {
public:
virtual ~InboundHandlerContext() {}
virtual void fireRead(In msg) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
// TODO Need get/set writeFlags, readBufferSettings? Probably not.
// Do we even really need them stored in the pipeline at all?
// Could just always delegate to the socket impl
};
template <class Out>
class OutboundHandlerContext {
public:
virtual ~OutboundHandlerContext() {}
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
};
enum class HandlerDir {
IN,
OUT,
BOTH
};
class PipelineContext {
public:
virtual ~PipelineContext() {}
......@@ -74,12 +107,6 @@ class PipelineContext {
}
}
void link(PipelineContext* other) {
setNextIn(other);
other->setNextOut(this);
}
protected:
virtual void setNextIn(PipelineContext* ctx) = 0;
virtual void setNextOut(PipelineContext* ctx) = 0;
};
......@@ -144,7 +171,7 @@ class ContextImplBase : public PipelineContext {
if (nextIn) {
nextIn_ = nextIn;
} else {
throw std::invalid_argument("wrong type in setNextIn");
throw std::invalid_argument("inbound type mismatch");
}
}
......@@ -153,7 +180,7 @@ class ContextImplBase : public PipelineContext {
if (nextOut) {
nextOut_ = nextOut;
} else {
throw std::invalid_argument("wrong type in setNextOut");
throw std::invalid_argument("outbound type mismatch");
}
}
......@@ -170,16 +197,19 @@ class ContextImplBase : public PipelineContext {
};
template <class P, class H>
class ContextImpl : public HandlerContext<typename H::rout,
typename H::wout>,
public InboundLink<typename H::rin>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, HandlerContext<typename H::rout, typename H::wout>> {
class ContextImpl
: public HandlerContext<typename H::rout,
typename H::wout>,
public InboundLink<typename H::rin>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, HandlerContext<typename H::rout,
typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::BOTH;
explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
......@@ -294,4 +324,157 @@ class ContextImpl : public HandlerContext<typename H::rout,
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class InboundContextImpl
: public InboundHandlerContext<typename H::rout>,
public InboundLink<typename H::rin>,
public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::IN;
explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
InboundContextImpl() {
this->impl_ = this;
}
~InboundContextImpl() {}
// InboundHandlerContext overrides
void fireRead(Rout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->read(std::forward<Rout>(msg));
} else {
LOG(WARNING) << "read reached end of pipeline";
}
}
void fireReadEOF() override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readEOF();
} else {
LOG(WARNING) << "readEOF reached end of pipeline";
}
}
void fireReadException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
if (this->nextIn_) {
this->nextIn_->readException(std::move(e));
} else {
LOG(WARNING) << "readException reached end of pipeline";
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
// InboundLink overrides
void read(Rin msg) override {
DestructorGuard dg(this->pipeline_);
this->handler_->read(this, std::forward<Rin>(msg));
}
void readEOF() override {
DestructorGuard dg(this->pipeline_);
this->handler_->readEOF(this);
}
void readException(exception_wrapper e) override {
DestructorGuard dg(this->pipeline_);
this->handler_->readException(this, std::move(e));
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class P, class H>
class OutboundContextImpl
: public OutboundHandlerContext<typename H::wout>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::OUT;
explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
// For StaticPipeline
OutboundContextImpl() {
this->impl_ = this;
}
~OutboundContextImpl() {}
// OutboundHandlerContext overrides
Future<void> fireWrite(Wout msg) override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->write(std::forward<Wout>(msg));
} else {
LOG(WARNING) << "write reached end of pipeline";
return makeFuture();
}
}
Future<void> fireClose() override {
DestructorGuard dg(this->pipeline_);
if (this->nextOut_) {
return this->nextOut_->close();
} else {
LOG(WARNING) << "close reached end of pipeline";
return makeFuture();
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return this->pipeline_->getTransport();
}
// OutboundLink overrides
Future<void> write(Win msg) override {
DestructorGuard dg(this->pipeline_);
return this->handler_->write(this, std::forward<Win>(msg));
}
Future<void> close() override {
DestructorGuard dg(this->pipeline_);
return this->handler_->close(this);
}
private:
using DestructorGuard = typename P::DestructorGuard;
};
template <class Handler, class Pipeline>
struct ContextType {
typedef typename std::conditional<
Handler::dir == HandlerDir::BOTH,
ContextImpl<Pipeline, Handler>,
typename std::conditional<
Handler::dir == HandlerDir::IN,
InboundContextImpl<Pipeline, Handler>,
OutboundContextImpl<Pipeline, Handler>
>::type>::type
type;
};
}}
......@@ -30,7 +30,7 @@ namespace folly { namespace wangle {
*
* This handler may only be used in a single Pipeline.
*/
class OutputBufferingHandler : public BytesToBytesHandler,
class OutputBufferingHandler : public OutboundBytesToBytesHandler,
protected EventBase::LoopCallback {
public:
Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) override {
......
......@@ -83,15 +83,8 @@ class Pipeline : public DelayedDestruction {
template <class H>
Pipeline& addBack(std::shared_ptr<H> handler) {
ctxs_.push_back(std::make_shared<ContextImpl<Pipeline, H>>(
this,
std::move(handler)));
return *this;
}
template <class H>
Pipeline& addBack(H* handler) {
return addBack(std::shared_ptr<H>(handler, [](H*){}));
typedef typename ContextType<H, Pipeline>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
}
template <class H>
......@@ -100,16 +93,14 @@ class Pipeline : public DelayedDestruction {
}
template <class H>
Pipeline& addFront(std::shared_ptr<H> handler) {
ctxs_.insert(
ctxs_.begin(),
std::make_shared<ContextImpl<Pipeline, H>>(this, std::move(handler)));
return *this;
Pipeline& addBack(H* handler) {
return addBack(std::shared_ptr<H>(handler, [](H*){}));
}
template <class H>
Pipeline& addFront(H* handler) {
return addFront(std::shared_ptr<H>(handler, [](H*){}));
Pipeline& addFront(std::shared_ptr<H> handler) {
typedef typename ContextType<H, Pipeline>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
}
template <class H>
......@@ -117,30 +108,40 @@ class Pipeline : public DelayedDestruction {
return addFront(std::make_shared<H>(std::forward<H>(handler)));
}
template <class H>
Pipeline& addFront(H* handler) {
return addFront(std::shared_ptr<H>(handler, [](H*){}));
}
template <class H>
H* getHandler(int i) {
auto ctx = dynamic_cast<ContextImpl<Pipeline, H>*>(ctxs_[i].get());
typedef typename ContextType<H, Pipeline>::type Context;
auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
CHECK(ctx);
return ctx->getHandler();
}
// TODO Have read/write/etc check that pipeline has been finalized
void finalize() {
if (ctxs_.empty()) {
return;
}
for (size_t i = 0; i < ctxs_.size() - 1; i++) {
ctxs_[i]->link(ctxs_[i+1].get());
if (!inCtxs_.empty()) {
front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
inCtxs_[i]->setNextIn(inCtxs_[i+1]);
}
}
back_ = dynamic_cast<OutboundLink<W>*>(ctxs_.back().get());
if (!back_) {
throw std::invalid_argument("wrong type for last handler");
if (!outCtxs_.empty()) {
back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
outCtxs_[i]->setNextOut(outCtxs_[i-1]);
}
}
front_ = dynamic_cast<InboundLink<R>*>(ctxs_.front().get());
if (!front_) {
throw std::invalid_argument("wrong type for first handler");
throw std::invalid_argument("no inbound handler in Pipeline");
}
if (!back_) {
throw std::invalid_argument("no outbound handler in Pipeline");
}
for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
......@@ -154,8 +155,9 @@ class Pipeline : public DelayedDestruction {
// See thrift/lib/cpp2/async/Cpp2Channel.cpp for an example
template <class H>
bool setOwner(H* handler) {
typedef typename ContextType<H, Pipeline>::type Context;
for (auto& ctx : ctxs_) {
auto ctxImpl = dynamic_cast<ContextImpl<Pipeline, H>*>(ctx.get());
auto ctxImpl = dynamic_cast<Context*>(ctx.get());
if (ctxImpl && ctxImpl->getHandler() == handler) {
owner_ = ctx;
return true;
......@@ -185,10 +187,8 @@ class Pipeline : public DelayedDestruction {
}
template <class Context>
void addContextFront(Context* context) {
ctxs_.insert(
ctxs_.begin(),
std::shared_ptr<Context>(context, [](Context*){}));
void addContextFront(Context* ctx) {
addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
}
void detachHandlers() {
......@@ -200,15 +200,29 @@ class Pipeline : public DelayedDestruction {
}
private:
template <class Context>
Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front) {
ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
}
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
}
return *this;
}
std::shared_ptr<AsyncTransport> transport_;
WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
bool isStatic_{false};
std::shared_ptr<PipelineContext> owner_;
std::vector<std::shared_ptr<PipelineContext>> ctxs_;
std::vector<PipelineContext*> inCtxs_;
std::vector<PipelineContext*> outCtxs_;
InboundLink<R>* front_{nullptr};
OutboundLink<W>* back_{nullptr};
std::vector<std::shared_ptr<PipelineContext>> ctxs_;
std::shared_ptr<PipelineContext> owner_;
};
}}
......
......@@ -67,8 +67,6 @@ class StaticPipeline<R, W, Handler, Handlers...>
}
protected:
typedef ContextImpl<Pipeline<R, W>, Handler> Context;
template <class HandlerArg, class... HandlerArgs>
StaticPipeline(
bool isFirst,
......@@ -119,7 +117,7 @@ class StaticPipeline<R, W, Handler, Handlers...>
bool isFirst_;
folly::Optional<Handler> handler_;
std::shared_ptr<Handler> handlerPtr_;
ContextImpl<Pipeline<R, W>, Handler> ctx_;
typename ContextType<Handler, Pipeline<R, W>>::type ctx_;
};
}} // folly::wangle
......@@ -40,7 +40,7 @@ namespace folly { namespace wangle {
* IOBufQueue.front(), without split() or pop_front().
*/
class ByteToMessageCodec
: public BytesToBytesHandler {
: public InboundBytesToBytesHandler {
public:
virtual std::unique_ptr<IOBuf> decode(
......
......@@ -95,13 +95,13 @@ TEST(LengthFieldFramePipeline, SimpleTest) {
pipeline
.addBack(BytesReflector())
.addBack(LengthFieldPrepender())
.addBack(LengthFieldBasedFrameDecoder())
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 2);
}))
.addBack(LengthFieldPrepender())
.finalize();
auto buf = IOBuf::create(2);
......
......@@ -47,7 +47,7 @@ namespace folly { namespace wangle {
*
*/
class LengthFieldPrepender
: public BytesToBytesHandler {
: public OutboundBytesToBytesHandler {
public:
LengthFieldPrepender(
int lengthFieldLength = 4,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment