Commit 056e121f authored by Chad Austin's avatar Chad Austin Committed by Facebook Github Bot

clang-format folly/executors/

Summary:
Run clang-format across folly/executors/.

```
find . \( -iname '*.cpp' -o -iname '*.h' \) -exec clang-format -i {} \;
```

Reviewed By: yfeldblum

Differential Revision: D8843064

fbshipit-source-id: 0a3c82083eebf2c684a4ab2e12067f0f742bf1d4
parent 1d2790bc
...@@ -45,11 +45,12 @@ class FutureExecutor : public ExecutorImpl { ...@@ -45,11 +45,12 @@ class FutureExecutor : public ExecutorImpl {
using T = typename invoke_result_t<F>::value_type; using T = typename invoke_result_t<F>::value_type;
folly::Promise<T> promise; folly::Promise<T> promise;
auto future = promise.getFuture(); auto future = promise.getFuture();
ExecutorImpl::add( ExecutorImpl::add([promise = std::move(promise),
[ promise = std::move(promise), func = std::move(func) ]() mutable { func = std::move(func)]() mutable {
func().then([promise = std::move(promise)]( func().then([promise = std::move(promise)](folly::Try<T>&& t) mutable {
folly::Try<T> && t) mutable { promise.setTry(std::move(t)); }); promise.setTry(std::move(t));
}); });
});
return future; return future;
} }
...@@ -70,7 +71,7 @@ class FutureExecutor : public ExecutorImpl { ...@@ -70,7 +71,7 @@ class FutureExecutor : public ExecutorImpl {
folly::Promise<T> promise; folly::Promise<T> promise;
auto future = promise.getFuture(); auto future = promise.getFuture();
ExecutorImpl::add( ExecutorImpl::add(
[ promise = std::move(promise), func = std::move(func) ]() mutable { [promise = std::move(promise), func = std::move(func)]() mutable {
promise.setWith(std::move(func)); promise.setWith(std::move(func));
}); });
return future; return future;
......
...@@ -92,7 +92,7 @@ void IOThreadPoolExecutor::add( ...@@ -92,7 +92,7 @@ void IOThreadPoolExecutor::add(
auto ioThread = pickThread(); auto ioThread = pickThread();
auto task = Task(std::move(func), expiration, std::move(expireCallback)); auto task = Task(std::move(func), expiration, std::move(expireCallback));
auto wrappedFunc = [ ioThread, task = std::move(task) ]() mutable { auto wrappedFunc = [ioThread, task = std::move(task)]() mutable {
runTask(ioThread, std::move(task)); runTask(ioThread, std::move(task));
ioThread->pendingTasks--; ioThread->pendingTasks--;
}; };
......
...@@ -27,133 +27,133 @@ ...@@ -27,133 +27,133 @@
#include <folly/synchronization/LifoSem.h> #include <folly/synchronization/LifoSem.h>
namespace folly { namespace folly {
/// A ManualExecutor only does work when you turn the crank, by calling /// A ManualExecutor only does work when you turn the crank, by calling
/// run() or indirectly with makeProgress() or waitFor(). /// run() or indirectly with makeProgress() or waitFor().
/// ///
/// The clock for a manual executor starts at 0 and advances only when you /// The clock for a manual executor starts at 0 and advances only when you
/// ask it to. i.e. time is also under manual control. /// ask it to. i.e. time is also under manual control.
/// ///
/// NB No attempt has been made to make anything other than add and schedule /// NB No attempt has been made to make anything other than add and schedule
/// threadsafe. /// threadsafe.
class ManualExecutor : public DrivableExecutor, class ManualExecutor : public DrivableExecutor,
public ScheduledExecutor, public ScheduledExecutor,
public SequencedExecutor { public SequencedExecutor {
public: public:
void add(Func) override; void add(Func) override;
/// Do work. Returns the number of functions that were executed (maybe 0). /// Do work. Returns the number of functions that were executed (maybe 0).
/// Non-blocking, in the sense that we don't wait for work (we can't /// Non-blocking, in the sense that we don't wait for work (we can't
/// control whether one of the functions blocks). /// control whether one of the functions blocks).
/// This is stable, it will not chase an ever-increasing tail of work. /// This is stable, it will not chase an ever-increasing tail of work.
/// This also means, there may be more work available to perform at the /// This also means, there may be more work available to perform at the
/// moment that this returns. /// moment that this returns.
size_t run(); size_t run();
// Do work until there is no more work to do. // Do work until there is no more work to do.
// Returns the number of functions that were executed (maybe 0). // Returns the number of functions that were executed (maybe 0).
// Unlike run, this method is not stable. It will chase an infinite tail of // Unlike run, this method is not stable. It will chase an infinite tail of
// work so should be used with care. // work so should be used with care.
// There will be no work available to perform at the moment that this // There will be no work available to perform at the moment that this
// returns. // returns.
size_t drain(); size_t drain();
/// Wait for work to do. /// Wait for work to do.
void wait(); void wait();
/// Wait for work to do, and do it. /// Wait for work to do, and do it.
void makeProgress() { void makeProgress() {
wait(); wait();
run(); run();
} }
/// Implements DrivableExecutor /// Implements DrivableExecutor
void drive() override { void drive() override {
makeProgress(); makeProgress();
} }
/// makeProgress until this Future is ready. /// makeProgress until this Future is ready.
template <class F> void waitFor(F const& f) { template <class F>
// TODO(5427828) void waitFor(F const& f) {
// TODO(5427828)
#if 0 #if 0
while (!f.isReady()) while (!f.isReady())
makeProgress(); makeProgress();
#else #else
while (!f.isReady()) { while (!f.isReady()) {
run(); run();
}
#endif
} }
#endif
void scheduleAt(Func&& f, TimePoint const& t) override { }
void scheduleAt(Func&& f, TimePoint const& t) override {
std::lock_guard<std::mutex> lock(lock_);
scheduledFuncs_.emplace(t, std::move(f));
sem_.post();
}
/// Advance the clock. The clock never advances on its own.
/// Advancing the clock causes some work to be done, if work is available
/// to do (perhaps newly available because of the advanced clock).
/// If dur is <= 0 this is a noop.
void advance(Duration const& dur) {
advanceTo(now_ + dur);
}
/// Advance the clock to this absolute time. If t is <= now(),
/// this is a noop.
void advanceTo(TimePoint const& t);
TimePoint now() override {
return now_;
}
/// Flush the function queue. Destroys all stored functions without
/// executing them. Returns number of removed functions.
std::size_t clear() {
std::queue<Func> funcs;
std::priority_queue<ScheduledFunc> scheduled_funcs;
{
std::lock_guard<std::mutex> lock(lock_); std::lock_guard<std::mutex> lock(lock_);
scheduledFuncs_.emplace(t, std::move(f)); funcs_.swap(funcs);
sem_.post(); scheduledFuncs_.swap(scheduled_funcs);
}
/// Advance the clock. The clock never advances on its own.
/// Advancing the clock causes some work to be done, if work is available
/// to do (perhaps newly available because of the advanced clock).
/// If dur is <= 0 this is a noop.
void advance(Duration const& dur) {
advanceTo(now_ + dur);
} }
/// Advance the clock to this absolute time. If t is <= now(), return funcs.size() + scheduled_funcs.size();
/// this is a noop. }
void advanceTo(TimePoint const& t);
TimePoint now() override { return now_; } private:
std::mutex lock_;
std::queue<Func> funcs_;
LifoSem sem_;
/// Flush the function queue. Destroys all stored functions without // helper class to enable ordering of scheduled events in the priority
/// executing them. Returns number of removed functions. // queue
std::size_t clear() { struct ScheduledFunc {
std::queue<Func> funcs; TimePoint time;
std::priority_queue<ScheduledFunc> scheduled_funcs; size_t ordinal;
Func mutable func;
{
std::lock_guard<std::mutex> lock(lock_);
funcs_.swap(funcs);
scheduledFuncs_.swap(scheduled_funcs);
}
return funcs.size() + scheduled_funcs.size(); ScheduledFunc(TimePoint const& t, Func&& f) : time(t), func(std::move(f)) {
static size_t seq = 0;
ordinal = seq++;
} }
private: bool operator<(ScheduledFunc const& b) const {
std::mutex lock_; // Earlier-scheduled things must be *higher* priority
std::queue<Func> funcs_; // in the max-based std::priority_queue
LifoSem sem_; if (time == b.time) {
return ordinal > b.ordinal;
// helper class to enable ordering of scheduled events in the priority
// queue
struct ScheduledFunc {
TimePoint time;
size_t ordinal;
Func mutable func;
ScheduledFunc(TimePoint const& t, Func&& f)
: time(t), func(std::move(f))
{
static size_t seq = 0;
ordinal = seq++;
}
bool operator<(ScheduledFunc const& b) const {
// Earlier-scheduled things must be *higher* priority
// in the max-based std::priority_queue
if (time == b.time) {
return ordinal > b.ordinal;
}
return time > b.time;
} }
return time > b.time;
}
Func&& moveOutFunc() const { Func&& moveOutFunc() const {
return std::move(func); return std::move(func);
} }
};
std::priority_queue<ScheduledFunc> scheduledFuncs_;
TimePoint now_ = TimePoint::min();
}; };
std::priority_queue<ScheduledFunc> scheduledFuncs_;
TimePoint now_ = TimePoint::min();
};
} // namespace folly } // namespace folly
...@@ -24,36 +24,38 @@ ...@@ -24,36 +24,38 @@
#include <folly/lang/Exception.h> #include <folly/lang/Exception.h>
namespace folly { namespace folly {
// An executor that supports timed scheduling. Like RxScheduler. // An executor that supports timed scheduling. Like RxScheduler.
class ScheduledExecutor : public virtual Executor { class ScheduledExecutor : public virtual Executor {
public: public:
// Reality is that better than millisecond resolution is very hard to // Reality is that better than millisecond resolution is very hard to
// achieve. However, we reserve the right to be incredible. // achieve. However, we reserve the right to be incredible.
typedef std::chrono::microseconds Duration; typedef std::chrono::microseconds Duration;
typedef std::chrono::steady_clock::time_point TimePoint; typedef std::chrono::steady_clock::time_point TimePoint;
~ScheduledExecutor() override = default; ~ScheduledExecutor() override = default;
void add(Func) override = 0; void add(Func) override = 0;
/// Alias for add() (for Rx consistency) /// Alias for add() (for Rx consistency)
void schedule(Func&& a) { add(std::move(a)); } void schedule(Func&& a) {
add(std::move(a));
/// Schedule a Func to be executed after dur time has elapsed }
/// Expect millisecond resolution at best.
void schedule(Func&& a, Duration const& dur) { /// Schedule a Func to be executed after dur time has elapsed
scheduleAt(std::move(a), now() + dur); /// Expect millisecond resolution at best.
} void schedule(Func&& a, Duration const& dur) {
scheduleAt(std::move(a), now() + dur);
/// Schedule a Func to be executed at time t, or as soon afterward as }
/// possible. Expect millisecond resolution at best. Must be threadsafe.
virtual void scheduleAt(Func&& /* a */, TimePoint const& /* t */) { /// Schedule a Func to be executed at time t, or as soon afterward as
throw_exception<std::logic_error>("unimplemented"); /// possible. Expect millisecond resolution at best. Must be threadsafe.
} virtual void scheduleAt(Func&& /* a */, TimePoint const& /* t */) {
throw_exception<std::logic_error>("unimplemented");
/// Get this executor's notion of time. Must be threadsafe. }
virtual TimePoint now() {
return std::chrono::steady_clock::now(); /// Get this executor's notion of time. Must be threadsafe.
} virtual TimePoint now() {
}; return std::chrono::steady_clock::now();
} // namespace folly }
};
} // namespace folly
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#include <folly/executors/SerialExecutor.h> #include <folly/executors/SerialExecutor.h>
#include <glog/logging.h> #include <glog/logging.h>
#include <folly/ExceptionString.h> #include <folly/ExceptionString.h>
......
...@@ -76,8 +76,7 @@ class SerialExecutor : public SequencedExecutor { ...@@ -76,8 +76,7 @@ class SerialExecutor : public SequencedExecutor {
}; };
using UniquePtr = std::unique_ptr<SerialExecutor, Deleter>; using UniquePtr = std::unique_ptr<SerialExecutor, Deleter>;
[[deprecated("Replaced by create")]] [[deprecated("Replaced by create")]] static UniquePtr createUnique(
static UniquePtr createUnique(
std::shared_ptr<Executor> parent = getCPUExecutor()); std::shared_ptr<Executor> parent = getCPUExecutor());
/** /**
......
...@@ -96,7 +96,7 @@ void ThreadedExecutor::controlLaunchEnqueuedTasks() { ...@@ -96,7 +96,7 @@ void ThreadedExecutor::controlLaunchEnqueuedTasks() {
with_unique_lock(enqueuedm_, [&] { std::swap(enqueuedt, enqueued_); }); with_unique_lock(enqueuedm_, [&] { std::swap(enqueuedt, enqueued_); });
for (auto& f : enqueuedt) { for (auto& f : enqueuedt) {
auto th = threadFactory_->newThread( auto th = threadFactory_->newThread(
[ this, f = std::move(f) ]() mutable { work(f); }); [this, f = std::move(f)]() mutable { work(f); });
auto id = th.get_id(); auto id = th.get_id();
running_[id] = std::move(th); running_[id] = std::move(th);
} }
......
...@@ -30,7 +30,10 @@ TEST(ManualExecutor, runIsStable) { ...@@ -30,7 +30,10 @@ TEST(ManualExecutor, runIsStable) {
ManualExecutor x; ManualExecutor x;
size_t count = 0; size_t count = 0;
auto f1 = [&]() { count++; }; auto f1 = [&]() { count++; };
auto f2 = [&]() { x.add(f1); x.add(f1); }; auto f2 = [&]() {
x.add(f1);
x.add(f1);
};
x.add(f2); x.add(f2);
x.run(); x.run();
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
...@@ -52,14 +55,14 @@ TEST(ManualExecutor, drainIsNotStable) { ...@@ -52,14 +55,14 @@ TEST(ManualExecutor, drainIsNotStable) {
TEST(ManualExecutor, scheduleDur) { TEST(ManualExecutor, scheduleDur) {
ManualExecutor x; ManualExecutor x;
size_t count = 0; size_t count = 0;
std::chrono::milliseconds dur {10}; std::chrono::milliseconds dur{10};
x.schedule([&]{ count++; }, dur); x.schedule([&] { count++; }, dur);
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
x.run(); x.run();
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
x.advance(dur/2); x.advance(dur / 2);
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
x.advance(dur/2); x.advance(dur / 2);
EXPECT_EQ(count, 1); EXPECT_EQ(count, 1);
} }
...@@ -114,7 +117,7 @@ TEST(ManualExecutor, clockStartsAt0) { ...@@ -114,7 +117,7 @@ TEST(ManualExecutor, clockStartsAt0) {
TEST(ManualExecutor, scheduleAbs) { TEST(ManualExecutor, scheduleAbs) {
ManualExecutor x; ManualExecutor x;
size_t count = 0; size_t count = 0;
x.scheduleAt([&]{ count++; }, x.now() + std::chrono::milliseconds(10)); x.scheduleAt([&] { count++; }, x.now() + std::chrono::milliseconds(10));
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
x.advance(std::chrono::milliseconds(10)); x.advance(std::chrono::milliseconds(10));
EXPECT_EQ(count, 1); EXPECT_EQ(count, 1);
...@@ -123,7 +126,7 @@ TEST(ManualExecutor, scheduleAbs) { ...@@ -123,7 +126,7 @@ TEST(ManualExecutor, scheduleAbs) {
TEST(ManualExecutor, advanceTo) { TEST(ManualExecutor, advanceTo) {
ManualExecutor x; ManualExecutor x;
size_t count = 0; size_t count = 0;
x.scheduleAt([&]{ count++; }, std::chrono::steady_clock::now()); x.scheduleAt([&] { count++; }, std::chrono::steady_clock::now());
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
x.advanceTo(std::chrono::steady_clock::now()); x.advanceTo(std::chrono::steady_clock::now());
EXPECT_EQ(count, 1); EXPECT_EQ(count, 1);
...@@ -133,7 +136,7 @@ TEST(ManualExecutor, advanceBack) { ...@@ -133,7 +136,7 @@ TEST(ManualExecutor, advanceBack) {
ManualExecutor x; ManualExecutor x;
size_t count = 0; size_t count = 0;
x.advance(std::chrono::microseconds(5)); x.advance(std::chrono::microseconds(5));
x.schedule([&]{ count++; }, std::chrono::microseconds(6)); x.schedule([&] { count++; }, std::chrono::microseconds(6));
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
x.advanceTo(x.now() - std::chrono::microseconds(1)); x.advanceTo(x.now() - std::chrono::microseconds(1));
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
...@@ -143,7 +146,7 @@ TEST(ManualExecutor, advanceNeg) { ...@@ -143,7 +146,7 @@ TEST(ManualExecutor, advanceNeg) {
ManualExecutor x; ManualExecutor x;
size_t count = 0; size_t count = 0;
x.advance(std::chrono::microseconds(5)); x.advance(std::chrono::microseconds(5));
x.schedule([&]{ count++; }, std::chrono::microseconds(6)); x.schedule([&] { count++; }, std::chrono::microseconds(6));
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
x.advance(std::chrono::microseconds(-1)); x.advance(std::chrono::microseconds(-1));
EXPECT_EQ(count, 0); EXPECT_EQ(count, 0);
...@@ -153,10 +156,10 @@ TEST(ManualExecutor, waitForDoesNotDeadlock) { ...@@ -153,10 +156,10 @@ TEST(ManualExecutor, waitForDoesNotDeadlock) {
ManualExecutor east, west; ManualExecutor east, west;
folly::Baton<> baton; folly::Baton<> baton;
auto f = makeFuture() auto f = makeFuture()
.via(&east) .via(&east)
.then([](Try<Unit>){ return makeFuture(); }) .then([](Try<Unit>) { return makeFuture(); })
.via(&west); .via(&west);
std::thread t([&]{ std::thread t([&] {
baton.post(); baton.post();
west.waitFor(f); west.waitFor(f);
}); });
...@@ -168,9 +171,10 @@ TEST(ManualExecutor, waitForDoesNotDeadlock) { ...@@ -168,9 +171,10 @@ TEST(ManualExecutor, waitForDoesNotDeadlock) {
TEST(ManualExecutor, getViaDoesNotDeadlock) { TEST(ManualExecutor, getViaDoesNotDeadlock) {
ManualExecutor east, west; ManualExecutor east, west;
folly::Baton<> baton; folly::Baton<> baton;
auto f = makeFuture().via(&east).then([](Try<Unit>) { auto f = makeFuture()
return makeFuture(); .via(&east)
}).via(&west); .then([](Try<Unit>) { return makeFuture(); })
.via(&west);
std::thread t([&] { std::thread t([&] {
baton.post(); baton.post();
f.getVia(&west); f.getVia(&west);
...@@ -196,8 +200,8 @@ TEST(ManualExecutor, clear) { ...@@ -196,8 +200,8 @@ TEST(ManualExecutor, clear) {
TEST(Executor, InlineExecutor) { TEST(Executor, InlineExecutor) {
InlineExecutor x; InlineExecutor x;
size_t counter = 0; size_t counter = 0;
x.add([&]{ x.add([&] {
x.add([&]{ x.add([&] {
EXPECT_EQ(counter, 0); EXPECT_EQ(counter, 0);
counter++; counter++;
}); });
...@@ -210,8 +214,8 @@ TEST(Executor, InlineExecutor) { ...@@ -210,8 +214,8 @@ TEST(Executor, InlineExecutor) {
TEST(Executor, QueuedImmediateExecutor) { TEST(Executor, QueuedImmediateExecutor) {
QueuedImmediateExecutor x; QueuedImmediateExecutor x;
size_t counter = 0; size_t counter = 0;
x.add([&]{ x.add([&] {
x.add([&]{ x.add([&] {
EXPECT_EQ(1, counter); EXPECT_EQ(1, counter);
counter++; counter++;
}); });
...@@ -226,10 +230,12 @@ TEST(Executor, Runnable) { ...@@ -226,10 +230,12 @@ TEST(Executor, Runnable) {
size_t counter = 0; size_t counter = 0;
struct Runnable { struct Runnable {
std::function<void()> fn; std::function<void()> fn;
void operator()() { fn(); } void operator()() {
fn();
}
}; };
Runnable f; Runnable f;
f.fn = [&]{ counter++; }; f.fn = [&] { counter++; };
x.add(f); x.add(f);
EXPECT_EQ(counter, 1); EXPECT_EQ(counter, 1);
} }
...@@ -247,7 +253,9 @@ TEST(Executor, ThrowableThen) { ...@@ -247,7 +253,9 @@ TEST(Executor, ThrowableThen) {
class CrappyExecutor : public Executor { class CrappyExecutor : public Executor {
public: public:
void add(Func /* f */) override { throw std::runtime_error("bad"); } void add(Func /* f */) override {
throw std::runtime_error("bad");
}
}; };
TEST(Executor, CrappyExecutor) { TEST(Executor, CrappyExecutor) {
......
...@@ -35,7 +35,7 @@ class NamedThreadFactory : public ThreadFactory { ...@@ -35,7 +35,7 @@ class NamedThreadFactory : public ThreadFactory {
std::thread newThread(Func&& func) override { std::thread newThread(Func&& func) override {
auto name = folly::to<std::string>(prefix_, suffix_++); auto name = folly::to<std::string>(prefix_, suffix_++);
return std::thread( return std::thread(
[ func = std::move(func), name = std::move(name) ]() mutable { [func = std::move(func), name = std::move(name)]() mutable {
folly::setThreadName(name); folly::setThreadName(name);
func(); func();
}); });
......
...@@ -43,7 +43,7 @@ class PriorityThreadFactory : public ThreadFactory { ...@@ -43,7 +43,7 @@ class PriorityThreadFactory : public ThreadFactory {
std::thread newThread(Func&& func) override { std::thread newThread(Func&& func) override {
int priority = priority_; int priority = priority_;
return factory_->newThread([ priority, func = std::move(func) ]() mutable { return factory_->newThread([priority, func = std::move(func)]() mutable {
if (setpriority(PRIO_PROCESS, 0, priority) != 0) { if (setpriority(PRIO_PROCESS, 0, priority) != 0) {
LOG(ERROR) << "setpriority failed (are you root?) with error " << errno, LOG(ERROR) << "setpriority failed (are you root?) with error " << errno,
strerror(errno); strerror(errno);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment