Commit 74560278 authored by Nick Terrell's avatar Nick Terrell Committed by Facebook Github Bot

Add streaming API

Summary:
Adds a C-style streaming API to `folly/io/Compression.h`, with a zlib-esque interface.
Stacked diffs will add streaming support to zstd, zlib, gzip, lzma, lz4_frame, and automatic codecs.
This interface is targeting advanced users who are building higher level interfaces.
They can use this as a common base so they don't have to reimplement the same code for every codec.

Reviewed By: yfeldblum

Differential Revision: D5026332

fbshipit-source-id: e3abf1767b493c2fef153b895858a3a81b67d989
parent e70058f4
......@@ -176,6 +176,242 @@ std::string Codec::doUncompressString(
return output;
}
uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
if (uncompressedLength == 0) {
return 0;
}
return doMaxCompressedLength(uncompressedLength);
}
Optional<uint64_t> Codec::getUncompressedLength(
const folly::IOBuf* data,
Optional<uint64_t> uncompressedLength) const {
auto const compressedLength = data->computeChainDataLength();
if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
throw std::runtime_error("Invalid uncompressed length");
}
return 0;
}
return doGetUncompressedLength(data, uncompressedLength);
}
Optional<uint64_t> Codec::doGetUncompressedLength(
const folly::IOBuf*,
Optional<uint64_t> uncompressedLength) const {
return uncompressedLength;
}
bool StreamCodec::needsDataLength() const {
return doNeedsDataLength();
}
bool StreamCodec::doNeedsDataLength() const {
return false;
}
void StreamCodec::assertStateIs(State expected) const {
if (state_ != expected) {
throw std::logic_error(folly::to<std::string>(
"Codec: state is ", state_, "; expected state ", expected));
}
}
void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
state_ = State::RESET;
uncompressedLength_ = uncompressedLength;
doResetStream();
}
bool StreamCodec::compressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) {
if (state_ == State::RESET && input.empty()) {
if (flushOp == StreamCodec::FlushOp::NONE) {
return false;
}
if (flushOp == StreamCodec::FlushOp::END &&
uncompressedLength().value_or(0) != 0) {
throw std::runtime_error("Codec: invalid uncompressed length");
}
return true;
}
if (state_ == State::RESET && !input.empty() &&
uncompressedLength() == uint64_t(0)) {
throw std::runtime_error("Codec: invalid uncompressed length");
}
// Handle input state transitions
switch (flushOp) {
case StreamCodec::FlushOp::NONE:
if (state_ == State::RESET) {
state_ = State::COMPRESS;
}
assertStateIs(State::COMPRESS);
break;
case StreamCodec::FlushOp::FLUSH:
if (state_ == State::RESET || state_ == State::COMPRESS) {
state_ = State::COMPRESS_FLUSH;
}
assertStateIs(State::COMPRESS_FLUSH);
break;
case StreamCodec::FlushOp::END:
if (state_ == State::RESET || state_ == State::COMPRESS) {
state_ = State::COMPRESS_END;
}
assertStateIs(State::COMPRESS_END);
break;
}
bool const done = doCompressStream(input, output, flushOp);
// Handle output state transitions
if (done) {
if (state_ == State::COMPRESS_FLUSH) {
state_ = State::COMPRESS;
} else if (state_ == State::COMPRESS_END) {
state_ = State::END;
}
// Check internal invariants
DCHECK(input.empty());
DCHECK(flushOp != StreamCodec::FlushOp::NONE);
}
return done;
}
bool StreamCodec::uncompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) {
if (state_ == State::RESET && input.empty()) {
if (uncompressedLength().value_or(0) == 0) {
return true;
}
return false;
}
// Handle input state transitions
if (state_ == State::RESET) {
state_ = State::UNCOMPRESS;
}
assertStateIs(State::UNCOMPRESS);
bool const done = doUncompressStream(input, output, flushOp);
// Handle output state transitions
if (done) {
state_ = State::END;
}
return done;
}
static std::unique_ptr<IOBuf> addOutputBuffer(
MutableByteRange& output,
uint64_t size) {
DCHECK(output.empty());
auto buffer = IOBuf::create(size);
buffer->append(buffer->capacity());
output = {buffer->writableData(), buffer->length()};
return buffer;
}
std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
uint64_t const uncompressedLength = data->computeChainDataLength();
resetStream(uncompressedLength);
uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
MutableByteRange output;
auto buffer = addOutputBuffer(
output,
maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
: kDefaultBufferLength);
// Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
IOBuf const* current = data;
ByteRange input{current->data(), current->length()};
StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
for (;;) {
while (input.empty() && current->next() != data) {
current = current->next();
input = {current->data(), current->length()};
}
if (current->next() == data) {
// This is the last input buffer so end the stream
flushOp = StreamCodec::FlushOp::END;
}
if (output.empty()) {
buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
}
bool const done = compressStream(input, output, flushOp);
if (done) {
DCHECK(input.empty());
DCHECK(flushOp == StreamCodec::FlushOp::END);
DCHECK_EQ(current->next(), data);
break;
}
}
buffer->prev()->trimEnd(output.size());
return buffer;
}
static uint64_t computeBufferLength(
uint64_t const compressedLength,
uint64_t const blockSize) {
uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
return std::min(goodBufferSize, kMaxBufferLength);
}
std::unique_ptr<IOBuf> StreamCodec::doUncompress(
IOBuf const* data,
Optional<uint64_t> uncompressedLength) {
auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
auto constexpr kBlockSize = uint64_t(128) << 10;
auto const defaultBufferLength =
computeBufferLength(data->computeChainDataLength(), kBlockSize);
uncompressedLength = getUncompressedLength(data, uncompressedLength);
resetStream(uncompressedLength);
MutableByteRange output;
auto buffer = addOutputBuffer(
output,
(uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
? *uncompressedLength
: defaultBufferLength));
// Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
IOBuf const* current = data;
ByteRange input{current->data(), current->length()};
StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
for (;;) {
while (input.empty() && current->next() != data) {
current = current->next();
input = {current->data(), current->length()};
}
if (current->next() == data) {
// Tell the uncompressor there is no more input (it may optimize)
flushOp = StreamCodec::FlushOp::END;
}
if (output.empty()) {
buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
}
bool const done = uncompressStream(input, output, flushOp);
if (done) {
break;
}
}
if (!input.empty()) {
throw std::runtime_error("Codec: Junk after end of data");
}
buffer->prev()->trimEnd(output.size());
if (uncompressedLength &&
*uncompressedLength != buffer->computeChainDataLength()) {
throw std::runtime_error("Codec: invalid uncompressed length");
}
return buffer;
}
namespace {
/**
......@@ -187,6 +423,7 @@ class NoCompressionCodec final : public Codec {
explicit NoCompressionCodec(int level, CodecType type);
private:
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
......@@ -212,6 +449,11 @@ NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
}
}
uint64_t NoCompressionCodec::doMaxCompressedLength(
uint64_t uncompressedLength) const {
return uncompressedLength;
}
std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
const IOBuf* data) {
return data->clone();
......@@ -288,14 +530,6 @@ prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
memcpy(&result[0], &prefix, n);
return result;
}
static uint64_t computeBufferLength(
uint64_t const compressedLength,
uint64_t const blockSize) {
uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
return std::min(goodBufferSize, kMaxBufferLength);
}
} // namespace
#if FOLLY_HAVE_LIBLZ4
......@@ -311,6 +545,7 @@ class LZ4Codec final : public Codec {
private:
bool doNeedsUncompressedLength() const override;
uint64_t doMaxUncompressedLength() const override;
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
......@@ -360,6 +595,11 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const {
return LZ4_MAX_INPUT_SIZE;
}
uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
return LZ4_compressBound(uncompressedLength) +
(encodeSize() ? kMaxVarintLength64 : 0);
}
std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
IOBuf clone;
if (data->isChained()) {
......@@ -368,8 +608,7 @@ std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
data = &clone;
}
uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
auto out = IOBuf::create(maxCompressedLength(data->length()));
if (encodeSize()) {
encodeVarintToIOBuf(data->length(), out.get());
}
......@@ -452,6 +691,8 @@ class LZ4FrameCodec final : public Codec {
const override;
private:
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
......@@ -481,6 +722,14 @@ bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
return dataStartsWithLE(data, kLZ4FrameMagicLE);
}
uint64_t LZ4FrameCodec::doMaxCompressedLength(
uint64_t uncompressedLength) const {
LZ4F_preferences_t prefs{};
prefs.compressionLevel = level_;
prefs.frameInfo.contentSize = uncompressedLength;
return LZ4F_compressFrameBound(uncompressedLength, &prefs);
}
static size_t lz4FrameThrowOnError(size_t code) {
if (LZ4F_isError(code)) {
throw std::runtime_error(
......@@ -535,7 +784,7 @@ std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
prefs.compressionLevel = level_;
prefs.frameInfo.contentSize = uncompressedLength;
// Compress
auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
buf->writableTail(),
buf->tailroom(),
......@@ -659,6 +908,7 @@ class SnappyCodec final : public Codec {
private:
uint64_t doMaxUncompressedLength() const override;
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
......@@ -688,10 +938,13 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const {
return std::numeric_limits<uint32_t>::max();
}
uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
return snappy::MaxCompressedLength(uncompressedLength);
}
std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
IOBufSnappySource source(data);
auto out =
IOBuf::create(snappy::MaxCompressedLength(source.Available()));
auto out = IOBuf::create(maxCompressedLength(source.Available()));
snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
out->writableTail()));
......@@ -748,6 +1001,7 @@ class ZlibCodec final : public Codec {
const override;
private:
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
......@@ -819,6 +1073,10 @@ bool ZlibCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
}
}
uint64_t ZlibCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
return deflateBound(nullptr, uncompressedLength);
}
std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
return std::make_unique<ZlibCodec>(level, type);
}
......@@ -1075,6 +1333,7 @@ class LZMA2Codec final : public Codec {
private:
bool doNeedsUncompressedLength() const override;
uint64_t doMaxUncompressedLength() const override;
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
......@@ -1141,6 +1400,11 @@ uint64_t LZMA2Codec::doMaxUncompressedLength() const {
return uint64_t(1) << 63;
}
uint64_t LZMA2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
return lzma_stream_buffer_bound(uncompressedLength) +
(encodeSize() ? kMaxVarintLength64 : 0);
}
std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
lzma_stream* stream,
size_t length) {
......@@ -1334,6 +1598,7 @@ class ZSTDCodec final : public Codec {
private:
bool doNeedsUncompressedLength() const override;
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
......@@ -1380,6 +1645,10 @@ bool ZSTDCodec::doNeedsUncompressedLength() const {
return false;
}
uint64_t ZSTDCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
return ZSTD_compressBound(uncompressedLength);
}
void zstdThrowIfError(size_t rc) {
if (!ZSTD_isError(rc)) {
return;
......@@ -1414,7 +1683,8 @@ std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
zstdThrowIfError(rc);
Cursor cursor(data);
auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
auto result =
IOBuf::createCombined(maxCompressedLength(cursor.totalLength()));
ZSTD_outBuffer out;
out.dst = result->writableTail();
......@@ -1557,6 +1827,7 @@ class Bzip2Codec final : public Codec {
const override;
private:
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
std::unique_ptr<IOBuf> doUncompress(
IOBuf const* data,
......@@ -1602,6 +1873,14 @@ bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
}
uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
// http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
// To guarantee that the compressed data will fit in its buffer, allocate an
// output buffer of size 1% larger than the uncompressed data, plus six
// hundred extra bytes.
return uncompressedLength + uncompressedLength / 100 + 600;
}
static bz_stream createBzStream() {
bz_stream stream;
stream.bzalloc = nullptr;
......@@ -1626,14 +1905,6 @@ static int bzCheck(int const rc) {
}
}
static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
// http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
// To guarantee that the compressed data will fit in its buffer, allocate an
// output buffer of size 1% larger than the uncompressed data, plus six
// hundred extra bytes.
return uncompressedLength + uncompressedLength / 100 + 600;
}
static std::unique_ptr<IOBuf> addOutputBuffer(
bz_stream* stream,
uint64_t const bufferLength) {
......@@ -1657,13 +1928,13 @@ std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
};
uint64_t const uncompressedLength = data->computeChainDataLength();
uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
auto out = addOutputBuffer(
&stream,
maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
: kDefaultBufferLength);
for (auto range : *data) {
......@@ -1766,6 +2037,10 @@ class AutomaticCodec final : public Codec {
bool doNeedsUncompressedLength() const override;
uint64_t doMaxUncompressedLength() const override;
uint64_t doMaxCompressedLength(uint64_t) const override {
throw std::runtime_error(
"AutomaticCodec error: maxCompressedLength() not supported.");
}
std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
throw std::runtime_error("AutomaticCodec error: compress() not supported.");
}
......@@ -1909,93 +2184,112 @@ std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
}
} // namespace
using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
struct Factory {
CodecFactory codec;
StreamCodecFactory stream;
};
typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
static constexpr CodecFactory
constexpr Factory
codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
nullptr, // USER_DEFINED
NoCompressionCodec::create,
{}, // USER_DEFINED
{NoCompressionCodec::create, nullptr},
#if FOLLY_HAVE_LIBLZ4
LZ4Codec::create,
{LZ4Codec::create, nullptr},
#else
nullptr,
{},
#endif
#if FOLLY_HAVE_LIBSNAPPY
SnappyCodec::create,
{SnappyCodec::create, nullptr},
#else
nullptr,
{},
#endif
#if FOLLY_HAVE_LIBZ
ZlibCodec::create,
{ZlibCodec::create, nullptr},
#else
nullptr,
{},
#endif
#if FOLLY_HAVE_LIBLZ4
LZ4Codec::create,
{LZ4Codec::create, nullptr},
#else
nullptr,
{},
#endif
#if FOLLY_HAVE_LIBLZMA
LZMA2Codec::create,
LZMA2Codec::create,
{LZMA2Codec::create, nullptr},
{LZMA2Codec::create, nullptr},
#else
nullptr,
nullptr,
{},
{},
#endif
#if FOLLY_HAVE_LIBZSTD
ZSTDCodec::create,
{ZSTDCodec::create, nullptr},
#else
nullptr,
{},
#endif
#if FOLLY_HAVE_LIBZ
ZlibCodec::create,
{ZlibCodec::create, nullptr},
#else
nullptr,
{},
#endif
#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
LZ4FrameCodec::create,
{LZ4FrameCodec::create, nullptr},
#else
nullptr,
{},
#endif
#if FOLLY_HAVE_LIBBZ2
Bzip2Codec::create,
{Bzip2Codec::create, nullptr},
#else
nullptr
{},
#endif
};
bool hasCodec(CodecType type) {
size_t idx = static_cast<size_t>(type);
Factory const& getFactory(CodecType type) {
size_t const idx = static_cast<size_t>(type);
if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
throw std::invalid_argument(
to<std::string>("Compression type ", idx, " invalid"));
}
return codecFactories[idx] != nullptr;
return codecFactories[idx];
}
} // namespace
bool hasCodec(CodecType type) {
return getFactory(type).codec != nullptr;
}
std::unique_ptr<Codec> getCodec(CodecType type, int level) {
size_t idx = static_cast<size_t>(type);
if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
auto const factory = getFactory(type).codec;
if (!factory) {
throw std::invalid_argument(
to<std::string>("Compression type ", idx, " invalid"));
to<std::string>("Compression type ", type, " not supported"));
}
auto factory = codecFactories[idx];
auto codec = (*factory)(level, type);
DCHECK(codec->type() == type);
return codec;
}
bool hasStreamCodec(CodecType type) {
return getFactory(type).stream != nullptr;
}
std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
auto const factory = getFactory(type).stream;
if (!factory) {
throw std::invalid_argument(to<std::string>(
"Compression type ", idx, " not supported"));
throw std::invalid_argument(
to<std::string>("Compression type ", type, " not supported"));
}
auto codec = (*factory)(level, type);
DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
DCHECK(codec->type() == type);
return codec;
}
......
......@@ -107,11 +107,12 @@ class Codec {
public:
virtual ~Codec() { }
static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1);
/**
* Return the maximum length of data that may be compressed with this codec.
* NO_COMPRESSION and ZLIB support arbitrary lengths;
* LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB.
* May return UNLIMITED_UNCOMPRESSED_LENGTH (uint64_t(-1)) if unlimited.
* May return UNLIMITED_UNCOMPRESSED_LENGTH if unlimited.
*/
uint64_t maxUncompressedLength() const;
......@@ -154,8 +155,6 @@ class Codec {
* Regardless of the behavior of the underlying compressor, uncompressing
* an empty IOBuf chain will return an empty IOBuf chain.
*/
static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1);
std::unique_ptr<IOBuf> uncompress(
const IOBuf* data,
folly::Optional<uint64_t> uncompressedLength = folly::none);
......@@ -169,6 +168,24 @@ class Codec {
StringPiece data,
folly::Optional<uint64_t> uncompressedLength = folly::none);
/**
* Returns a bound on the maximum compressed length when compressing data with
* the given uncompressed length.
*/
uint64_t maxCompressedLength(uint64_t uncompressedLength) const;
/**
* Extracts the uncompressed length from the compressed data if possible.
* If the codec doesn't store the uncompressed length, or the data is
* corrupted it returns the given uncompressedLength.
* If the uncompressed length is stored in the compressed data and
* uncompressedLength is not none and they do not match a std::runtime_error
* is thrown.
*/
folly::Optional<uint64_t> getUncompressedLength(
const folly::IOBuf* data,
folly::Optional<uint64_t> uncompressedLength = folly::none) const;
protected:
explicit Codec(CodecType type);
......@@ -209,7 +226,169 @@ class Codec {
StringPiece data,
folly::Optional<uint64_t> uncompressedLength);
virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const = 0;
// default: returns the passed uncompressedLength.
virtual folly::Optional<uint64_t> doGetUncompressedLength(
const folly::IOBuf* data,
folly::Optional<uint64_t> uncompressedLength) const;
CodecType type_;
};
class StreamCodec : public Codec {
public:
virtual ~StreamCodec() {}
/**
* Does the codec need the data length before compression streaming?
*/
bool needsDataLength() const;
/*****************************************************************************
* Streaming API
*****************************************************************************
* A low-level stateful streaming API.
* Streaming operations can be started in two ways:
* 1. From a clean Codec on which no non-const methods have been called.
* 2. A call to resetStream(), which will reset any codec to a clean state.
* After a streaming operation has begun, either compressStream() or
* uncompressStream() must be called until the streaming operation ends.
* compressStream() ends when it returns true with flushOp END.
* uncompressStream() ends when it returns true. At this point the codec
* may be reused by calling resetStream().
*
* compress() and uncompress() can be called at any time, but they interrupt
* any ongoing streaming operations (state is lost and resetStream() must be
* called before another streaming operation).
*/
/**
* Reset the state of the codec, and set the uncompressed length for the next
* streaming operation. If uncompressedLength is not none it must be exactly
* the uncompressed length. compressStream() must be passed exactly
* uncompressedLength input bytes before the stream is ended.
* uncompressStream() must be passed a compressed frame that uncompresses to
* uncompressedLength.
*/
void resetStream(folly::Optional<uint64_t> uncompressedLength = folly::none);
enum class FlushOp { NONE, FLUSH, END };
/**
* Compresses some data from the input buffer and writes the compressed data
* into the output buffer. It may read input without producing any output,
* except when forced to flush.
*
* The input buffer is advanced to point to the range of data that hasn't yet
* been read. Compression will resume at this point for the next call to
* compressStream(). The output buffer is advanced one byte past the last byte
* written.
*
* The default flushOp is NONE, which allows compressStream() complete
* discretion in how much data to gather before writing any output.
*
* If flushOp is END, all pending and input data is flushed to the output
* buffer, and the frame is ended. compressStream() must be called with the
* same input and flushOp END until it returns true. At this point the caller
* must call resetStream() to use the codec again.
*
* If flushOp is FLUSH, all pending and input data is flushed to the output
* buffer, but the frame is not ended. compressStream() must be called with
* the same input and flushOp END until it returns true. At this point the
* caller can continue to compressStream() with any input data and flushOp.
* The uncompressor, if passed all the produced output data, will be able to
* uncompress all the input data passed to compressStream() so far. Excessive
* use of flushOp FLUSH will deteriorate compression ratio. This is useful for
* stateful streaming across a network. Most users don't need to use this
* flushOp.
*
* A std::logic_error is thrown on incorrect usage of the API.
* A std::runtime_error is thrown upon error conditions.
*/
bool compressStream(
folly::ByteRange& input,
folly::MutableByteRange& output,
FlushOp flushOp = StreamCodec::FlushOp::NONE);
/**
* Uncompresses some data from the input buffer and writes the uncompressed
* data into the output buffer. It may read input without producing any
* output.
*
* The input buffer is advanced to point to the range of data that hasn't yet
* been read. Uncompression will resume at this point for the next call to
* uncompressStream(). The output buffer is advanced one byte past the last
* byte written.
*
* The default flushOp is NONE, which allows uncompressStream() complete
* discretion in how much output data to flush. The uncompressor may not make
* maximum forward progress, but will make some forward progress when
* possible.
*
* If flushOp is END, the caller guarantees that no more input will be
* presented to uncompressStream(). uncompressStream() must be called with the
* same input and flushOp END until it returns true. This is not mandatory,
* but if the input is all available in one buffer, and there is enough output
* space to write the entire frame, codecs can uncompress faster.
*
* If flushOp is FLUSH, uncompressStream() is guaranteed to make the maximum
* amount of forward progress possible. When using this flushOp and
* uncompressStream() returns with `!output.empty()` the caller knows that all
* pending output has been flushed. This is useful for stateful streaming
* across a network, and it should be used in conjunction with
* compressStream() with flushOp FLUSH. Most users don't need to use this
* flushOp.
*
* Returns true at the end of a frame. At this point resetStream() must be
* called to reuse the codec.
*/
bool uncompressStream(
folly::ByteRange& input,
folly::MutableByteRange& output,
FlushOp flushOp = StreamCodec::FlushOp::NONE);
protected:
explicit StreamCodec(CodecType type) : Codec(type) {}
// Returns the uncompressed length last passed to resetStream() or none if it
// hasn't been called yet.
folly::Optional<uint64_t> uncompressedLength() const {
return uncompressedLength_;
}
private:
// default: Implemented using the streaming API.
std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) override;
virtual std::unique_ptr<IOBuf> doUncompress(
const folly::IOBuf* data,
folly::Optional<uint64_t> uncompressedLength) override;
// default: Returns false
virtual bool doNeedsDataLength() const;
virtual void doResetStream() = 0;
virtual bool doCompressStream(
folly::ByteRange& input,
folly::MutableByteRange& output,
FlushOp flushOp) = 0;
virtual bool doUncompressStream(
folly::ByteRange& input,
folly::MutableByteRange& output,
FlushOp flushOp) = 0;
enum class State {
RESET,
COMPRESS,
COMPRESS_FLUSH,
COMPRESS_END,
UNCOMPRESS,
END,
};
void assertStateIs(State expected) const;
CodecType type_;
State state_{State::RESET};
ByteRange previousInput_{};
folly::Optional<uint64_t> uncompressedLength_{};
};
constexpr int COMPRESSION_LEVEL_FASTEST = -1;
......@@ -232,7 +411,28 @@ constexpr int COMPRESSION_LEVEL_BEST = -3;
* decompress all data compressed with the a codec of the same type, regardless
* of compression level.
*/
std::unique_ptr<Codec> getCodec(CodecType type,
std::unique_ptr<Codec> getCodec(
CodecType type,
int level = COMPRESSION_LEVEL_DEFAULT);
/**
* Return a codec for the given type. Throws on error. The level
* is a non-negative codec-dependent integer indicating the level of
* compression desired, or one of the following constants:
*
* COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory,
* worst compression)
* COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between
* FASTEST and BEST)
* COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory,
* best compression)
*
* When decompressing, the compression level is ignored. All codecs will
* decompress all data compressed with the a codec of the same type, regardless
* of compression level.
*/
std::unique_ptr<StreamCodec> getStreamCodec(
CodecType type,
int level = COMPRESSION_LEVEL_DEFAULT);
/**
......@@ -262,4 +462,8 @@ std::unique_ptr<Codec> getAutoUncompressionCodec(
*/
bool hasCodec(CodecType type);
/**
* Check if a specified codec is supported and supports streaming.
*/
bool hasStreamCodec(CodecType type);
}} // namespaces
......@@ -16,10 +16,12 @@
#include <folly/io/Compression.h>
#include <algorithm>
#include <random>
#include <set>
#include <thread>
#include <unordered_map>
#include <utility>
#include <boost/noncopyable.hpp>
#include <glog/logging.h>
......@@ -147,6 +149,19 @@ static std::vector<CodecType> availableCodecs() {
return codecs;
}
static std::vector<CodecType> availableStreamCodecs() {
std::vector<CodecType> codecs;
for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
auto type = static_cast<CodecType>(i);
if (hasStreamCodec(type)) {
codecs.push_back(type);
}
}
return codecs;
}
TEST(CompressionTestNeedsUncompressedLength, Simple) {
static const struct { CodecType type; bool needsUncompressedLength; }
expectations[] = {
......@@ -399,6 +414,422 @@ INSTANTIATE_TEST_CASE_P(
CodecType::BZIP2,
})));
class StreamingUnitTest : public testing::TestWithParam<CodecType> {
protected:
void SetUp() override {
codec_ = getStreamCodec(GetParam());
}
std::unique_ptr<StreamCodec> codec_;
};
TEST_P(StreamingUnitTest, maxCompressedLength) {
EXPECT_EQ(0, codec_->maxCompressedLength(0));
for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
EXPECT_GE(codec_->maxCompressedLength(length), length);
}
}
TEST_P(StreamingUnitTest, getUncompressedLength) {
auto const empty = IOBuf::create(0);
EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
auto const compressed = codec_->compress(data.get());
EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
if (auto const length = codec_->getUncompressedLength(data.get())) {
EXPECT_EQ(100, *length);
}
EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
// If the uncompressed length is stored in the frame, then make sure it throws
// when it is given the wrong length.
if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
}
}
TEST_P(StreamingUnitTest, emptyData) {
ByteRange input{};
auto buffer = IOBuf::create(1);
buffer->append(buffer->capacity());
MutableByteRange output{};
// Test compressing empty data in one pass
EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
codec_->resetStream(0);
EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
codec_->resetStream();
output = {buffer->writableData(), buffer->length()};
EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
EXPECT_EQ(buffer->length(), output.size());
// Test compressing empty data with multiple calls to compressStream()
codec_->resetStream();
output = {};
EXPECT_FALSE(codec_->compressStream(input, output));
EXPECT_TRUE(
codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
codec_->resetStream();
output = {buffer->writableData(), buffer->length()};
EXPECT_FALSE(codec_->compressStream(input, output));
EXPECT_TRUE(
codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
EXPECT_EQ(buffer->length(), output.size());
// Test uncompressing empty data
output = {};
codec_->resetStream();
EXPECT_TRUE(codec_->uncompressStream(input, output));
codec_->resetStream();
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
codec_->resetStream();
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
codec_->resetStream(0);
EXPECT_TRUE(codec_->uncompressStream(input, output));
codec_->resetStream(0);
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
codec_->resetStream(0);
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
}
TEST_P(StreamingUnitTest, noForwardProgressOkay) {
auto inBuffer = IOBuf::create(2);
inBuffer->writableData()[0] = 'a';
inBuffer->writableData()[0] = 'a';
inBuffer->append(2);
auto input = inBuffer->coalesce();
auto compressed = codec_->compress(inBuffer.get());
auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
ByteRange emptyInput;
MutableByteRange emptyOutput;
// Compress some data to avoid empty data special casing
codec_->resetStream();
while (!input.empty()) {
codec_->compressStream(input, output);
}
// empty input and output is okay for flush NONE and FLUSH.
codec_->compressStream(emptyInput, emptyOutput);
codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
codec_->resetStream();
input = inBuffer->coalesce();
output = {outBuffer->writableTail(), outBuffer->tailroom()};
while (!input.empty()) {
codec_->compressStream(input, output);
}
// empty input and output is okay for flush END.
codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
codec_->resetStream();
input = compressed->coalesce();
input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
output = {inBuffer->writableData(), inBuffer->length()};
// Uncompress some data to avoid empty data special casing
while (!input.empty()) {
EXPECT_FALSE(codec_->uncompressStream(input, output));
}
// empty input and output is okay for all flush values.
EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
EXPECT_FALSE(codec_->uncompressStream(
emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
EXPECT_FALSE(codec_->uncompressStream(
emptyInput, emptyOutput, StreamCodec::FlushOp::END));
}
TEST_P(StreamingUnitTest, stateTransitions) {
auto inBuffer = IOBuf::create(1);
inBuffer->writableData()[0] = 'a';
inBuffer->append(1);
auto compressed = codec_->compress(inBuffer.get());
ByteRange const in = compressed->coalesce();
auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
auto compress = [&](
StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
bool empty = false) {
auto input = in;
auto output = empty ? MutableByteRange{} : out;
return codec_->compressStream(input, output, flushOp);
};
auto uncompress = [&](
StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
bool empty = false) {
auto input = in;
auto output = empty ? MutableByteRange{} : out;
return codec_->uncompressStream(input, output, flushOp);
};
// compression flow
codec_->resetStream();
EXPECT_FALSE(compress());
EXPECT_FALSE(compress());
EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
EXPECT_FALSE(compress());
EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
// uncompression flow
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
codec_->resetStream();
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
// compress -> uncompress
codec_->resetStream();
EXPECT_FALSE(compress());
EXPECT_THROW(uncompress(), std::logic_error);
// uncompress -> compress
codec_->resetStream();
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
EXPECT_THROW(compress(), std::logic_error);
// end -> compress
codec_->resetStream();
EXPECT_FALSE(compress());
EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
EXPECT_THROW(compress(), std::logic_error);
// end -> uncompress
codec_->resetStream();
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
EXPECT_THROW(uncompress(), std::logic_error);
// flush -> compress
codec_->resetStream();
EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
EXPECT_THROW(compress(), std::logic_error);
// flush -> end
codec_->resetStream();
EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
// undefined -> compress
codec_->compress(inBuffer.get());
EXPECT_THROW(compress(), std::logic_error);
codec_->uncompress(compressed.get());
EXPECT_THROW(compress(), std::logic_error);
// undefined -> undefined
codec_->uncompress(compressed.get());
codec_->compress(inBuffer.get());
}
INSTANTIATE_TEST_CASE_P(
StreamingUnitTest,
StreamingUnitTest,
testing::ValuesIn(availableStreamCodecs()));
class StreamingCompressionTest
: public testing::TestWithParam<std::tuple<int, int, CodecType>> {
protected:
void SetUp() override {
auto const tup = GetParam();
uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
chunkSize_ = size_t(1) << std::get<1>(tup);
codec_ = getStreamCodec(std::get<2>(tup));
}
void runResetStreamTest(DataHolder const& dh);
void runCompressStreamTest(DataHolder const& dh);
void runUncompressStreamTest(DataHolder const& dh);
void runFlushTest(DataHolder const& dh);
private:
std::vector<ByteRange> split(ByteRange data) const;
uint64_t uncompressedLength_;
size_t chunkSize_;
std::unique_ptr<StreamCodec> codec_;
};
std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
std::vector<ByteRange> result;
result.reserve(pieces + 1);
while (!data.empty()) {
size_t const pieceSize = std::min(data.size(), chunkSize_);
result.push_back(data.subpiece(0, pieceSize));
data.uncheckedAdvance(pieceSize);
}
return result;
}
static std::unique_ptr<IOBuf> compressSome(
StreamCodec* codec,
ByteRange data,
uint64_t bufferSize,
StreamCodec::FlushOp flush) {
bool result;
IOBufQueue queue;
do {
auto buffer = IOBuf::create(bufferSize);
buffer->append(buffer->capacity());
MutableByteRange output{buffer->writableData(), buffer->length()};
result = codec->compressStream(data, output, flush);
buffer->trimEnd(output.size());
queue.append(std::move(buffer));
} while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
EXPECT_TRUE(data.empty());
return queue.move();
}
static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
StreamCodec* codec,
ByteRange& data,
uint64_t bufferSize,
StreamCodec::FlushOp flush) {
bool result;
IOBufQueue queue;
do {
auto buffer = IOBuf::create(bufferSize);
buffer->append(buffer->capacity());
MutableByteRange output{buffer->writableData(), buffer->length()};
result = codec->uncompressStream(data, output, flush);
buffer->trimEnd(output.size());
queue.append(std::move(buffer));
} while (queue.tailroom() == 0 && !result);
return std::make_pair(result, queue.move());
}
void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
auto const input = dh.data(uncompressedLength_);
// Compress some but leave state unclean
codec_->resetStream(uncompressedLength_);
compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
// Reset stream and compress all
codec_->resetStream();
auto compressed =
compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
}
TEST_P(StreamingCompressionTest, resetStream) {
runResetStreamTest(constantDataHolder);
runResetStreamTest(randomDataHolder);
}
void StreamingCompressionTest::runCompressStreamTest(
const folly::io::test::DataHolder& dh) {
auto const inputs = split(dh.data(uncompressedLength_));
IOBufQueue queue;
codec_->resetStream(uncompressedLength_);
// Compress many inputs in a row
for (auto const input : inputs) {
queue.append(compressSome(
codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
}
// Finish the operation with empty input.
ByteRange empty;
queue.append(
compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
auto const uncompressed = codec_->uncompress(queue.front());
EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
}
TEST_P(StreamingCompressionTest, compressStream) {
runCompressStreamTest(constantDataHolder);
runCompressStreamTest(randomDataHolder);
}
void StreamingCompressionTest::runUncompressStreamTest(
const folly::io::test::DataHolder& dh) {
auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
// Concatenate 3 compressed frames in a row
auto compressed = codec_->compress(data.get());
compressed->prependChain(codec_->compress(data.get()));
compressed->prependChain(codec_->compress(data.get()));
// Pass all 3 compressed frames in one input buffer
auto input = compressed->coalesce();
// Uncompress the first frame
codec_->resetStream(data->computeChainDataLength());
{
auto const result = uncompressSome(
codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
ASSERT_TRUE(result.first);
ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
}
// Uncompress the second frame
codec_->resetStream();
{
auto const result = uncompressSome(
codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
ASSERT_TRUE(result.first);
ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
}
// Uncompress the third frame
codec_->resetStream();
{
auto const result = uncompressSome(
codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
ASSERT_TRUE(result.first);
ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
}
EXPECT_TRUE(input.empty());
}
TEST_P(StreamingCompressionTest, uncompressStream) {
runUncompressStreamTest(constantDataHolder);
runUncompressStreamTest(randomDataHolder);
}
void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
auto const inputs = split(dh.data(uncompressedLength_));
auto uncodec = getStreamCodec(codec_->type());
codec_->resetStream();
for (auto input : inputs) {
// Compress some data and flush the stream
auto compressed = compressSome(
codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
auto compressedRange = compressed->coalesce();
// Uncompress the compressed data
auto result = uncompressSome(
uncodec.get(),
compressedRange,
chunkSize_,
StreamCodec::FlushOp::FLUSH);
// All compressed data should have been consumed
EXPECT_TRUE(compressedRange.empty());
// The frame isn't complete
EXPECT_FALSE(result.first);
// The uncompressed data should be exactly the input data
EXPECT_EQ(input.size(), result.second->computeChainDataLength());
auto const data = IOBuf::wrapBuffer(input);
EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
}
}
TEST_P(StreamingCompressionTest, testFlush) {
runFlushTest(constantDataHolder);
runFlushTest(randomDataHolder);
}
INSTANTIATE_TEST_CASE_P(
StreamingCompressionTest,
StreamingCompressionTest,
testing::Combine(
testing::Values(0, 1, 12, 22, 27),
testing::Values(12, 17, 20),
testing::ValuesIn(availableStreamCodecs())));
class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
protected:
void SetUp() override {
......@@ -499,6 +930,10 @@ class CustomCodec : public Codec {
return {prefix_};
}
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
}
bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
auto clone = data->cloneCoalescedAsValue();
if (clone.length() < prefix_.size()) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment