Commit 996729ec authored by Nick Terrell's avatar Nick Terrell Committed by Facebook Github Bot

Log (de)compression bytes

Summary: Log bytes before/after (de)compression, and timing to the logging backend with probability 1/50.

Reviewed By: yfeldblum

Differential Revision: D6817204

fbshipit-source-id: 7fde5284b8c97355ec445376a7c481e9ffd2cfce
parent 37b2d8bb
......@@ -51,11 +51,13 @@
#include <folly/Conv.h>
#include <folly/Memory.h>
#include <folly/Portability.h>
#include <folly/Random.h>
#include <folly/ScopeGuard.h>
#include <folly/Varint.h>
#include <folly/compression/Utils.h>
#include <folly/io/Cursor.h>
#include <folly/lang/Bits.h>
#include <folly/stop_watch.h>
#include <algorithm>
#include <unordered_set>
......@@ -65,19 +67,96 @@ using folly::io::compression::detail::prefixToStringLE;
namespace folly {
namespace io {
Codec::Codec(CodecType type) : type_(type) { }
Codec::Codec(
CodecType type,
Optional<int> level,
StringPiece name,
bool counters)
: type_(type) {
if (counters) {
bytesBeforeCompression_ = {type,
name,
level,
CompressionCounterKey::BYTES_BEFORE_COMPRESSION,
CompressionCounterType::SUM};
bytesAfterCompression_ = {type,
name,
level,
CompressionCounterKey::BYTES_AFTER_COMPRESSION,
CompressionCounterType::SUM};
bytesBeforeDecompression_ = {
type,
name,
level,
CompressionCounterKey::BYTES_BEFORE_DECOMPRESSION,
CompressionCounterType::SUM};
bytesAfterDecompression_ = {
type,
name,
level,
CompressionCounterKey::BYTES_AFTER_DECOMPRESSION,
CompressionCounterType::SUM};
compressions_ = {type,
name,
level,
CompressionCounterKey::COMPRESSIONS,
CompressionCounterType::SUM};
decompressions_ = {type,
name,
level,
CompressionCounterKey::DECOMPRESSIONS,
CompressionCounterType::SUM};
compressionMilliseconds_ = {type,
name,
level,
CompressionCounterKey::COMPRESSION_MILLISECONDS,
CompressionCounterType::SUM};
decompressionMilliseconds_ = {
type,
name,
level,
CompressionCounterKey::DECOMPRESSION_MILLISECONDS,
CompressionCounterType::SUM};
}
}
namespace {
constexpr uint32_t kLoggingRate = 50;
class Timer {
public:
explicit Timer(folly::detail::CompressionCounter& counter)
: counter_(&counter) {}
~Timer() {
*counter_ += timer_.elapsed().count();
}
private:
folly::detail::CompressionCounter* counter_;
stop_watch<std::chrono::milliseconds> timer_;
};
} // namespace
// Ensure consistent behavior in the nullptr case
std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
if (data == nullptr) {
throw std::invalid_argument("Codec: data must not be nullptr");
}
uint64_t len = data->computeChainDataLength();
const uint64_t len = data->computeChainDataLength();
if (len > maxUncompressedLength()) {
throw std::runtime_error("Codec: uncompressed length too large");
}
return doCompress(data);
bool const logging = folly::Random::oneIn(kLoggingRate);
folly::Optional<Timer> const timer =
logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
auto result = doCompress(data);
if (logging) {
compressions_++;
bytesBeforeCompression_ += len;
bytesAfterCompression_ += result->computeChainDataLength();
}
return result;
}
std::string Codec::compress(const StringPiece data) {
......@@ -85,8 +164,16 @@ std::string Codec::compress(const StringPiece data) {
if (len > maxUncompressedLength()) {
throw std::runtime_error("Codec: uncompressed length too large");
}
return doCompressString(data);
bool const logging = folly::Random::oneIn(kLoggingRate);
folly::Optional<Timer> const timer =
logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
auto result = doCompressString(data);
if (logging) {
compressions_++;
bytesBeforeCompression_ += len;
bytesAfterCompression_ += result.size();
}
return result;
}
std::unique_ptr<IOBuf> Codec::uncompress(
......@@ -110,7 +197,16 @@ std::unique_ptr<IOBuf> Codec::uncompress(
return IOBuf::create(0);
}
return doUncompress(data, uncompressedLength);
bool const logging = folly::Random::oneIn(kLoggingRate);
folly::Optional<Timer> const timer =
logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
auto result = doUncompress(data, uncompressedLength);
if (logging) {
decompressions_++;
bytesBeforeDecompression_ += data->computeChainDataLength();
bytesAfterDecompression_ += result->computeChainDataLength();
}
return result;
}
std::string Codec::uncompress(
......@@ -131,7 +227,16 @@ std::string Codec::uncompress(
return "";
}
return doUncompressString(data, uncompressedLength);
bool const logging = folly::Random::oneIn(kLoggingRate);
folly::Optional<Timer> const timer =
logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
auto result = doUncompressString(data, uncompressedLength);
if (logging) {
decompressions_++;
bytesBeforeDecompression_ += data.size();
bytesAfterDecompression_ += result.size();
}
return result;
}
bool Codec::needsUncompressedLength() const {
......@@ -551,23 +656,24 @@ std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
return std::make_unique<LZ4Codec>(level, type);
}
LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
static int lz4ConvertLevel(int level) {
switch (level) {
case 1:
case COMPRESSION_LEVEL_FASTEST:
case COMPRESSION_LEVEL_DEFAULT:
level = 1;
break;
return 1;
case 2:
case COMPRESSION_LEVEL_BEST:
level = 2;
break;
}
if (level < 1 || level > 2) {
throw std::invalid_argument(to<std::string>(
"LZ4Codec: invalid level: ", level));
return 2;
}
highCompression_ = (level > 1);
throw std::invalid_argument(
to<std::string>("LZ4Codec: invalid level: ", level));
}
LZ4Codec::LZ4Codec(int level, CodecType type)
: Codec(type, lz4ConvertLevel(level)),
highCompression_(lz4ConvertLevel(level) > 1) {
DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
}
bool LZ4Codec::doNeedsUncompressedLength() const {
......@@ -739,20 +845,20 @@ void LZ4FrameCodec::resetDCtx() {
dirty_ = false;
}
LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
DCHECK(type == CodecType::LZ4_FRAME);
static int lz4fConvertLevel(int level) {
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
case COMPRESSION_LEVEL_DEFAULT:
level_ = 0;
break;
return 0;
case COMPRESSION_LEVEL_BEST:
level_ = 16;
break;
default:
level_ = level;
break;
return 16;
}
return level;
}
LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type)
: Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
DCHECK(type == CodecType::LZ4_FRAME);
}
LZ4FrameCodec::~LZ4FrameCodec() {
......@@ -1393,25 +1499,26 @@ std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
return make_unique<ZSTDStreamCodec>(level, type);
}
ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
: StreamCodec(type) {
DCHECK(type == CodecType::ZSTD);
static int zstdConvertLevel(int level) {
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
level = 1;
break;
return 1;
case COMPRESSION_LEVEL_DEFAULT:
level = 1;
break;
return 1;
case COMPRESSION_LEVEL_BEST:
level = 19;
break;
return 19;
}
if (level < 1 || level > ZSTD_maxCLevel()) {
throw std::invalid_argument(
to<std::string>("ZSTD: invalid level: ", level));
}
level_ = level;
return level;
}
ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
: StreamCodec(type, zstdConvertLevel(level)),
level_(zstdConvertLevel(level)) {
DCHECK(type == CodecType::ZSTD);
}
bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
......@@ -1910,7 +2017,7 @@ void AutomaticCodec::addCodecIfSupported(CodecType type) {
AutomaticCodec::AutomaticCodec(
std::vector<std::unique_ptr<Codec>> customCodecs,
std::unique_ptr<Codec> terminalCodec)
: Codec(CodecType::USER_DEFINED),
: Codec(CodecType::USER_DEFINED, folly::none, "auto"),
codecs_(std::move(customCodecs)),
terminalCodec_(std::move(terminalCodec)) {
// Fastest -> slowest
......
......@@ -24,6 +24,7 @@
#include <folly/Optional.h>
#include <folly/Range.h>
#include <folly/compression/Counters.h>
#include <folly/io/IOBuf.h>
/**
......@@ -185,7 +186,11 @@ class Codec {
folly::Optional<uint64_t> uncompressedLength = folly::none) const;
protected:
explicit Codec(CodecType type);
Codec(
CodecType type,
folly::Optional<int> level = folly::none,
folly::StringPiece name = {},
bool counters = true);
public:
/**
......@@ -231,6 +236,14 @@ class Codec {
folly::Optional<uint64_t> uncompressedLength) const;
CodecType type_;
folly::detail::CompressionCounter bytesBeforeCompression_;
folly::detail::CompressionCounter bytesAfterCompression_;
folly::detail::CompressionCounter bytesBeforeDecompression_;
folly::detail::CompressionCounter bytesAfterDecompression_;
folly::detail::CompressionCounter compressions_;
folly::detail::CompressionCounter decompressions_;
folly::detail::CompressionCounter compressionMilliseconds_;
folly::detail::CompressionCounter decompressionMilliseconds_;
};
class StreamCodec : public Codec {
......@@ -351,7 +364,12 @@ class StreamCodec : public Codec {
FlushOp flushOp = StreamCodec::FlushOp::NONE);
protected:
explicit StreamCodec(CodecType type) : Codec(type) {}
StreamCodec(
CodecType type,
folly::Optional<int> level = folly::none,
folly::StringPiece name = {},
bool counters = true)
: Codec(type, std::move(level), name, counters) {}
// Returns the uncompressed length last passed to resetStream() or none if it
// hasn't been called yet.
......
......@@ -32,6 +32,10 @@ enum class CompressionCounterKey {
BYTES_AFTER_COMPRESSION = 1,
BYTES_BEFORE_DECOMPRESSION = 2,
BYTES_AFTER_DECOMPRESSION = 3,
COMPRESSIONS = 4,
DECOMPRESSIONS = 5,
COMPRESSION_MILLISECONDS = 6,
DECOMPRESSION_MILLISECONDS = 7,
};
enum class CompressionCounterType {
......@@ -72,11 +76,15 @@ class CompressionCounter {
folly::Optional<int> level,
CompressionCounterKey key,
CompressionCounterType counterType) {
increment_ = makeCompressionCounterHandler(
codecType, codecName, std::move(level), key, counterType);
initialize_ = [=]() {
return makeCompressionCounterHandler(
codecType, codecName, level, key, counterType);
};
DCHECK(!initialize_.hasAllocatedMemory());
}
void operator+=(double sum) {
performLazyInit();
if (increment_) {
increment_(sum);
}
......@@ -90,11 +98,22 @@ class CompressionCounter {
*this += 1.0;
}
bool hasImplementation() const {
bool hasImplementation() {
performLazyInit();
return static_cast<bool>(increment_);
}
private:
void performLazyInit() {
if (!initialized_) {
initialized_ = true;
increment_ = initialize_();
initialize_ = {};
}
}
bool initialized_{false};
folly::Function<folly::Function<void(double)>()> initialize_;
folly::Function<void(double)> increment_;
};
......
......@@ -194,28 +194,32 @@ std::unique_ptr<StreamCodec> ZlibStreamCodec::createStream(
return std::make_unique<ZlibStreamCodec>(options, level);
}
ZlibStreamCodec::ZlibStreamCodec(Options options, int level)
: StreamCodec(getCodecType(options)) {
static bool inBounds(int value, int low, int high) {
return (value >= low) && (value <= high);
}
static int zlibConvertLevel(int level) {
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
level = 1;
break;
return 1;
case COMPRESSION_LEVEL_DEFAULT:
level = Z_DEFAULT_COMPRESSION;
break;
return 6;
case COMPRESSION_LEVEL_BEST:
level = 9;
break;
return 9;
}
auto inBounds = [](int value, int low, int high) {
return (value >= low) && (value <= high);
};
if (level != Z_DEFAULT_COMPRESSION && !inBounds(level, 0, 9)) {
if (!inBounds(level, 0, 9)) {
throw std::invalid_argument(
to<std::string>("ZlibStreamCodec: invalid level: ", level));
}
level_ = level;
return level;
}
ZlibStreamCodec::ZlibStreamCodec(Options options, int level)
: StreamCodec(
getCodecType(options),
zlibConvertLevel(level),
getCodecType(options) == CodecType::GZIP ? "gzip" : "zlib"),
level_(zlibConvertLevel(level)) {
options_ = options;
// Although zlib allows a windowSize of 8..15, a value of 8 is not
......
......@@ -26,8 +26,6 @@
#include <boost/noncopyable.hpp>
#include <glog/logging.h>
#include <folly/Benchmark.h>
#include <folly/Memory.h>
#include <folly/Random.h>
#include <folly/Varint.h>
#include <folly/hash/Hash.h>
......@@ -1489,14 +1487,3 @@ INSTANTIATE_TEST_CASE_P(
} // namespace test
} // namespace io
} // namespace folly
int main(int argc, char *argv[]) {
testing::InitGoogleTest(&argc, argv);
gflags::ParseCommandLineFlags(&argc, &argv, true);
auto ret = RUN_ALL_TESTS();
if (!ret) {
folly::runBenchmarksOnFlag();
}
return ret;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment