Commit 31c71948 authored by James Sedgwick's avatar James Sedgwick Committed by Sara Golemon

pipeline handler removal, fix service test

Summary: add remove<T>, remove(Handler*), removeFront(), removeBack() to Pipeline
employ these to fix up reusing client pipelines with client dispatchers, which in turn fixes the broken ServiceTest

Reviewed By: @djwatson

Differential Revision: D2152636
parent 53e6886f
...@@ -36,6 +36,8 @@ class PipelineContext { ...@@ -36,6 +36,8 @@ class PipelineContext {
virtual void setNextIn(PipelineContext* ctx) = 0; virtual void setNextIn(PipelineContext* ctx) = 0;
virtual void setNextOut(PipelineContext* ctx) = 0; virtual void setNextOut(PipelineContext* ctx) = 0;
virtual HandlerDir getDirection() = 0;
}; };
template <class In> template <class In>
...@@ -86,6 +88,10 @@ class ContextImplBase : public PipelineContext { ...@@ -86,6 +88,10 @@ class ContextImplBase : public PipelineContext {
} }
void setNextIn(PipelineContext* ctx) override { void setNextIn(PipelineContext* ctx) override {
if (!ctx) {
nextIn_ = nullptr;
return;
}
auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx); auto nextIn = dynamic_cast<InboundLink<typename H::rout>*>(ctx);
if (nextIn) { if (nextIn) {
nextIn_ = nextIn; nextIn_ = nextIn;
...@@ -95,6 +101,10 @@ class ContextImplBase : public PipelineContext { ...@@ -95,6 +101,10 @@ class ContextImplBase : public PipelineContext {
} }
void setNextOut(PipelineContext* ctx) override { void setNextOut(PipelineContext* ctx) override {
if (!ctx) {
nextOut_ = nullptr;
return;
}
auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx); auto nextOut = dynamic_cast<OutboundLink<typename H::wout>*>(ctx);
if (nextOut) { if (nextOut) {
nextOut_ = nextOut; nextOut_ = nextOut;
...@@ -103,6 +113,10 @@ class ContextImplBase : public PipelineContext { ...@@ -103,6 +113,10 @@ class ContextImplBase : public PipelineContext {
} }
} }
HandlerDir getDirection() override {
return H::dir;
}
protected: protected:
Context* impl_; Context* impl_;
P* pipeline_; P* pipeline_;
......
...@@ -164,6 +164,80 @@ Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) { ...@@ -164,6 +164,80 @@ Pipeline<R, W>& Pipeline<R, W>::addFront(H* handler) {
return addFront(std::shared_ptr<H>(handler, [](H*){})); return addFront(std::shared_ptr<H>(handler, [](H*){}));
} }
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::removeHelper(H* handler, bool checkEqual) {
typedef typename ContextType<H, Pipeline<R, W>>::type Context;
bool removed = false;
for (auto it = ctxs_.begin(); it != ctxs_.end(); it++) {
auto ctx = std::dynamic_pointer_cast<Context>(*it);
if (ctx && (!checkEqual || ctx->getHandler() == handler)) {
it = removeAt(it);
removed = true;
if (it == ctxs_.end()) {
break;
}
}
}
if (!removed) {
throw std::invalid_argument("No such handler in pipeline");
}
return *this;
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::remove() {
return removeHelper<H>(nullptr, false);
}
template <class R, class W>
template <class H>
Pipeline<R, W>& Pipeline<R, W>::remove(H* handler) {
return removeHelper<H>(handler, true);
}
template <class R, class W>
typename Pipeline<R, W>::ContextIterator Pipeline<R, W>::removeAt(
const typename Pipeline<R, W>::ContextIterator& it) {
(*it)->detachPipeline();
const auto dir = (*it)->getDirection();
if (dir == HandlerDir::BOTH || dir == HandlerDir::IN) {
auto it2 = std::find(inCtxs_.begin(), inCtxs_.end(), it->get());
CHECK(it2 != inCtxs_.end());
inCtxs_.erase(it2);
}
if (dir == HandlerDir::BOTH || dir == HandlerDir::OUT) {
auto it2 = std::find(outCtxs_.begin(), outCtxs_.end(), it->get());
CHECK(it2 != outCtxs_.end());
outCtxs_.erase(it2);
}
return ctxs_.erase(it);
}
template <class R, class W>
Pipeline<R, W>& Pipeline<R, W>::removeFront() {
if (ctxs_.empty()) {
throw std::invalid_argument("No handlers in pipeline");
}
removeAt(ctxs_.begin());
return *this;
}
template <class R, class W>
Pipeline<R, W>& Pipeline<R, W>::removeBack() {
if (ctxs_.empty()) {
throw std::invalid_argument("No handlers in pipeline");
}
removeAt(--ctxs_.end());
return *this;
}
template <class R, class W> template <class R, class W>
template <class H> template <class H>
H* Pipeline<R, W>::getHandler(int i) { H* Pipeline<R, W>::getHandler(int i) {
...@@ -190,18 +264,22 @@ inline void logWarningIfNotNothing<Nothing>(const std::string& warning) { ...@@ -190,18 +264,22 @@ inline void logWarningIfNotNothing<Nothing>(const std::string& warning) {
// TODO Have read/write/etc check that pipeline has been finalized // TODO Have read/write/etc check that pipeline has been finalized
template <class R, class W> template <class R, class W>
void Pipeline<R, W>::finalize() { void Pipeline<R, W>::finalize() {
front_ = nullptr;
if (!inCtxs_.empty()) { if (!inCtxs_.empty()) {
front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front()); front_ = dynamic_cast<InboundLink<R>*>(inCtxs_.front());
for (size_t i = 0; i < inCtxs_.size() - 1; i++) { for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
inCtxs_[i]->setNextIn(inCtxs_[i+1]); inCtxs_[i]->setNextIn(inCtxs_[i+1]);
} }
inCtxs_.back()->setNextIn(nullptr);
} }
back_ = nullptr;
if (!outCtxs_.empty()) { if (!outCtxs_.empty()) {
back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back()); back_ = dynamic_cast<OutboundLink<W>*>(outCtxs_.back());
for (size_t i = outCtxs_.size() - 1; i > 0; i--) { for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
outCtxs_[i]->setNextOut(outCtxs_[i-1]); outCtxs_[i]->setNextOut(outCtxs_[i-1]);
} }
outCtxs_.front()->setNextOut(nullptr);
} }
if (!front_) { if (!front_) {
......
...@@ -126,6 +126,16 @@ class Pipeline : public PipelineBase { ...@@ -126,6 +126,16 @@ class Pipeline : public PipelineBase {
template <class H> template <class H>
Pipeline& addFront(H* handler); Pipeline& addFront(H* handler);
template <class H>
Pipeline& remove(H* handler);
template <class H>
Pipeline& remove();
Pipeline& removeFront();
Pipeline& removeBack();
template <class H> template <class H>
H* getHandler(int i); H* getHandler(int i);
...@@ -150,6 +160,14 @@ class Pipeline : public PipelineBase { ...@@ -150,6 +160,14 @@ class Pipeline : public PipelineBase {
template <class Context> template <class Context>
Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front); Pipeline& addHelper(std::shared_ptr<Context>&& ctx, bool front);
template <class H>
Pipeline& removeHelper(H* handler, bool checkEqual);
typedef std::vector<std::shared_ptr<PipelineContext>>::iterator
ContextIterator;
ContextIterator removeAt(const ContextIterator& it);
WriteFlags writeFlags_{WriteFlags::NONE}; WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048}; std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
......
...@@ -304,3 +304,88 @@ TEST(Pipeline, DynamicConstruction) { ...@@ -304,3 +304,88 @@ TEST(Pipeline, DynamicConstruction) {
.finalize()); .finalize());
} }
} }
TEST(Pipeline, RemovePointer) {
IntHandler handler1, handler2;
EXPECT_CALL(handler1, attachPipeline(_));
EXPECT_CALL(handler2, attachPipeline(_));
Pipeline<int, int> pipeline;
pipeline
.addBack(&handler1)
.addBack(&handler2)
.finalize();
EXPECT_CALL(handler1, detachPipeline(_));
pipeline
.remove(&handler1)
.finalize();
EXPECT_CALL(handler2, read_(_, _));
pipeline.read(1);
EXPECT_CALL(handler2, detachPipeline(_));
}
TEST(Pipeline, RemoveFront) {
IntHandler handler1, handler2;
EXPECT_CALL(handler1, attachPipeline(_));
EXPECT_CALL(handler2, attachPipeline(_));
Pipeline<int, int> pipeline;
pipeline
.addBack(&handler1)
.addBack(&handler2)
.finalize();
EXPECT_CALL(handler1, detachPipeline(_));
pipeline
.removeFront()
.finalize();
EXPECT_CALL(handler2, read_(_, _));
pipeline.read(1);
EXPECT_CALL(handler2, detachPipeline(_));
}
TEST(Pipeline, RemoveBack) {
IntHandler handler1, handler2;
EXPECT_CALL(handler1, attachPipeline(_));
EXPECT_CALL(handler2, attachPipeline(_));
Pipeline<int, int> pipeline;
pipeline
.addBack(&handler1)
.addBack(&handler2)
.finalize();
EXPECT_CALL(handler2, detachPipeline(_));
pipeline
.removeBack()
.finalize();
EXPECT_CALL(handler1, read_(_, _));
pipeline.read(1);
EXPECT_CALL(handler1, detachPipeline(_));
}
TEST(Pipeline, RemoveType) {
IntHandler handler1;
IntHandler2 handler2;
EXPECT_CALL(handler1, attachPipeline(_));
EXPECT_CALL(handler2, attachPipeline(_));
Pipeline<int, int> pipeline;
pipeline
.addBack(&handler1)
.addBack(&handler2)
.finalize();
EXPECT_CALL(handler1, detachPipeline(_));
pipeline
.remove<IntHandler>()
.finalize();
EXPECT_CALL(handler2, read_(_, _));
pipeline.read(1);
EXPECT_CALL(handler2, detachPipeline(_));
}
...@@ -29,13 +29,27 @@ template <typename Pipeline, typename Req, typename Resp = Req> ...@@ -29,13 +29,27 @@ template <typename Pipeline, typename Req, typename Resp = Req>
class SerialClientDispatcher : public HandlerAdapter<Req, Resp> class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
, public Service<Req, Resp> { , public Service<Req, Resp> {
public: public:
typedef typename HandlerAdapter<Req, Resp>::Context Context; typedef typename HandlerAdapter<Req, Resp>::Context Context;
~SerialClientDispatcher() {
if (pipeline_) {
try {
pipeline_->remove(this).finalize();
} catch (const std::invalid_argument& e) {
// not in pipeline; this is fine
}
}
}
void setPipeline(Pipeline* pipeline) { void setPipeline(Pipeline* pipeline) {
try {
pipeline->template remove<SerialClientDispatcher>();
} catch (const std::invalid_argument& e) {
// no existing dispatcher; this is fine
}
pipeline_ = pipeline; pipeline_ = pipeline;
pipeline->addBack(this); pipeline_->addBack(this);
pipeline->finalize(); pipeline_->finalize();
} }
void read(Context* ctx, Req in) override { void read(Context* ctx, Req in) override {
...@@ -61,6 +75,11 @@ class SerialClientDispatcher : public HandlerAdapter<Req, Resp> ...@@ -61,6 +75,11 @@ class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
virtual Future<void> close(Context* ctx) override { virtual Future<void> close(Context* ctx) override {
return HandlerAdapter<Req, Resp>::close(ctx); return HandlerAdapter<Req, Resp>::close(ctx);
} }
void detachPipeline(Context* ctx) override {
pipeline_ = nullptr;
}
private: private:
Pipeline* pipeline_{nullptr}; Pipeline* pipeline_{nullptr};
folly::Optional<Promise<Resp>> p_; folly::Optional<Promise<Resp>> p_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment