Commit d12df6e9 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot

A semaphore for testing

Summary:
[Folly] A semaphore type for testing scenarios which need a basic portable semaphore. Supports FIFO and LIFO wake policies.

May be used when testing other, more sophisticated, semaphore types.

Reviewed By: strager

Differential Revision: D15843438

fbshipit-source-id: 9136e8e19ea79fce4191e69795d5fe848a482f03
parent 50b82b1a
......@@ -743,6 +743,7 @@ if (BUILD_TESTS)
TEST call_once_test SOURCES CallOnceTest.cpp
TEST lifo_sem_test SOURCES LifoSemTests.cpp
TEST rw_spin_lock_test SOURCES RWSpinLockTest.cpp
TEST semaphore_test SOURCES SemaphoreTest.cpp
DIRECTORY system/test/
TEST memory_mapping_test SOURCES MemoryMappingTest.cpp
......
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <boost/intrusive/list.hpp>
#include <folly/ScopeGuard.h>
#include <folly/lang/Exception.h>
namespace folly {
namespace test {
enum class SemaphoreWakePolicy { Fifo, Lifo };
// Semaphore
//
// A basic portable semaphore, primarily intended for testing scenarios. Likely
// to be much less performant than better-optimized semaphore implementations.
//
// In the interest of portability, uses only synchronization mechanisms shipped
// with all implementations of C++: std::mutex and std::condition_variable.
template <SemaphoreWakePolicy WakePolicy>
class Semaphore {
public:
Semaphore() : value_(0) {}
explicit Semaphore(std::size_t value) : value_(value) {}
bool try_wait() {
std::unique_lock<std::mutex> lock{mutex_};
if (value_) {
--value_;
return true;
} else {
return false;
}
}
template <typename PreWait, typename PostWait>
void wait(PreWait pre_wait, PostWait post_wait) {
std::unique_lock<std::mutex> lock{mutex_};
pre_wait();
if (value_) {
--value_;
post_wait();
} else {
Waiter w;
waiters_.push_back(w);
w.wake_waiter.wait(lock);
auto wake_poster_guard = makeGuard([&] { w.wake_poster->post(); });
post_wait();
}
}
void wait() {
wait([] {}, [] {});
}
template <typename PrePost>
void post(PrePost pre_post) {
std::unique_lock<std::mutex> lock{mutex_};
if (value_ == -size_t(1)) {
throw_exception<std::logic_error>("overflow");
}
pre_post();
if (waiters_.empty()) {
++value_;
} else {
auto& w = pull();
waiters_.erase(waiters_.iterator_to(w));
Event wake_poster;
w.wake_poster = &wake_poster;
w.wake_waiter.post();
wake_poster.wait(lock);
}
}
void post() {
post([] {});
}
private:
class Event {
public:
void post() {
signaled = true;
cv.notify_one();
}
void wait(std::unique_lock<std::mutex>& lock) {
cv.wait(lock, [&] { return signaled; });
}
private:
bool signaled = false;
std::condition_variable cv;
};
struct Waiter : boost::intrusive::list_base_hook<> {
Event wake_waiter;
Event* wake_poster;
};
using WaiterList = boost::intrusive::list<Waiter>;
Waiter& pull() {
switch (WakePolicy) {
case SemaphoreWakePolicy::Fifo:
return waiters_.front();
case SemaphoreWakePolicy::Lifo:
return waiters_.back();
}
terminate_with<std::invalid_argument>("wake-policy");
}
std::size_t value_;
std::mutex mutex_;
WaiterList waiters_;
};
using FifoSemaphore = Semaphore<SemaphoreWakePolicy::Fifo>;
using LifoSemaphore = Semaphore<SemaphoreWakePolicy::Lifo>;
} // namespace test
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/synchronization/test/Semaphore.h>
#include <array>
#include <thread>
#include <vector>
#include <folly/portability/GTest.h>
#include <folly/synchronization/test/Barrier.h>
using namespace folly::test;
namespace {
class WaitForAll {
public:
explicit WaitForAll(size_t rem) : remaining(rem) {}
void post() {
std::unique_lock<std::mutex> l{m};
assert(remaining);
if (!--remaining) {
c.notify_one();
}
}
void wait() {
std::unique_lock<std::mutex> l{m};
c.wait(l, [&] { return !remaining; });
}
private:
size_t remaining;
std::mutex m;
std::condition_variable c;
};
template <SemaphoreWakePolicy WakePolicy>
auto wake_policy(Semaphore<WakePolicy> const&) {
return WakePolicy;
}
template <typename Sem>
void test_basic() {
Sem sem;
EXPECT_FALSE(sem.try_wait());
sem.post();
EXPECT_TRUE(sem.try_wait());
sem.post();
sem.wait();
}
template <typename Sem>
void test_wake_policy() {
constexpr auto const nthreads = 16ull;
constexpr auto const rounds = 1ull << 4;
Sem sem;
std::array<std::thread, nthreads> threads;
for (auto i = 0ull; i < rounds; ++i) {
std::vector<uint64_t> wait_seq;
std::vector<uint64_t> wake_seq;
WaitForAll ready(nthreads); // first nthreads waits, then nthreads posts
for (auto thi = 0ull; thi < nthreads; ++thi) {
threads[thi] = std::thread([&, thi] {
sem.wait(
[&, thi] { wait_seq.push_back(thi), ready.post(); },
[&, thi] { wake_seq.push_back(thi); });
});
}
ready.wait(); // first nthreads waits, then nthreads posts
for (auto thi = 0ull; thi < nthreads; ++thi) {
sem.post();
}
for (auto thi = 0ull; thi < nthreads; ++thi) {
threads[thi].join();
}
EXPECT_EQ(nthreads, wait_seq.size());
EXPECT_EQ(nthreads, wake_seq.size());
switch (wake_policy(sem)) {
case SemaphoreWakePolicy::Fifo:
break;
case SemaphoreWakePolicy::Lifo:
std::reverse(wake_seq.begin(), wake_seq.end());
break;
}
EXPECT_EQ(wait_seq, wake_seq);
}
}
template <typename Sem>
void test_multi_ping_pong() {
constexpr auto const nthreads = 4ull;
constexpr auto const iters = 1ull << 12;
Sem sem;
std::array<std::thread, nthreads> threads;
size_t waits_before = 0;
size_t waits_after = 0;
size_t posts = 0;
for (auto& th : threads) {
th = std::thread([&] {
for (auto i = 0ull; i < iters; ++i) {
sem.wait([&] { ++waits_before; }, [&] { ++waits_after; });
sem.post([&] { ++posts; });
}
});
}
sem.post(); // start the flood
for (auto& thr : threads) {
thr.join();
}
sem.wait();
EXPECT_FALSE(sem.try_wait());
EXPECT_EQ(iters * nthreads, waits_before);
EXPECT_EQ(iters * nthreads, waits_after);
EXPECT_EQ(iters * nthreads, posts);
}
template <typename Sem>
void test_concurrent_split_waiters_posters() {
constexpr auto const nthreads = 4ull;
constexpr auto const iters = 1ull << 12;
Sem sem;
Barrier barrier(nthreads * 2);
std::array<std::thread, nthreads> posters;
std::array<std::thread, nthreads> waiters;
for (auto& th : posters) {
th = std::thread([&] {
barrier.wait();
for (auto i = 0ull; i < iters; ++i) {
if (i % (iters >> 4) == 0) {
std::this_thread::yield();
}
sem.post();
}
});
}
for (auto& th : waiters) {
th = std::thread([&] {
barrier.wait();
for (auto i = 0ull; i < iters; ++i) {
sem.wait();
}
});
}
for (auto& th : posters) {
th.join();
}
for (auto& th : waiters) {
th.join();
}
EXPECT_FALSE(sem.try_wait());
}
} // namespace
class FifoSemaphoreTest : public testing::Test {};
TEST_F(FifoSemaphoreTest, basic) {
test_basic<FifoSemaphore>();
}
TEST_F(FifoSemaphoreTest, wake_policy) {
test_wake_policy<FifoSemaphore>();
}
TEST_F(FifoSemaphoreTest, multi_ping_pong) {
test_multi_ping_pong<FifoSemaphore>();
}
TEST_F(FifoSemaphoreTest, concurrent_split_waiters_posters) {
test_concurrent_split_waiters_posters<FifoSemaphore>();
}
class LifoSemaphoreTest : public testing::Test {};
TEST_F(LifoSemaphoreTest, basic) {
test_basic<LifoSemaphore>();
}
TEST_F(LifoSemaphoreTest, wake_policy) {
test_wake_policy<LifoSemaphore>();
}
TEST_F(LifoSemaphoreTest, multi_ping_pong) {
test_multi_ping_pong<LifoSemaphore>();
}
TEST_F(LifoSemaphoreTest, concurrent_split_waiters_posters) {
test_concurrent_split_waiters_posters<LifoSemaphore>();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment