Commit 83312f92 authored by Tudor Bosman's avatar Tudor Bosman Committed by Sara Golemon

folly::Future-istic barrier

Summary: What it says on the tin.

Reviewed By: @fugalh

Differential Revision: D2230390
parent 84f796c9
......@@ -128,6 +128,7 @@ nobase_follyinclude_HEADERS = \
FormatTraits.h \
Format.h \
Format-inl.h \
futures/Barrier.h \
futures/Deprecated.h \
futures/ThreadedExecutor.h \
futures/DrivableExecutor.h \
......@@ -311,6 +312,7 @@ libfolly_la_SOURCES = \
FileUtil.cpp \
FingerprintTables.cpp \
futures/detail/ThreadWheelTimekeeper.cpp \
futures/Barrier.cpp \
futures/ThreadedExecutor.cpp \
futures/Future.cpp \
futures/InlineExecutor.cpp \
......
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/futures/Barrier.h>
namespace folly { namespace futures {
Barrier::Barrier(uint32_t n)
: size_(n),
controlBlock_(allocateControlBlock()) { }
Barrier::~Barrier() {
auto block = controlBlock_.load(std::memory_order_relaxed);
auto prev = block->valueAndReaderCount.load(std::memory_order_relaxed);
DCHECK_EQ(prev >> kReaderShift, 0);
auto val = prev & kValueMask;
auto p = promises(block);
for (uint32_t i = 0; i < val; ++i) {
p[i].setException(
folly::make_exception_wrapper<std::runtime_error>("Barrier destroyed"));
}
freeControlBlock(controlBlock_);
}
auto Barrier::allocateControlBlock() -> ControlBlock* {
auto block = static_cast<ControlBlock*>(malloc(controlBlockSize(size_)));
if (!block) {
throw std::bad_alloc();
}
block->valueAndReaderCount = 0;
auto p = promises(block);
uint32_t i = 0;
try {
for (i = 0; i < size_; ++i) {
new (p + i) BoolPromise();
}
} catch (...) {
for (; i != 0; --i) {
p[i - 1].~BoolPromise();
}
throw;
}
return block;
}
void Barrier::freeControlBlock(ControlBlock* block) {
auto p = promises(block);
for (uint32_t i = size_; i != 0; --i) {
p[i - 1].~BoolPromise();
}
free(block);
}
folly::Future<bool> Barrier::wait() {
// Load the current control block first. As we know there is at least
// one thread in the current epoch (us), this means that the value is
// < size_, so controlBlock_ can't change until we bump the value below.
auto block = controlBlock_.load(std::memory_order_acquire);
auto p = promises(block);
// Bump the value and record ourselves as reader.
// This ensures that block stays allocated, as the reader count is > 0.
auto prev = block->valueAndReaderCount.fetch_add(kReader + 1,
std::memory_order_acquire);
auto prevValue = static_cast<uint32_t>(prev & kValueMask);
DCHECK_LT(prevValue, size_);
auto future = p[prevValue].getFuture();
if (prevValue + 1 == size_) {
// Need to reset the barrier before fulfilling any futures. This is
// when the epoch is flipped to the next.
controlBlock_.store(allocateControlBlock(), std::memory_order_release);
p[0].setValue(true);
for (uint32_t i = 1; i < size_; ++i) {
p[i].setValue(false);
}
}
// Free the control block if we're the last reader at max value.
prev = block->valueAndReaderCount.fetch_sub(kReader,
std::memory_order_acq_rel);
if (prev == (kReader | uint64_t(size_))) {
freeControlBlock(block);
}
return future;
}
}} // namespaces
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <cstdint>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
namespace folly { namespace futures {
// A folly::Future-istic Barrier synchronization primitive
//
// The barrier is initialized with a count N.
//
// The first N-1 calls to wait() return uncompleted futures.
//
// The Nth call to wait() completes the previous N-1 futures successfully,
// returns a future that is already completed successfully, and resets the
// barrier; the barrier may be reused immediately, as soon as at least one
// of the future completions has been observed.
//
// Of these N futures, exactly one is completed with true, while the others are
// completed with false; it is unspecified which future completes with true.
// (This may be used to elect a "leader" among a group of threads.)
//
// If the barrier is destroyed, any futures already returned by wait() will
// complete with an error.
class Barrier {
public:
explicit Barrier(uint32_t n);
~Barrier();
folly::Future<bool> wait();
private:
typedef folly::Promise<bool> BoolPromise;
static constexpr uint64_t kReaderShift = 32;
static constexpr uint64_t kReader = uint64_t(1) << kReaderShift;
static constexpr uint64_t kValueMask = kReader - 1;
// For each "epoch" that the barrier is active, we have a different
// ControlBlock. The ControlBlock contains the current barrier value
// and the number of readers (currently inside wait()) packed into a
// 64-bit value.
//
// The ControlBlock is allocated as long as either:
// - there are threads currently inside wait() (reader count > 0), or
// - the value has not yet reached size_ (value < size_)
//
// The array of size_ Promise objects is allocated immediately following
// valueAndReaderCount.
struct ControlBlock {
// Reader count in most significant 32 bits
// Value in least significant 32 bits
std::atomic<uint64_t> valueAndReaderCount;
};
struct ControlBlockAndPromise {
ControlBlock cb;
BoolPromise promises[1];
};
static BoolPromise* promises(ControlBlock* cb) {
return reinterpret_cast<ControlBlockAndPromise*>(cb)->promises;
}
static size_t controlBlockSize(size_t n) {
return offsetof(ControlBlockAndPromise, promises) + n * sizeof(BoolPromise);
}
ControlBlock* allocateControlBlock();
void freeControlBlock(ControlBlock* b);
uint32_t size_;
std::atomic<ControlBlock*> controlBlock_;
};
}} // namespaces
/*
* Copyright 2015 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/futures/Barrier.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <folly/Random.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
DEFINE_int32(seed, 0, "Random seed");
namespace folly { namespace futures { namespace test {
TEST(BarrierTest, Simple) {
constexpr uint32_t numThreads = 10;
std::mutex mutex;
std::condition_variable b1DoneCond;
std::condition_variable b2DoneCond;
std::atomic<uint32_t> b1TrueSeen(0);
std::atomic<uint32_t> b1Passed(0);
std::atomic<uint32_t> b2TrueSeen(0);
std::atomic<uint32_t> b2Passed(0);
Barrier barrier(numThreads + 1);
std::vector<std::thread> threads;
threads.reserve(numThreads);
for (uint32_t i = 0; i < numThreads; ++i) {
threads.emplace_back([&] () {
barrier.wait()
.then(
[&] (bool v) {
std::unique_lock<std::mutex> lock(mutex);
b1TrueSeen += uint32_t(v);
if (++b1Passed == numThreads) {
b1DoneCond.notify_one();
}
return barrier.wait();
})
.then(
[&] (bool v) {
std::unique_lock<std::mutex> lock(mutex);
b2TrueSeen += uint32_t(v);
if (++b2Passed == numThreads) {
b2DoneCond.notify_one();
}
})
.get();
});
}
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_EQ(0, b1Passed);
EXPECT_EQ(0, b1TrueSeen);
b1TrueSeen += barrier.wait().get();
{
std::unique_lock<std::mutex> lock(mutex);
while (b1Passed != numThreads) {
b1DoneCond.wait(lock);
}
EXPECT_EQ(1, b1TrueSeen);
}
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_EQ(0, b2Passed);
EXPECT_EQ(0, b2TrueSeen);
b2TrueSeen += barrier.wait().get();
{
std::unique_lock<std::mutex> lock(mutex);
while (b2Passed != numThreads) {
b2DoneCond.wait(lock);
}
EXPECT_EQ(1, b2TrueSeen);
}
for (auto& t : threads) {
t.join();
}
}
TEST(BarrierTest, Random) {
// Create numThreads threads.
//
// Each thread repeats the following numIterations times:
// - grab a randomly chosen number of futures from the barrier, waiting
// for a short random time between each
// - wait for all futures to complete
// - record whether the one future returning true was seen among them
//
// At the end, we verify that exactly one future returning true was seen
// for each iteration.
constexpr uint32_t numIterations = 1;
auto numThreads = folly::Random::rand32(30, 91);
struct ThreadInfo {
ThreadInfo() { }
std::thread thread;
uint32_t iteration = 0;
uint32_t numFutures;
std::vector<uint32_t> trueSeen;
};
std::vector<ThreadInfo> threads;
threads.resize(numThreads);
uint32_t totalFutures = 0;
for (auto& tinfo : threads) {
tinfo.numFutures = folly::Random::rand32(100);
tinfo.trueSeen.resize(numIterations);
totalFutures += tinfo.numFutures;
}
Barrier barrier(totalFutures);
for (auto& tinfo : threads) {
auto pinfo = &tinfo;
tinfo.thread = std::thread(
[numIterations, pinfo, &barrier] () {
std::vector<folly::Future<bool>> futures;
futures.reserve(pinfo->numFutures);
for (uint32_t i = 0; i < numIterations; ++i, ++pinfo->iteration) {
futures.clear();
for (uint32_t j = 0; j < pinfo->numFutures; ++j) {
futures.push_back(barrier.wait());
auto nanos = folly::Random::rand32(10 * 1000 * 1000);
/* sleep override */
std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
}
auto results = folly::collect(futures).get();
pinfo->trueSeen[i] =
std::count(results.begin(), results.end(), true);
}
});
}
for (auto& tinfo : threads) {
tinfo.thread.join();
EXPECT_EQ(numIterations, tinfo.iteration);
}
for (uint32_t i = 0; i < numIterations; ++i) {
uint32_t trueCount = 0;
for (auto& tinfo : threads) {
trueCount += tinfo.trueSeen[i];
}
EXPECT_EQ(1, trueCount);
}
}
}}} // namespaces
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment