Commit 14812ec5 authored by Prabhakaran Ganesan's avatar Prabhakaran Ganesan Committed by Facebook Github Bot

Reflect TOS from thrift server

Summary:
There are efforts as part of the OlympicQos initiative to ensure QOS configuration for all network transfers. As part of these efforts, one of the requirements is to ensure TOS marking for server-->client traffic. Since the server does not have a way of knowing the priority of the response, the server needs to reflect the TOS settings from the client. The idea is to cache the client's SYN, derive the TOS from it, and set the same TOS for the responses. Made changes to

1) setsockopt(TCP_SAVE_SYN) in AsyncServerSocket() for all listen sockets
2) Upon an accepted connection, getsockopt(TCP_SAVED_SYN) to get the TOS from IPv6 header and setsockopt to set TOS on the connected socket. Do this conditionally (based on tosReflect_ setting and only for family IPv6)
3) Added API setTosReflect() to enable the feature and included flag on thrift server to control the same

Reviewed By: yfeldblum

Differential Revision: D9566136

fbshipit-source-id: ad61fed4e205893e2b7c5654e1b4f21ed5472c06
parent 64c7918d
......@@ -39,6 +39,14 @@ namespace fsp = folly::portability::sockets;
namespace folly {
// Fallback definitions for the TCP_SAVE_SYN / TCP_SAVED_SYN socket options.
// They only take effect when the included system headers have not already
// defined these names.
// NOTE(review): 27 and 28 are presumably the Linux <linux/tcp.h> option
// numbers -- confirm against the kernel headers.
#ifndef TCP_SAVE_SYN
#define TCP_SAVE_SYN 27
#endif
#ifndef TCP_SAVED_SYN
#define TCP_SAVED_SYN 28
#endif
static constexpr bool msgErrQueueSupported =
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
true;
......@@ -742,6 +750,35 @@ int AsyncServerSocket::createSocket(int family) {
return fd;
}
/**
 * Turn TOS reflection on or off for this server socket.
 *
 * When turned on, every currently open listening socket gets TCP_SAVE_SYN
 * set, so that accepted connections can reflect the TOS derived from the
 * client's connect request. When turned off (or when not running on Linux)
 * only the tosReflect_ flag is cleared; no socket option is touched.
 *
 * @param enable  true to request TOS reflection, false to disable it.
 * @throws a system error (via folly::throwSystemError) if TCP_SAVE_SYN
 *         cannot be set on one of the listening sockets; tosReflect_ is
 *         left unchanged in that case.
 */
void AsyncServerSocket::setTosReflect(bool enable) {
  const bool reflectRequested = kIsLinux && enable;
  if (!reflectRequested) {
    tosReflect_ = false;
    return;
  }

  // Reaching this point means reflection is being switched on, so the
  // option value is always 1.
  int saveSyn = 1;
  for (auto& handler : sockets_) {
    // Skip handlers that do not hold a valid descriptor.
    if (handler.socket_ >= 0) {
      const int rc = setsockopt(
          handler.socket_,
          IPPROTO_TCP,
          TCP_SAVE_SYN,
          &saveSyn,
          sizeof(saveSyn));
      if (rc != 0) {
        folly::throwSystemError(errno, "failed to enable TOS reflect");
      }
      VLOG(10) << "Enabled SYN save for socket " << handler.socket_;
    }
  }

  tosReflect_ = true;
}
void AsyncServerSocket::setupSocket(int fd, int family) {
// Put the socket in non-blocking mode
if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
......@@ -850,6 +887,40 @@ void AsyncServerSocket::handlerReady(
connectionEventCallback_->onConnectionAccepted(clientSocket, address);
}
// Connection accepted: when TOS reflect is enabled, fetch the SYN packet
// headers saved for this connection and copy the client's DSCP bits onto
// the accepted socket so that responses carry the same marking.
if (kIsLinux && clientSocket >= 0 && tosReflect_) {
  std::array<uint32_t, 64> buffer;
  socklen_t len = sizeof(buffer);
  int ret =
      getsockopt(clientSocket, IPPROTO_TCP, TCP_SAVED_SYN, &buffer, &len);
  if (ret != 0) {
    LOG(ERROR) << "Unable to get SYN packet for accepted socket "
               << clientSocket;
  } else if (static_cast<size_t>(len) < sizeof(uint32_t)) {
    // getsockopt() can succeed while returning no data (len == 0) when no
    // SYN was saved for this connection, e.g. it was queued before
    // TCP_SAVE_SYN was turned on. The buffer is uninitialized in that case,
    // so do not derive a TOS value from it.
    VLOG(10) << "No saved SYN available for accepted socket " << clientSocket;
  } else {
    // The first 32-bit word of the saved IP header, in host byte order.
    uint32_t tosWord = folly::Endian::big(buffer[0]);
    if (addressFamily == AF_INET6) {
      // IPv6: version(4) | traffic class(8) | flow label(20). The mask keeps
      // the upper six traffic-class bits (DSCP) and drops the two ECN bits.
      tosWord = (tosWord & 0x0FC00000) >> 20;
      ret = setsockopt(
          clientSocket,
          IPPROTO_IPV6,
          IPV6_TCLASS,
          &tosWord,
          sizeof(tosWord));
    } else if (addressFamily == AF_INET) {
      // IPv4: version(4) | IHL(4) | TOS(8) | total length(16). The mask
      // keeps the upper six TOS bits (DSCP) and drops the two ECN bits.
      tosWord = (tosWord & 0x00FC0000) >> 16;
      ret = setsockopt(
          clientSocket, IPPROTO_IP, IP_TOS, &tosWord, sizeof(tosWord));
    }
    // For any other address family nothing is set and ret is still 0.
    if (ret != 0) {
      LOG(ERROR) << "Unable to set TOS for accepted socket "
                 << clientSocket;
    }
  }
}
std::chrono::time_point<std::chrono::steady_clock> nowMs =
std::chrono::steady_clock::now();
auto timeSinceLastAccept = std::max<int64_t>(
......
......@@ -577,6 +577,15 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
acceptRateAdjustSpeed_ = speed;
}
/**
* Enable/Disable TOS reflection for the server socket
*/
void setTosReflect(bool enable);
bool getTosReflect() {
return tosReflect_;
}
/**
* Get the number of connections dropped by the AsyncServerSocket
*/
......@@ -878,6 +887,7 @@ class AsyncServerSocket : public DelayedDestruction, public AsyncSocketBase {
uint32_t tfoMaxQueueSize_{0};
std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
ConnectionEventCallback* connectionEventCallback_{nullptr};
bool tosReflect_{false};
};
} // namespace folly
......@@ -22,6 +22,10 @@
namespace folly {
// Fallback definition of the TCP_SAVE_SYN socket option for this test file;
// it only takes effect when the included headers have not already defined it.
// NOTE(review): 27 is presumably the Linux <linux/tcp.h> option number --
// confirm against the kernel headers.
#ifndef TCP_SAVE_SYN
#define TCP_SAVE_SYN 27
#endif
TEST(AsyncSocketTest, getSockOpt) {
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, 0);
......@@ -86,4 +90,31 @@ TEST(AsyncSocketTest, duplicateBind) {
EXPECT_THROW(server2->bind(address.getPort()), std::exception);
}
// Checks that setTosReflect(true) flips both the AsyncServerSocket flag and
// the TCP_SAVE_SYN option on the underlying listening socket.
TEST(AsyncSocketTest, tosReflect) {
  EventBase evb;
  auto listener = AsyncServerSocket::newSocket(&evb);
  listener->bind(0);
  listener->listen(10);
  int listenFd = listener->getSocket();

  int saveSyn;
  socklen_t saveSynLen = sizeof(saveSyn);

  // By default TOS reflect is off and TCP_SAVE_SYN is not set on the socket.
  EXPECT_FALSE(listener->getTosReflect());
  int status =
      getsockopt(listenFd, IPPROTO_TCP, TCP_SAVE_SYN, &saveSyn, &saveSynLen);
  ASSERT_EQ(status, 0);
  ASSERT_EQ(saveSyn, 0);

  // Turn TOS reflect on for the server socket.
  listener->setTosReflect(true);

  // Now both the flag and the TCP_SAVE_SYN socket option must be set.
  EXPECT_TRUE(listener->getTosReflect());
  status =
      getsockopt(listenFd, IPPROTO_TCP, TCP_SAVE_SYN, &saveSyn, &saveSynLen);
  ASSERT_EQ(status, 0);
  ASSERT_EQ(saveSyn, 1);
}
} // namespace folly
......@@ -3505,4 +3505,122 @@ TEST(AsyncSocketTest, UnixDomainSocketErrMessageCB) {
ASSERT_EQ(recv_data, test_data);
#endif // FOLLY_HAVE_MSG_ERRQUEUE
}
// End-to-end check of TOS reflection over IPv6: a client connects with
// IPV6_TCLASS set to 0x2c and the socket accepted by the server must end up
// with the same traffic class.
TEST(AsyncSocketTest, V6TosReflectTest) {
  EventBase eventBase;

  // Create a server socket on the IPv6 loopback address. Port 0 asks the
  // kernel for a free ephemeral port, so the test cannot collide with
  // another process (or a parallel test run) holding a fixed port.
  std::shared_ptr<AsyncServerSocket> serverSocket(
      AsyncServerSocket::newSocket(&eventBase));
  folly::IPAddress ip("::1");
  std::vector<folly::IPAddress> serverIp;
  serverIp.push_back(ip);
  serverSocket->bind(serverIp, 0);
  serverSocket->listen(16);
  // Read back the address actually bound, including the port chosen above.
  folly::SocketAddress serverAddress;
  serverSocket->getAddress(&serverAddress);

  // Enable TOS reflect
  serverSocket->setTosReflect(true);

  // Add a callback to accept one connection then stop the loop
  TestAcceptCallback acceptCallback;
  acceptCallback.setConnectionAcceptedFn(
      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
        serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
      });
  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
    serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  });
  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  serverSocket->startAccepting();

  // Create a client socket and have it setsockopt() the traffic class before
  // connecting, so the SYN already carries the marking.
  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&eventBase);
  AsyncSocket::OptionKey v6Opts = {IPPROTO_IPV6, IPV6_TCLASS};
  AsyncSocket::OptionMap optionMap;
  optionMap.insert({v6Opts, 0x2c});
  SocketAddress bindAddr("0.0.0.0", 0);
  ConnCallback cb;
  socket->connect(&cb, serverAddress, 30, optionMap, bindAddr);

  eventBase.loop();

  // Verify that the connection was accepted and that the accepted socket
  // carries the same TOS value that was set on the client socket. The test
  // expects the accept event at index 1 of the recorded events.
  int fd = acceptCallback.getEvents()->at(1).fd;
  ASSERT_GE(fd, 0);
  int value = -1;
  socklen_t valueLength = sizeof(value);
  int rc = getsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &value, &valueLength);
  ASSERT_EQ(rc, 0);
  ASSERT_EQ(value, 0x2c);
}
// End-to-end check of TOS reflection over IPv4: a client connects with
// IP_TOS set to 0x2c and the socket accepted by the server must end up with
// the same TOS value.
TEST(AsyncSocketTest, V4TosReflectTest) {
  EventBase eventBase;

  // Create a server socket on the IPv4 loopback address. Port 0 asks the
  // kernel for a free ephemeral port, so the test cannot collide with
  // another process (or a parallel test run) holding a fixed port.
  std::shared_ptr<AsyncServerSocket> serverSocket(
      AsyncServerSocket::newSocket(&eventBase));
  folly::IPAddress ip("127.0.0.1");
  std::vector<folly::IPAddress> serverIp;
  serverIp.push_back(ip);
  serverSocket->bind(serverIp, 0);
  serverSocket->listen(16);
  // Read back the address actually bound, including the port chosen above.
  folly::SocketAddress serverAddress;
  serverSocket->getAddress(&serverAddress);

  // Enable TOS reflect
  serverSocket->setTosReflect(true);

  // Add a callback to accept one connection then stop the loop
  TestAcceptCallback acceptCallback;
  acceptCallback.setConnectionAcceptedFn(
      [&](int /* fd */, const folly::SocketAddress& /* addr */) {
        serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
      });
  acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
    serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
  });
  serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
  serverSocket->startAccepting();

  // Create a client socket and have it setsockopt() the TOS before
  // connecting, so the SYN already carries the marking.
  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&eventBase);
  AsyncSocket::OptionKey v4Opts = {IPPROTO_IP, IP_TOS};
  AsyncSocket::OptionMap optionMap;
  optionMap.insert({v4Opts, 0x2c});
  SocketAddress bindAddr("0.0.0.0", 0);
  ConnCallback cb;
  socket->connect(&cb, serverAddress, 30, optionMap, bindAddr);

  eventBase.loop();

  // Verify that the connection was accepted and that the accepted socket
  // carries the same TOS value that was set on the client socket. The test
  // expects the accept event at index 1 of the recorded events.
  int fd = acceptCallback.getEvents()->at(1).fd;
  ASSERT_GE(fd, 0);
  int value = -1;
  socklen_t valueLength = sizeof(value);
  int rc = getsockopt(fd, IPPROTO_IP, IP_TOS, &value, &valueLength);
  ASSERT_EQ(rc, 0);
  ASSERT_EQ(value, 0x2c);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment