Commit a484a114 authored by Orvid King's avatar Orvid King Committed by Facebook Github Bot

Shift AsyncPipe to NetworkSocket

Summary:
Shift AsyncPipe to be backed by NetworkSocket, as the only things this can be used for are listenable to by libevent. At the moment, the APIs for what are listenable to by libevent are all described in terms of NetworkSocket.
While, by name, this may seem incorrect, in practice, it is correct in function, due to the limitations of how NetworkSocket's are obtainable on Windows, and all existing calls to these functions on posix platforms are already functional by being file descriptor backed.
The plan is to have this, slightly badly named for the location, interface for the moment, and to clean it up in the future.

Reviewed By: yfeldblum

Differential Revision: D14702972

fbshipit-source-id: 785281a7bb9fafc63f5295e33f5bc136b9bc70f1
parent dbff71d2
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <folly/io/async/AsyncPipe.h> #include <folly/io/async/AsyncPipe.h>
#include <folly/FileUtil.h> #include <folly/FileUtil.h>
#include <folly/detail/FileUtilDetail.h>
#include <folly/io/async/AsyncSocketException.h> #include <folly/io/async/AsyncSocketException.h>
using folly::IOBuf; using folly::IOBuf;
...@@ -42,18 +43,28 @@ void AsyncPipeReader::failRead(const AsyncSocketException& ex) { ...@@ -42,18 +43,28 @@ void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
void AsyncPipeReader::close() { void AsyncPipeReader::close() {
unregisterHandler(); unregisterHandler();
if (fd_ >= 0) { if (fd_ != NetworkSocket()) {
changeHandlerFD(NetworkSocket()); changeHandlerFD(NetworkSocket());
if (closeCb_) { if (closeCb_) {
closeCb_(fd_); closeCb_(fd_);
} else { } else {
::close(fd_); netops::close(fd_);
} }
fd_ = -1; fd_ = NetworkSocket();
} }
} }
#if _WIN32
static int recv_internal(NetworkSocket s, void* buf, size_t count) {
auto r = netops::recv(s, buf, count, 0);
if (r == -1 && WSAGetLastError() == WSAEWOULDBLOCK) {
errno = EAGAIN;
}
return r;
}
#endif
void AsyncPipeReader::handlerReady(uint16_t events) noexcept { void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
DestructorGuard dg(this); DestructorGuard dg(this);
CHECK(events & EventHandler::READ); CHECK(events & EventHandler::READ);
...@@ -104,7 +115,13 @@ void AsyncPipeReader::handlerReady(uint16_t events) noexcept { ...@@ -104,7 +115,13 @@ void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
} }
// Perform the read // Perform the read
ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen); #if _WIN32
// On Windows you can't call read on a socket, so call recv instead.
ssize_t bytesRead =
folly::fileutil_detail::wrapNoInt(recv_internal, fd_, buf, buflen);
#else
ssize_t bytesRead = folly::readNoInt(fd_.toFd(), buf, buflen);
#endif
if (bytesRead > 0) { if (bytesRead > 0) {
if (movable) { if (movable) {
...@@ -190,15 +207,15 @@ void AsyncPipeWriter::closeNow() { ...@@ -190,15 +207,15 @@ void AsyncPipeWriter::closeNow() {
failAllWrites(AsyncSocketException( failAllWrites(AsyncSocketException(
AsyncSocketException::NOT_OPEN, "closed with pending writes")); AsyncSocketException::NOT_OPEN, "closed with pending writes"));
} }
if (fd_ >= 0) { if (fd_ != NetworkSocket()) {
unregisterHandler(); unregisterHandler();
changeHandlerFD(NetworkSocket()); changeHandlerFD(NetworkSocket());
if (closeCb_) { if (closeCb_) {
closeCb_(fd_); closeCb_(fd_);
} else { } else {
close(fd_); netops::close(fd_);
} }
fd_ = -1; fd_ = NetworkSocket();
} }
} }
...@@ -220,6 +237,16 @@ void AsyncPipeWriter::handlerReady(uint16_t events) noexcept { ...@@ -220,6 +237,16 @@ void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
handleWrite(); handleWrite();
} }
#if _WIN32
static int send_internal(NetworkSocket s, const void* buf, size_t count) {
auto r = netops::send(s, buf, count, 0);
if (r == -1 && WSAGetLastError() == WSAEWOULDBLOCK) {
errno = EAGAIN;
}
return r;
}
#endif
void AsyncPipeWriter::handleWrite() { void AsyncPipeWriter::handleWrite() {
DestructorGuard dg(this); DestructorGuard dg(this);
assert(!queue_.empty()); assert(!queue_.empty());
...@@ -230,7 +257,13 @@ void AsyncPipeWriter::handleWrite() { ...@@ -230,7 +257,13 @@ void AsyncPipeWriter::handleWrite() {
// someday, support writev. The logic for partial writes is a bit complex // someday, support writev. The logic for partial writes is a bit complex
const IOBuf* head = curQueue.front(); const IOBuf* head = curQueue.front();
CHECK(head->length()); CHECK(head->length());
ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length()); #if _WIN32
// On Windows you can't call write on a socket.
ssize_t rc = folly::fileutil_detail::wrapNoInt(
send_internal, fd_, head->data(), head->length());
#else
ssize_t rc = folly::writeNoInt(fd_.toFd(), head->data(), head->length());
#endif
if (rc < 0) { if (rc < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) {
// pipe is full // pipe is full
......
...@@ -39,14 +39,20 @@ class AsyncPipeReader : public EventHandler, ...@@ -39,14 +39,20 @@ class AsyncPipeReader : public EventHandler,
unique_ptr<AsyncPipeReader, folly::DelayedDestruction::Destructor> unique_ptr<AsyncPipeReader, folly::DelayedDestruction::Destructor>
UniquePtr; UniquePtr;
template <typename... Args> static UniquePtr newReader(
static UniquePtr newReader(Args&&... args) { folly::EventBase* eventBase,
return UniquePtr(new AsyncPipeReader(std::forward<Args>(args)...)); NetworkSocket pipeFd) {
return UniquePtr(new AsyncPipeReader(eventBase, pipeFd));
} }
static UniquePtr newReader(folly::EventBase* eventBase, int pipeFd) {
return newReader(eventBase, NetworkSocket::fromFd(pipeFd));
}
AsyncPipeReader(folly::EventBase* eventBase, NetworkSocket pipeFd)
: EventHandler(eventBase, pipeFd), fd_(pipeFd) {}
AsyncPipeReader(folly::EventBase* eventBase, int pipeFd) AsyncPipeReader(folly::EventBase* eventBase, int pipeFd)
: EventHandler(eventBase, folly::NetworkSocket::fromFd(pipeFd)), : AsyncPipeReader(eventBase, NetworkSocket::fromFd(pipeFd)) {}
fd_(pipeFd) {}
/** /**
* Set the read callback and automatically install/uninstall the handler * Set the read callback and automatically install/uninstall the handler
...@@ -74,7 +80,7 @@ class AsyncPipeReader : public EventHandler, ...@@ -74,7 +80,7 @@ class AsyncPipeReader : public EventHandler,
/** /**
* Set a special hook to close the socket (otherwise, will call close()) * Set a special hook to close the socket (otherwise, will call close())
*/ */
void setCloseCallback(std::function<void(int)> closeCb) { void setCloseCallback(std::function<void(NetworkSocket)> closeCb) {
closeCb_ = closeCb; closeCb_ = closeCb;
} }
...@@ -85,9 +91,9 @@ class AsyncPipeReader : public EventHandler, ...@@ -85,9 +91,9 @@ class AsyncPipeReader : public EventHandler,
void failRead(const AsyncSocketException& ex); void failRead(const AsyncSocketException& ex);
void close(); void close();
int fd_; NetworkSocket fd_;
AsyncReader::ReadCallback* readCallback_{nullptr}; AsyncReader::ReadCallback* readCallback_{nullptr};
std::function<void(int)> closeCb_; std::function<void(NetworkSocket)> closeCb_;
}; };
/** /**
...@@ -101,14 +107,20 @@ class AsyncPipeWriter : public EventHandler, ...@@ -101,14 +107,20 @@ class AsyncPipeWriter : public EventHandler,
unique_ptr<AsyncPipeWriter, folly::DelayedDestruction::Destructor> unique_ptr<AsyncPipeWriter, folly::DelayedDestruction::Destructor>
UniquePtr; UniquePtr;
template <typename... Args> static UniquePtr newWriter(
static UniquePtr newWriter(Args&&... args) { folly::EventBase* eventBase,
return UniquePtr(new AsyncPipeWriter(std::forward<Args>(args)...)); NetworkSocket pipeFd) {
return UniquePtr(new AsyncPipeWriter(eventBase, pipeFd));
}
static UniquePtr newWriter(folly::EventBase* eventBase, int pipeFd) {
return newWriter(eventBase, NetworkSocket::fromFd(pipeFd));
} }
AsyncPipeWriter(folly::EventBase* eventBase, NetworkSocket pipeFd)
: EventHandler(eventBase, pipeFd), fd_(pipeFd) {}
AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd) AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd)
: EventHandler(eventBase, folly::NetworkSocket::fromFd(pipeFd)), : AsyncPipeWriter(eventBase, NetworkSocket::fromFd(pipeFd)) {}
fd_(pipeFd) {}
/** /**
* Asynchronously write the given iobuf to this pipe, and invoke the callback * Asynchronously write the given iobuf to this pipe, and invoke the callback
...@@ -121,7 +133,7 @@ class AsyncPipeWriter : public EventHandler, ...@@ -121,7 +133,7 @@ class AsyncPipeWriter : public EventHandler,
/** /**
* Set a special hook to close the socket (otherwise, will call close()) * Set a special hook to close the socket (otherwise, will call close())
*/ */
void setCloseCallback(std::function<void(int)> closeCb) { void setCloseCallback(std::function<void(NetworkSocket)> closeCb) {
closeCb_ = closeCb; closeCb_ = closeCb;
} }
...@@ -129,7 +141,7 @@ class AsyncPipeWriter : public EventHandler, ...@@ -129,7 +141,7 @@ class AsyncPipeWriter : public EventHandler,
* Returns true if the pipe is closed * Returns true if the pipe is closed
*/ */
bool closed() const { bool closed() const {
return (fd_ < 0 || closeOnEmpty_); return (fd_ == NetworkSocket() || closeOnEmpty_);
} }
/** /**
...@@ -175,10 +187,10 @@ class AsyncPipeWriter : public EventHandler, ...@@ -175,10 +187,10 @@ class AsyncPipeWriter : public EventHandler,
void handleWrite(); void handleWrite();
void failAllWrites(const AsyncSocketException& ex); void failAllWrites(const AsyncSocketException& ex);
int fd_; NetworkSocket fd_;
std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_; std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
bool closeOnEmpty_{false}; bool closeOnEmpty_{false};
std::function<void(int)> closeCb_; std::function<void(NetworkSocket)> closeCb_;
~AsyncPipeWriter() override { ~AsyncPipeWriter() override {
closeNow(); closeNow();
......
...@@ -1008,8 +1008,8 @@ bool AsyncSSLSocket::willBlock( ...@@ -1008,8 +1008,8 @@ bool AsyncSSLSocket::willBlock(
// a native handle type to pass to the constructor. // a native handle type to pass to the constructor.
auto native_handle = NetworkSocket::native_handle_type(ofd); auto native_handle = NetworkSocket::native_handle_type(ofd);
auto asyncPipeReader = AsyncPipeReader::newReader( auto asyncPipeReader =
eventBase_, NetworkSocket(native_handle).toFd()); AsyncPipeReader::newReader(eventBase_, NetworkSocket(native_handle));
auto asyncPipeReaderPtr = asyncPipeReader.get(); auto asyncPipeReaderPtr = asyncPipeReader.get();
if (!asyncOperationFinishCallback_) { if (!asyncOperationFinishCallback_) {
asyncOperationFinishCallback_.reset( asyncOperationFinishCallback_.reset(
......
...@@ -104,8 +104,10 @@ class AsyncPipeTest : public Test { ...@@ -104,8 +104,10 @@ class AsyncPipeTest : public Test {
EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0); EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0); EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
reader_ = folly::AsyncPipeReader::newReader(&eventBase_, pipeFds_[0]); reader_ = folly::AsyncPipeReader::newReader(
writer_ = folly::AsyncPipeWriter::newWriter(&eventBase_, pipeFds_[1]); &eventBase_, folly::NetworkSocket::fromFd(pipeFds_[0]));
writer_ = folly::AsyncPipeWriter::newWriter(
&eventBase_, folly::NetworkSocket::fromFd(pipeFds_[1]));
readCallback_.setMovable(movable); readCallback_.setMovable(movable);
} }
......
...@@ -1402,8 +1402,9 @@ static int customRsaPrivEnc( ...@@ -1402,8 +1402,9 @@ static int customRsaPrivEnc(
int ret = 0; int ret = 0;
int* retptr = &ret; int* retptr = &ret;
auto asyncPipeWriter = auto hand = folly::NetworkSocket::native_handle_type(pipefds[1]);
folly::AsyncPipeWriter::newWriter(asyncJobEvb, pipefds[1]); auto asyncPipeWriter = folly::AsyncPipeWriter::newWriter(
asyncJobEvb, folly::NetworkSocket(hand));
asyncJobEvb->runInEventBaseThread([retptr = retptr, asyncJobEvb->runInEventBaseThread([retptr = retptr,
flen = flen, flen = flen,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment