Commit c152d43d authored by Dave Watson's avatar Dave Watson Committed by woo

Finagle interfaces

Summary: Future service interfaces similar to finagle.  Service creators for client, filters

Test Plan: Unittests included - also sets up a simple pipeline to test a full stack client/server.

Reviewed By: hans@fb.com

Subscribers: jsedgwick, trunkagent, njormrod, folly-diffs@, doug, fugalh

FB internal diff: D1573086

Tasks: 5002456

Signature: t1:1573086:1421970698:328453c4a980bb6950fc9aeed6a2b6d9819c20db
parent 47768bc7
......@@ -26,13 +26,11 @@ using namespace folly;
typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
class TestServer : public ServerBootstrap<Pipeline> {
Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
return nullptr;
}
};
typedef ServerBootstrap<Pipeline> TestServer;
typedef ClientBootstrap<Pipeline> TestClient;
class TestClient : public ClientBootstrap<Pipeline> {
class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
public:
Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
CHECK(sock->good());
......@@ -76,6 +74,7 @@ TEST(Bootstrap, ClientServerTest) {
server.getSockets()[0]->getAddress(&address);
TestClient client;
client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
base->loop();
server.stop();
......@@ -98,9 +97,12 @@ TEST(Bootstrap, ClientConnectionManagerTest) {
server.getSockets()[0]->getAddress(&address);
TestClient client;
client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
TestClient client2;
client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client2.connect(address);
base->loop();
......@@ -124,6 +126,7 @@ TEST(Bootstrap, ServerAcceptGroupTest) {
boost::barrier barrier(2);
auto thread = std::thread([&](){
TestClient client;
client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
EventBaseManager::get()->getEventBase()->loop();
barrier.wait();
......@@ -162,6 +165,8 @@ TEST(Bootstrap, ServerAcceptGroup2Test) {
server.getSockets()[0]->getAddress(&address);
TestClient client;
client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
EventBaseManager::get()->getEventBase()->loop();
......@@ -198,18 +203,23 @@ TEST(Bootstrap, SharedThreadPool) {
server.getSockets()[0]->getAddress(&address);
TestClient client;
client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client.connect(address);
TestClient client2;
client2.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client2.connect(address);
TestClient client3;
client3.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client3.connect(address);
TestClient client4;
client4.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client4.connect(address);
TestClient client5;
client5.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
client5.connect(address);
EventBaseManager::get()->getEventBase()->loop();
......
......@@ -33,13 +33,24 @@ class ClientBootstrap {
return this;
}
ClientBootstrap* connect(SocketAddress address) {
DCHECK(pipelineFactory_);
pipeline_.reset(
newPipeline(
pipelineFactory_->newPipeline(
AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
));
return this;
}
ClientBootstrap* pipelineFactory(
std::shared_ptr<PipelineFactory<Pipeline>> factory) {
pipelineFactory_ = factory;
return this;
}
Pipeline* getPipeline() {
return pipeline_.get();
}
virtual ~ClientBootstrap() {}
protected:
......@@ -48,7 +59,7 @@ class ClientBootstrap {
int port_;
virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket> socket) = 0;
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
};
} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/channel/ChannelHandler.h>
namespace folly { namespace wangle {
/**
* Dispatch a request, satisfying Promise `p` with the response;
* the returned Future is satisfied when the response is received:
* only one request is allowed at a time.
*/
template <typename Pipeline, typename Req, typename Resp = Req>
class SerialClientDispatcher : public ChannelHandlerAdapter<Req, Resp>
, public Service<Req, Resp> {
public:
typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
void setPipeline(Pipeline* pipeline) {
pipeline_ = pipeline;
pipeline->addBack(
ChannelHandlerPtr<SerialClientDispatcher<Pipeline, Req, Resp>, false>(
this));
pipeline->finalize();
}
void read(Context* ctx, Req in) override {
DCHECK(p_);
p_->setValue(std::move(in));
p_ = none;
}
virtual Future<Resp> operator()(Req arg) override {
CHECK(!p_);
DCHECK(pipeline_);
p_ = Promise<Resp>();
auto f = p_->getFuture();
pipeline_->write(std::move(arg));
return f;
}
private:
Pipeline* pipeline_{nullptr};
folly::Optional<Promise<Resp>> p_;
};
}} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/channel/ChannelHandler.h>
namespace folly { namespace wangle {
/**
* Dispatch requests from pipeline one at a time synchronously.
* Concurrent requests are queued in the pipeline.
*/
template <typename Req, typename Resp = Req>
class SerialServerDispatcher : public ChannelHandlerAdapter<Req, Resp> {
public:
typedef typename ChannelHandlerAdapter<Req, Resp>::Context Context;
explicit SerialServerDispatcher(Service<Req, Resp>* service)
: service_(service) {}
void read(Context* ctx, Req in) override {
auto resp = (*service_)(std::move(in)).get();
ctx->fireWrite(std::move(resp));
}
private:
Service<Req, Resp>* service_;
};
}} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/futures/Future.h>
#include <folly/Memory.h>
#include <folly/wangle/bootstrap/ServerBootstrap.h>
#include <folly/wangle/bootstrap/ClientBootstrap.h>
#include <folly/wangle/channel/ChannelPipeline.h>
#include <folly/wangle/channel/AsyncSocketHandler.h>
namespace folly {
/**
* A Service is an asynchronous function from Request to
* Future<Response>. It is the basic unit of the RPC interface.
*/
template <typename Req, typename Resp = Req>
class Service {
public:
virtual Future<Resp> operator()(Req request) = 0;
virtual ~Service() {}
};
/**
* A Filter acts as a decorator/transformer of a service. It may apply
* transformations to the input and output of that service:
*
* class MyService
*
* ReqA -> |
* | -> ReqB
* | <- RespB
* RespA <- |
*
* For example, you may have a service that takes Strings and parses
* them as Ints. If you want to expose this as a Network Service via
* Thrift, it is nice to isolate the protocol handling from the
* business rules. Hence you might have a Filter that converts back
* and forth between Thrift structs:
*
* [ThriftIn -> (String -> Int) -> ThriftOut]
*/
template <typename ReqA, typename RespA,
typename ReqB = ReqA, typename RespB = RespA>
class Filter {
public:
virtual Future<RespA> operator()(
Service<ReqB, RespB>* service, ReqA request) = 0;
std::unique_ptr<Service<ReqA, RespA>> compose(
Service<ReqB, RespB>* service);
virtual ~Filter() {}
};
template <typename ReqA, typename RespA,
typename ReqB = ReqA, typename RespB = RespA>
class ComposedService : public Service<ReqA, RespA> {
public:
ComposedService(Service<ReqB, RespB>* service,
Filter<ReqA, RespA, ReqB, RespB>* filter)
: service_(service)
, filter_(filter) {}
virtual Future<RespA> operator()(ReqA request) override {
return (*filter_)(service_, request);
}
~ComposedService(){}
private:
Service<ReqB, RespB>* service_;
Filter<ReqA, RespA, ReqB, RespB>* filter_;
};
template <typename ReqA, typename RespA,
typename ReqB, typename RespB>
std::unique_ptr<Service<ReqA, RespA>>
Filter<ReqA, RespA, ReqB, RespB>::compose(Service<ReqB, RespB>* service) {
return folly::make_unique<ComposedService<ReqA, RespA, ReqB, RespB>>(
service, this);
}
/**
* A factory that creates services, given a client. This lets you
* make RPC calls on the Service interface over a client's pipeline.
*
* Clients can be reused after you are done using the service.
*/
template <typename Pipeline, typename Req, typename Resp>
class ServiceFactory {
public:
virtual Future<Service<Req, Resp>*> operator()(
ClientBootstrap<Pipeline>* client) = 0;
virtual ~ServiceFactory() = default;
};
} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <folly/wangle/service/Service.h>
#include <folly/wangle/service/ServerDispatcher.h>
#include <folly/wangle/service/ClientDispatcher.h>
namespace folly {
using namespace wangle;
typedef ChannelPipeline<IOBufQueue&, std::string> Pipeline;
class EchoService : public Service<std::string, std::string> {
public:
virtual Future<std::string> operator()(std::string req) override {
return makeFuture<std::string>(std::move(req));
}
};
class EchoIntService : public Service<std::string, int> {
public:
virtual Future<int> operator()(std::string req) override {
return makeFuture<int>(folly::to<int>(req));
}
};
class StringCodec : public ChannelHandler<IOBufQueue&, std::string,
std::string, std::unique_ptr<IOBuf>> {
public:
typedef typename ChannelHandler<
IOBufQueue&, std::string,
std::string, std::unique_ptr<IOBuf>>::Context Context;
void read(Context* ctx, IOBufQueue& q) override {
auto buf = q.pop_front();
buf->coalesce();
std::string data((const char*)buf->data(), buf->length());
ctx->fireRead(data);
}
Future<void> write(Context* ctx, std::string msg) override {
auto buf = IOBuf::copyBuffer(msg.data(), msg.length());
return ctx->fireWrite(std::move(buf));
}
};
template <typename Req, typename Resp>
class ServerPipelineFactory
: public PipelineFactory<Pipeline> {
public:
Pipeline* newPipeline(
std::shared_ptr<AsyncSocket> socket) override {
auto pipeline = new Pipeline();
pipeline->addBack(AsyncSocketHandler(socket));
pipeline->addBack(StringCodec());
pipeline->addBack(SerialServerDispatcher<Req, Resp>(&service_));
pipeline->finalize();
pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
return pipeline;
}
private:
EchoService service_;
};
template <typename Req, typename Resp>
class ClientPipelineFactory : public PipelineFactory<Pipeline> {
public:
Pipeline* newPipeline(
std::shared_ptr<AsyncSocket> socket) override {
auto pipeline = new Pipeline();
pipeline->addBack(AsyncSocketHandler(socket));
pipeline->addBack(StringCodec());
pipeline->template getHandler<AsyncSocketHandler>(0)->attachReadCallback();
return pipeline;
}
};
template <typename Pipeline, typename Req, typename Resp>
class ClientServiceFactory : public ServiceFactory<Pipeline, Req, Resp> {
public:
class ClientService : public Service<Req, Resp> {
public:
explicit ClientService(Pipeline* pipeline) {
dispatcher_.setPipeline(pipeline);
}
Future<Resp> operator()(Req request) override {
return dispatcher_(std::move(request));
}
private:
SerialClientDispatcher<Pipeline, Req, Resp> dispatcher_;
};
Future<Service<Req, Resp>*> operator()(
ClientBootstrap<Pipeline>* client) override {
return makeFuture<Service<Req, Resp>*>(
new ClientService(client->getPipeline()));
}
};
TEST(Wangle, ClientServerTest) {
int port = 1234;
// server
ServerBootstrap<Pipeline> server;
server.childPipeline(
std::make_shared<ServerPipelineFactory<std::string, std::string>>());
server.bind(port);
// client
ClientBootstrap<Pipeline> client;
ClientServiceFactory<Pipeline, std::string, std::string> serviceFactory;
client.pipelineFactory(
std::make_shared<ClientPipelineFactory<std::string, std::string>>());
SocketAddress addr("127.0.0.1", port);
client.connect(addr);
auto service = serviceFactory(&client).value();
auto rep = (*service)("test");
rep.then([&](std::string value) {
EXPECT_EQ("test", value);
EventBaseManager::get()->getEventBase()->terminateLoopSoon();
});
EventBaseManager::get()->getEventBase()->loopForever();
server.stop();
}
class AppendFilter : public Filter<std::string, std::string> {
public:
virtual Future<std::string> operator()(
Service<std::string, std::string>* service, std::string req) {
return (*service)(req + "\n");
}
};
class IntToStringFilter : public Filter<int, int, std::string, std::string> {
public:
virtual Future<int> operator()(
Service<std::string, std::string>* service, int req) {
return (*service)(folly::to<std::string>(req)).then([](std::string resp) {
return folly::to<int>(resp);
});
}
};
TEST(Wangle, FilterTest) {
auto service = folly::make_unique<EchoService>();
auto filter = folly::make_unique<AppendFilter>();
auto result = (*filter)(service.get(), "test");
EXPECT_EQ(result.value(), "test\n");
// Check composition
auto composed_service = filter->compose(service.get());
auto result2 = (*composed_service)("test");
EXPECT_EQ(result2.value(), "test\n");
}
TEST(Wangle, ComplexFilterTest) {
auto service = folly::make_unique<EchoService>();
auto filter = folly::make_unique<IntToStringFilter>();
auto result = (*filter)(service.get(), 1);
EXPECT_EQ(result.value(), 1);
// Check composition
auto composed_service = filter->compose(service.get());
auto result2 = (*composed_service)(2);
EXPECT_EQ(result2.value(), 2);
}
class ChangeTypeFilter : public Filter<int, std::string, std::string, int> {
public:
virtual Future<std::string> operator()(
Service<std::string, int>* service, int req) {
return (*service)(folly::to<std::string>(req)).then([](int resp) {
return folly::to<std::string>(resp);
});
}
};
TEST(Wangle, SuperComplexFilterTest) {
auto service = folly::make_unique<EchoIntService>();
auto filter = folly::make_unique<ChangeTypeFilter>();
auto result = (*filter)(service.get(), 1);
EXPECT_EQ(result.value(), "1");
// Check composition
auto composed_service = filter->compose(service.get());
auto result2 = (*composed_service)(2);
EXPECT_EQ(result2.value(), "2");
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
google::InitGoogleLogging(argv[0]);
google::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
}
} // namespace
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment