Commit d7a43326 authored by Hans Fugal's avatar Hans Fugal Committed by Anton Likhtarov

Scheduler interface of Executor

Summary: and ManualExecutor implementation

Test Plan: unit tests, contbuild

Reviewed By: davejwatson@fb.com

Subscribers: bmatheny, folly@lists, net-systems@, fugalh, exa, marccelani, jsedgwick

FB internal diff: D1392999

Tasks: 4548494
parent 0ecc7473
...@@ -18,11 +18,44 @@ ...@@ -18,11 +18,44 @@
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <functional> #include <functional>
#include <chrono>
namespace folly { namespace wangle { namespace folly { namespace wangle {
// Like an Rx Scheduler. We should probably rename it to match now that it
// has scheduling semantics too, but that's a codemod for another lazy
// summer afternoon.
class Executor : boost::noncopyable { class Executor : boost::noncopyable {
public: public:
typedef std::function<void()> Action;
// Reality is that better than millisecond resolution is very hard to
// achieve. However, we reserve the right to be incredible.
typedef std::chrono::microseconds Duration;
typedef std::chrono::steady_clock::time_point TimePoint;
virtual ~Executor() = default; virtual ~Executor() = default;
virtual void add(std::function<void()>&&) = 0;
/// Enqueue an action to be performed by this executor. This and all
/// schedule variants must be threadsafe.
virtual void add(Action&&) = 0;
/// Alias for add() (for Rx consistency)
void schedule(Action&& a) { add(std::move(a)); }
/// Schedule an action to be executed after dur time has elapsed
/// Expect millisecond resolution at best.
void schedule(Action&& a, Duration const& dur) {
scheduleAt(std::move(a), now() + dur);
}
/// Schedule an action to be executed at time t, or as soon afterward as
/// possible. Expect millisecond resolution at best. Must be threadsafe.
virtual void scheduleAt(Action&& a, TimePoint const& t) {
throw std::logic_error("unimplemented");
}
/// Get this executor's notion of time. Must be threadsafe.
virtual TimePoint now() {
return std::chrono::steady_clock::now();
}
}; };
}} }}
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <string.h> #include <string.h>
#include <string> #include <string>
#include <tuple>
#include <stdexcept> #include <stdexcept>
...@@ -31,24 +32,33 @@ ManualExecutor::ManualExecutor() { ...@@ -31,24 +32,33 @@ ManualExecutor::ManualExecutor() {
void ManualExecutor::add(std::function<void()>&& callback) { void ManualExecutor::add(std::function<void()>&& callback) {
std::lock_guard<std::mutex> lock(lock_); std::lock_guard<std::mutex> lock(lock_);
runnables_.push(callback); actions_.push(callback);
sem_post(&sem_); sem_post(&sem_);
} }
size_t ManualExecutor::run() { size_t ManualExecutor::run() {
size_t count; size_t count;
size_t n; size_t n;
std::function<void()> runnable; Action action;
{ {
std::lock_guard<std::mutex> lock(lock_); std::lock_guard<std::mutex> lock(lock_);
n = runnables_.size();
while (!scheduledActions_.empty()) {
auto& sa = scheduledActions_.top();
if (sa.time > now_)
break;
actions_.push(sa.action);
scheduledActions_.pop();
}
n = actions_.size();
} }
for (count = 0; count < n; count++) { for (count = 0; count < n; count++) {
{ {
std::lock_guard<std::mutex> lock(lock_); std::lock_guard<std::mutex> lock(lock_);
if (runnables_.empty()) { if (actions_.empty()) {
break; break;
} }
...@@ -57,10 +67,10 @@ size_t ManualExecutor::run() { ...@@ -57,10 +67,10 @@ size_t ManualExecutor::run() {
// This may fail (with EAGAIN), that's fine. // This may fail (with EAGAIN), that's fine.
sem_trywait(&sem_); sem_trywait(&sem_);
runnable = std::move(runnables_.front()); action = std::move(actions_.front());
runnables_.pop(); actions_.pop();
} }
runnable(); action();
} }
return count; return count;
...@@ -70,7 +80,7 @@ void ManualExecutor::wait() { ...@@ -70,7 +80,7 @@ void ManualExecutor::wait() {
while (true) { while (true) {
{ {
std::lock_guard<std::mutex> lock(lock_); std::lock_guard<std::mutex> lock(lock_);
if (!runnables_.empty()) if (!actions_.empty())
break; break;
} }
......
...@@ -20,17 +20,29 @@ ...@@ -20,17 +20,29 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <queue> #include <queue>
#include <cstdio>
namespace folly { namespace wangle { namespace folly { namespace wangle {
/// A ManualExecutor only does work when you turn the crank, by calling
/// run() or indirectly with makeProgress() or waitFor().
///
/// The clock for a manual executor starts at 0 and advances only when you
/// ask it to. i.e. time is also under manual control.
///
/// NB No attempt has been made to make anything other than add and schedule
/// threadsafe.
class ManualExecutor : public Executor { class ManualExecutor : public Executor {
public: public:
ManualExecutor(); ManualExecutor();
void add(std::function<void()>&&) override; void add(Action&&) override;
/// Do work. Returns the number of runnables that were executed (maybe 0). /// Do work. Returns the number of actions that were executed (maybe 0).
/// Non-blocking. /// Non-blocking, in the sense that we don't wait for work (we can't
/// control whether one of the actions blocks).
/// This is stable, it will not chase an ever-increasing tail of work.
/// This also means, there may be more work available to perform at the
/// moment that this returns.
size_t run(); size_t run();
/// Wait for work to do. /// Wait for work to do.
...@@ -42,15 +54,64 @@ namespace folly { namespace wangle { ...@@ -42,15 +54,64 @@ namespace folly { namespace wangle {
run(); run();
} }
/// makeProgress until this Future is ready.
template <class F> void waitFor(F const& f) { template <class F> void waitFor(F const& f) {
while (!f.isReady()) while (!f.isReady())
makeProgress(); makeProgress();
} }
virtual void scheduleAt(Action&& a, TimePoint const& t) override {
std::lock_guard<std::mutex> lock(lock_);
scheduledActions_.emplace(t, std::move(a));
sem_post(&sem_);
}
/// Advance the clock. The clock never advances on its own.
/// Advancing the clock causes some work to be done, if work is available
/// to do (perhaps newly available because of the advanced clock).
/// If dur is <= 0 this is a noop.
void advance(Duration const& dur) {
advanceTo(now_ + dur);
}
/// Advance the clock to this absolute time. If t is <= now(),
/// this is a noop.
void advanceTo(TimePoint const& t) {
if (t > now_) {
now_ = t;
}
run();
}
TimePoint now() override { return now_; }
private: private:
std::mutex lock_; std::mutex lock_;
std::queue<std::function<void()>> runnables_; std::queue<Action> actions_;
sem_t sem_; sem_t sem_;
// helper class to enable ordering of scheduled events in the priority
// queue
struct ScheduledAction {
TimePoint time;
size_t ordinal;
Action action;
ScheduledAction(TimePoint const& t, Action&& a)
: time(t), action(std::move(a))
{
static size_t seq = 0;
ordinal = seq++;
}
bool operator<(ScheduledAction const& b) const {
if (time == b.time)
return ordinal < b.ordinal;
return time < b.time;
}
};
std::priority_queue<ScheduledAction> scheduledActions_;
TimePoint now_ = now_.min();
}; };
}} }}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include "folly/wangle/ManualExecutor.h"
using namespace testing;
using namespace folly::wangle;
using namespace std::chrono;
TEST(ManualExecutor, runIsStable) {
ManualExecutor x;
size_t count = 0;
auto f1 = [&]() { count++; };
auto f2 = [&]() { x.add(f1); x.add(f1); };
x.add(f2);
x.run();
}
TEST(ManualExecutor, scheduleDur) {
ManualExecutor x;
size_t count = 0;
milliseconds dur {10};
x.schedule([&]{ count++; }, dur);
EXPECT_EQ(count, 0);
x.run();
EXPECT_EQ(count, 0);
x.advance(dur/2);
EXPECT_EQ(count, 0);
x.advance(dur/2);
EXPECT_EQ(count, 1);
}
TEST(ManualExecutor, clockStartsAt0) {
ManualExecutor x;
EXPECT_EQ(x.now(), x.now().min());
}
TEST(ManualExecutor, scheduleAbs) {
ManualExecutor x;
size_t count = 0;
x.scheduleAt([&]{ count++; }, x.now() + milliseconds(10));
EXPECT_EQ(count, 0);
x.advance(milliseconds(10));
EXPECT_EQ(count, 1);
}
TEST(ManualExecutor, advanceTo) {
ManualExecutor x;
size_t count = 0;
x.scheduleAt([&]{ count++; }, steady_clock::now());
EXPECT_EQ(count, 0);
x.advanceTo(steady_clock::now());
EXPECT_EQ(count, 1);
}
TEST(ManualExecutor, advanceBack) {
ManualExecutor x;
size_t count = 0;
x.advance(microseconds(5));
x.schedule([&]{ count++; }, microseconds(6));
EXPECT_EQ(count, 0);
x.advanceTo(x.now() - microseconds(1));
EXPECT_EQ(count, 0);
}
TEST(ManualExecutor, advanceNeg) {
ManualExecutor x;
size_t count = 0;
x.advance(microseconds(5));
x.schedule([&]{ count++; }, microseconds(6));
EXPECT_EQ(count, 0);
x.advance(microseconds(-1));
EXPECT_EQ(count, 0);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment