Commit dc7ba0b5 authored by Emanuele Altieri's avatar Emanuele Altieri Committed by Facebook GitHub Bot

The Latch synchronization class

Summary:
Similar to std::latch (C++20) but with timed waits:
https://en.cppreference.com/w/cpp/thread/latch

The latch class is a downward counter which can be used to synchronize
threads. The value of the counter is initialized on creation. Threads may
block on the latch until the counter is decremented to zero. There is no
possibility to increase or reset the counter, which makes the latch a
single-use barrier.

Example:

  const int N = 32;
  folly::Latch latch(N);
  std::vector<std::thread> threads;
  for (int i = 0; i < N; i++) {
    threads.emplace_back([&] {
      do_some_work();
      latch.count_down();
    });
  }
  latch.wait();

A latch can be used to easily wait for mocked async methods in tests:

  ACTION_P(DecrementLatchImpl, latch) {
    latch.count_down();
  }
  constexpr auto DecrementLatch = DecrementLatchImpl<folly::Latch&>;

  class MockableObject {
   public:
    MOCK_METHOD(void, someAsyncEvent, ());
  };

  TEST(TestSuite, TestFeature) {
    MockableObject mockObjA;
    MockableObject mockObjB;

    folly::Latch latch(5);

    EXPECT_CALL(mockObjA, someAsyncEvent())
        .Times(2)
        .WillRepeatedly(DecrementLatch(latch)); // called 2 times

    EXPECT_CALL(mockObjB, someAsyncEvent())
        .Times(3)
        .WillRepeatedly(DecrementLatch(latch)); // called 3 times

    // trigger async events
    // ...

    EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
  }

Reviewed By: yfeldblum

Differential Revision: D28951720

fbshipit-source-id: 6a9e20ad925a38d1cdb0134eedad826771bef3e0
parent ddcb93e0
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <folly/CPortability.h>
#include <folly/Likely.h>
#include <folly/lang/Exception.h>
#include <folly/synchronization/SaturatingSemaphore.h>
namespace folly {
/// Similar to std::latch (C++20) but with timed waits.
///
/// The latch class is a downward counter which can be used to synchronize
/// threads. The value of the counter is initialized on creation. Threads may
/// block on the latch until the counter is decremented to zero. There is no
/// possibility to increase or reset the counter, which makes the latch a
/// single-use barrier.
//
/// Example:
///
/// const int N = 32;
/// folly::Latch latch(N);
/// std::vector<std::thread> threads;
/// for (int i = 0; i < N; i++) {
/// threads.emplace_back([&] {
/// do_some_work();
/// latch.count_down();
/// });
/// }
/// latch.wait();
///
/// A latch can be used to easily wait for mocked async methods in tests:
///
/// ACTION_P(DecrementLatchImpl, latch) {
/// latch.count_down();
/// }
/// constexpr auto DecrementLatch = DecrementLatchImpl<folly::Latch&>;
///
/// class MockableObject {
/// public:
/// MOCK_METHOD(void, someAsyncEvent, ());
/// };
///
/// TEST(TestSuite, TestFeature) {
/// MockableObject mockObjA;
/// MockableObject mockObjB;
///
/// folly::Latch latch(5);
///
/// EXPECT_CALL(mockObjA, someAsyncEvent())
/// .Times(2)
/// .WillRepeatedly(DecrementLatch(latch)); // called 2 times
///
/// EXPECT_CALL(mockObjB, someAsyncEvent())
/// .Times(3)
/// .WillRepeatedly(DecrementLatch(latch)); // called 3 times
///
/// // trigger async events
/// // ...
///
/// EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
/// }
class Latch {
public:
/// The maximum value of counter supported by this implementation
static constexpr ptrdiff_t max() noexcept {
return std::numeric_limits<int32_t>::max();
}
/// Constructs a latch and initializes its internal counter.
constexpr explicit Latch(ptrdiff_t expected) : count_(expected) {
terminate_if(expected < 0 || expected > max());
if (expected == 0) {
semaphore_.post();
}
}
/// Atomically decrements the internal counter by `n` without blocking the
/// caller.
FOLLY_ALWAYS_INLINE void count_down(ptrdiff_t n = 1) noexcept {
terminate_if(n < 0 || n > max());
if (FOLLY_LIKELY(n)) {
const auto count = count_.fetch_sub(n, std::memory_order_relaxed);
terminate_if(count < n);
if (FOLLY_UNLIKELY(count == n)) {
semaphore_.post();
}
}
}
/// Returns true only if the internal counter has reached zero. The function
/// does not block.
FOLLY_ALWAYS_INLINE bool try_wait() noexcept { return semaphore_.try_wait(); }
/// Wait until the internal counter reaches zero, or until the given `timeout`
/// expires. Returns true if the internal counter reached zero before the
/// period expired, otherwise false.
template <typename Rep, typename Period>
FOLLY_ALWAYS_INLINE bool try_wait_for(
const std::chrono::duration<Rep, Period>& timeout) noexcept {
return semaphore_.try_wait_for(timeout);
}
/// Wait until the internal counter reaches zero, or until the given
/// `deadline` expires. Returns true if the internal counter reached zero
/// before the deadline expired, otherwise false.
template <typename Clock, typename Duration>
FOLLY_ALWAYS_INLINE bool try_wait_until(
const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
return semaphore_.try_wait_until(deadline);
}
/// Equivalent to try_wait(), but available on const receivers.
FOLLY_ALWAYS_INLINE bool ready() const noexcept { return semaphore_.ready(); }
/// Wait until the internal counter reaches zero, indefinitely.
FOLLY_ALWAYS_INLINE void wait() noexcept { semaphore_.wait(); }
/// Atomically decrement the internal counter by `n` and wait until the
/// internal counter reaches zero, indefinitely. Equivalent to `count_down()`
/// followed by a `wait()`.
FOLLY_ALWAYS_INLINE void arrive_and_wait(ptrdiff_t n = 1) noexcept {
count_down(n);
wait();
}
private:
FOLLY_ALWAYS_INLINE void terminate_if(bool cond) noexcept {
if (cond) {
folly::terminate_with<std::invalid_argument>(
"argument outside expected range");
}
}
std::atomic<int32_t> count_;
SaturatingSemaphore</* MayBlock = */ true> semaphore_;
};
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <thread>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Latch.h>
TEST(LatchTest, ExpectedZero) {
folly::Latch latch(0);
EXPECT_TRUE(latch.try_wait());
}
TEST(LatchTest, ExpectedOne) {
folly::Latch latch(1);
EXPECT_FALSE(latch.try_wait());
latch.count_down();
EXPECT_TRUE(latch.try_wait());
}
TEST(LatchTest, ExpectedMax) {
const auto max = folly::Latch::max();
folly::Latch latch(max);
EXPECT_FALSE(latch.try_wait());
latch.count_down(max);
EXPECT_TRUE(latch.try_wait());
}
TEST(LatchTest, WaitFor) {
folly::Latch latch(1);
EXPECT_FALSE(latch.try_wait_for(std::chrono::seconds(1)));
latch.count_down();
EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(1)));
}
TEST(LatchTest, WaitUntil) {
folly::Latch latch(1);
EXPECT_FALSE(latch.try_wait_until(
std::chrono::steady_clock::now() + std::chrono::seconds(1)));
latch.count_down();
EXPECT_TRUE(latch.try_wait_until(
std::chrono::steady_clock::now() + std::chrono::seconds(1)));
}
TEST(LatchTest, Ready) {
folly::Latch latch(1);
auto ready = [](const folly::Latch& l) { return l.ready(); };
EXPECT_FALSE(ready(latch));
latch.count_down();
EXPECT_TRUE(ready(latch));
}
TEST(LatchTest, CountDown) {
folly::Latch latch(3);
EXPECT_FALSE(latch.try_wait());
latch.count_down(0); // (noop)
EXPECT_FALSE(latch.try_wait());
latch.count_down();
EXPECT_FALSE(latch.try_wait());
latch.count_down();
EXPECT_FALSE(latch.try_wait());
latch.count_down();
EXPECT_TRUE(latch.try_wait());
}
TEST(LatchTest, CountDownZero) {
folly::Latch latch(0);
EXPECT_TRUE(latch.try_wait());
latch.count_down(0);
EXPECT_TRUE(latch.try_wait());
}
TEST(LatchTest, CountDownN) {
folly::Latch latch(5);
EXPECT_FALSE(latch.try_wait());
latch.count_down(5);
EXPECT_TRUE(latch.try_wait());
}
TEST(LatchTest, CountDownThreads) {
std::atomic_int completed{0};
const int N = 32;
folly::Latch latch(N);
std::vector<std::thread> threads;
for (int i = 0; i < N; i++) {
threads.emplace_back([&] {
completed++;
latch.count_down();
});
}
EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
EXPECT_EQ(completed.load(), N);
for (auto& t : threads) {
t.join();
}
}
TEST(LatchTest, CountDownThreadsTwice1) {
std::atomic_int completed{0};
const int N = 32;
folly::Latch latch(N * 2);
std::vector<std::thread> threads;
for (int i = 0; i < N; i++) {
threads.emplace_back([&] {
completed++;
// count_down() multiple times within same thread
latch.count_down();
latch.count_down();
});
}
EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
EXPECT_EQ(completed.load(), N);
for (auto& t : threads) {
t.join();
}
}
TEST(LatchTest, CountDownThreadsTwice2) {
std::atomic_int completed{0};
const int N = 32;
folly::Latch latch(N * 2);
std::vector<std::thread> threads;
for (int i = 0; i < N; i++) {
threads.emplace_back([&] {
completed++;
// count_down() multiple times within same thread
latch.count_down(2);
});
}
EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
EXPECT_EQ(completed.load(), N);
for (auto& t : threads) {
t.join();
}
}
TEST(LatchTest, CountDownThreadsWait) {
std::atomic_int completed{0};
const int N = 32;
folly::Latch latch(N);
std::vector<std::thread> threads;
for (int i = 0; i < N; i++) {
threads.emplace_back([&] {
completed++;
// count_down() and wait() within thread
latch.count_down();
EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
EXPECT_EQ(completed.load(), N);
});
}
EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
EXPECT_EQ(completed.load(), N);
for (auto& t : threads) {
t.join();
}
}
TEST(LatchTest, CountDownThreadsArriveAndWait) {
std::atomic_int completed{0};
const int N = 32;
folly::Latch latch(N);
std::vector<std::thread> threads;
for (int i = 0; i < N; i++) {
threads.emplace_back([&] {
completed++;
// count_down() and wait() within thread
latch.arrive_and_wait();
EXPECT_EQ(completed.load(), N);
});
}
EXPECT_TRUE(latch.try_wait_for(std::chrono::seconds(60)));
EXPECT_EQ(completed.load(), N);
for (auto& t : threads) {
t.join();
}
}
TEST(LatchTest, OutOfScopeStillArmed) {
folly::Latch latch(3);
latch.count_down();
// mainly checking for blocking behavior which will result in a test timeout
// (latch should not block in this case)
}
TEST(LatchTest, InvalidInit) {
// latch initialized with a negative value
EXPECT_DEATH(folly::Latch latch(-1), ".*");
// latch initialized with a value bigger than max
const int64_t init = folly::Latch::max() + 1;
EXPECT_DEATH(folly::Latch latch(init), ".*");
}
TEST(LatchTest, InvalidCountDown) {
folly::Latch latch(1);
latch.count_down();
// count_down() called more times than expected
EXPECT_DEATH(latch.count_down(), ".*");
}
TEST(LatchTest, InvalidCountDownN) {
folly::Latch latch(5);
// count_down() called with a bigger value than expected
EXPECT_DEATH(latch.count_down(6), ".*");
}
TEST(LatchTest, InvalidCountDownNegative) {
folly::Latch latch(1);
// count_down() called with a negative value
EXPECT_DEATH(latch.count_down(-1), ".*");
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment