Commit 8e922526 authored by James Sedgwick's avatar James Sedgwick Committed by facebook-github-bot-4

delete folly/wangle

Summary: she's gooooooone, oh why, oh why, i better learn how to face it...

https://www.youtube.com/watch?v=bnVXIUyshng

Reviewed By: @djwatson

Differential Revision: D2219135
parent eafa57ba
......@@ -668,9 +668,9 @@ Although inspired by the C++11 std::future interface, it is not a drop-in replac
<p><tt>via()</tt> wouldn&#039;t be of much use without practical implementations around. We have a handful, and here&#039;s a (possibly incomplete) list.</p>
<ul>
<li><a href="https://github.com/facebook/folly/blob/master/folly/wangle/concurrent/ThreadPoolExecutor.h" target="_blank">ThreadPoolExecutor</a> is an abstract thread pool implementation that supports resizing, custom thread factories, pool and per-task stats, NUMA awareness, user-defined task expiration, and Codel task expiration. It and its subclasses are under active development. It currently has two implementations:<ul>
<li><a href="https://github.com/facebook/folly/blob/master/folly/wangle/concurrent/CPUThreadPoolExecutor.h" target="_blank">CPUThreadPoolExecutor</a> is a general purpose thread pool. In addition to the above features, it also supports task priorities.</li>
<li><a href="https://github.com/facebook/folly/blob/master/folly/wangle/concurrent/IOThreadPoolExecutor.h" target="_blank">IOThreadPoolExecutor</a> is similar to CPUThreadPoolExecutor, but each thread spins on an EventBase (accessible to callbacks via <a href="https://github.com/facebook/folly/blob/master/folly/io/async/EventBaseManager.h" target="_blank">EventBaseManager</a>)</li>
<li><a href="https://github.com/facebook/folly/blob/master/wangle/concurrent/ThreadPoolExecutor.h" target="_blank">ThreadPoolExecutor</a> is an abstract thread pool implementation that supports resizing, custom thread factories, pool and per-task stats, NUMA awareness, user-defined task expiration, and Codel task expiration. It and its subclasses are under active development. It currently has two implementations:<ul>
<li><a href="https://github.com/facebook/folly/blob/master/wangle/concurrent/CPUThreadPoolExecutor.h" target="_blank">CPUThreadPoolExecutor</a> is a general purpose thread pool. In addition to the above features, it also supports task priorities.</li>
<li><a href="https://github.com/facebook/folly/blob/master/wangle/concurrent/IOThreadPoolExecutor.h" target="_blank">IOThreadPoolExecutor</a> is similar to CPUThreadPoolExecutor, but each thread spins on an EventBase (accessible to callbacks via <a href="https://github.com/facebook/folly/blob/master/folly/io/async/EventBaseManager.h" target="_blank">EventBaseManager</a>)</li>
</ul></li>
<li>folly&#039;s <a href="https://github.com/facebook/folly/blob/master/folly/io/async/EventBase.h" target="_blank">EventBase</a> is an Executor and executes work as a callback in the event loop</li>
<li><a href="https://github.com/facebook/folly/blob/master/folly/futures/ManualExecutor.h" target="_blank">ManualExecutor</a> only executes work when manually cranked. This is useful for testing.</li>
......@@ -678,7 +678,7 @@ Although inspired by the C++11 std::future interface, it is not a drop-in replac
<li><a href="https://github.com/facebook/folly/blob/master/folly/futures/QueuedImmediateExecutor.h" target="_blank">QueuedImmediateExecutor</a> is similar to InlineExecutor, but work added during callback execution will be queued instead of immediately executed</li>
<li><a href="https://github.com/facebook/folly/blob/master/folly/futures/ScheduledExecutor.h" target="_blank">ScheduledExecutor</a> is a subinterface of Executor that supports scheduled (i.e. delayed) execution. There aren&#039;t many implementations yet, see <a class="remarkup-task" href="https://our.intern.facebook.com/intern/tasks/?t=5924392" target="_blank">T5924392</a></li>
<li>Thrift&#039;s <a href="https://github.com/facebook/fbthrift/blob/master/thrift/lib/cpp/concurrency/ThreadManager.h" target="_blank">ThreadManager</a> is an Executor but we aim to deprecate it in favor of the aforementioned CPUThreadPoolExecutor</li>
<li><a href="https://github.com/facebook/folly/blob/master/folly/wangle/concurrent/FutureExecutor.h" target="_blank">FutureExecutor</a> wraps another Executor and provides <tt>Future&lt;T&gt; addFuture(F func)</tt> which returns a Future representing the result of func. This is equivalent to <tt>futures::async(executor, func)</tt> and the latter should probably be preferred.</li>
<li><a href="https://github.com/facebook/folly/blob/master/wangle/concurrent/FutureExecutor.h" target="_blank">FutureExecutor</a> wraps another Executor and provides <tt>Future&lt;T&gt; addFuture(F func)</tt> which returns a Future representing the result of func. This is equivalent to <tt>futures::async(executor, func)</tt> and the latter should probably be preferred.</li>
</ul></section><section class="dex_document"><h1>Timeouts and related features</h1><p class="dex_introduction">Futures provide a number of timing-related features. Here's an overview.</p><h2 id="timing-implementation">Timing implementation <a href="#timing-implementation" class="headerLink">#</a></h2>
<h3 id="timing-resolution">Timing resolution <a href="#timing-resolution" class="headerLink">#</a></h3>
......@@ -1070,4 +1070,4 @@ The three laws refer to a different formulation of the axioms, in terms of the K
<p>The tradeoff is memory. Each continuation has a stack, and that stack is usually fixed-size and has to be big enough to support whatever ordinary computation you might want to do on it. So each living continuation requires a relatively large amount of memory. If you know the number of continuations will be small, this might be a good fit. In particular, it might be faster, the code might read cleaner, and debugging stack traces might be much easier.</p>
<p>Futures takes the middle road between callback hell and continuations, one which has been trodden and proved useful in other languages. It doesn&#039;t claim to be the best model for all situations. Use your tools wisely.</p></section></section>
\ No newline at end of file
<p>Futures takes the middle road between callback hell and continuations, one which has been trodden and proved useful in other languages. It doesn&#039;t claim to be the best model for all situations. Use your tools wisely.</p></section></section>
This diff is collapsed.
This diff is collapsed.
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
namespace folly {
class IConnectionCounter {
public:
virtual uint64_t getNumConnections() const = 0;
/**
* Get the maximum number of non-whitelisted client-side connections
* across all Acceptors managed by this. A value
* of zero means "unlimited."
*/
virtual uint64_t getMaxConnections() const = 0;
/**
* Increment the count of client-side connections.
*/
virtual void onConnectionAdded() = 0;
/**
* Decrement the count of client-side connections.
*/
virtual void onConnectionRemoved() = 0;
virtual ~IConnectionCounter() = default;
};
class SimpleConnectionCounter: public IConnectionCounter {
public:
uint64_t getNumConnections() const override { return numConnections_; }
uint64_t getMaxConnections() const override { return maxConnections_; }
void setMaxConnections(uint64_t maxConnections) {
maxConnections_ = maxConnections;
}
void onConnectionAdded() override { numConnections_++; }
void onConnectionRemoved() override { numConnections_--; }
virtual ~SimpleConnectionCounter() = default;
protected:
uint64_t maxConnections_{0};
uint64_t numConnections_{0};
};
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/acceptor/ConnectionManager.h>
#include <glog/logging.h>
#include <folly/io/async/EventBase.h>
using folly::HHWheelTimer;
using std::chrono::milliseconds;
namespace folly { namespace wangle {
ConnectionManager::ConnectionManager(EventBase* eventBase,
milliseconds timeout, Callback* callback)
: connTimeouts_(new HHWheelTimer(eventBase)),
callback_(callback),
eventBase_(eventBase),
idleIterator_(conns_.end()),
idleLoopCallback_(this),
timeout_(timeout),
idleConnEarlyDropThreshold_(timeout_ / 2) {
}
void
ConnectionManager::addConnection(ManagedConnection* connection,
bool timeout) {
CHECK_NOTNULL(connection);
ConnectionManager* oldMgr = connection->getConnectionManager();
if (oldMgr != this) {
if (oldMgr) {
// 'connection' was being previously managed in a different thread.
// We must remove it from that manager before adding it to this one.
oldMgr->removeConnection(connection);
}
// put the connection into busy part first. This should not matter at all
// because the last callback for an idle connection must be onDeactivated(),
// so the connection must be moved to idle part then.
conns_.push_front(*connection);
connection->setConnectionManager(this);
if (callback_) {
callback_->onConnectionAdded(*this);
}
}
if (timeout) {
scheduleTimeout(connection, timeout_);
}
}
void
ConnectionManager::scheduleTimeout(ManagedConnection* const connection,
std::chrono::milliseconds timeout) {
if (timeout > std::chrono::milliseconds(0)) {
connTimeouts_->scheduleTimeout(connection, timeout);
}
}
void ConnectionManager::scheduleTimeout(
folly::HHWheelTimer::Callback* callback,
std::chrono::milliseconds timeout) {
connTimeouts_->scheduleTimeout(callback, timeout);
}
void
ConnectionManager::removeConnection(ManagedConnection* connection) {
if (connection->getConnectionManager() == this) {
connection->cancelTimeout();
connection->setConnectionManager(nullptr);
// Un-link the connection from our list, being careful to keep the iterator
// that we're using for idle shedding valid
auto it = conns_.iterator_to(*connection);
if (it == idleIterator_) {
++idleIterator_;
}
conns_.erase(it);
if (callback_) {
callback_->onConnectionRemoved(*this);
if (getNumConnections() == 0) {
callback_->onEmpty(*this);
}
}
}
}
void
ConnectionManager::initiateGracefulShutdown(
std::chrono::milliseconds idleGrace) {
if (idleGrace.count() > 0) {
idleLoopCallback_.scheduleTimeout(idleGrace);
VLOG(3) << "Scheduling idle grace period of " << idleGrace.count() << "ms";
} else {
action_ = ShutdownAction::DRAIN2;
VLOG(3) << "proceeding directly to closing idle connections";
}
drainAllConnections();
}
void
ConnectionManager::drainAllConnections() {
DestructorGuard g(this);
size_t numCleared = 0;
size_t numKept = 0;
auto it = idleIterator_ == conns_.end() ?
conns_.begin() : idleIterator_;
while (it != conns_.end() && (numKept + numCleared) < 64) {
ManagedConnection& conn = *it++;
if (action_ == ShutdownAction::DRAIN1) {
conn.notifyPendingShutdown();
} else {
// Second time around: close idle sessions. If they aren't idle yet,
// have them close when they are idle
if (conn.isBusy()) {
numKept++;
} else {
numCleared++;
}
conn.closeWhenIdle();
}
}
if (action_ == ShutdownAction::DRAIN2) {
VLOG(2) << "Idle connections cleared: " << numCleared <<
", busy conns kept: " << numKept;
}
if (it != conns_.end()) {
idleIterator_ = it;
eventBase_->runInLoop(&idleLoopCallback_);
} else {
action_ = ShutdownAction::DRAIN2;
}
}
void
ConnectionManager::dropAllConnections() {
DestructorGuard g(this);
// Iterate through our connection list, and drop each connection.
VLOG(3) << "connections to drop: " << conns_.size();
idleLoopCallback_.cancelTimeout();
unsigned i = 0;
while (!conns_.empty()) {
ManagedConnection& conn = conns_.front();
conns_.pop_front();
conn.cancelTimeout();
conn.setConnectionManager(nullptr);
// For debugging purposes, dump information about the first few
// connections.
static const unsigned MAX_CONNS_TO_DUMP = 2;
if (++i <= MAX_CONNS_TO_DUMP) {
conn.dumpConnectionState(3);
}
conn.dropConnection();
}
idleIterator_ = conns_.end();
idleLoopCallback_.cancelLoopCallback();
if (callback_) {
callback_->onEmpty(*this);
}
}
void
ConnectionManager::onActivated(ManagedConnection& conn) {
auto it = conns_.iterator_to(conn);
if (it == idleIterator_) {
idleIterator_++;
}
conns_.erase(it);
conns_.push_front(conn);
}
void
ConnectionManager::onDeactivated(ManagedConnection& conn) {
auto it = conns_.iterator_to(conn);
conns_.erase(it);
conns_.push_back(conn);
if (idleIterator_ == conns_.end()) {
idleIterator_--;
}
}
size_t
ConnectionManager::dropIdleConnections(size_t num) {
VLOG(4) << "attempt to drop " << num << " idle connections";
if (idleConnEarlyDropThreshold_ >= timeout_) {
return 0;
}
size_t count = 0;
while(count < num) {
auto it = idleIterator_;
if (it == conns_.end()) {
return count; // no more idle session
}
auto idleTime = it->getIdleTime();
if (idleTime == std::chrono::milliseconds(0) ||
idleTime <= idleConnEarlyDropThreshold_) {
VLOG(4) << "conn's idletime: " << idleTime.count()
<< ", earlyDropThreshold: " << idleConnEarlyDropThreshold_.count()
<< ", attempt to drop " << count << "/" << num;
return count; // idleTime cannot be further reduced
}
ManagedConnection& conn = *it;
idleIterator_++;
conn.timeoutExpired();
count++;
}
return count;
}
}} // folly::wangle
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/acceptor/ManagedConnection.h>
#include <chrono>
#include <folly/Memory.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/HHWheelTimer.h>
#include <folly/io/async/DelayedDestruction.h>
#include <folly/io/async/EventBase.h>
namespace folly { namespace wangle {
/**
* A ConnectionManager keeps track of ManagedConnections.
*/
class ConnectionManager: public folly::DelayedDestruction,
private ManagedConnection::Callback {
public:
/**
* Interface for an optional observer that's notified about
* various events in a ConnectionManager
*/
class Callback {
public:
virtual ~Callback() = default;
/**
* Invoked when the number of connections managed by the
* ConnectionManager changes from nonzero to zero.
*/
virtual void onEmpty(const ConnectionManager& cm) = 0;
/**
* Invoked when a connection is added to the ConnectionManager.
*/
virtual void onConnectionAdded(const ConnectionManager& cm) = 0;
/**
* Invoked when a connection is removed from the ConnectionManager.
*/
virtual void onConnectionRemoved(const ConnectionManager& cm) = 0;
};
typedef std::unique_ptr<ConnectionManager, Destructor> UniquePtr;
/**
* Returns a new instance of ConnectionManager wrapped in a unique_ptr
*/
template<typename... Args>
static UniquePtr makeUnique(Args&&... args) {
return folly::make_unique<ConnectionManager, Destructor>(
std::forward<Args>(args)...);
}
/**
* Constructor not to be used by itself.
*/
ConnectionManager(folly::EventBase* eventBase,
std::chrono::milliseconds timeout,
Callback* callback = nullptr);
/**
* Add a connection to the set of connections managed by this
* ConnectionManager.
*
* @param connection The connection to add.
* @param timeout Whether to immediately register this connection
* for an idle timeout callback.
*/
void addConnection(ManagedConnection* connection,
bool timeout = false);
/**
* Schedule a timeout callback for a connection.
*/
void scheduleTimeout(ManagedConnection* const connection,
std::chrono::milliseconds timeout);
/*
* Schedule a callback on the wheel timer
*/
void scheduleTimeout(folly::HHWheelTimer::Callback* callback,
std::chrono::milliseconds timeout);
/**
* Remove a connection from this ConnectionManager and, if
* applicable, cancel the pending timeout callback that the
* ConnectionManager has scheduled for the connection.
*
* @note This method does NOT destroy the connection.
*/
void removeConnection(ManagedConnection* connection);
/* Begin gracefully shutting down connections in this ConnectionManager.
* Notify all connections of pending shutdown, and after idleGrace,
* begin closing idle connections.
*/
void initiateGracefulShutdown(std::chrono::milliseconds idleGrace);
/**
* Destroy all connections Managed by this ConnectionManager, even
* the ones that are busy.
*/
void dropAllConnections();
size_t getNumConnections() const { return conns_.size(); }
template <typename F>
void iterateConns(F func) {
auto it = conns_.begin();
while ( it != conns_.end()) {
func(&(*it));
it++;
}
}
std::chrono::milliseconds getDefaultTimeout() const {
return timeout_;
}
void setLoweredIdleTimeout(std::chrono::milliseconds timeout) {
CHECK(timeout >= std::chrono::milliseconds(0));
CHECK(timeout <= timeout_);
idleConnEarlyDropThreshold_ = timeout;
}
/**
* try to drop num idle connections to release system resources. Return the
* actual number of dropped idle connections
*/
size_t dropIdleConnections(size_t num);
/**
* ManagedConnection::Callbacks
*/
void onActivated(ManagedConnection& conn);
void onDeactivated(ManagedConnection& conn);
private:
class CloseIdleConnsCallback :
public folly::EventBase::LoopCallback,
public folly::AsyncTimeout {
public:
explicit CloseIdleConnsCallback(ConnectionManager* manager)
: folly::AsyncTimeout(manager->eventBase_),
manager_(manager) {}
void runLoopCallback() noexcept override {
VLOG(3) << "Draining more conns from loop callback";
manager_->drainAllConnections();
}
void timeoutExpired() noexcept override {
VLOG(3) << "Idle grace expired";
manager_->drainAllConnections();
}
private:
ConnectionManager* manager_;
};
enum class ShutdownAction : uint8_t {
/**
* Drain part 1: inform remote that you will soon reject new requests.
*/
DRAIN1 = 0,
/**
* Drain part 2: start rejecting new requests.
*/
DRAIN2 = 1,
};
~ConnectionManager() = default;
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager& operator=(ConnectionManager&) = delete;
/**
* Destroy all connections managed by this ConnectionManager that
* are currently idle, as determined by a call to each ManagedConnection's
* isBusy() method.
*/
void drainAllConnections();
/**
* All the managed connections. idleIterator_ seperates them into two parts:
* idle and busy ones. [conns_.begin(), idleIterator_) are the busy ones,
* while [idleIterator_, conns_.end()) are the idle one. Moreover, the idle
* ones are organized in the decreasing idle time order. */
folly::CountedIntrusiveList<
ManagedConnection,&ManagedConnection::listHook_> conns_;
/** Connections that currently are registered for timeouts */
folly::HHWheelTimer::UniquePtr connTimeouts_;
/** Optional callback to notify of state changes */
Callback* callback_;
/** Event base in which we run */
folly::EventBase* eventBase_;
/** Iterator to the next connection to shed; used by drainAllConnections() */
folly::CountedIntrusiveList<
ManagedConnection,&ManagedConnection::listHook_>::iterator idleIterator_;
CloseIdleConnsCallback idleLoopCallback_;
ShutdownAction action_{ShutdownAction::DRAIN1};
/**
* the default idle timeout for downstream sessions when no system resource
* limit is reached
*/
std::chrono::milliseconds timeout_;
/**
* The idle connections can be closed earlier that their idle timeout when any
* system resource limit is reached. This feature can be considerred as a pre
* load shedding stage for the system, and can be easily disabled by setting
* idleConnEarlyDropThreshold_ to defaultIdleTimeout_. Also,
* idleConnEarlyDropThreshold_ can be used to bottom the idle timeout. That
* is, connection manager will not early drop the idle connections whose idle
* time is less than idleConnEarlyDropThreshold_.
*/
std::chrono::milliseconds idleConnEarlyDropThreshold_;
};
}} // folly::wangle
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <string>
namespace folly {
struct dn_char_traits : public std::char_traits<char> {
static bool eq(char c1, char c2) {
return ::tolower(c1) == ::tolower(c2);
}
static bool ne(char c1, char c2) {
return ::tolower(c1) != ::tolower(c2);
}
static bool lt(char c1, char c2) {
return ::tolower(c1) < ::tolower(c2);
}
static int compare(const char* s1, const char* s2, size_t n) {
while (n--) {
if(::tolower(*s1) < ::tolower(*s2) ) {
return -1;
}
if(::tolower(*s1) > ::tolower(*s2) ) {
return 1;
}
++s1;
++s2;
}
return 0;
}
static const char* find(const char* s, size_t n, char a) {
char la = ::tolower(a);
while (n--) {
if(::tolower(*s) == la) {
return s;
} else {
++s;
}
}
return nullptr;
}
};
// Case insensitive string
typedef std::basic_string<char, dn_char_traits> DNString;
struct DNStringHash : public std::hash<std::string> {
size_t operator()(const DNString& s1) const noexcept {
std::string s2(s1.data(), s1.size());
for (char& c : s2)
c = ::tolower(c);
return std::hash<std::string>()(s2);
}
};
} // namespace
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <folly/wangle/acceptor/LoadShedConfiguration.h>
#include <folly/Conv.h>
#include <openssl/ssl.h>
using std::string;
namespace folly {
void LoadShedConfiguration::addWhitelistAddr(folly::StringPiece input) {
auto addr = input.str();
size_t separator = addr.find_first_of('/');
if (separator == string::npos) {
whitelistAddrs_.insert(SocketAddress(addr, 0));
} else {
unsigned prefixLen = folly::to<unsigned>(addr.substr(separator + 1));
addr.erase(separator);
whitelistNetworks_.insert(NetworkAddress(SocketAddress(addr, 0), prefixLen));
}
}
bool LoadShedConfiguration::isWhitelisted(const SocketAddress& address) const {
if (whitelistAddrs_.find(address) != whitelistAddrs_.end()) {
return true;
}
for (auto& network : whitelistNetworks_) {
if (network.contains(address)) {
return true;
}
}
return false;
}
}
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <chrono>
#include <folly/Range.h>
#include <folly/SocketAddress.h>
#include <glog/logging.h>
#include <list>
#include <set>
#include <string>
#include <folly/wangle/acceptor/NetworkAddress.h>
namespace folly {
/**
* Class that holds an LoadShed configuration for a service
*/
class LoadShedConfiguration {
public:
// Comparison function for SocketAddress that disregards the port
struct AddressOnlyCompare {
bool operator()(
const SocketAddress& addr1,
const SocketAddress& addr2) const {
return addr1.getIPAddress() < addr2.getIPAddress();
}
};
typedef std::set<SocketAddress, AddressOnlyCompare> AddressSet;
typedef std::set<NetworkAddress> NetworkSet;
LoadShedConfiguration() = default;
~LoadShedConfiguration() = default;
void addWhitelistAddr(folly::StringPiece);
/**
* Set/get the set of IPs that should be whitelisted through even when we're
* trying to shed load.
*/
void setWhitelistAddrs(const AddressSet& addrs) { whitelistAddrs_ = addrs; }
const AddressSet& getWhitelistAddrs() const { return whitelistAddrs_; }
/**
* Set/get the set of networks that should be whitelisted through even
* when we're trying to shed load.
*/
void setWhitelistNetworks(const NetworkSet& networks) {
whitelistNetworks_ = networks;
}
const NetworkSet& getWhitelistNetworks() const { return whitelistNetworks_; }
/**
* Set/get the maximum number of downstream connections across all VIPs.
*/
void setMaxConnections(uint64_t maxConns) { maxConnections_ = maxConns; }
uint64_t getMaxConnections() const { return maxConnections_; }
/**
* Set/get the maximum cpu usage.
*/
void setMaxMemUsage(double max) {
CHECK(max >= 0);
CHECK(max <= 1);
maxMemUsage_ = max;
}
double getMaxMemUsage() const { return maxMemUsage_; }
/**
* Set/get the maximum memory usage.
*/
void setMaxCpuUsage(double max) {
CHECK(max >= 0);
CHECK(max <= 1);
maxCpuUsage_ = max;
}
double getMaxCpuUsage() const { return maxCpuUsage_; }
/**
* Set/get the minium actual free memory on the system.
*/
void setMinFreeMem(uint64_t min) {
minFreeMem_ = min;
}
uint64_t getMinFreeMem() const {
return minFreeMem_;
}
void setLoadUpdatePeriod(std::chrono::milliseconds period) {
period_ = period;
}
std::chrono::milliseconds getLoadUpdatePeriod() const { return period_; }
bool isWhitelisted(const SocketAddress& addr) const;
private:
AddressSet whitelistAddrs_;
NetworkSet whitelistNetworks_;
uint64_t maxConnections_{0};
uint64_t minFreeMem_{0};
double maxMemUsage_;
double maxCpuUsage_;
std::chrono::milliseconds period_;
};
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/acceptor/ManagedConnection.h>
#include <folly/wangle/acceptor/ConnectionManager.h>
namespace folly { namespace wangle {
ManagedConnection::ManagedConnection()
: connectionManager_(nullptr) {
}
ManagedConnection::~ManagedConnection() {
if (connectionManager_) {
connectionManager_->removeConnection(this);
}
}
void
ManagedConnection::resetTimeout() {
if (connectionManager_) {
resetTimeoutTo(connectionManager_->getDefaultTimeout());
}
}
void
ManagedConnection::resetTimeoutTo(std::chrono::milliseconds timeout) {
if (connectionManager_) {
connectionManager_->scheduleTimeout(this, timeout);
}
}
void
ManagedConnection::scheduleTimeout(
folly::HHWheelTimer::Callback* callback,
std::chrono::milliseconds timeout) {
if (connectionManager_) {
connectionManager_->scheduleTimeout(callback, timeout);
}
}
////////////////////// Globals /////////////////////
std::ostream&
operator<<(std::ostream& os, const ManagedConnection& conn) {
conn.describe(os);
return os;
}
}} // folly::wangle
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/IntrusiveList.h>
#include <ostream>
#include <folly/io/async/HHWheelTimer.h>
#include <folly/io/async/DelayedDestruction.h>
namespace folly { namespace wangle {
class ConnectionManager;
/**
* Interface describing a connection that can be managed by a
* container such as an Acceptor.
*/
class ManagedConnection:
public folly::HHWheelTimer::Callback,
public folly::DelayedDestruction {
public:
ManagedConnection();
class Callback {
public:
virtual ~Callback() = default;
/* Invoked when this connection becomes busy */
virtual void onActivated(ManagedConnection& conn) = 0;
/* Invoked when a connection becomes idle */
virtual void onDeactivated(ManagedConnection& conn) = 0;
};
// HHWheelTimer::Callback API (left for subclasses to implement).
virtual void timeoutExpired() noexcept = 0;
/**
* Print a human-readable description of the connection.
* @param os Destination stream.
*/
virtual void describe(std::ostream& os) const = 0;
/**
* Check whether the connection has any requests outstanding.
*/
virtual bool isBusy() const = 0;
/**
* Get the idle time of the connection. If it returning 0, that means the idle
* connections will never be dropped during pre load shedding stage.
*/
virtual std::chrono::milliseconds getIdleTime() const {
return std::chrono::milliseconds(0);
}
/**
* Notify the connection that a shutdown is pending. This method will be
* called at the beginning of graceful shutdown.
*/
virtual void notifyPendingShutdown() = 0;
/**
* Instruct the connection that it should shutdown as soon as it is
* safe. This is called after notifyPendingShutdown().
*/
virtual void closeWhenIdle() = 0;
/**
* Forcibly drop a connection.
*
* If a request is in progress, this should cause the connection to be
* closed with a reset.
*/
virtual void dropConnection() = 0;
/**
* Dump the state of the connection to the log
*/
virtual void dumpConnectionState(uint8_t loglevel) = 0;
/**
* If the connection has a connection manager, reset the timeout countdown to
* connection manager's default timeout.
* @note If the connection manager doesn't have the connection scheduled
* for a timeout already, this method will schedule one. If the
* connection manager does have the connection connection scheduled
* for a timeout, this method will push back the timeout to N msec
* from now, where N is the connection manager's timer interval.
*/
virtual void resetTimeout();
/**
* If the connection has a connection manager, reset the timeout countdown to
* user specified timeout.
*/
void resetTimeoutTo(std::chrono::milliseconds);
// Schedule an arbitrary timeout on the HHWheelTimer
virtual void scheduleTimeout(
folly::HHWheelTimer::Callback* callback,
std::chrono::milliseconds timeout);
ConnectionManager* getConnectionManager() {
return connectionManager_;
}
protected:
virtual ~ManagedConnection();
private:
friend class ConnectionManager;
void setConnectionManager(ConnectionManager* mgr) {
connectionManager_ = mgr;
}
ConnectionManager* connectionManager_;
folly::SafeIntrusiveListHook listHook_;
};
std::ostream& operator<<(std::ostream& os, const ManagedConnection& conn);
}} // folly::wangle
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <folly/SocketAddress.h>
namespace folly {
/**
* A simple wrapper around SocketAddress that represents
* a network in CIDR notation
*/
class NetworkAddress {
public:
/**
* Create a NetworkAddress for an addr/prefixLen
* @param addr IPv4 or IPv6 address of the network
* @param prefixLen Prefix length, in bits
*/
NetworkAddress(const folly::SocketAddress& addr,
unsigned prefixLen):
addr_(addr), prefixLen_(prefixLen) {}
/** Get the network address */
const folly::SocketAddress& getAddress() const {
return addr_;
}
/** Get the prefix length in bits */
unsigned getPrefixLength() const { return prefixLen_; }
/** Check whether a given address lies within the network */
bool contains(const folly::SocketAddress& addr) const {
return addr_.prefixMatch(addr, prefixLen_);
}
/** Comparison operator to enable use in ordered collections */
bool operator<(const NetworkAddress& other) const {
if (addr_ < other.addr_) {
return true;
} else if (other.addr_ < addr_) {
return false;
} else {
return (prefixLen_ < other.prefixLen_);
}
}
private:
folly::SocketAddress addr_;
unsigned prefixLen_;
};
} // namespace
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <folly/wangle/ssl/SSLCacheOptions.h>
#include <folly/wangle/ssl/SSLContextConfig.h>
#include <folly/wangle/ssl/TLSTicketKeySeeds.h>
#include <folly/wangle/ssl/SSLUtil.h>
#include <folly/wangle/acceptor/SocketOptions.h>
#include <boost/optional.hpp>
#include <chrono>
#include <fcntl.h>
#include <folly/Random.h>
#include <folly/SocketAddress.h>
#include <folly/String.h>
#include <folly/io/async/SSLContext.h>
#include <list>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/SSLContext.h>
#include <folly/SocketAddress.h>
namespace folly {
/**
* Configuration for a single Acceptor.
*
* This configures not only accept behavior, but also some types of SSL
* behavior that may make sense to configure on a per-VIP basis (e.g. which
* cert(s) we use, etc).
*/
struct ServerSocketConfig {
ServerSocketConfig() {
// generate a single random current seed
uint8_t seed[32];
folly::Random::secureRandom(seed, sizeof(seed));
initialTicketSeeds.currentSeeds.push_back(
SSLUtil::hexlify(std::string((char *)seed, sizeof(seed))));
}
bool isSSL() const { return !(sslContextConfigs.empty()); }
/**
* Set/get the socket options to apply on all downstream connections.
*/
void setSocketOptions(
const AsyncSocket::OptionMap& opts) {
socketOptions_ = filterIPSocketOptions(opts, bindAddress.getFamily());
}
AsyncSocket::OptionMap&
getSocketOptions() {
return socketOptions_;
}
const AsyncSocket::OptionMap&
getSocketOptions() const {
return socketOptions_;
}
bool hasExternalPrivateKey() const {
for (const auto& cfg : sslContextConfigs) {
if (!cfg.isLocalPrivateKey) {
return true;
}
}
return false;
}
/**
* The name of this acceptor; used for stats/reporting purposes.
*/
std::string name;
/**
* The depth of the accept queue backlog.
*/
uint32_t acceptBacklog{1024};
/**
* The number of milliseconds a connection can be idle before we close it.
*/
std::chrono::milliseconds connectionIdleTimeout{600000};
/**
* The address to bind to.
*/
SocketAddress bindAddress;
/**
* Options for controlling the SSL cache.
*/
SSLCacheOptions sslCacheOptions{std::chrono::seconds(600), 20480, 200};
/**
* The initial TLS ticket seeds.
*/
TLSTicketKeySeeds initialTicketSeeds;
/**
* The configs for all the SSL_CTX for use by this Acceptor.
*/
std::vector<SSLContextConfig> sslContextConfigs;
/**
* Determines if the Acceptor does strict checking when loading the SSL
* contexts.
*/
bool strictSSL{true};
/**
* Maximum number of concurrent pending SSL handshakes
*/
uint32_t maxConcurrentSSLHandshakes{30720};
private:
AsyncSocket::OptionMap socketOptions_;
};
} // folly
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <folly/wangle/acceptor/SocketOptions.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
namespace folly {
AsyncSocket::OptionMap filterIPSocketOptions(
const AsyncSocket::OptionMap& allOptions,
const int addrFamily) {
AsyncSocket::OptionMap opts;
int exclude;
if (addrFamily == AF_INET) {
exclude = IPPROTO_IPV6;
} else if (addrFamily == AF_INET6) {
exclude = IPPROTO_IP;
} else {
LOG(FATAL) << "Address family " << addrFamily << " was not IPv4 or IPv6";
return opts;
}
for (const auto& opt: allOptions) {
if (opt.first.level != exclude) {
opts[opt.first] = opt.second;
}
}
return opts;
}
}
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <folly/io/async/AsyncSocket.h>
namespace folly {
/**
* Returns a copy of the socket options excluding options with the given
* level.
*/
AsyncSocket::OptionMap filterIPSocketOptions(
const AsyncSocket::OptionMap& allOptions,
const int addrFamily);
}
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include <folly/wangle/acceptor/TransportInfo.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <folly/io/async/AsyncSocket.h>
using std::chrono::microseconds;
using std::map;
using std::string;
namespace folly {
bool TransportInfo::initWithSocket(const AsyncSocket* sock) {
#if defined(__linux__) || defined(__FreeBSD__)
if (!TransportInfo::readTcpInfo(&tcpinfo, sock)) {
tcpinfoErrno = errno;
return false;
}
rtt = microseconds(tcpinfo.tcpi_rtt);
/* The ratio of packet retransmission (rtx) is a good indicator of network
* bandwidth condition. Unfortunately, the number of segmentOut is not
* available in current tcpinfo. To workaround this limitation, totalBytes
* and MSS are used to estimate it.
*/
#if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 17
if (tcpinfo.tcpi_total_retrans == 0) {
rtx = 0;
} else if (tcpinfo.tcpi_total_retrans > 0 && tcpinfo.tcpi_snd_mss > 0 &&
totalBytes > 0) {
// numSegmentOut is the underestimation of the number of tcp packets sent
double numSegmentOut = double(totalBytes) / tcpinfo.tcpi_snd_mss;
// so rtx is the overestimation of actual packet retransmission rate
rtx = tcpinfo.tcpi_total_retrans / numSegmentOut;
} else {
rtx = -1;
}
#else
rtx = -1;
#endif // __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 17
validTcpinfo = true;
#else
tcpinfoErrno = EINVAL;
rtt = microseconds(-1);
rtx = -1;
#endif
return true;
}
int64_t TransportInfo::readRTT(const AsyncSocket* sock) {
#if defined(__linux__) || defined(__FreeBSD__)
struct tcp_info tcpinfo;
if (!TransportInfo::readTcpInfo(&tcpinfo, sock)) {
return -1;
}
return tcpinfo.tcpi_rtt;
#else
return -1;
#endif
}
#if defined(__linux__) || defined(__FreeBSD__)
bool TransportInfo::readTcpInfo(struct tcp_info* tcpinfo,
const AsyncSocket* sock) {
socklen_t len = sizeof(struct tcp_info);
if (!sock) {
return false;
}
if (getsockopt(sock->getFd(), IPPROTO_TCP,
TCP_INFO, (void*) tcpinfo, &len) < 0) {
VLOG(4) << "Error calling getsockopt(): " << strerror(errno);
return false;
}
return true;
}
#endif
} // folly
/*
* Copyright (c) 2015, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#pragma once
#include <folly/wangle/ssl/SSLUtil.h>
#include <chrono>
#include <netinet/tcp.h>
#include <string>
namespace folly {
class AsyncSocket;
/**
* A structure that encapsulates byte counters related to the HTTP headers.
*/
struct HTTPHeaderSize {
/**
* The number of bytes used to represent the header after compression or
* before decompression. If header compression is not supported, the value
* is set to 0.
*/
size_t compressed{0};
/**
* The number of bytes used to represent the serialized header before
* compression or after decompression, in plain-text format.
*/
size_t uncompressed{0};
};
struct TransportInfo {
/*
* timestamp of when the connection handshake was completed
*/
std::chrono::steady_clock::time_point acceptTime{};
/*
* connection RTT (Round-Trip Time)
*/
std::chrono::microseconds rtt{0};
/*
* the estimated ratio of packet retransmisions in current socket
*/
double rtx{-1};
#if defined(__linux__) || defined(__FreeBSD__)
/*
* TCP information as fetched from getsockopt(2)
*/
tcp_info tcpinfo {
#if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 17
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 // 32
#else
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0 // 29
#endif // __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 17
};
#endif // defined(__linux__) || defined(__FreeBSD__)
/*
* time for setting the connection, from the moment in was accepted until it
* is established.
*/
std::chrono::milliseconds setupTime{0};
/*
* time for setting up the SSL connection or SSL handshake
*/
std::chrono::milliseconds sslSetupTime{0};
/*
* The name of the SSL ciphersuite used by the transaction's
* transport. Returns null if the transport is not SSL.
*/
std::shared_ptr<std::string> sslCipher{nullptr};
/*
* The SSL server name used by the transaction's
* transport. Returns null if the transport is not SSL.
*/
std::shared_ptr<std::string> sslServerName{nullptr};
/*
* list of ciphers sent by the client
*/
std::shared_ptr<std::string> sslClientCiphers{nullptr};
/*
* list of compression methods sent by the client
*/
std::shared_ptr<std::string> sslClientComprMethods{nullptr};
/*
* list of TLS extensions sent by the client
*/
std::shared_ptr<std::string> sslClientExts{nullptr};
/*
* hash of all the SSL parameters sent by the client
*/
std::shared_ptr<std::string> sslSignature{nullptr};
/*
* list of ciphers supported by the server
*/
std::shared_ptr<std::string> sslServerCiphers{nullptr};
/*
* guessed "(os) (browser)" based on SSL Signature
*/
std::shared_ptr<std::string> guessedUserAgent{nullptr};
/**
* The result of SSL NPN negotiation.
*/
std::shared_ptr<std::string> sslNextProtocol{nullptr};
/*
* total number of bytes sent over the connection
*/
int64_t totalBytes{0};
/**
* If the client passed through one of our L4 proxies (using PROXY Protocol),
* then this will contain the IP address of the proxy host.
*/
std::shared_ptr<folly::SocketAddress> clientAddrOriginal;
/**
* header bytes read
*/
HTTPHeaderSize ingressHeader;
/*
* header bytes written
*/
HTTPHeaderSize egressHeader;
/*
* Here is how the timeToXXXByte variables are planned out:
* 1. All timeToXXXByte variables are measuring the ByteEvent from reqStart_
* 2. You can get the timing between two ByteEvents by calculating their
* differences. For example:
* timeToLastBodyByteAck - timeToFirstByte
* => Total time to deliver the body
* 3. The calculation in point (2) is typically done outside acceptor
*
* Future plan:
* We should log the timestamps (TimePoints) and allow
* the consumer to calculate the latency whatever it
* wants instead of calculating them in wangle, for the sake of flexibility.
* For example:
* 1. TimePoint reqStartTimestamp;
* 2. TimePoint firstHeaderByteSentTimestamp;
* 3. TimePoint firstBodyByteTimestamp;
* 3. TimePoint lastBodyByteTimestamp;
* 4. TimePoint lastBodyByteAckTimestamp;
*/
/*
* time to first header byte written to the kernel send buffer
* NOTE: It is not 100% accurate since TAsyncSocket does not do
* do callback on partial write.
*/
int32_t timeToFirstHeaderByte{-1};
/*
* time to first body byte written to the kernel send buffer
*/
int32_t timeToFirstByte{-1};
/*
* time to last body byte written to the kernel send buffer
*/
int32_t timeToLastByte{-1};
/*
* time to TCP Ack received for the last written body byte
*/
int32_t timeToLastBodyByteAck{-1};
/*
* time it took the client to ACK the last byte, from the moment when the
* kernel sent the last byte to the client and until it received the ACK
* for that byte
*/
int32_t lastByteAckLatency{-1};
/*
* time spent inside wangle
*/
int32_t proxyLatency{-1};
/*
* time between connection accepted and client message headers completed
*/
int32_t clientLatency{-1};
/*
* latency for communication with the server
*/
int32_t serverLatency{-1};
/*
* time used to get a usable connection.
*/
int32_t connectLatency{-1};
/*
* body bytes written
*/
uint32_t egressBodySize{0};
/*
* value of errno in case of getsockopt() error
*/
int tcpinfoErrno{0};
/*
* bytes read & written during SSL Setup
*/
uint32_t sslSetupBytesWritten{0};
uint32_t sslSetupBytesRead{0};
/**
* SSL error detail
*/
uint32_t sslError{0};
/**
* body bytes read
*/
uint32_t ingressBodySize{0};
/*
* The SSL version used by the transaction's transport, in
* OpenSSL's format: 4 bits for the major version, followed by 4 bits
* for the minor version. Returns zero for non-SSL.
*/
uint16_t sslVersion{0};
/*
* The SSL certificate size.
*/
uint16_t sslCertSize{0};
/**
* response status code
*/
uint16_t statusCode{0};
/*
* The SSL mode for the transaction's transport: new session,
* resumed session, or neither (non-SSL).
*/
SSLResumeEnum sslResume{SSLResumeEnum::NA};
/*
* true if the tcpinfo was successfully read from the kernel
*/
bool validTcpinfo{false};
/*
* true if the connection is SSL, false otherwise
*/
bool ssl{false};
/*
* get the RTT value in milliseconds
*/
std::chrono::milliseconds getRttMs() const {
return std::chrono::duration_cast<std::chrono::milliseconds>(rtt);
}
/*
* initialize the fields related with tcp_info
*/
bool initWithSocket(const AsyncSocket* sock);
/*
* Get the kernel's estimate of round-trip time (RTT) to the transport's peer
* in microseconds. Returns -1 on error.
*/
static int64_t readRTT(const AsyncSocket* sock);
#if defined(__linux__) || defined(__FreeBSD__)
/*
* perform the getsockopt(2) syscall to fetch TCP info for a given socket
*/
static bool readTcpInfo(struct tcp_info* tcpinfo,
const AsyncSocket* sock);
#endif
};
} // folly
This diff is collapsed.
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/channel/Pipeline.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBaseManager.h>
namespace folly {
/*
* A thin wrapper around Pipeline and AsyncSocket to match
* ServerBootstrap. On connect() a new pipeline is created.
*/
template <typename Pipeline>
class ClientBootstrap {
class ConnectCallback : public AsyncSocket::ConnectCallback {
public:
ConnectCallback(Promise<Pipeline*> promise, ClientBootstrap* bootstrap)
: promise_(std::move(promise))
, bootstrap_(bootstrap) {}
void connectSuccess() noexcept override {
if (bootstrap_->getPipeline()) {
bootstrap_->getPipeline()->transportActive();
}
promise_.setValue(bootstrap_->getPipeline());
delete this;
}
void connectErr(const AsyncSocketException& ex) noexcept override {
promise_.setException(
folly::make_exception_wrapper<AsyncSocketException>(ex));
delete this;
}
private:
Promise<Pipeline*> promise_;
ClientBootstrap* bootstrap_;
};
public:
ClientBootstrap() {
}
ClientBootstrap* group(
std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group) {
group_ = group;
return this;
}
ClientBootstrap* bind(int port) {
port_ = port;
return this;
}
Future<Pipeline*> connect(SocketAddress address) {
DCHECK(pipelineFactory_);
auto base = EventBaseManager::get()->getEventBase();
if (group_) {
base = group_->getEventBase();
}
Future<Pipeline*> retval((Pipeline*)nullptr);
base->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
auto socket = AsyncSocket::newSocket(base);
Promise<Pipeline*> promise;
retval = promise.getFuture();
socket->connect(
new ConnectCallback(std::move(promise), this), address);
pipeline_ = pipelineFactory_->newPipeline(socket);
});
return retval;
}
ClientBootstrap* pipelineFactory(
std::shared_ptr<PipelineFactory<Pipeline>> factory) {
pipelineFactory_ = factory;
return this;
}
Pipeline* getPipeline() {
return pipeline_.get();
}
virtual ~ClientBootstrap() = default;
protected:
std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> pipeline_;
int port_;
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group_;
};
} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/acceptor/Acceptor.h>
#include <folly/wangle/bootstrap/ServerSocketFactory.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
#include <folly/wangle/acceptor/ManagedConnection.h>
#include <folly/wangle/channel/Pipeline.h>
#include <folly/wangle/channel/Handler.h>
namespace folly {
template <typename Pipeline>
class ServerAcceptor
: public Acceptor
, public folly::wangle::InboundHandler<void*> {
typedef std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor> PipelinePtr;
class ServerConnection : public wangle::ManagedConnection,
public wangle::PipelineManager {
public:
explicit ServerConnection(PipelinePtr pipeline)
: pipeline_(std::move(pipeline)) {
pipeline_->setPipelineManager(this);
}
~ServerConnection() = default;
void timeoutExpired() noexcept override {
}
void describe(std::ostream& os) const override {}
bool isBusy() const override {
return false;
}
void notifyPendingShutdown() override {}
void closeWhenIdle() override {}
void dropConnection() override {
delete this;
}
void dumpConnectionState(uint8_t loglevel) override {}
void deletePipeline(wangle::PipelineBase* p) override {
CHECK(p == pipeline_.get());
delete this;
}
private:
PipelinePtr pipeline_;
};
public:
explicit ServerAcceptor(
std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory,
std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline,
EventBase* base)
: Acceptor(ServerSocketConfig())
, base_(base)
, childPipelineFactory_(pipelineFactory)
, acceptorPipeline_(acceptorPipeline) {
Acceptor::init(nullptr, base_);
CHECK(acceptorPipeline_);
acceptorPipeline_->addBack(this);
acceptorPipeline_->finalize();
}
void read(Context* ctx, void* conn) {
AsyncSocket::UniquePtr transport((AsyncSocket*)conn);
std::unique_ptr<Pipeline,
folly::DelayedDestruction::Destructor>
pipeline(childPipelineFactory_->newPipeline(
std::shared_ptr<AsyncSocket>(
transport.release(),
folly::DelayedDestruction::Destructor())));
pipeline->transportActive();
auto connection = new ServerConnection(std::move(pipeline));
Acceptor::addConnection(connection);
}
/* See Acceptor::onNewConnection for details */
void onNewConnection(
AsyncSocket::UniquePtr transport, const SocketAddress* address,
const std::string& nextProtocolName, const TransportInfo& tinfo) {
acceptorPipeline_->read(transport.release());
}
// UDP thunk
void onDataAvailable(std::shared_ptr<AsyncUDPSocket> socket,
const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept {
acceptorPipeline_->read(buf.release());
}
private:
EventBase* base_;
std::shared_ptr<PipelineFactory<Pipeline>> childPipelineFactory_;
std::shared_ptr<folly::wangle::Pipeline<void*>> acceptorPipeline_;
};
template <typename Pipeline>
class ServerAcceptorFactory : public AcceptorFactory {
public:
explicit ServerAcceptorFactory(
std::shared_ptr<PipelineFactory<Pipeline>> factory,
std::shared_ptr<PipelineFactory<folly::wangle::Pipeline<void*>>> pipeline)
: factory_(factory)
, pipeline_(pipeline) {}
std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
std::shared_ptr<folly::wangle::Pipeline<void*>> pipeline(
pipeline_->newPipeline(nullptr));
return std::make_shared<ServerAcceptor<Pipeline>>(factory_, pipeline, base);
}
private:
std::shared_ptr<PipelineFactory<Pipeline>> factory_;
std::shared_ptr<PipelineFactory<
folly::wangle::Pipeline<void*>>> pipeline_;
};
class ServerWorkerPool : public folly::wangle::ThreadPoolExecutor::Observer {
public:
explicit ServerWorkerPool(
std::shared_ptr<AcceptorFactory> acceptorFactory,
folly::wangle::IOThreadPoolExecutor* exec,
std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets,
std::shared_ptr<ServerSocketFactory> socketFactory)
: acceptorFactory_(acceptorFactory)
, exec_(exec)
, sockets_(sockets)
, socketFactory_(socketFactory) {
CHECK(exec);
}
template <typename F>
void forEachWorker(F&& f) const;
void threadStarted(
folly::wangle::ThreadPoolExecutor::ThreadHandle*);
void threadStopped(
folly::wangle::ThreadPoolExecutor::ThreadHandle*);
void threadPreviouslyStarted(
folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
threadStarted(thread);
}
void threadNotYetStopped(
folly::wangle::ThreadPoolExecutor::ThreadHandle* thread) {
threadStopped(thread);
}
private:
std::map<folly::wangle::ThreadPoolExecutor::ThreadHandle*,
std::shared_ptr<Acceptor>> workers_;
std::shared_ptr<AcceptorFactory> acceptorFactory_;
folly::wangle::IOThreadPoolExecutor* exec_{nullptr};
std::shared_ptr<std::vector<std::shared_ptr<folly::AsyncSocketBase>>> sockets_;
std::shared_ptr<ServerSocketFactory> socketFactory_;
};
template <typename F>
void ServerWorkerPool::forEachWorker(F&& f) const {
for (const auto& kv : workers_) {
f(kv.second.get());
}
}
class DefaultAcceptPipelineFactory
: public PipelineFactory<wangle::Pipeline<void*>> {
typedef wangle::Pipeline<void*> AcceptPipeline;
public:
std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
newPipeline(std::shared_ptr<AsyncSocket>) {
return std::unique_ptr<AcceptPipeline, folly::DelayedDestruction::Destructor>
(new AcceptPipeline);
}
};
} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/bootstrap/ServerBootstrap.h>
#include <folly/wangle/concurrent/NamedThreadFactory.h>
#include <folly/wangle/channel/Handler.h>
#include <folly/io/async/EventBaseManager.h>
namespace folly {
void ServerWorkerPool::threadStarted(
folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
auto worker = acceptorFactory_->newAcceptor(exec_->getEventBase(h));
workers_.insert({h, worker});
for(auto socket : *sockets_) {
socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
[this, worker, socket](){
socketFactory_->addAcceptCB(
socket, worker.get(), worker->getEventBase());
});
}
}
void ServerWorkerPool::threadStopped(
folly::wangle::ThreadPoolExecutor::ThreadHandle* h) {
auto worker = workers_.find(h);
CHECK(worker != workers_.end());
for (auto socket : *sockets_) {
socket->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
[&]() {
socketFactory_->removeAcceptCB(
socket, worker->second.get(), nullptr);
});
}
if (!worker->second->getEventBase()->isInEventBaseThread()) {
worker->second->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait(
[=]() {
worker->second->dropAllConnections();
});
} else {
worker->second->dropAllConnections();
}
workers_.erase(worker);
}
} // namespace
This diff is collapsed.
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/bootstrap/ServerBootstrap-inl.h>
#include <folly/io/async/AsyncServerSocket.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/io/async/AsyncUDPServerSocket.h>
namespace folly {
class ServerSocketFactory {
public:
virtual std::shared_ptr<AsyncSocketBase> newSocket(
int port, SocketAddress address, int backlog,
bool reuse, ServerSocketConfig& config) = 0;
virtual void stopSocket(
std::shared_ptr<AsyncSocketBase>& socket) = 0;
virtual void removeAcceptCB(std::shared_ptr<AsyncSocketBase> sock, Acceptor *callback, EventBase* base) = 0;
virtual void addAcceptCB(std::shared_ptr<AsyncSocketBase> sock, Acceptor* callback, EventBase* base) = 0 ;
virtual ~ServerSocketFactory() = default;
};
class AsyncServerSocketFactory : public ServerSocketFactory {
public:
std::shared_ptr<AsyncSocketBase> newSocket(
int port, SocketAddress address, int backlog, bool reuse,
ServerSocketConfig& config) {
auto socket = folly::AsyncServerSocket::newSocket();
socket->setReusePortEnabled(reuse);
socket->attachEventBase(EventBaseManager::get()->getEventBase());
if (port >= 0) {
socket->bind(port);
} else {
socket->bind(address);
}
socket->listen(config.acceptBacklog);
socket->startAccepting();
return socket;
}
virtual void stopSocket(
std::shared_ptr<AsyncSocketBase>& s) {
auto socket = std::dynamic_pointer_cast<AsyncServerSocket>(s);
DCHECK(socket);
socket->stopAccepting();
socket->detachEventBase();
}
virtual void removeAcceptCB(std::shared_ptr<AsyncSocketBase> s,
Acceptor *callback, EventBase* base) {
auto socket = std::dynamic_pointer_cast<AsyncServerSocket>(s);
CHECK(socket);
socket->removeAcceptCallback(callback, base);
}
virtual void addAcceptCB(std::shared_ptr<AsyncSocketBase> s,
Acceptor* callback, EventBase* base) {
auto socket = std::dynamic_pointer_cast<AsyncServerSocket>(s);
CHECK(socket);
socket->addAcceptCallback(callback, base);
}
};
class AsyncUDPServerSocketFactory : public ServerSocketFactory {
public:
std::shared_ptr<AsyncSocketBase> newSocket(
int port, SocketAddress address, int backlog, bool reuse,
ServerSocketConfig& config) {
auto socket = std::make_shared<AsyncUDPServerSocket>(
EventBaseManager::get()->getEventBase());
socket->setReusePort(reuse);
if (port >= 0) {
SocketAddress addressr("::1", port);
socket->bind(addressr);
} else {
socket->bind(address);
}
socket->listen();
return socket;
}
virtual void stopSocket(
std::shared_ptr<AsyncSocketBase>& s) {
auto socket = std::dynamic_pointer_cast<AsyncUDPServerSocket>(s);
DCHECK(socket);
socket->close();
}
virtual void removeAcceptCB(std::shared_ptr<AsyncSocketBase> s,
Acceptor *callback, EventBase* base) {
}
virtual void addAcceptCB(std::shared_ptr<AsyncSocketBase> s,
Acceptor* callback, EventBase* base) {
auto socket = std::dynamic_pointer_cast<AsyncUDPServerSocket>(s);
DCHECK(socket);
socket->addListener(base, callback);
}
};
} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/channel/Handler.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>
namespace folly { namespace wangle {
// This handler may only be used in a single Pipeline
class AsyncSocketHandler
: public folly::wangle::BytesToBytesHandler,
public AsyncSocket::ReadCallback {
public:
explicit AsyncSocketHandler(
std::shared_ptr<AsyncSocket> socket)
: socket_(std::move(socket)) {}
AsyncSocketHandler(AsyncSocketHandler&&) = default;
~AsyncSocketHandler() {
detachReadCallback();
}
void attachReadCallback() {
socket_->setReadCB(socket_->good() ? this : nullptr);
}
void detachReadCallback() {
if (socket_ && socket_->getReadCallback() == this) {
socket_->setReadCB(nullptr);
}
auto ctx = getContext();
if (ctx && !firedInactive_) {
firedInactive_ = true;
ctx->fireTransportInactive();
}
}
void attachEventBase(folly::EventBase* eventBase) {
if (eventBase && !socket_->getEventBase()) {
socket_->attachEventBase(eventBase);
}
}
void detachEventBase() {
detachReadCallback();
if (socket_->getEventBase()) {
socket_->detachEventBase();
}
}
void transportActive(Context* ctx) override {
ctx->getPipeline()->setTransport(socket_);
attachReadCallback();
ctx->fireTransportActive();
}
void detachPipeline(Context* ctx) override {
detachReadCallback();
}
folly::Future<Unit> write(
Context* ctx,
std::unique_ptr<folly::IOBuf> buf) override {
if (UNLIKELY(!buf)) {
return folly::makeFuture();
}
if (!socket_->good()) {
VLOG(5) << "socket is closed in write()";
return folly::makeFuture<Unit>(AsyncSocketException(
AsyncSocketException::AsyncSocketExceptionType::NOT_OPEN,
"socket is closed in write()"));
}
auto cb = new WriteCallback();
auto future = cb->promise_.getFuture();
socket_->writeChain(cb, std::move(buf), ctx->getWriteFlags());
return future;
};
folly::Future<Unit> close(Context* ctx) override {
if (socket_) {
detachReadCallback();
socket_->closeNow();
}
ctx->getPipeline()->deletePipeline();
return folly::makeFuture();
}
// Must override to avoid warnings about hidden overloaded virtual due to
// AsyncSocket::ReadCallback::readEOF()
void readEOF(Context* ctx) override {
ctx->fireReadEOF();
}
void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
const auto readBufferSettings = getContext()->getReadBufferSettings();
const auto ret = bufQueue_.preallocate(
readBufferSettings.first,
readBufferSettings.second);
*bufReturn = ret.first;
*lenReturn = ret.second;
}
void readDataAvailable(size_t len) noexcept override {
bufQueue_.postallocate(len);
getContext()->fireRead(bufQueue_);
}
void readEOF() noexcept override {
getContext()->fireReadEOF();
}
void readErr(const AsyncSocketException& ex)
noexcept override {
getContext()->fireReadException(
make_exception_wrapper<AsyncSocketException>(ex));
}
private:
class WriteCallback : private AsyncSocket::WriteCallback {
void writeSuccess() noexcept override {
promise_.setValue();
delete this;
}
void writeErr(size_t bytesWritten,
const AsyncSocketException& ex)
noexcept override {
promise_.setException(ex);
delete this;
}
private:
friend class AsyncSocketHandler;
folly::Promise<Unit> promise_;
};
folly::IOBufQueue bufQueue_{folly::IOBufQueue::cacheChainLength()};
std::shared_ptr<AsyncSocket> socket_{nullptr};
bool firedInactive_{false};
};
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace wangle {
class EventBaseHandler : public OutboundBytesToBytesHandler {
public:
folly::Future<Unit> write(
Context* ctx,
std::unique_ptr<folly::IOBuf> buf) override {
folly::Future<Unit> retval;
DCHECK(ctx->getTransport());
DCHECK(ctx->getTransport()->getEventBase());
ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
retval = ctx->fireWrite(std::move(buf));
});
return retval;
}
Future<Unit> close(Context* ctx) override {
DCHECK(ctx->getTransport());
DCHECK(ctx->getTransport()->getEventBase());
Future<Unit> retval;
ctx->getTransport()->getEventBase()->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
retval = ctx->fireClose();
});
return retval;
}
};
}} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/channel/FileRegion.h>
using namespace folly;
using namespace folly::wangle;
namespace {
struct FileRegionReadPool {};
Singleton<IOThreadPoolExecutor, FileRegionReadPool> readPool(
[]{
return new IOThreadPoolExecutor(
sysconf(_SC_NPROCESSORS_ONLN),
std::make_shared<NamedThreadFactory>("FileRegionReadPool"));
});
}
namespace folly { namespace wangle {
FileRegion::FileWriteRequest::FileWriteRequest(AsyncSocket* socket,
WriteCallback* callback, int fd, off_t offset, size_t count)
: WriteRequest(socket, callback),
readFd_(fd), offset_(offset), count_(count) {
}
void FileRegion::FileWriteRequest::destroy() {
readBase_->runInEventBaseThread([this]{
delete this;
});
}
bool FileRegion::FileWriteRequest::performWrite() {
if (!started_) {
start();
return true;
}
int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
ssize_t spliced = ::splice(pipe_out_, nullptr,
socket_->getFd(), nullptr,
bytesInPipe_, flags);
if (spliced == -1) {
if (errno == EAGAIN) {
return true;
}
return false;
}
bytesInPipe_ -= spliced;
bytesWritten(spliced);
return true;
}
void FileRegion::FileWriteRequest::consume() {
// do nothing
}
bool FileRegion::FileWriteRequest::isComplete() {
return totalBytesWritten_ == count_;
}
void FileRegion::FileWriteRequest::messageAvailable(size_t&& count) {
bool shouldWrite = bytesInPipe_ == 0;
bytesInPipe_ += count;
if (shouldWrite) {
socket_->writeRequestReady();
}
}
#ifdef __GLIBC__
# if (__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9))
# define GLIBC_AT_LEAST_2_9 1
# endif
#endif
void FileRegion::FileWriteRequest::start() {
started_ = true;
readBase_ = readPool.get()->getEventBase();
readBase_->runInEventBaseThread([this]{
auto flags = fcntl(readFd_, F_GETFL);
if (flags == -1) {
fail(__func__, AsyncSocketException(
AsyncSocketException::INTERNAL_ERROR,
"fcntl F_GETFL failed", errno));
return;
}
flags &= O_ACCMODE;
if (flags == O_WRONLY) {
fail(__func__, AsyncSocketException(
AsyncSocketException::BAD_ARGS, "file not open for reading"));
return;
}
#ifndef GLIBC_AT_LEAST_2_9
fail(__func__, AsyncSocketException(
AsyncSocketException::NOT_SUPPORTED,
"writeFile unsupported on glibc < 2.9"));
return;
#else
int pipeFds[2];
if (::pipe2(pipeFds, O_NONBLOCK) == -1) {
fail(__func__, AsyncSocketException(
AsyncSocketException::INTERNAL_ERROR,
"pipe2 failed", errno));
return;
}
#ifdef F_SETPIPE_SZ
// Max size for unprevileged processes as set in /proc/sys/fs/pipe-max-size
// Ignore failures and just roll with it
// TODO maybe read max size from /proc?
fcntl(pipeFds[0], F_SETPIPE_SZ, 1048576);
fcntl(pipeFds[1], F_SETPIPE_SZ, 1048576);
#endif
pipe_out_ = pipeFds[0];
socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
startConsuming(socket_->getEventBase(), &queue_);
});
readHandler_ = folly::make_unique<FileReadHandler>(
this, pipeFds[1], count_);
#endif
});
}
FileRegion::FileWriteRequest::~FileWriteRequest() {
CHECK(readBase_->isInEventBaseThread());
socket_->getEventBase()->runInEventBaseThreadAndWait([&]{
stopConsuming();
if (pipe_out_ > -1) {
::close(pipe_out_);
}
});
}
void FileRegion::FileWriteRequest::fail(
const char* fn,
const AsyncSocketException& ex) {
socket_->getEventBase()->runInEventBaseThread([=]{
WriteRequest::fail(fn, ex);
});
}
FileRegion::FileWriteRequest::FileReadHandler::FileReadHandler(
FileWriteRequest* req, int pipe_in, size_t bytesToRead)
: req_(req), pipe_in_(pipe_in), bytesToRead_(bytesToRead) {
CHECK(req_->readBase_->isInEventBaseThread());
initHandler(req_->readBase_, pipe_in);
if (!registerHandler(EventFlags::WRITE | EventFlags::PERSIST)) {
req_->fail(__func__, AsyncSocketException(
AsyncSocketException::INTERNAL_ERROR,
"registerHandler failed"));
}
}
FileRegion::FileWriteRequest::FileReadHandler::~FileReadHandler() {
CHECK(req_->readBase_->isInEventBaseThread());
unregisterHandler();
::close(pipe_in_);
}
void FileRegion::FileWriteRequest::FileReadHandler::handlerReady(
uint16_t events) noexcept {
CHECK(events & EventHandler::WRITE);
if (bytesToRead_ == 0) {
unregisterHandler();
return;
}
int flags = SPLICE_F_NONBLOCK | SPLICE_F_MORE;
ssize_t spliced = ::splice(req_->readFd_, &req_->offset_,
pipe_in_, nullptr,
bytesToRead_, flags);
if (spliced == -1) {
if (errno == EAGAIN) {
return;
} else {
req_->fail(__func__, AsyncSocketException(
AsyncSocketException::INTERNAL_ERROR,
"splice failed", errno));
return;
}
}
if (spliced > 0) {
bytesToRead_ -= spliced;
try {
req_->queue_.putMessage(static_cast<size_t>(spliced));
} catch (...) {
req_->fail(__func__, AsyncSocketException(
AsyncSocketException::INTERNAL_ERROR,
"putMessage failed"));
return;
}
}
}
}} // folly::wangle
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Singleton.h>
#include <folly/io/async/AsyncTransport.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/NotificationQueue.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
namespace folly { namespace wangle {
class FileRegion {
public:
FileRegion(int fd, off_t offset, size_t count)
: fd_(fd), offset_(offset), count_(count) {}
Future<Unit> transferTo(std::shared_ptr<AsyncTransport> transport) {
auto socket = std::dynamic_pointer_cast<AsyncSocket>(
transport);
CHECK(socket);
auto cb = new WriteCallback();
auto f = cb->promise_.getFuture();
auto req = new FileWriteRequest(socket.get(), cb, fd_, offset_, count_);
socket->writeRequest(req);
return f;
}
private:
class WriteCallback : private AsyncSocket::WriteCallback {
void writeSuccess() noexcept override {
promise_.setValue();
delete this;
}
void writeErr(size_t bytesWritten,
const AsyncSocketException& ex)
noexcept override {
promise_.setException(ex);
delete this;
}
friend class FileRegion;
folly::Promise<Unit> promise_;
};
const int fd_;
const off_t offset_;
const size_t count_;
class FileWriteRequest : public AsyncSocket::WriteRequest,
public NotificationQueue<size_t>::Consumer {
public:
FileWriteRequest(AsyncSocket* socket, WriteCallback* callback,
int fd, off_t offset, size_t count);
void destroy() override;
bool performWrite() override;
void consume() override;
bool isComplete() override;
void messageAvailable(size_t&& count) override;
void start() override;
class FileReadHandler : public folly::EventHandler {
public:
FileReadHandler(FileWriteRequest* req, int pipe_in, size_t bytesToRead);
~FileReadHandler();
void handlerReady(uint16_t events) noexcept override;
private:
FileWriteRequest* req_;
int pipe_in_;
size_t bytesToRead_;
};
private:
~FileWriteRequest();
void fail(const char* fn, const AsyncSocketException& ex);
const int readFd_;
off_t offset_;
const size_t count_;
bool started_{false};
int pipe_out_{-1};
size_t bytesInPipe_{0};
folly::EventBase* readBase_;
folly::NotificationQueue<size_t> queue_;
std::unique_ptr<FileReadHandler> readHandler_;
};
};
}} // folly::wangle
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/futures/Future.h>
#include <folly/wangle/channel/Pipeline.h>
#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>
namespace folly { namespace wangle {
template <class Context>
class HandlerBase {
public:
virtual ~HandlerBase() = default;
virtual void attachPipeline(Context* ctx) {}
virtual void detachPipeline(Context* ctx) {}
Context* getContext() {
if (attachCount_ != 1) {
return nullptr;
}
CHECK(ctx_);
return ctx_;
}
private:
friend PipelineContext;
uint64_t attachCount_{0};
Context* ctx_{nullptr};
};
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
class Handler : public HandlerBase<HandlerContext<Rout, Wout>> {
public:
static const HandlerDir dir = HandlerDir::BOTH;
typedef Rin rin;
typedef Rout rout;
typedef Win win;
typedef Wout wout;
typedef HandlerContext<Rout, Wout> Context;
virtual ~Handler() = default;
virtual void read(Context* ctx, Rin msg) = 0;
virtual void readEOF(Context* ctx) {
ctx->fireReadEOF();
}
virtual void readException(Context* ctx, exception_wrapper e) {
ctx->fireReadException(std::move(e));
}
virtual void transportActive(Context* ctx) {
ctx->fireTransportActive();
}
virtual void transportInactive(Context* ctx) {
ctx->fireTransportInactive();
}
virtual Future<Unit> write(Context* ctx, Win msg) = 0;
virtual Future<Unit> close(Context* ctx) {
return ctx->fireClose();
}
/*
// Other sorts of things we might want, all shamelessly stolen from Netty
// inbound
virtual void exceptionCaught(
HandlerContext* ctx,
exception_wrapper e) {}
virtual void channelRegistered(HandlerContext* ctx) {}
virtual void channelUnregistered(HandlerContext* ctx) {}
virtual void channelReadComplete(HandlerContext* ctx) {}
virtual void userEventTriggered(HandlerContext* ctx, void* evt) {}
virtual void channelWritabilityChanged(HandlerContext* ctx) {}
// outbound
virtual Future<Unit> bind(
HandlerContext* ctx,
SocketAddress localAddress) {}
virtual Future<Unit> connect(
HandlerContext* ctx,
SocketAddress remoteAddress, SocketAddress localAddress) {}
virtual Future<Unit> disconnect(HandlerContext* ctx) {}
virtual Future<Unit> deregister(HandlerContext* ctx) {}
virtual Future<Unit> read(HandlerContext* ctx) {}
virtual void flush(HandlerContext* ctx) {}
*/
};
template <class Rin, class Rout = Rin>
class InboundHandler : public HandlerBase<InboundHandlerContext<Rout>> {
public:
static const HandlerDir dir = HandlerDir::IN;
typedef Rin rin;
typedef Rout rout;
typedef Unit win;
typedef Unit wout;
typedef InboundHandlerContext<Rout> Context;
virtual ~InboundHandler() = default;
virtual void read(Context* ctx, Rin msg) = 0;
virtual void readEOF(Context* ctx) {
ctx->fireReadEOF();
}
virtual void readException(Context* ctx, exception_wrapper e) {
ctx->fireReadException(std::move(e));
}
virtual void transportActive(Context* ctx) {
ctx->fireTransportActive();
}
virtual void transportInactive(Context* ctx) {
ctx->fireTransportInactive();
}
};
template <class Win, class Wout = Win>
class OutboundHandler : public HandlerBase<OutboundHandlerContext<Wout>> {
public:
static const HandlerDir dir = HandlerDir::OUT;
typedef Unit rin;
typedef Unit rout;
typedef Win win;
typedef Wout wout;
typedef OutboundHandlerContext<Wout> Context;
virtual ~OutboundHandler() = default;
virtual Future<Unit> write(Context* ctx, Win msg) = 0;
virtual Future<Unit> close(Context* ctx) {
return ctx->fireClose();
}
};
template <class R, class W = R>
class HandlerAdapter : public Handler<R, R, W, W> {
public:
typedef typename Handler<R, R, W, W>::Context Context;
void read(Context* ctx, R msg) override {
ctx->fireRead(std::forward<R>(msg));
}
Future<Unit> write(Context* ctx, W msg) override {
return ctx->fireWrite(std::forward<W>(msg));
}
};
typedef HandlerAdapter<IOBufQueue&, std::unique_ptr<IOBuf>>
BytesToBytesHandler;
typedef InboundHandler<IOBufQueue&, std::unique_ptr<IOBuf>>
InboundBytesToBytesHandler;
typedef OutboundHandler<std::unique_ptr<IOBuf>>
OutboundBytesToBytesHandler;
}}
This diff is collapsed.
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/io/async/AsyncTransport.h>
#include <folly/futures/Future.h>
#include <folly/ExceptionWrapper.h>
namespace folly { namespace wangle {
class PipelineBase;
template <class In, class Out>
class HandlerContext {
public:
virtual ~HandlerContext() = default;
virtual void fireRead(In msg) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
virtual void fireTransportActive() = 0;
virtual void fireTransportInactive() = 0;
virtual Future<Unit> fireWrite(Out msg) = 0;
virtual Future<Unit> fireClose() = 0;
virtual PipelineBase* getPipeline() = 0;
std::shared_ptr<AsyncTransport> getTransport() {
return getPipeline()->getTransport();
}
virtual void setWriteFlags(WriteFlags flags) = 0;
virtual WriteFlags getWriteFlags() = 0;
virtual void setReadBufferSettings(
uint64_t minAvailable,
uint64_t allocationSize) = 0;
virtual std::pair<uint64_t, uint64_t> getReadBufferSettings() = 0;
/* TODO
template <class H>
virtual void addHandlerBefore(H&&) {}
template <class H>
virtual void addHandlerAfter(H&&) {}
template <class H>
virtual void replaceHandler(H&&) {}
virtual void removeHandler() {}
*/
};
template <class In>
class InboundHandlerContext {
public:
virtual ~InboundHandlerContext() = default;
virtual void fireRead(In msg) = 0;
virtual void fireReadEOF() = 0;
virtual void fireReadException(exception_wrapper e) = 0;
virtual void fireTransportActive() = 0;
virtual void fireTransportInactive() = 0;
virtual PipelineBase* getPipeline() = 0;
std::shared_ptr<AsyncTransport> getTransport() {
return getPipeline()->getTransport();
}
// TODO Need get/set writeFlags, readBufferSettings? Probably not.
// Do we even really need them stored in the pipeline at all?
// Could just always delegate to the socket impl
};
template <class Out>
class OutboundHandlerContext {
public:
virtual ~OutboundHandlerContext() = default;
virtual Future<Unit> fireWrite(Out msg) = 0;
virtual Future<Unit> fireClose() = 0;
virtual PipelineBase* getPipeline() = 0;
std::shared_ptr<AsyncTransport> getTransport() {
return getPipeline()->getTransport();
}
};
enum class HandlerDir {
IN,
OUT,
BOTH
};
}} // folly::wangle
#include <folly/wangle/channel/HandlerContext-inl.h>
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/channel/Handler.h>
#include <gmock/gmock.h>
namespace folly { namespace wangle {
template <class Rin, class Rout = Rin, class Win = Rout, class Wout = Rin>
class MockHandler : public Handler<Rin, Rout, Win, Wout> {
public:
typedef typename Handler<Rin, Rout, Win, Wout>::Context Context;
MockHandler() = default;
MockHandler(MockHandler&&) = default;
MOCK_METHOD2_T(read_, void(Context*, Rin&));
MOCK_METHOD1_T(readEOF, void(Context*));
MOCK_METHOD2_T(readException, void(Context*, exception_wrapper));
MOCK_METHOD2_T(write_, void(Context*, Win&));
MOCK_METHOD1_T(close_, void(Context*));
MOCK_METHOD1_T(attachPipeline, void(Context*));
MOCK_METHOD1_T(attachTransport, void(Context*));
MOCK_METHOD1_T(detachPipeline, void(Context*));
MOCK_METHOD1_T(detachTransport, void(Context*));
void read(Context* ctx, Rin msg) override {
read_(ctx, msg);
}
Future<Unit> write(Context* ctx, Win msg) override {
return makeFutureWith([&](){
write_(ctx, msg);
});
}
Future<Unit> close(Context* ctx) override {
return makeFutureWith([&](){
close_(ctx);
});
}
};
template <class R, class W = R>
using MockHandlerAdapter = MockHandler<R, R, W, W>;
}}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment