Commit d216a9bd authored by Nick Terrell, committed by Facebook Github Bot

Add bzip2 support

Summary:
Adds bzip2 support to `folly/io/Compression.h`.
Adds bzip2 to the default set of supported codecs for the `AutomaticCodec`.

Reviewed By: yfeldblum

Differential Revision: D4873771

fbshipit-source-id: d4f4861aef7e4b9efb67095e8892c265b5ae5557
parent 3a7cbbe4
......@@ -554,6 +554,7 @@ AC_CHECK_HEADER([snappy.h], AC_CHECK_LIB([snappy], [main]))
AC_CHECK_HEADER([zlib.h], AC_CHECK_LIB([z], [main]))
AC_CHECK_HEADER([lzma.h], AC_CHECK_LIB([lzma], [main]))
AC_CHECK_HEADER([zstd.h], AC_CHECK_LIB([zstd], [ZSTD_compressStream]))
AC_CHECK_HEADER([bzlib.h], AC_CHECK_LIB([bz2], [main]))
AC_CHECK_HEADER([linux/membarrier.h], AC_DEFINE([HAVE_LINUX_MEMBARRIER_H], [1], [Define to 1 if membarrier.h is available]))
AC_ARG_ENABLE([follytestmain],
......
......@@ -43,6 +43,10 @@
#include <zstd.h>
#endif
#if FOLLY_HAVE_LIBBZ2
#include <bzlib.h>
#endif
#include <folly/Bits.h>
#include <folly/Conv.h>
#include <folly/Memory.h>
......@@ -285,6 +289,14 @@ prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
memcpy(&result[0], &prefix, n);
return result;
}
/**
 * Picks the length of an output buffer for uncompressing data.
 *
 * The result is four times the larger of `blockSize` and `compressedLength`,
 * capped at 4 MiB.
 *
 * @param compressedLength  length in bytes of the compressed input
 * @param blockSize         minimum length (before the 4x scaling) to assume
 * @return the buffer length in bytes; never more than 4 MiB
 */
static uint64_t computeBufferLength(
    uint64_t const compressedLength,
    uint64_t const blockSize) {
  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  // Clamp before scaling. Multiplying first wraps around for inputs of 2^62
  // bytes or more, which could yield a tiny (even zero) buffer length.
  // kMaxBufferLength is a multiple of 4, so the result is otherwise unchanged.
  uint64_t const baseLength =
      std::min(std::max(blockSize, compressedLength), kMaxBufferLength / 4);
  return 4 * baseLength;
}
} // namespace
#if FOLLY_HAVE_LIBLZ4
......@@ -969,13 +981,6 @@ std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
return out;
}
/**
 * Picks the length of an output buffer for uncompressing data, assuming a
 * 32 KiB block size.
 *
 * The result is four times the larger of 32 KiB and `compressedLength`,
 * capped at 4 MiB.
 *
 * @param compressedLength  length in bytes of the compressed input
 * @return the buffer length in bytes; between 128 KiB and 4 MiB
 */
static uint64_t computeBufferLength(uint64_t const compressedLength) {
  constexpr uint64_t kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
  // Clamp before scaling so that very large inputs (>= 2^62 bytes) cannot
  // wrap the multiplication around to a small value.
  const uint64_t baseLength =
      std::min(std::max(kBlockSize, compressedLength), kMaxBufferLength / 4);
  return 4 * baseLength;
}
std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
uint64_t uncompressedLength) {
z_stream stream;
......@@ -1009,8 +1014,9 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
// Max 64MiB in one go
constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
const uint64_t defaultBufferLength =
computeBufferLength(data->computeChainDataLength());
computeBufferLength(data->computeChainDataLength(), kBlockSize);
auto out = addOutputBuffer(
&stream,
......@@ -1551,6 +1557,212 @@ std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
#endif // FOLLY_HAVE_LIBZSTD
#if FOLLY_HAVE_LIBBZ2
class Bzip2Codec final : public Codec {
public:
static std::unique_ptr<Codec> create(int level, CodecType type);
explicit Bzip2Codec(int level, CodecType type);
std::vector<std::string> validPrefixes() const override;
bool canUncompress(IOBuf const* data, uint64_t uncompressedLength)
const override;
private:
std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
std::unique_ptr<IOBuf> doUncompress(
IOBuf const* data,
uint64_t uncompressedLength) override;
int level_;
};
// Factory function: allocates a Bzip2Codec for the given level and type and
// hands it back through the generic Codec interface.
/* static */ std::unique_ptr<Codec> Bzip2Codec::create(
    int level,
    CodecType type) {
  std::unique_ptr<Codec> codec = make_unique<Bzip2Codec>(level, type);
  return codec;
}
/**
 * Constructs a bzip2 codec.
 *
 * The symbolic levels are translated first: COMPRESSION_LEVEL_FASTEST maps to
 * 1, while COMPRESSION_LEVEL_DEFAULT and COMPRESSION_LEVEL_BEST both map to 9.
 * Any other value is taken literally and must lie within [1, 9].
 *
 * @throws std::invalid_argument if the resulting level is outside [1, 9]
 */
Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
  DCHECK(type == CodecType::BZIP2);

  if (level == COMPRESSION_LEVEL_FASTEST) {
    level = 1;
  } else if (
      level == COMPRESSION_LEVEL_DEFAULT || level == COMPRESSION_LEVEL_BEST) {
    level = 9;
  }

  bool const levelInRange = level >= 1 && level <= 9;
  if (!levelInRange) {
    throw std::invalid_argument(
        to<std::string>("Bzip2: invalid level: ", level));
  }
  level_ = level;
}
// Prefix used to recognise bzip2 data, stored little-endian: the bytes
// 0x42 0x5a 0x68 ("BZh") in stream order.
static constexpr uint32_t kBzip2MagicLE = 0x685a42;
// Number of meaningful bytes in kBzip2MagicLE.
static constexpr uint64_t kBzip2MagicBytes = 3;
// Returns the single prefix that marks bzip2 data: the magic bytes rendered
// as a string.
std::vector<std::string> Bzip2Codec::validPrefixes() const {
  std::vector<std::string> prefixes;
  prefixes.push_back(prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes));
  return prefixes;
}
// Reports whether `data` begins with the bzip2 magic bytes. The uncompressed
// length argument is ignored.
bool Bzip2Codec::canUncompress(
    IOBuf const* data,
    uint64_t /* uncompressedLength */) const {
  bool const hasMagic =
      dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
  return hasMagic;
}
/**
 * Returns a bz_stream ready to be passed to BZ2_bzCompressInit() or
 * BZ2_bzDecompressInit().
 *
 * The allocator hooks are null and the input/output cursors are empty.
 */
static bz_stream createBzStream() {
  // Value-initialize so that every member (including the total_* counters and
  // the internal state pointer) has a defined value before the struct is
  // returned by value; previously those members were left indeterminate.
  bz_stream stream{};
  // Spelled out even though value-initialization already nulls them: these
  // are the fields the init functions read.
  stream.bzalloc = nullptr;
  stream.bzfree = nullptr;
  stream.opaque = nullptr;
  return stream;
}
// Validates a libbz2 return code. The non-error codes (BZ_OK, BZ_RUN_OK,
// BZ_FLUSH_OK, BZ_FINISH_OK, BZ_STREAM_END) are passed through unchanged so
// callers can inspect them; any other code raises std::runtime_error.
static int bzCheck(int const rc) {
  bool const isSuccess = rc == BZ_OK || rc == BZ_RUN_OK ||
      rc == BZ_FLUSH_OK || rc == BZ_FINISH_OK || rc == BZ_STREAM_END;
  if (!isSuccess) {
    throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
  }
  return rc;
}
// Upper bound on the compressed size of `uncompressedLength` bytes.
//
// Per http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
// an output buffer 1% larger than the input plus six hundred extra bytes is
// guaranteed to be big enough.
static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
  uint64_t constexpr kFixedOverhead = 600;
  uint64_t const onePercent = uncompressedLength / 100;
  return uncompressedLength + onePercent + kFixedOverhead;
}
// Allocates a fresh IOBuf of at least `bufferLength` bytes and points the
// stream's output cursor at it. Must only be called once the previous output
// buffer is completely full (avail_out == 0). The returned buffer has its
// whole capacity marked as used; callers trim the unused tail when done.
static std::unique_ptr<IOBuf> addOutputBuffer(
    bz_stream* stream,
    uint64_t const bufferLength) {
  DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
  DCHECK_EQ(stream->avail_out, 0);

  std::unique_ptr<IOBuf> buffer = IOBuf::create(bufferLength);
  // Claim the full capacity, which may exceed what was asked for.
  buffer->append(buffer->capacity());

  stream->avail_out = buffer->length();
  stream->next_out = reinterpret_cast<char*>(buffer->writableData());
  return buffer;
}
/**
 * Compresses the whole IOBuf chain `data` with bzip2 at level_ and returns
 * the compressed bytes as a new IOBuf chain.
 *
 * If the worst-case compressed size is at most 64 MiB a single output buffer
 * of that size is allocated; otherwise output grows in 4 MiB buffers.
 *
 * Throws std::runtime_error (via bzCheck) if libbz2 reports an error.
 */
std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
  bz_stream stream = createBzStream();
  bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
  // NOTE(review): bzCheck may throw from inside SCOPE_EXIT; confirm that is
  // acceptable for the scope guard in use.
  SCOPE_EXIT {
    bzCheck(BZ2_bzCompressEnd(&stream));
  };

  uint64_t constexpr kStepLimit = uint64_t(64) << 20; // 64 MiB
  uint64_t constexpr kChunkLength = uint64_t(4) << 20; // 4 MiB

  uint64_t const inputLength = data->computeChainDataLength();
  uint64_t const worstCaseLength = bzCompressBound(inputLength);

  uint64_t firstBufferLength = kChunkLength;
  if (worstCaseLength <= kStepLimit) {
    firstBufferLength = worstCaseLength;
  }
  auto out = addOutputBuffer(&stream, firstBufferLength);

  // Feed every buffer of the chain, at most kStepLimit bytes per call.
  for (auto range : *data) {
    while (!range.empty()) {
      auto const inSize = std::min<size_t>(range.size(), kStepLimit);
      // libbz2 takes a non-const char*, hence the cast.
      stream.next_in =
          const_cast<char*>(reinterpret_cast<char const*>(range.data()));
      stream.avail_in = inSize;
      if (stream.avail_out == 0) {
        out->prependChain(addOutputBuffer(&stream, kChunkLength));
      }
      bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
      // Skip only what the compressor actually consumed.
      range.uncheckedAdvance(inSize - stream.avail_in);
    }
  }

  // Flush the remaining output, adding buffers as each one fills up.
  for (;;) {
    if (stream.avail_out == 0) {
      out->prependChain(addOutputBuffer(&stream, kChunkLength));
    }
    if (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) == BZ_STREAM_END) {
      break;
    }
  }

  // The last buffer in the chain is only partially filled.
  out->prev()->trimEnd(stream.avail_out);
  return out;
}
/**
 * Uncompresses the bzip2 stream held in the IOBuf chain `data`.
 *
 * @param data                chain containing one complete bzip2 stream
 * @param uncompressedLength  expected output size, or
 *                            UNKNOWN_UNCOMPRESSED_LENGTH if not known
 * @return a new IOBuf chain with the uncompressed bytes
 * @throws std::runtime_error if libbz2 reports an error, if the input ends
 *         before the end of the bzip2 stream, or if `uncompressedLength` is
 *         known and does not match the amount of data produced
 */
std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
    const IOBuf* data,
    uint64_t uncompressedLength) {
  bz_stream stream = createBzStream();
  bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
  SCOPE_EXIT {
    bzCheck(BZ2_bzDecompressEnd(&stream));
  };

  uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
  uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
  uint64_t const kDefaultBufferLength =
      computeBufferLength(data->computeChainDataLength(), kBlockSize);

  // When the output size is known and reasonably small, allocate it in one
  // go; otherwise grow the output in kDefaultBufferLength steps.
  auto out = addOutputBuffer(
      &stream,
      ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
        uncompressedLength <= kMaxSingleStepLength)
           ? uncompressedLength
           : kDefaultBufferLength));

  int rc = BZ_OK;
  for (auto range : *data) {
    while (!range.empty()) {
      auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
      // libbz2 takes a non-const char*, hence the cast.
      stream.next_in =
          const_cast<char*>(reinterpret_cast<char const*>(range.data()));
      stream.avail_in = inSize;
      if (stream.avail_out == 0) {
        out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
      }
      rc = bzCheck(BZ2_bzDecompress(&stream));
      // Skip only what the decompressor actually consumed.
      range.uncheckedAdvance(inSize - stream.avail_in);
    }
  }

  // All input has been consumed; drain whatever output is still pending.
  while (rc != BZ_STREAM_END) {
    if (stream.avail_out == 0) {
      out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
    }
    rc = bzCheck(BZ2_bzDecompress(&stream));
    // With no input left, the decompressor can only make progress by filling
    // the output buffer. If it returns without reaching the end of the stream
    // while output space remains, it is waiting for input that will never
    // arrive (empty or truncated data). Fail instead of looping forever.
    if (rc != BZ_STREAM_END && stream.avail_out != 0) {
      throw std::runtime_error("Bzip2 error: Truncated input");
    }
  }

  // The last buffer in the chain is only partially filled.
  out->prev()->trimEnd(stream.avail_out);

  uint64_t const totalOut =
      (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
      uncompressedLength != totalOut) {
    throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
  }
  return out;
}
#endif // FOLLY_HAVE_LIBBZ2
/**
* Automatic decompression
*/
......@@ -1630,6 +1842,7 @@ AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
addCodecIfSupported(CodecType::ZLIB);
addCodecIfSupported(CodecType::GZIP);
addCodecIfSupported(CodecType::LZMA2);
addCodecIfSupported(CodecType::BZIP2);
if (kIsDebug) {
checkCompatibleCodecs();
}
......@@ -1767,6 +1980,12 @@ static constexpr CodecFactory
#else
nullptr,
#endif
#if FOLLY_HAVE_LIBBZ2
Bzip2Codec::create,
#else
nullptr
#endif
};
bool hasCodec(CodecType type) {
......
......@@ -93,7 +93,13 @@ enum class CodecType {
*/
LZ4_FRAME = 10,
NUM_CODEC_TYPES = 11,
/**
* Use bzip2 compression.
* Levels supported: 1 = fast, 9 = best; default = 9
*/
BZIP2 = 11,
NUM_CODEC_TYPES = 12,
};
class Codec {
......@@ -230,7 +236,7 @@ std::unique_ptr<Codec> getCodec(CodecType type,
/**
* Returns a codec that can uncompress any of the given codec types as well as
* {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2}. Appends each default codec to
* {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2, BZIP2}. Appends each default codec to
* customCodecs in order, so long as a codec with the same type() isn't already
* present. When uncompress() is called, each codec's canUncompress() is called
* in the order that they are given. Appended default codecs are checked last.
......
......@@ -161,6 +161,7 @@ TEST(CompressionTestNeedsUncompressedLength, Simple) {
{ CodecType::ZSTD, false },
{ CodecType::GZIP, false },
{ CodecType::LZ4_FRAME, false },
{ CodecType::BZIP2, false },
};
for (auto const& test : expectations) {
......@@ -396,6 +397,7 @@ INSTANTIATE_TEST_CASE_P(
CodecType::LZMA2,
CodecType::ZSTD,
CodecType::LZ4_FRAME,
CodecType::BZIP2,
})));
class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
......@@ -584,7 +586,8 @@ INSTANTIATE_TEST_CASE_P(
CodecType::ZSTD,
CodecType::ZLIB,
CodecType::GZIP,
CodecType::LZMA2));
CodecType::LZMA2,
CodecType::BZIP2));
TEST(ValidPrefixesTest, CustomCodec) {
std::vector<std::unique_ptr<Codec>> codecs;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment