Commit fc75fc56 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

VirtualExecutor

Summary:
Executor adaptor which allows joining only tasks scheduled through this VirtualExecutor instance without joining all other tasks in a wrapped Executor.
This is very similar to VirtualEventBase, but works for any Executor.

Reviewed By: yfeldblum, aary

Differential Revision: D8342064

fbshipit-source-id: 420b6390e2da88c4826e6d22bc8f3b3585525214
parent 12e65c2d
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/DefaultKeepAliveExecutor.h>
namespace folly {
/**
* VirtualExecutor implements a light-weight view onto existing Executor.
*
* Multiple VirtualExecutors can be backed by a single Executor.
*
* VirtualExecutor's destructor blocks until all tasks scheduled through it are
* complete. Executor's destructor also blocks until all VirtualExecutors
* backed by it are released.
*/
class VirtualExecutor : public DefaultKeepAliveExecutor {
public:
explicit VirtualExecutor(KeepAlive<> executor)
: executor_(std::move(executor)) {}
explicit VirtualExecutor(Executor* executor)
: VirtualExecutor(getKeepAliveToken(executor)) {}
explicit VirtualExecutor(Executor& executor)
: VirtualExecutor(getKeepAliveToken(executor)) {}
VirtualExecutor(const VirtualExecutor&) = delete;
VirtualExecutor& operator=(const VirtualExecutor&) = delete;
uint8_t getNumPriorities() const override {
return executor_->getNumPriorities();
}
void add(Func f) override {
executor_->add([func = std::move(f),
me = getKeepAliveToken(this)]() mutable { func(); });
}
void addWithPriority(Func f, int8_t priority) override {
executor_->addWithPriority(
[func = std::move(f), me = getKeepAliveToken(this)]() mutable {
func();
},
priority);
}
~VirtualExecutor() override {
joinKeepAlive();
}
private:
const KeepAlive<> executor_;
};
} // namespace folly
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <thread> #include <thread>
#include <folly/Exception.h> #include <folly/Exception.h>
#include <folly/VirtualExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h> #include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/FutureExecutor.h> #include <folly/executors/FutureExecutor.h>
#include <folly/executors/IOThreadPoolExecutor.h> #include <folly/executors/IOThreadPoolExecutor.h>
...@@ -313,14 +314,16 @@ TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) { ...@@ -313,14 +314,16 @@ TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
completed++; completed++;
}; };
CPUThreadPoolExecutor pool(0, 2); CPUThreadPoolExecutor pool(0, 2);
for (int i = 0; i < 50; i++) { {
pool.addWithPriority(lopri, Executor::LO_PRI); VirtualExecutor ve(pool);
} for (int i = 0; i < 50; i++) {
for (int i = 0; i < 50; i++) { ve.addWithPriority(lopri, Executor::LO_PRI);
pool.addWithPriority(hipri, Executor::HI_PRI); }
for (int i = 0; i < 50; i++) {
ve.addWithPriority(hipri, Executor::HI_PRI);
}
pool.setNumThreads(1);
} }
pool.setNumThreads(1);
pool.join();
EXPECT_EQ(100, completed); EXPECT_EQ(100, completed);
} }
...@@ -751,6 +754,33 @@ static void WeakRefTest() { ...@@ -751,6 +754,33 @@ static void WeakRefTest() {
EXPECT_EQ(1, counter); EXPECT_EQ(1, counter);
} }
template <typename TPE>
static void virtualExecutorTest() {
using namespace std::literals;
folly::Optional<folly::SemiFuture<folly::Unit>> f;
int counter{0};
{
TPE fe(1);
{
VirtualExecutor ve(fe);
f = futures::sleep(100ms)
.via(&ve)
.then([&] {
++counter;
return futures::sleep(100ms);
})
.via(&fe)
.then([&] { ++counter; })
.semi();
}
EXPECT_EQ(1, counter);
}
EXPECT_TRUE(f->isReady());
EXPECT_NO_THROW(std::move(*f).get());
EXPECT_EQ(2, counter);
}
TEST(ThreadPoolExecutorTest, WeakRefTestIO) { TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
WeakRefTest<IOThreadPoolExecutor>(); WeakRefTest<IOThreadPoolExecutor>();
} }
...@@ -758,3 +788,11 @@ TEST(ThreadPoolExecutorTest, WeakRefTestIO) { ...@@ -758,3 +788,11 @@ TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
TEST(ThreadPoolExecutorTest, WeakRefTestCPU) { TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
WeakRefTest<CPUThreadPoolExecutor>(); WeakRefTest<CPUThreadPoolExecutor>();
} }
TEST(ThreadPoolExecutorTest, VirtualExecutorTestIO) {
virtualExecutorTest<IOThreadPoolExecutor>();
}
TEST(ThreadPoolExecutorTest, VirtualExecutorTestCPU) {
virtualExecutorTest<CPUThreadPoolExecutor>();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment