Commit 61017384 authored by Tom Jackson's avatar Tom Jackson Committed by Facebook Github Bot

Window, mainly for futures

Summary: Just a circular buffer in the middle of a pipeline.

Reviewed By: yfeldblum

Differential Revision: D5791551

fbshipit-source-id: 2808a53df9b8cd2a402da0678a6b4ed0e6ca6e00
parent 0caa2c14
...@@ -1366,6 +1366,103 @@ class Batch : public Operator<Batch> { ...@@ -1366,6 +1366,103 @@ class Batch : public Operator<Batch> {
} }
}; };
/**
* Window - For overlapping the lifetimes of pipeline values, especially with
* Futures.
*
* This type is usually used through the 'window' helper function:
*
* auto responses
* = byLine(STDIN)
* | map(makeRequestFuture)
* | window(1000)
* | map(waitFuture)
* | as<vector>();
*/
class Window : public Operator<Window> {
size_t windowSize_;
public:
explicit Window(size_t windowSize) : windowSize_(windowSize) {
if (windowSize_ == 0) {
throw std::invalid_argument("Window size must be non-zero!");
}
}
template <
class Value,
class Source,
class StorageType = typename std::decay<Value>::type>
class Generator
: public GenImpl<StorageType&&, Generator<Value, Source, StorageType>> {
Source source_;
size_t windowSize_;
public:
explicit Generator(Source source, size_t windowSize)
: source_(std::move(source)), windowSize_(windowSize) {}
template <class Handler>
bool apply(Handler&& handler) const {
std::vector<StorageType> buffer;
buffer.reserve(windowSize_);
size_t readIndex = 0;
bool shouldContinue = source_.apply([&](Value value) -> bool {
if (buffer.size() < windowSize_) {
buffer.push_back(std::forward<Value>(value));
} else {
StorageType& entry = buffer[readIndex++];
if (readIndex == windowSize_) {
readIndex = 0;
}
if (!handler(std::move(entry))) {
return false;
}
entry = std::forward<Value>(value);
}
return true;
});
if (!shouldContinue) {
return false;
}
if (buffer.size() < windowSize_) {
for (StorageType& entry : buffer) {
if (!handler(std::move(entry))) {
return false;
}
}
} else {
for (size_t i = readIndex;;) {
StorageType& entry = buffer[i++];
if (!handler(std::move(entry))) {
return false;
}
if (i == windowSize_) {
i = 0;
}
if (i == readIndex) {
break;
}
}
}
return true;
}
// Taking n-tuples of an infinite source is still infinite
static constexpr bool infinite = Source::infinite;
};
template <class Source, class Value, class Gen = Generator<Value, Source>>
Gen compose(GenImpl<Value, Source>&& source) const {
return Gen(std::move(source.self()), windowSize_);
}
template <class Source, class Value, class Gen = Generator<Value, Source>>
Gen compose(const GenImpl<Value, Source>& source) const {
return Gen(source.self(), windowSize_);
}
};
/** /**
* Concat - For flattening generators of generators. * Concat - For flattening generators of generators.
* *
...@@ -2357,6 +2454,11 @@ inline detail::Skip skip(size_t count) { return detail::Skip(count); } ...@@ -2357,6 +2454,11 @@ inline detail::Skip skip(size_t count) { return detail::Skip(count); }
inline detail::Batch batch(size_t batchSize) { inline detail::Batch batch(size_t batchSize) {
return detail::Batch(batchSize); return detail::Batch(batchSize);
} }
inline detail::Window window(size_t windowSize) {
return detail::Window(windowSize);
}
} // namespace gen } // namespace gen
} // namespace folly } // namespace folly
......
...@@ -357,6 +357,8 @@ class Cycle; ...@@ -357,6 +357,8 @@ class Cycle;
class Batch; class Batch;
class Window;
class Dereference; class Dereference;
class Indirect; class Indirect;
......
...@@ -1249,6 +1249,31 @@ TEST(Gen, BatchMove) { ...@@ -1249,6 +1249,31 @@ TEST(Gen, BatchMove) {
EXPECT_EQ(expected, actual); EXPECT_EQ(expected, actual);
} }
TEST(Gen, Window) {
auto expected = seq(0, 10) | as<std::vector>();
for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
// no early stop
auto actual = seq(0, 10) |
mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
window(4) | dereference | as<std::vector>();
EXPECT_EQ(expected, actual) << windowSize;
}
for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
// pre-window take
auto actual = seq(0) |
mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
take(11) | window(4) | dereference | as<std::vector>();
EXPECT_EQ(expected, actual) << windowSize;
}
for (size_t windowSize = 1; windowSize <= 20; ++windowSize) {
// post-window take
auto actual = seq(0) |
mapped([](int i) { return std::unique_ptr<int>(new int(i)); }) |
window(4) | take(11) | dereference | as<std::vector>();
EXPECT_EQ(expected, actual) << windowSize;
}
}
TEST(Gen, Just) { TEST(Gen, Just) {
{ {
int x = 3; int x = 3;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment