Commit 0afc1272 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Facebook Github Bot 2

A thread-per-task executor

Summary:
[Wangle] A thread-per-task executor.

Moved from Folly into Wangle and fleshed out.
* Starts task threads from a control thread, rather than starting task threads from the submitter thread. Because starting task threads is likely to be more expensive than moving a functor.
* Joins task threads as they finish, rather than joining all task threads in the executor's destructor.

Suitable for running tasks which spend most of their time sleeping. Such as blocking IO, blocking fork-exec-wait, etc., when it is inconvenient to use the nonblocking variants with an IO executor.

Reviewed By: simpkins

Differential Revision: D3286988

fbshipit-source-id: 4b91133a7d55332ebbae020c1515c60e816906b3
parent bc02b7a4
......@@ -156,7 +156,6 @@ nobase_follyinclude_HEADERS = \
Format.h \
Format-inl.h \
futures/Barrier.h \
futures/ThreadedExecutor.h \
futures/DrivableExecutor.h \
futures/Future-pre.h \
futures/helpers.h \
......@@ -387,7 +386,6 @@ libfolly_la_SOURCES = \
FileUtil.cpp \
FingerprintTables.cpp \
futures/Barrier.cpp \
futures/ThreadedExecutor.cpp \
futures/Future.cpp \
futures/InlineExecutor.cpp \
futures/ManualExecutor.cpp \
......
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/futures/ThreadedExecutor.h>
#include <glog/logging.h>
using namespace std;
namespace folly { namespace futures {
ThreadedExecutor::~ThreadedExecutor() {
lock_guard<mutex> lock(mutex_);
destructing_ = true;
for (auto& th : threads_) {
th.join();
}
}
void ThreadedExecutor::add(Func f) {
lock_guard<mutex> lock(mutex_);
CHECK(!destructing_);
threads_.emplace_back(std::move(f));
}
}}
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <list>
#include <mutex>
#include <thread>
#include <folly/Executor.h>
namespace folly { namespace futures {
/**
* Runs functions each in its own thread.
*
* Kind of simple. Suitable for a few types of strange cases.
*/
class ThreadedExecutor : public Executor {
public:
~ThreadedExecutor();
void add(Func f) override;
private:
std::mutex mutex_;
std::list<std::thread> threads_;
bool destructing_ = false;
};
}}
/*
* Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/futures/ThreadedExecutor.h>
#include <folly/futures/Future.h>
#include <folly/Conv.h>
#include <gtest/gtest.h>
using namespace std;
using namespace folly;
using namespace folly::futures;
class ThreadedExecutorTest : public testing::Test {};
TEST_F(ThreadedExecutorTest, example) {
ThreadedExecutor x;
auto ret = via(&x)
.then([&] { return 17; })
.then([&](int x) { return to<string>(x); })
.wait()
.getTry();
EXPECT_EQ("17", ret.value());
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment