Commit 842ecea5 authored by Brandon Schlinker's avatar Brandon Schlinker Committed by Facebook GitHub Bot

ByteEvent (socket timestamp) foundation

Summary:
Adding support for write and socket timestamps by introducing `ByteEvent` that can be delivered to observers.
`AsyncTransport::WriteFlags` has long had timestamping related flags, such as `TIMESTAMP_TX`, but the code required to act on these flags only existed in proxygen. This diff generalizes the approach so that it works for other use cases of `AsyncSocket`.

This diff is long, but much of it is unit tests designed to prevent regressions given the trickiness of socket timestamps and `ByteEvent`.

**Each `ByteEvent` contains:**
- Type (WRITE, SCHED, TX, ACK)
- Byte stream offset that the timestamp is for (relative to the raw byte stream, which means after SSL in the case of AsyncSSLSocket)
- `steady_clock` timestamp recorded by AsyncSocket when generating the `ByteEvent`
- For SCHED, TX, and ACK events, if available, hardware and software (kernel) timestamps

**How `ByteEvent` are used:**
- Support is enabled when an observer is attached with the `byteEvents` config flag set. If the socket does not support timestamps, the observer is notified through the `byteEventsUnavailable` callback. Otherwise, `byteEventsEnabled` is called
- When the application writes to a socket with `ByteEvent` support enabled and a relevant `WriteFlag`, SCHED/TX/ACK `ByteEvent` are requested from the kernel, and WRITE `ByteEvent` are generated by the socket for the *last byte* in the write.
  - If the entire write buffer cannot be written at once, then additional `ByteEvent` will also be generated for the last byte in each write.
  - This means that if the application wants to timestamp a specific byte, it must break up the write buffer before handing it to `AsyncSocket` such that the byte to timestamp is the last byte in the write buffer.
- When socket timestamps arrive from the kernel via the socket error queue, they are transformed into `ByteEvent` and passed to observers

**Caveats:**
1. Socket timestamps received from the kernel contain the byte's offset in the write stream. This counter is a `uint32_t`, and thus rolls over every ~4GB. When transforming raw timestamp into `ByteEvent`, we correct for this and transform the raw offset into an offset relative to the raw byte offset stored by `AsyncSocket` (returned via `getRawBytesWritten()`).
2. At the moment, a read callback must be installed to receive socket timestamps due to epoll's behavior. I will correct this with a patch to epoll, see https://github.com/libevent/libevent/issues/1038#issuecomment-703315425 for details
3. If a msghdr's ancillary data contains a timestamping flag (such as `SOF_TIMESTAMPING_TX_SOFTWARE`), the kernel will enqueue a socket error message containing the byte offset of the write ( `SO_EE_ORIGIN_TIMESTAMPING`) even if timestamping has not been enabled by an associated call to `setsockopt`. This creates a problem:
    1. If an application was to use a timestamp `WriteFlags` such as `TIMESTAMP_TX` without enabling timestamping, and if `AsyncSocket` transformed such `WriteFlags` to ancillary data by default, it could create a situation where epoll continues to return `EV_READ` (due to items in the socket error queue), but `AsyncSocket` would not fetch anything from the socket error queue.
    2. To prevent this scenario, `WriteFlags` related to timestamping are not translated into msghdr ancillary data unless timestamping is enabled. This required adding a boolean to `getAncillaryData` and `getAncillaryDataSize`.

Differential Revision: D24094832

fbshipit-source-id: e3bec730ddd1fc1696023d8c982ae02ab9b5fb7d
parent 220215e9
......@@ -1861,6 +1861,18 @@ int AsyncSSLSocket::sslVerifyCallback(
return 1;
}
void AsyncSSLSocket::enableByteEvents() {
if (getSSLVersion() == SSL3_VERSION || getSSLVersion() == TLS1_VERSION) {
// Socket timestamping can cause us to split up TLS records in a way that
// breaks some old Android (<= 3.0) clients.
return failByteEvents(AsyncSocketException(
AsyncSocketException::NOT_SUPPORTED,
withAddr("failed to enable byte events: "
"not supported for SSLv3 or TLSv1")));
}
AsyncSocket::enableByteEvents();
}
void AsyncSSLSocket::enableClientHelloParsing() {
parseClientHello_ = true;
clientHelloInfo_ = std::make_unique<ssl::ClientHelloInfo>();
......
......@@ -382,6 +382,18 @@ class AsyncSSLSocket : public AsyncSocket {
void setEorTracking(bool track) override;
size_t getRawBytesWritten() const override;
size_t getRawBytesReceived() const override;
// End of methods inherited from AsyncTransport
/**
* Enable ByteEvents for this socket.
*
* ByteEvents cannot be enabled if TLS 1.0 or earlier is in use, as these
* client implementations often have trouble handling cases where a TLS
* record is split across multiple packets.
*/
void enableByteEvents() override;
void enableClientHelloParsing();
/**
......
This diff is collapsed.
......@@ -215,30 +215,41 @@ class AsyncSocket : public AsyncTransport {
}
/**
* getAncillaryData() will be invoked to initialize ancillary data
* buffer referred by "msg_control" field of msghdr structure passed to
* ::sendmsg() system call based on the flags set in the passed
* folly::WriteFlags enum. Some flags in folly::WriteFlags are not relevant
* during this process. The function assumes that the size of buffer
* is not smaller than the value returned by getAncillaryDataSize() method
* for the same combination of flags.
* getAncillaryData() will be invoked to initialize ancillary data buffer
* referred by "msg_control" field of msghdr structure passed to ::sendmsg()
* system call based on the flags set in the passed folly::WriteFlags enum.
*
* Some flags in folly::WriteFlags are not relevant during this process;
* the default implementation only handles timestamping flags.
*
* The function requires that the size of buffer passed is equal to the
* value returned by getAncillaryDataSize() method for the same combination
* of flags.
*
* @param flags Write flags requested for the given write operation
* @param data Pointer to ancillary data buffer to initialize.
* @param byteEventsEnabled If byte events are enabled for this socket.
* When enabled, flags relevant to socket
* timestamps (e.g., TIMESTAMP_TX) should be
* included in ancillary (msg_control) data.
*/
virtual void getAncillaryData(
folly::WriteFlags /*flags*/, void* /*data*/) noexcept {}
folly::WriteFlags flags,
void* data,
const bool byteEventsEnabled = false) noexcept;
/**
* getAncillaryDataSize() will be invoked to retrieve the size of
* ancillary data buffer which should be passed to ::sendmsg() system call
*
* @param flags Write flags requested for the given write operation
* @param byteEventsEnabled If byte events are enabled for this socket.
* When enabled, flags relevant to socket
* timestamps (e.g., TIMESTAMP_TX) should be
* included in ancillary (msg_control) data.
*/
virtual uint32_t getAncillaryDataSize(
folly::WriteFlags /*flags*/) noexcept {
return 0;
}
folly::WriteFlags flags, const bool byteEventsEnabled = false) noexcept;
static const size_t maxAncillaryDataSize{0x5000};
......@@ -269,6 +280,72 @@ class AsyncSocket : public AsyncTransport {
int getDefaultFlags(folly::WriteFlags flags, bool zeroCopyEnabled) noexcept;
};
/**
* Container with state and processing logic for ByteEvents.
*/
struct ByteEventHelper {
bool byteEventsEnabled{false};
size_t rawBytesWrittenWhenByteEventsEnabled{0};
folly::Optional<AsyncSocketException> maybeEx;
/**
* Process a Cmsg and return a ByteEvent if available.
*
* The kernel will pass two cmsg for each timestamp:
* 1. ScmTimestamping: Software / Hardware Timestamps.
* 2. SockExtendedErrTimestamping: Byte offset associated with timestamp.
*
* These messages will be passed back-to-back; processCmsg() can handle them
* in any order (1 then 2, or 2 then 1), as long the order is consistent
* across timestamps.
*
* processCmsg() gracefully ignores Cmsg unrelated to socket timestamps, but
* will throw if it receives a sequence of Cmsg that are not compliant with
* its expectations.
*
* @return If the helper has received all components required to generate a
* ByteEvent (e.g., ScmTimestamping and SockExtendedErrTimestamping
* messages), it returns a ByteEvent and clears its local state.
* Otherwise, returns an empty optional.
*
* If the helper has previously thrown a ByteEventHelper::Exception,
* it will not process further Cmsg and will continiously return an
* empty optional.
*
* @throw If the helper receives a sequence of Cmsg that violate its
* expectations (e.g., multiple ScmTimestamping messages in a row
* without corresponding SockExtendedErrTimestamping messages), it
* throws a ByteEventHelper::Exception. Subsequent calls will return
* an empty optional.
*/
folly::Optional<ByteEvent> processCmsg(
const cmsghdr& cmsg, const size_t rawBytesWritten);
/**
* Exception class thrown by processCmsg.
*
* ByteEventHelper does not know the socket address and thus cannot
* construct a AsyncSocketException. Instead, ByteEventHelper throws a
* custom Exception and AsyncSocket rewraps it as an AsyncSocketException.
*/
class Exception : public std::runtime_error {
using std::runtime_error::runtime_error;
};
private:
// state, reinitialized each time a complete timestamp is processed
struct TimestampState {
bool serrReceived{false};
uint32_t typeRaw{0};
uint32_t byteOffsetKernel{0};
bool scmTsReceived{false};
folly::Optional<std::chrono::nanoseconds> maybeSoftwareTs;
folly::Optional<std::chrono::nanoseconds> maybeHardwareTs;
};
folly::Optional<TimestampState> maybeTsState_;
};
explicit AsyncSocket();
/**
* Create a new unconnected AsyncSocket.
......@@ -668,6 +745,8 @@ class AsyncSocket : public AsyncTransport {
}
size_t getRawBytesBuffered() const override { return getAppBytesBuffered(); }
// End of methods inherited from AsyncTransport
std::chrono::nanoseconds getConnectTime() const {
return connectEndTime_ - connectStartTime_;
}
......@@ -1026,7 +1105,7 @@ class AsyncSocket : public AsyncTransport {
uint32_t totalBytesWritten_{0}; ///< total bytes written
};
class LifecycleObserver : public AsyncTransport::LifecycleObserver {
class LifecycleObserver : virtual public AsyncTransport::LifecycleObserver {
public:
using AsyncTransport::LifecycleObserver::LifecycleObserver;
......@@ -1367,6 +1446,7 @@ class AsyncSocket : public AsyncTransport {
const AsyncSocketException& ex);
void failWrite(const char* fn, const AsyncSocketException& ex);
void failAllWrites(const AsyncSocketException& ex);
void failByteEvents(const AsyncSocketException& ex);
virtual void invokeConnectErr(const AsyncSocketException& ex);
virtual void invokeConnectSuccess();
void invalidState(ConnectCallback* callback);
......@@ -1397,6 +1477,27 @@ class AsyncSocket : public AsyncTransport {
bool containsZeroCopyBuf(folly::IOBuf* ptr);
void releaseZeroCopyBuf(uint32_t id);
/**
* Attempt to enable Observer ByteEvents for this socket.
*
* Once enabled, ByteEvents rename enabled for the socket's life.
*
* ByteEvents are delivered to Observers; when an observer is added:
* - If this function has already been called, byteEventsEnabled() or
* byteEventsUnavailable() will be called, depending on ByteEvent state.
* - Else if the socket is connected, this function is called immediately.
* - Else if the socket has not yet connected, this function will be called
* after the socket has connected (ByteEvents cannot be set up earlier).
*
* If ByteEvents are successfully enabled, byteEventsEnabled() will be called
* on each Observer that has requested ByteEvents. If unable to enable, or if
* ByteEvents become unavailable (e.g., due to close), byteEventsUnavailable()
* will be called on each Observer that has requested ByteEvents.
*
* This function does need to be explicitly called under other circumstances.
*/
virtual void enableByteEvents();
AsyncWriter::ZeroCopyEnableFunc zeroCopyEnableFunc_;
// a folly::IOBuf can be used in multiple partial requests
......@@ -1477,6 +1578,10 @@ class AsyncSocket : public AsyncTransport {
bool noTSocks_{false};
// Whether to track EOR or not.
bool trackEor_{false};
// ByteEvent state
std::unique_ptr<ByteEventHelper> byteEventHelper_;
bool zeroCopyEnabled_{false};
bool zeroCopyVal_{false};
// zerocopy re-enable logic
......
......@@ -16,8 +16,10 @@
#pragma once
#include <chrono>
#include <memory>
#include <folly/Optional.h>
#include <folly/io/IOBuf.h>
#include <folly/io/async/AsyncSocketBase.h>
#include <folly/io/async/AsyncTransportCertificate.h>
......@@ -81,6 +83,10 @@ enum class WriteFlags : uint32_t {
* Request timestamp when entire buffer has entered packet scheduler.
*/
TIMESTAMP_SCHED = 0x40,
/*
* Request timestamp when entire buffer has been written to system socket.
*/
TIMESTAMP_WRITE = 0x80,
};
/*
......@@ -136,6 +142,12 @@ constexpr bool isSet(WriteFlags a, WriteFlags b) {
return (a & b) == b;
}
/**
* Write flags that are related to timestamping.
*/
constexpr WriteFlags kWriteFlagsForTimestamping = WriteFlags::TIMESTAMP_SCHED |
WriteFlags::TIMESTAMP_TX | WriteFlags::TIMESTAMP_ACK;
class AsyncReader {
public:
class ReadCallback {
......@@ -743,10 +755,97 @@ class AsyncTransport : public DelayedDestruction,
}
}
/**
* Structure used to communicate ByteEvents, such as TX and ACK timestamps.
*/
struct ByteEvent {
enum Type : uint8_t { WRITE = 1, SCHED = 2, TX = 3, ACK = 4 };
// type
Type type;
// offset of corresponding byte in raw byte stream
uint64_t offset{0};
// transport timestamp, as recorded by AsyncTransport implementation
std::chrono::steady_clock::time_point ts = {
std::chrono::steady_clock::now()};
// kernel software timestamp; for Linux this is CLOCK_REALTIME
// see https://www.kernel.org/doc/Documentation/networking/timestamping.txt
folly::Optional<std::chrono::nanoseconds> maybeSoftwareTs;
// hardware timestamp; see kernel documentation
// see https://www.kernel.org/doc/Documentation/networking/timestamping.txt
folly::Optional<std::chrono::nanoseconds> maybeHardwareTs;
// for WRITE ByteEvents, additional WriteFlags passed
folly::Optional<WriteFlags> maybeWriteFlags;
/**
* For WRITE events, returns if SCHED timestamp requested.
*/
bool schedTimestampRequested() const {
CHECK_EQ(Type::WRITE, type);
CHECK(maybeWriteFlags.has_value());
return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_SCHED);
}
/**
* For WRITE events, returns if TX timestamp requested.
*/
bool txTimestampRequested() const {
CHECK_EQ(Type::WRITE, type);
CHECK(maybeWriteFlags.has_value());
return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_TX);
}
/**
* For WRITE events, returns if ACK timestamp requested.
*/
bool ackTimestampRequested() const {
CHECK_EQ(Type::WRITE, type);
CHECK(maybeWriteFlags.has_value());
return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_ACK);
}
};
/**
* Observer of transport events.
*/
class LifecycleObserver {
public:
/**
* Observer configuration.
*
* Specifies events observer wants to receive. Cannot be changed post
* initialization because the transport may turn on / off instrumentation
* when observers are added / removed, based on the observer configuration.
*/
struct Config {
// enables full support for ByteEvents
bool byteEvents{false};
};
/**
* Constructor for observer, uses default config (instrumentation disabled).
*/
LifecycleObserver() : LifecycleObserver(Config()) {}
/**
* Constructor for observer.
*
* @param config Config, defaults to auxilary instrumentaton disabled.
*/
explicit LifecycleObserver(const Config& observerConfig)
: observerConfig_(observerConfig) {}
virtual ~LifecycleObserver() = default;
/**
* Returns observers configuration.
*/
const Config& getConfig() { return observerConfig_; }
/**
* observerAttach() will be invoked when an observer is added.
*
......@@ -794,26 +893,75 @@ class AsyncTransport : public DelayedDestruction,
virtual void connect(AsyncTransport* /* transport */) noexcept = 0;
/**
* Called when the socket has been attached to a new EVB
* and is called from within the new EVB's thread
* Invoked when the transport is being attached to an EventBase.
*
* Called from within the EventBase thread being attached.
*
* @param socket The socket on which the new EVB was attached.
* @param evb The new event base that is being attached.
* @param transport Transport with EventBase change.
* @param evb The EventBase being attached.
*/
virtual void evbAttach(AsyncTransport* /* socket */, EventBase* /* evb */) {
// do nothing
}
virtual void evbAttach(
AsyncTransport* /* transport */, EventBase* /* evb */) {}
/**
* Called when the socket is detached from an EVB and
* is called from the existing EVB's thread.
* Invoked when the transport is being detached from an EventBase.
*
* Called from within the EventBase thread being detached.
*
* @param socket The socket from which the EVB was detached.
* @param evb The existing evb that is being detached.
* @param transport Transport with EventBase change.
* @param evb The EventBase that is being detached.
*/
virtual void evbDetach(AsyncTransport* /* socket */, EventBase* /* evb */) {
// do nothing
}
virtual void evbDetach(
AsyncTransport* /* transport */, EventBase* /* evb */) {}
/**
* Invoked each time a ByteEvent is available.
*
* Multiple ByteEvent may be generated for the same byte offset and event.
* For instance, kernel software and hardware TX timestamps for the same
* are delivered in separate CMsg, and thus will result in separate
* ByteEvent.
*
* @param transport Transport that ByteEvent is available for.
* @param event ByteEvent (WRITE, SCHED, TX, ACK).
*/
virtual void byteEvent(
AsyncTransport* /* transport */,
const ByteEvent& /* event */) noexcept {}
/**
* Invoked if ByteEvents are enabled.
*
* Only called if the observer's configuration requested ByteEvents. May
* be invoked multiple times if ByteEvent configuration changes (i.e., if
* ByteEvents are enabled without hardware timestamps, and then enabled
* with them).
*
* @param transport Transport that ByteEvents are enabled for.
*/
virtual void byteEventsEnabled(AsyncTransport* /* transport */) noexcept {}
/**
* Invoked if ByteEvents could not be enabled, or if an error occurred that
* will prevent further delivery of ByteEvents.
*
* An observer may be waiting to receive a ByteEvent, such as an ACK event
* confirming delivery of the last byte of a payload, before closing the
* transport. If the transport has become unhealthy then this ByteEvent may
* never occur, yet the handler may be unaware that the transport is
* unhealthy if reads have been shutdown and no writes are occurring; this
* observer signal breaks this 'deadlock'.
*
* @param transport Transport that ByteEvents are now unavailable for.
* @param ex Details on why ByteEvents are now unavailable.
*/
virtual void byteEventsUnavailable(
AsyncTransport* /* transport */,
const AsyncSocketException& /* ex */) noexcept {}
protected:
// observer configuration; cannot be changed post instantiation
const Config observerConfig_;
};
/**
......
This diff is collapsed.
......@@ -74,12 +74,16 @@ class SendMsgParamsCallbackBase
return oldCallback_->getFlags(flags, false /*zeroCopyEnabled*/);
}
void getAncillaryData(folly::WriteFlags flags, void* data) noexcept override {
oldCallback_->getAncillaryData(flags, data);
void getAncillaryData(
folly::WriteFlags flags,
void* data,
const bool byteEventsEnabled) noexcept override {
oldCallback_->getAncillaryData(flags, data, byteEventsEnabled);
}
uint32_t getAncillaryDataSize(folly::WriteFlags flags) noexcept override {
return oldCallback_->getAncillaryDataSize(flags);
uint32_t getAncillaryDataSize(
folly::WriteFlags flags, const bool byteEventsEnabled) noexcept override {
return oldCallback_->getAncillaryDataSize(flags, byteEventsEnabled);
}
std::shared_ptr<AsyncSSLSocket> socket_;
......@@ -119,7 +123,10 @@ class SendMsgAncillaryDataCallback : public SendMsgParamsCallbackBase {
*/
folly::WriteFlags getObservedWriteFlags() { return observedWriteFlags_; }
void getAncillaryData(folly::WriteFlags flags, void* data) noexcept override {
void getAncillaryData(
folly::WriteFlags flags,
void* data,
const bool byteEventsEnabled) noexcept override {
// getAncillaryData is called through a long chain of functions after send
// record the observed write flags so we can compare later
observedWriteFlags_ = flags;
......@@ -128,16 +135,17 @@ class SendMsgAncillaryDataCallback : public SendMsgParamsCallbackBase {
std::cerr << "getAncillaryData: copying data" << std::endl;
memcpy(data, ancillaryData_.data(), ancillaryData_.size());
} else {
oldCallback_->getAncillaryData(flags, data);
oldCallback_->getAncillaryData(flags, data, byteEventsEnabled);
}
}
uint32_t getAncillaryDataSize(folly::WriteFlags flags) noexcept override {
uint32_t getAncillaryDataSize(
folly::WriteFlags flags, const bool byteEventsEnabled) noexcept override {
if (ancillaryData_.size()) {
std::cerr << "getAncillaryDataSize: returning size" << std::endl;
return ancillaryData_.size();
} else {
return oldCallback_->getAncillaryDataSize(flags);
return oldCallback_->getAncillaryDataSize(flags, byteEventsEnabled);
}
}
......@@ -201,114 +209,6 @@ class ExpectWriteErrorCallback : public WriteCallbackBase {
}
};
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
/* copied from include/uapi/linux/net_tstamp.h */
/* SO_TIMESTAMPING gets an integer bit field comprised of these values */
enum SOF_TIMESTAMPING {
SOF_TIMESTAMPING_TX_SOFTWARE = (1 << 1),
SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
SOF_TIMESTAMPING_OPT_ID = (1 << 7),
SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
SOF_TIMESTAMPING_TX_ACK = (1 << 9),
SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
};
class WriteCheckTimestampCallback : public WriteCallbackBase {
public:
explicit WriteCheckTimestampCallback(SendMsgParamsCallbackBase* mcb = nullptr)
: WriteCallbackBase(mcb) {}
~WriteCheckTimestampCallback() override { EXPECT_EQ(STATE_SUCCEEDED, state); }
void setSocket(const std::shared_ptr<AsyncSSLSocket>& socket) override {
WriteCallbackBase::setSocket(socket);
EXPECT_NE(socket_->getNetworkSocket(), NetworkSocket());
int flags = SOF_TIMESTAMPING_OPT_ID | SOF_TIMESTAMPING_OPT_TSONLY |
SOF_TIMESTAMPING_SOFTWARE;
SocketOptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
int ret = tstampingOpt.apply(socket_->getNetworkSocket(), flags);
EXPECT_EQ(ret, 0);
}
std::vector<int32_t> getTimestampNotifications() noexcept {
auto fd = socket_->getNetworkSocket();
std::vector<char> ctrl(1024, 0);
unsigned char data;
struct msghdr msg;
iovec entry;
memset(&msg, 0, sizeof(msg));
entry.iov_base = &data;
entry.iov_len = sizeof(data);
msg.msg_iov = &entry;
msg.msg_iovlen = 1;
msg.msg_control = ctrl.data();
msg.msg_controllen = ctrl.size();
std::vector<int32_t> timestampsFound;
folly::Optional<int32_t> timestampType;
bool gotTimestamp = false;
bool gotByteSeq = false;
int ret;
while (true) {
ret = netops::recvmsg(fd, &msg, MSG_ERRQUEUE);
if (ret < 0) {
if (errno != EAGAIN) {
auto errnoCopy = errno;
std::cerr << "::recvmsg exited with code " << ret
<< ", errno: " << errnoCopy << std::endl;
AsyncSocketException ex(
AsyncSocketException::INTERNAL_ERROR,
"recvmsg() failed",
errnoCopy);
exception = ex;
}
return timestampsFound;
}
for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
cmsg != nullptr && cmsg->cmsg_len != 0;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET &&
cmsg->cmsg_type == SCM_TIMESTAMPING) {
CHECK(!gotTimestamp); // shouldn't already be set
gotTimestamp = true;
}
if ((cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVERR) ||
(cmsg->cmsg_level == SOL_IPV6 && cmsg->cmsg_type == IPV6_RECVERR)) {
const struct cmsghdr& cmsgh = *cmsg;
const auto serr = reinterpret_cast<const struct sock_extended_err*>(
CMSG_DATA(&cmsgh));
if (serr->ee_errno != ENOMSG ||
serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
// not a timestamp
continue;
}
CHECK(!timestampType); // shouldn't already be set
CHECK(!gotByteSeq); // shouldn't already be set
gotByteSeq = true;
timestampType = serr->ee_info;
}
// check if we have both a timestamp and byte sequence
if (gotTimestamp && gotByteSeq) {
timestampsFound.push_back(*timestampType);
timestampType = folly::none;
gotTimestamp = false;
gotByteSeq = false;
}
} // for(...)
} // while(true)
return timestampsFound;
}
};
#endif // FOLLY_HAVE_MSG_ERRQUEUE
class ReadCallbackBase : public AsyncTransport::ReadCallback {
public:
explicit ReadCallbackBase(WriteCallbackBase* wcb)
......@@ -352,8 +252,13 @@ class ReadCallbackBase : public AsyncTransport::ReadCallback {
*/
class ReadCallback : public ReadCallbackBase {
public:
explicit ReadCallback(WriteCallbackBase* wcb)
: ReadCallbackBase(wcb), buffers(), writeFlags(folly::WriteFlags::NONE) {}
explicit ReadCallback(WriteCallbackBase* wcb, bool reflect = true)
: ReadCallbackBase(wcb),
buffers(),
writeFlags(folly::WriteFlags::NONE),
reflect(reflect) {}
explicit ReadCallback() : ReadCallback(nullptr, false) {}
~ReadCallback() override {
for (std::vector<Buffer>::iterator it = buffers.begin();
......@@ -382,13 +287,46 @@ class ReadCallback : public ReadCallbackBase {
}
// Write back the same data.
if (reflect) {
socket_->write(wcb_, currentBuffer.buffer, len, writeFlags);
}
buffers.push_back(currentBuffer);
currentBuffer.reset();
state = STATE_SUCCEEDED;
}
void verifyData(const char* expected, size_t expectedLen) const {
verifyData((const unsigned char*)expected, expectedLen);
}
void verifyData(const unsigned char* expected, size_t expectedLen) const {
size_t offset = 0;
for (size_t idx = 0; idx < buffers.size(); ++idx) {
const auto& buf = buffers[idx];
size_t cmpLen = std::min(buf.length, expectedLen - offset);
CHECK_EQ(memcmp(buf.buffer, expected + offset, cmpLen), 0);
CHECK_EQ(cmpLen, buf.length);
offset += cmpLen;
}
CHECK_EQ(offset, expectedLen);
}
void clearData() {
for (auto& buffer : buffers) {
buffer.free();
}
buffers.clear();
}
size_t dataRead() const {
size_t ret = 0;
for (const auto& buf : buffers) {
ret += buf.length;
}
return ret;
}
/**
* These flags will be used when writing the read data back to the socket.
*/
......@@ -420,6 +358,7 @@ class ReadCallback : public ReadCallbackBase {
std::vector<Buffer> buffers;
Buffer currentBuffer;
folly::WriteFlags writeFlags;
bool reflect; // whether read bytes will be written back to the transport
};
class ReadErrorCallback : public ReadCallbackBase {
......
......@@ -145,6 +145,10 @@ class ReadCallback : public folly::AsyncTransport::ReadCallback {
}
void verifyData(const char* expected, size_t expectedLen) const {
verifyData((const unsigned char*)expected, expectedLen);
}
void verifyData(const unsigned char* expected, size_t expectedLen) const {
size_t offset = 0;
for (size_t idx = 0; idx < buffers.size(); ++idx) {
const auto& buf = buffers[idx];
......@@ -156,6 +160,13 @@ class ReadCallback : public folly::AsyncTransport::ReadCallback {
CHECK_EQ(offset, expectedLen);
}
void clearData() {
for (auto& buffer : buffers) {
buffer.free();
}
buffers.clear();
}
size_t dataRead() const {
size_t ret = 0;
for (const auto& buf : buffers) {
......@@ -261,7 +272,10 @@ class TestSendMsgParamsCallback
return flags_;
}
void getAncillaryData(folly::WriteFlags flags, void* data) noexcept override {
void getAncillaryData(
folly::WriteFlags flags,
void* data,
const bool /* byteEventsEnabled */) noexcept override {
queriedData_ = true;
if (writeFlags_ == folly::WriteFlags::NONE) {
writeFlags_ = flags;
......@@ -272,7 +286,9 @@ class TestSendMsgParamsCallback
memcpy(data, data_, dataSize_);
}
uint32_t getAncillaryDataSize(folly::WriteFlags flags) noexcept override {
uint32_t getAncillaryDataSize(
folly::WriteFlags flags,
const bool /* byteEventsEnabled */) noexcept override {
if (writeFlags_ == folly::WriteFlags::NONE) {
writeFlags_ = flags;
} else {
......
This diff is collapsed.
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/io/async/AsyncSocket.h>
#include <folly/portability/GMock.h>
namespace folly {
namespace test {
/*
* Mock class for AsyncSocketLifecycleObserver.
*
* Deriving from MockAsyncTransportLifecycleObserver results in diamond
* inheritance that creates a mess for Stict/Weak mocks; easier to just derive
* directly from AsyncSocket::LifecycleObserver and clone mocks
*/
class MockAsyncSocketLifecycleObserver : public AsyncSocket::LifecycleObserver {
public:
using AsyncSocket::LifecycleObserver::LifecycleObserver;
GMOCK_METHOD1_(, noexcept, , observerAttach, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , observerDetach, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , destroy, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , close, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , connect, void(AsyncTransport*));
GMOCK_METHOD2_(, noexcept, , evbAttach, void(AsyncTransport*, EventBase*));
GMOCK_METHOD2_(, noexcept, , evbDetach, void(AsyncTransport*, EventBase*));
GMOCK_METHOD2_(
,
noexcept,
,
byteEvent,
void(AsyncTransport*, const AsyncTransport::ByteEvent&));
GMOCK_METHOD1_(, noexcept, , byteEventsEnabled, void(AsyncTransport*));
GMOCK_METHOD2_(
,
noexcept,
,
byteEventsUnavailable,
void(AsyncTransport*, const AsyncSocketException&));
// additional handlers specific to AsyncSocket::LifecycleObserver
GMOCK_METHOD1_(, noexcept, , fdDetach, void(AsyncSocket*));
GMOCK_METHOD2_(, noexcept, , move, void(AsyncSocket*, AsyncSocket*));
};
} // namespace test
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/io/async/AsyncSocketException.h>
#include <folly/io/async/AsyncTransport.h>
#include <folly/portability/GMock.h>
namespace folly {
namespace test {
class MockAsyncTransportLifecycleObserver
: public AsyncTransport::LifecycleObserver {
public:
using AsyncTransport::LifecycleObserver::LifecycleObserver;
GMOCK_METHOD1_(, noexcept, , observerAttach, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , observerDetach, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , destroy, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , close, void(AsyncTransport*));
GMOCK_METHOD1_(, noexcept, , connect, void(AsyncTransport*));
GMOCK_METHOD2_(, noexcept, , evbAttach, void(AsyncTransport*, EventBase*));
GMOCK_METHOD2_(, noexcept, , evbDetach, void(AsyncTransport*, EventBase*));
GMOCK_METHOD2_(
,
noexcept,
,
byteEvent,
void(AsyncTransport*, const AsyncTransport::ByteEvent&));
GMOCK_METHOD1_(, noexcept, , byteEventsEnabled, void(AsyncTransport*));
GMOCK_METHOD2_(
,
noexcept,
,
byteEventsUnavailable,
void(AsyncTransport*, const AsyncSocketException&));
};
/**
* Extends mock class to simplify ByteEvents tests.
*/
class MockAsyncTransportObserverForByteEvents
: public MockAsyncTransportLifecycleObserver {
public:
MockAsyncTransportObserverForByteEvents(
AsyncTransport* transport,
const MockAsyncTransportObserverForByteEvents::Config& observerConfig)
: MockAsyncTransportLifecycleObserver(observerConfig),
transport_(transport) {
ON_CALL(*this, byteEvent(testing::_, testing::_))
.WillByDefault(
testing::Invoke([this](
AsyncTransport* transport,
const AsyncTransport::ByteEvent& event) {
CHECK_EQ(this->transport_, transport);
byteEvents_.emplace_back(event);
}));
ON_CALL(*this, byteEventsEnabled(testing::_))
.WillByDefault(testing::Invoke([this](AsyncTransport* transport) {
CHECK_EQ(this->transport_, transport);
byteEventsEnabledCalled_++;
}));
ON_CALL(*this, byteEventsUnavailable(testing::_, testing::_))
.WillByDefault(testing::Invoke(
[this](AsyncTransport* transport, const AsyncSocketException& ex) {
CHECK_EQ(this->transport_, transport);
byteEventsUnavailableCalled_++;
byteEventsUnavailableCalledEx_.emplace(ex);
}));
transport->addLifecycleObserver(this);
}
folly::Optional<AsyncTransport::ByteEvent> getByteEventReceivedWithOffset(
const uint64_t offset, const AsyncTransport::ByteEvent::Type type) {
for (const auto& byteEvent : byteEvents_) {
if (type == byteEvent.type && offset == byteEvent.offset) {
return byteEvent;
}
}
return folly::none;
}
folly::Optional<uint64_t> maxOffsetForByteEventReceived(
const AsyncTransport::ByteEvent::Type type) {
folly::Optional<uint64_t> maybeMaxOffset;
for (const auto& byteEvent : byteEvents_) {
if (type == byteEvent.type &&
(!maybeMaxOffset.has_value() ||
maybeMaxOffset.value() <= byteEvent.offset)) {
maybeMaxOffset = byteEvent.offset;
}
}
return maybeMaxOffset;
}
bool checkIfByteEventReceived(
const AsyncTransport::ByteEvent::Type type, const uint64_t offset) {
for (const auto& byteEvent : byteEvents_) {
if (type == byteEvent.type && offset == byteEvent.offset) {
return true;
}
}
return false;
}
void waitForByteEvent(
const AsyncTransport::ByteEvent::Type type, const uint64_t offset) {
while (!checkIfByteEventReceived(type, offset)) {
transport_->getEventBase()->loopOnce();
}
}
// Exposed ByteEvent helper fields with const
const uint32_t& byteEventsEnabledCalled{byteEventsEnabledCalled_};
const uint32_t& byteEventsUnavailableCalled{byteEventsUnavailableCalled_};
const folly::Optional<AsyncSocketException>& byteEventsUnavailableCalledEx{
byteEventsUnavailableCalledEx_};
const std::vector<AsyncTransport::ByteEvent>& byteEvents{byteEvents_};
private:
const AsyncTransport* transport_;
// ByteEvents helpers
uint32_t byteEventsEnabledCalled_{0};
uint32_t byteEventsUnavailableCalled_{0};
folly::Optional<AsyncSocketException> byteEventsUnavailableCalledEx_;
std::vector<AsyncTransport::ByteEvent> byteEvents_;
};
} // namespace test
} // namespace folly
......@@ -116,6 +116,7 @@ struct mmsghdr {
#ifdef MSG_ERRQUEUE
#define FOLLY_HAVE_MSG_ERRQUEUE 1
#define FOLLY_HAVE_SO_TIMESTAMPING 1
/* for struct sock_extended_err*/
#include <linux/errqueue.h>
#endif
......
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/net/NetOpsDispatcher.h>
#include <folly/portability/GMock.h>
namespace folly {
namespace netops {
namespace test {
class MockDispatcher : public Dispatcher {
public:
MockDispatcher() = default;
virtual ~MockDispatcher() = default;
/**
* Configures mocked methods to forward calls to default implementation.
*/
void forwardToDefaultImpl() {
ON_CALL(
*this,
getsockopt(testing::_, testing::_, testing::_, testing::_, testing::_))
.WillByDefault(testing::Invoke([this](
NetworkSocket s,
int level,
int optname,
void* optval,
socklen_t* optlen) {
return Dispatcher::getsockopt(s, level, optname, optval, optlen);
}));
ON_CALL(*this, sendmsg(testing::_, testing::_, testing::_))
.WillByDefault(testing::Invoke(
[this](NetworkSocket s, const msghdr* message, int flags) {
return Dispatcher::sendmsg(s, message, flags);
}));
ON_CALL(
*this,
setsockopt(testing::_, testing::_, testing::_, testing::_, testing::_))
.WillByDefault(testing::Invoke([this](
NetworkSocket s,
int level,
int optname,
const void* optval,
socklen_t optlen) {
return Dispatcher::setsockopt(s, level, optname, optval, optlen);
}));
}
MOCK_METHOD5(
getsockopt,
int(NetworkSocket s,
int level,
int optname,
void* optval,
socklen_t* optlen));
MOCK_METHOD3(
sendmsg, ssize_t(NetworkSocket s, const msghdr* message, int flags));
MOCK_METHOD5(
setsockopt,
int(NetworkSocket s,
int level,
int optname,
const void* optval,
socklen_t optlen));
};
} // namespace test
} // namespace netops
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment