Commit eed46f42 authored by Tudor Bosman's avatar Tudor Bosman Committed by Jordan DeLong

Stream operations: file access, iteration, splitting.

Summary: Intended to complement and replace strings::byLine.

Test Plan: stream_test

Reviewed By: delong.j@fb.com

FB internal diff: D463341
parent 39786ac6
......@@ -187,7 +187,8 @@ public:
// Allow implicit conversion from Range<const char*> (aka StringPiece) to
// Range<const unsigned char*> (aka ByteRange), as they're both frequently
// used to represent ranges of bytes.
// used to represent ranges of bytes. Allow explicit conversion in the other
// direction.
template <class OtherIter, typename std::enable_if<
(std::is_same<Iter, const unsigned char*>::value &&
std::is_same<OtherIter, const char*>::value), int>::type = 0>
......@@ -196,6 +197,14 @@ public:
e_(reinterpret_cast<const unsigned char*>(other.end())) {
}
// Explicit conversion in the other direction: builds a Range<const char*>
// from a Range<const unsigned char*>. The enable_if condition restricts this
// constructor to exactly that pair of iterator types. Only the begin/end
// pointers are reinterpreted; no bytes are copied.
template <class OtherIter, typename std::enable_if<
(std::is_same<Iter, const char*>::value &&
std::is_same<OtherIter, const unsigned char*>::value), int>::type = 0>
explicit Range(const Range<OtherIter>& other)
: b_(reinterpret_cast<const char*>(other.begin())),
e_(reinterpret_cast<const char*>(other.end())) {
}
void clear() {
b_ = Iter();
e_ = Iter();
......
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_IO_STREAM_H_
#error This file may only be included from Stream.h
#endif
#include <string.h>
#include <glog/logging.h>
namespace folly {
/**
 * Build a splitter that cuts the chunks produced by `stream` at every
 * occurrence of `delimiter`. The stream is taken by value and moved into the
 * splitter; the line buffer is left unallocated until it is first needed.
 */
template <class Stream>
InputByteStreamSplitter<Stream>::InputByteStreamSplitter(
    char delimiter, Stream stream)
  : done_(false), delimiter_(delimiter), stream_(std::move(stream)) {}
/**
 * Produce the next delimiter-separated piece of the underlying stream.
 *
 * On success returns true and sets `chunk` to the bytes preceding the next
 * delimiter (the delimiter itself is dropped). `chunk` aliases either the
 * most recent chunk handed out by the wrapped stream or this splitter's own
 * buffer_, so it must not be used after the next call.
 *
 * Returns false (with `chunk` cleared) once the wrapped stream is exhausted.
 * A final piece that is not terminated by a delimiter is still returned.
 * Exceptions thrown by the wrapped stream propagate unchanged.
 */
template <class Stream>
bool InputByteStreamSplitter<Stream>::operator()(ByteRange& chunk) {
  // buffer_ is allocated lazily (only when a line straddles two chunks), so
  // it is still null on the first call and must not be dereferenced
  // unconditionally. When it exists, the previous call must have drained it.
  DCHECK(!buffer_ || buffer_->length() == 0);
  chunk.clear();

  // Refill rest_ from the underlying stream if the last chunk was used up.
  if (rest_.empty()) {
    if (done_) {
      return false;
    }
    if (!stream_(rest_)) {
      done_ = true;
      return false;
    }
  }

  // Fast path: the whole piece lies inside the current chunk, no copy needed.
  auto p = static_cast<const unsigned char*>(
      memchr(rest_.data(), delimiter_, rest_.size()));
  if (p) {
    chunk.assign(rest_.data(), p);
    rest_.assign(p + 1, rest_.end());
    return true;
  }

  // Incomplete line read, copy to buffer
  if (!buffer_) {
    static const size_t kDefaultLineSize = 256;
    // Arbitrarily assume that we have half of a line in rest_, and
    // get enough room for twice that.
    buffer_ = IOBuf::create(std::max(kDefaultLineSize, 2 * rest_.size()));
  } else {
    buffer_->reserve(0, rest_.size());
  }
  memcpy(buffer_->writableTail(), rest_.data(), rest_.size());
  buffer_->append(rest_.size());

  // Keep pulling chunks until one of them contains the delimiter.
  while (stream_(rest_)) {
    auto delim = static_cast<const unsigned char*>(
        memchr(rest_.data(), delimiter_, rest_.size()));
    if (delim) {
      // Copy everything up to the delimiter and return it
      size_t n = delim - rest_.data();
      buffer_->reserve(0, n);
      memcpy(buffer_->writableTail(), rest_.data(), n);
      buffer_->append(n);
      // Hand out the accumulated bytes, then mark the buffer as empty for
      // the next call.
      chunk.reset(buffer_->data(), buffer_->length());
      buffer_->trimStart(buffer_->length());
      rest_.assign(delim + 1, rest_.end());
      return true;
    }

    // Nope, copy the entire chunk that we read
    buffer_->reserve(0, rest_.size());
    memcpy(buffer_->writableTail(), rest_.data(), rest_.size());
    buffer_->append(rest_.size());
  }

  // Incomplete last line
  done_ = true;
  rest_.clear();
  chunk.reset(buffer_->data(), buffer_->length());
  buffer_->trimStart(buffer_->length());
  return true;
}
} // namespace folly
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "folly/experimental/io/Stream.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdexcept>
#include <system_error>
#include "folly/String.h"
namespace folly {
// Wrap the descriptor `fd`, reading through a buffer obtained from
// IOBuf::create(bufferSize). When `ownsFd` is true the descriptor is closed
// by closeNoThrow() (called from the destructor and from move assignment).
FileInputByteStream::FileInputByteStream(int fd, bool ownsFd, size_t bufferSize)
  : fd_(fd), ownsFd_(ownsFd), buffer_(IOBuf::create(bufferSize)) {}
// Wrap the descriptor `fd`, reading through a caller-supplied buffer.
// Ownership of `buffer` moves into the stream; `buffer` must be non-null
// because it is dereferenced immediately.
FileInputByteStream::FileInputByteStream(int fd, bool ownsFd,
                                         std::unique_ptr<IOBuf>&& buffer)
  : fd_(fd), ownsFd_(ownsFd), buffer_(std::move(buffer)) {
  // Drop whatever the caller left in the buffer before we start reading.
  buffer_->clear();
}
// Read the next chunk from the file descriptor.
//
// On success `chunk` is set to the bytes just read and true is returned; at
// end of file `chunk` is empty and false is returned. The buffer's length is
// never advanced here, so every read lands at the same tail position and
// overwrites the previous chunk.
//
// Throws std::system_error if read() fails for any reason other than being
// interrupted by a signal.
bool FileInputByteStream::operator()(ByteRange& chunk) {
  ssize_t n;
  do {
    // Only the space after the tail is writable, so bound the read by
    // tailroom() rather than by the total capacity.
    n = ::read(fd_, buffer_->writableTail(), buffer_->tailroom());
  } while (n == -1 && errno == EINTR);  // interrupted before any data: retry
  if (n == -1) {
    throw std::system_error(errno, std::system_category(), "read failed");
  }
  chunk.reset(buffer_->tail(), n);
  return (n != 0);
}
// Move constructor: take over the descriptor, the ownership flag and the
// buffer of `other`.
FileInputByteStream::FileInputByteStream(FileInputByteStream&& other)
  : fd_(other.fd_),
    ownsFd_(other.ownsFd_),
    buffer_(std::move(other.buffer_)) {
  // Disarm the source so that its destructor does not close our descriptor.
  other.ownsFd_ = false;
  other.fd_ = -1;
}
// Move assignment: release whatever this stream currently holds, then take
// over the state of `other`. Self-assignment is a no-op.
FileInputByteStream& FileInputByteStream::operator=(
    FileInputByteStream&& other) {
  if (this == &other) {
    return *this;
  }

  // Close our own descriptor first (only if we own it).
  closeNoThrow();

  fd_ = other.fd_;
  ownsFd_ = other.ownsFd_;
  buffer_ = std::move(other.buffer_);

  // Disarm the source so that its destructor does not close our descriptor.
  other.fd_ = -1;
  other.ownsFd_ = false;
  return *this;
}
// Closes the descriptor if this stream owns it. A close() failure is logged
// by closeNoThrow() rather than thrown.
FileInputByteStream::~FileInputByteStream() {
  closeNoThrow();
}
// Close the descriptor if (and only if) this stream owns it. Ownership is
// dropped before the close() call, so the descriptor is never closed twice.
// A close() failure is logged, not thrown.
void FileInputByteStream::closeNoThrow() {
  if (ownsFd_) {
    ownsFd_ = false;
    const bool failed = (::close(fd_) == -1);
    if (failed) {
      PLOG(ERROR) << "close failed";
    }
  }
}
// Open `fileName` read-only and return a splitter that yields the pieces of
// the file separated by `delim`. The returned stream owns the descriptor.
// Throws std::system_error if the file cannot be opened.
InputByteStreamSplitter<FileInputByteStream> byLine(
    const char* fileName, char delim) {
  const int fd = ::open(fileName, O_RDONLY);
  if (fd != -1) {
    // ownsFd = true: the descriptor is closed together with the stream.
    return makeInputByteStreamSplitter(delim, FileInputByteStream(fd, true));
  }
  throw std::system_error(errno, std::system_category(), "open failed");
}
} // namespace folly
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_IO_STREAM_H_
#define FOLLY_IO_STREAM_H_
#include <boost/iterator/iterator_facade.hpp>
#include <glog/logging.h>
#include "folly/Range.h"
#include "folly/FBString.h"
#include "folly/experimental/io/IOBuf.h"
namespace folly {
/**
* An InputByteStream is a functional object with the following signature:
*
* bool operator()(ByteRange& data);
*
* Input byte streams must be movable.
*
* The stream returns false at EOF; otherwise, it returns true and sets data to
* the next chunk of data from the stream. The memory that data points to must
* remain valid until the next call to the stream. In case of error, the
* stream throws an exception.
*
* The meaning of a "chunk" is left up to the stream implementation. Some
* streams return chunks limited to the size of an internal buffer. Other
* streams return the entire input as one (potentially huge) ByteRange.
* Others assign meaning to chunks: InputByteStreamSplitter returns "lines" -- sequences
* of bytes between delimiters. This ambiguity is intentional; resolving it
* would significantly increase the complexity of the code.
*
* An OutputByteStream is an object with the following signature:
*
* void operator()(ByteRange data);
* void close();
*
* Output byte streams must be movable.
*
* The stream appends a chunk of data to the stream when calling operator().
* close() closes the stream, allowing us to detect any errors before
* destroying the stream object (to avoid throwing exceptions from the
* destructor). The destructor must close the stream if close() was not
* explicitly called, and abort the program if closing the stream caused
* an error.
*
* Just like with input byte streams, the meaning of a "chunk" is left up
* to the stream implementation. Some streams will just append all chunks
* as given; others might assign meaning to chunks and (for example) append
* delimiters between chunks.
*/
template <class Stream> class InputByteStreamIterator;
/**
* Convenient base class template to derive all streams from; provides begin()
* and end() for iterator access. This class makes use of the curiously
* recurring template pattern; your stream class S may derive from
* InputByteStreamBase<S>.
*
* Deriving from InputByteStreamBase<S> is not required, but is convenient.
*/
template <class Derived>
class InputByteStreamBase {
 public:
  // Streams are movable but not copyable.
  InputByteStreamBase() { }
  InputByteStreamBase(InputByteStreamBase&&) = default;
  InputByteStreamBase& operator=(InputByteStreamBase&&) = default;

  // Iterator over the chunks of the derived stream. Constructing it pulls
  // the first chunk from the stream.
  InputByteStreamIterator<Derived> begin() {
    Derived& self = static_cast<Derived&>(*this);
    return InputByteStreamIterator<Derived>(self);
  }

  // Past-the-end iterator; it is not attached to any stream.
  InputByteStreamIterator<Derived> end() {
    return InputByteStreamIterator<Derived>();
  }

 private:
  InputByteStreamBase(const InputByteStreamBase&) = delete;
  InputByteStreamBase& operator=(const InputByteStreamBase&) = delete;
};
/**
* Stream iterator
*/
template <class Stream>
class InputByteStreamIterator
  : public boost::iterator_facade<
      InputByteStreamIterator<Stream>,
      const ByteRange,
      boost::single_pass_traversal_tag> {
 public:
  // Past-the-end iterator: not attached to any stream.
  InputByteStreamIterator() : stream_(nullptr) { }

  // Attach to `stream` and immediately fetch its first chunk. If the stream
  // is already at EOF the iterator becomes past-the-end right away.
  explicit InputByteStreamIterator(Stream& stream) : stream_(&stream) {
    increment();
  }

 private:
  friend class boost::iterator_core_access;

  // Fetch the next chunk; detach from the stream once it reports EOF.
  void increment() {
    DCHECK(stream_);
    if (!stream_) {
      return;
    }
    const bool gotChunk = (*stream_)(chunk_);
    if (!gotChunk) {
      stream_ = nullptr;
    }
  }

  // This is a single pass iterator, so equality only has to be an
  // equivalence relation on the iterators it is defined for. Two iterators
  // compare equal when they are the very same object or when both are
  // past-the-end, which is all that `it != end()` needs.
  bool equal(const InputByteStreamIterator& other) const {
    if (this == &other) {
      return true;
    }
    return stream_ == nullptr && other.stream_ == nullptr;
  }

  // The chunk most recently fetched by increment().
  const ByteRange& dereference() const {
    DCHECK(stream_);
    return chunk_;
  }

  Stream* stream_;
  ByteRange chunk_;
};
/**
* Stream that read()s from a file.
*/
class FileInputByteStream : public InputByteStreamBase<FileInputByteStream> {
 public:
  static const size_t kDefaultBufferSize = 4096;

  // Read from `fd` through an internally created buffer of `bufferSize`.
  // If `ownsFd` is true, the descriptor is closed when the stream goes away.
  explicit FileInputByteStream(int fd,
                               bool ownsFd = false,
                               size_t bufferSize = kDefaultBufferSize);

  // Read from `fd` through a caller-supplied buffer, whose ownership moves
  // into the stream.
  FileInputByteStream(int fd, bool ownsFd, std::unique_ptr<IOBuf>&& buffer);

  // Movable; the moved-from stream no longer owns the descriptor.
  FileInputByteStream(FileInputByteStream&& other);
  FileInputByteStream& operator=(FileInputByteStream&& other);

  ~FileInputByteStream();

  // Input byte stream interface: fetch the next chunk, false at EOF.
  bool operator()(ByteRange& chunk);

 private:
  // Close fd_ if owned; failures are logged instead of thrown.
  void closeNoThrow();

  int fd_;
  bool ownsFd_;
  std::unique_ptr<IOBuf> buffer_;
};
/**
* Split a stream on a delimiter. Returns "lines" between delimiters;
* the delimiters are not included in the returned string.
*
* Note that the InputByteStreamSplitter acts as a stream itself, and you can
* iterate over it.
*/
template <class Stream>
class InputByteStreamSplitter
  : public InputByteStreamBase<InputByteStreamSplitter<Stream>> {
 public:
  // Split the chunks of `stream` at every `delimiter`.
  InputByteStreamSplitter(char delimiter, Stream stream);

  // Movable, not copyable.
  InputByteStreamSplitter(InputByteStreamSplitter&&) = default;
  InputByteStreamSplitter& operator=(InputByteStreamSplitter&&) = default;

  // Input byte stream interface: fetch the next piece, false at EOF.
  bool operator()(ByteRange& chunk);

 private:
  InputByteStreamSplitter(const InputByteStreamSplitter&) = delete;
  InputByteStreamSplitter& operator=(const InputByteStreamSplitter&) = delete;

  bool done_;                      // wrapped stream has reported EOF
  char delimiter_;                 // byte that separates pieces
  Stream stream_;                  // wrapped input stream
  std::unique_ptr<IOBuf> buffer_;  // holds pieces that span several chunks
  ByteRange rest_;                 // unconsumed tail of the last chunk read
};
/**
* Shortcut to create a stream splitter around a stream and deduce
* the type of the template argument.
*/
template <class Stream>
InputByteStreamSplitter<Stream> makeInputByteStreamSplitter(
    char delimiter, Stream stream) {
  // Spelled out once so the template argument never has to be written by
  // the caller.
  typedef InputByteStreamSplitter<Stream> Splitter;
  return Splitter(delimiter, std::move(stream));
}
/**
* Create a stream that splits a file into chunks (default: lines, with
* '\n' as the delimiter).
*
* The file named by fileName is opened read-only, and the returned stream
* owns the resulting file descriptor. Throws std::system_error if the file
* cannot be opened.
*/
InputByteStreamSplitter<FileInputByteStream> byLine(
const char* fileName, char delim='\n');
// Convenience overload taking the file name as a std::string; forwards to
// the const char* version.
inline InputByteStreamSplitter<FileInputByteStream> byLine(
    const std::string& fileName, char delim='\n') {
  const char* path = fileName.c_str();
  return byLine(path, delim);
}
// Convenience overload taking the file name as an fbstring; forwards to
// the const char* version.
inline InputByteStreamSplitter<FileInputByteStream> byLine(
    const fbstring& fileName, char delim='\n') {
  const char* path = fileName.c_str();
  return byLine(path, delim);
}
} // namespace folly
#include "folly/experimental/io/Stream-inl.h"
#endif /* FOLLY_IO_STREAM_H_ */
/*
* Copyright 2012 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "folly/experimental/io/Stream.h"
#include <vector>
#include <string>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "folly/Benchmark.h"
#include "folly/experimental/TestUtil.h"
using namespace folly;
namespace {
std::vector<std::string> streamSplit(const std::string& str, char delimiter,
size_t maxChunkSize = (size_t)-1) {
size_t pos = 0;
auto cb = [&] (ByteRange& sp) mutable -> bool {
if (pos == str.size()) return false;
size_t n = std::min(str.size() - pos, maxChunkSize);
sp.reset(reinterpret_cast<const unsigned char*>(&(str[pos])), n);
pos += n;
return true;
};
std::vector<std::string> result;
for (auto line : makeInputByteStreamSplitter(delimiter, cb)) {
result.push_back(StringPiece(line).str());
}
return result;
}
} // namespace
// Inputs that consist of nothing, or of nothing but delimiters.
TEST(InputByteStreamSplitter, Empty) {
  {
    auto pieces = streamSplit("", ',');
    EXPECT_EQ(0, pieces.size());
  }

  // The last delimiter is eaten, just like std::getline
  {
    auto pieces = streamSplit(",", ',');
    // Fatal size check: indexing below would be out of bounds otherwise.
    ASSERT_EQ(1, pieces.size());
    EXPECT_EQ("", pieces[0]);
  }

  {
    auto pieces = streamSplit(",,", ',');
    // Fatal size check: indexing below would be out of bounds otherwise.
    ASSERT_EQ(2, pieces.size());
    EXPECT_EQ("", pieces[0]);
    EXPECT_EQ("", pieces[1]);
  }
}
// Splitting must give the same pieces no matter how the input is chunked,
// so try every chunk size from one byte up to the whole string.
TEST(InputByteStreamSplitter, Simple) {
  std::string str = "hello,, world, goodbye, meow";

  for (size_t chunkSize = 1; chunkSize <= str.size(); ++chunkSize) {
    auto pieces = streamSplit(str, ',', chunkSize);
    // Fatal size check: indexing below would be out of bounds otherwise.
    ASSERT_EQ(5, pieces.size()) << "chunkSize=" << chunkSize;
    EXPECT_EQ("hello", pieces[0]) << "chunkSize=" << chunkSize;
    EXPECT_EQ("", pieces[1]) << "chunkSize=" << chunkSize;
    EXPECT_EQ(" world", pieces[2]) << "chunkSize=" << chunkSize;
    EXPECT_EQ(" goodbye", pieces[3]) << "chunkSize=" << chunkSize;
    EXPECT_EQ(" meow", pieces[4]) << "chunkSize=" << chunkSize;
  }
}
// byLine() on a real file must yield the same pieces as splitting the file's
// contents in memory.
TEST(ByLine, Simple) {
  test::TemporaryFile file("ByLine");
  static const std::string lines(
    "Hello world\n"
    "This is the second line\n"
    "\n"
    "\n"
    "a few empty lines above\n"
    "incomplete last line");

  // If the fixture could not be written, the comparison below is meaningless,
  // so make this failure fatal.
  const ssize_t written = write(file.fd(), lines.data(), lines.size());
  ASSERT_EQ(static_cast<ssize_t>(lines.size()), written);

  auto expected = streamSplit(lines, '\n');
  std::vector<std::string> found;
  for (auto& line : byLine(file.path())) {
    found.push_back(StringPiece(line).str());
  }

  // EXPECT_EQ prints both vectors when they differ.
  EXPECT_EQ(expected, found);
}
// Test driver: run all unit tests and, only if they all pass, run the
// benchmarks (when requested by flag). The exit status is the test result.
int main(int argc, char *argv[]) {
  testing::InitGoogleTest(&argc, argv);
  google::ParseCommandLineFlags(&argc, &argv, true);

  const auto status = RUN_ALL_TESTS();
  if (status == 0) {
    folly::runBenchmarksOnFlag();
  }
  return status;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment