Commit 89c3562e authored by Lucian Grijincu, committed by Sara Golemon

folly: gen: pmap: parallel version of map

Summary:
Same as map, but runs its argument in parallel over a number of threads.
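
For illustration, a minimal usage sketch (mirroring the pmap() helper added
in folly/gen/ParallelMap.h; the squaring lambda and thread count here are
arbitrary):

  auto squares = seq(1, 100)
               | pmap([](int x) { return x * x; }, 4)
               | as<std::vector<int>>();

Only the mapped function runs on the worker threads; results are produced in
the same order as the input.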

@override-unit-failures

Test Plan: added test

Reviewed By: tjackson@fb.com

FB internal diff: D1258364
parent 188e13f3
folly/Optional.h
@@ -112,15 +112,17 @@ class Optional {
     }
   }
-  /* implicit */ Optional(const None&)
+  /* implicit */ Optional(const None&) noexcept
     : hasValue_(false) {
   }
-  /* implicit */ Optional(Value&& newValue) {
+  /* implicit */ Optional(Value&& newValue)
+    noexcept(std::is_nothrow_move_constructible<Value>::value) {
     construct(std::move(newValue));
   }
-  /* implicit */ Optional(const Value& newValue) {
+  /* implicit */ Optional(const Value& newValue)
+    noexcept(std::is_nothrow_copy_constructible<Value>::value) {
     construct(newValue);
   }
folly/gen/ParallelMap-inl.h
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_GEN_PARALLELMAP_H
#error This file may only be included from folly/gen/ParallelMap.h
#endif
#include <unistd.h>
#include <atomic>
#include <cassert>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
#include "folly/MPMCPipeline.h"
#include "folly/experimental/EventCount.h"
namespace folly { namespace gen { namespace detail {
/**
* PMap - Map in parallel (using threads). For producing a sequence of
* values by passing each value from a source collection through a
* predicate while running the predicate in parallel in different
* threads.
*
 * This type is usually used through the 'pmap' helper function:
 *
 *   auto squares = seq(1, 10) | pmap([](int x) { return x * x; }, 4) | sum;
*/
template<class Predicate>
class PMap : public Operator<PMap<Predicate>> {
Predicate pred_;
size_t nThreads_;
public:
PMap() {}
PMap(Predicate pred, size_t nThreads)
: pred_(std::move(pred)),
nThreads_(nThreads) { }
template<class Value,
class Source,
class Input = typename std::decay<Value>::type,
class Output = typename std::decay<
typename std::result_of<Predicate(Value)>::type
>::type>
class Generator :
public GenImpl<Output, Generator<Value, Source, Input, Output>> {
Source source_;
Predicate pred_;
const size_t nThreads_;
class ExecutionPipeline {
std::vector<std::thread> workers_;
std::atomic<bool> done_{false};
const Predicate& pred_;
MPMCPipeline<Input, Output> pipeline_;
EventCount wake_;
public:
ExecutionPipeline(const Predicate& pred, size_t nThreads)
: pred_(pred),
pipeline_(nThreads, nThreads) {
workers_.reserve(nThreads);
for (size_t i = 0; i < nThreads; i++) {
workers_.push_back(std::thread([this] { this->predApplier(); }));
}
}
~ExecutionPipeline() {
assert(pipeline_.sizeGuess() == 0);
assert(done_.load());
for (auto& w : workers_) { w.join(); }
}
void stop() {
// prevent workers from consuming more than we produce.
done_.store(true, std::memory_order_release);
wake_.notifyAll();
}
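// Non-blocking write into the pipeline; returns false if the input
// queue is full, true if the value was enqueued (and a worker woken).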
bool write(Value&& value) {
bool wrote = pipeline_.write(std::forward<Value>(value));
if (wrote) {
wake_.notify();
}
return wrote;
}
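// Blocking write: waits until there is room in the input queue,
// then wakes a worker.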
void blockingWrite(Value&& value) {
pipeline_.blockingWrite(std::forward<Value>(value));
wake_.notify();
}
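// Non-blocking read of a finished result; returns false if no result
// is ready yet.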
bool read(Output& out) {
return pipeline_.read(out);
}
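// Blocking read: waits until the next result (in input order) is
// available.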
void blockingRead(Output& out) {
pipeline_.blockingRead(out);
}
private:
void predApplier() {
// Each thread takes a value from the pipeline_, runs the
// predicate and enqueues the result. The pipeline preserves
// ordering. NOTE: don't use blockingReadStage<0> to read from
// the pipeline_, as there may not be any more input: end-of-data
// is signaled separately using done_/wake_.
Input in;
for (;;) {
auto key = wake_.prepareWait();
typename MPMCPipeline<Input, Output>::template Ticket<0> ticket;
if (pipeline_.template readStage<0>(ticket, in)) {
wake_.cancelWait();
Output out = pred_(std::move(in));
pipeline_.template blockingWriteStage<0>(ticket,
std::move(out));
continue;
}
if (done_.load(std::memory_order_acquire)) {
wake_.cancelWait();
break;
}
// Not done_, but no items in the queue.
wake_.wait(key);
}
}
};
public:
Generator(Source source, const Predicate& pred, size_t nThreads)
: source_(std::move(source)),
pred_(pred),
nThreads_(nThreads ?: sysconf(_SC_NPROCESSORS_ONLN)) {
}
template<class Body>
void foreach(Body&& body) const {
ExecutionPipeline pipeline(pred_, nThreads_);
size_t wrote = 0;
size_t read = 0;
source_.foreach([&](Value value) {
if (pipeline.write(std::forward<Value>(value))) {
// input queue not yet full, saturate it before we process
// anything downstream
++wrote;
return;
}
// input queue full; drain ready items from the queue
Output out;
while (pipeline.read(out)) {
++read;
body(std::move(out));
}
// write the value we were going to write before we made room.
pipeline.blockingWrite(std::forward<Value>(value));
++wrote;
});
pipeline.stop();
// flush the output queue
while (read < wrote) {
Output out;
pipeline.blockingRead(out);
++read;
body(std::move(out));
}
}
template<class Handler>
bool apply(Handler&& handler) const {
ExecutionPipeline pipeline(pred_, nThreads_);
size_t wrote = 0;
size_t read = 0;
bool more = true;
source_.apply([&](Value value) {
if (pipeline.write(std::forward<Value>(value))) {
// input queue not yet full, saturate it before we process
// anything downstream
++wrote;
return true;
}
// input queue full; drain ready items from the queue
Output out;
while (pipeline.read(out)) {
++read;
if (!handler(std::move(out))) {
more = false;
return false;
}
}
// write the value we were going to write before we made room.
pipeline.blockingWrite(std::forward<Value>(value));
++wrote;
return true;
});
pipeline.stop();
// flush the output queue
while (read < wrote) {
Output out;
pipeline.blockingRead(out);
++read;
if (more) {
more = more && handler(std::move(out));
}
}
return more;
}
static constexpr bool infinite = Source::infinite;
};
template<class Source,
class Value,
class Gen = Generator<Value, Source>>
Gen compose(GenImpl<Value, Source>&& source) const {
return Gen(std::move(source.self()), pred_, nThreads_);
}
template<class Source,
class Value,
class Gen = Generator<Value, Source>>
Gen compose(const GenImpl<Value, Source>& source) const {
return Gen(source.self(), pred_, nThreads_);
}
};
}}} // namespaces
folly/gen/ParallelMap.h
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FOLLY_GEN_PARALLELMAP_H
#define FOLLY_GEN_PARALLELMAP_H
#include "folly/gen/Core.h"
namespace folly { namespace gen {
namespace detail {
template<class Predicate>
class PMap;
} // namespace detail
/**
 * Run `pred` in parallel in `nThreads` threads. Results are returned in
 * the same order in which they were retrieved from the source generator
 * (similar to map).
 *
 * NOTE: Only `pred` is run from separate threads; the source
 * generator and the rest of the pipeline are executed in the
 * caller's thread.
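 *
 * Example (an illustrative sketch; the squaring lambda and thread count
 * are arbitrary, mirroring the tests in this diff):
 *
 *   auto squares = seq(1, 1000)
 *                | pmap([](int x) { return x * x; }, 4)
 *                | as<std::vector<int>>();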
*/
template<class Predicate,
class PMap = detail::PMap<Predicate>>
PMap pmap(Predicate pred = Predicate(), size_t nThreads = 0) {
return PMap(std::move(pred), nThreads);
}
}} // namespaces
#include "folly/gen/ParallelMap-inl.h"
#endif // FOLLY_GEN_PARALLELMAP_H
folly/gen/test/ParallelMapBenchmark.cpp
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unistd.h>
#include <atomic>
#include <algorithm>
#include <thread>
#include <vector>
#include "folly/Benchmark.h"
#include "folly/gen/Base.h"
#include "folly/gen/ParallelMap.h"
using namespace folly::gen;
DEFINE_int32(threads,
std::max(1, (int32_t) sysconf(_SC_NPROCESSORS_CONF) / 2),
"Num threads.");
constexpr int kFib = 35; // unit of work
size_t fib(int n) { return n <= 1 ? 1 : fib(n-1) + fib(n-2); }
BENCHMARK(FibSumMap, n) {
auto result =
seq(1, (int) n)
| map([](int) { return fib(kFib); })
| sum;
folly::doNotOptimizeAway(result);
}
BENCHMARK_RELATIVE(FibSumPmap, n) {
// Schedule more work: enough so that each worker thread does the
// same amount as one FibSumMap.
const size_t kNumThreads = FLAGS_threads;
auto result =
seq(1, (int) (n * kNumThreads))
| pmap([](int) { return fib(kFib); }, kNumThreads)
| sum;
folly::doNotOptimizeAway(result);
}
BENCHMARK_RELATIVE(FibSumThreads, n) {
// Schedule kNumThreads threads to execute the same code as FibSumMap.
const size_t kNumThreads = FLAGS_threads;
std::vector<std::thread> workers;
workers.reserve(kNumThreads);
auto fn = [n] {
auto result =
seq(1, (int) n)
| map([](int) { return fib(kFib); })
| sum;
folly::doNotOptimizeAway(result);
};
for (size_t i = 0; i < kNumThreads; i++) {
workers.push_back(std::thread(fn));
}
for (auto& w : workers) { w.join(); }
}
/*
============================================================================
folly/gen/test/ParallelMapBenchmark.cpp relative time/iter iters/s
============================================================================
FibSumMap 41.64ms 24.02
FibSumPmap 98.38% 42.32ms 23.63
FibSumThreads 94.48% 44.07ms 22.69
============================================================================
real    0m15.595s
user    2m47.100s
sys     0m0.016s
*/
int main(int argc, char *argv[]) {
google::ParseCommandLineFlags(&argc, &argv, true);
folly::runBenchmarks();
return 0;
}
folly/gen/test/ParallelMapTest.cpp
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "folly/Memory.h"
#include "folly/gen/Base.h"
#include "folly/gen/ParallelMap.h"
using namespace folly;
using namespace folly::gen;
TEST(Pmap, InfiniteEquivalent) {
// apply
{
auto mapResult
= seq(1)
| map([](int x) { return x * x; })
| until([](int x) { return x > 1000 * 1000; })
| as<std::vector<int>>();
auto pmapResult
= seq(1)
| pmap([](int x) { return x * x; }, 4)
| until([](int x) { return x > 1000 * 1000; })
| as<std::vector<int>>();
EXPECT_EQ(pmapResult, mapResult);
}
// foreach
{
auto mapResult
= seq(1, 10)
| map([](int x) { return x * x; })
| as<std::vector<int>>();
auto pmapResult
= seq(1, 10)
| pmap([](int x) { return x * x; }, 4)
| as<std::vector<int>>();
EXPECT_EQ(pmapResult, mapResult);
}
}
TEST(Pmap, Empty) {
// apply
{
auto mapResult
= seq(1)
| map([](int x) { return x * x; })
| until([](int) { return true; })
| as<std::vector<int>>();
auto pmapResult
= seq(1)
| pmap([](int x) { return x * x; }, 4)
| until([](int) { return true; })
| as<std::vector<int>>();
EXPECT_EQ(mapResult.size(), 0);
EXPECT_EQ(pmapResult, mapResult);
}
// foreach
{
auto mapResult
= empty<int>()
| map([](int x) { return x * x; })
| as<std::vector<int>>();
auto pmapResult
= empty<int>()
| pmap([](int x) { return x * x; }, 4)
| as<std::vector<int>>();
EXPECT_EQ(mapResult.size(), 0);
EXPECT_EQ(pmapResult, mapResult);
}
}
TEST(Pmap, Rvalues) {
// apply
{
auto mapResult
= seq(1)
| map([](int x) { return make_unique<int>(x); })
| map([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
| map([](std::unique_ptr<int> x) { return *x; })
| take(1000)
| sum;
auto pmapResult
= seq(1)
| pmap([](int x) { return make_unique<int>(x); })
| pmap([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
| pmap([](std::unique_ptr<int> x) { return *x; })
| take(1000)
| sum;
EXPECT_EQ(pmapResult, mapResult);
}
// foreach
{
auto mapResult
= seq(1, 1000)
| map([](int x) { return make_unique<int>(x); })
| map([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
| map([](std::unique_ptr<int> x) { return *x; })
| sum;
auto pmapResult
= seq(1, 1000)
| pmap([](int x) { return make_unique<int>(x); })
| pmap([](std::unique_ptr<int> x) { return make_unique<int>(*x * *x); })
| pmap([](std::unique_ptr<int> x) { return *x; })
| sum;
EXPECT_EQ(pmapResult, mapResult);
}
}
int main(int argc, char *argv[]) {
testing::InitGoogleTest(&argc, argv);
google::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
}