Commit d71636e2 authored by Tudor Bosman, committed by Sara Golemon

Add Varint-length-prefixed flavor of LZ4

Test Plan: test added

Reviewed By: alandau@fb.com

FB internal diff: D928836
parent 0558d398
...@@ -27,10 +27,13 @@ ...@@ -27,10 +27,13 @@
#include "folly/Memory.h" #include "folly/Memory.h"
#include "folly/Portability.h" #include "folly/Portability.h"
#include "folly/ScopeGuard.h" #include "folly/ScopeGuard.h"
#include "folly/Varint.h"
#include "folly/io/Cursor.h" #include "folly/io/Cursor.h"
namespace folly { namespace io { namespace folly { namespace io {
Codec::Codec(CodecType type) : type_(type) { }
// Ensure consistent behavior in the nullptr case // Ensure consistent behavior in the nullptr case
std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) { std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
return !data->empty() ? doCompress(data) : IOBuf::create(0); return !data->empty() ? doCompress(data) : IOBuf::create(0);
...@@ -65,10 +68,6 @@ uint64_t Codec::maxUncompressedLength() const { ...@@ -65,10 +68,6 @@ uint64_t Codec::maxUncompressedLength() const {
return doMaxUncompressedLength(); return doMaxUncompressedLength();
} }
CodecType Codec::type() const {
return doType();
}
bool Codec::doNeedsUncompressedLength() const { bool Codec::doNeedsUncompressedLength() const {
return false; return false;
} }
...@@ -84,22 +83,23 @@ namespace { ...@@ -84,22 +83,23 @@ namespace {
*/ */
class NoCompressionCodec FOLLY_FINAL : public Codec { class NoCompressionCodec FOLLY_FINAL : public Codec {
public: public:
static std::unique_ptr<Codec> create(int level); static std::unique_ptr<Codec> create(int level, CodecType type);
explicit NoCompressionCodec(int level); explicit NoCompressionCodec(int level, CodecType type);
private: private:
CodecType doType() const FOLLY_OVERRIDE;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
std::unique_ptr<IOBuf> doUncompress( std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data, const IOBuf* data,
uint64_t uncompressedLength) FOLLY_OVERRIDE; uint64_t uncompressedLength) FOLLY_OVERRIDE;
}; };
std::unique_ptr<Codec> NoCompressionCodec::create(int level) { std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
return make_unique<NoCompressionCodec>(level); return make_unique<NoCompressionCodec>(level, type);
} }
NoCompressionCodec::NoCompressionCodec(int level) { NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
: Codec(type) {
DCHECK(type == CodecType::NO_COMPRESSION);
switch (level) { switch (level) {
case COMPRESSION_LEVEL_DEFAULT: case COMPRESSION_LEVEL_DEFAULT:
case COMPRESSION_LEVEL_FASTEST: case COMPRESSION_LEVEL_FASTEST:
...@@ -112,10 +112,6 @@ NoCompressionCodec::NoCompressionCodec(int level) { ...@@ -112,10 +112,6 @@ NoCompressionCodec::NoCompressionCodec(int level) {
} }
} }
CodecType NoCompressionCodec::doType() const {
return CodecType::NO_COMPRESSION;
}
std::unique_ptr<IOBuf> NoCompressionCodec::doCompress( std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
const IOBuf* data) { const IOBuf* data) {
return data->clone(); return data->clone();
...@@ -137,13 +133,15 @@ std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress( ...@@ -137,13 +133,15 @@ std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
*/ */
class LZ4Codec FOLLY_FINAL : public Codec { class LZ4Codec FOLLY_FINAL : public Codec {
public: public:
static std::unique_ptr<Codec> create(int level); static std::unique_ptr<Codec> create(int level, CodecType type);
explicit LZ4Codec(int level); explicit LZ4Codec(int level, CodecType type);
private: private:
bool doNeedsUncompressedLength() const FOLLY_OVERRIDE; bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
CodecType doType() const FOLLY_OVERRIDE;
bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
std::unique_ptr<IOBuf> doUncompress( std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data, const IOBuf* data,
...@@ -152,11 +150,13 @@ class LZ4Codec FOLLY_FINAL : public Codec { ...@@ -152,11 +150,13 @@ class LZ4Codec FOLLY_FINAL : public Codec {
bool highCompression_; bool highCompression_;
}; };
std::unique_ptr<Codec> LZ4Codec::create(int level) { std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
return make_unique<LZ4Codec>(level); return make_unique<LZ4Codec>(level, type);
} }
LZ4Codec::LZ4Codec(int level) { LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
switch (level) { switch (level) {
case COMPRESSION_LEVEL_FASTEST: case COMPRESSION_LEVEL_FASTEST:
case COMPRESSION_LEVEL_DEFAULT: case COMPRESSION_LEVEL_DEFAULT:
...@@ -174,7 +174,7 @@ LZ4Codec::LZ4Codec(int level) { ...@@ -174,7 +174,7 @@ LZ4Codec::LZ4Codec(int level) {
} }
bool LZ4Codec::doNeedsUncompressedLength() const { bool LZ4Codec::doNeedsUncompressedLength() const {
return true; return !encodeSize();
} }
uint64_t LZ4Codec::doMaxUncompressedLength() const { uint64_t LZ4Codec::doMaxUncompressedLength() const {
...@@ -183,10 +183,24 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const { ...@@ -183,10 +183,24 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const {
return 1.8 * (uint64_t(1) << 30); return 1.8 * (uint64_t(1) << 30);
} }
CodecType LZ4Codec::doType() const { namespace {
return CodecType::LZ4;
// Encodes `val` as a varint at the writable tail of `out`, then calls
// append() with the value encodeVarint() returns (presumably the number of
// bytes it wrote -- confirm against folly/Varint.h).
// The caller is expected to have reserved at least kMaxVarintLength64 bytes of
// tailroom; this is checked with DCHECK_GE only.
void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
  DCHECK_GE(out->tailroom(), kMaxVarintLength64);
  auto* const dest = out->writableTail();
  const auto written = encodeVarint(val, dest);
  out->append(written);
}
// Reads one varint starting at the cursor's current position and moves the
// cursor past it, returning the decoded value.
// Only the span returned by cursor.peek() is examined, so the whole encoded
// varint has to be available in that span.
uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
// Must have enough room in *this* buffer.
// NOTE(review): presumably peek() exposes only the current buffer of a chain,
// which is why the varint cannot straddle buffers -- confirm in Cursor.h.
auto p = cursor.peek();
folly::ByteRange range(p.first, p.second);
uint64_t val = decodeVarint(range);
// The distance range.data() has moved from p.first is used as the number of
// bytes to skip (assumes decodeVarint advances `range` past what it consumed).
cursor.skip(range.data() - p.first);
return val;
} }
} // namespace
std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) { std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
std::unique_ptr<IOBuf> clone; std::unique_ptr<IOBuf> clone;
if (data->isChained()) { if (data->isChained()) {
...@@ -196,14 +210,19 @@ std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) { ...@@ -196,14 +210,19 @@ std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
data = clone.get(); data = clone.get();
} }
auto out = IOBuf::create(LZ4_compressBound(data->length())); uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
if (encodeSize()) {
encodeVarintToIOBuf(data->length(), out.get());
}
int n; int n;
if (highCompression_) { if (highCompression_) {
n = LZ4_compress(reinterpret_cast<const char*>(data->data()), n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
reinterpret_cast<char*>(out->writableTail()), reinterpret_cast<char*>(out->writableTail()),
data->length()); data->length());
} else { } else {
n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()), n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
reinterpret_cast<char*>(out->writableTail()), reinterpret_cast<char*>(out->writableTail()),
data->length()); data->length());
} }
...@@ -226,15 +245,29 @@ std::unique_ptr<IOBuf> LZ4Codec::doUncompress( ...@@ -226,15 +245,29 @@ std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
data = clone.get(); data = clone.get();
} }
auto out = IOBuf::create(uncompressedLength); folly::io::Cursor cursor(data);
int n = LZ4_uncompress(reinterpret_cast<const char*>(data->data()), uint64_t actualUncompressedLength;
if (encodeSize()) {
actualUncompressedLength = decodeVarintFromCursor(cursor);
if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
uncompressedLength != actualUncompressedLength) {
throw std::runtime_error("LZ4Codec: invalid uncompressed length");
}
} else {
actualUncompressedLength = uncompressedLength;
DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
}
auto out = IOBuf::create(actualUncompressedLength);
auto p = cursor.peek();
int n = LZ4_uncompress(reinterpret_cast<const char*>(p.first),
reinterpret_cast<char*>(out->writableTail()), reinterpret_cast<char*>(out->writableTail()),
uncompressedLength); actualUncompressedLength);
if (n != data->length()) { if (n != p.second) {
throw std::runtime_error(to<std::string>( throw std::runtime_error(to<std::string>(
"LZ4 decompression returned invalid value ", n)); "LZ4 decompression returned invalid value ", n));
} }
out->append(uncompressedLength); out->append(actualUncompressedLength);
return out; return out;
} }
...@@ -279,23 +312,23 @@ void IOBufSnappySource::Skip(size_t n) { ...@@ -279,23 +312,23 @@ void IOBufSnappySource::Skip(size_t n) {
class SnappyCodec FOLLY_FINAL : public Codec { class SnappyCodec FOLLY_FINAL : public Codec {
public: public:
static std::unique_ptr<Codec> create(int level); static std::unique_ptr<Codec> create(int level, CodecType type);
explicit SnappyCodec(int level); explicit SnappyCodec(int level, CodecType type);
private: private:
uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
CodecType doType() const FOLLY_OVERRIDE;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
std::unique_ptr<IOBuf> doUncompress( std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data, const IOBuf* data,
uint64_t uncompressedLength) FOLLY_OVERRIDE; uint64_t uncompressedLength) FOLLY_OVERRIDE;
}; };
std::unique_ptr<Codec> SnappyCodec::create(int level) { std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
return make_unique<SnappyCodec>(level); return make_unique<SnappyCodec>(level, type);
} }
SnappyCodec::SnappyCodec(int level) { SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
DCHECK(type == CodecType::SNAPPY);
switch (level) { switch (level) {
case COMPRESSION_LEVEL_FASTEST: case COMPRESSION_LEVEL_FASTEST:
case COMPRESSION_LEVEL_DEFAULT: case COMPRESSION_LEVEL_DEFAULT:
...@@ -313,10 +346,6 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const { ...@@ -313,10 +346,6 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const {
return std::numeric_limits<uint32_t>::max(); return std::numeric_limits<uint32_t>::max();
} }
CodecType SnappyCodec::doType() const {
return CodecType::SNAPPY;
}
std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) { std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
IOBufSnappySource source(data); IOBufSnappySource source(data);
auto out = auto out =
...@@ -366,11 +395,10 @@ std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data, ...@@ -366,11 +395,10 @@ std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
*/ */
class ZlibCodec FOLLY_FINAL : public Codec { class ZlibCodec FOLLY_FINAL : public Codec {
public: public:
static std::unique_ptr<Codec> create(int level); static std::unique_ptr<Codec> create(int level, CodecType type);
explicit ZlibCodec(int level); explicit ZlibCodec(int level, CodecType type);
private: private:
CodecType doType() const FOLLY_OVERRIDE;
std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
std::unique_ptr<IOBuf> doUncompress( std::unique_ptr<IOBuf> doUncompress(
const IOBuf* data, const IOBuf* data,
...@@ -382,11 +410,12 @@ class ZlibCodec FOLLY_FINAL : public Codec { ...@@ -382,11 +410,12 @@ class ZlibCodec FOLLY_FINAL : public Codec {
int level_; int level_;
}; };
std::unique_ptr<Codec> ZlibCodec::create(int level) { std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
return make_unique<ZlibCodec>(level); return make_unique<ZlibCodec>(level, type);
} }
ZlibCodec::ZlibCodec(int level) { ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
DCHECK(type == CodecType::ZLIB);
switch (level) { switch (level) {
case COMPRESSION_LEVEL_FASTEST: case COMPRESSION_LEVEL_FASTEST:
level = 1; level = 1;
...@@ -405,10 +434,6 @@ ZlibCodec::ZlibCodec(int level) { ...@@ -405,10 +434,6 @@ ZlibCodec::ZlibCodec(int level) {
level_ = level; level_ = level;
} }
CodecType ZlibCodec::doType() const {
return CodecType::ZLIB;
}
std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream, std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
uint32_t length) { uint32_t length) {
CHECK_EQ(stream->avail_out, 0); CHECK_EQ(stream->avail_out, 0);
...@@ -599,14 +624,16 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data, ...@@ -599,14 +624,16 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
return out; return out;
} }
typedef std::unique_ptr<Codec> (*CodecFactory)(int); typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
CodecFactory gCodecFactories[ CodecFactory gCodecFactories[
static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = { static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
nullptr, // USER_DEFINED
NoCompressionCodec::create, NoCompressionCodec::create,
LZ4Codec::create, LZ4Codec::create,
SnappyCodec::create, SnappyCodec::create,
ZlibCodec::create ZlibCodec::create,
LZ4Codec::create
}; };
} // namespace } // namespace
...@@ -622,7 +649,7 @@ std::unique_ptr<Codec> getCodec(CodecType type, int level) { ...@@ -622,7 +649,7 @@ std::unique_ptr<Codec> getCodec(CodecType type, int level) {
throw std::invalid_argument(to<std::string>( throw std::invalid_argument(to<std::string>(
"Compression type ", idx, " not supported")); "Compression type ", idx, " not supported"));
} }
auto codec = (*factory)(level); auto codec = (*factory)(level, type);
DCHECK_EQ(static_cast<size_t>(codec->type()), idx); DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
return codec; return codec;
} }
......
...@@ -30,31 +30,43 @@ ...@@ -30,31 +30,43 @@
namespace folly { namespace io { namespace folly { namespace io {
enum class CodecType { enum class CodecType {
/**
* This codec type is not defined; getCodec() will throw an exception
* if used. Useful if deriving your own classes from Codec without
* going through the getCodec() interface.
*/
USER_DEFINED = 0,
/** /**
* Use no compression. * Use no compression.
* Levels supported: 0 * Levels supported: 0
*/ */
NO_COMPRESSION = 0, NO_COMPRESSION = 1,
/** /**
* Use LZ4 compression. * Use LZ4 compression.
* Levels supported: 1 = fast, 2 = best; default = 1 * Levels supported: 1 = fast, 2 = best; default = 1
*/ */
LZ4 = 1, LZ4 = 2,
/** /**
* Use Snappy compression. * Use Snappy compression.
* Levels supported: 1 * Levels supported: 1
*/ */
SNAPPY = 2, SNAPPY = 3,
/** /**
* Use zlib compression. * Use zlib compression.
* Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6
*/ */
ZLIB = 3, ZLIB = 4,
NUM_CODEC_TYPES = 4, /**
* Use LZ4 compression, prefixed with size (as Varint).
*/
LZ4_VARINT_SIZE = 5,
NUM_CODEC_TYPES = 6,
}; };
class Codec { class Codec {
...@@ -71,7 +83,7 @@ class Codec { ...@@ -71,7 +83,7 @@ class Codec {
/** /**
* Return the codec's type. * Return the codec's type.
*/ */
CodecType type() const; CodecType type() const { return type_; }
/** /**
* Does this codec need the exact uncompressed length on decompression? * Does this codec need the exact uncompressed length on decompression?
...@@ -106,15 +118,19 @@ class Codec { ...@@ -106,15 +118,19 @@ class Codec {
const IOBuf* data, const IOBuf* data,
uint64_t uncompressedLength = UNKNOWN_UNCOMPRESSED_LENGTH); uint64_t uncompressedLength = UNKNOWN_UNCOMPRESSED_LENGTH);
protected:
explicit Codec(CodecType type);
private: private:
// default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH) // default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH)
virtual uint64_t doMaxUncompressedLength() const; virtual uint64_t doMaxUncompressedLength() const;
// default: doesn't need uncompressed length // default: doesn't need uncompressed length
virtual bool doNeedsUncompressedLength() const; virtual bool doNeedsUncompressedLength() const;
virtual CodecType doType() const = 0;
virtual std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) = 0; virtual std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) = 0;
virtual std::unique_ptr<IOBuf> doUncompress(const folly::IOBuf* data, virtual std::unique_ptr<IOBuf> doUncompress(const folly::IOBuf* data,
uint64_t uncompressedLength) = 0; uint64_t uncompressedLength) = 0;
CodecType type_;
}; };
constexpr int COMPRESSION_LEVEL_FASTEST = -1; constexpr int COMPRESSION_LEVEL_FASTEST = -1;
...@@ -132,6 +148,10 @@ constexpr int COMPRESSION_LEVEL_BEST = -3; ...@@ -132,6 +148,10 @@ constexpr int COMPRESSION_LEVEL_BEST = -3;
* FASTEST and BEST) * FASTEST and BEST)
* COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory,
* best compression) * best compression)
*
* When decompressing, the compression level is ignored. All codecs will
* decompress all data compressed with a codec of the same type, regardless
* of compression level.
*/ */
std::unique_ptr<Codec> getCodec(CodecType type, std::unique_ptr<Codec> getCodec(CodecType type,
int level = COMPRESSION_LEVEL_DEFAULT); int level = COMPRESSION_LEVEL_DEFAULT);
......
...@@ -84,6 +84,14 @@ void generateRandomData() { ...@@ -84,6 +84,14 @@ void generateRandomData() {
} }
} }
// Checks needsUncompressedLength() for a default-level codec of every type:
// among the codec types exercised here, only plain LZ4 is expected to report
// true; the varint-prefixed LZ4 flavor and the other codecs report false.
TEST(CompressionTestNeedsUncompressedLength, Simple) {
  auto needsLength = [](CodecType type) {
    return getCodec(type)->needsUncompressedLength();
  };

  EXPECT_FALSE(needsLength(CodecType::NO_COMPRESSION));
  EXPECT_TRUE(needsLength(CodecType::LZ4));
  EXPECT_FALSE(needsLength(CodecType::SNAPPY));
  EXPECT_FALSE(needsLength(CodecType::ZLIB));
  EXPECT_FALSE(needsLength(CodecType::LZ4_VARINT_SIZE));
}
class CompressionTest : public testing::TestWithParam< class CompressionTest : public testing::TestWithParam<
std::tr1::tuple<int, CodecType>> { std::tr1::tuple<int, CodecType>> {
protected: protected:
...@@ -123,7 +131,8 @@ INSTANTIATE_TEST_CASE_P( ...@@ -123,7 +131,8 @@ INSTANTIATE_TEST_CASE_P(
testing::Values(CodecType::NO_COMPRESSION, testing::Values(CodecType::NO_COMPRESSION,
CodecType::LZ4, CodecType::LZ4,
CodecType::SNAPPY, CodecType::SNAPPY,
CodecType::ZLIB))); CodecType::ZLIB,
CodecType::LZ4_VARINT_SIZE)));
class CompressionCorruptionTest : public testing::TestWithParam<CodecType> { class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
protected: protected:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment