Commit 660c0417, authored by Nick Terrell, committed by Facebook Github Bot

Add support for advanced zstd options

Summary:
* Use the new advanced API.
* Support all advanced compression options and decompression window size.
* Dictionary support is easy to add later when needed.
* Remove the one-pass optimizations since zstd supports them by default now.

Reviewed By: yfeldblum

Differential Revision: D7898697

fbshipit-source-id: 42d84808a9bf2200ebfb8d2071b853960a19eb79
parent d0142327
......@@ -58,6 +58,7 @@ nobase_follyinclude_HEADERS = \
compression/Counters.h \
compression/Utils.h \
compression/Zlib.h \
compression/Zstd.h \
concurrency/CacheLocality.h \
concurrency/ConcurrentHashMap.h \
concurrency/CoreCachedSharedPtr.h \
......@@ -542,6 +543,7 @@ libfolly_la_SOURCES = \
compression/Compression.cpp \
compression/Counters.cpp \
compression/Zlib.cpp \
compression/Zstd.cpp \
concurrency/CacheLocality.cpp \
container/detail/F14Table.cpp \
detail/AtFork.cpp \
......
......@@ -40,8 +40,7 @@
#endif
#if FOLLY_HAVE_LIBZSTD
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
#include <folly/compression/Zstd.h>
#endif
#if FOLLY_HAVE_LIBBZ2
......@@ -1420,85 +1419,6 @@ bool LZMA2StreamCodec::doUncompressStream(
#ifdef FOLLY_HAVE_LIBZSTD
namespace {
/// Deleter adaptor: releases a ZSTD_CStream and discards zstd's return value
/// so the function has the `void(T*)` shape the deleter template expects.
void zstdFreeCStream(ZSTD_CStream* zcs) {
  static_cast<void>(ZSTD_freeCStream(zcs));
}

/// Deleter adaptor: releases a ZSTD_DStream and discards zstd's return value.
void zstdFreeDStream(ZSTD_DStream* zds) {
  static_cast<void>(ZSTD_freeDStream(zds));
}
} // namespace
/**
* ZSTD compression
*
* StreamCodec implementation backed by zstd's streaming objects
* (ZSTD_CStream for compression, ZSTD_DStream for decompression). Both
* streams are created lazily by resetCStream() / resetDStream() and released
* through the zstdFreeCStream / zstdFreeDStream deleters.
*/
class ZSTDStreamCodec final : public StreamCodec {
public:
// Factory helpers; both build a ZSTDStreamCodec for `level` and the given type.
static std::unique_ptr<Codec> createCodec(int level, CodecType);
static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
explicit ZSTDStreamCodec(int level, CodecType type);
// Returns the little-endian zstd magic number as the only valid prefix.
std::vector<std::string> validPrefixes() const override;
// True when `data` starts with the zstd magic number; the length is ignored.
bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
const override;
private:
bool doNeedsUncompressedLength() const override;
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
Optional<uint64_t> doGetUncompressedLength(
IOBuf const* data,
Optional<uint64_t> uncompressedLength) const override;
// Only flags that the next (un)compress call must re-initialize its stream.
void doResetStream() override;
bool doCompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) override;
bool doUncompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) override;
// Lazily create and (re)initialize the compression / decompression stream.
void resetCStream();
void resetDStream();
// One-shot fast paths; they return false when their preconditions are not
// met so the caller can fall back to the streaming path.
bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
// Compression level handed to zstd.
int level_;
// True until the first streaming call after construction or doResetStream().
bool needReset_{true};
std::unique_ptr<
ZSTD_CStream,
folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
cstream_{nullptr};
std::unique_ptr<
ZSTD_DStream,
folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
dstream_{nullptr};
};
static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
/// The only prefix a zstd frame can start with: its little-endian magic.
std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
  std::vector<std::string> prefixes;
  prefixes.push_back(prefixToStringLE(kZSTDMagicLE));
  return prefixes;
}
/// Reports whether `data` begins with the zstd magic number. The optional
/// uncompressed length plays no part in the decision.
bool ZSTDStreamCodec::canUncompress(
    const IOBuf* data,
    Optional<uint64_t> /* uncompressedLength */) const {
  bool const startsWithMagic = dataStartsWithLE(data, kZSTDMagicLE);
  return startsWithMagic;
}
/// Factory: builds a ZSTDStreamCodec and hands it back as a plain Codec.
std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
  std::unique_ptr<Codec> codec = make_unique<ZSTDStreamCodec>(level, type);
  return codec;
}
/// Factory: builds a ZSTDStreamCodec and hands it back as a StreamCodec.
std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
    int level,
    CodecType type) {
  std::unique_ptr<StreamCodec> stream =
      make_unique<ZSTDStreamCodec>(level, type);
  return stream;
}
static int zstdConvertLevel(int level) {
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
......@@ -1515,193 +1435,14 @@ static int zstdConvertLevel(int level) {
return level;
}
ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
: StreamCodec(type, zstdConvertLevel(level)),
level_(zstdConvertLevel(level)) {
/// Factory-table entry: maps the folly compression level to a zstd level and
/// delegates to zstd::getCodec. `type` is only sanity-checked.
std::unique_ptr<Codec> getZstdCodec(int level, CodecType type) {
  DCHECK(type == CodecType::ZSTD);
  int const zstdLevel = zstdConvertLevel(level);
  return zstd::getCodec(zstd::Options(zstdLevel));
}
/// This codec never requires the caller to supply the uncompressed length.
bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
  constexpr bool kLengthRequired = false;
  return kLengthRequired;
}
/// Upper bound on the compressed size, as reported by ZSTD_compressBound().
uint64_t ZSTDStreamCodec::doMaxCompressedLength(
    uint64_t uncompressedLength) const {
  auto const bound = ZSTD_compressBound(uncompressedLength);
  return bound;
}
/// Throws std::runtime_error carrying zstd's error name when `rc` is a zstd
/// error code; otherwise does nothing.
void zstdThrowIfError(size_t rc) {
  if (ZSTD_isError(rc)) {
    throw std::runtime_error(
        to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
  }
}
/// Reconciles the caller-supplied length with the size stored in the frame.
///
/// Only the first IOBuf of `data` is inspected. A frame size of 0 is treated
/// as "not available" and the caller's value is returned untouched.
///
/// @throws std::runtime_error if both lengths are known and they disagree.
Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
    IOBuf const* data,
    Optional<uint64_t> uncompressedLength) const {
  auto const frameSize =
      ZSTD_getDecompressedSize(data->data(), data->length());
  if (frameSize == 0) {
    return uncompressedLength;
  }
  if (uncompressedLength && *uncompressedLength != frameSize) {
    throw std::runtime_error("ZSTD: invalid uncompressed length");
  }
  uncompressedLength = frameSize;
  return uncompressedLength;
}
void ZSTDStreamCodec::doResetStream() {
needReset_ = true;
}
/// One-shot compression fast path.
///
/// @return false (nothing consumed) when `output` is smaller than
///         ZSTD_compressBound(input.size()); true after compressing all of
///         `input` into `output` and advancing both ranges.
/// @throws std::runtime_error if ZSTD_compress() reports an error.
bool ZSTDStreamCodec::tryBlockCompress(
    ByteRange& input,
    MutableByteRange& output) const {
  DCHECK(needReset_);
  // Block compression is only safe when the worst case fits in the output.
  size_t const worstCase = ZSTD_compressBound(input.size());
  if (output.size() < worstCase) {
    return false;
  }
  size_t const written = ZSTD_compress(
      output.data(), output.size(), input.data(), input.size(), level_);
  zstdThrowIfError(written);
  input.uncheckedAdvance(input.size());
  output.uncheckedAdvance(written);
  return true;
}
/// Creates the compression stream on first use and (re)initializes it for a
/// new frame at `level_`, recording the content size in the frame header only
/// when the uncompressed length is known.
///
/// @throws std::bad_alloc if the stream cannot be allocated.
/// @throws std::runtime_error if zstd rejects the initialization.
void ZSTDStreamCodec::resetCStream() {
  if (cstream_ == nullptr) {
    cstream_.reset(ZSTD_createCStream());
    if (cstream_ == nullptr) {
      throw std::bad_alloc{};
    }
  }
  // Since 1.3.2, ZSTD_initCStream_advanced() treating a content size of 0 as
  // "unknown" (when contentSizeFlag == 0) is deprecated and slated for
  // removal, so from that version on pass ZSTD_CONTENTSIZE_UNKNOWN instead.
#if ZSTD_VERSION_NUMBER >= 10302
  constexpr uint64_t kZstdUnknownContentSize = ZSTD_CONTENTSIZE_UNKNOWN;
#else
  constexpr uint64_t kZstdUnknownContentSize = 0;
#endif
  // The advanced API exists in every supported zstd version and is the only
  // way to control contentSizeFlag.
  auto const knownLength = uncompressedLength();
  auto params = ZSTD_getParams(level_, knownLength.value_or(0), 0);
  params.fParams.contentSizeFlag = knownLength.hasValue();
  auto const pledgedSize = knownLength.value_or(kZstdUnknownContentSize);
  zstdThrowIfError(ZSTD_initCStream_advanced(
      cstream_.get(), nullptr, 0, params, pledgedSize));
}
// Streaming compression step.
//
// Consumes bytes from `input`, writes compressed bytes to `output`, and
// advances both ranges by the amounts zstd actually processed.
// Returns true when the one-shot block path handled everything, or when the
// requested FLUSH/END operation returned 0 from zstd; returns false otherwise
// (including every FlushOp::NONE call).
// Throws std::runtime_error on zstd errors and std::invalid_argument on an
// unknown FlushOp.
bool ZSTDStreamCodec::doCompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) {
if (needReset_) {
// If we are given all the input in one chunk try to use block compression
if (flushOp == StreamCodec::FlushOp::END &&
tryBlockCompress(input, output)) {
return true;
}
resetCStream();
needReset_ = false;
}
ZSTD_inBuffer in = {input.data(), input.size(), 0};
ZSTD_outBuffer out = {output.data(), output.size(), 0};
// Publish zstd's progress to the caller on every exit path, including throws.
SCOPE_EXIT {
input.uncheckedAdvance(in.pos);
output.uncheckedAdvance(out.pos);
};
// With FLUSH/END and no pending input there is nothing to feed; go straight
// to the flush/end call below.
if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
}
// Only flush/end once all of the input has been consumed.
if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
size_t rc;
switch (flushOp) {
case StreamCodec::FlushOp::FLUSH:
rc = ZSTD_flushStream(cstream_.get(), &out);
break;
case StreamCodec::FlushOp::END:
rc = ZSTD_endStream(cstream_.get(), &out);
break;
default:
throw std::invalid_argument("ZSTD: invalid FlushOp");
}
zstdThrowIfError(rc);
// NOTE(review): per the zstd docs a 0 return means nothing is left to
// flush; a non-zero value asks the caller to retry with more output space.
if (rc == 0) {
return true;
}
}
return false;
}
/// One-shot decompression fast path.
///
/// @return false when the fast path cannot be used (zstd older than 1.1.4,
///         unknown uncompressed length, or `output` too small); true after
///         decompressing one whole frame and advancing both ranges.
/// @throws std::runtime_error on zstd errors or when the produced size does
///         not match the expected uncompressed length.
bool ZSTDStreamCodec::tryBlockUncompress(
    ByteRange& input,
    MutableByteRange& output) const {
  DCHECK(needReset_);
#if ZSTD_VERSION_NUMBER < 10104
  // ZSTD_findFrameCompressedSize() is required for this optimization.
  return false;
#else
  // The uncompressed length must be known and must fit in the output.
  auto const expected = uncompressedLength();
  if (!expected.hasValue() || output.size() < *expected) {
    return false;
  }
  size_t const frameSize =
      ZSTD_findFrameCompressedSize(input.data(), input.size());
  zstdThrowIfError(frameSize);
  size_t const produced =
      ZSTD_decompress(output.data(), *expected, input.data(), frameSize);
  zstdThrowIfError(produced);
  if (produced != *expected) {
    throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length");
  }
  input.uncheckedAdvance(frameSize);
  output.uncheckedAdvance(produced);
  return true;
#endif
}
/// Creates the decompression stream on first use and (re)initializes it.
///
/// @throws std::bad_alloc if the stream cannot be allocated.
/// @throws std::runtime_error if ZSTD_initDStream() reports an error.
void ZSTDStreamCodec::resetDStream() {
  if (dstream_ == nullptr) {
    dstream_.reset(ZSTD_createDStream());
    if (dstream_ == nullptr) {
      throw std::bad_alloc{};
    }
  }
  size_t const rc = ZSTD_initDStream(dstream_.get());
  zstdThrowIfError(rc);
}
bool ZSTDStreamCodec::doUncompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) {
if (needReset_) {
// If we are given all the input in one chunk try to use block uncompression
if (flushOp == StreamCodec::FlushOp::END &&
tryBlockUncompress(input, output)) {
return true;
}
resetDStream();
needReset_ = false;
}
ZSTD_inBuffer in = {input.data(), input.size(), 0};
ZSTD_outBuffer out = {output.data(), output.size(), 0};
SCOPE_EXIT {
input.uncheckedAdvance(in.pos);
output.uncheckedAdvance(out.pos);
};
size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
zstdThrowIfError(rc);
return rc == 0;
/// Factory-table entry: maps the folly compression level to a zstd level and
/// delegates to zstd::getStreamCodec. `type` is only sanity-checked.
std::unique_ptr<StreamCodec> getZstdStreamCodec(int level, CodecType type) {
  DCHECK(type == CodecType::ZSTD);
  int const zstdLevel = zstdConvertLevel(level);
  return zstd::getStreamCodec(zstd::Options(zstdLevel));
}
#endif // FOLLY_HAVE_LIBZSTD
......@@ -2192,7 +1933,7 @@ constexpr Factory
#endif
#if FOLLY_HAVE_LIBZSTD
{ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
{getZstdCodec, getZstdStreamCodec},
#else
{},
#endif
......
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/compression/Zstd.h>
#if FOLLY_HAVE_LIBZSTD
#include <stdexcept>
#include <string>
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
#include <folly/Conv.h>
#include <folly/Range.h>
#include <folly/ScopeGuard.h>
#include <folly/compression/Utils.h>
static_assert(
ZSTD_VERSION_NUMBER >= 10302,
"zstd-1.3.2 is the minimum supported zstd version.");
using folly::io::compression::detail::dataStartsWithLE;
using folly::io::compression::detail::prefixToStringLE;
namespace folly {
namespace io {
namespace zstd {
namespace {
/// Deleter adaptor: frees a compression context, dropping zstd's return value.
void zstdFreeCCtx(ZSTD_CCtx* zc) {
  static_cast<void>(ZSTD_freeCCtx(zc));
}
/// Deleter adaptor: frees a decompression context, dropping zstd's return
/// value.
void zstdFreeDCtx(ZSTD_DCtx* zd) {
  static_cast<void>(ZSTD_freeDCtx(zd));
}
/// Pass-through error check for zstd return codes.
///
/// @return `rc` unchanged when it is not a zstd error code.
/// @throws std::runtime_error carrying zstd's error name otherwise.
size_t zstdThrowIfError(size_t rc) {
  if (ZSTD_isError(rc)) {
    throw std::runtime_error(
        to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
  }
  return rc;
}
/// Maps a StreamCodec::FlushOp onto the matching ZSTD_EndDirective.
///
/// @throws std::invalid_argument for any value outside NONE / FLUSH / END.
ZSTD_EndDirective zstdTranslateFlush(StreamCodec::FlushOp flush) {
  if (flush == StreamCodec::FlushOp::NONE) {
    return ZSTD_e_continue;
  }
  if (flush == StreamCodec::FlushOp::FLUSH) {
    return ZSTD_e_flush;
  }
  if (flush == StreamCodec::FlushOp::END) {
    return ZSTD_e_end;
  }
  throw std::invalid_argument("ZSTDStreamCodec: Invalid flush");
}
// StreamCodec implementation on top of zstd's advanced API.
// Compression goes through a ZSTD_CCtx configured from `options_`;
// decompression goes through a ZSTD_DCtx. Both contexts are created lazily
// by resetCCtx() / resetDCtx() and freed via zstdFreeCCtx / zstdFreeDCtx.
class ZSTDStreamCodec final : public StreamCodec {
public:
explicit ZSTDStreamCodec(Options options);
// Returns the little-endian zstd magic number as the only valid prefix.
std::vector<std::string> validPrefixes() const override;
// True when `data` starts with the zstd magic number; the length is ignored.
bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
const override;
private:
bool doNeedsUncompressedLength() const override;
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
Optional<uint64_t> doGetUncompressedLength(
IOBuf const* data,
Optional<uint64_t> uncompressedLength) const override;
// Only flags that the next (un)compress call must reset its context.
void doResetStream() override;
bool doCompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) override;
bool doUncompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) override;
// Lazily create and reconfigure the compression / decompression context.
void resetCCtx();
void resetDCtx();
// Compression parameters and the optional decompression window limit.
Options options_;
// True until the first streaming call after construction or doResetStream().
bool needReset_{true};
std::unique_ptr<
ZSTD_CCtx,
folly::static_function_deleter<ZSTD_CCtx, &zstdFreeCCtx>>
cctx_{nullptr};
std::unique_ptr<
ZSTD_DCtx,
folly::static_function_deleter<ZSTD_DCtx, &zstdFreeDCtx>>
dctx_{nullptr};
};
constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
/// The only prefix a zstd frame can start with: its little-endian magic.
std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
  std::vector<std::string> prefixes;
  prefixes.push_back(prefixToStringLE(kZSTDMagicLE));
  return prefixes;
}
/// Reports whether `data` begins with the zstd magic number. The optional
/// uncompressed length plays no part in the decision.
bool ZSTDStreamCodec::canUncompress(
    const IOBuf* data,
    Optional<uint64_t> /* uncompressedLength */) const {
  bool const startsWithMagic = dataStartsWithLE(data, kZSTDMagicLE);
  return startsWithMagic;
}
ZSTDStreamCodec::ZSTDStreamCodec(Options options)
: StreamCodec(CodecType::ZSTD), options_(std::move(options)) {}
/// This codec never requires the caller to supply the uncompressed length.
bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
  constexpr bool kLengthRequired = false;
  return kLengthRequired;
}
/// Upper bound on the compressed size, as reported by ZSTD_compressBound().
uint64_t ZSTDStreamCodec::doMaxCompressedLength(
    uint64_t uncompressedLength) const {
  auto const bound = ZSTD_compressBound(uncompressedLength);
  return bound;
}
/// Reconciles the caller-supplied length with the content size stored in the
/// frame header.
///
/// Only the first IOBuf of `data` is inspected. When zstd reports the size as
/// unknown or cannot parse it, the caller's value is returned untouched.
///
/// @throws std::runtime_error if both lengths are known and they disagree.
Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
    IOBuf const* data,
    Optional<uint64_t> uncompressedLength) const {
  auto const contentSize =
      ZSTD_getFrameContentSize(data->data(), data->length());
  bool const sizeAvailable = contentSize != ZSTD_CONTENTSIZE_UNKNOWN &&
      contentSize != ZSTD_CONTENTSIZE_ERROR;
  if (!sizeAvailable) {
    return uncompressedLength;
  }
  if (uncompressedLength.hasValue() && *uncompressedLength != contentSize) {
    throw std::runtime_error("ZSTD: invalid uncompressed length");
  }
  return contentSize;
}
void ZSTDStreamCodec::doResetStream() {
needReset_ = true;
}
void ZSTDStreamCodec::resetCCtx() {
if (!cctx_) {
cctx_.reset(ZSTD_createCCtx());
if (!cctx_) {
throw std::bad_alloc{};
}
}
ZSTD_CCtx_reset(cctx_.get());
zstdThrowIfError(
ZSTD_CCtx_setParametersUsingCCtxParams(cctx_.get(), options_.params()));
zstdThrowIfError(ZSTD_CCtx_setPledgedSrcSize(
cctx_.get(), uncompressedLength().value_or(ZSTD_CONTENTSIZE_UNKNOWN)));
}
/// Streaming compression step through ZSTD_compress_generic().
///
/// Both ranges are advanced by whatever zstd consumed / produced, on every
/// exit path.
///
/// @return false for FlushOp::NONE; for FLUSH / END, true once zstd returns 0.
/// @throws std::runtime_error on zstd errors.
/// @throws std::invalid_argument for an unknown FlushOp.
bool ZSTDStreamCodec::doCompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp flushOp) {
  if (needReset_) {
    resetCCtx();
    needReset_ = false;
  }
  ZSTD_inBuffer src = {input.data(), input.size(), 0};
  ZSTD_outBuffer dst = {output.data(), output.size(), 0};
  // Report progress back to the caller even if an exception escapes.
  SCOPE_EXIT {
    input.uncheckedAdvance(src.pos);
    output.uncheckedAdvance(dst.pos);
  };
  ZSTD_EndDirective const directive = zstdTranslateFlush(flushOp);
  size_t const remaining = zstdThrowIfError(
      ZSTD_compress_generic(cctx_.get(), &dst, &src, directive));
  if (flushOp == StreamCodec::FlushOp::NONE) {
    return false;
  }
  if (flushOp == StreamCodec::FlushOp::FLUSH ||
      flushOp == StreamCodec::FlushOp::END) {
    return remaining == 0;
  }
  throw std::invalid_argument("ZSTD: invalid FlushOp");
}
/// Creates the decompression context on first use, resets it, and applies the
/// configured maximum window size when one was set (0 means "leave zstd's
/// default in place").
///
/// @throws std::bad_alloc if the context cannot be allocated.
/// @throws std::runtime_error if zstd rejects the window size.
void ZSTDStreamCodec::resetDCtx() {
  if (dctx_ == nullptr) {
    dctx_.reset(ZSTD_createDCtx());
    if (dctx_ == nullptr) {
      throw std::bad_alloc{};
    }
  }
  ZSTD_DCtx_reset(dctx_.get());
  if (options_.maxWindowSize() == 0) {
    return;
  }
  zstdThrowIfError(
      ZSTD_DCtx_setMaxWindowSize(dctx_.get(), options_.maxWindowSize()));
}
/// Streaming decompression step through ZSTD_decompress_generic().
///
/// The flush operation is ignored. Both ranges are advanced by whatever zstd
/// consumed / produced, on every exit path.
///
/// @return true when zstd returns 0, false otherwise.
/// @throws std::runtime_error on zstd errors.
bool ZSTDStreamCodec::doUncompressStream(
    ByteRange& input,
    MutableByteRange& output,
    StreamCodec::FlushOp /* flushOp */) {
  if (needReset_) {
    resetDCtx();
    needReset_ = false;
  }
  ZSTD_inBuffer src = {input.data(), input.size(), 0};
  ZSTD_outBuffer dst = {output.data(), output.size(), 0};
  // Report progress back to the caller even if an exception escapes.
  SCOPE_EXIT {
    input.uncheckedAdvance(src.pos);
    output.uncheckedAdvance(dst.pos);
  };
  return zstdThrowIfError(
             ZSTD_decompress_generic(dctx_.get(), &dst, &src)) == 0;
}
} // namespace
/// Allocates a ZSTD_CCtx_params object and initializes it with zstd's
/// defaults for `level`.
///
/// @throws std::bad_alloc if the parameter object cannot be allocated.
/// @throws std::runtime_error if zstd rejects the level.
Options::Options(int level) : params_(ZSTD_createCCtxParams()) {
  if (!params_) {
    throw std::bad_alloc{};
  }
  size_t const rc = ZSTD_CCtxParams_init(params_.get(), level);
  zstdThrowIfError(rc);
}
/// Forwards `param` = `value` to the owned ZSTD_CCtx_params.
///
/// @throws std::runtime_error if zstd rejects the parameter or the value.
void Options::set(ZSTD_cParameter param, unsigned value) {
  size_t const rc = ZSTD_CCtxParam_setParameter(params_.get(), param, value);
  zstdThrowIfError(rc);
}
/// Deleter for `params_`: frees the parameter object, dropping zstd's return
/// value.
/* static */ void Options::freeCCtxParams(ZSTD_CCtx_params* params) {
  static_cast<void>(ZSTD_freeCCtxParams(params));
}
/// Builds a zstd codec configured by `options` and returns it as a Codec.
std::unique_ptr<Codec> getCodec(Options options) {
  std::unique_ptr<Codec> codec =
      std::make_unique<ZSTDStreamCodec>(std::move(options));
  return codec;
}
/// Builds a zstd codec configured by `options` and returns it as a
/// StreamCodec.
std::unique_ptr<StreamCodec> getStreamCodec(Options options) {
  std::unique_ptr<StreamCodec> codec =
      std::make_unique<ZSTDStreamCodec>(std::move(options));
  return codec;
}
} // namespace zstd
} // namespace io
} // namespace folly
#endif
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory.h>

#include <cstddef>
#include <memory>

#include <folly/Memory.h>
#include <folly/Portability.h>
#include <folly/compression/Compression.h>
#if FOLLY_HAVE_LIBZSTD
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
namespace folly {
namespace io {
namespace zstd {
/**
* Interface for zstd-specific codec initialization.
*
* Bundles the zstd compression parameters (an owned ZSTD_CCtx_params) with
* the optional decompression window limit. The parameter object is held in a
* std::unique_ptr, so an Options cannot be copied; hand it over with
* std::move().
*/
class Options {
public:
/**
* Create an Options struct with the default options for the given `level`.
* NOTE: This is the zstd level, COMPRESSION_LEVEL_DEFAULT and such aren't
* supported, since zstd supports negative compression levels.
*/
explicit Options(int level);
/**
* Set the compression `param` to `value`.
* See the zstd documentation for ZSTD_CCtx_setParameter() for details, this
* is just a thin wrapper.
*/
void set(ZSTD_cParameter param, unsigned value);
/**
* Set the maximum allowed window size during decompression.
* `maxWindowSize == 0` means don't set the maximum window size.
* zstd's current default limit is 2^27.
* See the zstd documentation for ZSTD_DCtx_setMaxWindowSize() for details.
*/
void setMaxWindowSize(size_t maxWindowSize) {
maxWindowSize_ = maxWindowSize;
}
/// Get a non-owning pointer to the ZSTD_CCtx_params held by this object.
ZSTD_CCtx_params const* params() const {
return params_.get();
}
/// Get the maximum window size (0 if none was set).
size_t maxWindowSize() const {
return maxWindowSize_;
}
private:
/// Deleter used by `params_`.
static void freeCCtxParams(ZSTD_CCtx_params* params);
std::unique_ptr<
ZSTD_CCtx_params,
folly::static_function_deleter<ZSTD_CCtx_params, &freeCCtxParams>>
params_;
/// 0 means "don't set a limit".
size_t maxWindowSize_{0};
};
/// Get a zstd Codec with the given options.
/// `options` is taken by value; since Options owns a std::unique_ptr, pass it
/// with std::move() or as a temporary.
std::unique_ptr<Codec> getCodec(Options options);
/// Get a zstd StreamCodec with the given options.
/// `options` is taken by value; pass it with std::move() or as a temporary.
std::unique_ptr<StreamCodec> getStreamCodec(Options options);
} // namespace zstd
} // namespace io
} // namespace folly
#endif
......@@ -34,6 +34,8 @@
#if FOLLY_HAVE_LIBZSTD
#include <zstd.h>
#include <folly/compression/Zstd.h>
#endif
#if FOLLY_HAVE_LIBZ
......@@ -1370,6 +1372,43 @@ TEST(ZstdTest, BackwardCompatible) {
}
}
// Verifies that options set through zstd::Options reach the compressor:
// round-trips 128 MiB of data with custom worker count, checksum, window log
// and content-size settings, then inspects the resulting frame header.
TEST(ZstdTest, CustomOptions) {
  auto test = [](const DataHolder& dh, unsigned contentSizeFlag) {
    unsigned const wlog = 23;
    zstd::Options options(1);
    options.set(ZSTD_p_nbWorkers, 3);
    options.set(ZSTD_p_contentSizeFlag, contentSizeFlag);
    options.set(ZSTD_p_checksumFlag, 1);
    options.set(ZSTD_p_windowLog, wlog);
    auto codec = zstd::getCodec(std::move(options));
    // Larger than the 2^wlog window, so the window setting is observable.
    size_t const uncompressedLength = static_cast<size_t>(1) << 27;
    auto const original = std::string(
        reinterpret_cast<const char*>(dh.data(uncompressedLength).data()),
        uncompressedLength);
    auto const compressed = codec->compress(original);
    auto const uncompressed = codec->uncompress(compressed);
    EXPECT_EQ(uncompressed, original);
    EXPECT_EQ(
        codec->getUncompressedLength(
            folly::IOBuf::wrapBuffer(compressed.data(), compressed.size())
                .get()),
        contentSizeFlag ? uncompressedLength : Optional<uint64_t>());
    {
      ZSTD_frameHeader zfh;
      // ZSTD_getFrameHeader() returns 0 only when the header was fully
      // parsed; anything else leaves `zfh` unusable, so stop here rather
      // than compare against uninitialized fields.
      size_t const rc =
          ZSTD_getFrameHeader(&zfh, compressed.data(), compressed.size());
      ASSERT_EQ(rc, size_t{0});
      EXPECT_EQ(zfh.checksumFlag, 1);
      EXPECT_EQ(zfh.windowSize, 1ULL << wlog);
      EXPECT_EQ(
          zfh.frameContentSize,
          contentSizeFlag ? uncompressedLength : ZSTD_CONTENTSIZE_UNKNOWN);
    }
  };
  for (unsigned contentSizeFlag = 0; contentSizeFlag <= 1; ++contentSizeFlag) {
    test(constantDataHolder, contentSizeFlag);
    test(randomDataHolder, contentSizeFlag);
  }
}
#endif
#if FOLLY_HAVE_LIBZ
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment