Commit d3cc50e0 authored by Tom Jackson's avatar Tom Jackson Committed by Sara Golemon


Adding `... | parallel(my | pipe | line) | ...` for parallelizing a portion of a generator pipeline.

auto factored = from(values)
| parallel(filter(isEven) | map(square) | sub(count))
| sum;

Work is divided evenly among a fixed number of threads using a `MPMCQueue`.

Test Plan: Unit tests and benchmarks testing for a variety of workloads and performance characteristics, including sub-linear (blocking) workloads, linear (mostly math) workloads, and superlinear (sleeping) workloads to simulate real-world use.

Reviewed By:

FB internal diff: D638551
parent 7fbec719
......@@ -142,6 +142,45 @@ public:
* RangeSource - For producing values from a folly::Range. Useful for referring
* to a slice of some container.
* This type is primarily used through the 'from' function, like:
* auto rangeSource = from(folly::range(v.begin(), v.end()));
* auto sum = rangeSource | sum;
* Reminder: Be careful not to invalidate iterators when using ranges like this.
template<class Iterator>
class RangeSource : public GenImpl<typename Range<Iterator>::reference,
RangeSource<Iterator>> {
Range<Iterator> range_;
RangeSource() {}
explicit RangeSource(Range<Iterator> range)
: range_(std::move(range))
template<class Handler>
bool apply(Handler&& handler) const {
for (auto& value : range_) {
if (!handler(value)) {
return false;
return true;
template<class Body>
void foreach(Body&& body) const {
for (auto& value : range_) {
* Sequence - For generating values from beginning value, incremented along the
* way with the ++ and += operators. Iteration may continue indefinitely by
......@@ -256,8 +295,32 @@ class Yield : public GenImpl<Value, Yield<Value, Source>> {
template<class Value>
class Empty : public GenImpl<Value, Empty<Value>> {
template<class Handler>
bool apply(Handler&&) const { return true; }
template <class Handler>
bool apply(Handler&&) const {
return true;
template <class Body>
void foreach(Body&&) const {}
template<class Value>
class Just : public GenImpl<const Value&, Just<Value>> {
"Just requires non-ref types");
const Value value_;
Just(Value value) : value_(std::forward<Value>(value)) {}
template <class Handler>
bool apply(Handler&& handler) const {
return handler(value_);
template <class Body>
void foreach(Body&& body) const {
......@@ -879,6 +942,25 @@ class Distinct : public Operator<Distinct<Selector>> {
* Composer - Helper class for adapting pipelines into functors. Primarily used
* for 'mapOp'.
template<class Operators>
class Composer {
Operators op_;
explicit Composer(Operators op)
: op_(std::move(op)) {}
template<class Source,
class Ret = decltype(std::declval<Operators>()
Ret operator()(Source&& source) const {
return op_.compose(std::forward<Source>(source));
* Batch - For producing fixed-size batches of each value from a source.
* Copyright 2013 Facebook, Inc.
* Copyright 2014 Facebook, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
......@@ -264,6 +265,8 @@ class Yield;
template<class Value>
class Empty;
template<class Value>
class Just;
* Operators
......@@ -290,6 +293,9 @@ class Order;
template<class Selector>
class Distinct;
template<class Operators>
class Composer;
template<class Expected>
class TypeAssertion;
......@@ -431,6 +437,11 @@ detail::Empty<Value> empty() {
return {};
template<class Value>
detail::Just<Value> just(Value value) {
return detail::Just<Value>(std::move(value));
* Operator Factories
......@@ -446,6 +457,21 @@ Map map(Predicate pred = Predicate()) {
return Map(std::move(pred));
* mapOp - Given a generator of generators, maps the application of the given
* operator on to each inner gen. Especially useful in aggregating nested data
* structures:
* chunked(samples, 256)
* | mapOp(filter(sampleTest) | count)
* | sum;
template<class Operator,
class Map = detail::Map<detail::Composer<Operator>>>
Map mapOp(Operator op) {
return Map(detail::Composer<Operator>(std::move(op)));
* member(...) - For extracting a member from each value.
......@@ -90,8 +90,10 @@ class Operator : public FBounded<Self> {
Operator() = default;
Operator(const Operator&) = default;
Operator(Operator&&) = default;
Operator(const Operator&) = default;
Operator& operator=(Operator&&) = default;
Operator& operator=(const Operator&) = default;
......@@ -142,8 +144,10 @@ class GenImpl : public FBounded<Self> {
// To prevent slicing
GenImpl() = default;
GenImpl(const GenImpl&) = default;
GenImpl(GenImpl&&) = default;
GenImpl(const GenImpl&) = default;
GenImpl& operator=(GenImpl&&) = default;
GenImpl& operator=(const GenImpl&) = default;
typedef Value ValueType;
This diff is collapsed.
* Copyright 2014 Facebook, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include <mutex>
#include "folly/gen/Base.h"
namespace folly { namespace gen {
namespace detail {
template <class Ops>
class Parallel;
template <class Sink>
class Sub;
template <class Iterator>
class ChunkedRangeSource;
* chunked() - For producing values from a container in slices.
* Especially for use with 'parallel()', chunked can be used to process values
* from a persistent container in chunks larger than one value at a time. The
* values produced are generators for slices of the input container. */
template <class Container,
class Iterator = typename Container::const_iterator,
class Chunked = detail::ChunkedRangeSource<Iterator>>
Chunked chunked(const Container& container, int chunkSize = 256) {
return Chunked(chunkSize, folly::range(container.begin(), container.end()));
template <class Container,
class Iterator = typename Container::iterator,
class Chunked = detail::ChunkedRangeSource<Iterator>>
Chunked chunked(Container& container, int chunkSize = 256) {
return Chunked(chunkSize, folly::range(container.begin(), container.end()));
* parallel - A parallelization operator.
* 'parallel(ops)' can be used with any generator to process a segment
* of the pipeline in parallel. Multiple threads are used to apply the
* operations ('ops') to the input sequence, with the resulting sequence
* interleaved to be processed on the client thread.
* auto scoredResults
* = from(ids)
* | parallel(map(fetchObj) | filter(isValid) | map(scoreObj))
* | as<vector>();
* Operators specified for parallel execution must yield sequences, not just
* individual values. If a sink function such as 'count' is desired, it must be
* wrapped in 'sub' to produce a subcount, since any such aggregation must be
* re-aggregated.
* auto matches
* = from(docs)
* | parallel(filter(expensiveTest) | sub(count))
* | sum;
* Here, each thread counts its portion of the result, then the sub-counts are
* summed up to produce the total count.
template <class Ops, class Parallel = detail::Parallel<Ops>>
Parallel parallel(Ops ops, size_t threads = 0) {
return Parallel(std::move(ops), threads);
* sub - For sub-summarization of a sequence.
* 'sub' can be used to apply a sink function to a generator, but wrap the
* single value in another generator. Note that the sink is eagerly evaluated on
* the input sequence.
* auto sum = from(list) | sub(count) | first;
* This is primarily used with 'parallel', as noted above.
template <class Sink, class Sub = detail::Sub<Sink>>
Sub sub(Sink sink) {
return Sub(std::move(sink));
}} // !namespace folly::gen
#include "folly/gen/Parallel-inl.h"
#endif /* FOLLY_GEN_PARALLEL_H_ */
* Copyright 2014 Facebook, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include "folly/Benchmark.h"
#define BENCH_GEN_IMPL(gen, prefix) \
static bool FB_ANONYMOUS_VARIABLE(benchGen) = ( \
::folly::addBenchmark(__FILE__, prefix FB_STRINGIZE(gen), \
[](unsigned iters){ \
while (iters--) { \
folly::doNotOptimizeAway(gen); \
} \
}), true)
#define BENCH_GEN(gen) BENCH_GEN_IMPL(gen, "")
#define BENCH_GEN_REL(gen) BENCH_GEN_IMPL(gen, "%")
* Copyright 2014 Facebook, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include <glog/logging.h>
#include <iostream>
#include <array>
#include <vector>
#include <future>
#include "folly/gen/Base.h"
#include "folly/gen/Parallel.h"
#include "folly/gen/test/Bench.h"
std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2),
"Num threads.");
using namespace folly::gen;
using std::vector;
constexpr int kFib = 28; // unit of work
size_t fib(int n) { return n <= 1 ? 1 : fib(n - 1) + fib(n - 2); }
static auto add = [](int a, int b) { return a + b; };
static auto mod7 = [](int i) { return i % 7; };
static auto isPrimeSlow = [](int n) {
if (n < 2) {
return false;
} else if (n > 2) {
for (int d = 3; d * d <= n; d += 2) {
if (0 == n % d) {
return false;
return true;
static auto primes =
seq(1, 1 << 20) | filter(isPrimeSlow) | as<vector>();
static auto isPrime = [](int n) {
return !(from(primes)
| until([&](int d) { return d * d > n; })
| filter([&](int d) { return 0 == n % d; })
| any);
static auto factors = [](int n) {
return from(primes)
| until([&](int d) { return d * d > n; })
| filter([&](int d) { return 0 == n % d; })
| count;
static auto factorsSlow = [](int n) {
return from(primes)
| filter([&](int d) { return 0 == n % d; })
| count;
static auto sleepyWork = [](int i) {
const auto sleepyTime = std::chrono::microseconds(100);
return i;
static auto sleepAndWork = [](int i) {
return factorsSlow(i) + sleepyWork(i);
std::mutex block;
static auto workAndBlock = [](int i) {
int r = factorsSlow(i);
std::lock_guard<std::mutex> lock(block);
return sleepyWork(i) + r;
auto start = 1 << 20;
auto v = seq(start) | take(1 << 20) | as<vector>();
auto small = from(v) | take(1 << 12);
auto medium = from(v) | take(1 << 14);
auto large = from(v) | take(1 << 18);
auto huge = from(v);
auto chunks = chunked(v);
BENCH_GEN(small | map(factorsSlow) | sum);
BENCH_GEN_REL(small | parallel(map(factorsSlow)) | sum);
BENCH_GEN(small | map(factors) | sum);
BENCH_GEN_REL(small | parallel(map(factors)) | sum);
BENCH_GEN(large | map(factors) | sum);
BENCH_GEN_REL(large | parallel(map(factors)) | sum);
auto ch = chunks;
auto cat = concat;
BENCH_GEN(huge | filter(isPrime) | count);
BENCH_GEN_REL(ch | cat | filter(isPrime) | count);
BENCH_GEN_REL(ch | parallel(cat | filter(isPrime)) | count);
BENCH_GEN_REL(ch | parallel(cat | filter(isPrime) | sub(count)) | sum);
BENCH_GEN(small | map(sleepAndWork) | sum);
BENCH_GEN_REL(small | parallel(map(sleepAndWork)) | sum);
const int fibs = 1000;
BENCH_GEN(seq(1, fibs) | map([](int) { return fib(kFib); }) | sum);
BENCH_GEN_REL(seq(1, fibs) |
parallel(map([](int) { return fib(kFib); }) | sub(sum)) | sum);
auto threads = seq(1, int(FLAGS_threads))
| map([](int i) {
return std::thread([=] {
return range((i + 0) * fibs / FLAGS_threads,
(i + 1) * fibs / FLAGS_threads) |
map([](int) { return fib(kFib); }) | sum;
| as<vector>();
from(threads) | [](std::thread &thread) { thread.join(); };
return 1;
#if 0
folly/gen/test/ParallelBenchmark.cpp relative time/iter iters/s
small | map(factorsSlow) | sum 4.59s 217.87m
small | parallel(map(factorsSlow)) | sum 1588.86% 288.88ms 3.46
small | map(factors) | sum 9.62ms 103.94
small | parallel(map(factors)) | sum 89.15% 10.79ms 92.66
large | map(factors) | sum 650.52ms 1.54
large | parallel(map(factors)) | sum 53.82% 1.21s 827.41m
huge | filter(isPrime) | count 295.93ms 3.38
ch | cat | filter(isPrime) | count 99.76% 296.64ms 3.37
ch | parallel(cat | filter(isPrime)) | count 142.75% 207.31ms 4.82
ch | parallel(cat | filter(isPrime) | sub(count 1538.50% 19.24ms 51.99
small | map(sleepAndWork) | sum 5.37s 186.18m
small | parallel(map(sleepAndWork)) | sum 1840.38% 291.85ms 3.43
seq(1, fibs) | map([](int) { return fib(kFib); 1.49s 669.53m
seq(1, fibs) | parallel(map([](int) { return fi 1698.07% 87.96ms 11.37
[] { auto threads = seq(1, int(FLAGS_threads)) 1571.16% 95.06ms 10.52
int main(int argc, char *argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
return 0;
* Copyright 2014 Facebook, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <iostream>
#include <array>
#include <vector>
#include "folly/gen/Base.h"
#include "folly/gen/Parallel.h"
using namespace folly;
using namespace folly::gen;
using std::vector;
const auto square = [](int i) { return i * i; };
const auto even = [](int i) { return 0 == i % 2; };
static auto sleepyWork = [](int i) {
const auto sleepyTime = std::chrono::microseconds(100);
return i;
static auto isPrime = [](int n) {
if (n < 2) {
return false;
} else if (n > 2) {
for (int d = 3; d * d <= n; d += 2) {
if (0 == n % d) {
return false;
return true;
struct {
template<class T>
std::unique_ptr<T> operator()(T t) const {
return std::unique_ptr<T>(new T(std::move(t)));
} makeUnique;
static auto primes = seq(1, 1 << 14)
| filter(isPrime)
| as<vector<size_t>>();
static auto primeFactors = [](int n) {
return from(primes)
| filter([&](int d) { return 0 == n % d; })
| count;
TEST(ParallelTest, Serial) {
seq(1,10) | map(square) | filter(even) | sum,
seq(1,10) | parallel(map(square) | filter(even)) | sum);
auto heavyWork = map(primeFactors);
TEST(ParallelTest, ComputeBound64) {
int length = 1 << 10;
EXPECT_EQ(seq<size_t>(1, length) | heavyWork | sum,
seq<size_t>(1, length) | parallel(heavyWork) | sum);
TEST(ParallelTest, Take) {
int length = 1 << 18;
int limit = 1 << 14;
EXPECT_EQ(seq(1, length) | take(limit) | count,
seq(1, length) | parallel(heavyWork) | take(limit) | count);
TEST(ParallelTest, Unique) {
auto uniqued = from(primes) | map(makeUnique) | as<vector>();
from(primes) | parallel(map(makeUnique)) |
parallel(dereference | map(makeUnique)) | dereference | count);
from(primes) | parallel(map(makeUnique)) |
parallel(dereference | map(makeUnique)) | dereference |
take(2) | count);
TEST(ParallelTest, PSum) {
EXPECT_EQ(from(primes) | map(sleepyWork) | sum,
from(primes) | parallel(map(sleepyWork) | sub(sum)) | sum);
int main(int argc, char *argv[]) {
testing::InitGoogleTest(&argc, argv);
google::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment