Commit 9eed30dd authored by James Sedgwick's avatar James Sedgwick Committed by Andrii Grynenko

fix detachPipeline/attachPipeline ordering

Summary:
detachPipeline always goes bottom to top
attachPipeline always goes top to bottom
now we can attachReadCallback in AsyncSocketHandler::attachPipeline()

not sure of the implications for TAsyncTransportHandler... looks like Cpp2Channel still wants to attach/detach cb manually

Test Plan: unit

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2023982

Tasks: 6836580

Signature: t1:2023982:1430157500:e09a4103550a3e5721effaa1b28ac5bed071fa67
parent 03ec96f3
...@@ -67,6 +67,7 @@ class AsyncSocketHandler ...@@ -67,6 +67,7 @@ class AsyncSocketHandler
void attachPipeline(Context* ctx) override { void attachPipeline(Context* ctx) override {
CHECK(!ctx_); CHECK(!ctx_);
ctx_ = ctx; ctx_ = ctx;
attachReadCallback();
} }
folly::Future<void> write( folly::Future<void> write(
......
...@@ -59,6 +59,7 @@ class PipelineContext { ...@@ -59,6 +59,7 @@ class PipelineContext {
public: public:
virtual ~PipelineContext() {} virtual ~PipelineContext() {}
virtual void attachPipeline() = 0;
virtual void detachPipeline() = 0; virtual void detachPipeline() = 0;
virtual void attachTransport() = 0; virtual void attachTransport() = 0;
...@@ -107,24 +108,31 @@ class ContextImpl : public HandlerContext<typename H::rout, ...@@ -107,24 +108,31 @@ class ContextImpl : public HandlerContext<typename H::rout,
initialize(pipeline, std::move(handler)); initialize(pipeline, std::move(handler));
} }
void initialize(P* pipeline, std::shared_ptr<H> handler) {
pipeline_ = pipeline;
handler_ = std::move(handler);
handler_->attachPipeline(this);
}
// For StaticPipeline // For StaticPipeline
ContextImpl() {} ContextImpl() {}
~ContextImpl() {} ~ContextImpl() {}
void initialize(P* pipeline, std::shared_ptr<H> handler) {
pipeline_ = pipeline;
handler_ = std::move(handler);
}
H* getHandler() { H* getHandler() {
return handler_.get(); return handler_.get();
} }
// PipelineContext overrides // PipelineContext overrides
void attachPipeline() override {
if (!attached_) {
handler_->attachPipeline(this);
attached_ = true;
}
}
void detachPipeline() override { void detachPipeline() override {
handler_->detachPipeline(this); handler_->detachPipeline(this);
attached_ = false;
} }
void setNextIn(PipelineContext* ctx) override { void setNextIn(PipelineContext* ctx) override {
...@@ -257,6 +265,7 @@ class ContextImpl : public HandlerContext<typename H::rout, ...@@ -257,6 +265,7 @@ class ContextImpl : public HandlerContext<typename H::rout,
std::shared_ptr<H> handler_; std::shared_ptr<H> handler_;
InboundHandlerContext<Rout>* nextIn_{nullptr}; InboundHandlerContext<Rout>* nextIn_{nullptr};
OutboundHandlerContext<Wout>* nextOut_{nullptr}; OutboundHandlerContext<Wout>* nextOut_{nullptr};
bool attached_{false};
}; };
}} }}
...@@ -33,7 +33,13 @@ namespace folly { namespace wangle { ...@@ -33,7 +33,13 @@ namespace folly { namespace wangle {
template <class R, class W> template <class R, class W>
class Pipeline : public DelayedDestruction { class Pipeline : public DelayedDestruction {
public: public:
Pipeline() {} Pipeline() : isStatic_(false) {}
~Pipeline() {
if (!isStatic_) {
detachHandlers();
}
}
std::shared_ptr<AsyncTransport> getTransport() { std::shared_ptr<AsyncTransport> getTransport() {
return transport_; return transport_;
...@@ -136,6 +142,10 @@ class Pipeline : public DelayedDestruction { ...@@ -136,6 +142,10 @@ class Pipeline : public DelayedDestruction {
if (!front_) { if (!front_) {
throw std::invalid_argument("wrong type for first handler"); throw std::invalid_argument("wrong type for first handler");
} }
for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
(*it)->attachPipeline();
}
} }
// If one of the handlers owns the pipeline itself, use setOwner to ensure // If one of the handlers owns the pipeline itself, use setOwner to ensure
...@@ -170,6 +180,10 @@ class Pipeline : public DelayedDestruction { ...@@ -170,6 +180,10 @@ class Pipeline : public DelayedDestruction {
} }
protected: protected:
explicit Pipeline(bool isStatic) : isStatic_(isStatic) {
CHECK(isStatic_);
}
template <class Context> template <class Context>
void addContextFront(Context* context) { void addContextFront(Context* context) {
ctxs_.insert( ctxs_.insert(
...@@ -190,6 +204,7 @@ class Pipeline : public DelayedDestruction { ...@@ -190,6 +204,7 @@ class Pipeline : public DelayedDestruction {
WriteFlags writeFlags_{WriteFlags::NONE}; WriteFlags writeFlags_{WriteFlags::NONE};
std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048}; std::pair<uint64_t, uint64_t> readBufferSettings_{2048, 2048};
bool isStatic_{false};
InboundHandlerContext<R>* front_{nullptr}; InboundHandlerContext<R>* front_{nullptr};
OutboundHandlerContext<W>* back_{nullptr}; OutboundHandlerContext<W>* back_{nullptr};
std::vector<std::shared_ptr<PipelineContext>> ctxs_; std::vector<std::shared_ptr<PipelineContext>> ctxs_;
......
...@@ -47,7 +47,7 @@ class StaticPipeline; ...@@ -47,7 +47,7 @@ class StaticPipeline;
template <class R, class W> template <class R, class W>
class StaticPipeline<R, W> : public Pipeline<R, W> { class StaticPipeline<R, W> : public Pipeline<R, W> {
protected: protected:
explicit StaticPipeline(bool) : Pipeline<R, W>() {} explicit StaticPipeline(bool) : Pipeline<R, W>(true) {}
}; };
template <class R, class W, class Handler, class... Handlers> template <class R, class W, class Handler, class... Handlers>
......
...@@ -79,8 +79,11 @@ TEST(PipelineTest, FireActions) { ...@@ -79,8 +79,11 @@ TEST(PipelineTest, FireActions) {
IntHandler handler1; IntHandler handler1;
IntHandler handler2; IntHandler handler2;
EXPECT_CALL(handler1, attachPipeline(_)); {
EXPECT_CALL(handler2, attachPipeline(_)); InSequence sequence;
EXPECT_CALL(handler2, attachPipeline(_));
EXPECT_CALL(handler1, attachPipeline(_));
}
StaticPipeline<int, int, IntHandler, IntHandler> StaticPipeline<int, int, IntHandler, IntHandler>
pipeline(&handler1, &handler2); pipeline(&handler1, &handler2);
...@@ -105,8 +108,11 @@ TEST(PipelineTest, FireActions) { ...@@ -105,8 +108,11 @@ TEST(PipelineTest, FireActions) {
EXPECT_CALL(handler1, close_(_)).Times(1); EXPECT_CALL(handler1, close_(_)).Times(1);
EXPECT_NO_THROW(pipeline.close().value()); EXPECT_NO_THROW(pipeline.close().value());
EXPECT_CALL(handler1, detachPipeline(_)); {
EXPECT_CALL(handler2, detachPipeline(_)); InSequence sequence;
EXPECT_CALL(handler1, detachPipeline(_));
EXPECT_CALL(handler2, detachPipeline(_));
}
} }
// Test that nothing bad happens when actions reach the end of the pipeline // Test that nothing bad happens when actions reach the end of the pipeline
...@@ -140,8 +146,11 @@ TEST(PipelineTest, TurnAround) { ...@@ -140,8 +146,11 @@ TEST(PipelineTest, TurnAround) {
IntHandler handler1; IntHandler handler1;
IntHandler handler2; IntHandler handler2;
EXPECT_CALL(handler1, attachPipeline(_)); {
EXPECT_CALL(handler2, attachPipeline(_)); InSequence sequence;
EXPECT_CALL(handler2, attachPipeline(_));
EXPECT_CALL(handler1, attachPipeline(_));
}
StaticPipeline<int, int, IntHandler, IntHandler> StaticPipeline<int, int, IntHandler, IntHandler>
pipeline(&handler1, &handler2); pipeline(&handler1, &handler2);
...@@ -151,8 +160,11 @@ TEST(PipelineTest, TurnAround) { ...@@ -151,8 +160,11 @@ TEST(PipelineTest, TurnAround) {
EXPECT_CALL(handler1, write_(_, _)).Times(1); EXPECT_CALL(handler1, write_(_, _)).Times(1);
pipeline.read(1); pipeline.read(1);
EXPECT_CALL(handler1, detachPipeline(_)); {
EXPECT_CALL(handler2, detachPipeline(_)); InSequence sequence;
EXPECT_CALL(handler1, detachPipeline(_));
EXPECT_CALL(handler2, detachPipeline(_));
}
} }
TEST(PipelineTest, DynamicFireActions) { TEST(PipelineTest, DynamicFireActions) {
...@@ -161,8 +173,11 @@ TEST(PipelineTest, DynamicFireActions) { ...@@ -161,8 +173,11 @@ TEST(PipelineTest, DynamicFireActions) {
StaticPipeline<int, int, IntHandler> StaticPipeline<int, int, IntHandler>
pipeline(&handler2); pipeline(&handler2);
EXPECT_CALL(handler1, attachPipeline(_)); {
EXPECT_CALL(handler3, attachPipeline(_)); InSequence sequence;
EXPECT_CALL(handler3, attachPipeline(_));
EXPECT_CALL(handler1, attachPipeline(_));
}
pipeline pipeline
.addFront(&handler1) .addFront(&handler1)
...@@ -183,9 +198,46 @@ TEST(PipelineTest, DynamicFireActions) { ...@@ -183,9 +198,46 @@ TEST(PipelineTest, DynamicFireActions) {
EXPECT_CALL(handler1, write_(_, _)).Times(1); EXPECT_CALL(handler1, write_(_, _)).Times(1);
EXPECT_NO_THROW(pipeline.write(1).value()); EXPECT_NO_THROW(pipeline.write(1).value());
EXPECT_CALL(handler1, detachPipeline(_)); {
EXPECT_CALL(handler2, detachPipeline(_)); InSequence sequence;
EXPECT_CALL(handler3, detachPipeline(_)); EXPECT_CALL(handler1, detachPipeline(_));
EXPECT_CALL(handler2, detachPipeline(_));
EXPECT_CALL(handler3, detachPipeline(_));
}
}
TEST(PipelineTest, DynamicAttachDetachOrder) {
IntHandler handler1, handler2;
Pipeline<int, int> pipeline;
{
InSequence sequence;
EXPECT_CALL(handler2, attachPipeline(_));
EXPECT_CALL(handler1, attachPipeline(_));
}
pipeline
.addBack(&handler1)
.addBack(&handler2)
.finalize();
{
InSequence sequence;
EXPECT_CALL(handler1, detachPipeline(_));
EXPECT_CALL(handler2, detachPipeline(_));
}
}
TEST(PipelineTest, HandlerInMultiplePipelines) {
IntHandler handler;
EXPECT_CALL(handler, attachPipeline(_)).Times(2);
StaticPipeline<int, int, IntHandler> pipeline1(&handler);
StaticPipeline<int, int, IntHandler> pipeline2(&handler);
EXPECT_CALL(handler, detachPipeline(_)).Times(2);
}
TEST(PipelineTest, HandlerInPipelineTwice) {
IntHandler handler;
EXPECT_CALL(handler, attachPipeline(_)).Times(2);
StaticPipeline<int, int, IntHandler, IntHandler> pipeline(&handler, &handler);
EXPECT_CALL(handler, detachPipeline(_)).Times(2);
} }
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin> template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
......
...@@ -52,7 +52,6 @@ class ServerPipelineFactory ...@@ -52,7 +52,6 @@ class ServerPipelineFactory
pipeline->addBack(StringCodec()); pipeline->addBack(StringCodec());
pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_)); pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
pipeline->finalize(); pipeline->finalize();
pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
return pipeline; return pipeline;
} }
...@@ -69,8 +68,6 @@ class ClientPipelineFactory : public PipelineFactory<ServicePipeline> { ...@@ -69,8 +68,6 @@ class ClientPipelineFactory : public PipelineFactory<ServicePipeline> {
auto pipeline = new ServicePipeline(); auto pipeline = new ServicePipeline();
pipeline->addBack(AsyncSocketHandler(socket)); pipeline->addBack(AsyncSocketHandler(socket));
pipeline->addBack(StringCodec()); pipeline->addBack(StringCodec());
pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
return pipeline; return pipeline;
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment