Commit 4abb5a3a authored by Vitaly Berov's avatar Vitaly Berov Committed by Facebook Github Bot

Replace ShutdownSocketSet to singleton

Summary:
We recently found out that ShutdownSocketSet consumes 150+MB for our service, which uses duplex channels. The problem is that we create ~1000 of ThriftServers, and each of the creates its own ShutdownSocketSet.
In reality, ShutdownSocketSet is only needed to kill all socket's FD in emergency before crash dump is taken, so they don't hand around waiting for crash dump to complete. There is no need to keep a SSS per ThriftServer, singleton should work just fine.
There is a problem here, though. Currently a ThriftServer has 'immediateShutdown' method, which kills all sockets from SSS. So, if SSS becomes a singleton, and we have more than one ThriftServer, calling 'immediateShutdown' on one will kill sockets from the other one. First, it's a quite surprising behavior, and second, it complicates unit tests, which emulate thrift servers running in different processes.

As a result,
1. ShutdownSocketSet is created as a singleton, but each ThriftServer still keeps weak ptr to it (mostly for unit tests support).
2. replaceShutdownSocketSet method is added to ThriftServer.h, so unit tests could set different SSS for different ThriftServers.
3. method immediateShutdown is removed from ThriftServer, because its behavior would be 'surprising'.

There still may be unexpected consequences of this change for the tests because of Singleton, but let's see.

Reviewed By: yfeldblum

Differential Revision: D6015576

fbshipit-source-id: dab70dbf82d01bcc71bbe063f983e862911ceb24
parent 69d97159
...@@ -22,16 +22,26 @@ ...@@ -22,16 +22,26 @@
#include <glog/logging.h> #include <glog/logging.h>
#include <folly/FileUtil.h> #include <folly/FileUtil.h>
#include <folly/Singleton.h>
#include <folly/portability/Sockets.h> #include <folly/portability/Sockets.h>
namespace folly { namespace folly {
namespace {
struct PrivateTag {};
folly::Singleton<folly::ShutdownSocketSet, PrivateTag> singleton;
} // namespace
ShutdownSocketSet::ShutdownSocketSet(int maxFd) ShutdownSocketSet::ShutdownSocketSet(int maxFd)
: maxFd_(maxFd), : maxFd_(maxFd),
data_(static_cast<std::atomic<uint8_t>*>( data_(static_cast<std::atomic<uint8_t>*>(
folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic<uint8_t>)))), folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic<uint8_t>)))),
nullFile_("/dev/null", O_RDWR) {} nullFile_("/dev/null", O_RDWR) {}
std::shared_ptr<ShutdownSocketSet> ShutdownSocketSet::getInstance() {
return singleton.try_get();
}
void ShutdownSocketSet::add(int fd) { void ShutdownSocketSet::add(int fd) {
// Silently ignore any fds >= maxFd_, very unlikely // Silently ignore any fds >= maxFd_, very unlikely
DCHECK_GE(fd, 0); DCHECK_GE(fd, 0);
......
...@@ -39,6 +39,10 @@ class ShutdownSocketSet : private boost::noncopyable { ...@@ -39,6 +39,10 @@ class ShutdownSocketSet : private boost::noncopyable {
*/ */
explicit ShutdownSocketSet(int maxFd = 1 << 18); explicit ShutdownSocketSet(int maxFd = 1 << 18);
// Singleton instance used by all thrift servers.
// May return nullptr on startup/shutdown.
static std::shared_ptr<ShutdownSocketSet> getInstance();
/** /**
* Add an already open socket to the list of sockets managed by * Add an already open socket to the list of sockets managed by
* ShutdownSocketSet. You MUST close the socket by calling * ShutdownSocketSet. You MUST close the socket by calling
...@@ -73,8 +77,24 @@ class ShutdownSocketSet : private boost::noncopyable { ...@@ -73,8 +77,24 @@ class ShutdownSocketSet : private boost::noncopyable {
void shutdown(int fd, bool abortive=false); void shutdown(int fd, bool abortive=false);
/** /**
* Shut down all sockets managed by ShutdownSocketSet. This is * Immediate shutdown of all connections. This is a hard-hitting hammer;
* async-signal-safe and ignores errors. * all reads and writes will return errors and no new connections will
* be accepted.
*
* To be used only in dire situations. We're using it from the failure
* signal handler to close all connections quickly, even though the server
* might take multiple seconds to finish crashing.
*
* The optional bool parameter indicates whether to set the active
* connections in to not linger. The effect of that includes RST packets
* being immediately sent to clients which will result
* in errors (and not normal EOF) on the client side. This also causes
* the local (ip, tcp port number) tuple to be reusable immediately, instead
* of having to wait the standard amount of time. For full details see
* the `shutdown` method of `ShutdownSocketSet` (incl. notes about the
* `abortive` parameter).
*
* This is async-signal-safe and ignores errors.
*/ */
void shutdownAll(bool abortive=false); void shutdownAll(bool abortive=false);
......
...@@ -146,7 +146,7 @@ class AsyncServerSocket::BackoffTimeout : public AsyncTimeout { ...@@ -146,7 +146,7 @@ class AsyncServerSocket::BackoffTimeout : public AsyncTimeout {
*/ */
AsyncServerSocket::AsyncServerSocket(EventBase* eventBase) AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
: eventBase_(eventBase), : eventBase_(eventBase),
accepting_(false), accepting_(false),
maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce), maxAcceptAtOnce_(kDefaultMaxAcceptAtOnce),
maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue), maxNumMsgsInQueue_(kDefaultMaxMessagesInQueue),
...@@ -158,25 +158,30 @@ AsyncServerSocket::AsyncServerSocket(EventBase* eventBase) ...@@ -158,25 +158,30 @@ AsyncServerSocket::AsyncServerSocket(EventBase* eventBase)
backoffTimeout_(nullptr), backoffTimeout_(nullptr),
callbacks_(), callbacks_(),
keepAliveEnabled_(true), keepAliveEnabled_(true),
closeOnExec_(true), closeOnExec_(true) {}
shutdownSocketSet_(nullptr) {
} void AsyncServerSocket::setShutdownSocketSet(
const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
const auto newSS = wNewSS.lock();
const auto shutdownSocketSet = wShutdownSocketSet_.lock();
void AsyncServerSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) { if (shutdownSocketSet == newSS) {
if (shutdownSocketSet_ == newSS) {
return; return;
} }
if (shutdownSocketSet_) {
if (shutdownSocketSet) {
for (auto& h : sockets_) { for (auto& h : sockets_) {
shutdownSocketSet_->remove(h.socket_); shutdownSocketSet->remove(h.socket_);
} }
} }
shutdownSocketSet_ = newSS;
if (shutdownSocketSet_) { if (newSS) {
for (auto& h : sockets_) { for (auto& h : sockets_) {
shutdownSocketSet_->add(h.socket_); newSS->add(h.socket_);
} }
} }
wShutdownSocketSet_ = wNewSS;
} }
AsyncServerSocket::~AsyncServerSocket() { AsyncServerSocket::~AsyncServerSocket() {
...@@ -203,8 +208,8 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) { ...@@ -203,8 +208,8 @@ int AsyncServerSocket::stopAccepting(int shutdownFlags) {
for (; !sockets_.empty(); sockets_.pop_back()) { for (; !sockets_.empty(); sockets_.pop_back()) {
auto& handler = sockets_.back(); auto& handler = sockets_.back();
handler.unregisterHandler(); handler.unregisterHandler();
if (shutdownSocketSet_) { if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
shutdownSocketSet_->close(handler.socket_); shutdownSocketSet->close(handler.socket_);
} else if (shutdownFlags >= 0) { } else if (shutdownFlags >= 0) {
result = shutdownNoInt(handler.socket_, shutdownFlags); result = shutdownNoInt(handler.socket_, shutdownFlags);
pendingCloseSockets_.push_back(handler.socket_); pendingCloseSockets_.push_back(handler.socket_);
...@@ -504,8 +509,9 @@ void AsyncServerSocket::bind(uint16_t port) { ...@@ -504,8 +509,9 @@ void AsyncServerSocket::bind(uint16_t port) {
for (const auto& socket : sockets_) { for (const auto& socket : sockets_) {
if (socket.socket_ <= 0) { if (socket.socket_ <= 0) {
continue; continue;
} else if (shutdownSocketSet_) { } else if (
shutdownSocketSet_->close(socket.socket_); const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
shutdownSocketSet->close(socket.socket_);
} else { } else {
closeNoInt(socket.socket_); closeNoInt(socket.socket_);
} }
...@@ -793,8 +799,8 @@ void AsyncServerSocket::setupSocket(int fd, int family) { ...@@ -793,8 +799,8 @@ void AsyncServerSocket::setupSocket(int fd, int family) {
} }
#endif #endif
if (shutdownSocketSet_) { if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
shutdownSocketSet_->add(fd); shutdownSocketSet->add(fd);
} }
} }
......
...@@ -227,7 +227,7 @@ class AsyncServerSocket : public DelayedDestruction ...@@ -227,7 +227,7 @@ class AsyncServerSocket : public DelayedDestruction
Destructor()); Destructor());
} }
void setShutdownSocketSet(ShutdownSocketSet* newSS); void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wNewSS);
/** /**
* Destroy the socket. * Destroy the socket.
...@@ -877,7 +877,7 @@ class AsyncServerSocket : public DelayedDestruction ...@@ -877,7 +877,7 @@ class AsyncServerSocket : public DelayedDestruction
bool tfo_{false}; bool tfo_{false};
bool noTransparentTls_{false}; bool noTransparentTls_{false};
uint32_t tfoMaxQueueSize_{0}; uint32_t tfoMaxQueueSize_{0};
ShutdownSocketSet* shutdownSocketSet_; std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
ConnectionEventCallback* connectionEventCallback_{nullptr}; ConnectionEventCallback* connectionEventCallback_{nullptr};
}; };
......
...@@ -307,7 +307,7 @@ void AsyncSocket::init() { ...@@ -307,7 +307,7 @@ void AsyncSocket::init() {
readCallback_ = nullptr; readCallback_ = nullptr;
writeReqHead_ = nullptr; writeReqHead_ = nullptr;
writeReqTail_ = nullptr; writeReqTail_ = nullptr;
shutdownSocketSet_ = nullptr; wShutdownSocketSet_.reset();
appBytesWritten_ = 0; appBytesWritten_ = 0;
appBytesReceived_ = 0; appBytesReceived_ = 0;
sendMsgParamCallback_ = &defaultSendMsgParamsCallback; sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
...@@ -336,8 +336,8 @@ int AsyncSocket::detachFd() { ...@@ -336,8 +336,8 @@ int AsyncSocket::detachFd() {
<< ", events=" << std::hex << eventFlags_ << ")"; << ", events=" << std::hex << eventFlags_ << ")";
// Extract the fd, and set fd_ to -1 first, so closeNow() won't // Extract the fd, and set fd_ to -1 first, so closeNow() won't
// actually close the descriptor. // actually close the descriptor.
if (shutdownSocketSet_) { if (const auto socketSet = wShutdownSocketSet_.lock()) {
shutdownSocketSet_->remove(fd_); socketSet->remove(fd_);
} }
int fd = fd_; int fd = fd_;
fd_ = -1; fd_ = -1;
...@@ -355,17 +355,24 @@ const folly::SocketAddress& AsyncSocket::anyAddress() { ...@@ -355,17 +355,24 @@ const folly::SocketAddress& AsyncSocket::anyAddress() {
return anyAddress; return anyAddress;
} }
void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) { void AsyncSocket::setShutdownSocketSet(
if (shutdownSocketSet_ == newSS) { const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
const auto newSS = wNewSS.lock();
const auto shutdownSocketSet = wShutdownSocketSet_.lock();
if (newSS == shutdownSocketSet) {
return; return;
} }
if (shutdownSocketSet_ && fd_ != -1) {
shutdownSocketSet_->remove(fd_); if (shutdownSocketSet && fd_ != -1) {
shutdownSocketSet->remove(fd_);
} }
shutdownSocketSet_ = newSS;
if (shutdownSocketSet_ && fd_ != -1) { if (newSS && fd_ != -1) {
shutdownSocketSet_->add(fd_); newSS->add(fd_);
} }
wShutdownSocketSet_ = wNewSS;
} }
void AsyncSocket::setCloseOnExec() { void AsyncSocket::setCloseOnExec() {
...@@ -420,8 +427,8 @@ void AsyncSocket::connect(ConnectCallback* callback, ...@@ -420,8 +427,8 @@ void AsyncSocket::connect(ConnectCallback* callback,
withAddr("failed to create socket"), withAddr("failed to create socket"),
errnoCopy); errnoCopy);
} }
if (shutdownSocketSet_) { if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
shutdownSocketSet_->add(fd_); shutdownSocketSet->add(fd_);
} }
ioHandler_.changeHandlerFD(fd_); ioHandler_.changeHandlerFD(fd_);
...@@ -2685,8 +2692,8 @@ void AsyncSocket::invalidState(WriteCallback* callback) { ...@@ -2685,8 +2692,8 @@ void AsyncSocket::invalidState(WriteCallback* callback) {
void AsyncSocket::doClose() { void AsyncSocket::doClose() {
if (fd_ == -1) return; if (fd_ == -1) return;
if (shutdownSocketSet_) { if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
shutdownSocketSet_->close(fd_); shutdownSocketSet->close(fd_);
} else { } else {
::close(fd_); ::close(fd_);
} }
......
...@@ -222,7 +222,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -222,7 +222,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
*/ */
explicit AsyncSocket(EventBase* evb); explicit AsyncSocket(EventBase* evb);
void setShutdownSocketSet(ShutdownSocketSet* ss); void setShutdownSocketSet(const std::weak_ptr<ShutdownSocketSet>& wSS);
/** /**
* Create a new AsyncSocket and begin the connection process. * Create a new AsyncSocket and begin the connection process.
...@@ -1195,7 +1195,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper { ...@@ -1195,7 +1195,7 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
ReadCallback* readCallback_; ///< ReadCallback ReadCallback* readCallback_; ///< ReadCallback
WriteRequest* writeReqHead_; ///< Chain of WriteRequests WriteRequest* writeReqHead_; ///< Chain of WriteRequests
WriteRequest* writeReqTail_; ///< End of WriteRequest chain WriteRequest* writeReqTail_; ///< End of WriteRequest chain
ShutdownSocketSet* shutdownSocketSet_; std::weak_ptr<ShutdownSocketSet> wShutdownSocketSet_;
size_t appBytesReceived_; ///< Num of bytes received from socket size_t appBytesReceived_; ///< Num of bytes received from socket
size_t appBytesWritten_; ///< Num of bytes written to socket size_t appBytesWritten_; ///< Num of bytes written to socket
bool isBufferMovable_{false}; bool isBufferMovable_{false};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment