Commit cca8b5c3 authored by Nick Terrell's avatar Nick Terrell Committed by Facebook Github Bot

Add LZ4_FRAME codec

Summary:
The LZ4 Frame codec encodes data using the LZ4 frame format.
One advantage of the LZ4 frame format is that it has 4 magic bytes in the header, so users can transparently determine compression type.
It also allows the user to interop with the lz4 command line tool.

Reviewed By: yfeldblum

Differential Revision: D4715918

fbshipit-source-id: 689833fef526b1cfe98685179e7b494380d49cba
parent 55cd50f3
......@@ -18,6 +18,7 @@
#if FOLLY_HAVE_LIBLZ4
#include <lz4.h>
#include <lz4frame.h>
#include <lz4hc.h>
#endif
......@@ -383,7 +384,160 @@ std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
return out;
}
#endif // FOLLY_HAVE_LIBLZ4
class LZ4FrameCodec final : public Codec {
public:
static std::unique_ptr<Codec> create(int level, CodecType type);
explicit LZ4FrameCodec(int level, CodecType type);
~LZ4FrameCodec();
private:
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data,
uint64_t uncompressedLength) override;
// Reset the dctx_ if it is dirty or null.
void resetDCtx();
int level_;
LZ4F_dctx* dctx_{nullptr};
bool dirty_{false};
};
/* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
int level,
CodecType type) {
return make_unique<LZ4FrameCodec>(level, type);
}
static size_t lz4FrameThrowOnError(size_t code) {
if (LZ4F_isError(code)) {
throw std::runtime_error(
to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
}
return code;
}
void LZ4FrameCodec::resetDCtx() {
if (dctx_ && !dirty_) {
return;
}
if (dctx_) {
LZ4F_freeDecompressionContext(dctx_);
}
lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
dirty_ = false;
}
LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
DCHECK(type == CodecType::LZ4_FRAME);
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
case COMPRESSION_LEVEL_DEFAULT:
level_ = 0;
break;
case COMPRESSION_LEVEL_BEST:
level_ = 16;
break;
default:
level_ = level;
break;
}
}
LZ4FrameCodec::~LZ4FrameCodec() {
if (dctx_) {
LZ4F_freeDecompressionContext(dctx_);
}
}
std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
// LZ4 Frame compression doesn't support streaming so we have to coalesce
IOBuf clone;
if (data->isChained()) {
clone = data->cloneCoalescedAsValue();
data = &clone;
}
// Set preferences
const auto uncompressedLength = data->length();
LZ4F_preferences_t prefs{};
prefs.compressionLevel = level_;
prefs.frameInfo.contentSize = uncompressedLength;
// Compress
auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
buf->writableTail(),
buf->tailroom(),
data->data(),
data->length(),
&prefs));
buf->append(written);
return buf;
}
std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
const IOBuf* data,
uint64_t uncompressedLength) {
// Reset the dctx if any errors have occurred
resetDCtx();
// Coalesce the data
ByteRange in = *data->begin();
IOBuf clone;
if (data->isChained()) {
clone = data->cloneCoalescedAsValue();
in = clone.coalesce();
}
data = nullptr;
// Select decompression options
LZ4F_decompressOptions_t options;
options.stableDst = 1;
// Select blockSize and growthSize for the IOBufQueue
IOBufQueue queue(IOBufQueue::cacheChainLength());
auto blockSize = uint64_t{64} << 10;
auto growthSize = uint64_t{4} << 20;
if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
// Allocate uncompressedLength in one chunk (up to 64 MB)
const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20);
queue.preallocate(allocateSize, allocateSize);
blockSize = std::min(uncompressedLength, blockSize);
growthSize = std::min(uncompressedLength, growthSize);
} else {
// Reduce growthSize for small data
const auto guessUncompressedLen = 4 * std::max(blockSize, in.size());
growthSize = std::min(guessUncompressedLen, growthSize);
}
// Once LZ4_decompress() is called, the dctx_ cannot be reused until it
// returns 0
dirty_ = true;
// Decompress until the frame is over
size_t code = 0;
do {
// Allocate enough space to decompress at least a block
void* out;
size_t outSize;
std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
// Decompress
size_t inSize = in.size();
code = lz4FrameThrowOnError(
LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
if (in.empty() && outSize == 0 && code != 0) {
// We passed no input, no output was produced, and the frame isn't over
// No more forward progress is possible
throw std::runtime_error("LZ4Frame error: Incomplete frame");
}
in.uncheckedAdvance(inSize);
queue.postallocate(outSize);
} while (code != 0);
// At this point the decompression context can be reused
dirty_ = false;
if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
queue.chainLength() != uncompressedLength) {
throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
}
return queue.move();
}
#endif // FOLLY_HAVE_LIBLZ4
#if FOLLY_HAVE_LIBSNAPPY
......@@ -1238,6 +1392,12 @@ static constexpr CodecFactory
#else
nullptr,
#endif
#if FOLLY_HAVE_LIBLZ4
LZ4FrameCodec::create,
#else
nullptr,
#endif
};
bool hasCodec(CodecType type) {
......
......@@ -85,7 +85,13 @@ enum class CodecType {
*/
GZIP = 9,
NUM_CODEC_TYPES = 10,
/**
* Use LZ4 frame compression.
* Levels supported: 0 = fast, 16 = best; default = 0
*/
LZ4_FRAME = 10,
NUM_CODEC_TYPES = 11,
};
class Codec {
......
......@@ -159,6 +159,7 @@ TEST(CompressionTestNeedsUncompressedLength, Simple) {
{ CodecType::LZMA2_VARINT_SIZE, false },
{ CodecType::ZSTD, false },
{ CodecType::GZIP, false },
{ CodecType::LZ4_FRAME, false },
};
for (auto const& test : expectations) {
......@@ -391,8 +392,8 @@ INSTANTIATE_TEST_CASE_P(
supportedCodecs({
CodecType::SNAPPY,
CodecType::ZLIB,
})));
CodecType::LZ4_FRAME,
})));
}}} // namespaces
int main(int argc, char *argv[]) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment