Commit 7aa24338 authored by Ameya Limaye's avatar Ameya Limaye Committed by Facebook Github Bot

Implement AtomicBatchDispatcher in folly::fibers

Summary:
Implement AtomicBatchDispatcher in folly::fibers
- Details about how to use the added functionality can be found
  in the doc comment for class AtomicBatchDispatcher.

Reviewed By: andriigrynenko

Differential Revision: D4054148

fbshipit-source-id: 090272eeab8c8abb15d5e400e52725853fcfc364
parent e0277a6c
......@@ -515,6 +515,8 @@ if HAVE_BOOST_CONTEXT
nobase_follyinclude_HEADERS += \
fibers/AddTasks.h \
fibers/AddTasks-inl.h \
fibers/AtomicBatchDispatcher.h \
fibers/AtomicBatchDispatcher-inl.h \
fibers/Baton.h \
fibers/Baton-inl.h \
fibers/BatchDispatcher.h \
......
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace folly {
namespace fibers {
template <typename InputT, typename ResultT>
struct AtomicBatchDispatcher<InputT, ResultT>::DispatchBaton {
DispatchBaton(DispatchFunctionT&& dispatchFunction)
: expectedCount_(0), dispatchFunction_(std::move(dispatchFunction)) {}
~DispatchBaton() {
fulfillPromises();
}
void reserve(size_t numEntries) {
optEntries_.reserve(numEntries);
}
void setError(std::string message) {
optErrorMessage_ = std::move(message);
}
void setExpectedCount(size_t expectedCount) {
expectedCount_ = expectedCount;
}
Future<ResultT> getFutureResult(InputT&& input, size_t sequenceNumber) {
if (sequenceNumber >= optEntries_.size()) {
optEntries_.resize(sequenceNumber + 1);
}
folly::Optional<Entry>& optEntry = optEntries_[sequenceNumber];
if (optEntry) {
throw std::logic_error(
"Cannot have multiple inputs with same token sequence number");
}
optEntry = Entry(std::move(input));
return optEntry->promise.getFuture();
}
private:
void setExceptionResults(std::exception_ptr eptr) {
auto exceptionWrapper = exception_wrapper(eptr);
for (auto& optEntry : optEntries_) {
if (optEntry) {
optEntry->promise.setException(exceptionWrapper);
}
}
}
template <typename TException>
void setExceptionResults(
const TException& ex,
std::exception_ptr eptr = std::exception_ptr()) {
auto exceptionWrapper =
eptr ? exception_wrapper(eptr, ex) : exception_wrapper(ex);
for (auto& optEntry : optEntries_) {
if (optEntry) {
optEntry->promise.setException(exceptionWrapper);
}
}
}
void fulfillPromises() {
try {
// If an error message is set, set all promises to exception with message
if (optErrorMessage_) {
auto ex = std::logic_error(*optErrorMessage_);
return setExceptionResults(std::move(ex));
}
// Create inputs vector and validate entries count same as expectedCount_
std::vector<InputT> inputs;
inputs.reserve(expectedCount_);
bool allEntriesFound = (optEntries_.size() == expectedCount_);
if (allEntriesFound) {
for (auto& optEntry : optEntries_) {
if (!optEntry) {
allEntriesFound = false;
break;
}
inputs.emplace_back(std::move(optEntry->input));
}
}
if (!allEntriesFound) {
auto ex = std::logic_error(
"One or more input tokens destroyed before calling dispatch");
return setExceptionResults(std::move(ex));
}
// Call the user provided batch dispatch function to get all results
// and make sure that we have the expected number of results returned
auto results = dispatchFunction_(std::move(inputs));
if (results.size() != expectedCount_) {
auto ex = std::logic_error(
"Unexpected number of results returned from dispatch function");
return setExceptionResults(std::move(ex));
}
// Fulfill the promises with the results from the batch dispatch
for (size_t i = 0; i < expectedCount_; ++i) {
optEntries_[i]->promise.setValue(std::move(results[i]));
}
} catch (const std::exception& ex) {
return setExceptionResults(ex, std::current_exception());
} catch (...) {
return setExceptionResults(std::current_exception());
}
}
struct Entry {
InputT input;
folly::Promise<ResultT> promise;
Entry(Entry&& other) noexcept
: input(std::move(other.input)), promise(std::move(other.promise)) {}
Entry& operator=(Entry&& other) noexcept {
input = std::move(other.input);
promise = std::move(other.promise);
return *this;
}
explicit Entry(InputT&& input) : input(std::move(input)) {}
};
size_t expectedCount_;
DispatchFunctionT dispatchFunction_;
std::vector<folly::Optional<Entry>> optEntries_;
folly::Optional<std::string> optErrorMessage_;
};
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT>::Token::Token(
std::shared_ptr<DispatchBaton> baton,
size_t sequenceNumber)
: baton_(std::move(baton)), SEQUENCE_NUMBER(sequenceNumber) {}
template <typename InputT, typename ResultT>
Future<ResultT> AtomicBatchDispatcher<InputT, ResultT>::Token::dispatch(
InputT input) {
auto baton = std::move(baton_);
if (!baton) {
throw std::logic_error(
"Dispatch called more than once on the same Token object");
}
return baton->getFutureResult(std::move(input), SEQUENCE_NUMBER);
}
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT>::AtomicBatchDispatcher(
DispatchFunctionT&& dispatchFunc)
: numTokensIssued_(0),
baton_(std::make_shared<DispatchBaton>(std::move(dispatchFunc))) {}
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT>::~AtomicBatchDispatcher() {
if (baton_) {
baton_->setError(
"AtomicBatchDispatcher destroyed before commit() was called on it");
commit();
}
}
template <typename InputT, typename ResultT>
void AtomicBatchDispatcher<InputT, ResultT>::reserve(size_t numEntries) {
if (!baton_) {
throw std::logic_error("Cannot call reserve(....) after calling commit()");
}
baton_->reserve(numEntries);
}
template <typename InputT, typename ResultT>
auto AtomicBatchDispatcher<InputT, ResultT>::getToken() -> Token {
if (!baton_) {
throw std::logic_error("Cannot issue more tokens after calling commit()");
}
return Token(baton_, numTokensIssued_++);
}
template <typename InputT, typename ResultT>
void AtomicBatchDispatcher<InputT, ResultT>::commit() {
auto baton = std::move(baton_);
if (!baton) {
throw std::logic_error(
"Cannot call commit() more than once on the same dispatcher");
}
baton->setExpectedCount(numTokensIssued_);
}
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
size_t initialCapacity) {
auto abd = AtomicBatchDispatcher<InputT, ResultT>(std::move(dispatchFunc));
if (initialCapacity) {
abd.reserve(initialCapacity);
}
return abd;
}
} // namespace fibers
} // manespace folly
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Function.h>
#include <folly/Optional.h>
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <memory>
#include <utility>
#include <vector>
namespace folly {
namespace fibers {
/**
* AtomicBatchDispatcher should be used if you want to process fiber tasks in
* parallel, but require to synchronize them at some point. The canonical
* example is to create a database transaction dispatch round. This API notably
* enforces that all tasks in the batch have reached the synchronization point
* before the user provided dispatch function is called with all the inputs
* provided in one function call. It also provides a guarantee that the inputs
* in the vector of inputs passed to the user provided dispatch function will be
* in the same order as the order in which the token for the job was issued.
*
* Use this when you want all the inputs in the batch to be processed by a
* single function call to the user provided dispatch function.
* The user provided dispatch function takes a vector of InputT as input and
* returns a vector of ResultT.
* To use an AtomicBatchDispatcher, create it by providing a dispatch function:
* TO EITHER the constructor of the AtomicBatchDispatcher class
* (can call reserve method on the dispatcher to reserve space (for number of
* inputs expected)),
* OR the createAtomicBatchDispatcher function in folly::fibers namespace
* (optionally specify an initial capacity (for number of inputs expected)).
* The AtomicBatchDispatcher object created using this call (dispatcher),
* is the only object that can issue tokens (Token objects) that are used to
* add an input to the batch. A single Token is issued when the user calls
* the getToken function on the dispatcher.
* Token objects cannot be copied (can only be moved). User can call the public
* dispatch function on the Token providing a single input value. The dispatch
* function returns a folly::Future<ResultT> value that the user can then wait
* on to obtain a ResultT value. The ResultT value will only be available once
* the dispatch function has been called on all the Tokens in the batch and the
* user has called dispatcher.commit() to indicate no more batched transactions
* are to be added.
* User code pertaining to a task can be run between the point where a token for
* the task has been issued and before calling the dispatch function on the
* token. Since this code can potentially throw, the token issued for a task
* should be moved into this processing code in such a way that if an exception
* is thrown and then handled, the token object for the task is destroyed.
* The batch query dispatcher will wait until all tokens have either been
* destroyed or have had the dispatch function called on them. Leaking an
* issued token will cause the batch dispatch to wait forever to happen.
*
* The AtomicBatchDispatcher object is referred to as the dispatcher below.
*
* POSSIBLE ERRORS:
* 1) The dispatcher is destroyed before calling commit on it, for example
* because the user forgot to call commit OR an exception was thrown
* in user code before the call to commit:
* - The future ResultT has an exception of type std::logic_error set for all
* tokens that were issued by the dispatcher (once all tokens are either
* destroyed or have called dispatch)
* 2) Calling the dispatch function more than once on the same Token object
* (or a moved version of the same Token):
* - Subsequent calls to dispatch (after the first one) will throw an
* std::logic_error exception (the batch itself will not have any errors
* and will get processed)
* 3) One/more of the Tokens issued are destroyed before calling dispatch on
* it/them:
* - The future ResultT has an exception of type std::logic_error set for all
* tokens that were issued by the dispatcher (once all tokens are either
* destroyed or have called dispatch)
* 4) dispatcher.getToken() is called after calling dispatcher.commit()
* - the call to getToken() will throw an std::logic_error exception
* (the batch itself will not have any errors and will get processed).
* 5) All tokens were issued and called dispatch, the user provided batch
* dispatch function is called, but that function throws any exception.
* - The future ResultT has exception for all tokens that were issued by
* the dispatcher. The result will contain the wrapped user exception.
*
* EXAMPLE (There are other ways to achieve this, but this is one example):
* - User creates an AtomicBatchDispatcher on stack
* auto dispatcher =
* folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
* - User creates "count" number of token objects by calling "getToken" count
* number of times
* std::vector<Job> jobs;
* for (size_t i = 0; i < count; ++i) {
* auto token = dispatcher.getToken();
* jobs.push_back(Job(std::move(token), singleInputValueToProcess);
* }
* - User calls commit() on the dispatcher to indicate that no new tokens will
* be issued for this batch
* dispatcher.commit();
* - Use any single threaded executor that will process the jobs
* - On each execution (fiber) preprocess a single "Job" that has been moved in
* from the original vector "jobs". This way if the preprocessing throws
* the Job object being processed is destroyed and so is the token.
* - On each execution (fiber) call the dispatch on the token
* auto future = job.token.dispatch(job.input);
* - Save the future returned so that eventually you can wait on the results
* ResultT result;
* try {
* result = future.value();
* // future.hasValue() is true
* } catch (...) {
* // future.hasException() is true
* <DO WHATEVER YOU WANT IN CASE OF ERROR> }
* }
*
* NOTES:
* - AtomicBatchDispatcher is not thread safe.
* - Works for executors that run tasks on a single thread.
*/
template <typename InputT, typename ResultT>
class AtomicBatchDispatcher {
private:
struct DispatchBaton;
friend struct DispatchBaton;
public:
using DispatchFunctionT =
folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;
class Token {
public:
explicit Token(std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);
Future<ResultT> dispatch(InputT input);
// Allow moving a Token object
Token(Token&&) = default;
Token& operator=(Token&&) = default;
private:
// Disallow copying a Token object
Token(const Token&) = delete;
Token& operator=(const Token&) = delete;
std::shared_ptr<DispatchBaton> baton_;
const size_t SEQUENCE_NUMBER;
};
explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);
~AtomicBatchDispatcher();
// numEntries is a *hint* about the number of inputs to expect:
// - It is used purely to reserve space for storing vector of inputs etc.,
// so that reeallocation and move copy are reduced / not needed.
// - It is provided purely for performance reasons
void reserve(size_t numEntries);
Token getToken();
void commit();
// Allow moving an AtomicBatchDispatcher object
AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;
private:
// Disallow copying an AtomicBatchDispatcher object
AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;
size_t numTokensIssued_;
std::shared_ptr<DispatchBaton> baton_;
};
// initialCapacity is a *hint* about the number of inputs to expect:
// - It is used purely to reserve space for storing vector of inputs etc.,
// so that reeallocation and move copy are reduced / not needed.
// - It is provided purely for performance reasons
template <typename InputT, typename ResultT>
AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
size_t initialCapacity = 0);
} // namespace fibers
} // namespace folly
#include <folly/fibers/AtomicBatchDispatcher-inl.h>
......@@ -18,10 +18,12 @@
#include <vector>
#include <folly/Memory.h>
#include <folly/Random.h>
#include <folly/futures/Future.h>
#include <folly/Conv.h>
#include <folly/fibers/AddTasks.h>
#include <folly/fibers/AtomicBatchDispatcher.h>
#include <folly/fibers/BatchDispatcher.h>
#include <folly/fibers/EventBaseLoopController.h>
#include <folly/fibers/FiberManager.h>
......@@ -1753,6 +1755,292 @@ TEST(FiberManager, batchDispatchExceptionHandlingTest) {
evb.loop();
}
namespace AtomicBatchDispatcherTesting {
using ValueT = size_t;
using ResultT = std::string;
using DispatchFunctionT =
folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
#define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
#if ENABLE_TRACE_IN_TEST
#define OUTPUT_TRACE std::cerr
#else // ENABLE_TRACE_IN_TEST
struct DevNullPiper {
template <typename T>
DevNullPiper& operator<<(const T&) {
return *this;
}
DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
return *this;
}
} devNullPiper;
#define OUTPUT_TRACE devNullPiper
#endif // ENABLE_TRACE_IN_TEST
struct Job {
AtomicBatchDispatcher<ValueT, ResultT>::Token token;
ValueT input;
void preprocess(FiberManager& executor, bool die) {
// Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
clock_t msecToDoIO = folly::Random::rand32() % 10;
double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
double endAfter = start + msecToDoIO;
while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
executor.yield();
}
if (die) {
throw std::logic_error("Simulating preprocessing failure");
}
}
Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
: token(std::move(t)), input(i) {}
Job(Job&&) = default;
Job& operator=(Job&&) = default;
};
ResultT processSingleInput(ValueT&& input) {
return folly::to<ResultT>(std::move(input));
}
std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
size_t expectedCount = inputs.size();
std::vector<ResultT> results;
results.reserve(expectedCount);
for (size_t i = 0; i < expectedCount; ++i) {
results.emplace_back(processSingleInput(std::move(inputs[i])));
}
return results;
}
void createJobs(
AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
std::vector<Job>& jobs,
size_t count) {
jobs.clear();
for (size_t i = 0; i < count; ++i) {
jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
}
}
enum class DispatchProblem {
None,
PreprocessThrows,
DuplicateDispatch,
};
void dispatchJobs(
FiberManager& executor,
std::vector<Job>& jobs,
std::vector<folly::Optional<folly::Future<ResultT>>>& results,
DispatchProblem dispatchProblem = DispatchProblem::None,
size_t problemIndex = size_t(-1)) {
EXPECT_TRUE(
dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
results.clear();
results.resize(jobs.size());
for (size_t i = 0; i < jobs.size(); ++i) {
executor.add(
[i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
try {
Job job(std::move(jobs[i]));
if (dispatchProblem == DispatchProblem::PreprocessThrows) {
if (i == problemIndex) {
EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
return;
}
}
job.preprocess(executor, false);
OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
results[i] = job.token.dispatch(job.input);
OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
if (i == problemIndex) {
EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
}
}
} catch (...) {
OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
}
});
}
}
void validateResult(
std::vector<folly::Optional<folly::Future<ResultT>>>& results,
size_t i) {
try {
OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
<< std::endl;
} catch (std::exception& e) {
OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
throw;
}
}
template <typename TException>
void validateResults(
std::vector<folly::Optional<folly::Future<ResultT>>>& results,
size_t expectedNumResults) {
size_t numResultsFilled = 0;
for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) {
continue;
}
++numResultsFilled;
EXPECT_THROW(validateResult(results, i), TException);
}
EXPECT_EQ(numResultsFilled, expectedNumResults);
}
void validateResults(
std::vector<folly::Optional<folly::Future<ResultT>>>& results,
size_t expectedNumResults) {
size_t numResultsFilled = 0;
for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) {
continue;
}
++numResultsFilled;
EXPECT_NO_THROW(validateResult(results, i));
ValueT expectedInput = i;
EXPECT_EQ(
results[i]->value(), processSingleInput(std::move(expectedInput)));
}
EXPECT_EQ(numResultsFilled, expectedNumResults);
}
} // AtomicBatchDispatcherTesting
#define SET_UP_TEST_FUNC \
using namespace AtomicBatchDispatcherTesting; \
folly::EventBase evb; \
auto& executor = getFiberManager(evb); \
const size_t COUNT = 11; \
std::vector<Job> jobs; \
jobs.reserve(COUNT); \
std::vector<folly::Optional<folly::Future<ResultT>>> results; \
results.reserve(COUNT); \
DispatchFunctionT dispatchFunc
TEST(FiberManager, ABD_Test) {
SET_UP_TEST_FUNC;
//
// Testing AtomicBatchDispatcher with explicit call to commit()
//
dispatchFunc = userDispatchFunc;
auto atomicBatchDispatcher =
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
dispatchJobs(executor, jobs, results);
atomicBatchDispatcher.commit();
evb.loop();
validateResults(results, COUNT);
}
TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
SET_UP_TEST_FUNC;
//
// Testing AtomicBatchDispatcher destroyed before calling commit.
// Handles error cases for:
// - User might have forgotten to add the call to commit() in the code
// - An unexpected exception got thrown in user code before commit() is called
//
try {
dispatchFunc = userDispatchFunc;
auto atomicBatchDispatcher =
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
dispatchJobs(executor, jobs, results);
throw std::runtime_error(
"Unexpected exception in user code before commit called");
atomicBatchDispatcher.commit();
} catch (...) {
/* User code handles the exception and does not exit process */
}
evb.loop();
validateResults<std::logic_error>(results, COUNT);
}
TEST(FiberManager, ABD_PreprocessingFailureTest) {
SET_UP_TEST_FUNC;
//
// Testing preprocessing failure on a job throws
//
dispatchFunc = userDispatchFunc;
auto atomicBatchDispatcher =
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
atomicBatchDispatcher.commit();
evb.loop();
validateResults<std::logic_error>(results, COUNT - 1);
}
TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
SET_UP_TEST_FUNC;
//
// Testing that calling dispatch more than once on the same token throws
//
dispatchFunc = userDispatchFunc;
auto atomicBatchDispatcher =
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
atomicBatchDispatcher.commit();
evb.loop();
}
TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
SET_UP_TEST_FUNC;
//
// Testing that exception set on attempt to call getToken after commit called
//
dispatchFunc = userDispatchFunc;
auto atomicBatchDispatcher =
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
atomicBatchDispatcher.commit();
EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
dispatchJobs(executor, jobs, results);
EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
evb.loop();
validateResults(results, COUNT);
EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
}
TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
SET_UP_TEST_FUNC;
//
// Testing that exception is set if user provided batch dispatch throws
//
dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
auto results = userDispatchFunc(std::move(inputs));
throw std::runtime_error("Unexpected exception in user dispatch function");
return results;
};
auto atomicBatchDispatcher =
createAtomicBatchDispatcher(std::move(dispatchFunc));
createJobs(atomicBatchDispatcher, jobs, COUNT);
dispatchJobs(executor, jobs, results);
atomicBatchDispatcher.commit();
evb.loop();
validateResults<std::runtime_error>(results, COUNT);
}
/**
* Test that we can properly track fiber stack usage.
*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment