Commit 4a0d0c51 authored by Christopher Dykes's avatar Christopher Dykes Committed by Facebook Github Bot 8

Properly support socketpair and reading and writing non-blocking sockets

Summary:
Also handle invalid file descriptors correctly and switch away from `WSASendMsg` for the implementation of `sendmsg` due to limitations imposed by WinSock.
This also fixes up a few places that were explicitly referencing the global version of some socket functions, which was bypassing the socket portability layer.

Reviewed By: yfeldblum

Differential Revision: D3692358

fbshipit-source-id: 05f394dcc9bac0591df7581345071ba2501b7695
parent 5d8ca09a
...@@ -468,7 +468,7 @@ void AsyncSocket::connect(ConnectCallback* callback, ...@@ -468,7 +468,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
} }
int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) { int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
int rv = ::connect(fd_, saddr, len); int rv = fsp::connect(fd_, saddr, len);
if (rv < 0) { if (rv < 0) {
auto errnoCopy = errno; auto errnoCopy = errno;
if (errnoCopy == EINPROGRESS) { if (errnoCopy == EINPROGRESS) {
......
...@@ -246,6 +246,7 @@ class TestServer { ...@@ -246,6 +246,7 @@ class TestServer {
} }
int acceptFD(int timeout=50) { int acceptFD(int timeout=50) {
namespace fsp = folly::portability::sockets;
struct pollfd pfd; struct pollfd pfd;
pfd.fd = fd_; pfd.fd = fd_;
pfd.events = POLLIN; pfd.events = POLLIN;
...@@ -261,7 +262,7 @@ class TestServer { ...@@ -261,7 +262,7 @@ class TestServer {
errno); errno);
} }
int acceptedFd = ::accept(fd_, nullptr, nullptr); int acceptedFd = fsp::accept(fd_, nullptr, nullptr);
if (acceptedFd < 0) { if (acceptedFd < 0) {
throw folly::AsyncSocketException( throw folly::AsyncSocketException(
folly::AsyncSocketException::INTERNAL_ERROR, folly::AsyncSocketException::INTERNAL_ERROR,
......
...@@ -698,7 +698,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) { ...@@ -698,7 +698,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
acceptedSocket->flush(); acceptedSocket->flush();
// Shutdown writes to the accepted socket. This will cause it to see EOF // Shutdown writes to the accepted socket. This will cause it to see EOF
// and uninstall the read callback. // and uninstall the read callback.
::shutdown(acceptedSocket->getSocketFD(), SHUT_WR); shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
// Loop // Loop
evb.loop(); evb.loop();
...@@ -791,7 +791,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) { ...@@ -791,7 +791,7 @@ TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
acceptedSocket->flush(); acceptedSocket->flush();
// Shutdown writes to the accepted socket. This will cause it to see EOF // Shutdown writes to the accepted socket. This will cause it to see EOF
// and uninstall the read callback. // and uninstall the read callback.
::shutdown(acceptedSocket->getSocketFD(), SHUT_WR); shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
// Loop // Loop
evb.loop(); evb.loop();
......
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <event2/util.h>
#include <MSWSock.h> #include <MSWSock.h>
#include <folly/ScopeGuard.h> #include <folly/ScopeGuard.h>
...@@ -48,6 +50,9 @@ bool is_fh_socket(int fh) { ...@@ -48,6 +50,9 @@ bool is_fh_socket(int fh) {
} }
SOCKET fd_to_socket(int fd) { SOCKET fd_to_socket(int fd) {
if (fd == -1) {
return INVALID_SOCKET;
}
// We do this in a roundabout way to allow us to compile even if // We do this in a roundabout way to allow us to compile even if
// we're doing a bit of trickery to ensure that things aren't // we're doing a bit of trickery to ensure that things aren't
// being implicitly converted to a SOCKET by temporarily // being implicitly converted to a SOCKET by temporarily
...@@ -59,6 +64,9 @@ SOCKET fd_to_socket(int fd) { ...@@ -59,6 +64,9 @@ SOCKET fd_to_socket(int fd) {
} }
int socket_to_fd(SOCKET s) { int socket_to_fd(SOCKET s) {
if (s == INVALID_SOCKET) {
return -1;
}
return _open_osfhandle((intptr_t)s, O_RDWR | O_BINARY); return _open_osfhandle((intptr_t)s, O_RDWR | O_BINARY);
} }
...@@ -79,7 +87,11 @@ int bind(int s, const struct sockaddr* name, socklen_t namelen) { ...@@ -79,7 +87,11 @@ int bind(int s, const struct sockaddr* name, socklen_t namelen) {
} }
int connect(int s, const struct sockaddr* name, socklen_t namelen) { int connect(int s, const struct sockaddr* name, socklen_t namelen) {
return wrapSocketFunction<int>(::connect, s, name, namelen); auto r = wrapSocketFunction<int>(::connect, s, name, namelen);
if (r == -1 && errno == WSAEWOULDBLOCK) {
errno = EINPROGRESS;
}
return r;
} }
int getpeername(int s, struct sockaddr* name, socklen_t* namelen) { int getpeername(int s, struct sockaddr* name, socklen_t* namelen) {
...@@ -228,23 +240,23 @@ ssize_t sendmsg(int s, const struct msghdr* message, int fl) { ...@@ -228,23 +240,23 @@ ssize_t sendmsg(int s, const struct msghdr* message, int fl) {
if (message->msg_name != nullptr || message->msg_namelen != 0) { if (message->msg_name != nullptr || message->msg_namelen != 0) {
return (ssize_t)-1; return (ssize_t)-1;
} }
WSAMSG msg;
msg.name = nullptr; // Unfortunately, WSASendMsg requires the socket to have been opened
msg.namelen = 0; // as either SOCK_DGRAM or SOCK_RAW, but sendmsg has no such requirement,
msg.Control.buf = (CHAR*)message->msg_control; // so we have to implement it based on send instead :(
msg.Control.len = (ULONG)message->msg_controllen; ssize_t bytesSent = 0;
msg.dwFlags = 0;
msg.dwBufferCount = (DWORD)message->msg_iovlen;
msg.lpBuffers = new WSABUF[message->msg_iovlen];
SCOPE_EXIT { delete[] msg.lpBuffers; };
for (size_t i = 0; i < message->msg_iovlen; i++) { for (size_t i = 0; i < message->msg_iovlen; i++) {
msg.lpBuffers[i].buf = (CHAR*)message->msg_iov[i].iov_base; auto r = ::send(
msg.lpBuffers[i].len = (ULONG)message->msg_iov[i].iov_len; h,
(const char*)message->msg_iov[i].iov_base,
message->msg_iov[i].iov_len,
message->msg_flags);
if (r == -1) {
return -1;
}
bytesSent += r;
} }
return bytesSent;
DWORD bytesSent;
int res = WSASendMsg(h, &msg, 0, &bytesSent, nullptr, nullptr);
return res == 0 ? (ssize_t)bytesSent : -1;
} }
ssize_t sendto( ssize_t sendto(
...@@ -309,8 +321,17 @@ int socket(int af, int type, int protocol) { ...@@ -309,8 +321,17 @@ int socket(int af, int type, int protocol) {
} }
int socketpair(int domain, int type, int protocol, int sv[2]) { int socketpair(int domain, int type, int protocol, int sv[2]) {
// Stub this out for now, to get things compiling. if (domain != PF_UNIX || type != SOCK_STREAM || protocol != 0) {
return -1; return -1;
}
intptr_t pair[2];
auto r = evutil_socketpair(AF_INET, type, protocol, pair);
if (r == -1) {
return r;
}
sv[0] = _open_osfhandle(pair[0], O_RDWR | O_BINARY);
sv[1] = _open_osfhandle(pair[1], O_RDWR | O_BINARY);
return 0;
} }
} }
} }
......
...@@ -37,7 +37,7 @@ using sa_family_t = ADDRESS_FAMILY; ...@@ -37,7 +37,7 @@ using sa_family_t = ADDRESS_FAMILY;
// We don't actually support either of these flags // We don't actually support either of these flags
// currently. // currently.
#define MSG_DONTWAIT 1 #define MSG_DONTWAIT 0
#define MSG_EOR 0 #define MSG_EOR 0
struct msghdr { struct msghdr {
void* msg_name; void* msg_name;
...@@ -61,6 +61,7 @@ struct sockaddr_un { ...@@ -61,6 +61,7 @@ struct sockaddr_un {
// These are the same, but PF_LOCAL // These are the same, but PF_LOCAL
// isn't defined by WinSock. // isn't defined by WinSock.
#define PF_LOCAL PF_UNIX #define PF_LOCAL PF_UNIX
#define SO_REUSEPORT SO_REUSEADDR
// Someone thought it would be a good idea // Someone thought it would be a good idea
// to define a field via a macro... // to define a field via a macro...
...@@ -71,6 +72,7 @@ namespace folly { ...@@ -71,6 +72,7 @@ namespace folly {
namespace portability { namespace portability {
namespace sockets { namespace sockets {
#ifndef _WIN32 #ifndef _WIN32
using ::accept;
using ::bind; using ::bind;
using ::connect; using ::connect;
using ::getpeername; using ::getpeername;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment