Commit 85361b0c authored by Maged Michael's avatar Maged Michael Committed by Facebook Github Bot

Flat Combining

Summary:
Flat combining template that takes the following template parameters:
 T     Concurrent data structure using the FC interface
 Mutex Mutex type (default std::mutex)
 Atom  Atomic template (default std::atomic)
 Req   Optional request structure to hold custom info (default dummy type bool)

Flat combining (FC) was introduced in the SPAA 2010 paper "Flat Combining and the Synchronization-Parallelism Tradeoff" by Danny Hendler, Itai Incze, Nir Shavit, and Moran Tzafrir.
http://mcg.cs.tau.ac.il/projects/projects/flat-combining

Reviewed By: djwatson

Differential Revision: D4602402

fbshipit-source-id: 38327f752a3e92bb01e5496c321d8c87c818087a
parent 16a97089
This diff is collapsed.
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <memory>
#include <mutex>
#include <folly/Baton.h>
#include <folly/experimental/flat_combining/FlatCombining.h>
namespace folly {
// One 64-bit value per Line. The member carries the
// FOLLY_ALIGN_TO_AVOID_FALSE_SHARING attribute macro, which (per its name) is
// meant to keep neighbouring Line objects from sharing a cache line.
struct Line {
FOLLY_ALIGN_TO_AVOID_FALSE_SHARING
uint64_t val_;
};
class Data { // Sequential data structure
public:
explicit Data(size_t size) : size_(size) {
x_ = std::make_unique<Line[]>(size_);
}
uint64_t getVal() {
uint64_t val = x_[0].val_;
for (size_t i = 1; i < size_; ++i) {
assert(x_[i].val_ == val);
}
return val;
}
// add
void add(uint64_t val) {
uint64_t oldval = x_[0].val_;
for (size_t i = 0; i < size_; ++i) {
assert(x_[i].val_ == oldval);
x_[i].val_ = oldval + val;
}
}
uint64_t fetchAdd(uint64_t val) {
uint64_t res = x_[0].val_;
for (size_t i = 0; i < size_; ++i) {
assert(x_[i].val_ == res);
x_[i].val_ += val;
}
return res;
}
private:
size_t size_;
std::unique_ptr<Line[]> x_;
};
// Example of an FC concurrent data structure that uses the simple interface:
// each operation is handed to the FlatCombining base as a plain callable that
// operates on the sequential Data member.
template <
    typename Mutex = std::mutex,
    template <typename> class Atom = std::atomic>
class FcSimpleExample
    : public FlatCombining<FcSimpleExample<Mutex, Atom>, Mutex, Atom> {
  using FC = FlatCombining<FcSimpleExample<Mutex, Atom>, Mutex, Atom>;
  using Rec = typename FC::Rec;

 public:
  // `size` is the number of lines in the underlying Data; the remaining
  // arguments are forwarded unchanged to the FlatCombining base.
  explicit FcSimpleExample(
      size_t size,
      bool dedicated = true,
      uint32_t numRecs = 0,
      uint32_t maxOps = 0)
      : FC(dedicated, numRecs, maxOps), data_(size) {}

  // Reads the current value directly from the sequential structure.
  uint64_t getVal() {
    return data_.getVal();
  }

  // add, submitted through requestNoFC
  void addNoFC(uint64_t val) {
    this->requestNoFC([&] { data_.add(val); });
  }

  // add, submitted through requestFC with the third argument set to false
  // (asynchronous). The callable therefore owns its own copy of val.
  void add(uint64_t val, Rec* rec = nullptr) {
    auto applyAdd = [this, val] { data_.add(val); };
    this->requestFC(applyAdd, rec, false);
  }

  // fetchAdd, submitted through requestNoFC; returns what the callable
  // stored, i.e. the value before the addition.
  uint64_t fetchAddNoFC(uint64_t val) {
    uint64_t previous;
    auto applyFetchAdd = [&] { previous = data_.fetchAdd(val); };
    this->requestNoFC(applyFetchAdd);
    return previous;
  }

  // fetchAdd, submitted through requestFC; returns what the callable stored,
  // i.e. the value before the addition.
  uint64_t fetchAdd(uint64_t val, Rec* rec = nullptr) {
    uint64_t previous;
    auto applyFetchAdd = [&] { previous = data_.fetchAdd(val); };
    this->requestFC(applyFetchAdd, rec);
    return previous;
  }

 private:
  Data data_; // the sequential structure all operations act on
};
// Example of FC data structure using custom request processing
// Request record used by the custom-processing example. It carries the kind
// of operation, its operand, and a slot for the result.
//
// All fields have defined initial values (ADD, 0, 0) so that a getter called
// before the matching setter never reads an indeterminate value, and the
// getters are const so a request can be inspected through a const reference.
class Req {
 public:
  // Operation requested.
  enum class Type { ADD, FETCHADD };

  void setType(Type type) {
    type_ = type;
  }

  Type getType() const {
    return type_;
  }

  // Operand of the operation.
  void setVal(uint64_t val) {
    val_ = val;
  }

  uint64_t getVal() const {
    return val_;
  }

  // Result slot.
  void setRes(uint64_t res) {
    res_ = res;
  }

  uint64_t getRes() const {
    return res_;
  }

 private:
  Type type_{Type::ADD};
  uint64_t val_{0};
  uint64_t res_{0};
};
// FC data structure that uses custom request processing: besides the plain
// callable, each operation supplies a function that fills in a Req, and the
// class provides combinedOp(Req&) to execute a filled-in request.
template <
    typename Req,
    typename Mutex = std::mutex,
    template <typename> class Atom = std::atomic>
class FcCustomExample : public FlatCombining<
                            FcCustomExample<Req, Mutex, Atom>,
                            Mutex,
                            Atom,
                            Req> {
  using FC =
      FlatCombining<FcCustomExample<Req, Mutex, Atom>, Mutex, Atom, Req>;
  using Rec = typename FC::Rec;

 public:
  // `size` is the number of lines in the underlying Data; the remaining
  // arguments are forwarded unchanged to the FlatCombining base.
  // NOTE(review): `size` is an int here but Data (and FcSimpleExample) take
  // size_t, so the value is implicitly converted when data_ is constructed.
  explicit FcCustomExample(
      int size,
      bool dedicated = true,
      uint32_t numRecs = 0,
      uint32_t maxOps = 0)
      : FC(dedicated, numRecs, maxOps), data_(size) {}

  // Reads the current value directly from the sequential structure.
  uint64_t getVal() {
    return data_.getVal();
  }

  // add, submitted through requestNoFC
  void addNoFC(uint64_t val) {
    this->requestNoFC([&] { data_.add(val); });
  }

  // add, submitted through requestFC with the last argument set to false
  // (asynchronous). The operation callable owns its own copy of val.
  void add(uint64_t val, Rec* rec = nullptr) {
    auto applyAdd = [&, val] { data_.add(val); };
    auto describeAdd = [&](Req& req) {
      req.setType(Req::Type::ADD);
      req.setVal(val);
    };
    this->requestFC(applyAdd, describeAdd, rec, false); // asynchronous
  }

  // fetchAdd, submitted through requestNoFC; returns the pre-add value.
  uint64_t fetchAddNoFC(uint64_t val) {
    uint64_t previous;
    auto applyFetchAdd = [&] { previous = data_.fetchAdd(val); };
    this->requestNoFC(applyFetchAdd);
    return previous;
  }

  // fetchAdd, submitted through requestFC. `previous` is written either by
  // the operation callable or by the result callable, which copies it out of
  // the request.
  uint64_t fetchAdd(uint64_t val, Rec* rec = nullptr) {
    uint64_t previous;
    auto applyFetchAdd = [&] { previous = data_.fetchAdd(val); };
    auto describeFetchAdd = [&](Req& req) {
      req.setType(Req::Type::FETCHADD);
      req.setVal(val);
    };
    auto collectResult = [&](Req& req) { previous = req.getRes(); };
    this->requestFC(applyFetchAdd, describeFetchAdd, collectResult, rec);
    return previous;
  }

  // custom combined op processing - overrides FlatCombining::combinedOp(Req&)
  // Executes the operation described by `req` on data_; for FETCHADD the
  // pre-add value is stored back into the request.
  void combinedOp(Req& req) {
    switch (req.getType()) {
      case Req::Type::ADD: {
        data_.add(req.getVal());
        break;
      }
      case Req::Type::FETCHADD: {
        const uint64_t previous = data_.fetchAdd(req.getVal());
        req.setRes(previous);
        break;
      }
      default: {
        assert(false);
      }
    }
  }

 private:
  Data data_; // the sequential structure all operations act on
};
} // namespace folly {
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/flat_combining/test/FlatCombiningTestHelpers.h>
#include <folly/portability/GTest.h>
#include <glog/logging.h>
using namespace folly::test;
// Fixed arguments that the test bodies below pass to run_test.
constexpr int LINES{5}; // run_test `lines`
constexpr int NUM_RECS{20}; // run_test `numRecs`
constexpr int WORK{0}; // run_test `work`
constexpr int ITERS{100}; // run_test `ops`
// Thread counts exercised by the `combining` test.
static std::vector<int> nthr{1, 10, 20};
// One combination of boolean switches for a parameterized test. The field
// order is the positional order used by the aggregate initializers in the
// parameter table, and each field is forwarded to the run_test argument of
// the corresponding name (syncop goes to `syncops`).
struct Params {
  bool combining;
  bool simple;
  bool dedicated;
  bool tc;
  bool syncop;
};
// Value-parameterized fixture; the test bodies below obtain their Params via
// GetParam().
class FlatCombiningTest : public ::testing::TestWithParam<Params> {};
// Runs the parameterized scenario once per thread count in nthr, with the
// exclusive-access and allocate-all-records options of run_test enabled.
TEST_P(FlatCombiningTest, combining) {
  const Params cfg = GetParam();
  for (const int threadCount : nthr) {
    run_test(
        threadCount,
        LINES,
        NUM_RECS,
        WORK,
        ITERS,
        cfg.combining,
        cfg.simple,
        cfg.dedicated,
        cfg.tc,
        cfg.syncop,
        true, // excl
        true); // allocAll
  }
}
// Runs the parameterized scenario with 20 threads but numRecs set to 1, again
// with the exclusive-access and allocate-all-records options enabled.
TEST_P(FlatCombiningTest, more_threads_than_records) {
  constexpr int kThreads = 20;
  constexpr int kRecords = 1;
  const Params cfg = GetParam();
  run_test(
      kThreads,
      LINES,
      kRecords,
      WORK,
      ITERS,
      cfg.combining,
      cfg.simple,
      cfg.dedicated,
      cfg.tc,
      cfg.syncop,
      true, // excl
      true); // allocAll
}
// Parameter table for FlatCombiningTest. Each row is a Params aggregate in
// field order {combining, simple, dedicated, tc, syncop}. The first row
// disables combining; the rest enumerate every dedicated/tc/syncop
// combination, first for simple (simple == true) and then for custom
// (simple == false) combining.
constexpr Params params[] = {
{false, false, false, false, false}, // no combining
// simple combining
// dedicated
{true, true, true, false, true}, // no-tc sync
{true, true, true, false, false}, // no-tc async
{true, true, true, true, true}, // tc sync
{true, true, true, true, false}, // tc async
// no dedicated
{true, true, false, false, true}, // no-tc sync
{true, true, false, false, false}, // no-tc async
{true, true, false, true, true}, // tc sync
{true, true, false, true, false}, // tc async
// custom combining
// dedicated
{true, false, true, false, true}, // no-tc sync
{true, false, true, false, false}, // no-tc async
{true, false, true, true, true}, // tc sync
{true, false, true, true, false}, // tc async
// no dedicated
{true, false, false, false, true}, // no-tc sync
{true, false, false, false, false}, // no-tc async
{true, false, false, true, true}, // tc sync
{true, false, false, true, false}, // tc async
};
// Instantiates every FlatCombiningTest TEST_P once per row of params; "Foo"
// is the instantiation name given to the macro.
INSTANTIATE_TEST_CASE_P(Foo, FlatCombiningTest, ::testing::ValuesIn(params));
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/flat_combining/test/FlatCombiningExamples.h>
#include <folly/Benchmark.h>
#include <glog/logging.h>
#include <atomic>
#include <chrono>
#include <thread>
namespace folly {
namespace test {
// Performs `work` iterations of trivial arithmetic to stand in for unrelated
// work between operations; a non-positive `work` does nothing. The result is
// passed to folly::doNotOptimizeAway.
//
// Declared inline because this function is defined in a header: without it,
// including the header from more than one translation unit produces multiple
// definitions at link time.
inline void doWork(int work) {
  uint64_t a = 0;
  for (int i = work; i > 0; --i) {
    a += i;
  }
  folly::doNotOptimizeAway(a);
}
// Runs one flat combining scenario and returns the measured duration in
// nanoseconds.
//
// `ops` increment-by-one operations are split across `nthreads` threads
// (thread tid performs iterations tid, tid + nthreads, ...). Afterwards the
// final value is checked against `ops`, and for synchronous runs the sum of
// all values returned by fetchAdd is checked against 0 + 1 + ... + (ops - 1).
//
// Template parameters:
//   Example  type under test; constructed as Example(lines, dedicated,
//            numRecs) and used via fetchAddNoFC, fetchAdd, add, allocRec,
//            freeRec, acquireExclusive, releaseExclusive, drainAll, getVal
//   Req, Mutex, Atom  only used here to name FlatCombining<...>::Rec
//
// Parameters:
//   nthreads   number of worker threads
//   lines      first constructor argument of Example
//   numRecs    third constructor argument of Example; also the number of
//              records allocated up front when allocAll is set
//   work       argument to doWork(), called after every operation
//   ops        total number of operations across all threads
//   combining  false: use fetchAddNoFC; true: use fetchAdd/add
//   dedicated  second constructor argument of Example
//   tc         with combining, each thread allocates its own Rec and passes
//              it to every operation (otherwise nullptr is passed)
//   syncops    with combining, true uses fetchAdd, false uses add
//   excl       each thread additionally takes exclusive access once
//   allocAll   allocate and free numRecs records before starting threads
template <
typename Example,
typename Req = bool,
typename Mutex = std::mutex,
template <typename> class Atom = std::atomic>
uint64_t fc_test(
int nthreads,
int lines,
int numRecs,
int work,
int ops,
bool combining,
bool dedicated,
bool tc,
bool syncops,
bool excl = false,
bool allocAll = false) {
using FC = FlatCombining<Example, Mutex, Atom, Req>;
using Rec = typename FC::Rec;
// Setup below runs under a BenchmarkSuspender until susp.dismiss().
folly::BenchmarkSuspender susp;
std::atomic<bool> start{false};
std::atomic<int> started{0};
Example ex(lines, dedicated, numRecs);
std::atomic<uint64_t> total{0};
// Plain (non-atomic) flag used only inside the exclusive section below to
// detect two threads being inside it at the same time.
bool mutex = false;
if (allocAll) {
// Allocate numRecs records, then free them in reverse order.
std::vector<Rec*> v(numRecs);
for (int i = 0; i < numRecs; ++i) {
v[i] = ex.allocRec();
}
for (int i = numRecs; i > 0; --i) {
ex.freeRec(v[i - 1]);
}
}
std::vector<std::thread> threads(nthreads);
for (int tid = 0; tid < nthreads; ++tid) {
threads[tid] = std::thread([&, tid] {
// Announce this thread, then spin until the main thread sets start.
started.fetch_add(1);
Rec* myrec = (combining && tc) ? ex.allocRec() : nullptr;
uint64_t sum = 0;
while (!start.load())
;
if (!combining) {
// no combining
for (int i = tid; i < ops; i += nthreads) {
sum += ex.fetchAddNoFC(1);
doWork(work); // unrelated work
}
} else if (syncops) {
// sync combining
for (int i = tid; i < ops; i += nthreads) {
sum += ex.fetchAdd(1, myrec);
doWork(work); // unrelated work
}
} else {
// async combining
for (int i = tid; i < ops; i += nthreads) {
ex.add(1, myrec);
doWork(work); // unrelated work
}
}
if (excl) {
// test of unstructured exclusive access
ex.acquireExclusive();
{
// `mutex` must be false on entry and still true before exit; a failure
// means another thread was inside this section concurrently.
CHECK(!mutex);
mutex = true;
VLOG(2) << tid << " " << ex.getVal() << " ...........";
using namespace std::chrono_literals;
/* sleep override */ // for coverage
std::this_thread::sleep_for(10ms);
VLOG(2) << tid << " " << ex.getVal() << " ===========";
CHECK(mutex);
mutex = false;
}
ex.releaseExclusive();
}
// Publish this thread's sum of fetchAdd return values (stays 0 for add()).
total.fetch_add(sum);
if (combining && tc) {
ex.freeRec(myrec);
}
});
}
// Spin until every worker has announced itself.
while (started.load() < nthreads)
;
auto tbegin = std::chrono::steady_clock::now();
// begin time measurement
susp.dismiss();
start.store(true);
for (auto& t : threads) {
t.join();
}
if (!syncops) {
// complete any pending asynch ops
ex.drainAll();
}
// end time measurement
uint64_t duration = 0;
BENCHMARK_SUSPEND {
auto tend = std::chrono::steady_clock::now();
// Every operation added 1, so the final value must equal ops.
CHECK_EQ(ops, ex.getVal());
if (syncops) {
// The fetchAdd calls returned each of 0 .. ops-1 exactly once, so their
// sum must be ops * (ops - 1) / 2.
uint64_t n = (uint64_t)ops;
uint64_t expected = n * (n - 1) / 2;
CHECK_EQ(expected, total);
}
duration =
std::chrono::duration_cast<std::chrono::nanoseconds>(tend - tbegin)
.count();
}
return duration;
}
uint64_t run_test(
int nthreads,
int lines,
int numRecs,
int work,
int ops,
bool combining,
bool simple,
bool dedicated,
bool tc,
bool syncops,
bool excl = false,
bool allocAll = false) {
using M = std::mutex;
if (simple) {
using Example = FcSimpleExample<M>;
return fc_test<Example, bool, M>(
nthreads,
lines,
numRecs,
work,
ops,
combining,
dedicated,
tc,
syncops,
excl,
allocAll);
} else {
using Example = FcCustomExample<Req, M>;
return fc_test<Example, Req, M>(
nthreads,
lines,
numRecs,
work,
ops,
combining,
dedicated,
tc,
syncops,
excl,
allocAll);
}
}
} // namespace test {
} // namespace folly {
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment