Commit 6958370b authored by Eric Sun's avatar Eric Sun Committed by Facebook GitHub Bot

Implement BatchSemaphore which is able to accept batch tokens

Summary: In addition to `folly::fibers::Semaphore`, which only accepts single-token increment/decrement, `folly::fibers::BatchSemaphore` is able to accept batch tokens.

Differential Revision: D21874795

fbshipit-source-id: 1d4c62bdb079c058529cd917078d81d09d585084
parent 93cbddb8
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/fibers/BatchSemaphore.h>
namespace folly {
namespace fibers {
/*
 * Release `tokens` tokens back to the semaphore, waking a waiter if one can
 * be satisfied.
 *
 * Each iteration first tries the slow path: signalSlow() (inherited from
 * SemaphoreBase) presumably hands the tokens directly to a queued waiter and
 * returns true when it does so -- TODO(review): confirm exact contract
 * against SemaphoreBase::signalSlow. If no waiter consumed the tokens, the
 * counter is re-read and the release is published with a CAS; a failed CAS
 * loops back to retry the slow path with the refreshed value.
 */
void BatchSemaphore::signal(int64_t tokens) {
  auto oldVal = tokens_.load(std::memory_order_acquire);
  do {
    // Slow path first: if the tokens were transferred to a waiter we are done.
    if (signalSlow(tokens, oldVal)) {
      return;
    }
    // No waiter took the tokens; reload the counter before attempting to
    // publish the release below.
    oldVal = tokens_.load(std::memory_order_acquire);
  } while (!tokens_.compare_exchange_weak(
      oldVal,
      oldVal + tokens,
      std::memory_order_release, // success: publish the released tokens
      std::memory_order_acquire)); // failure: observe the current value
}
/*
 * Block the calling fiber until `tokens` tokens have been acquired.
 * Thin delegation to SemaphoreBase::wait_common, which implements the
 * actual fast-path decrement and waiter queueing (defined elsewhere).
 */
void BatchSemaphore::wait(int64_t tokens) {
  wait_common(tokens);
}
/*
 * Non-blocking attempt to acquire `tokens` tokens.
 * Returns true on immediate success. On failure the supplied `waiter` is
 * enqueued (per the declaration's contract, its baton is posted once the
 * tokens become available, and the caller is responsible for waiting on it).
 * Thin delegation to SemaphoreBase::try_wait_common.
 */
bool BatchSemaphore::try_wait(Waiter& waiter, int64_t tokens) {
  return try_wait_common(waiter, tokens);
}
#if FOLLY_HAS_COROUTINES
/*
 * Coroutine-friendly acquisition of `tokens` tokens. Awaits
 * SemaphoreBase::co_wait_common; see the declaration in BatchSemaphore.h
 * for the cancellation semantics of this operation.
 */
coro::Task<void> BatchSemaphore::co_wait(int64_t tokens) {
  co_await co_wait_common(tokens);
}
#endif
#if FOLLY_FUTURE_USING_FIBER
/*
 * Futures-based acquisition of `tokens` tokens: returns a SemiFuture that
 * is fulfilled once the tokens have been acquired.
 * Thin delegation to SemaphoreBase::future_wait_common.
 */
SemiFuture<Unit> BatchSemaphore::future_wait(int64_t tokens) {
  return future_wait_common(tokens);
}
#endif
} // namespace fibers
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/fibers/SemaphoreBase.h>
namespace folly {
namespace fibers {
/*
 * Fiber-compatible semaphore with the ability to perform batch token
 * increment/decrement. Will safely block fibers that wait when the requested
 * tokens are not available and wake fibers when signalled.
 *
 * All operations take an explicit `tokens` count, in contrast to
 * folly::fibers::Semaphore which moves a single token at a time. The token
 * bookkeeping itself lives in SemaphoreBase.
 */
class BatchSemaphore : public SemaphoreBase {
 public:
  // Construct with `tokenCount` tokens initially available.
  explicit BatchSemaphore(size_t tokenCount) : SemaphoreBase{tokenCount} {}

  // Not copyable or movable.
  BatchSemaphore(const BatchSemaphore&) = delete;
  BatchSemaphore(BatchSemaphore&&) = delete;
  BatchSemaphore& operator=(const BatchSemaphore&) = delete;
  BatchSemaphore& operator=(BatchSemaphore&&) = delete;

  /*
   * Release requested tokens in the semaphore. Signal the waiter if necessary.
   */
  void signal(int64_t tokens);

  /*
   * Wait for requested tokens in the semaphore, blocking the calling fiber
   * until they have been acquired.
   */
  void wait(int64_t tokens);

  /**
   * Try to wait on the semaphore without blocking.
   * Return true on success.
   * On failure, the passed waiter is enqueued, its baton will be posted once
   * semaphore has requested tokens. Caller is responsible to wait then signal.
   */
  bool try_wait(Waiter& waiter, int64_t tokens);

#if FOLLY_HAS_COROUTINES
  /*
   * Wait for requested tokens in the semaphore.
   *
   * Note that this wait-operation can be cancelled by requesting cancellation
   * on the awaiting coroutine's associated CancellationToken.
   * If the operation is successfully cancelled then it will complete with
   * an error of type folly::OperationCancelled.
   *
   * Note that requesting cancellation of the operation will only have an
   * effect if the operation does not complete synchronously (ie. was not
   * already in a signalled state).
   *
   * If the semaphore was already in a signalled state prior to awaiting the
   * returned Task then the operation will complete successfully regardless
   * of whether cancellation was requested.
   */
  coro::Task<void> co_wait(int64_t tokens);
#endif

#if FOLLY_FUTURE_USING_FIBER
  /*
   * Wait for requested tokens in the semaphore. The returned SemiFuture is
   * fulfilled once the tokens have been acquired.
   */
  SemiFuture<Unit> future_wait(int64_t tokens);
#endif
};
} // namespace fibers
} // namespace folly
......@@ -28,6 +28,7 @@
#include <folly/fibers/AddTasks.h>
#include <folly/fibers/AtomicBatchDispatcher.h>
#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/BatchSemaphore.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/ExecutorLoopController.h>
#include <folly/fibers/FiberManager.h>
......@@ -1780,6 +1781,84 @@ TEST(FiberManager, semaphore) {
}
}
// Stress test: kNumThreads workers, each running kTasks fibers on its own
// EventBase-driven FiberManager, all contending for one shared
// BatchSemaphore of kNumTokens tokens. Each fiber iteration acquires a
// batch of 1-3 tokens via one of the three acquire APIs (wait, future_wait,
// try_wait) and releases it again.
TEST(FiberManager, batchSemaphore) {
  static constexpr size_t kTasks = 10;
  static constexpr size_t kIterations = 10000;
  static constexpr size_t kNumTokens = 60;
  static constexpr size_t kNumThreads = 16;
  // Semaphore shared by every worker thread.
  BatchSemaphore sem(kNumTokens);

  struct Worker {
    explicit Worker(BatchSemaphore& s) : sem(s), t([&] { run(); }) {}

    void run() {
      FiberManager manager(std::make_unique<EventBaseLoopController>());
      folly::EventBase evb;
      dynamic_cast<EventBaseLoopController&>(manager.loopController())
          .attachEventBase(evb);
      {
        // The deleter fires when the last fiber drops its copy of this
        // shared_ptr, terminating the loop started by loopForever() below.
        std::shared_ptr<folly::EventBase> completionCounter(
            &evb, [](folly::EventBase* evb_) { evb_->terminateLoopSoon(); });
        for (size_t i = 0; i < kTasks; ++i) {
          manager.addTask([&, completionCounter]() {
            for (size_t j = 0; j < kIterations; ++j) {
              // Batch size cycles 1, 2, 3; the same j % 3 also selects
              // which acquire API is exercised.
              int tokens = j % 3 + 1;
              switch (j % 3) {
                case 0:
                  sem.wait(tokens);
                  break;
                case 1:
#if FOLLY_FUTURE_USING_FIBER
                  sem.future_wait(tokens).get();
#else
                  sem.wait(tokens);
#endif
                  break;
                case 2: {
                  // try_wait path: on failure the waiter was enqueued and
                  // its baton is posted when the tokens become available.
                  BatchSemaphore::Waiter waiter{tokens};
                  bool acquired = sem.try_wait(waiter, tokens);
                  if (!acquired) {
                    waiter.baton.wait();
                  }
                  break;
                }
              }
              // counter tracks tokens this worker currently holds; all of
              // this worker's fibers run on its single EventBase thread, so
              // no extra synchronization is needed.
              counter += tokens;
              sem.signal(tokens);
              counter -= tokens;
              EXPECT_LT(counter, kNumTokens);
              EXPECT_GE(counter, 0);
            }
          });
        }
      }
      evb.loopForever();
    }

    BatchSemaphore& sem;
    int counter{0}; // net tokens held; must be 0 once the thread finishes
    std::thread t;
  };

  std::vector<Worker> workers;
  workers.reserve(kNumThreads);
  for (size_t i = 0; i < kNumThreads; ++i) {
    workers.emplace_back(sem);
  }
  for (auto& worker : workers) {
    worker.t.join();
  }
  // Every acquire was matched by a release.
  for (auto& worker : workers) {
    EXPECT_EQ(0, worker.counter);
  }
}
template <typename ExecutorT>
void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment