Commit 96e30d31 authored by James Sedgwick's avatar James Sedgwick Committed by Dave Watson

AsyncSocketHandler

Summary:
mostly copypasta from TAsyncTransportHandler

Test Plan: compiles

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, njormrod, folly-diffs@

FB internal diff: D1690973

Signature: t1:1690973:1416529528:4feb187a68ad5405662b9b0efb160edd253a2977
parent 899edcb7
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/wangle/channel/ChannelHandler.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>
namespace folly { namespace wangle {
class AsyncSocketHandler
: public folly::wangle::BytesToBytesHandler,
public AsyncSocket::ReadCallback {
public:
explicit AsyncSocketHandler(
std::shared_ptr<AsyncSocket> socket)
: socket_(std::move(socket)) {}
AsyncSocketHandler(AsyncSocketHandler&&) = default;
~AsyncSocketHandler() {
if (socket_) {
detachReadCallback();
}
}
void attachReadCallback() {
socket_->setReadCB(socket_->good() ? this : nullptr);
}
void detachReadCallback() {
if (socket_->getReadCallback() == this) {
socket_->setReadCB(nullptr);
}
}
void attachEventBase(folly::EventBase* eventBase) {
if (eventBase && !socket_->getEventBase()) {
socket_->attachEventBase(eventBase);
}
}
void detachEventBase() {
detachReadCallback();
if (socket_->getEventBase()) {
socket_->detachEventBase();
}
}
void attachPipeline(Context* ctx) override {
CHECK(!ctx_);
ctx_ = ctx;
}
folly::wangle::Future<void> write(
Context* ctx,
std::unique_ptr<folly::IOBuf> buf) override {
if (UNLIKELY(!buf)) {
return folly::wangle::makeFuture();
}
if (!socket_->good()) {
VLOG(5) << "socket is closed in write()";
return folly::wangle::makeFuture<void>(AsyncSocketException(
AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
"socket is closed in write()"));
}
auto cb = new WriteCallback();
auto future = cb->promise_.getFuture();
socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
return future;
};
folly::wangle::Future<void> close(Context* ctx) {
if (socket_) {
detachReadCallback();
socket_->closeNow();
}
return folly::wangle::makeFuture();
}
// Must override to avoid warnings about hidden overloaded virtual due to
// AsyncSocket::ReadCallback::readEOF()
void readEOF(Context* ctx) override {
ctx->fireReadEOF();
}
void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
const auto readBufferSettings = ctx_->getReadBufferSettings();
const auto ret = bufQueue_.preallocate(
readBufferSettings.first,
readBufferSettings.second);
*bufReturn = ret.first;
*lenReturn = ret.second;
}
void readDataAvailable(size_t len) noexcept override {
bufQueue_.postallocate(len);
ctx_->fireRead(bufQueue_);
}
void readEOF() noexcept override {
ctx_->fireReadEOF();
}
void readErr(const AsyncSocketException& ex)
noexcept override {
ctx_->fireReadException(ex);
}
private:
class WriteCallback : private AsyncSocket::WriteCallback {
void writeSuccess() noexcept override {
promise_.setValue();
delete this;
}
void writeErr(size_t bytesWritten,
const AsyncSocketException& ex)
noexcept override {
promise_.setException(ex);
delete this;
}
private:
friend class AsyncSocketHandler;
folly::wangle::Promise<void> promise_;
};
Context* ctx_{nullptr};
folly::IOBufQueue bufQueue_;
std::shared_ptr<AsyncSocket> socket_{nullptr};
};
}}
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include <folly/experimental/wangle/channel/ChannelHandler.h> #include <folly/experimental/wangle/channel/ChannelHandler.h>
#include <folly/experimental/wangle/channel/ChannelPipeline.h> #include <folly/experimental/wangle/channel/ChannelPipeline.h>
#include <folly/experimental/wangle/channel/AsyncSocketHandler.h>
#include <folly/experimental/wangle/channel/OutputBufferingHandler.h>
#include <folly/io/IOBufQueue.h> #include <folly/io/IOBufQueue.h>
#include <folly/Memory.h> #include <folly/Memory.h>
#include <folly/Conv.h> #include <folly/Conv.h>
...@@ -103,3 +105,13 @@ TEST(ChannelTest, PlzCompile2) { ...@@ -103,3 +105,13 @@ TEST(ChannelTest, PlzCompile2) {
.finalize(); .finalize();
pipeline.read(42); pipeline.read(42);
} }
TEST(ChannelTest, HandlersCompile) {
EventBase eb;
auto socket = AsyncSocket::newSocket(&eb);
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
pipeline
.addBack(AsyncSocketHandler(socket))
.addBack(OutputBufferingHandler())
.finalize();
}
...@@ -106,7 +106,7 @@ class AsyncSocket : virtual public AsyncTransport { ...@@ -106,7 +106,7 @@ class AsyncSocket : virtual public AsyncTransport {
* *
* If getReadBuffer() throws an exception, returns a nullptr buffer, or * If getReadBuffer() throws an exception, returns a nullptr buffer, or
* returns a 0 length, the ReadCallback will be uninstalled and its * returns a 0 length, the ReadCallback will be uninstalled and its
* readError() method will be invoked. * readErr() method will be invoked.
* *
* getReadBuffer() is not allowed to change the transport state before it * getReadBuffer() is not allowed to change the transport state before it
* returns. (For example, it should never uninstall the read callback, or * returns. (For example, it should never uninstall the read callback, or
...@@ -144,11 +144,11 @@ class AsyncSocket : virtual public AsyncTransport { ...@@ -144,11 +144,11 @@ class AsyncSocket : virtual public AsyncTransport {
virtual void readEOF() noexcept = 0; virtual void readEOF() noexcept = 0;
/** /**
* readError() will be invoked if an error occurs reading from the * readErr() will be invoked if an error occurs reading from the
* transport. * transport.
* *
* The read callback will be automatically uninstalled immediately before * The read callback will be automatically uninstalled immediately before
* readError() is invoked. * readErr() is invoked.
* *
* @param ex An exception describing the error that occurred. * @param ex An exception describing the error that occurred.
*/ */
...@@ -174,7 +174,7 @@ class AsyncSocket : virtual public AsyncTransport { ...@@ -174,7 +174,7 @@ class AsyncSocket : virtual public AsyncTransport {
virtual void writeSuccess() noexcept = 0; virtual void writeSuccess() noexcept = 0;
/** /**
* writeError() will be invoked if an error occurs writing the data. * writeErr() will be invoked if an error occurs writing the data.
* *
* @param bytesWritten The number of bytes that were successfull * @param bytesWritten The number of bytes that were successfull
* @param ex An exception describing the error that occurred. * @param ex An exception describing the error that occurred.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment