Commit 2f002209 authored by Nick Terrell, committed by Facebook Github Bot

Add BZIP2 stream codec

Summary:
Add the BZIP2 stream codec.

`FlushOp::FLUSH` does not guarantee that the decompressor can read all of the input processed so far, due to a bug in the bzip2 library. This is unlikely to matter in practice, since `FLUSH` is not a common operation, especially with bzip2.
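
As an illustration only (not part of this diff), here is a minimal sketch of compressing a buffer with the new streaming codec through the folly::io API exercised in the tests below. Buffer sizing and error handling are simplified, and FlushOp::FLUSH is avoided because this codec rejects it; decompression is symmetric via uncompressStream().

// Minimal sketch (illustrative, not part of this diff): compress a buffer
// with the streaming BZIP2 codec.
#include <folly/Range.h>
#include <folly/compression/Compression.h>
#include <vector>

std::vector<char> bzip2CompressSketch(folly::StringPiece data) {
  using namespace folly::io;
  auto codec = getStreamCodec(CodecType::BZIP2);
  // maxCompressedLength() bounds the worst-case output size, so one output
  // buffer suffices for this single-shot example.
  std::vector<char> out(codec->maxCompressedLength(data.size()));
  folly::ByteRange input(
      reinterpret_cast<const unsigned char*>(data.data()), data.size());
  folly::MutableByteRange output(
      reinterpret_cast<unsigned char*>(out.data()), out.size());
  codec->resetStream(data.size());
  // FlushOp::END finishes the bzip2 stream; compressStream() returns true
  // once the end of the frame has been written.
  while (!codec->compressStream(input, output, StreamCodec::FlushOp::END)) {
  }
  out.resize(out.size() - output.size());
  return out;
}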

Reviewed By: yfeldblum

Differential Revision: D9484325

fbshipit-source-id: 40770b6f301a16d86c4de8c2b0875f931f00cba2
parent 8cbbffc3
@@ -1549,10 +1549,13 @@ std::unique_ptr<StreamCodec> getZstdFastStreamCodec(int level, CodecType type) {
#if FOLLY_HAVE_LIBBZ2
class Bzip2Codec final : public Codec {
class Bzip2StreamCodec final : public StreamCodec {
public:
static std::unique_ptr<Codec> create(int level, CodecType type);
explicit Bzip2Codec(int level, CodecType type);
static std::unique_ptr<Codec> createCodec(int level, CodecType type);
static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
explicit Bzip2StreamCodec(int level, CodecType type);
~Bzip2StreamCodec() override;
std::vector<std::string> validPrefixes() const override;
bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
@@ -1560,21 +1563,41 @@ class Bzip2Codec final : public Codec {
private:
uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
std::unique_ptr<IOBuf> doUncompress(
IOBuf const* data,
Optional<uint64_t> uncompressedLength) override;
void doResetStream() override;
bool doCompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) override;
bool doUncompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) override;
void resetCStream();
void resetDStream();
Optional<bz_stream> cstream_{};
Optional<bz_stream> dstream_{};
int level_;
bool needReset_{true};
};
/* static */ std::unique_ptr<Codec> Bzip2Codec::create(
/* static */ std::unique_ptr<Codec> Bzip2StreamCodec::createCodec(
int level,
CodecType type) {
return std::make_unique<Bzip2Codec>(level, type);
return createStream(level, type);
}
Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
/* static */ std::unique_ptr<StreamCodec> Bzip2StreamCodec::createStream(
int level,
CodecType type) {
return std::make_unique<Bzip2StreamCodec>(level, type);
}
Bzip2StreamCodec::Bzip2StreamCodec(int level, CodecType type)
: StreamCodec(type) {
DCHECK(type == CodecType::BZIP2);
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
@@ -1597,15 +1620,17 @@ Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
static uint32_t constexpr kBzip2MagicLE = 0x685a42;
static uint64_t constexpr kBzip2MagicBytes = 3;
std::vector<std::string> Bzip2Codec::validPrefixes() const {
std::vector<std::string> Bzip2StreamCodec::validPrefixes() const {
return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
}
bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
bool Bzip2StreamCodec::canUncompress(IOBuf const* data, Optional<uint64_t>)
const {
return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
}
uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
uint64_t Bzip2StreamCodec::doMaxCompressedLength(
uint64_t uncompressedLength) const {
// http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
// To guarantee that the compressed data will fit in its buffer, allocate an
// output buffer of size 1% larger than the uncompressed data, plus six
@@ -1631,126 +1656,125 @@ static int bzCheck(int const rc) {
case BZ_FLUSH_OK:
case BZ_FINISH_OK:
case BZ_STREAM_END:
// Allow BZ_PARAM_ERROR.
// It can get returned if no progress is made, but we handle that.
case BZ_PARAM_ERROR:
return rc;
default:
throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
}
}
static std::unique_ptr<IOBuf> addOutputBuffer(
bz_stream* stream,
uint64_t const bufferLength) {
DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
DCHECK_EQ(stream->avail_out, 0);
auto buf = IOBuf::create(bufferLength);
buf->append(buf->capacity());
stream->next_out = reinterpret_cast<char*>(buf->writableData());
stream->avail_out = buf->length();
return buf;
Bzip2StreamCodec::~Bzip2StreamCodec() {
if (cstream_) {
BZ2_bzCompressEnd(cstream_.get_pointer());
cstream_.clear();
}
if (dstream_) {
BZ2_bzDecompressEnd(dstream_.get_pointer());
dstream_.clear();
}
}
std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
bz_stream stream = createBzStream();
bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
SCOPE_EXIT {
bzCheck(BZ2_bzCompressEnd(&stream));
};
uint64_t const uncompressedLength = data->computeChainDataLength();
uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
auto out = addOutputBuffer(
&stream,
maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
: kDefaultBufferLength);
for (auto range : *data) {
while (!range.empty()) {
auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
stream.next_in =
const_cast<char*>(reinterpret_cast<char const*>(range.data()));
stream.avail_in = inSize;
void Bzip2StreamCodec::doResetStream() {
needReset_ = true;
}
if (stream.avail_out == 0) {
out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
void Bzip2StreamCodec::resetCStream() {
if (cstream_) {
BZ2_bzCompressEnd(cstream_.get_pointer());
}
cstream_ = createBzStream();
bzCheck(BZ2_bzCompressInit(cstream_.get_pointer(), level_, 0, 0));
}
bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
range.uncheckedAdvance(inSize - stream.avail_in);
int bzip2TranslateFlush(StreamCodec::FlushOp flushOp) {
switch (flushOp) {
case StreamCodec::FlushOp::NONE:
return BZ_RUN;
case StreamCodec::FlushOp::END:
return BZ_FINISH;
case StreamCodec::FlushOp::FLUSH:
throw std::invalid_argument(
"Bzip2StreamCodec: FlushOp::FLUSH not supported");
default:
throw std::invalid_argument("Bzip2StreamCodec: Invalid flush");
}
}
bool Bzip2StreamCodec::doCompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) {
if (needReset_) {
resetCStream();
needReset_ = false;
}
do {
if (stream.avail_out == 0) {
out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
if (input.empty() && output.empty()) {
return false;
}
} while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
out->prev()->trimEnd(stream.avail_out);
return out;
}
std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
const IOBuf* data,
Optional<uint64_t> uncompressedLength) {
bz_stream stream = createBzStream();
bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
cstream_->next_in =
const_cast<char*>(reinterpret_cast<const char*>(input.data()));
cstream_->avail_in = input.size();
cstream_->next_out = reinterpret_cast<char*>(output.data());
cstream_->avail_out = output.size();
SCOPE_EXIT {
bzCheck(BZ2_bzDecompressEnd(&stream));
input.uncheckedAdvance(input.size() - cstream_->avail_in);
output.uncheckedAdvance(output.size() - cstream_->avail_out);
};
uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
uint64_t const kDefaultBufferLength =
computeBufferLength(data->computeChainDataLength(), kBlockSize);
auto out = addOutputBuffer(
&stream,
((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
? *uncompressedLength
: kDefaultBufferLength));
int rc = BZ_OK;
for (auto range : *data) {
while (!range.empty()) {
auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
stream.next_in =
const_cast<char*>(reinterpret_cast<char const*>(range.data()));
stream.avail_in = inSize;
if (stream.avail_out == 0) {
out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
}
rc = bzCheck(BZ2_bzDecompress(&stream));
range.uncheckedAdvance(inSize - stream.avail_in);
}
}
while (rc != BZ_STREAM_END) {
if (stream.avail_out == 0) {
out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
}
size_t const outputSize = stream.avail_out;
rc = bzCheck(BZ2_bzDecompress(&stream));
if (outputSize == stream.avail_out) {
throw std::runtime_error("Bzip2Codec: Truncated input");
int const rc = bzCheck(
BZ2_bzCompress(cstream_.get_pointer(), bzip2TranslateFlush(flushOp)));
switch (flushOp) {
case StreamCodec::FlushOp::NONE:
return false;
case StreamCodec::FlushOp::FLUSH:
if (rc == BZ_RUN_OK) {
DCHECK_EQ(cstream_->avail_in, 0);
DCHECK(input.size() == 0 || cstream_->avail_out != output.size());
return true;
}
return false;
case StreamCodec::FlushOp::END:
return rc == BZ_STREAM_END;
default:
throw std::invalid_argument("Bzip2StreamCodec: invalid FlushOp");
}
return false;
}
out->prev()->trimEnd(stream.avail_out);
void Bzip2StreamCodec::resetDStream() {
if (dstream_) {
BZ2_bzDecompressEnd(dstream_.get_pointer());
}
dstream_ = createBzStream();
bzCheck(BZ2_bzDecompressInit(dstream_.get_pointer(), 0, 0));
}
uint64_t const totalOut =
(uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
if (uncompressedLength && uncompressedLength != totalOut) {
throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
bool Bzip2StreamCodec::doUncompressStream(
ByteRange& input,
MutableByteRange& output,
StreamCodec::FlushOp flushOp) {
if (flushOp == StreamCodec::FlushOp::FLUSH) {
throw std::invalid_argument(
"Bzip2StreamCodec: FlushOp::FLUSH not supported");
}
if (needReset_) {
resetDStream();
needReset_ = false;
}
return out;
dstream_->next_in =
const_cast<char*>(reinterpret_cast<const char*>(input.data()));
dstream_->avail_in = input.size();
dstream_->next_out = reinterpret_cast<char*>(output.data());
dstream_->avail_out = output.size();
SCOPE_EXIT {
input.uncheckedAdvance(input.size() - dstream_->avail_in);
output.uncheckedAdvance(output.size() - dstream_->avail_out);
};
int const rc = bzCheck(BZ2_bzDecompress(dstream_.get_pointer()));
return rc == BZ_STREAM_END;
}
#endif // FOLLY_HAVE_LIBBZ2
@@ -2051,7 +2075,7 @@ constexpr Factory
#endif
#if FOLLY_HAVE_LIBBZ2
{Bzip2Codec::create, nullptr},
{Bzip2StreamCodec::createCodec, Bzip2StreamCodec::createStream},
#else
{},
#endif
@@ -63,6 +63,7 @@ enum class CodecType {
/**
* Use zlib compression.
* Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6
* Streaming compression is supported.
*/
ZLIB = 4,
@@ -74,6 +75,7 @@ enum class CodecType {
/**
* Use LZMA2 compression.
* Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6
* Streaming compression is supported.
*/
LZMA2 = 6,
LZMA2_VARINT_SIZE = 7,
@@ -82,6 +84,7 @@ enum class CodecType {
* Use ZSTD compression.
* Levels supported: 1 = fast, ..., 19 = best; default = 3
* Use ZSTD_FAST for the fastest zstd compression (negative levels).
* Streaming compression is supported.
*/
ZSTD = 8,
@@ -89,6 +92,7 @@ enum class CodecType {
* Use gzip compression. This is the same compression algorithm as ZLIB but
* gzip-compressed files tend to be easier to work with from the command line.
* Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6
* Streaming compression is supported.
*/
GZIP = 9,
@@ -101,6 +105,9 @@ enum class CodecType {
/**
* Use bzip2 compression.
* Levels supported: 1 = fast, 9 = best; default = 9
* Streaming compression is supported BUT FlushOp::FLUSH does NOT ensure that
* the decompressor can read all the data up to that point, due to a bug in
* the bzip2 library.
*/
BZIP2 = 11,
@@ -114,6 +121,7 @@ enum class CodecType {
* speed is around 25% faster than ZSTD.
* This codec is fully compatible with ZSTD.
* Levels supported: 1 = best, ..., 5 = fast; default = 1
* Streaming compression is supported.
*/
ZSTD_FAST = 12,
@@ -462,12 +462,20 @@ INSTANTIATE_TEST_CASE_P(
CodecType::BZIP2,
})));
static bool codecHasFlush(CodecType type) {
return type != CodecType::BZIP2;
}
class StreamingUnitTest : public testing::TestWithParam<CodecType> {
protected:
void SetUp() override {
codec_ = getStreamCodec(GetParam());
}
bool hasFlush() const {
return codecHasFlush(GetParam());
}
std::unique_ptr<StreamCodec> codec_;
};
@@ -554,8 +562,10 @@ TEST_P(StreamingUnitTest, emptyData) {
codec_->resetStream(0);
output = {largeBuffer->writableData(), largeBuffer->length()};
EXPECT_FALSE(codec_->compressStream(input, output));
if (hasFlush()) {
EXPECT_TRUE(
codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
}
EXPECT_TRUE(
codec_->compressStream(input, output, StreamCodec::FlushOp::END));
}
@@ -564,17 +574,21 @@ TEST_P(StreamingUnitTest, emptyData) {
output = {};
codec_->resetStream();
EXPECT_TRUE(codec_->uncompressStream(input, output));
if (hasFlush()) {
codec_->resetStream();
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
}
codec_->resetStream();
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
codec_->resetStream(0);
EXPECT_TRUE(codec_->uncompressStream(input, output));
if (hasFlush()) {
codec_->resetStream(0);
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
}
codec_->resetStream(0);
EXPECT_TRUE(
codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
@@ -600,6 +614,9 @@ TEST_P(StreamingUnitTest, noForwardProgress) {
// No progress is not okay twice in a row for all flush operations when
// compressing
for (const auto flushOp : flushOps) {
if (flushOp == StreamCodec::FlushOp::FLUSH && !hasFlush()) {
continue;
}
if (codec_->needsDataLength()) {
codec_->resetStream(inBuffer->computeChainDataLength());
} else {
@@ -621,6 +638,9 @@ TEST_P(StreamingUnitTest, noForwardProgress) {
// No progress is not okay twice in a row for all flush operations when
// uncompressing
for (const auto flushOp : flushOps) {
if (flushOp == StreamCodec::FlushOp::FLUSH && !hasFlush()) {
continue;
}
codec_->resetStream();
auto input = compressed->coalesce();
// Remove the last byte so the operation is incomplete
@@ -681,35 +701,43 @@ TEST_P(StreamingUnitTest, stateTransitions) {
codec_->resetStream();
EXPECT_FALSE(compress());
EXPECT_FALSE(compress());
if (hasFlush()) {
EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
}
EXPECT_FALSE(compress());
EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
}
codec_->resetStream(in.size() * 5);
compress_all(false);
compress_all(false);
if (hasFlush()) {
compress_all(true, StreamCodec::FlushOp::FLUSH);
}
compress_all(false);
compress_all(true, StreamCodec::FlushOp::END);
// uncompression flow
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
if (hasFlush()) {
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
}
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
codec_->resetStream();
EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
if (hasFlush()) {
codec_->resetStream();
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
}
// compress -> uncompress
codec_->resetStream(in.size());
EXPECT_FALSE(compress());
EXPECT_THROW(uncompress(), std::logic_error);
// uncompress -> compress
codec_->resetStream(inBuffer->computeChainDataLength());
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::NONE));
EXPECT_THROW(compress(), std::logic_error);
// end -> compress
if (!codec_->needsDataLength()) {
@@ -724,16 +752,20 @@ TEST_P(StreamingUnitTest, stateTransitions) {
EXPECT_THROW(compress(), std::logic_error);
// end -> uncompress
codec_->resetStream();
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
EXPECT_TRUE(uncompress(StreamCodec::FlushOp::END));
EXPECT_THROW(uncompress(), std::logic_error);
// flush -> compress
if (hasFlush()) {
codec_->resetStream(in.size());
EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
EXPECT_THROW(compress(), std::logic_error);
}
// flush -> end
if (hasFlush()) {
codec_->resetStream(in.size());
EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
}
// undefined -> compress
codec_->compress(inBuffer.get());
EXPECT_THROW(compress(), std::logic_error);
@@ -752,6 +784,10 @@ INSTANTIATE_TEST_CASE_P(
class StreamingCompressionTest
: public testing::TestWithParam<std::tuple<int, int, CodecType>> {
protected:
bool hasFlush() const {
return codecHasFlush(std::get<2>(GetParam()));
}
void SetUp() override {
auto const tup = GetParam();
uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
@@ -874,6 +910,8 @@ TEST_P(StreamingCompressionTest, compressStream) {
void StreamingCompressionTest::runUncompressStreamTest(
const folly::io::test::DataHolder& dh) {
const auto flush =
hasFlush() ? StreamCodec::FlushOp::FLUSH : StreamCodec::FlushOp::NONE;
auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
// Concatenate 3 compressed frames in a row
auto compressed = codec_->compress(data.get());
@@ -884,8 +922,7 @@ void StreamingCompressionTest::runUncompressStreamTest(
// Uncompress the first frame
codec_->resetStream(data->computeChainDataLength());
{
auto const result = uncompressSome(
codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
auto const result = uncompressSome(codec_.get(), input, chunkSize_, flush);
ASSERT_TRUE(result.first);
ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
}
@@ -900,8 +937,7 @@ void StreamingCompressionTest::runUncompressStreamTest(
// Uncompress the third frame
codec_->resetStream();
{
auto const result = uncompressSome(
codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
auto const result = uncompressSome(codec_.get(), input, chunkSize_, flush);
ASSERT_TRUE(result.first);
ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
}
@@ -945,6 +981,9 @@ void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
}
TEST_P(StreamingCompressionTest, testFlush) {
if (!hasFlush()) {
return;
}
runFlushTest(constantDataHolder);
runFlushTest(randomDataHolder);
}