Commit 40923559 authored by James Sedgwick's avatar James Sedgwick Committed by Dave Watson

modifiable channel pipelines

Summary:
Basically the same interface as before, but you must specify the read and write types for the ends of pipeline. Implementation is cleaner as well; there's fewer levels of indirection

This dynamic casts shit all over the place and is less typesafe then the previous iteration, but I think with some carefully placed static_asserts, could be just as safe (in the case where you don't do any modification, anyway)

Right now you can only add to the front or back of the pipeline but the way it's set up you could add any number of mutations, including ones that are triggered by handlers. But this should (might?) be enough for Tunnel, which was the motivation.

Test Plan: basic test compiles, thrift2 diff still works with a one line change

Reviewed By: hans@fb.com

Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, bmatheny

FB internal diff: D1661169

Tasks: 5002299

Signature: t1:1661169:1416521727:1f126279796c0b09d1905b9f7dbc48a9e5540271
parent 8a5fcb4f
......@@ -138,7 +138,6 @@ class ChannelHandlerPtr : public ChannelHandler<
}
}
void attachTransport(Context* ctx) override {
ctx_ = ctx;
if (handler_) {
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/Future.h>
#include <folly/ExceptionWrapper.h>
namespace folly { namespace wangle {
template <class In, class Out>
class ChannelHandlerContext {
public:
virtual ~ChannelHandlerContext() {}
virtual void fireRead(In msg) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
virtual Future<void> fireWrite(Out msg) = 0;
virtual Future<void> fireClose() = 0;
virtual std::shared_ptr<AsyncTransport> getTransport() = 0;
virtual void setWriteFlags(WriteFlags flags) = 0;
virtual WriteFlags getWriteFlags() = 0;
virtual void setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) = 0;
virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
/* TODO
template <class H>
virtual void addHandlerBefore(H&&) {}
template <class H>
virtual void addHandlerAfter(H&&) {}
template <class H>
virtual void replaceHandler(H&&) {}
virtual void removeHandler() {}
*/
};
class PipelineContext {
public:
virtual ~PipelineContext() {}
virtual void attachTransport() = 0;
virtual void detachTransport() = 0;
void link(PipelineContext* other) {
setNextIn(other);
other->setNextOut(this);
}
protected:
virtual void setNextIn(PipelineContext* ctx) = 0;
virtual void setNextOut(PipelineContext* ctx) = 0;
};
template <class In>
class InboundChannelHandlerContext {
public:
virtual ~InboundChannelHandlerContext() {}
virtual void read(In msg) = 0;
virtual void readEOF() = 0;
virtual void readException(exception_wrapper e) = 0;
};
template <class Out>
class OutboundChannelHandlerContext {
public:
virtual ~OutboundChannelHandlerContext() {}
virtual Future<void> write(Out msg) = 0;
virtual Future<void> close() = 0;
};
template <class P, class H>
class ContextImpl : public ChannelHandlerContext<typename H::rout,
typename H::wout>,
public InboundChannelHandlerContext<typename H::rin>,
public OutboundChannelHandlerContext<typename H::win>,
public PipelineContext {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
typedef typename H::win Win;
typedef typename H::wout Wout;
template <class HandlerArg>
explicit ContextImpl(P* pipeline, HandlerArg&& handlerArg)
: pipeline_(pipeline),
handler_(std::forward<HandlerArg>(handlerArg)) {
handler_.attachPipeline(this);
}
~ContextImpl() {
handler_.detachPipeline(this);
}
H* getHandler() {
return &handler_;
}
// PipelineContext overrides
void setNextIn(PipelineContext* ctx) override {
auto nextIn = dynamic_cast<InboundChannelHandlerContext<Rout>*>(ctx);
if (nextIn) {
nextIn_ = nextIn;
} else {
throw std::invalid_argument("wrong type in setNextIn");
}
}
void setNextOut(PipelineContext* ctx) override {
auto nextOut = dynamic_cast<OutboundChannelHandlerContext<Wout>*>(ctx);
if (nextOut) {
nextOut_ = nextOut;
} else {
throw std::invalid_argument("wrong type in setNextOut");
}
}
void attachTransport() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
handler_.attachTransport(this);
}
void detachTransport() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
handler_.detachTransport(this);
}
// ChannelHandlerContext overrides
void fireRead(Rout msg) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
if (nextIn_) {
nextIn_->read(std::forward<Rout>(msg));
} else {
LOG(WARNING) << "read reached end of pipeline";
}
}
void fireReadEOF() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
if (nextIn_) {
nextIn_->readEOF();
} else {
LOG(WARNING) << "readEOF reached end of pipeline";
}
}
void fireReadException(exception_wrapper e) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
if (nextIn_) {
nextIn_->readException(std::move(e));
} else {
LOG(WARNING) << "readException reached end of pipeline";
}
}
Future<void> fireWrite(Wout msg) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
if (nextOut_) {
return nextOut_->write(std::forward<Wout>(msg));
} else {
LOG(WARNING) << "write reached end of pipeline";
return makeFuture();
}
}
Future<void> fireClose() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
if (nextOut_) {
return nextOut_->close();
} else {
LOG(WARNING) << "close reached end of pipeline";
return makeFuture();
}
}
std::shared_ptr<AsyncTransport> getTransport() override {
return pipeline_->getTransport();
}
void setWriteFlags(WriteFlags flags) override {
pipeline_->setWriteFlags(flags);
}
WriteFlags getWriteFlags() override {
return pipeline_->getWriteFlags();
}
void setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) override {
pipeline_->setReadBufferSettings(minAvailable, allocationSize);
}
std::pair<uint64_t, uint64_t> getReadBufferSettings() override {
return pipeline_->getReadBufferSettings();
}
// InboundChannelHandlerContext overrides
void read(Rin msg) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
handler_.read(this, std::forward<Rin>(msg));
}
void readEOF() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
handler_.readEOF(this);
}
void readException(exception_wrapper e) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
handler_.readException(this, std::move(e));
}
// OutboundChannelHandlerContext overrides
Future<void> write(Win msg) override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
return handler_.write(this, std::forward<Win>(msg));
}
Future<void> close() override {
typename P::DestructorGuard dg(static_cast<DelayedDestruction*>(pipeline_));
return handler_.close(this);
}
private:
P* pipeline_;
H handler_;
InboundChannelHandlerContext<Rout>* nextIn_{nullptr};
OutboundChannelHandlerContext<Wout>* nextOut_{nullptr};
};
}}
......@@ -17,6 +17,7 @@
#include <folly/experimental/wangle/channel/ChannelHandler.h>
#include <folly/experimental/wangle/channel/ChannelPipeline.h>
#include <folly/io/IOBufQueue.h>
#include <folly/Memory.h>
#include <folly/Conv.h>
#include <gtest/gtest.h>
......@@ -63,7 +64,7 @@ class EchoService : public ChannelHandlerAdapter<std::string> {
};
TEST(ChannelTest, PlzCompile) {
ChannelPipeline<
ChannelPipeline<IOBuf, IOBuf,
BytesPassthrough,
BytesPassthrough,
// If this were useful it wouldn't be that hard
......@@ -71,18 +72,34 @@ TEST(ChannelTest, PlzCompile) {
BytesPassthrough>
pipeline(BytesPassthrough(), BytesPassthrough(), BytesPassthrough);
ChannelPipeline<
ChannelPipeline<int, std::string,
ChannelHandlerPtr<ToString>,
KittyPrepender,
KittyPrepender,
EchoService>
KittyPrepender>
kittyPipeline(
std::make_shared<ToString>(),
KittyPrepender{},
KittyPrepender{},
EchoService{});
KittyPrepender{});
kittyPipeline.addBack(KittyPrepender{});
kittyPipeline.addBack(EchoService{});
kittyPipeline.finalize();
kittyPipeline.read(5);
auto handler = kittyPipeline.getHandler<KittyPrepender>(2);
CHECK(handler);
auto p = folly::make_unique<int>(42);
folly::Optional<std::unique_ptr<int>> foo{std::move(p)};
}
TEST(ChannelTest, PlzCompile2) {
EchoService echoService;
ChannelPipeline<int, std::string> pipeline;
pipeline
.addBack(ToString())
.addBack(KittyPrepender())
.addBack(KittyPrepender())
.addBack(ChannelHandlerPtr<EchoService, false>(&echoService))
.finalize();
pipeline.read(42);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment