Commit 2f6126e6 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook GitHub Bot

Executor::invokeCatchingExns

Summary: Executors just carry on after a task they are running fails, although they try to log something marginally useful. Deduplicate the logic.

Reviewed By: aary, ot, philippv

Differential Revision: D26745058

fbshipit-source-id: 90fa558c89591b8a2d08e6820388970dee8811be
parent 688c8aaa
......@@ -20,11 +20,23 @@
#include <glog/logging.h>
#include <folly/ExceptionString.h>
#include <folly/Portability.h>
#include <folly/lang/Exception.h>
namespace folly {
void Executor::invokeCatchingExnsLog(
char const* const prefix, std::exception const* const ex) {
auto const message = " threw unhandled ";
if (ex) {
LOG(ERROR) << prefix << message << exceptionStr(*ex);
} else {
auto ep = std::current_exception();
LOG(ERROR) << prefix << message << exceptionStr(ep);
}
}
void Executor::addWithPriority(Func, int8_t /* priority */) {
throw std::runtime_error(
"addWithPriority() is not implemented for this Executor");
......
......@@ -24,6 +24,7 @@
#include <folly/Optional.h>
#include <folly/Range.h>
#include <folly/Utility.h>
#include <folly/lang/Exception.h>
namespace folly {
......@@ -229,6 +230,12 @@ class Executor {
return getKeepAliveToken(&executor);
}
template <typename F>
FOLLY_ERASE static void invokeCatchingExns(char const* prefix, F f) noexcept {
auto h = [&](auto const&... e) { invokeCatchingExnsLog(prefix, &e...); };
catch_exception([&] { catch_exception<std::exception const&>(f, h); }, h);
}
protected:
/**
* Returns true if the KeepAlive is constructed from an executor that does
......@@ -262,6 +269,9 @@ class Executor {
}
private:
static void invokeCatchingExnsLog(
char const* prefix, std::exception const* ex = nullptr);
template <typename ExecutorT>
static KeepAlive<ExecutorT> makeKeepAliveDummy(ExecutorT* executor) {
static_assert(
......
......@@ -369,34 +369,22 @@ void EDFThreadPoolExecutor::threadRun(ThreadPtr thread) {
}
stats.waitTime = startTime - stats.enqueueTime;
try {
task->run(iter);
} catch (const std::exception& e) {
LOG(ERROR) << "EDFThreadPoolExecutor: func threw unhandled "
<< typeid(e).name() << " exception: " << e.what();
} catch (...) {
LOG(ERROR)
<< "EDFThreadPoolExecutor: func threw unhandled non-exception object";
}
invokeCatchingExns("EDFThreadPoolExecutor: func", [&] {
std::exchange(task, {})->run(iter);
});
stats.runTime = std::chrono::steady_clock::now() - startTime;
thread->idle.store(true, std::memory_order_relaxed);
thread->lastActiveTime.store(
std::chrono::steady_clock::now(), std::memory_order_relaxed);
auto& inCallback = *thread->taskStatsCallbacks->inCallback;
thread->taskStatsCallbacks->callbackList.withRLock([&](auto& callbacks) {
*thread->taskStatsCallbacks->inCallback = true;
SCOPE_EXIT { *thread->taskStatsCallbacks->inCallback = false; };
try {
inCallback = true;
SCOPE_EXIT { inCallback = false; };
invokeCatchingExns("EDFThreadPoolExecutor: stats callback", [&] {
for (auto& callback : callbacks) {
callback(stats);
}
} catch (const std::exception& e) {
LOG(ERROR) << "EDFThreadPoolExecutor: task stats callback threw "
"unhandled "
<< typeid(e).name() << " exception: " << e.what();
} catch (...) {
LOG(ERROR) << "EDFThreadPoolExecutor: task stats callback threw "
"unhandled non-exception object";
}
});
});
}
}
......
......@@ -78,17 +78,8 @@ void SerialExecutor::run() {
Task task;
queue_.dequeue(task);
try {
folly::RequestContextScopeGuard ctxGuard(std::move(task.ctx));
auto func = std::move(task.func);
func();
} catch (std::exception const& ex) {
LOG(ERROR) << "SerialExecutor: func threw unhandled exception "
<< folly::exceptionStr(ex);
} catch (...) {
LOG(ERROR) << "SerialExecutor: func threw unhandled non-exception "
"object";
}
folly::RequestContextScopeGuard ctxGuard(std::move(task.ctx));
invokeCatchingExns("SerialExecutor: func", std::exchange(task.func, {}));
// We want scheduled_ to guard side-effects of completed tasks, so we can't
// use std::memory_order_relaxed here.
......
......@@ -98,15 +98,8 @@ void StrandContext::executeNext(
std::size_t pendingCount = 0;
for (std::size_t i = 0; i < maxItemsToProcessSynchronously; ++i) {
QueueItem item = thisPtr->queue_.dequeue();
try {
std::exchange(item.func, {})();
} catch (const std::exception& ex) {
LOG(DFATAL) << "StrandExecutor: func threw unhandled exception "
<< folly::exceptionStr(ex);
} catch (...) {
LOG(DFATAL) << "StrandExecutor: func threw unhandled non-exception "
"object";
}
Executor::invokeCatchingExns(
"StrandExecutor: func", std::exchange(item.func, {}));
++pendingCount;
......
......@@ -105,16 +105,18 @@ void ThreadPoolExecutor::runTask(const ThreadPtr& thread, Task&& task) {
folly::RequestContextScopeGuard rctx(task.context_);
if (task.expiration_ > std::chrono::milliseconds(0) &&
stats.waitTime >= task.expiration_) {
task.func_ = nullptr;
stats.expired = true;
if (task.expireCallback_ != nullptr) {
nothrow("expireCallback", [&] { task.expireCallback_(); });
invokeCatchingExns(
"ThreadPoolExecutor: expireCallback",
std::exchange(task.expireCallback_, {}));
}
} else {
nothrow("func", [&] { task.func_(); });
invokeCatchingExns(
"ThreadPoolExecutor: func", std::exchange(task.func_, {}));
task.expireCallback_ = nullptr;
}
// Callback destruction might involve user logic, protect it as well.
nothrow("~func", [&] { task.func_ = nullptr; });
nothrow("~expireCallback", [&] { task.expireCallback_ = nullptr; });
}
if (!stats.expired) {
stats.runTime = std::chrono::steady_clock::now() - startTime;
......
......@@ -49,19 +49,9 @@ std::shared_ptr<ThreadFactory> ThreadedExecutor::newDefaultThreadFactory() {
}
void ThreadedExecutor::work(Func& func) {
SCOPE_EXIT {
controlMessages_.enqueue(
{Message::Type::Join, {}, std::this_thread::get_id()});
};
try {
func();
} catch (const std::exception& e) {
LOG(ERROR) << "ThreadedExecutor: func threw unhandled " << typeid(e).name()
<< " exception: " << e.what();
} catch (...) {
LOG(ERROR) << "ThreadedExecutor: func threw unhandled non-exception object";
}
invokeCatchingExns("ThreadedExecutor: func", std::exchange(func, {}));
controlMessages_.enqueue(
{Message::Type::Join, {}, std::this_thread::get_id()});
}
void ThreadedExecutor::control() {
......
......@@ -30,13 +30,8 @@ TimekeeperScheduledExecutor::create(
}
void TimekeeperScheduledExecutor::run(Func func) {
try {
func();
} catch (std::exception const& ex) {
LOG(ERROR) << "func threw unhandled exception " << folly::exceptionStr(ex);
} catch (...) {
LOG(ERROR) << "func threw unhandled non-exception object";
}
invokeCatchingExns(
"TimekeeperScheduledExecutor: func", std::exchange(func, {}));
}
void TimekeeperScheduledExecutor::add(Func func) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment