Commit 589052a6 authored by James Sedgwick's avatar James Sedgwick Committed by Praveen Kumar Ramakrishnan

Telnet client

Summary:
A client example to match telnet server.

Required a couple additions:
* future result when socket actually connects, similar to netty
* clients support IOThreadPoolExecutor groups
* a pipeline stage to make sure everything runs in the right eventbase thread.

Test Plan:
fbconfig follg/wangle/example/telnet && fbmake dbg
telnet_server
telnet_client

Reviewed By: hans@fb.com

Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2010289

Signature: t1:2010289:1430766232:65c6f946e454000f6ea9f41b49197ddbeea5ba3f
parent adba2ec8
......@@ -276,6 +276,7 @@ nobase_follyinclude_HEADERS = \
wangle/bootstrap/ServerSocketFactory.h \
wangle/bootstrap/ClientBootstrap.h \
wangle/channel/AsyncSocketHandler.h \
wangle/channel/EventBaseHandler.h \
wangle/channel/Handler.h \
wangle/channel/HandlerContext.h \
wangle/channel/HandlerContext-inl.h \
......
......@@ -34,10 +34,9 @@ class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
std::unique_ptr<BytesPipeline, folly::DelayedDestruction::Destructor>
newPipeline(std::shared_ptr<AsyncSocket> sock) {
CHECK(sock->good());
// We probably aren't connected immedately, check after a small delay
EventBaseManager::get()->getEventBase()->tryRunAfterDelay([sock](){
CHECK(sock->good());
CHECK(sock->readable());
}, 100);
return nullptr;
......
......@@ -16,6 +16,9 @@
#pragma once
#include <folly/wangle/channel/Pipeline.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBaseManager.h>
namespace folly {
......@@ -25,20 +28,60 @@ namespace folly {
*/
template <typename Pipeline>
class ClientBootstrap {
class ConnectCallback : public AsyncSocket::ConnectCallback {
public:
ConnectCallback(Promise<Pipeline*> promise, ClientBootstrap* bootstrap)
: promise_(std::move(promise))
, bootstrap_(bootstrap) {}
void connectSuccess() noexcept override {
promise_.setValue(bootstrap_->getPipeline());
delete this;
}
void connectErr(const AsyncSocketException& ex) noexcept override {
promise_.setException(
folly::make_exception_wrapper<AsyncSocketException>(ex));
delete this;
}
private:
Promise<Pipeline*> promise_;
ClientBootstrap* bootstrap_;
};
public:
ClientBootstrap() {
}
ClientBootstrap* group(
std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group) {
group_ = group;
return this;
}
ClientBootstrap* bind(int port) {
port_ = port;
return this;
}
ClientBootstrap* connect(SocketAddress address) {
Future<Pipeline*> connect(SocketAddress address) {
DCHECK(pipelineFactory_);
pipeline_=
pipelineFactory_->newPipeline(
AsyncSocket::newSocket(
EventBaseManager::get()->getEventBase(), address));
return this;
auto base = EventBaseManager::get()->getEventBase();
if (group_) {
base = group_->getEventBase();
}
Future<Pipeline*> retval((Pipeline*)nullptr);
base->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
auto socket = AsyncSocket::newSocket(base);
Promise<Pipeline*> promise;
retval = promise.getFuture();
socket->connect(
new ConnectCallback(std::move(promise), this), address);
pipeline_ = pipelineFactory_->newPipeline(socket);
if (pipeline_) {
pipeline_->attachTransport(socket);
}
});
return retval;
}
ClientBootstrap* pipelineFactory(
......@@ -60,6 +103,7 @@ class ClientBootstrap {
int port_;
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group_;
};
} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace wangle {
class EventBaseHandler : public OutboundBytesToBytesHandler {
public:
folly::Future<void> write(
Context* ctx,
std::unique_ptr<folly::IOBuf> buf) override {
folly::Future<void> retval;
DCHECK(ctx->getTransport());
DCHECK(ctx->getTransport()->getEventBase());
ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
retval = ctx->fireWrite(std::move(buf));
});
return retval;
}
Future<void> close(Context* ctx) override {
DCHECK(ctx->getTransport());
DCHECK(ctx->getTransport()->getEventBase());
Future<void> retval;
ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
retval = ctx->fireClose();
});
return retval;
}
};
}} // namespace
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment