Commit 7c93aeb3 authored by Amlan Nayak, committed by Facebook GitHub Bot

Use WorkerProvider in CPUThreadPoolExecutor to collect thread IDs

Summary:
The WorkerProvider can be used to collect the OS thread IDs of all active
threads consuming tasks from the CPUThreadPoolExecutor queue. The
WorkerProvider::KeepAlive returned by `collectThreadIds()` holds a read lock on
`threadsExitMutex_` to block the active threads from exiting while the observer
captures the backtraces (on exit, each thread must acquire a write lock on the
same mutex, so it cannot terminate until all outstanding KeepAlives are released).
The actual logic to capture the traces is coming in a subsequent diff.
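
As an aside, the core synchronization here is a reader/writer lock used as a
keep-alive: a collector snapshot holds the lock in shared mode, and an exiting
worker must take it in exclusive mode, so workers cannot finish exiting while
any snapshot is alive. Below is a minimal, self-contained sketch of that
pattern (using std::shared_mutex in place of folly::SharedMutex; the
Collector/Snapshot names are illustrative only, not part of this diff):

#include <mutex>
#include <shared_mutex>
#include <thread>
#include <unordered_set>
#include <vector>

class Collector {
 public:
  struct Snapshot {
    std::shared_lock<std::shared_mutex> keepAlive; // blocks worker exit
    std::vector<long> ids; // ids captured at collection time
  };

  // Take the shared (read) lock first, then copy the current id set.
  Snapshot collect() {
    std::shared_lock<std::shared_mutex> keepAlive(exitMutex_);
    std::lock_guard<std::mutex> g(idsMutex_);
    return {std::move(keepAlive), {ids_.begin(), ids_.end()}};
  }

  void onWorkerStart(long id) {
    std::lock_guard<std::mutex> g(idsMutex_);
    ids_.insert(id);
  }

  void onWorkerExit(long id) {
    {
      std::lock_guard<std::mutex> g(idsMutex_);
      ids_.erase(id);
    }
    // Blocks while any Snapshot (shared lock) is still alive.
    std::unique_lock<std::shared_mutex> w(exitMutex_);
  }

 private:
  std::mutex idsMutex_;
  std::unordered_set<long> ids_;
  std::shared_mutex exitMutex_;
};

int main() {
  Collector c;
  std::thread worker([&] {
    c.onWorkerStart(1);
    c.onWorkerExit(1); // cannot complete while a Snapshot exists
  });
  {
    auto snap = c.collect(); // safe to walk snap.ids here
  }
  worker.join();
  return 0;
}

Dropping the Snapshot releases the shared lock, at which point any worker
blocked in onWorkerExit() proceeds; this is what lets an observer read worker
state without racing thread teardown.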

Reviewed By: mshneer

Differential Revision: D30133823

fbshipit-source-id: 5bcdc52c55077b53fe8ec3be6d2413cac4436125
parent d365f9e4
...
@@ -41,6 +41,37 @@ namespace {
 using default_queue = UnboundedBlockingQueue<CPUThreadPoolExecutor::CPUTask>;
 using default_queue_alloc =
     AlignedSysAllocator<default_queue, FixedAlign<alignof(default_queue)>>;
+
+class ThreadIdCollector : public WorkerProvider {
+ public:
+  ThreadIdCollector() {}
+
+  IdsWithKeepAlive collectThreadIds() override final {
+    auto keepAlive = std::make_unique<WorkerKeepAlive>(
+        SharedMutex::ReadHolder{&threadsExitMutex_});
+    auto locked = osThreadIds_.rlock();
+    return {std::move(keepAlive), {locked->begin(), locked->end()}};
+  }
+
+  Synchronized<std::unordered_set<pid_t>> osThreadIds_;
+  SharedMutex threadsExitMutex_;
+
+ private:
+  class WorkerKeepAlive : public WorkerProvider::KeepAlive {
+   public:
+    explicit WorkerKeepAlive(SharedMutex::ReadHolder idsLock)
+        : threadsExitLock_(std::move(idsLock)) {}
+    ~WorkerKeepAlive() override {}
+
+   private:
+    SharedMutex::ReadHolder threadsExitLock_;
+  };
+};
+
+inline ThreadIdCollector* upcast(std::unique_ptr<WorkerProvider>& wpPtr) {
+  return static_cast<ThreadIdCollector*>(wpPtr.get());
+}
+
 } // namespace

 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
...
@@ -272,10 +303,15 @@ void CPUThreadPoolExecutor::threadRun(ThreadPtr thread) {
   }

   thread->startupBaton.post();
-  osThreadIds_.wlock()->insert(folly::getOSThreadID());
+  auto collectorPtr = upcast(threadIdCollector_);
+  collectorPtr->osThreadIds_.wlock()->insert(folly::getOSThreadID());
   // On thread exit, we should remove the thread ID from the tracking list.
-  auto threadIDsGuard = folly::makeGuard(
-      [this]() { osThreadIds_.wlock()->erase(folly::getOSThreadID()); });
+  auto threadIDsGuard = folly::makeGuard([collectorPtr]() {
+    // The observer could be capturing a stack trace from this thread,
+    // so it should block until the collection finishes before exiting.
+    collectorPtr->osThreadIds_.wlock()->erase(folly::getOSThreadID());
+    SharedMutex::WriteHolder w{collectorPtr->threadsExitMutex_};
+  });

   while (true) {
     auto task = taskQueue_->try_take_for(threadTimeout_);
...
@@ -330,7 +366,13 @@ CPUThreadPoolExecutor::createQueueObserverFactory() {
     observer.store(nullptr, std::memory_order_release);
   }
   return QueueObserverFactory::make(
-      "cpu." + getName(), taskQueue_->getNumPriorities());
+      "cpu." + getName(),
+      taskQueue_->getNumPriorities(),
+      threadIdCollector_.get());
+}
+
+std::unique_ptr<WorkerProvider> CPUThreadPoolExecutor::createWorkerProvider() {
+  return std::make_unique<ThreadIdCollector>();
 }

 } // namespace folly
...
@@ -175,6 +175,8 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
  protected:
   BlockingQueue<CPUTask>* getTaskQueue();
+  std::unique_ptr<WorkerProvider> createWorkerProvider();
+  std::unique_ptr<WorkerProvider> threadIdCollector_{createWorkerProvider()};

  private:
   void threadRun(ThreadPtr thread) override;
...
...
@@ -28,6 +28,8 @@ make_queue_observer_factory_fallback(
 namespace folly {

+WorkerProvider::KeepAlive::~KeepAlive() {}
+
 /* static */ std::unique_ptr<QueueObserverFactory> QueueObserverFactory::make(
     const std::string& context,
     size_t numPriorities,
...
...
@@ -289,7 +289,6 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
   std::string namePrefix_;
   const bool isWaitForAll_; // whether to wait till event base loop exits
-  folly::Synchronized<std::unordered_set<pid_t>> osThreadIds_;
   ThreadList threadList_;
   SharedMutex threadListLock_;
   StoppedThreadQueue stoppedThreads_;
...
...
@@ -14,9 +14,11 @@
  * limitations under the License.
  */

+#include <folly/CPortability.h>
 #include <folly/DefaultKeepAliveExecutor.h>
 #include <folly/executors/CPUThreadPoolExecutor.h>
 #include <folly/executors/ThreadPoolExecutor.h>
+#include <folly/lang/Keep.h>

 #include <atomic>
 #include <memory>
...
@@ -44,6 +46,20 @@ static Func burnMs(uint64_t ms) {
   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
 }

+static WorkerProvider* kWorkerProviderGlobal = nullptr;
+
+namespace folly {
+
+#if FOLLY_HAVE_WEAK_SYMBOLS
+FOLLY_KEEP std::unique_ptr<QueueObserverFactory> make_queue_observer_factory(
+    const std::string&, size_t, WorkerProvider* workerProvider) {
+  kWorkerProviderGlobal = workerProvider;
+  return {};
+}
+#endif
+
+} // namespace folly
+
 template <class TPE>
 static void basic() {
   // Create and destroy
...
@@ -924,27 +940,114 @@ TEST(ThreadPoolExecutorTest, AddPerf) {
   e.stop();
 }

-class OSThreadIDTestExecutor : public CPUThreadPoolExecutor {
- public:
-  explicit OSThreadIDTestExecutor(size_t n) : CPUThreadPoolExecutor(n) {}
-  const std::unordered_set<pid_t>& getThreadIds() const {
-    return *osThreadIds_.rlock();
-  }
+class ExecutorWorkerProviderTest : public ::testing::Test {
+ protected:
+  void SetUp() override { kWorkerProviderGlobal = nullptr; }
+  void TearDown() override { kWorkerProviderGlobal = nullptr; }
 };

-TEST(ThreadPoolExecutorTest, OSThreadIDsCollectionTest) {
-  OSThreadIDTestExecutor e(1);
-  const auto& idsList = e.getThreadIds();
-  std::atomic<uint64_t> flag{0};
-  Baton<> b;
-  e.add([&]() {
-    flag.exchange(getOSThreadID());
-    b.post();
-  });
-  b.wait();
-  EXPECT_EQ(idsList.count(flag.load()), 1);
-  e.join();
-  EXPECT_EQ(idsList.size(), 0);
-}
+TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBasicTest) {
+  // Start 4 threads and have all of them work on a task.
+  // Then invoke the ThreadIdCollector::collectThreadIds()
+  // method to capture the set of active thread ids.
+  boost::barrier barrier{5};
+  Synchronized<std::vector<pid_t>> expectedTids;
+  auto task = [&]() {
+    expectedTids.wlock()->push_back(folly::getOSThreadID());
+    barrier.wait();
+  };
+  CPUThreadPoolExecutor e(4);
+  for (int i = 0; i < 4; ++i) {
+    e.add(task);
+  }
+  barrier.wait();
+  {
+    const auto threadIdsWithKA = kWorkerProviderGlobal->collectThreadIds();
+    const auto& ids = threadIdsWithKA.threadIds;
+    auto locked = expectedTids.rlock();
+    EXPECT_EQ(ids.size(), locked->size());
+    EXPECT_TRUE(std::is_permutation(ids.begin(), ids.end(), locked->begin()));
+  }
+  e.join();
+}
+
+TEST_F(ExecutorWorkerProviderTest, ThreadCollectorMultipleInvocationTest) {
+  // Run some tasks via the executor and invoke
+  // WorkerProvider::collectThreadIds() at least twice to make sure that there
+  // is no deadlock in repeated invocations.
+  CPUThreadPoolExecutor e(1);
+  e.add([&]() {});
+  {
+    auto idsWithKA1 = kWorkerProviderGlobal->collectThreadIds();
+    auto idsWithKA2 = kWorkerProviderGlobal->collectThreadIds();
+    auto& ids1 = idsWithKA1.threadIds;
+    auto& ids2 = idsWithKA2.threadIds;
+    EXPECT_EQ(ids1.size(), 1);
+    EXPECT_EQ(ids1.size(), ids2.size());
+    EXPECT_EQ(ids1, ids2);
+  }
+  // Add some more threads and schedule tasks while the collector
+  // is capturing thread ids.
+  std::array<folly::Baton<>, 4> bats;
+  {
+    auto idsWithKA1 = kWorkerProviderGlobal->collectThreadIds();
+    e.setNumThreads(4);
+    for (size_t i = 0; i < 4; ++i) {
+      e.add([i, &bats]() { bats[i].wait(); });
+    }
+    for (auto& bat : bats) {
+      bat.post();
+    }
+    auto idsWithKA2 = kWorkerProviderGlobal->collectThreadIds();
+    auto& ids1 = idsWithKA1.threadIds;
+    auto& ids2 = idsWithKA2.threadIds;
+    EXPECT_EQ(ids1.size(), 1);
+    EXPECT_EQ(ids2.size(), 4);
+  }
+  e.join();
+}
+
+TEST_F(ExecutorWorkerProviderTest, ThreadCollectorBlocksThreadExitTest) {
+  // We need to ensure that the collector's keep alive effectively
+  // blocks the executor's threads from exiting. This is done by verifying
+  // that a call to reduce the worker count via setNumThreads() does not
+  // actually reduce the worker count (kill threads) while the keep alive
+  // is in scope.
+  constexpr size_t kNumThreads = 4;
+  std::array<folly::Baton<>, kNumThreads> bats;
+  CPUThreadPoolExecutor e(kNumThreads);
+  for (size_t i = 0; i < kNumThreads; ++i) {
+    e.add([i, &bats]() { bats[i].wait(); });
+  }
+  Baton<> baton;
+  Baton<> threadCountBaton;
+  auto bgCollector = std::thread([&]() {
+    {
+      auto idsWithKA = kWorkerProviderGlobal->collectThreadIds();
+      baton.post();
+      // Since this thread is holding the KeepAlive, it should block
+      // the main thread's `setNumThreads()` call which is trying to
+      // reduce the thread count of the executor. We verify that by
+      // checking that the baton isn't posted after a 100ms wait.
+      auto posted =
+          threadCountBaton.try_wait_for(std::chrono::milliseconds(100));
+      EXPECT_FALSE(posted);
+      auto& ids = idsWithKA.threadIds;
+      // The thread count should still be 4 since the collector's
+      // keep alive is active; to further verify that the threads are
+      // still alive, check that all of their ids were captured.
+      EXPECT_EQ(ids.size(), kNumThreads);
+    }
+  });
+  baton.wait();
+  for (auto& bat : bats) {
+    bat.post();
+  }
+  e.setNumThreads(2);
+  threadCountBaton.post();
+  bgCollector.join();
+  // The thread count should now be reduced to 2.
+  EXPECT_EQ(e.numThreads(), 2);
+  e.join();
 }
 template <typename TPE>
...