Commit d887f51c authored by Dave Watson's avatar Dave Watson Committed by Viswanath Sivakumar

LineBasedFrameDecoder

Summary: Copy of netty's line based decoder.

Test Plan:
unittests
fbconfig folly/wangle/codec; fbmake runtests

Reviewed By: hans@fb.com

Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D1959155

Signature: t1:1959155:1427935150:e11280c5567df9ad9964dbb656aa090267856f57
parent 8e2ec4c3
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <folly/wangle/codec/FixedLengthFrameDecoder.h> #include <folly/wangle/codec/FixedLengthFrameDecoder.h>
#include <folly/wangle/codec/LineBasedFrameDecoder.h>
#include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h> #include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h>
#include <folly/wangle/codec/LengthFieldPrepender.h> #include <folly/wangle/codec/LengthFieldPrepender.h>
...@@ -32,6 +33,10 @@ class FrameTester ...@@ -32,6 +33,10 @@ class FrameTester
void read(Context* ctx, IOBufQueue& q) { void read(Context* ctx, IOBufQueue& q) {
test_(q.move()); test_(q.move());
} }
void readException(Context* ctx, exception_wrapper w) {
test_(nullptr);
}
private: private:
std::function<void(std::unique_ptr<IOBuf>)> test_; std::function<void(std::unique_ptr<IOBuf>)> test_;
}; };
...@@ -348,3 +353,205 @@ TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeaderFrameInclHeader) { ...@@ -348,3 +353,205 @@ TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeaderFrameInclHeader) {
pipeline.read(q); pipeline.read(q);
EXPECT_EQ(called, 1); EXPECT_EQ(called, 1);
} }
TEST(CodecTest, LineBasedFrameDecoder) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LineBasedFrameDecoder(10))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 3);
}))
.finalize();
auto buf = IOBuf::create(3);
buf->append(3);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 0);
buf = IOBuf::create(1);
buf->append(1);
RWPrivateCursor c(buf.get());
c.write<char>('\n');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
buf = IOBuf::create(4);
buf->append(4);
RWPrivateCursor c1(buf.get());
c1.write(' ');
c1.write(' ');
c1.write(' ');
c1.write('\r');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
buf = IOBuf::create(1);
buf->append(1);
RWPrivateCursor c2(buf.get());
c2.write('\n');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 2);
}
TEST(CodecTest, LineBasedFrameDecoderSaveDelimiter) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LineBasedFrameDecoder(10, false))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 4);
}))
.finalize();
auto buf = IOBuf::create(3);
buf->append(3);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 0);
buf = IOBuf::create(1);
buf->append(1);
RWPrivateCursor c(buf.get());
c.write<char>('\n');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
buf = IOBuf::create(3);
buf->append(3);
RWPrivateCursor c1(buf.get());
c1.write(' ');
c1.write(' ');
c1.write('\r');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
buf = IOBuf::create(1);
buf->append(1);
RWPrivateCursor c2(buf.get());
c2.write('\n');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 2);
}
TEST(CodecTest, LineBasedFrameDecoderFail) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LineBasedFrameDecoder(10))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
called++;
}))
.finalize();
auto buf = IOBuf::create(11);
buf->append(11);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
buf = IOBuf::create(1);
buf->append(1);
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
buf = IOBuf::create(2);
buf->append(2);
RWPrivateCursor c(buf.get());
c.write(' ');
c.write<char>('\n');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
buf = IOBuf::create(12);
buf->append(12);
RWPrivateCursor c2(buf.get());
for (int i = 0; i < 11; i++) {
c2.write(' ');
}
c2.write<char>('\n');
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 2);
}
TEST(CodecTest, LineBasedFrameDecoderNewLineOnly) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LineBasedFrameDecoder(
10, true, LineBasedFrameDecoder::TerminatorType::NEWLINE))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 1);
}))
.finalize();
auto buf = IOBuf::create(2);
buf->append(2);
RWPrivateCursor c(buf.get());
c.write<char>('\r');
c.write<char>('\n');
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LineBasedFrameDecoderCarriageNewLineOnly) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LineBasedFrameDecoder(
10, true, LineBasedFrameDecoder::TerminatorType::CARRIAGENEWLINE))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 1);
}))
.finalize();
auto buf = IOBuf::create(3);
buf->append(3);
RWPrivateCursor c(buf.get());
c.write<char>('\n');
c.write<char>('\r');
c.write<char>('\n');
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(buf));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/codec/LineBasedFrameDecoder.h>
namespace folly { namespace wangle {
using folly::io::Cursor;
LineBasedFrameDecoder::LineBasedFrameDecoder(uint32_t maxLength,
bool stripDelimiter,
TerminatorType terminatorType)
: maxLength_(maxLength)
, stripDelimiter_(stripDelimiter)
, terminatorType_(terminatorType) {}
std::unique_ptr<IOBuf> LineBasedFrameDecoder::decode(
Context* ctx, IOBufQueue& buf, size_t&) {
int64_t eol = findEndOfLine(buf);
if (!discarding_) {
if (eol >= 0) {
Cursor c(buf.front());
c += eol;
auto delimLength = c.read<char>() == '\r' ? 2 : 1;
if (eol > maxLength_) {
buf.split(eol + delimLength);
fail(ctx, folly::to<std::string>(eol));
return nullptr;
}
std::unique_ptr<folly::IOBuf> frame;
if (stripDelimiter_) {
frame = buf.split(eol);
buf.trimStart(delimLength);
} else {
frame = buf.split(eol + delimLength);
}
return std::move(frame);
} else {
auto len = buf.chainLength();
if (len > maxLength_) {
discardedBytes_ = len;
buf.trimStart(len);
discarding_ = true;
fail(ctx, "over " + folly::to<std::string>(len));
}
return nullptr;
}
} else {
if (eol >= 0) {
Cursor c(buf.front());
c += eol;
auto delimLength = c.read<char>() == '\r' ? 2 : 1;
buf.trimStart(eol + delimLength);
discardedBytes_ = 0;
discarding_ = false;
} else {
discardedBytes_ = buf.chainLength();
buf.move();
}
return nullptr;
}
}
void LineBasedFrameDecoder::fail(Context* ctx, std::string len) {
ctx->fireReadException(
folly::make_exception_wrapper<std::runtime_error>(
"frame length" + len +
" exeeds max " + folly::to<std::string>(maxLength_)));
}
int64_t LineBasedFrameDecoder::findEndOfLine(IOBufQueue& buf) {
Cursor c(buf.front());
for (uint32_t i = 0; i < maxLength_ && i < buf.chainLength(); i++) {
auto b = c.read<char>();
if (b == '\n' && terminatorType_ != TerminatorType::CARRIAGENEWLINE) {
return i;
} else if (terminatorType_ != TerminatorType::NEWLINE &&
b == '\r' && !c.isAtEnd() && c.read<char>() == '\n') {
return i;
}
}
return -1;
}
}} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/codec/ByteToMessageCodec.h>
#include <folly/io/Cursor.h>
namespace folly { namespace wangle {
/**
* A decoder that splits the received IOBufQueue on line endings.
*
* Both "\n" and "\r\n" are handled, or optionally reqire only
* one or the other.
*/
class LineBasedFrameDecoder : public ByteToMessageCodec {
public:
enum class TerminatorType {
BOTH,
NEWLINE,
CARRIAGENEWLINE
};
LineBasedFrameDecoder(uint32_t maxLength = UINT_MAX,
bool stripDelimiter = true,
TerminatorType terminatorType =
TerminatorType::BOTH);
std::unique_ptr<IOBuf> decode(Context* ctx, IOBufQueue& buf, size_t&);
private:
int64_t findEndOfLine(IOBufQueue& buf);
void fail(Context* ctx, std::string len);
uint32_t maxLength_;
bool stripDelimiter_;
bool discarding_{false};
uint32_t discardedBytes_{0};
TerminatorType terminatorType_;
};
}} // namespace
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment