Commit 71431a93 authored by Dave Watson's avatar Dave Watson Committed by dcsommer

move shutdown socket set

Summary:
Move shutdownsocketset to folly, since it is a dep of the asyncsockets

Previoulsy tried moving it in to the server directly: D1583629, but had issues - close(fd) is called before the error callback, so we can't remove the fd before the close, which is essential to it working properly.

Just move it to folly instead.

Test Plan: fbconfig -r thrift/lib/cpp thrift/lib/cpp2; fbmake runtests

Reviewed By: dcsommer@fb.com

Subscribers: mshneer, trunkagent, fugalh, jsedgwick, doug, alandau, bmatheny, njormrod

FB internal diff: D1594950
parent d28152cd
...@@ -113,6 +113,7 @@ nobase_follyinclude_HEADERS = \ ...@@ -113,6 +113,7 @@ nobase_follyinclude_HEADERS = \
io/RecordIO.h \ io/RecordIO.h \
io/RecordIO-inl.h \ io/RecordIO-inl.h \
io/TypedIOBuf.h \ io/TypedIOBuf.h \
io/ShutdownSocketSet.h \
io/async/AsyncTimeout.h \ io/async/AsyncTimeout.h \
io/async/DelayedDestruction.h \ io/async/DelayedDestruction.h \
io/async/EventBase.h \ io/async/EventBase.h \
...@@ -238,6 +239,7 @@ libfolly_la_SOURCES = \ ...@@ -238,6 +239,7 @@ libfolly_la_SOURCES = \
io/IOBuf.cpp \ io/IOBuf.cpp \
io/IOBufQueue.cpp \ io/IOBufQueue.cpp \
io/RecordIO.cpp \ io/RecordIO.cpp \
io/ShutdownSocketSet.cpp \
io/async/AsyncTimeout.cpp \ io/async/AsyncTimeout.cpp \
io/async/EventBase.cpp \ io/async/EventBase.cpp \
io/async/EventBaseManager.cpp \ io/async/EventBaseManager.cpp \
......
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/io/ShutdownSocketSet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <chrono>
#include <thread>
#include <glog/logging.h>
#include <folly/FileUtil.h>
#include <folly/Malloc.h>
namespace folly {
ShutdownSocketSet::ShutdownSocketSet(size_t maxFd)
: maxFd_(maxFd),
data_(static_cast<std::atomic<uint8_t>*>(
folly::checkedCalloc(maxFd, sizeof(std::atomic<uint8_t>)))),
nullFile_("/dev/null", O_RDWR) {
}
void ShutdownSocketSet::add(int fd) {
// Silently ignore any fds >= maxFd_, very unlikely
DCHECK_GE(fd, 0);
if (fd >= maxFd_) {
return;
}
auto& sref = data_[fd];
uint8_t prevState = FREE;
CHECK(sref.compare_exchange_strong(prevState,
IN_USE,
std::memory_order_acq_rel))
<< "Invalid prev state for fd " << fd << ": " << int(prevState);
}
void ShutdownSocketSet::remove(int fd) {
DCHECK_GE(fd, 0);
if (fd >= maxFd_) {
return;
}
auto& sref = data_[fd];
uint8_t prevState = 0;
retry_load:
prevState = sref.load(std::memory_order_relaxed);
retry:
switch (prevState) {
case IN_SHUTDOWN:
std::this_thread::sleep_for(std::chrono::milliseconds(1));
goto retry_load;
case FREE:
LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState);
}
if (!sref.compare_exchange_weak(prevState,
FREE,
std::memory_order_acq_rel)) {
goto retry;
}
}
int ShutdownSocketSet::close(int fd) {
DCHECK_GE(fd, 0);
if (fd >= maxFd_) {
return folly::closeNoInt(fd);
}
auto& sref = data_[fd];
uint8_t prevState = sref.load(std::memory_order_relaxed);
uint8_t newState = 0;
retry:
switch (prevState) {
case IN_USE:
case SHUT_DOWN:
newState = FREE;
break;
case IN_SHUTDOWN:
newState = MUST_CLOSE;
break;
default:
LOG(FATAL) << "Invalid prev state for fd " << fd << ": " << int(prevState);
}
if (!sref.compare_exchange_weak(prevState,
newState,
std::memory_order_acq_rel)) {
goto retry;
}
return newState == FREE ? folly::closeNoInt(fd) : 0;
}
void ShutdownSocketSet::shutdown(int fd, bool abortive) {
DCHECK_GE(fd, 0);
if (fd >= maxFd_) {
doShutdown(fd, abortive);
return;
}
auto& sref = data_[fd];
uint8_t prevState = IN_USE;
if (!sref.compare_exchange_strong(prevState,
IN_SHUTDOWN,
std::memory_order_acq_rel)) {
return;
}
doShutdown(fd, abortive);
prevState = IN_SHUTDOWN;
if (sref.compare_exchange_strong(prevState,
SHUT_DOWN,
std::memory_order_acq_rel)) {
return;
}
CHECK_EQ(prevState, MUST_CLOSE)
<< "Invalid prev state for fd " << fd << ": " << int(prevState);
folly::closeNoInt(fd); // ignore errors, nothing to do
CHECK(sref.compare_exchange_strong(prevState,
FREE,
std::memory_order_acq_rel))
<< "Invalid prev state for fd " << fd << ": " << int(prevState);
}
void ShutdownSocketSet::shutdownAll(bool abortive) {
for (size_t i = 0; i < maxFd_; ++i) {
auto& sref = data_[i];
if (sref.load(std::memory_order_acquire) == IN_USE) {
shutdown(i, abortive);
}
}
}
void ShutdownSocketSet::doShutdown(int fd, bool abortive) {
// shutdown() the socket first, to awaken any threads blocked on the fd
// (subsequent IO will fail because it's been shutdown); close()ing the
// socket does not wake up blockers, see
// http://stackoverflow.com/a/3624545/1736339
folly::shutdownNoInt(fd, SHUT_RDWR);
// If abortive shutdown is desired, we'll set the SO_LINGER option on
// the socket with a timeout of 0; this will cause RST to be sent on
// close.
if (abortive) {
struct linger l = {1, 0};
if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) != 0) {
// Probably not a socket, ignore.
return;
}
}
// We can't close() the socket, as that would be dangerous; a new file
// could be opened and get the same file descriptor, and then code assuming
// the old fd would do IO in the wrong place. We'll (atomically) dup2
// /dev/null onto the fd instead.
folly::dup2NoInt(nullFile_.fd(), fd);
}
} // namespaces
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <cstdlib>
#include <memory>
#include <boost/noncopyable.hpp>
#include <folly/File.h>
namespace folly {
/**
* Set of sockets that allows immediate, take-no-prisoners abort.
*/
class ShutdownSocketSet : private boost::noncopyable {
public:
/**
* Create a socket set that can handle file descriptors < maxFd.
* The default value (256Ki) is high enough for just about all
* applications, even if you increased the number of file descriptors
* on your system.
*/
explicit ShutdownSocketSet(size_t maxFd = 1 << 18);
/**
* Add an already open socket to the list of sockets managed by
* ShutdownSocketSet. You MUST close the socket by calling
* ShutdownSocketSet::close (which will, as a side effect, also handle EINTR
* properly) and not by calling close() on the file descriptor.
*/
void add(int fd);
/**
* Remove a socket from the list of sockets managed by ShutdownSocketSet.
* Note that remove() might block! (which we lamely implement by
* sleep()-ing) in the extremely rare case that the fd is currently
* being shutdown().
*/
void remove(int fd);
/**
* Close a socket managed by ShutdownSocketSet. Returns the same return code
* as ::close() (and sets errno accordingly).
*/
int close(int fd);
/**
* Shut down a socket. If abortive is true, we perform an abortive
* shutdown (send RST rather than FIN). Note that we might still end up
* sending FIN due to the rather interesting implementation.
*
* This is async-signal-safe and ignores errors. Obviously, subsequent
* read() and write() operations to the socket will fail. During normal
* operation, just call ::shutdown() on the socket.
*/
void shutdown(int fd, bool abortive=false);
/**
* Shut down all sockets managed by ShutdownSocketSet. This is
* async-signal-safe and ignores errors.
*/
void shutdownAll(bool abortive=false);
private:
void doShutdown(int fd, bool abortive);
// State transitions:
// add():
// FREE -> IN_USE
//
// close():
// IN_USE -> (::close()) -> FREE
// SHUT_DOWN -> (::close()) -> FREE
// IN_SHUTDOWN -> MUST_CLOSE
// (If the socket is currently being shut down, we must defer the
// ::close() until the shutdown completes)
//
// shutdown():
// IN_USE -> IN_SHUTDOWN
// (::shutdown())
// IN_SHUTDOWN -> SHUT_DOWN
// MUST_CLOSE -> (::close()) -> FREE
enum State : uint8_t {
FREE = 0,
IN_USE,
IN_SHUTDOWN,
SHUT_DOWN,
MUST_CLOSE
};
struct Free {
template <class T>
void operator()(T* ptr) const {
::free(ptr);
}
};
const size_t maxFd_;
std::unique_ptr<std::atomic<uint8_t>[], Free> data_;
folly::File nullFile_;
};
} // namespaces
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment