Commit 150a10cf authored by Dongyi Ye's avatar Dongyi Ye Committed by Facebook Github Bot

Add an option to use clientAddress hash to despatch UDP packets.

Summary:
Currently udp socket send packet to listener in a round robin fashion. This will cause issue if there are more than one packet in one "session". Because packets from one session could be routed to two different listeners.

This diff added a packet dispatch option which use clientAddress (address family + ip + port) hashing. In this way, the packets from same client will always routed to same listener when the total number of listener is consistent.

Reviewed By: yfeldblum

Differential Revision: D13994665

fbshipit-source-id: e450c2fcbc95c55cb37cb5cda13c5df6d8119812
parent 35b194e8
......@@ -76,6 +76,8 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
virtual ~Callback() = default;
};
enum class DispatchMechanism { RoundRobin, ClientAddressHash };
/**
* Create a new UDP server socket
*
......@@ -83,8 +85,11 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
* If packet are larger than this value, as per UDP protocol, remaining data
* is dropped and you get `truncated = true` in onDataAvailable callback
*/
explicit AsyncUDPServerSocket(EventBase* evb, size_t sz = 1500)
: evb_(evb), packetSize_(sz), nextListener_(0) {}
explicit AsyncUDPServerSocket(
EventBase* evb,
size_t sz = 1500,
DispatchMechanism dm = DispatchMechanism::RoundRobin)
: evb_(evb), packetSize_(sz), dispatchMechanism_(dm), nextListener_(0) {}
~AsyncUDPServerSocket() override {
if (socket_) {
......@@ -198,12 +203,30 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
return;
}
if (nextListener_ >= listeners_.size()) {
nextListener_ = 0;
uint32_t listenerId = 0;
uint64_t client_hash_lo = 0;
switch (dispatchMechanism_) {
case DispatchMechanism::ClientAddressHash:
// Hash base on clientAddress.
// 1. This logic is samilar to: clientAddress.hash() % listeners_.size()
// But runs faster as it use multiply and shift instead of division.
// 2. Only use the lower 32 bit from the address hash result for faster
// computation.
client_hash_lo = static_cast<uint32_t>(clientAddress.hash());
listenerId = (client_hash_lo * listeners_.size()) >> 32;
break;
case DispatchMechanism::RoundRobin: // round robin is default.
default:
if (nextListener_ >= listeners_.size()) {
nextListener_ = 0;
}
listenerId = nextListener_;
++nextListener_;
break;
}
auto client = clientAddress;
auto callback = listeners_[nextListener_].second;
auto callback = listeners_[listenerId].second;
auto socket = socket_;
// Schedule it in the listener's eventbase
......@@ -216,8 +239,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
callback->onDataAvailable(socket, client, std::move(data), truncated);
};
listeners_[nextListener_].first->runInEventBaseThread(std::move(f));
++nextListener_;
listeners_[listenerId].first->runInEventBaseThread(std::move(f));
}
void onReadError(const AsyncSocketException& ex) noexcept override {
......@@ -245,6 +267,8 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
typedef std::pair<EventBase*, Callback*> Listener;
std::vector<Listener> listeners_;
DispatchMechanism dispatchMechanism_;
// Next listener to send packet to
uint32_t nextListener_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment