Commit a05360ec, authored by Brandon Schlinker and committed by Facebook GitHub Bot

Enable observers to request socket timestamps

Summary:
D24094832 (https://github.com/facebook/folly/commit/842ecea531e8d6a90559f213be3793f7cd36781b) added `ByteEvent` support to `AsyncSocket`, making it easier to use socket timestamps for SCHED/TX/ACK events. With D24094832 (https://github.com/facebook/folly/commit/842ecea531e8d6a90559f213be3793f7cd36781b):
- An application can request socket timestamps by installing an observer with `ByteEvents` enabled, and then writing to the socket with a relevant timestamping flag (e.g., `TIMESTAMP_TX`, `TIMESTAMP_ACK`).
- Timestamps are delivered to the observer via the `byteEvent` callback.

This diff enables *observers* to request socket timestamping by interposing between the application and the socket by way of the `prewrite` event:
- Each time bytes from the application are about to be written to the underlying raw socket / FD, `AsyncSocket` will give observers an opportunity to request timestamping via a `prewrite` event.
- If an observer wishes to request timestamping, it can return a `PrewriteRequest` with information about the `WriteFlags` to add.
- If an observer wishes to timestamp a specific byte (first byte, every 1000th byte, etc.), it can request this with the `maybeOffsetToSplitWrite` field — socket timestamp requests apply to the *last byte* in the buffer being written, and thus if an observer wants to timestamp a specific byte, the buffer must be split so that the byte to timestamp is the final byte. The `AsyncSocket` implementation handles this split on behalf of the observer and adds `WriteFlags::CORK` (triggering `MSG_MORE`) where appropriate.
- If multiple observers are attached, `PrewriteRequests` are combined so that all observer needs are satisfied. In addition, `WriteFlags` set by the application and `WriteFlags` set by observers are combined during processing of `PrewriteRequests`.

Reviewed By: yfeldblum

Differential Revision: D24976575

fbshipit-source-id: 885720173d4a9ceefebc929a86d5e0f10f8889c4
parent 5c4c45a4
......@@ -522,6 +522,8 @@ void disableTransparentFunctions(
#endif
}
constexpr size_t kSmallIoVecSize = 64;
} // namespace
AsyncSocket::AsyncSocket()
......@@ -1469,13 +1471,12 @@ void AsyncSocket::writeChain(
flags |= WriteFlags::WRITE_MSG_ZEROCOPY;
}
constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
if (count <= kSmallSizeMax) {
if (count <= kSmallIoVecSize) {
// suppress "warning: variable length array 'vec' is used [-Wvla]"
FOLLY_PUSH_WARNING
FOLLY_GNU_DISABLE_WARNING("-Wvla")
iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallSizeMax)];
iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallIoVecSize)];
FOLLY_POP_WARNING
writeChainImpl(callback, vec, count, std::move(buf), flags);
......@@ -2575,6 +2576,57 @@ AsyncSocket::getLifecycleObservers() const {
lifecycleObservers_.begin(), lifecycleObservers_.end());
}
/**
 * Build an iovec array that covers a byte sub-range of another iovec array.
 *
 * Offsets are byte positions relative to the first byte described by srcVec,
 * and both ends are inclusive: bytes startOffset..endOffset are selected.
 * No payload is copied; the entries written to dstVec point into the same
 * buffers that srcVec points to. Zero-length source entries are skipped.
 *
 * @param startOffset  Offset of the first byte to include.
 * @param endOffset    Offset of the last byte to include (>= startOffset).
 * @param srcVec       Source iovec array.
 * @param srcCount     Number of entries in srcVec.
 * @param dstVec       Destination array that receives the selected entries.
 * @param dstCount     In: value checked to be >= srcCount (callers in this
 *                     file pass the capacity of dstVec).
 *                     Out: number of entries written to dstVec.
 *
 * CHECK-fails if endOffset < startOffset, if dstCount < srcCount on entry, or
 * if srcVec does not contain enough bytes to reach endOffset.
 */
void AsyncSocket::splitIovecArray(
    const size_t startOffset,
    const size_t endOffset,
    const iovec* srcVec,
    const size_t srcCount,
    iovec* dstVec,
    size_t& dstCount) {
  CHECK_GE(endOffset, startOffset);
  CHECK_GE(dstCount, srcCount);
  dstCount = 0;

  // both offsets are inclusive, hence the +1
  const size_t targetBytes = endOffset - startOffset + 1;
  size_t dstBytes = 0; // bytes placed into dstVec so far
  size_t processedBytes = 0; // bytes of srcVec that precede entry i
  for (size_t i = 0; i < srcCount; processedBytes += srcVec[i].iov_len, i++) {
    iovec currentOp = srcVec[i];
    if (currentOp.iov_len == 0) { // to handle the oddballs
      continue;
    }

    // if we haven't found the start offset yet, see if it is in this op
    if (dstCount == 0) {
      // compare against startOffset directly so that no "+ 1" can overflow
      if (processedBytes + currentOp.iov_len <= startOffset) {
        continue; // start offset isn't in this op
      }

      // advance iov_base so that the entry begins at the start offset
      const size_t trimFromStart = startOffset - processedBytes;
      currentOp.iov_base =
          static_cast<uint8_t*>(currentOp.iov_base) + trimFromStart;
      currentOp.iov_len -= trimFromStart;
    }

    // trim the end of the iovec, if needed
    //
    // dstBytes never exceeds targetBytes at this point, so the subtraction
    // cannot wrap; clamping in unsigned arithmetic avoids routing a size_t
    // difference through a signed type
    const size_t remainingBytes = targetBytes - dstBytes;
    if (currentOp.iov_len > remainingBytes) {
      currentOp.iov_len = remainingBytes;
    }

    dstVec[dstCount] = currentOp;
    dstCount++;
    dstBytes += currentOp.iov_len;
    CHECK_GE(targetBytes, dstBytes);
    if (targetBytes == dstBytes) {
      break; // done
    }
  }

  // the source must have supplied every requested byte
  CHECK_EQ(targetBytes, dstBytes);
}
void AsyncSocket::handleRead() noexcept {
VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
<< ", state=" << state_;
......@@ -3016,56 +3068,188 @@ ssize_t AsyncSocket::tfoSendMsg(
AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
const iovec* vec, size_t count, WriteFlags flags) {
const bool byteEventsEnabled =
(byteEventHelper_ && byteEventHelper_->byteEventsEnabled &&
!byteEventHelper_->maybeEx.has_value());
struct msghdr msg = {};
msg.msg_name = nullptr;
msg.msg_namelen = 0;
msg.msg_iov = const_cast<struct iovec*>(vec);
msg.msg_iovlen = std::min<size_t>(count, kIovMax);
msg.msg_flags = 0; // ignored, must forward flags via sendmsg parameter
msg.msg_control = nullptr;
msg.msg_controllen =
sendMsgParamCallback_->getAncillaryDataSize(flags, byteEventsEnabled);
CHECK_GE(
AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
msg.msg_controllen);
if (msg.msg_controllen != 0) {
msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
sendMsgParamCallback_->getAncillaryData(
flags, msg.msg_control, byteEventsEnabled);
}
int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
const auto prewriteRawBytesWritten = getRawBytesWritten();
auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
if (writeResult.writeReturn > 0 && byteEventsEnabled &&
isSet(flags, WriteFlags::TIMESTAMP_WRITE)) {
CHECK_GT(getRawBytesWritten(), prewriteRawBytesWritten); // sanity check
ByteEvent byteEvent = {};
byteEvent.type = ByteEvent::Type::WRITE;
byteEvent.offset = getRawBytesWritten() - 1;
byteEvent.maybeWriteFlags = flags;
// lambda to gather and merge PrewriteRequests from observers
auto mergePrewriteRequests = [this,
vec,
count,
flags,
maybeVecTotalBytes =
folly::Optional<size_t>()]() mutable {
AsyncTransport::LifecycleObserver::PrewriteRequest mergedRequest = {};
if (lifecycleObservers_.empty()) {
return mergedRequest;
}
// determine total number of bytes in vec, reuse once determined
if (!maybeVecTotalBytes.has_value()) {
maybeVecTotalBytes = 0;
for (size_t i = 0; i < count; ++i) {
maybeVecTotalBytes.value() += vec[i].iov_len;
}
}
auto& vecTotalBytes = maybeVecTotalBytes.value();
const auto startOffset = getRawBytesWritten();
const auto endOffset = getRawBytesWritten() + vecTotalBytes - 1;
const AsyncTransport::LifecycleObserver::PrewriteState prewriteState = [&] {
AsyncTransport::LifecycleObserver::PrewriteState state = {};
state.startOffset = startOffset;
state.endOffset = endOffset;
state.writeFlags = flags;
state.ts = std::chrono::steady_clock::now();
return state;
}();
for (const auto& observer : lifecycleObservers_) {
if (observer->getConfig().byteEvents) {
observer->byteEvent(this, byteEvent);
if (!observer->getConfig().prewrite) {
continue;
}
const auto request = observer->prewrite(this, prewriteState);
mergedRequest.writeFlagsToAdd |= request.writeFlagsToAdd;
if (request.maybeOffsetToSplitWrite.has_value()) {
CHECK_GE(endOffset, request.maybeOffsetToSplitWrite.value());
if (
// case 1: offset not set in merged request
!mergedRequest.maybeOffsetToSplitWrite.has_value() ||
// case 2: offset in merged request > offset in current request
mergedRequest.maybeOffsetToSplitWrite >
request.maybeOffsetToSplitWrite) {
mergedRequest.maybeOffsetToSplitWrite =
request.maybeOffsetToSplitWrite; // update
mergedRequest.writeFlagsToAddAtOffset =
request.writeFlagsToAddAtOffset; // reset
} else if (
// case 3: offset in merged request == offset in current request
request.maybeOffsetToSplitWrite ==
mergedRequest.maybeOffsetToSplitWrite) {
mergedRequest.writeFlagsToAddAtOffset |=
request.writeFlagsToAddAtOffset; // merge
}
// case 4: offset in merged request < offset in current request
// (do nothing)
}
}
}
if (writeResult.writeReturn < 0 && zeroCopyEnabled_ && errno == ENOBUFS) {
// workaround for running with zerocopy enabled but without a big enough
// memlock value - see ulimit -l
zeroCopyEnabled_ = false;
zeroCopyReenableCounter_ = zeroCopyReenableThreshold_;
msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
writeResult = sendSocketMessage(fd_, &msg, msg_flags);
// if maybeOffsetToSplitWrite points to end of the vector, remove the split
if (mergedRequest.maybeOffsetToSplitWrite.has_value() && // explicit
mergedRequest.maybeOffsetToSplitWrite == endOffset) {
mergedRequest.maybeOffsetToSplitWrite.reset(); // no split needed
}
return mergedRequest;
};
// Helper lambda: assembles the msghdr for a single send attempt, hands it to
// sendSocketMessage(fd_, ...), and notifies observers with a WRITE ByteEvent
// when one was requested via WriteFlags::TIMESTAMP_WRITE.
//
// parameter names end in "L" to prevent a shadowing warning from gcc
auto prepSendMsg = [this](
                       const iovec* vecL,
                       const size_t countL,
                       const WriteFlags flagsL) {
  // ByteEvents are usable only if the helper exists, is enabled, and holds no
  // exception
  const bool byteEventsEnabled =
      (byteEventHelper_ && byteEventHelper_->byteEventsEnabled &&
       !byteEventHelper_->maybeEx.has_value());

  struct msghdr msg = {};
  msg.msg_name = nullptr;
  msg.msg_namelen = 0;
  msg.msg_iov = const_cast<struct iovec*>(vecL);
  msg.msg_iovlen = std::min<size_t>(countL, kIovMax);
  msg.msg_flags = 0; // flags travel as the sendSocketMessage argument below
  msg.msg_control = nullptr;

  // ask the callback how much ancillary data it wants, then let it fill in a
  // stack buffer of that size
  msg.msg_controllen =
      sendMsgParamCallback_->getAncillaryDataSize(flagsL, byteEventsEnabled);
  CHECK_GE(
      AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
      msg.msg_controllen);
  if (msg.msg_controllen != 0) {
    msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
    sendMsgParamCallback_->getAncillaryData(
        flagsL, msg.msg_control, byteEventsEnabled);
  }

  // remember the raw byte count before sending; used by the sanity check below
  const auto prewriteRawBytesWritten = getRawBytesWritten();
  int sendFlags = sendMsgParamCallback_->getFlags(flagsL, zeroCopyEnabled_);
  auto result = sendSocketMessage(fd_, &msg, sendFlags);

  const bool zeroCopyOutOfBuffers =
      result.writeReturn < 0 && zeroCopyEnabled_ && errno == ENOBUFS;
  if (zeroCopyOutOfBuffers) {
    // workaround for running with zerocopy enabled but without a big enough
    // memlock value - see ulimit -l
    //
    // turn zerocopy off, recompute the flags, and retry once
    zeroCopyEnabled_ = false;
    zeroCopyReenableCounter_ = zeroCopyReenableThreshold_;
    sendFlags = sendMsgParamCallback_->getFlags(flagsL, zeroCopyEnabled_);
    result = sendSocketMessage(fd_, &msg, sendFlags);
  }

  // a WRITE ByteEvent is only produced when bytes were written, ByteEvents are
  // enabled, and the write asked for TIMESTAMP_WRITE
  const bool reportWrite = result.writeReturn > 0 && byteEventsEnabled &&
      isSet(flagsL, WriteFlags::TIMESTAMP_WRITE);
  if (!reportWrite) {
    return result;
  }

  CHECK_GT(getRawBytesWritten(), prewriteRawBytesWritten); // sanity check

  ByteEvent event = {};
  event.type = ByteEvent::Type::WRITE;
  event.offset = getRawBytesWritten() - 1; // offset of the last byte written
  event.maybeRawBytesWritten = result.writeReturn;

  // NOTE(review): this sums every one of the countL entries, although
  // msg_iovlen above is capped at kIovMax - confirm that this is the intended
  // meaning of "tried to write"
  size_t bytesAttempted = 0;
  for (size_t idx = 0; idx < countL; ++idx) {
    bytesAttempted += vecL[idx].iov_len;
  }
  event.maybeRawBytesTriedToWrite = bytesAttempted;
  event.maybeWriteFlags = flagsL;

  for (const auto& observer : lifecycleObservers_) {
    if (!observer->getConfig().byteEvents) {
      continue; // observer did not subscribe to ByteEvents
    }
    observer->byteEvent(this, event);
  }
  return result;
};
// get PrewriteRequests (if any), merge flags with write flags
const auto prewriteRequest = mergePrewriteRequests();
auto mergedFlags = flags | prewriteRequest.writeFlagsToAdd |
prewriteRequest.writeFlagsToAddAtOffset;
// if no PrewriteRequests, or none requiring the write to be split, proceed
if (!prewriteRequest.maybeOffsetToSplitWrite.has_value()) {
return prepSendMsg(vec, count, mergedFlags);
}
return writeResult;
// we need to split the write...
// add CORK flag to inform the OS that more data is on the way...
mergedFlags |= WriteFlags::CORK;
// TODO(bschlinker): When prewrite splits a write, try to continue writing
// after a write returns; this will improve efficiency.
const auto splitWriteAtOffset = *prewriteRequest.maybeOffsetToSplitWrite;
if (count <= kSmallIoVecSize) {
// suppress "warning: variable length array 'vec' is used [-Wvla]"
FOLLY_PUSH_WARNING
FOLLY_GNU_DISABLE_WARNING("-Wvla")
iovec tmpVec[BOOST_PP_IF(FOLLY_HAVE_VLA_01, count, kSmallIoVecSize)];
FOLLY_POP_WARNING
size_t tmpVecCount = count;
splitIovecArray(
0,
splitWriteAtOffset - getRawBytesWritten(),
vec,
count,
tmpVec,
tmpVecCount);
return prepSendMsg(tmpVec, tmpVecCount, mergedFlags);
} else {
auto tmpVecPtr = std::make_unique<iovec[]>(count);
auto tmpVec = tmpVecPtr.get();
size_t tmpVecCount = count;
splitIovecArray(
0,
splitWriteAtOffset - getRawBytesWritten(),
vec,
count,
tmpVec,
tmpVecCount);
return prepSendMsg(tmpVec, tmpVecCount, mergedFlags);
}
}
AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
......
......@@ -1178,6 +1178,17 @@ class AsyncSocket : public AsyncTransport {
FOLLY_NODISCARD virtual std::vector<AsyncTransport::LifecycleObserver*>
getLifecycleObservers() const override;
/**
 * Split iovec array at given byte offsets; produce a new array with result.
 *
 * Fills dstVec with entries covering bytes startOffset through endOffset
 * (both inclusive, relative to the first byte of srcVec). Entries in dstVec
 * point into the buffers referenced by srcVec; no payload is copied.
 * Zero-length entries in srcVec are skipped.
 *
 * @param startOffset  Offset of the first byte to include.
 * @param endOffset    Offset of the last byte to include; must be
 *                     >= startOffset.
 * @param srcVec       Source iovec array.
 * @param srcCount     Number of entries in srcVec.
 * @param dstVec       Destination array that receives the result.
 * @param dstCount     On entry must be >= srcCount; on return holds the
 *                     number of entries written to dstVec.
 *
 * The implementation CHECK-fails if the preconditions above are violated or
 * if srcVec holds fewer bytes than needed to reach endOffset.
 */
static void splitIovecArray(
const size_t startOffset,
const size_t endOffset,
const iovec* srcVec,
const size_t srcCount,
iovec* dstVec,
size_t& dstCount);
protected:
enum ReadResultEnum {
READ_EOF = 0,
......
......@@ -798,32 +798,47 @@ class AsyncTransport : public DelayedDestruction,
* Structure used to communicate ByteEvents, such as TX and ACK timestamps.
*/
struct ByteEvent {
enum Type : uint8_t { WRITE = 1, SCHED = 2, TX = 3, ACK = 4 };
// types of events; start from 0 to enable indexing in arrays
enum Type : uint8_t {
WRITE = 0,
SCHED = 1,
TX = 2,
ACK = 3,
};
// type
Type type;
// offset of corresponding byte in raw byte stream
uint64_t offset{0};
size_t offset{0};
// transport timestamp, as recorded by AsyncTransport implementation
std::chrono::steady_clock::time_point ts = {
std::chrono::steady_clock::now()};
// kernel software timestamp; for Linux this is CLOCK_REALTIME
// kernel software timestamp for non-WRITE; for Linux this is CLOCK_REALTIME
// see https://www.kernel.org/doc/Documentation/networking/timestamping.txt
folly::Optional<std::chrono::nanoseconds> maybeSoftwareTs;
// hardware timestamp; see kernel documentation
// hardware timestamp for non-WRITE events; see kernel documentation
// see https://www.kernel.org/doc/Documentation/networking/timestamping.txt
folly::Optional<std::chrono::nanoseconds> maybeHardwareTs;
// for WRITE events, the number of raw bytes written to the socket
// optional to prevent accidental misuse in other event types
folly::Optional<size_t> maybeRawBytesWritten;
// for WRITE events, the number of raw bytes we tried to write to the socket
// optional to prevent accidental misuse in other event types
folly::Optional<size_t> maybeRawBytesTriedToWrite;
// for WRITE ByteEvents, additional WriteFlags passed
// optional to prevent accidental misuse in other event types
folly::Optional<WriteFlags> maybeWriteFlags;
/**
* For WRITE events, returns if SCHED timestamp requested.
*/
bool schedTimestampRequested() const {
bool schedTimestampRequestedOnWrite() const {
CHECK_EQ(Type::WRITE, type);
CHECK(maybeWriteFlags.has_value());
return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_SCHED);
......@@ -832,7 +847,7 @@ class AsyncTransport : public DelayedDestruction,
/**
* For WRITE events, returns if TX timestamp requested.
*/
bool txTimestampRequested() const {
bool txTimestampRequestedOnWrite() const {
CHECK_EQ(Type::WRITE, type);
CHECK(maybeWriteFlags.has_value());
return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_TX);
......@@ -841,7 +856,7 @@ class AsyncTransport : public DelayedDestruction,
/**
* For WRITE events, returns if ACK timestamp requested.
*/
bool ackTimestampRequested() const {
bool ackTimestampRequestedOnWrite() const {
CHECK_EQ(Type::WRITE, type);
CHECK(maybeWriteFlags.has_value());
return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_ACK);
......@@ -861,8 +876,49 @@ class AsyncTransport : public DelayedDestruction,
* when observers are added / removed, based on the observer configuration.
*/
struct Config {
// receive ByteEvents; the transport only invokes byteEvent() on observers
// that have this set
bool byteEvents{false};
// observer is notified during prewrite stage and can add WriteFlags; the
// transport only invokes prewrite() on observers that have this set
bool prewrite{false};
};
/**
 * Description of a pending write, passed to observers in the prewrite event.
 *
 * An observer inspects this state to decide what (if anything) to put in the
 * PrewriteRequest it returns.
 */
struct PrewriteState {
  // raw byte stream offset of the first byte of the pending write
  size_t startOffset = 0;

  // raw byte stream offset of the last byte of the pending write
  size_t endOffset = 0;

  // WriteFlags that are already set for the pending write
  WriteFlags writeFlags = WriteFlags::NONE;

  // transport timestamp, as recorded by the AsyncTransport implementation;
  // lets PrewriteState events and ByteEvents be sequenced when debugging
  std::chrono::steady_clock::time_point ts{std::chrono::steady_clock::now()};
};
/**
 * Response an observer may return from the prewrite event.
 *
 * Through a PrewriteRequest an observer can ask for WriteFlags to be added to
 * the write, ask for the write to be split at a given offset, or both. Either
 * mechanism can be used to obtain timestamps.
 */
struct PrewriteRequest {
  // raw byte stream offset at which the write should be split; if another
  // request asks for an earlier offset, the write is split there instead
  folly::Optional<size_t> maybeOffsetToSplitWrite;

  // WriteFlags to add if the write is split at the offset requested above
  WriteFlags writeFlagsToAddAtOffset = WriteFlags::NONE;

  // WriteFlags to add no matter where the write ends up being split
  WriteFlags writeFlagsToAdd = WriteFlags::NONE;
};
/**
......@@ -881,7 +937,9 @@ class AsyncTransport : public DelayedDestruction,
virtual ~LifecycleObserver() = default;
/**
 * Returns observer's configuration.
 *
 * @return Observer configuration (reference to observerConfig_).
 */
const Config& getConfig() { return observerConfig_; }
......@@ -998,6 +1056,33 @@ class AsyncTransport : public DelayedDestruction,
AsyncTransport* /* transport */,
const AsyncSocketException& /* ex */) noexcept {}
/**
 * Invoked before each write to the transport if prewrite support enabled.
 *
 * The observer receives information about the pending write in the
 * PrewriteState and can request ByteEvents / socket timestamps by returning
 * a PrewriteRequest. The request contains the offset to split the write at
 * (if any) and WriteFlags to apply.
 *
 * PrewriteRequests are aggregated across observers. The write buffer is
 * split at the lowest offset returned by all observers. Flags are applied
 * based on configuration within the PrewriteRequest. Requests are not
 * sticky and expire after each write.
 *
 * Fewer bytes may be written than indicated in the PrewriteState or in the
 * PrewriteRequest split if the underlying transport / socket / kernel
 * blocks on write.
 *
 * The default implementation calls folly::terminate_with, so an observer
 * that enables prewrite in its Config must override this method.
 *
 * @param transport  Transport on which the write is about to be performed.
 * @param state      Pending write start and end offsets and flags.
 * @return           Request containing offset to split write at and flags.
 */
virtual PrewriteRequest prewrite(
AsyncTransport* /* transport */, const PrewriteState& /* state */) {
folly::terminate_with<std::runtime_error>(
"prewrite() called but not defined");
}
protected:
// observer configuration; cannot be changed post instantiation
const Config observerConfig_;
......
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -73,6 +73,17 @@ class BlockingSocket : public folly::AsyncSocket::ConnectCallback,
return folly::to_narrow(folly::to_signed(len));
}
/**
 * Write an iovec array to the socket and run the event loop until it returns.
 *
 * @param vec    Array of buffers to write.
 * @param count  Number of entries in vec.
 * @param flags  WriteFlags forwarded to the socket's writev().
 *
 * Throws the value stored in err_ if err_ is set once the loop has returned.
 */
void writev(
    const iovec* vec,
    size_t count,
    folly::WriteFlags flags = folly::WriteFlags::NONE) {
  sock_->writev(this, vec, count, flags);
  eventBase_.loop();

  if (!err_.has_value()) {
    return; // nothing was recorded in err_
  }
  throw err_.value();
}
void flush() {}
int32_t readAll(uint8_t* buf, size_t len) {
......@@ -81,6 +92,10 @@ class BlockingSocket : public folly::AsyncSocket::ConnectCallback,
int32_t read(uint8_t* buf, size_t len) { return readHelper(buf, len, false); }
// Read into buf via readHelper() with all=false, passing EVLOOP_NONBLOCK as
// the flags argument; returns whatever readHelper() returns.
int32_t readNoBlock(uint8_t* buf, size_t len) {
return readHelper(buf, len, false, EVLOOP_NONBLOCK);
}
folly::NetworkSocket getNetworkSocket() const {
return sock_->getNetworkSocket();
}
......@@ -125,16 +140,15 @@ class BlockingSocket : public folly::AsyncSocket::ConnectCallback,
err_ = ex;
}
int32_t readHelper(uint8_t* buf, size_t len, bool all) {
int32_t readHelper(uint8_t* buf, size_t len, bool all, int flags = 0) {
if (!sock_->good()) {
return 0;
}
readBuf_ = buf;
readLen_ = len;
sock_->setReadCB(this);
while (!err_ && sock_->good() && readLen_ > 0) {
eventBase_.loopOnce();
eventBase_.loopOnce(flags);
if (!all) {
break;
}
......
......@@ -40,6 +40,8 @@ class MockAsyncTransportLifecycleObserver
MOCK_METHOD2(
byteEventsUnavailableMock,
void(AsyncTransport*, const AsyncSocketException&));
MOCK_METHOD2(
prewriteMock, PrewriteRequest(AsyncTransport*, const PrewriteState&));
private:
void observerAttach(AsyncTransport* trans) noexcept override {
......@@ -69,6 +71,10 @@ class MockAsyncTransportLifecycleObserver
AsyncTransport* trans, const AsyncSocketException& ex) noexcept override {
byteEventsUnavailableMock(trans, ex);
}
// Forwards the prewrite event to prewriteMock and returns its result, so that
// expectations can be set on the mock method.
PrewriteRequest prewrite(
AsyncTransport* trans, const PrewriteState& state) noexcept override {
return prewriteMock(trans, state);
}
};
/**
......@@ -106,6 +112,10 @@ class MockAsyncTransportObserverForByteEvents
transport->addLifecycleObserver(this);
}
// Returns a const reference to byteEvents_ (presumably every ByteEvent this
// observer has recorded so far - confirm against where byteEvents_ is filled).
const std::vector<AsyncTransport::ByteEvent>& getByteEvents() {
return byteEvents_;
}
folly::Optional<AsyncTransport::ByteEvent> getByteEventReceivedWithOffset(
const uint64_t offset, const AsyncTransport::ByteEvent::Type type) {
for (const auto& byteEvent : byteEvents_) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment