Commit 8dc220b9 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot

Make EventBase enqueue noexcept

Summary:
[Folly] Make `EventBase` enqueue `noexcept`.

It cannot really fail anyway in correct usage besides allocation failure, unless in the `EventBase` destructor and while draining and the `AlwaysEnqueue` variant is called.

Theoretically if a caller attempts to enqueue concurrently with `EventBase` dtor while in `consumeUntilDrained`, but either *not* in the `EventBase` thread or in the `EventBase` thread and using the `AlwaysEnqueue` variant, there is a race which can lead to termination.

Reviewed By: andriigrynenko

Differential Revision: D14114678

fbshipit-source-id: 9a0128d207f86ca34eb8a1d417766c095ed5e137
parent baeb381c
......@@ -108,10 +108,7 @@ void IOThreadPoolExecutor::add(
};
ioThread->pendingTasks++;
if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
ioThread->pendingTasks--;
throw std::runtime_error("Unable to run func in event base thread");
}
ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc));
}
std::shared_ptr<IOThreadPoolExecutor::IOThread>
......
......@@ -137,20 +137,8 @@ Future<Unit> ThreadWheelTimekeeper::after(Duration dur) {
// callback has either been executed, or will never be executed. So we are
// fine here.
//
if (!eventBase_.runInEventBaseThread(
[this, cob, dur] { wheelTimer_->scheduleTimeout(cob.get(), dur); })) {
// Release promise to break the circular reference. Because if
// scheduleTimeout fails, there is nothing to *promise*. Internally
// Core would automatically set an exception result when Promise is
// destructed before fulfilling.
// This is either called from EventBase thread, or here.
// They are somewhat racy but given the rare chance this could fail,
// I don't see it is introducing any problem yet.
auto promise = cob->stealPromise();
if (!promise.isFulfilled()) {
promise.setException(FutureNoTimekeeper{});
}
}
eventBase_.runInEventBaseThread(
[this, cob, dur] { wheelTimer_->scheduleTimeout(cob.get(), dur); });
return f;
}
......
......@@ -63,29 +63,19 @@ void AsyncServerSocket::RemoteAcceptor::start(
setMaxReadAtOnce(maxAtOnce);
queue_.setMaxQueueSize(maxInQueue);
if (!eventBase->runInEventBaseThread([=]() {
callback_->acceptStarted();
this->startConsuming(eventBase, &queue_);
})) {
throw std::invalid_argument(
"unable to start waiting on accept "
"notification queue in the specified "
"EventBase thread");
}
eventBase->runInEventBaseThread([=]() {
callback_->acceptStarted();
this->startConsuming(eventBase, &queue_);
});
}
void AsyncServerSocket::RemoteAcceptor::stop(
EventBase* eventBase,
AcceptCallback* callback) {
if (!eventBase->runInEventBaseThread([=]() {
callback->acceptStopped();
delete this;
})) {
throw std::invalid_argument(
"unable to start waiting on accept "
"notification queue in the specified "
"EventBase thread");
}
eventBase->runInEventBaseThread([=]() {
callback->acceptStopped();
delete this;
});
}
void AsyncServerSocket::RemoteAcceptor::messageAvailable(
......
......@@ -565,61 +565,45 @@ void EventBase::runBeforeLoop(LoopCallback* callback) {
runBeforeLoopCallbacks_.push_back(*callback);
}
bool EventBase::runInEventBaseThread(Func fn) {
void EventBase::runInEventBaseThread(Func fn) noexcept {
// Send the message.
// It will be received by the FunctionRunner in the EventBase's thread.
// We try not to schedule nullptr callbacks
if (!fn) {
LOG(ERROR) << "EventBase " << this
<< ": Scheduling nullptr callbacks is not allowed";
return false;
DLOG(FATAL) << "EventBase " << this
<< ": Scheduling nullptr callbacks is not allowed";
return;
}
// Short-circuit if we are already in our event base
if (inRunningEventBaseThread()) {
runInLoop(std::move(fn));
return true;
}
try {
queue_->putMessage(std::move(fn));
} catch (const std::exception& ex) {
LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
<< "for EventBase thread: " << ex.what();
return false;
return;
}
return true;
queue_->putMessage(std::move(fn));
}
bool EventBase::runInEventBaseThreadAlwaysEnqueue(Func fn) {
void EventBase::runInEventBaseThreadAlwaysEnqueue(Func fn) noexcept {
// Send the message.
// It will be received by the FunctionRunner in the EventBase's thread.
// We try not to schedule nullptr callbacks
if (!fn) {
LOG(ERROR) << "EventBase " << this
<< ": Scheduling nullptr callbacks is not allowed";
return false;
}
try {
queue_->putMessage(std::move(fn));
} catch (const std::exception& ex) {
LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
<< "for EventBase thread: " << ex.what();
return false;
LOG(DFATAL) << "EventBase " << this
<< ": Scheduling nullptr callbacks is not allowed";
return;
}
return true;
queue_->putMessage(std::move(fn));
}
bool EventBase::runInEventBaseThreadAndWait(Func fn) {
void EventBase::runInEventBaseThreadAndWait(Func fn) noexcept {
if (inRunningEventBaseThread()) {
LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
<< "allowed";
return false;
LOG(DFATAL) << "EventBase " << this << ": Waiting in the event loop is not "
<< "allowed";
return;
}
Baton<> ready;
......@@ -632,16 +616,13 @@ bool EventBase::runInEventBaseThreadAndWait(Func fn) {
copy(std::move(fn))();
});
ready.wait();
return true;
}
bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
void EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) noexcept {
if (isInEventBaseThread()) {
fn();
return true;
} else {
return runInEventBaseThreadAndWait(std::move(fn));
runInEventBaseThreadAndWait(std::move(fn));
}
}
......
......@@ -469,10 +469,9 @@ class EventBase : public TimeoutManager,
* running, the function call will be delayed until the next time the loop is
* started.
*
* If runInEventBaseThread() returns true the function has successfully been
* scheduled to run in the loop thread. However, if the loop is terminated
* (and never later restarted) before it has a chance to run the requested
* function, the function will be run upon the EventBase's destruction.
* If the loop is terminated (and never later restarted) before it has a
* chance to run the requested function, the function will be run upon the
* EventBase's destruction.
*
* If two calls to runInEventBaseThread() are made from the same thread, the
* functions will always be run in the order that they were scheduled.
......@@ -482,12 +481,9 @@ class EventBase : public TimeoutManager,
* @param fn The function to run. The function must not throw any
* exceptions.
* @param arg An argument to pass to the function.
*
* @return Returns true if the function was successfully scheduled, or false
* if there was an error scheduling the function.
*/
template <typename T>
bool runInEventBaseThread(void (*fn)(T*), T* arg);
void runInEventBaseThread(void (*fn)(T*), T* arg) noexcept;
/**
* Run the specified function in the EventBase's thread
......@@ -504,7 +500,7 @@ class EventBase : public TimeoutManager,
*
* The function must not throw any exceptions.
*/
bool runInEventBaseThread(Func fn);
void runInEventBaseThread(Func fn) noexcept;
/**
* Run the specified function in the EventBase's thread.
......@@ -515,11 +511,9 @@ class EventBase : public TimeoutManager,
* not running, the function call will be delayed until the next time the loop
* is started.
*
* If runInEventBaseThreadAlwaysEnqueue() returns true the function has
* successfully been scheduled to run in the loop thread. However, if the
* loop is terminated (and never later restarted) before it has a chance to
* run the requested function, the function will be run upon the EventBase's
* destruction.
* If the loop is terminated (and never later restarted) before it has a
* chance to run the requested function, the function will be run upon the
* EventBase's destruction.
*
* If two calls to runInEventBaseThreadAlwaysEnqueue() are made from the same
* thread, the functions will always be run in the order that they were
......@@ -531,12 +525,9 @@ class EventBase : public TimeoutManager,
* @param fn The function to run. The function must not throw any
* exceptions.
* @param arg An argument to pass to the function.
*
* @return Returns true if the function was successfully scheduled, or false
* if there was an error scheduling the function.
*/
template <typename T>
bool runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg);
void runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg) noexcept;
/**
* Run the specified function in the EventBase's thread
......@@ -553,33 +544,35 @@ class EventBase : public TimeoutManager,
*
* The function must not throw any exceptions.
*/
bool runInEventBaseThreadAlwaysEnqueue(Func fn);
void runInEventBaseThreadAlwaysEnqueue(Func fn) noexcept;
/*
* Like runInEventBaseThread, but the caller waits for the callback to be
* executed.
*/
template <typename T>
bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
void runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) noexcept;
/*
* Like runInEventBaseThread, but the caller waits for the callback to be
* executed.
*/
bool runInEventBaseThreadAndWait(Func fn);
void runInEventBaseThreadAndWait(Func fn) noexcept;
/*
* Like runInEventBaseThreadAndWait, except if the caller is already in the
* event base thread, the functor is simply run inline.
*/
template <typename T>
bool runImmediatelyOrRunInEventBaseThreadAndWait(void (*fn)(T*), T* arg);
void runImmediatelyOrRunInEventBaseThreadAndWait(
void (*fn)(T*),
T* arg) noexcept;
/*
* Like runInEventBaseThreadAndWait, except if the caller is already in the
* event base thread, the functor is simply run inline.
*/
bool runImmediatelyOrRunInEventBaseThreadAndWait(Func fn);
void runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) noexcept;
/**
* Set the maximum desired latency in us and provide a callback which will be
......@@ -916,24 +909,26 @@ class EventBase : public TimeoutManager,
};
template <typename T>
bool EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) {
void EventBase::runInEventBaseThread(void (*fn)(T*), T* arg) noexcept {
return runInEventBaseThread([=] { fn(arg); });
}
template <typename T>
bool EventBase::runInEventBaseThreadAlwaysEnqueue(void (*fn)(T*), T* arg) {
void EventBase::runInEventBaseThreadAlwaysEnqueue(
void (*fn)(T*),
T* arg) noexcept {
return runInEventBaseThreadAlwaysEnqueue([=] { fn(arg); });
}
template <typename T>
bool EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
void EventBase::runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) noexcept {
return runInEventBaseThreadAndWait([=] { fn(arg); });
}
template <typename T>
bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
void EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(
void (*fn)(T*),
T* arg) {
T* arg) noexcept {
return runImmediatelyOrRunInEventBaseThreadAndWait([=] { fn(arg); });
}
......
......@@ -21,7 +21,7 @@ VirtualEventBase::VirtualEventBase(EventBase& evb)
: evb_(getKeepAliveToken(evb)) {}
std::future<void> VirtualEventBase::destroy() {
CHECK(evb_->runInEventBaseThread([this] { loopKeepAlive_.reset(); }));
evb_->runInEventBaseThread([this] { loopKeepAlive_.reset(); });
return std::move(destroyFuture_);
}
......
......@@ -71,13 +71,12 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
* @see EventBase::runInEventBaseThread
*/
template <typename F>
void runInEventBaseThread(F&& f) {
void runInEventBaseThread(F&& f) noexcept {
// KeepAlive token has to be released in the EventBase thread. If
// runInEventBaseThread() fails, we can't extract the KeepAlive token
// from the callback to properly release it.
CHECK(evb_->runInEventBaseThread(
[keepAliveToken = getKeepAliveToken(this),
f = std::forward<F>(f)]() mutable { f(); }));
evb_->runInEventBaseThread([keepAliveToken = getKeepAliveToken(this),
f = std::forward<F>(f)]() mutable { f(); });
}
HHWheelTimer& timer() {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment