Commit caf6cdc0 authored by Dave Watson's avatar Dave Watson Committed by woo

fixup service filter interface

Summary:
Based on a more thourough reading of finagle's interface:
* adds close/isAvailable, which seem very close to thrift's interfaces
* ComposedServices are hardcoded to underlying services, to simplify the code (means extra allocs?)
* Made everything a shared_ptr
* Addd ServiceFactoryFilters

Test Plan: Updated the existing unittests and added some new ones

Reviewed By: jsedgwick@fb.com

Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2037206

Signature: t1:2037206:1432147489:3464d4c12a9e434d4973febcabbf7b2b3a883257
parent d5550513
...@@ -26,11 +26,11 @@ namespace folly { namespace wangle { ...@@ -26,11 +26,11 @@ namespace folly { namespace wangle {
* only one request is allowed at a time. * only one request is allowed at a time.
*/ */
template <typename Pipeline, typename Req, typename Resp = Req> template <typename Pipeline, typename Req, typename Resp = Req>
class SerialClientDispatcher : public InboundHandler<Req> class SerialClientDispatcher : public HandlerAdapter<Req, Resp>
, public Service<Req, Resp> { , public Service<Req, Resp> {
public: public:
typedef typename InboundHandler<Req>::Context Context; typedef typename HandlerAdapter<Req, Resp>::Context Context;
void setPipeline(Pipeline* pipeline) { void setPipeline(Pipeline* pipeline) {
pipeline_ = pipeline; pipeline_ = pipeline;
...@@ -54,6 +54,13 @@ class SerialClientDispatcher : public InboundHandler<Req> ...@@ -54,6 +54,13 @@ class SerialClientDispatcher : public InboundHandler<Req>
return f; return f;
} }
virtual Future<void> close() override {
return HandlerAdapter<Req, Resp>::close(nullptr);
}
virtual Future<void> close(Context* ctx) override {
return HandlerAdapter<Req, Resp>::close(ctx);
}
private: private:
Pipeline* pipeline_{nullptr}; Pipeline* pipeline_{nullptr};
folly::Optional<Promise<Resp>> p_; folly::Optional<Promise<Resp>> p_;
......
...@@ -34,6 +34,12 @@ class Service { ...@@ -34,6 +34,12 @@ class Service {
public: public:
virtual Future<Resp> operator()(Req request) = 0; virtual Future<Resp> operator()(Req request) = 0;
virtual ~Service() {} virtual ~Service() {}
virtual Future<void> close() {
return makeFuture();
}
virtual bool isAvailable() {
return true;
}
}; };
/** /**
...@@ -57,40 +63,23 @@ class Service { ...@@ -57,40 +63,23 @@ class Service {
*/ */
template <typename ReqA, typename RespA, template <typename ReqA, typename RespA,
typename ReqB = ReqA, typename RespB = RespA> typename ReqB = ReqA, typename RespB = RespA>
class Filter { class ServiceFilter : public Service<ReqA, RespA> {
public: public:
virtual Future<RespA> operator()( explicit ServiceFilter(std::shared_ptr<Service<ReqB, RespB>> service)
Service<ReqB, RespB>* service, ReqA request) = 0; : service_(service) {}
std::unique_ptr<Service<ReqA, RespA>> compose( virtual ~ServiceFilter() {}
Service<ReqB, RespB>* service);
virtual ~Filter() {}
};
template <typename ReqA, typename RespA, virtual Future<void> close() override {
typename ReqB = ReqA, typename RespB = RespA> return service_->close();
class ComposedService : public Service<ReqA, RespA> {
public:
ComposedService(Service<ReqB, RespB>* service,
Filter<ReqA, RespA, ReqB, RespB>* filter)
: service_(service)
, filter_(filter) {}
virtual Future<RespA> operator()(ReqA request) override {
return (*filter_)(service_, request);
} }
~ComposedService(){} virtual bool isAvailable() override {
private: return service_->isAvailable();
Service<ReqB, RespB>* service_; }
Filter<ReqA, RespA, ReqB, RespB>* filter_;
};
template <typename ReqA, typename RespA, protected:
typename ReqB, typename RespB> std::shared_ptr<Service<ReqB, RespB>> service_;
std::unique_ptr<Service<ReqA, RespA>> };
Filter<ReqA, RespA, ReqB, RespB>::compose(Service<ReqB, RespB>* service) {
return folly::make_unique<ComposedService<ReqA, RespA, ReqB, RespB>>(
service, this);
}
/** /**
* A factory that creates services, given a client. This lets you * A factory that creates services, given a client. This lets you
...@@ -101,10 +90,65 @@ template <typename ReqA, typename RespA, ...@@ -101,10 +90,65 @@ template <typename ReqA, typename RespA,
template <typename Pipeline, typename Req, typename Resp> template <typename Pipeline, typename Req, typename Resp>
class ServiceFactory { class ServiceFactory {
public: public:
virtual Future<Service<Req, Resp>*> operator()( virtual Future<std::shared_ptr<Service<Req, Resp>>> operator()(
ClientBootstrap<Pipeline>* client) = 0; std::shared_ptr<ClientBootstrap<Pipeline>> client) = 0;
virtual ~ServiceFactory() = default; virtual ~ServiceFactory() = default;
}; };
template <typename Pipeline, typename Req, typename Resp>
class ConstFactory : public ServiceFactory<Pipeline, Req, Resp> {
public:
explicit ConstFactory(std::shared_ptr<Service<Req, Resp>> service)
: service_(service) {}
virtual Future<std::shared_ptr<Service<Req, Resp>>> operator()(
std::shared_ptr<ClientBootstrap<Pipeline>> client) {
return service_;
}
private:
std::shared_ptr<Service<Req, Resp>> service_;
};
template <typename Pipeline, typename ReqA, typename RespA,
typename ReqB = ReqA, typename RespB = RespA>
class ServiceFactoryFilter : public ServiceFactory<Pipeline, ReqA, RespA> {
public:
explicit ServiceFactoryFilter(
std::shared_ptr<ServiceFactory<Pipeline, ReqB, RespB>> serviceFactory)
: serviceFactory_(std::move(serviceFactory)) {}
virtual ~ServiceFactoryFilter() = default;
protected:
std::shared_ptr<ServiceFactory<Pipeline, ReqB, RespB>> serviceFactory_;
};
template <typename Pipeline, typename Req, typename Resp = Req>
class FactoryToService : public Service<Req, Resp> {
public:
explicit FactoryToService(
std::shared_ptr<ServiceFactory<Pipeline, Req, Resp>> factory)
: factory_(factory) {}
virtual ~FactoryToService() {}
virtual Future<Resp> operator()(Req request) override {
DCHECK(factory_);
return ((*factory_)(nullptr)).then(
[=](std::shared_ptr<Service<Req, Resp>> service)
{
return (*service)(std::move(request)).ensure(
[this]() {
this->close();
});
});
}
private:
std::shared_ptr<ServiceFactory<Pipeline, Req, Resp>> factory_;
};
} // namespace } // namespace
...@@ -38,14 +38,14 @@ class SimpleDecode : public ByteToMessageCodec { ...@@ -38,14 +38,14 @@ class SimpleDecode : public ByteToMessageCodec {
class EchoService : public Service<std::string, std::string> { class EchoService : public Service<std::string, std::string> {
public: public:
virtual Future<std::string> operator()(std::string req) override { virtual Future<std::string> operator()(std::string req) override {
return makeFuture<std::string>(std::move(req)); return req;
} }
}; };
class EchoIntService : public Service<std::string, int> { class EchoIntService : public Service<std::string, int> {
public: public:
virtual Future<int> operator()(std::string req) override { virtual Future<int> operator()(std::string req) override {
return makeFuture<int>(folly::to<int>(req)); return folly::to<int>(req);
} }
}; };
...@@ -101,10 +101,10 @@ class ClientServiceFactory : public ServiceFactory<Pipeline, Req, Resp> { ...@@ -101,10 +101,10 @@ class ClientServiceFactory : public ServiceFactory<Pipeline, Req, Resp> {
SerialClientDispatcher<Pipeline, Req, Resp> dispatcher_; SerialClientDispatcher<Pipeline, Req, Resp> dispatcher_;
}; };
Future<Service<Req, Resp>*> operator()( Future<std::shared_ptr<Service<Req, Resp>>> operator() (
ClientBootstrap<Pipeline>* client) override { std::shared_ptr<ClientBootstrap<Pipeline>> client) override {
return makeFuture<Service<Req, Resp>*>( return Future<std::shared_ptr<Service<Req, Resp>>>(
new ClientService(client->getPipeline())); std::make_shared<ClientService>(client->getPipeline()));
} }
}; };
...@@ -124,8 +124,7 @@ TEST(Wangle, ClientServerTest) { ...@@ -124,8 +124,7 @@ TEST(Wangle, ClientServerTest) {
std::make_shared<ClientPipelineFactory<std::string, std::string>>()); std::make_shared<ClientPipelineFactory<std::string, std::string>>());
SocketAddress addr("127.0.0.1", port); SocketAddress addr("127.0.0.1", port);
client->connect(addr); client->connect(addr);
auto service = std::shared_ptr<Service<std::string, std::string>>( auto service = serviceFactory(client).value();
serviceFactory(client.get()).value());
auto rep = (*service)("test"); auto rep = (*service)("test");
rep.then([&](std::string value) { rep.then([&](std::string value) {
...@@ -138,68 +137,114 @@ TEST(Wangle, ClientServerTest) { ...@@ -138,68 +137,114 @@ TEST(Wangle, ClientServerTest) {
client.reset(); client.reset();
} }
class AppendFilter : public Filter<std::string, std::string> { class AppendFilter : public ServiceFilter<std::string, std::string> {
public: public:
virtual Future<std::string> operator()( explicit AppendFilter(
Service<std::string, std::string>* service, std::string req) { std::shared_ptr<Service<std::string, std::string>> service) :
return (*service)(req + "\n"); ServiceFilter<std::string, std::string>(service) {}
virtual Future<std::string> operator()(std::string req) {
return (*service_)(req + "\n");
} }
}; };
class IntToStringFilter : public Filter<int, int, std::string, std::string> { class IntToStringFilter
: public ServiceFilter<int, int, std::string, std::string> {
public: public:
virtual Future<int> operator()( explicit IntToStringFilter(
Service<std::string, std::string>* service, int req) { std::shared_ptr<Service<std::string, std::string>> service) :
return (*service)(folly::to<std::string>(req)).then([](std::string resp) { ServiceFilter<int, int, std::string, std::string>(service) {}
virtual Future<int> operator()(int req) {
return (*service_)(folly::to<std::string>(req)).then([](std::string resp) {
return folly::to<int>(resp); return folly::to<int>(resp);
}); });
} }
}; };
TEST(Wangle, FilterTest) { TEST(Wangle, FilterTest) {
auto service = folly::make_unique<EchoService>(); auto service = std::make_shared<EchoService>();
auto filter = folly::make_unique<AppendFilter>(); auto filter = std::make_shared<AppendFilter>(service);
auto result = (*filter)(service.get(), "test"); auto result = (*filter)("test");
EXPECT_EQ(result.value(), "test\n"); EXPECT_EQ(result.value(), "test\n");
// Check composition
auto composed_service = filter->compose(service.get());
auto result2 = (*composed_service)("test");
EXPECT_EQ(result2.value(), "test\n");
} }
TEST(Wangle, ComplexFilterTest) { TEST(Wangle, ComplexFilterTest) {
auto service = folly::make_unique<EchoService>(); auto service = std::make_shared<EchoService>();
auto filter = folly::make_unique<IntToStringFilter>(); auto filter = std::make_shared<IntToStringFilter>(service);
auto result = (*filter)(service.get(), 1); auto result = (*filter)(1);
EXPECT_EQ(result.value(), 1); EXPECT_EQ(result.value(), 1);
// Check composition
auto composed_service = filter->compose(service.get());
auto result2 = (*composed_service)(2);
EXPECT_EQ(result2.value(), 2);
} }
class ChangeTypeFilter : public Filter<int, std::string, std::string, int> { class ChangeTypeFilter
: public ServiceFilter<int, std::string, std::string, int> {
public: public:
virtual Future<std::string> operator()( explicit ChangeTypeFilter(
Service<std::string, int>* service, int req) { std::shared_ptr<Service<std::string, int>> service) :
return (*service)(folly::to<std::string>(req)).then([](int resp) { ServiceFilter<int, std::string, std::string, int>(service) {}
virtual Future<std::string> operator()(int req) {
return (*service_)(folly::to<std::string>(req)).then([](int resp) {
return folly::to<std::string>(resp); return folly::to<std::string>(resp);
}); });
} }
}; };
TEST(Wangle, SuperComplexFilterTest) { TEST(Wangle, SuperComplexFilterTest) {
auto service = folly::make_unique<EchoIntService>(); auto service = std::make_shared<EchoIntService>();
auto filter = folly::make_unique<ChangeTypeFilter>(); auto filter = std::make_shared<ChangeTypeFilter>(service);
auto result = (*filter)(service.get(), 1); auto result = (*filter)(1);
EXPECT_EQ(result.value(), "1"); EXPECT_EQ(result.value(), "1");
}
template <typename Pipeline, typename Req, typename Resp>
class ConnectionCountFilter : public ServiceFactoryFilter<Pipeline, Req, Resp> {
public:
explicit ConnectionCountFilter(
std::shared_ptr<ServiceFactory<Pipeline, Req, Resp>> factory)
: ServiceFactoryFilter<Pipeline, Req, Resp>(factory) {}
virtual Future<std::shared_ptr<Service<Req, Resp>>> operator()(
std::shared_ptr<ClientBootstrap<Pipeline>> client) {
connectionCount++;
return (*this->serviceFactory_)(client);
}
int connectionCount{0};
};
TEST(Wangle, ServiceFactoryFilter) {
auto clientFactory =
std::make_shared<
ClientServiceFactory<ServicePipeline, std::string, std::string>>();
auto countingFactory =
std::make_shared<
ConnectionCountFilter<ServicePipeline, std::string, std::string>>(
clientFactory);
auto client = std::make_shared<ClientBootstrap<ServicePipeline>>();
client->pipelineFactory(
std::make_shared<ClientPipelineFactory<std::string, std::string>>());
// It doesn't matter if connect succeds or not, but it needs to be called
// to create a pipeline
client->connect(folly::SocketAddress("::1", 8090));
auto service = (*countingFactory)(client).value();
// After the first service goes away, the client can be reused
service = (*countingFactory)(client).value();
EXPECT_EQ(2, countingFactory->connectionCount);
}
TEST(Wangle, FactoryToService) {
auto constfactory =
std::make_shared<ConstFactory<ServicePipeline, std::string, std::string>>(
std::make_shared<EchoService>());
FactoryToService<ServicePipeline, std::string, std::string> service(
constfactory);
// Check composition EXPECT_EQ("test", service("test").value());
auto composed_service = filter->compose(service.get());
auto result2 = (*composed_service)(2);
EXPECT_EQ(result2.value(), "2");
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment