Commit e4b8b76f authored by James Sedgwick's avatar James Sedgwick Committed by Sara Golemon

rearrange Pipeline to have more functionality in PipelineBase

Summary: This way, handlers can carry out more complex manipulations of the pipeline via ctx->getPipeline() without knowing the R/W types

Reviewed By: @djwatson

Differential Revision: D2158736
parent 6a2eb674
......@@ -420,6 +420,7 @@ libfolly_la_SOURCES = \
wangle/acceptor/SocketOptions.cpp \
wangle/acceptor/TransportInfo.cpp \
wangle/bootstrap/ServerBootstrap.cpp \
wangle/channel/Pipeline.cpp \
wangle/concurrent/CPUThreadPoolExecutor.cpp \
wangle/concurrent/Codel.cpp \
wangle/concurrent/IOThreadPoolExecutor.cpp \
......
......@@ -59,7 +59,7 @@ class OutboundLink {
virtual Future<void> close() = 0;
};
template <class P, class H, class Context>
template <class H, class Context>
class ContextImplBase : public PipelineContext {
public:
~ContextImplBase() = default;
......@@ -68,7 +68,7 @@ class ContextImplBase : public PipelineContext {
return handler_.get();
}
void initialize(P* pipeline, std::shared_ptr<H> handler) {
void initialize(PipelineBase* pipeline, std::shared_ptr<H> handler) {
pipeline_ = pipeline;
handler_ = std::move(handler);
}
......@@ -119,23 +119,23 @@ class ContextImplBase : public PipelineContext {
protected:
Context* impl_;
P* pipeline_;
PipelineBase* pipeline_;
std::shared_ptr<H> handler_;
InboundLink<typename H::rout>* nextIn_{nullptr};
OutboundLink<typename H::wout>* nextOut_{nullptr};
private:
bool attached_{false};
using DestructorGuard = typename P::DestructorGuard;
using DestructorGuard = typename DelayedDestruction::DestructorGuard;
};
template <class P, class H>
template <class H>
class ContextImpl
: public HandlerContext<typename H::rout,
typename H::wout>,
public InboundLink<typename H::rin>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, HandlerContext<typename H::rout,
public ContextImplBase<H, HandlerContext<typename H::rout,
typename H::wout>> {
public:
typedef typename H::rin Rin;
......@@ -144,7 +144,7 @@ class ContextImpl
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::BOTH;
explicit ContextImpl(P* pipeline, std::shared_ptr<H> handler) {
explicit ContextImpl(PipelineBase* pipeline, std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
......@@ -278,14 +278,14 @@ class ContextImpl
}
private:
using DestructorGuard = typename P::DestructorGuard;
using DestructorGuard = typename DelayedDestruction::DestructorGuard;
};
template <class P, class H>
template <class H>
class InboundContextImpl
: public InboundHandlerContext<typename H::rout>,
public InboundLink<typename H::rin>,
public ContextImplBase<P, H, InboundHandlerContext<typename H::rout>> {
public ContextImplBase<H, InboundHandlerContext<typename H::rout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
......@@ -293,7 +293,9 @@ class InboundContextImpl
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::IN;
explicit InboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
explicit InboundContextImpl(
PipelineBase* pipeline,
std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
......@@ -378,14 +380,14 @@ class InboundContextImpl
}
private:
using DestructorGuard = typename P::DestructorGuard;
using DestructorGuard = typename DelayedDestruction::DestructorGuard;
};
template <class P, class H>
template <class H>
class OutboundContextImpl
: public OutboundHandlerContext<typename H::wout>,
public OutboundLink<typename H::win>,
public ContextImplBase<P, H, OutboundHandlerContext<typename H::wout>> {
public ContextImplBase<H, OutboundHandlerContext<typename H::wout>> {
public:
typedef typename H::rin Rin;
typedef typename H::rout Rout;
......@@ -393,7 +395,9 @@ class OutboundContextImpl
typedef typename H::wout Wout;
static const HandlerDir dir = HandlerDir::OUT;
explicit OutboundContextImpl(P* pipeline, std::shared_ptr<H> handler) {
explicit OutboundContextImpl(
PipelineBase* pipeline,
std::shared_ptr<H> handler) {
this->impl_ = this;
this->initialize(pipeline, std::move(handler));
}
......@@ -442,18 +446,18 @@ class OutboundContextImpl
}
private:
using DestructorGuard = typename P::DestructorGuard;
using DestructorGuard = typename DelayedDestruction::DestructorGuard;
};
template <class Handler, class Pipeline>
template <class Handler>
struct ContextType {
typedef typename std::conditional<
Handler::dir == HandlerDir::BOTH,
ContextImpl<Pipeline, Handler>,
ContextImpl<Handler>,
typename std::conditional<
Handler::dir == HandlerDir::IN,
InboundContextImpl<Pipeline, Handler>,
OutboundContextImpl<Pipeline, Handler>
InboundContextImpl<Handler>,
OutboundContextImpl<Handler>
>::type>::type
type;
};
......
......@@ -35,28 +35,124 @@ Pipeline<R, W>::~Pipeline() {
}
}
template <class R, class W>
void Pipeline<R, W>::setWriteFlags(WriteFlags flags) {
writeFlags_ = flags;
template <class H>
PipelineBase& PipelineBase::addBack(std::shared_ptr<H> handler) {
typedef typename ContextType<H>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
}
template <class R, class W>
WriteFlags Pipeline<R, W>::getWriteFlags() {
return writeFlags_;
template <class H>
PipelineBase& PipelineBase::addBack(H&& handler) {
return addBack(std::make_shared<H>(std::forward<H>(handler)));
}
template <class R, class W>
void Pipeline<R, W>::setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) {
readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
template <class H>
PipelineBase& PipelineBase::addBack(H* handler) {
return addBack(std::shared_ptr<H>(handler, [](H*){}));
}
template <class R, class W>
std::pair<uint64_t, uint64_t> Pipeline<R, W>::getReadBufferSettings() {
return readBufferSettings_;
template <class H>
PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
typedef typename ContextType<H>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
}
template <class H>
PipelineBase& PipelineBase::addFront(H&& handler) {
return addFront(std::make_shared<H>(std::forward<H>(handler)));
}
template <class H>
PipelineBase& PipelineBase::addFront(H* handler) {
return addFront(std::shared_ptr<H>(handler, [](H*){}));
}
template <class H>
PipelineBase& PipelineBase::removeHelper(H* handler, bool checkEqual) {
typedef typename ContextType<H>::type Context;
bool removed = false;
for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
auto ctx = std::dynamic_pointer_cast<Context>(*it);
if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
it = removeAt(it);
removed = true;
if (it == ctxs_.end()) {
break;
}
}
}
if (!removed) {
throw std::invalid_argument("No such handler in pipeline");
}
return *this;
}
template <class H>
PipelineBase& PipelineBase::remove() {
return removeHelper<H>(nullptr, false);
}
template <class H>
PipelineBase& PipelineBase::remove(H* handler) {
return removeHelper<H>(handler, true);
}
template <class H>
H* PipelineBase::getHandler(int i) {
typedef typename ContextType<H>::type Context;
auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
CHECK(ctx);
return ctx->getHandler();
}
template <class H>
bool PipelineBase::setOwner(H* handler) {
typedef typename ContextType<H>::type Context;
for (auto& ctx : ctxs_) {
auto ctxImpl = dynamic_cast<Context*>(ctx.get());
if (ctxImpl && ctxImpl->getHandler() == handler) {
owner_ = ctx;
return true;
}
}
return false;
}
template <class Context>
void PipelineBase::addContextFront(Context* ctx) {
addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
}
template <class Context>
PipelineBase& PipelineBase::addHelper(
std::shared_ptr<Context>&& ctx,
bool front) {
ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
}
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
}
return *this;
}
namespace detail {
template <class T>
inline void logWarningIfNotUnit(const std::string& warning) {
LOG(WARNING) << warning;
}
template <>
inline void logWarningIfNotUnit<Unit>(const std::string& warning) {
// do nothing
}
} // detail
template <class R, class W>
template <class T>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
......@@ -126,141 +222,6 @@ Pipeline<R, W>::close() {
return back_->close();
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addBack(std::shared_ptr<H> handler) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), false);
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addBack(H&& handler) {
return addBack(std::make_shared<H>(std::forward<H>(handler)));
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addBack(H* handler) {
return addBack(std::shared_ptr<H>(handler, [](H*){}));
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addFront(std::shared_ptr<H> handler) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
return addHelper(std::make_shared<Context>(this, std::move(handler)), true);
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addFront(H&& handler) {
return addFront(std::make_shared<H>(std::forward<H>(handler)));
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
return addFront(std::shared_ptr<H>(handler, [](H*){}));
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::removeHelper(H* handler, bool checkEqual) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
bool removed = false;
for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
auto ctx = std::dynamic_pointer_cast<Context>(*it);
if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
it = removeAt(it);
removed = true;
if (it == ctxs_.end()) {
break;
}
}
}
if (!removed) {
throw std::invalid_argument("No such handler in pipeline");
}
return *this;
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::remove() {
return removeHelper<H>(nullptr, false);
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::remove(H* handler) {
return removeHelper<H>(handler, true);
}
template <class R, class W>
typename Pipeline<R, W>::ContextIterator Pipeline<R, W>::removeAt(
const typename Pipeline<R, W>::ContextIterator& it) {
(*it)->detachPipeline();
const auto dir = (*it)->getDirection();
if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
CHECK(it2 != inCtxs_.end());
inCtxs_.erase(it2);
}
if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
CHECK(it2 != outCtxs_.end());
outCtxs_.erase(it2);
}
return ctxs_.erase(it);
}
template <class R, class W>
Pipeline<R, W>& Pipeline<R, W>::removeFront() {
if (ctxs_.empty()) {
throw std::invalid_argument("No handlers in pipeline");
}
removeAt(ctxs_.begin());
return *this;
}
template <class R, class W>
Pipeline<R, W>& Pipeline<R, W>::removeBack() {
if (ctxs_.empty()) {
throw std::invalid_argument("No handlers in pipeline");
}
removeAt(--ctxs_.end());
return *this;
}
template <class R, class W>
template <class H>
H* Pipeline<R, W>::getHandler(int i) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
auto ctx = dynamic_cast<Context*>(ctxs_[i].get());
CHECK(ctx);
return ctx->getHandler();
}
namespace detail {
template <class T>
inline void logWarningIfNotUnit(const std::string& warning) {
LOG(WARNING) << warning;
}
template <>
inline void logWarningIfNotUnit<Unit>(const std::string& warning) {
// do nothing
}
} // detail
// TODO Have read/write/etc check that pipeline has been finalized
template <class R, class W>
void Pipeline<R, W>::finalize() {
......@@ -298,48 +259,4 @@ void Pipeline<R, W>::finalize() {
}
}
template <class R, class W>
template <class H>
bool Pipeline<R, W>::setOwner(H* handler) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
for (auto& ctx : ctxs_) {
auto ctxImpl = dynamic_cast<Context*>(ctx.get());
if (ctxImpl && ctxImpl->getHandler() == handler) {
owner_ = ctx;
return true;
}
}
return false;
}
template <class R, class W>
template <class Context>
void Pipeline<R, W>::addContextFront(Context* ctx) {
addHelper(std::shared_ptr<Context>(ctx, [](Context*){}), true);
}
template <class R, class W>
void Pipeline<R, W>::detachHandlers() {
for (auto& ctx : ctxs_) {
if (ctx != owner_) {
ctx->detachPipeline();
}
}
}
template <class R, class W>
template <class Context>
Pipeline<R, W>& Pipeline<R, W>::addHelper(
std::shared_ptr<Context>&& ctx,
bool front) {
ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::IN) {
inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
}
if (Context::dir == HandlerDir::BOTH || Context::dir == HandlerDir::OUT) {
outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
}
return *this;
}
}} // folly::wangle
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/channel/Pipeline.h>
namespace folly { namespace wangle {
void PipelineBase::setWriteFlags(WriteFlags flags) {
writeFlags_ = flags;
}
WriteFlags PipelineBase::getWriteFlags() {
return writeFlags_;
}
void PipelineBase::setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) {
readBufferSettings_ = std::make_pair(minAvailable, allocationSize);
}
std::pair<uint64_t, uint64_t> PipelineBase::getReadBufferSettings() {
return readBufferSettings_;
}
typename PipelineBase::ContextIterator PipelineBase::removeAt(
const typename PipelineBase::ContextIterator& it) {
(*it)->detachPipeline();
const auto dir = (*it)->getDirection();
if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
CHECK(it2 != inCtxs_.end());
inCtxs_.erase(it2);
}
if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
CHECK(it2 != outCtxs_.end());
outCtxs_.erase(it2);
}
return ctxs_.erase(it);
}
PipelineBase& PipelineBase::removeFront() {
if (ctxs_.empty()) {
throw std::invalid_argument("No handlers in pipeline");
}
removeAt(ctxs_.begin());
return *this;
}
PipelineBase& PipelineBase::removeBack() {
if (ctxs_.empty()) {
throw std::invalid_argument("No handlers in pipeline");
}
removeAt(--ctxs_.end());
return *this;
}
void PipelineBase::detachHandlers() {
for (auto& ctx : ctxs_) {
if (ctx != owner_) {
ctx->detachPipeline();
}
}
}
}} // folly::wangle
......@@ -16,16 +16,18 @@
#pragma once
#include <folly/wangle/channel/HandlerContext.h>
#include <folly/futures/Future.h>
#include <folly/futures/Unit.h>
#include <folly/io/async/AsyncTransport.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/wangle/channel/HandlerContext.h>
#include <folly/ExceptionWrapper.h>
#include <folly/Memory.h>
namespace folly { namespace wangle {
class PipelineBase;
class PipelineManager {
public:
virtual ~PipelineManager() = default;
......@@ -54,92 +56,43 @@ class PipelineBase : public DelayedDestruction {
return transport_;
}
private:
PipelineManager* manager_{nullptr};
std::shared_ptr<AsyncTransport> transport_;
};
/*
* R is the inbound type, i.e. inbound calls start with pipeline.read(R)
* W is the outbound type, i.e. outbound calls start with pipeline.write(W)
*
* Use Unit for one of the types if your pipeline is unidirectional.
* If R is Unit, read(), readEOF(), and readException() will be disabled.
* If W is Unit, write() and close() will be disabled.
*/
template <class R, class W = Unit>
class Pipeline : public PipelineBase {
public:
Pipeline();
~Pipeline();
void setWriteFlags(WriteFlags flags);
WriteFlags getWriteFlags();
void setReadBufferSettings(uint64_t minAvailable, uint64_t allocationSize);
std::pair<uint64_t, uint64_t> getReadBufferSettings();
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
read(R msg);
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
readEOF();
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
readException(exception_wrapper e);
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
transportActive();
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
transportInactive();
template <class T = W>
typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
write(W msg);
template <class T = W>
typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
close();
template <class H>
Pipeline& addBack(std::shared_ptr<H> handler);
PipelineBase& addBack(std::shared_ptr<H> handler);
template <class H>
Pipeline& addBack(H&& handler);
PipelineBase& addBack(H&& handler);
template <class H>
Pipeline& addBack(H* handler);
PipelineBase& addBack(H* handler);
template <class H>
Pipeline& addFront(std::shared_ptr<H> handler);
PipelineBase& addFront(std::shared_ptr<H> handler);
template <class H>
Pipeline& addFront(H&& handler);
PipelineBase& addFront(H&& handler);
template <class H>
Pipeline& addFront(H* handler);
PipelineBase& addFront(H* handler);
template <class H>
Pipeline& remove(H* handler);
PipelineBase& remove(H* handler);
template <class H>
Pipeline& remove();
PipelineBase& remove();
Pipeline& removeFront();
PipelineBase& removeFront();
Pipeline& removeBack();
PipelineBase& removeBack();
template <class H>
H* getHandler(int i);
void finalize();
// If one of the handlers owns the pipeline itself, use setOwner to ensure
// that the pipeline doesn't try to detach the handler during destruction,
// lest destruction ordering issues occur.
......@@ -147,20 +100,27 @@ class Pipeline : public PipelineBase {
template <class H>
bool setOwner(H* handler);
protected:
explicit Pipeline(bool isStatic);
virtual void finalize() = 0;
protected:
template <class Context>
void addContextFront(Context* ctx);
void detachHandlers();
std::vector<std::shared_ptr<PipelineContext>> ctxs_;
std::vector<PipelineContext*> inCtxs_;
std::vector<PipelineContext*> outCtxs_;
private:
PipelineManager* manager_{nullptr};
std::shared_ptr<AsyncTransport> transport_;
template <class Context>
Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
PipelineBase& addHelper(std::shared_ptr<Context>&& ctx, bool front);
template <class H>
Pipeline& removeHelper(H* handler, bool checkEqual);
PipelineBase& removeHelper(H* handler, bool checkEqual);
typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
ContextIterator;
......@@ -170,11 +130,59 @@ class Pipeline : public PipelineBase {
WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
bool isStatic_{false};
std::shared_ptr<PipelineContext> owner_;
std::vector<std::shared_ptr<PipelineContext>> ctxs_;
std::vector<PipelineContext*> inCtxs_;
std::vector<PipelineContext*> outCtxs_;
};
/*
* R is the inbound type, i.e. inbound calls start with pipeline.read(R)
* W is the outbound type, i.e. outbound calls start with pipeline.write(W)
*
* Use Unit for one of the types if your pipeline is unidirectional.
* If R is Unit, read(), readEOF(), and readException() will be disabled.
* If W is Unit, write() and close() will be disabled.
*/
template <class R, class W = Unit>
class Pipeline : public PipelineBase {
public:
Pipeline();
~Pipeline();
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
read(R msg);
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
readEOF();
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
readException(exception_wrapper e);
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
transportActive();
template <class T = R>
typename std::enable_if<!std::is_same<T, Unit>::value>::type
transportInactive();
template <class T = W>
typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
write(W msg);
template <class T = W>
typename std::enable_if<!std::is_same<T, Unit>::value, Future<void>>::type
close();
void finalize() override;
protected:
explicit Pipeline(bool isStatic);
private:
bool isStatic_{false};
InboundLink<R>* front_{nullptr};
OutboundLink<W>* back_{nullptr};
};
......
......@@ -131,7 +131,7 @@ class StaticPipeline<R, W, Handler, Handlers...>
bool isFirst_;
std::shared_ptr<Handler> handlerPtr_;
typename ContextType<Handler, Pipeline<R, W>>::type ctx_;
typename ContextType<Handler>::type ctx_;
};
}} // folly::wangle
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment