Commit 0d412cdd authored by Giuseppe Ottaviano, committed by Facebook GitHub Bot

Reuse the writable tail after IOBufQueue::append(IOBuf)

Summary:
After calling `IOBufQueue::append(IOBuf)` (both the `unique_ptr` and value overloads), the existing writable tail is lost, because the new tail buffer may not have a writable tail (in practice it's often shared or sized to fit).

This is especially a problem for zero-copy Thrift serialization, for example of a struct like
```
struct S {
  1: binary (cpp2.type = "folly::IOBuf") data;
}
```
Thrift allocates at least 16KiB for each new buffer and writes a handful of bytes before and after the `IOBuf`, so the epilogue will take a whole new 16KiB allocation. The problem is amplified with consecutive binary fields: if they don't fit in both 4KiB (`kMaxPackCopy`) and the existing writable tail, each will cost an extra 16KiB, which means a 4x memory amplification (a field slightly over 4KiB drags along a whole new 16KiB buffer that holds only a few bytes).

This diff reuses the previous writable tail buffer by appending a new `IOBuf` to the chain that references it.
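To make the fix concrete, here is a minimal usage sketch (not part of the commit; the 16KiB/8KiB sizes and the `big` payload are illustrative assumptions):

```
#include <string>

#include <folly/io/IOBuf.h>
#include <folly/io/IOBufQueue.h>

void sketch() {
  // Hypothetical payload larger than kMaxPackCopy (4KiB), so append() cannot
  // pack it into the current tail by copying.
  std::string big(8 << 10, 'x');

  folly::IOBufQueue queue{folly::IOBufQueue::cacheChainLength()};
  queue.preallocate(16 << 10, 16 << 10); // a 16KiB writable tail
  queue.append("prologue"); // a few bytes; most of the tail is still free

  // Previously the unpackable buffer became the new tail, stranding the
  // remaining ~16KiB of tailroom, and the epilogue below would force a fresh
  // 16KiB allocation. With allowTailReuse the old tail is re-linked at the
  // end of the chain, so the epilogue lands in the existing tailroom.
  queue.append(
      folly::IOBuf::wrapBuffer(big.data(), big.size()),
      /* pack */ true,
      /* allowTailReuse */ true);
  queue.append("epilogue");
}
```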

Reviewed By: philippv

Differential Revision: D31882377

fbshipit-source-id: fe1d0c82f7df5528534d42b46da78f6597a9e519
parent 59e15e8f
......@@ -1239,12 +1239,14 @@ class QueueAppender : public detail::Writable<QueueAppender> {
void insert(std::unique_ptr<folly::IOBuf> buf) {
if (buf) {
queueCache_.queue()->append(std::move(buf), true);
queueCache_.queue()->append(
std::move(buf), /* pack */ true, /* allowTailReuse */ true);
}
}
void insert(const folly::IOBuf& buf) {
queueCache_.queue()->append(buf, true);
queueCache_.queue()->append(
buf, /* pack */ true, /* allowTailReuse */ true);
}
template <CursorAccess access>
......
......@@ -95,7 +95,8 @@ namespace folly {
* In other words, when multiple IOBufs share the same underlying buffer, the
* data() and tail() methods on each IOBuf may point to a different segment of
* the data. However, the buffer() and bufferEnd() methods will point to the
* same location for all IOBufs sharing the same underlying buffer.
* same location for all IOBufs sharing the same underlying buffer, unless the
* tail was trimmed with trimWritableTail().
*
* +-----------+ +---------+
* | IOBuf 1 | | IOBuf 2 |
......@@ -784,6 +785,15 @@ class IOBuf {
length_ -= amount;
}
/**
* Adjust the buffer end pointer to reduce the buffer capacity. This can be
* used to pass the ownership of the writable tail to another IOBuf.
*/
void trimWritableTail(std::size_t amount) {
DCHECK_LE(amount, tailroom());
capacity_ -= amount;
}
/**
* Clear the buffer.
*
......
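To illustrate the new primitive (a standalone sketch, not from the diff): trimming the writable tail shrinks capacity() while leaving the data untouched, so the released region can later be handed to another IOBuf without the two overlapping.

```
#include <cassert>

#include <folly/io/IOBuf.h>

void trimSketch() {
  auto buf = folly::IOBuf::create(4096); // empty buffer with writable tailroom
  buf->append(100); // pretend 100 bytes were written

  // Give up the remaining writable region: capacity() shrinks by tailroom(),
  // bufferEnd() moves back to coincide with tail(), and length() is unchanged.
  buf->trimWritableTail(buf->tailroom());
  assert(buf->tailroom() == 0);
  assert(buf->length() == 100);
}
```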
......@@ -76,16 +76,15 @@ IOBufQueue::~IOBufQueue() {
IOBufQueue::IOBufQueue(IOBufQueue&& other) noexcept
: options_(other.options_), cachePtr_(&localCache_) {
other.clearWritableRangeCache();
head_ = std::move(other.head_);
chainLength_ = other.chainLength_;
chainLength_ = std::exchange(other.chainLength_, 0);
reusableTail_ = std::exchange(other.reusableTail_, nullptr);
tailStart_ = other.tailStart_;
localCache_.cachedRange = other.localCache_.cachedRange;
tailStart_ = std::exchange(other.tailStart_, nullptr);
localCache_.cachedRange =
std::exchange(other.localCache_.cachedRange, {nullptr, nullptr});
localCache_.attached = true;
other.chainLength_ = 0;
other.tailStart_ = nullptr;
other.localCache_.cachedRange = {nullptr, nullptr};
}
IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
......@@ -95,15 +94,13 @@ IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
options_ = other.options_;
head_ = std::move(other.head_);
chainLength_ = other.chainLength_;
chainLength_ = std::exchange(other.chainLength_, 0);
reusableTail_ = std::exchange(other.reusableTail_, nullptr);
tailStart_ = other.tailStart_;
localCache_.cachedRange = other.localCache_.cachedRange;
tailStart_ = std::exchange(other.tailStart_, nullptr);
localCache_.cachedRange =
std::exchange(other.localCache_.cachedRange, {nullptr, nullptr});
localCache_.attached = true;
other.chainLength_ = 0;
other.tailStart_ = nullptr;
other.localCache_.cachedRange = {nullptr, nullptr};
}
return *this;
}
......@@ -140,7 +137,8 @@ void IOBufQueue::prepend(const void* buf, std::size_t n) {
chainLength_ += n;
}
void IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
void IOBufQueue::append(
unique_ptr<IOBuf>&& buf, bool pack, bool allowTailReuse) {
if (!buf) {
return;
}
......@@ -149,9 +147,13 @@ void IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
chainLength_ += buf->computeChainDataLength();
}
appendToChain(head_, std::move(buf), pack);
if (allowTailReuse) {
maybeReuseTail();
}
}
void IOBufQueue::append(const folly::IOBuf& buf, bool pack) {
void IOBufQueue::append(
const folly::IOBuf& buf, bool pack, bool allowTailReuse) {
if (!head_ || !pack) {
append(buf.clone(), pack);
return;
......@@ -185,9 +187,12 @@ void IOBufQueue::append(const folly::IOBuf& buf, bool pack) {
head_->prependChain(src->cloneOne());
src = src->next();
} while (src != &buf);
if (allowTailReuse) {
maybeReuseTail();
}
}
void IOBufQueue::append(IOBufQueue& other, bool pack) {
void IOBufQueue::append(IOBufQueue& other, bool pack, bool allowTailReuse) {
if (!other.head_) {
return;
}
......@@ -202,6 +207,9 @@ void IOBufQueue::append(IOBufQueue& other, bool pack) {
}
}
appendToChain(head_, std::move(other.head_), pack);
if (allowTailReuse) {
maybeReuseTail();
}
other.chainLength_ = 0;
}
......@@ -248,10 +256,41 @@ pair<void*, std::size_t> IOBufQueue::preallocateSlow(
tailStart_ = newBuf->writableTail();
cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
tailStart_, tailStart_ + newBuf->tailroom());
reusableTail_ = newBuf.get();
appendToChain(head_, std::move(newBuf), false);
return make_pair(writableTail(), std::min<std::size_t>(max, tailroom()));
}
void IOBufQueue::maybeReuseTail() {
if (reusableTail_ == nullptr || reusableTail_->isSharedOne() ||
// Includes the reusableTail_ == head_->prev() case.
reusableTail_->tailroom() <= head_->prev()->tailroom()) {
return;
}
std::unique_ptr<IOBuf> newTail;
if (reusableTail_->length() == 0) {
// Nothing was written to the old tail; we can just move it to the end.
if (reusableTail_ == head_.get()) {
newTail = std::exchange(head_, head_->pop());
} else {
newTail = reusableTail_->unlink();
}
} else {
// We know the tail is not shared, so we can keep it alive with a clone and
// wrap its writable tail in a new (unshared) IOBuf that owns that region.
newTail = IOBuf::takeOwnership(
reusableTail_->writableTail(),
reusableTail_->tailroom(),
0,
[](void*, void* p) { delete reinterpret_cast<IOBuf*>(p); },
reusableTail_->cloneOne().release());
// Adjust the capacity of the old buffer to release ownership of its tail.
reusableTail_->trimWritableTail(reusableTail_->tailroom());
reusableTail_ = newTail.get();
}
head_->prependChain(std::move(newTail));
}
unique_ptr<IOBuf> IOBufQueue::split(size_t n, bool throwOnUnderflow) {
auto guard = updateGuard();
unique_ptr<IOBuf> result;
......
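For reference, here is a standalone sketch of the handoff performed in the non-empty branch of maybeReuseTail() above (assuming `buf` is unshared, as the function checks): the tailroom is wrapped in a new IOBuf whose free function deletes a clone that keeps the underlying storage alive.

```
#include <memory>

#include <folly/io/IOBuf.h>

std::unique_ptr<folly::IOBuf> carveOutTail(folly::IOBuf& buf) {
  // Wrap the writable tail region in a new IOBuf; store a cloneOne()
  // reference as userData so the storage outlives the original buffer.
  auto newTail = folly::IOBuf::takeOwnership(
      buf.writableTail(),
      buf.tailroom(),
      0, // nothing written to the carved-out region yet
      [](void* /* buf */, void* userData) {
        delete static_cast<folly::IOBuf*>(userData);
      },
      buf.cloneOne().release());
  // Shrink the old buffer's capacity so the two IOBufs never overlap.
  buf.trimWritableTail(buf.tailroom());
  return newTail;
}
```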
......@@ -291,17 +291,26 @@ class IOBufQueue {
* by copying some data from the first buffers in the buf chain (and
* releasing the buffers), if possible. If pack is false, we leave
* the chain topology unchanged.
*
* If allowTailReuse is true, the current writable tail is reappended at the
* end of the chain when possible and beneficial.
*/
void append(std::unique_ptr<folly::IOBuf>&& buf, bool pack = false);
void append(const folly::IOBuf& buf, bool pack = false);
void append(
std::unique_ptr<folly::IOBuf>&& buf,
bool pack = false,
bool allowTailReuse = false);
void append(
const folly::IOBuf& buf, bool pack = false, bool allowTailReuse = false);
/**
* Add a queue to the end of this queue. The queue takes ownership of
* all buffers from the other queue.
*/
void append(IOBufQueue& other, bool pack = false);
void append(IOBufQueue&& other, bool pack = false) {
append(other, pack); // call lvalue reference overload, above
void append(
IOBufQueue& other, bool pack = false, bool allowTailReuse = false);
void append(
IOBufQueue&& other, bool pack = false, bool allowTailReuse = false) {
append(other, pack, allowTailReuse);
}
/**
......@@ -556,6 +565,12 @@ class IOBufQueue {
WritableRangeCacheData* cachePtr_{nullptr};
WritableRangeCacheData localCache_;
// Non-null only if it points to the current tail buffer, and that buffer was
// originally created by this IOBufQueue, so it can be safely
// reused. Initially set by preallocateSlow() and updated by maybeReuseTail()
// or invalidated by updateGuard().
folly::IOBuf* reusableTail_ = nullptr;
void dcheckCacheIntegrity() const {
// Tail start should always be less than tail end.
DCHECK_LE((void*)tailStart_, (void*)cachePtr_->cachedRange.first);
......@@ -570,13 +585,21 @@ class IOBufQueue {
DCHECK(cachePtr_->attached);
// Either cache is empty or it coincides with the tail.
DCHECK(
cachePtr_->cachedRange.first == nullptr ||
(head_ != nullptr && tailStart_ == head_->prev()->writableTail() &&
tailStart_ <= cachePtr_->cachedRange.first &&
cachePtr_->cachedRange.first >= head_->prev()->writableTail() &&
cachePtr_->cachedRange.second ==
head_->prev()->writableTail() + head_->prev()->tailroom()));
if (cachePtr_->cachedRange.first != nullptr) {
DCHECK(head_ != nullptr);
DCHECK(tailStart_ == head_->prev()->writableTail());
DCHECK(tailStart_ <= cachePtr_->cachedRange.first);
DCHECK(cachePtr_->cachedRange.first >= head_->prev()->writableTail());
DCHECK(
cachePtr_->cachedRange.second ==
head_->prev()->writableTail() + head_->prev()->tailroom());
}
// If reusableTail_ is not null it should point to the current tail buffer.
if (reusableTail_ != nullptr) {
DCHECK(head_ != nullptr);
DCHECK(reusableTail_ == head_->prev());
}
}
/**
......@@ -629,6 +652,10 @@ class IOBufQueue {
* Update cached writable tail range. Called by updateGuard()
*/
void updateWritableTailCache() {
if (head_ == nullptr || reusableTail_ != head_->prev()) {
reusableTail_ = nullptr;
}
if (LIKELY(head_ != nullptr)) {
IOBuf* buf = head_->prev();
if (LIKELY(!buf->isSharedOne())) {
......@@ -644,6 +671,8 @@ class IOBufQueue {
std::pair<void*, std::size_t> preallocateSlow(
std::size_t min, std::size_t newAllocationSize, std::size_t max);
void maybeReuseTail();
};
} // namespace folly
......@@ -614,6 +614,26 @@ TEST(IOBuf, QueueAppenderInsertClone) {
EXPECT_EQ(x, queue.front()->next()->data()[0]);
}
TEST(IOBuf, QueueAppenderReuseTail) {
folly::IOBufQueue queue;
QueueAppender appender{&queue, 100};
constexpr StringPiece prologue = "hello";
appender.pushAtMost(
reinterpret_cast<const uint8_t*>(prologue.data()), prologue.size());
size_t expectedCapacity = queue.front()->capacity();
auto unpackable = IOBuf::create(folly::IOBufQueue::kMaxPackCopy + 1);
unpackable->append(folly::IOBufQueue::kMaxPackCopy + 1);
expectedCapacity += unpackable->capacity();
appender.insert(std::move(unpackable));
constexpr StringPiece epilogue = " world";
appender.pushAtMost(
reinterpret_cast<const uint8_t*>(epilogue.data()), epilogue.size());
EXPECT_EQ(queue.front()->computeChainCapacity(), expectedCapacity);
}
TEST(IOBuf, QueueAppenderRWCursor) {
folly::IOBufQueue queue;
......
......@@ -20,6 +20,7 @@
#include <iostream>
#include <stdexcept>
#include <fmt/format.h>
#include <folly/Range.h>
#include <folly/portability/GTest.h>
......@@ -157,15 +158,15 @@ TEST(IOBufQueue, AppendIOBufRefChain) {
TEST(IOBufQueue, AppendIOBufRefChainPartial) {
IOBufQueue queue(clOptions);
queue.append(*stringToIOBuf("abc", 3), true);
queue.preallocate(10, 10, 10);
queue.preallocate(16, 16, 16);
auto numElements = queue.front()->countChainElements();
auto chain = stringToIOBuf("Hello", 5);
chain->prependChain(stringToIOBuf("World!", 6));
chain->prependChain(stringToIOBuf("Test", 4));
auto chain = stringToIOBuf("This fits in 16B", 16);
chain->prependChain(stringToIOBuf("Hello ", 5));
chain->prependChain(stringToIOBuf("World", 5));
queue.append(*chain, true);
// Make sure that we performed a copy of the first IOBuf and cloned the rest.
EXPECT_EQ(numElements + 2, queue.front()->countChainElements());
EXPECT_EQ("abcHelloWorld!Test", queueToString(queue));
EXPECT_EQ("abcThis fits in 16BHelloWorld", queueToString(queue));
}
TEST(IOBufQueue, Split) {
......@@ -527,3 +528,76 @@ TEST(IOBufQueue, Gather) {
queue.front()->length());
EXPECT_EQ("hello world", s);
}
TEST(IOBufQueue, ReuseTail) {
const auto test = [](bool asValue, bool withEmptyHead) {
SCOPED_TRACE(
fmt::format("asValue={}, withEmptyHead={}", asValue, withEmptyHead));
IOBufQueue queue;
IOBufQueue::WritableRangeCache cache(&queue);
constexpr size_t kInitialCapacity = 1024;
queue.preallocate(kInitialCapacity, kInitialCapacity);
size_t expectedCapacity = queue.front()->capacity();
const auto makeUnpackable = [] {
auto unpackable = IOBuf::create(IOBufQueue::kMaxPackCopy + 1);
unpackable->append(IOBufQueue::kMaxPackCopy + 1);
return unpackable;
};
auto unpackable = makeUnpackable();
expectedCapacity += unpackable->capacity();
std::unique_ptr<IOBuf> buf;
size_t packableLength = 0;
if (withEmptyHead) {
// In this case, the unpackable buffer should just shift the empty head
// buffer forward.
buf = std::move(unpackable);
} else {
queue.append("hello ");
buf = stringToIOBuf(SCL("world"));
packableLength = buf->length();
buf->appendChain(std::move(unpackable));
}
const auto oldTail = reinterpret_cast<uint8_t*>(queue.writableTail());
const auto oldTailroom = queue.tailroom();
// Append two buffers in a row to verify that the reused tail gets pushed
// forward without wrapping.
for (size_t i = 0; i < 2; ++i) {
SCOPED_TRACE(fmt::format("i={}", i));
if (asValue) {
queue.append(
std::move(buf), /* pack */ true, /* allowTailReuse */ true);
} else {
queue.append(*buf, /* pack */ true, /* allowTailReuse */ true);
}
// We should be able to avoid allocating new memory because we still had
// room in the old tail, even after packing the first IOBuf.
EXPECT_EQ(queue.writableTail(), oldTail + packableLength);
EXPECT_EQ(queue.tailroom(), oldTailroom - packableLength);
EXPECT_EQ(queue.front()->computeChainCapacity(), expectedCapacity);
EXPECT_EQ(
queue.front()->countChainElements(), i + (withEmptyHead ? 2 : 3));
if (i == 0) {
buf = makeUnpackable();
expectedCapacity += buf->capacity();
}
}
};
// Test both unique_ptr and value overloads, and check that an empty head is
// handled correctly.
for (bool asValue : {false, true}) {
for (bool withEmptyHead : {false, true}) {
test(asValue, withEmptyHead);
}
}
}