Commit 7660d85b authored by Dave Watson's avatar Dave Watson Committed by woo

codecs

Summary:
Start of codec framework.  Copied the frame based codecs almost exactly from netty, but made to fit our pipeline management.

BytesToMessageCodec is slightly different: Netty preprocesses all the available data to a List<Message>, while this codec a) Only does one message at a time to avoid queueing issues, and b) doesn't template the message type, and just passes an IOBuf

I'm fighting the type system to get the pipelines to play nice to each other:  I'd rather template the message type, but it prevents stacking outgoing handlers on top of it, since you ahve to specify both the input/output type for each handler, even if you only care about the output type.  Suggestions to fix?   Netty gets around this by lots of dynamic casting to Object type, but we can't do that in C++ since we don't have a base object type

Test Plan: Includes lots of tests

Reviewed By: jsedgwick@fb.com

Subscribers: jsedgwick, doug, fugalh, folly-diffs@

FB internal diff: D1758189

Tasks: 5002361, 5002316

Signature: t1:1758189:1421170225:6bc8cc6a0bb461a965665bc88f7009033b215da9
parent ebc33419
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/codec/ByteToMessageCodec.h>
namespace folly { namespace wangle {
void ByteToMessageCodec::read(Context* ctx, IOBufQueue& q) {
size_t needed = 0;
std::unique_ptr<IOBuf> result;
while (true) {
result = decode(ctx, q, needed);
if (result) {
q_.append(std::move(result));
ctx->fireRead(q_);
} else {
break;
}
}
}
}} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/channel/ChannelHandler.h>
namespace folly { namespace wangle {
/**
* A ChannelHandler which decodes bytes in a stream-like fashion from
* IOBufQueue to a Message type.
*
* Frame detection
*
* Generally frame detection should be handled earlier in the pipeline
* by adding a DelimiterBasedFrameDecoder, FixedLengthFrameDecoder,
* LengthFieldBasedFrameDecoder, LineBasedFrameDecoder.
*
* If a custom frame decoder is required, then one needs to be careful
* when implementing one with {@link ByteToMessageDecoder}. Ensure
* there are enough bytes in the buffer for a complete frame by
* checking {@link ByteBuf#readableBytes()}. If there are not enough
* bytes for a complete frame, return without modify the reader index
* to allow more bytes to arrive.
*
* To check for complete frames without modify the reader index, use
* IOBufQueue.front(), without split() or pop_front().
*/
class ByteToMessageCodec
: public BytesToBytesHandler {
public:
virtual std::unique_ptr<IOBuf> decode(
Context* ctx, IOBufQueue& buf, size_t&) = 0;
void read(Context* ctx, IOBufQueue& q);
private:
IOBufQueue q_;
};
}}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <folly/wangle/codec/FixedLengthFrameDecoder.h>
#include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h>
#include <folly/wangle/codec/LengthFieldPrepender.h>
using namespace folly;
using namespace folly::wangle;
using namespace folly::io;
class FrameTester
: public BytesToBytesHandler {
public:
FrameTester(std::function<void(std::unique_ptr<IOBuf>)> test)
: test_(test) {}
void read(Context* ctx, IOBufQueue& q) {
test_(q.move());
}
private:
std::function<void(std::unique_ptr<IOBuf>)> test_;
};
class BytesReflector
: public BytesToBytesHandler {
public:
Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf) {
IOBufQueue q_(IOBufQueue::cacheChainLength());
q_.append(std::move(buf));
ctx->fireRead(q_);
return makeFuture();
}
};
TEST(CodecTest, FixedLengthFrameDecoder) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(FixedLengthFrameDecoder(10))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 10);
}))
.finalize();
auto buf3 = IOBuf::create(3);
buf3->append(3);
auto buf11 = IOBuf::create(11);
buf11->append(11);
auto buf16 = IOBuf::create(16);
buf16->append(16);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(buf3));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(buf11));
pipeline.read(q);
EXPECT_EQ(called, 1);
q.append(std::move(buf16));
pipeline.read(q);
EXPECT_EQ(called, 3);
}
TEST(CodecTest, LengthFieldFramePipeline) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(BytesReflector())
.addBack(LengthFieldBasedFrameDecoder())
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 2);
}))
.addBack(LengthFieldPrepender())
.finalize();
auto buf = IOBuf::create(2);
buf->append(2);
pipeline.write(std::move(buf));
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFramePipelineLittleEndian) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(BytesReflector())
.addBack(LengthFieldBasedFrameDecoder(4, 100, 0, 0, 4, false))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 1);
}))
.addBack(LengthFieldPrepender(4, 0, false, false))
.finalize();
auto buf = IOBuf::create(1);
buf->append(1);
pipeline.write(std::move(buf));
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFrameDecoderSimple) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LengthFieldBasedFrameDecoder())
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 1);
}))
.finalize();
auto bufFrame = IOBuf::create(4);
bufFrame->append(4);
RWPrivateCursor c(bufFrame.get());
c.writeBE((uint32_t)1);
auto bufData = IOBuf::create(1);
bufData->append(1);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(bufFrame));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(bufData));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFrameDecoderNoStrip) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LengthFieldBasedFrameDecoder(2, 10, 0, 0, 0))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 3);
}))
.finalize();
auto bufFrame = IOBuf::create(2);
bufFrame->append(2);
RWPrivateCursor c(bufFrame.get());
c.writeBE((uint16_t)1);
auto bufData = IOBuf::create(1);
bufData->append(1);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(bufFrame));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(bufData));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFrameDecoderAdjustment) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LengthFieldBasedFrameDecoder(2, 10, 0, -2, 0))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 3);
}))
.finalize();
auto bufFrame = IOBuf::create(2);
bufFrame->append(2);
RWPrivateCursor c(bufFrame.get());
c.writeBE((uint16_t)3); // includes frame size
auto bufData = IOBuf::create(1);
bufData->append(1);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(bufFrame));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(bufData));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFrameDecoderPreHeader) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LengthFieldBasedFrameDecoder(2, 10, 2, 0, 0))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 5);
}))
.finalize();
auto bufFrame = IOBuf::create(4);
bufFrame->append(4);
RWPrivateCursor c(bufFrame.get());
c.write((uint16_t)100); // header
c.writeBE((uint16_t)1); // frame size
auto bufData = IOBuf::create(1);
bufData->append(1);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(bufFrame));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(bufData));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFrameDecoderPostHeader) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LengthFieldBasedFrameDecoder(2, 10, 0, 2, 0))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 5);
}))
.finalize();
auto bufFrame = IOBuf::create(4);
bufFrame->append(4);
RWPrivateCursor c(bufFrame.get());
c.writeBE((uint16_t)1); // frame size
c.write((uint16_t)100); // header
auto bufData = IOBuf::create(1);
bufData->append(1);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(bufFrame));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(bufData));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeader) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LengthFieldBasedFrameDecoder(2, 10, 2, 2, 4))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 3);
}))
.finalize();
auto bufFrame = IOBuf::create(6);
bufFrame->append(6);
RWPrivateCursor c(bufFrame.get());
c.write((uint16_t)100); // pre header
c.writeBE((uint16_t)1); // frame size
c.write((uint16_t)100); // post header
auto bufData = IOBuf::create(1);
bufData->append(1);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(bufFrame));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(bufData));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeaderFrameInclHeader) {
ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
int called = 0;
pipeline
.addBack(LengthFieldBasedFrameDecoder(2, 10, 2, -2, 4))
.addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
auto sz = buf->computeChainDataLength();
called++;
EXPECT_EQ(sz, 3);
}))
.finalize();
auto bufFrame = IOBuf::create(6);
bufFrame->append(6);
RWPrivateCursor c(bufFrame.get());
c.write((uint16_t)100); // pre header
c.writeBE((uint16_t)5); // frame size
c.write((uint16_t)100); // post header
auto bufData = IOBuf::create(1);
bufData->append(1);
IOBufQueue q(IOBufQueue::cacheChainLength());
q.append(std::move(bufFrame));
pipeline.read(q);
EXPECT_EQ(called, 0);
q.append(std::move(bufData));
pipeline.read(q);
EXPECT_EQ(called, 1);
}
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/codec/ByteToMessageCodec.h>
namespace folly {namespace wangle {
/**
* A decoder that splits the received IOBufs by the fixed number
* of bytes. For example, if you received the following four
* fragmented packets:
*
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
*
* A FixedLengthFrameDecoder will decode them into the following three
* packets with the fixed length:
*
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
*
*/
class FixedLengthFrameDecoder
: public ByteToMessageCodec {
public:
FixedLengthFrameDecoder(size_t length)
: length_(length) {}
std::unique_ptr<IOBuf> decode(Context* ctx, IOBufQueue& q, size_t& needed) {
if (q.chainLength() < length_) {
needed = length_ - q.chainLength();
return nullptr;
}
return q.split(length_);
}
private:
size_t length_;
};
}} // Namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h>
namespace folly { namespace wangle {
LengthFieldBasedFrameDecoder::LengthFieldBasedFrameDecoder(
uint32_t lengthFieldLength,
uint32_t maxFrameLength,
uint32_t lengthFieldOffset,
uint32_t lengthAdjustment,
uint32_t initialBytesToStrip,
bool networkByteOrder)
: lengthFieldLength_(lengthFieldLength)
, maxFrameLength_(maxFrameLength)
, lengthFieldOffset_(lengthFieldOffset)
, lengthAdjustment_(lengthAdjustment)
, initialBytesToStrip_(initialBytesToStrip)
, networkByteOrder_(networkByteOrder)
, lengthFieldEndOffset_(lengthFieldOffset + lengthFieldLength) {
CHECK(maxFrameLength > 0);
CHECK(lengthFieldOffset <= maxFrameLength - lengthFieldLength);
}
std::unique_ptr<IOBuf> LengthFieldBasedFrameDecoder::decode(
Context* ctx, IOBufQueue& buf, size_t&) {
// discarding too long frame
if (buf.chainLength() <= lengthFieldEndOffset_) {
return nullptr;
}
uint64_t frameLength = getUnadjustedFrameLength(
buf, lengthFieldOffset_, lengthFieldLength_, networkByteOrder_);
frameLength += lengthAdjustment_ + lengthFieldEndOffset_;
if (frameLength < lengthFieldEndOffset_) {
throw std::runtime_error("Frame too small");
}
if (frameLength > maxFrameLength_) {
throw std::runtime_error("Frame larger than " +
folly::to<std::string>(maxFrameLength_));
}
if (buf.chainLength() < frameLength) {
return nullptr;
}
if (initialBytesToStrip_ > frameLength) {
throw std::runtime_error("InitialBytesToSkip larger than frame");
}
buf.trimStart(initialBytesToStrip_);
int actualFrameLength = frameLength - initialBytesToStrip_;
return buf.split(actualFrameLength);
}
uint64_t LengthFieldBasedFrameDecoder::getUnadjustedFrameLength(
IOBufQueue& buf, int offset, int length, bool networkByteOrder) {
folly::io::Cursor c(buf.front());
uint64_t frameLength;
c.skip(offset);
switch(length) {
case 1:{
if (networkByteOrder) {
frameLength = c.readBE<uint8_t>();
} else {
frameLength = c.readLE<uint8_t>();
}
break;
}
case 2:{
if (networkByteOrder) {
frameLength = c.readBE<uint16_t>();
} else {
frameLength = c.readLE<uint16_t>();
}
break;
}
case 4:{
if (networkByteOrder) {
frameLength = c.readBE<uint32_t>();
} else {
frameLength = c.readLE<uint32_t>();
}
break;
}
case 8:{
if (networkByteOrder) {
frameLength = c.readBE<uint64_t>();
} else {
frameLength = c.readLE<uint64_t>();
}
break;
}
}
return frameLength;
}
}} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/codec/ByteToMessageCodec.h>
#include <folly/io/Cursor.h>
namespace folly { namespace wangle {
/**
* A decoder that splits the received IOBufs dynamically by the
* value of the length field in the message. It is particularly useful when you
* decode a binary message which has an integer header field that represents the
* length of the message body or the whole message.
*
* LengthFieldBasedFrameDecoder has many configuration parameters so
* that it can decode any message with a length field, which is often seen in
* proprietary client-server protocols. Here are some example that will give
* you the basic idea on which option does what.
*
* 2 bytes length field at offset 0, do not strip header
*
* The value of the length field in this example is 12 (0x0C) which
* represents the length of "HELLO, WORLD". By default, the decoder assumes
* that the length field represents the number of the bytes that follows the
* length field. Therefore, it can be decoded with the simplistic parameter
* combination.
*
* lengthFieldOffset = 0
* lengthFieldLength = 2
* lengthAdjustment = 0
* initialBytesToStrip = 0 (= do not strip header)
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
*
*
* 2 bytes length field at offset 0, strip header
*
* Because we can get the length of the content by calling
* ioBuf->computeChainDataLength(), you might want to strip the length
* field by specifying initialBytesToStrip. In this example, we
* specified 2, that is same with the length of the length field, to
* strip the first two bytes.
*
* lengthFieldOffset = 0
* lengthFieldLength = 2
* lengthAdjustment = 0
* initialBytesToStrip = 2 (= the length of the Length field)
*
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
*
*
* 2 bytes length field at offset 0, do not strip header, the length field
* represents the length of the whole message
*
* In most cases, the length field represents the length of the message body
* only, as shown in the previous examples. However, in some protocols, the
* length field represents the length of the whole message, including the
* message header. In such a case, we specify a non-zero
* lengthAdjustment. Because the length value in this example message
* is always greater than the body length by 2, we specify -2
* as lengthAdjustment for compensation.
*
* lengthFieldOffset = 0
* lengthFieldLength = 2
* lengthAdjustment = -2 (= the length of the Length field)
* initialBytesToStrip = 0
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
*
*
* 3 bytes length field at the end of 5 bytes header, do not strip header
*
* The following message is a simple variation of the first example. An extra
* header value is prepended to the message. lengthAdjustment is zero
* again because the decoder always takes the length of the prepended data into
* account during frame length calculation.
*
* lengthFieldOffset = 2 (= the length of Header 1)
* lengthFieldLength = 3
* lengthAdjustment = 0
* initialBytesToStrip = 0
*
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
* | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
*
*
* 3 bytes length field at the beginning of 5 bytes header, do not strip header
*
* This is an advanced example that shows the case where there is an extra
* header between the length field and the message body. You have to specify a
* positive lengthAdjustment so that the decoder counts the extra
* header into the frame length calculation.
*
* lengthFieldOffset = 0
* lengthFieldLength = 3
* lengthAdjustment = 2 (= the length of Header 1)
* initialBytesToStrip = 0
*
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
* | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
*
*
* 2 bytes length field at offset 1 in the middle of 4 bytes header,
* strip the first header field and the length field
*
* This is a combination of all the examples above. There are the prepended
* header before the length field and the extra header after the length field.
* The prepended header affects the lengthFieldOffset and the extra
* header affects the lengthAdjustment. We also specified a non-zero
* initialBytesToStrip to strip the length field and the prepended
* header from the frame. If you don't want to strip the prepended header, you
* could specify 0 for initialBytesToSkip.
*
* lengthFieldOffset = 1 (= the length of HDR1)
* lengthFieldLength = 2
* lengthAdjustment = 1 (= the length of HDR2)
* initialBytesToStrip = 3 (= the length of HDR1 + LEN)
*
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
*
*
* 2 bytes length field at offset 1 in the middle of 4 bytes header,
* strip the first header field and the length field, the length field
* represents the length of the whole message
*
* Let's give another twist to the previous example. The only difference from
* the previous example is that the length field represents the length of the
* whole message instead of the message body, just like the third example.
* We have to count the length of HDR1 and Length into lengthAdjustment.
* Please note that we don't need to take the length of HDR2 into account
* because the length field already includes the whole header length.
*
* lengthFieldOffset = 1
* lengthFieldLength = 2
* lengthAdjustment = -3 (= the length of HDR1 + LEN, negative)
* initialBytesToStrip = 3
*
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
*
* @see LengthFieldPrepender
*/
class LengthFieldBasedFrameDecoder : public ByteToMessageCodec {
public:
LengthFieldBasedFrameDecoder(
uint32_t lengthFieldLength = 4,
uint32_t maxFrameLength = UINT_MAX,
uint32_t lengthFieldOffset = 0,
uint32_t lengthAdjustment = 0,
uint32_t initialBytesToStrip = 4,
bool networkByteOrder = true);
std::unique_ptr<IOBuf> decode(Context* ctx, IOBufQueue& buf, size_t&);
private:
uint64_t getUnadjustedFrameLength(
IOBufQueue& buf, int offset, int length, bool networkByteOrder);
uint32_t lengthFieldLength_;
uint32_t maxFrameLength_;
uint32_t lengthFieldOffset_;
uint32_t lengthAdjustment_;
uint32_t initialBytesToStrip_;
bool networkByteOrder_;
uint32_t lengthFieldEndOffset_;
};
}} // namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/wangle/codec/LengthFieldPrepender.h>
namespace folly { namespace wangle {
LengthFieldPrepender::LengthFieldPrepender(
int lengthFieldLength,
int lengthAdjustment,
bool lengthIncludesLengthField,
bool networkByteOrder)
: lengthFieldLength_(lengthFieldLength)
, lengthAdjustment_(lengthAdjustment)
, lengthIncludesLengthField_(lengthIncludesLengthField)
, networkByteOrder_(networkByteOrder) {
CHECK(lengthFieldLength == 1 ||
lengthFieldLength == 2 ||
lengthFieldLength == 4 ||
lengthFieldLength == 8 );
}
Future<void> LengthFieldPrepender::write(
Context* ctx, std::unique_ptr<IOBuf> buf) {
int length = lengthAdjustment_ + buf->computeChainDataLength();
if (lengthIncludesLengthField_) {
length += lengthFieldLength_;
}
if (length < 0) {
throw std::runtime_error("Length field < 0");
}
auto len = IOBuf::create(lengthFieldLength_);
len->append(lengthFieldLength_);
folly::io::RWPrivateCursor c(len.get());
switch (lengthFieldLength_) {
case 1: {
if (length >= 256) {
throw std::runtime_error("length does not fit byte");
}
if (networkByteOrder_) {
c.writeBE((uint8_t)length);
} else {
c.writeLE((uint8_t)length);
}
break;
}
case 2: {
if (length >= 65536) {
throw std::runtime_error("length does not fit byte");
}
if (networkByteOrder_) {
c.writeBE((uint16_t)length);
} else {
c.writeLE((uint16_t)length);
}
break;
}
case 4: {
if (networkByteOrder_) {
c.writeBE((uint32_t)length);
} else {
c.writeLE((uint32_t)length);
}
break;
}
case 8: {
if (networkByteOrder_) {
c.writeBE((uint64_t)length);
} else {
c.writeLE((uint64_t)length);
}
break;
}
default: {
throw std::runtime_error("Invalid lengthFieldLength");
}
}
len->prependChain(std::move(buf));
return ctx->fireWrite(std::move(len));
}
}} // Namespace
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/wangle/codec/ByteToMessageCodec.h>
#include <folly/io/Cursor.h>
namespace folly { namespace wangle {
/**
* An encoder that prepends the length of the message. The length value is
* prepended as a binary form.
*
* For example, LengthFieldPrepender(2)will encode the
* following 12-bytes string:
*
* +----------------+
* | "HELLO, WORLD" |
* +----------------+
*
* into the following:
*
* +--------+----------------+
* + 0x000C | "HELLO, WORLD" |
* +--------+----------------+
*
* If you turned on the lengthIncludesLengthFieldLength flag in the
* constructor, the encoded data would look like the following
* (12 (original data) + 2 (prepended data) = 14 (0xE)):
*
* +--------+----------------+
* + 0x000E | "HELLO, WORLD" |
* +--------+----------------+
*
*/
class LengthFieldPrepender
: public BytesToBytesHandler {
public:
LengthFieldPrepender(
int lengthFieldLength = 4,
int lengthAdjustment = 0,
bool lengthIncludesLengthField = false,
bool networkByteOrder = true);
Future<void> write(Context* ctx, std::unique_ptr<IOBuf> buf);
private:
int lengthFieldLength_;
int lengthAdjustment_;
bool lengthIncludesLengthField_;
bool networkByteOrder_;
};
}} // namespace
Codecs are modeled after netty's codecs:
https://github.com/netty/netty/tree/master/codec/src/main/java/io/netty/handler/codec
Most of the changes are due to differing memory allocation strategies.
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment