Commit dbcb08a7, authored by Alexey Spiridonov, committed by Chip Turner

Factor out string stream re-splitting as StreamSplitter

Summary: This way I can reuse it in Subprocess. It also makes it easy to make a bunch of other convenient tokenization routines (e.g. delimiter-preserving folly::gen tokenizers, file tokenizers, etc, etc).

Test Plan: fbconfig folly/gen/test && fbmake runtests

Reviewed By: tjackson@fb.com

Subscribers: vkatich, tjackson

FB internal diff: D1317973
parent ce5e83f4
......@@ -20,60 +20,197 @@
#include <folly/Conv.h>
#include <folly/String.h>
#include <folly/io/IOBuf.h>
namespace folly {
namespace gen {
namespace detail {
inline bool splitPrefix(StringPiece& in,
/**
 * Locates the first occurrence of the single-byte delimiter in "in" and
 * consumes everything up to and including it. The consumed bytes
 * (delimiter included) are stored in "prefix".
 *
 * Returns the number of trailing bytes of "prefix" that belong to the
 * delimiter (always 1 for this overload), or 0 if the delimiter does not
 * occur. In the latter case "in" is not advanced and "prefix" is cleared.
 */
inline size_t splitPrefix(StringPiece& in,
                          StringPiece& prefix,
                          char delimiter) {
  const size_t pos = in.find(delimiter);
  if (pos == StringPiece::npos) {
    prefix.clear();
    return 0;
  }
  const size_t consumed = pos + 1; // the delimiter itself is part of prefix
  prefix.assign(in.data(), in.data() + consumed);
  in.advance(consumed);
  return 1;
}
/**
 * Multibyte-delimiter variant of the single-char splitPrefix().
 *
 * On a match, "prefix" receives the consumed bytes including the whole
 * delimiter, "in" is advanced past the delimiter, and delimiter.size() is
 * returned. If the delimiter does not occur, "prefix" is cleared, "in" is
 * not advanced and 0 is returned.
 *
 * NOTE: the return value for an empty delimiter is delimiter.size() == 0,
 * which callers cannot tell apart from "not found".
 */
inline size_t splitPrefix(StringPiece& in,
                          StringPiece& prefix,
                          StringPiece delimiter) {
  auto found = in.find(delimiter);
  if (found != StringPiece::npos) {
    // Extend the match position so that it covers the delimiter as well.
    found += delimiter.size();
    prefix.assign(in.data(), in.data() + found);
    in.advance(found);
    return delimiter.size();
  }
  prefix.clear();
  return 0;
}
/**
* Split by any of the EOL terms: \r, \n, or \r\n.
* As above, but splits by any of the EOL terms: \r, \n, or \r\n.
*/
inline bool splitPrefix(StringPiece& in,
inline size_t splitPrefix(StringPiece& in,
StringPiece& prefix,
MixedNewlines) {
auto newline = "\r\n";
auto p = in.find_first_of(newline);
const auto kCRLF = "\r\n";
const size_t kLenCRLF = 2;
auto p = in.find_first_of(kCRLF);
if (p != std::string::npos) {
prefix.assign(in.data(), in.data() + p);
const auto in_start = in.data();
auto delim_len = 1;
in.advance(p);
if (!in.removePrefix(newline)) {
in.advance(1);
// Either remove an MS-DOS CR-LF 2-byte newline, or eat 1 byte at a time.
if (in.removePrefix(kCRLF)) {
delim_len = kLenCRLF;
} else {
in.advance(delim_len);
}
return true;
prefix.assign(in_start, in.data());
return delim_len;
}
prefix.clear();
return false;
return 0;
}
// Views a pointer to unsigned bytes as a pointer to char; the address is
// unchanged.
inline const char* ch(const unsigned char* p) {
  const void* raw = p;
  return static_cast<const char*>(raw);
}
inline bool splitPrefix(StringPiece& in, StringPiece& prefix, char delimiter) {
auto p = static_cast<const char*>(memchr(in.data(), delimiter, in.size()));
if (p) {
prefix.assign(in.data(), p);
in.assign(p + 1, in.end());
// Chops s into pieces of at most maxLength bytes and feeds them to cb, in
// order. maxLength == 0 means "no limit": all of s is passed as one piece.
//
// Returns false as soon as cb returns false; s then still starts at the
// rejected piece. Returns true once s has been fully consumed.
template <class Callback>
bool consumeFixedSizeChunks(Callback& cb, StringPiece& s, uint64_t maxLength) {
  while (!s.empty()) {
    auto num_to_add = s.size();
    if (maxLength) {
      // Explicit template argument: size_t and uint64_t are distinct types
      // on some platforms, where a plain std::min() call fails to deduce.
      // The result never exceeds s.size(), so it fits back into num_to_add.
      num_to_add = std::min<uint64_t>(num_to_add, maxLength);
    }
    if (!cb(StringPiece(s.begin(), num_to_add))) {
      return false;
    }
    s.advance(num_to_add);
  }
  return true;
}
// Consumes all of buf, plus n chars from s: the first n chars of s are
// appended to buf, s is advanced past them, and the whole of buf is handed
// to cb as a single piece. The caller must ensure n <= s.size().
//
// Returns false if cb returns false (buf keeps its contents in that case);
// otherwise empties buf and returns true.
template <class Callback>
bool consumeBufferPlus(Callback& cb, IOBuf& buf, StringPiece& s, uint64_t n) {
  // Make room for n more bytes at the tail, then copy them in.
  buf.reserve(0, n);
  memcpy(buf.writableTail(), s.data(), n);
  buf.append(n);
  s.advance(n);
  if (!cb(StringPiece(detail::ch(buf.data()), buf.length()))) {
    return false;
  }
  buf.clear();
  return true;
}
} // namespace detail
// Passes whatever is still buffered (possibly nothing) to the callback as
// one piece. On acceptance the buffer is emptied; if the callback returns
// false the buffer is left as is and false is returned.
template <class Callback>
bool StreamSplitter<Callback>::flush() {
  CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
  if (pieceCb_(StringPiece(detail::ch(buffer_.data()), buffer_.length()))) {
    // We are ready to handle another stream now.
    buffer_.clear();
    return true;
  }
  return false;
}
inline const char* ch(const unsigned char* p) {
return reinterpret_cast<const char*>(p);
// Feeds one more piece of the input stream to the splitter. Every complete
// chunk (one ending in delimiter_, or one that is maxLength_ bytes long when
// maxLength_ is nonzero) is passed to pieceCb_; an incomplete trailing chunk
// is kept in buffer_ for the next call or for flush().
//
// Returns false as soon as pieceCb_ returns false, true otherwise.
template <class Callback>
bool StreamSplitter<Callback>::operator()(StringPiece in) {
StringPiece prefix;
// NB This code assumes a 1-byte delimiter. It's not too hard to support
// multibyte delimiters, just remember that maxLength_ chunks can end up
// falling in the middle of a delimiter.
// The result of splitPrefix() is only used as a found / not-found flag here.
// When found, prefix holds the bytes up to and including the delimiter.
bool found = detail::splitPrefix(in, prefix, delimiter_);
// Step 1: if a partial chunk is pending from an earlier call, complete it.
if (buffer_.length() != 0) {
if (found) {
uint64_t num_to_add = prefix.size();
if (maxLength_) {
CHECK(buffer_.length() < maxLength_);
// Consume as much of prefix as possible without exceeding maxLength_
num_to_add = std::min(maxLength_ - buffer_.length(), num_to_add);
}
// Append part of the prefix to the buffer, and send it to the callback
if (!detail::consumeBufferPlus(pieceCb_, buffer_, prefix, num_to_add)) {
return false;
}
// Whatever is left of prefix did not fit; emit it in maxLength_ pieces.
if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
return false;
}
found = detail::splitPrefix(in, prefix, delimiter_);
// Post-conditions:
// - we consumed all of buffer_ and all of the first prefix.
// - found, in, and prefix reflect the second delimiter_ search
} else if (maxLength_ && buffer_.length() + in.size() >= maxLength_) {
// Send all of buffer_, plus a bit of in, to the callback
if (!detail::consumeBufferPlus(
pieceCb_, buffer_, in, maxLength_ - buffer_.length())) {
return false;
}
// Post-conditions:
// - we consumed all of buffer, and the minimal # of bytes from in
// - found is false
} // Otherwise: found is false & we cannot invoke the callback this turn
}
// Post-condition: buffer_ is nonempty only if found is false **and**
// len(buffer + in) < maxLength_.
// Step 2: emit every remaining delimiter-terminated chunk.
// Send lines to callback directly from input (no buffer)
while (found) { // Buffer guaranteed to be empty
if (!detail::consumeFixedSizeChunks(pieceCb_, prefix, maxLength_)) {
return false;
}
found = detail::splitPrefix(in, prefix, delimiter_);
}
// Step 3: emit full-length chunks of the delimiter-free remainder.
// No more delimiters left; consume 'in' until it is shorter than maxLength_
if (maxLength_) {
while (in.size() >= maxLength_) { // Buffer is guaranteed to be empty
if (!pieceCb_(StringPiece(in.begin(), maxLength_))) {
return false;
}
in.advance(maxLength_);
}
}
// Step 4: stash the incomplete remainder for the next call or flush().
if (!in.empty()) { // Buffer may be nonempty
// Incomplete line left, append to buffer
buffer_.reserve(0, in.size());
memcpy(buffer_.writableTail(), in.data(), in.size());
buffer_.append(in.size());
}
// Invariant on exit: a buffered chunk is always shorter than maxLength_.
CHECK(maxLength_ == 0 || buffer_.length() < maxLength_);
return true;
}
namespace detail {
class StringResplitter : public Operator<StringResplitter> {
char delimiter_;
public:
......@@ -89,58 +226,23 @@ class StringResplitter : public Operator<StringResplitter> {
template <class Body>
bool apply(Body&& body) const {
std::unique_ptr<IOBuf> buffer;
auto fn = [&](StringPiece in) -> bool {
StringPiece prefix;
bool found = splitPrefix(in, prefix, this->delimiter_);
if (found && buffer && buffer->length() != 0) {
// Append to end of buffer, return line
if (!prefix.empty()) {
buffer->reserve(0, prefix.size());
memcpy(buffer->writableTail(), prefix.data(), prefix.size());
buffer->append(prefix.size());
}
if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
auto splitter =
streamSplitter(this->delimiter_, [this, &body](StringPiece s) {
// The stream ended with a delimiter; our contract is to swallow
// the final empty piece.
if (s.empty()) {
return false;
}
buffer->clear();
found = splitPrefix(in, prefix, this->delimiter_);
}
// Buffer is empty, return lines directly from input (no buffer)
while (found) {
if (!body(prefix)) {
return false;
}
found = splitPrefix(in, prefix, this->delimiter_);
}
if (!in.empty()) {
// Incomplete line left, append to buffer
if (!buffer) {
// Arbitrarily assume that we have half a line and get enough
// room for twice that.
constexpr size_t kDefaultLineSize = 256;
buffer = IOBuf::create(std::max(kDefaultLineSize, 2 * in.size()));
}
buffer->reserve(0, in.size());
memcpy(buffer->writableTail(), in.data(), in.size());
buffer->append(in.size());
}
return true;
};
// Iterate
if (!source_.apply(std::move(fn))) {
return false;
if (s.back() != this->delimiter_) {
return body(s);
}
// Incomplete last line
if (buffer && buffer->length() != 0) {
if (!body(StringPiece(ch(buffer->data()), buffer->length()))) {
s.pop_back(); // Remove the 1-character delimiter
return body(s);
});
if (!source_.apply(splitter)) {
return false;
}
}
return true;
return splitter.flush();
}
static constexpr bool infinite = Source::infinite;
......@@ -176,7 +278,8 @@ class SplitStringSource
bool apply(Body&& body) const {
StringPiece rest(source_);
StringPiece prefix;
while (splitPrefix(rest, prefix, this->delimiter_)) {
while (size_t delim_len = splitPrefix(rest, prefix, this->delimiter_)) {
prefix.subtract(delim_len); // Remove the delimiter
if (!body(prefix)) {
return false;
}
......
......@@ -19,6 +19,7 @@
#include <folly/Range.h>
#include <folly/gen/Base.h>
#include <folly/io/IOBuf.h>
namespace folly {
namespace gen {
......@@ -48,6 +49,8 @@ class SplitTo;
*
* resplit() behaves as if the input strings were concatenated into one long
* string and then split.
*
* Equivalently, you can use StreamSplitter outside of a folly::gen setting.
*/
// make this a template so we don't require StringResplitter to be complete
// until use
......@@ -168,6 +171,77 @@ eachToPair(StringPiece delim) {
to<fbstring>(delim)));
}
/**
* Outputs exactly the same bytes as the input stream, in different chunks.
* A chunk boundary occurs after each delimiter, or, if maxLength is
* non-zero, after maxLength bytes, whichever comes first. Your callback
* can return false to stop consuming the stream at any time.
*
* The splitter buffers the last incomplete chunk, so you must call flush()
* to consume the piece of the stream after the final delimiter. This piece
* may be empty. After a flush(), the splitter can be re-used for a new
* stream.
*
* operator() and flush() return false iff your callback returns false. The
* internal buffer is not flushed, so reusing such a splitter will have
* indeterminate results. Same goes if your callback throws. Feel free to
* fix these corner cases if needed.
*
* Tips:
* - Create via streamSplitter() to take advantage of template deduction.
* - If your callback needs an end-of-stream signal, test for "no
* trailing delimiter **and** shorter than maxLength".
* - You can fine-tune the initial capacity of the internal IOBuf.
*/
template <class Callback>
class StreamSplitter {
public:
/**
 * delimiter: the byte after which a chunk boundary occurs.
 * pieceCb: invoked with each chunk; it is moved into the splitter.
 * maxLength: if nonzero, the callback never gets more chars than this.
 * initialCapacity: capacity requested for the internal IOBuf on creation.
 */
StreamSplitter(char delimiter,
Callback&& pieceCb,
uint64_t maxLength = 0,
uint64_t initialCapacity = 0)
: buffer_(IOBuf::CREATE, initialCapacity),
delimiter_(delimiter),
maxLength_(maxLength),
pieceCb_(std::move(pieceCb)) {}
/**
* Consume any incomplete last line (may be empty). Do this before
* destroying the StreamSplitter, or you will fail to consume part of the
* input.
*
* After flush() you may proceed to consume the next stream via ().
*
* Returns false if the callback wants no more data, true otherwise.
* A return value of false means that this splitter must no longer be used.
*/
bool flush();
/**
* Consume another piece of the input stream.
*
* Returns false only if your callback refuses to consume more data by
* returning false (true otherwise). A return value of false means that
* this splitter must no longer be used.
*/
bool operator()(StringPiece in);
private:
// Holds the current "incomplete" chunk so that chunks can span calls to ()
IOBuf buffer_;
// Single byte that ends a chunk
char delimiter_;
uint64_t maxLength_; // The callback never gets more chars than this
// Receives every chunk; returning false stops consumption
Callback pieceCb_;
};
/**
 * Helper to enable template deduction: builds a StreamSplitter whose
 * Callback type is deduced from the argument.
 *
 * maxLength is passed through as StreamSplitter's maxLength (if nonzero, no
 * chunk handed to the callback is longer than this). The internal buffer's
 * initial capacity keeps its default. pieceCb is moved from.
 */
template <class Callback>
StreamSplitter<Callback> streamSplitter(char delimiter,
                                        Callback&& pieceCb,
                                        uint64_t maxLength = 0) {
  return StreamSplitter<Callback>(delimiter, std::move(pieceCb), maxLength);
}
} // namespace gen
} // namespace folly
......
......@@ -104,13 +104,15 @@ TEST(StringGen, Split) {
// lines() treats each of \r, \n and \r\n as one line terminator. A "\n\r"
// pair is therefore two terminators with an empty line between them.
TEST(StringGen, SplitByNewLine) {
  auto collect = eachTo<std::string>() | as<vector>();
  {
    auto pieces = lines("hello\n\n world\r\n goodbye\r me\n\row") | collect;
    // Fatal on mismatch so that the indexing below stays in bounds.
    ASSERT_EQ(7, pieces.size());
    EXPECT_EQ("hello", pieces[0]);
    EXPECT_EQ("", pieces[1]);
    EXPECT_EQ(" world", pieces[2]);
    EXPECT_EQ(" goodbye", pieces[3]);
    EXPECT_EQ(" me", pieces[4]);
    EXPECT_EQ("", pieces[5]);
    EXPECT_EQ("ow", pieces[6]);
  }
}
......@@ -258,6 +260,50 @@ TEST(StringGen, Resplit) {
}
}
// Feeds each element of ins, in order, to a StreamSplitter configured with
// delim and maxLength, flushes it, and expects the emitted pieces to equal
// outs. When ins has more than one element, the same expectation is also
// checked for the concatenation of ins fed as a single input.
void checkResplitMaxLength(vector<string> ins,
                           char delim,
                           uint64_t maxLength,
                           vector<string> outs) {
  vector<std::string> pieces;
  auto splitter = streamSplitter(delim, [&pieces](StringPiece s) {
    pieces.push_back(string(s.begin(), s.end()));
    return true;
  }, maxLength);
  // The callback never refuses data, so neither call may report a stop.
  for (const auto& in : ins) {
    EXPECT_TRUE(splitter(in));
  }
  EXPECT_TRUE(splitter.flush());

  // Fatal on mismatch: indexing pieces[i] below would otherwise run out of
  // bounds when fewer pieces than expected were produced.
  ASSERT_EQ(outs.size(), pieces.size());
  for (size_t i = 0; i < outs.size(); ++i) {
    EXPECT_EQ(outs[i], pieces[i]);
  }

  // Also check the concatenated input against the same output
  if (ins.size() > 1) {
    checkResplitMaxLength({folly::join("", ins)}, delim, maxLength, outs);
  }
}
// Exercises StreamSplitter with a nonzero maxLength: chunk boundaries fall
// after each delimiter or after maxLength bytes, whichever comes first.
TEST(StringGen, ResplitMaxLength) {
  {
    const vector<string> ins = {"hel", "lo,", ", world", ", goodbye, m", "ew"};
    const vector<string> outs = {
        "hello", ",", ",", " worl", "d,", " good", "bye,", " mew"};
    checkResplitMaxLength(ins, ',', 5, outs);
  }
  {
    // " meow" cannot be "end of stream", since it's maxLength long
    const vector<string> ins = {
        "hel", "lo,", ", world", ", goodbye, m", "eow"};
    const vector<string> outs = {
        "hello", ",", ",", " worl", "d,", " good", "bye,", " meow", ""};
    checkResplitMaxLength(ins, ',', 5, outs);
  }
  {
    const vector<string> ins = {
        "||", "", "", "", "|a|b", "cdefghijklmn", "|opqrst",
        "uvwx|y|||", "z", "0123456789", "|", ""};
    const vector<string> outs = {
        "|", "|", "|", "a|", "bc", "de", "fg", "hi", "jk", "lm", "n|", "op",
        "qr", "st", "uv", "wx", "|", "y|", "|", "|", "z0", "12", "34", "56",
        "78", "9|", ""};
    checkResplitMaxLength(ins, '|', 2, outs);
  }
}
template<typename F>
void runUnsplitSuite(F fn) {
fn("hello, world");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment