Commit 78cfa5e5 authored by Maged Michael's avatar Maged Michael Committed by Facebook Github Bot

hazptr: Use executor for default domain asynchronous reclamation

Summary:
Add support for using an executor for asynchronous reclamation.

This enables doing asynchronous reclamation on a dedicated executor, which serves two purposes:
(1) Avoid unpredictable latencies for regular thread operations (that happen to trigger synchronous reclamation).
(2) Avoid deadlock that may arise from regular threads running deleters that may require resources held by such threads.

By default the QueuedImmediateExecutor is used.
A member function set_executor allows the use of different executors.
The intent is to use a thread pool executor.
In order to avoid cyclical dependency, subsequent diffs (1) add in a separate library a function make the default hazptr domain use a thread pool executor and (2) add a call to such function to folly/init.

Reviewed By: davidtgoldblatt

Differential Revision: D19107049

fbshipit-source-id: 540e9d3aea786a8ece7958d6c09b9429e2a2f4a5
parent 66a8324a
......@@ -16,6 +16,8 @@
#pragma once
#include <folly/portability/GFlags.h>
#include <atomic>
#include <memory>
......@@ -31,6 +33,8 @@
#define FOLLY_HAZPTR_THR_LOCAL true
#endif
DECLARE_bool(folly_hazptr_use_executor);
namespace folly {
///
......
......@@ -16,8 +16,15 @@
#include <folly/synchronization/Hazptr.h>
#include <folly/portability/GFlags.h>
#include <atomic>
DEFINE_bool(
folly_hazptr_use_executor,
true,
"Use an executor for hazptr asynchronous reclamation");
namespace folly {
FOLLY_STATIC_CTOR_PRIORITY_MAX hazptr_domain<std::atomic> default_domain;
......
......@@ -22,6 +22,7 @@
#include <folly/synchronization/HazptrThrLocal.h>
#include <folly/Portability.h>
#include <folly/executors/QueuedImmediateExecutor.h>
#include <folly/synchronization/AsymmetricMemoryBarrier.h>
#include <atomic>
......@@ -107,12 +108,17 @@ class hazptr_domain {
using ObjList = hazptr_obj_list<Atom>;
using RetiredList = hazptr_obj_retired_list<Atom>;
using Set = std::unordered_set<const void*>;
using ExecFn = folly::Executor* (*)();
static constexpr int kThreshold = detail::hazptr_domain_rcount_threshold();
static constexpr int kMultiplier = 2;
static constexpr uint64_t kSyncTimePeriod{2000000000}; // nanoseconds
static constexpr uintptr_t kTagBit = hazptr_obj<Atom>::kTagBit;
static folly::Executor* get_default_executor() {
return &folly::QueuedImmediateExecutor::instance();
}
Atom<hazptr_rec<Atom>*> hazptrs_{nullptr};
Atom<hazptr_obj<Atom>*> retired_{nullptr};
Atom<uint64_t> sync_time_{0};
......@@ -123,13 +129,14 @@ class hazptr_domain {
Atom<int> rcount_{0};
Atom<uint16_t> num_bulk_reclaims_{0};
bool shutdown_{false};
RetiredList untagged_;
RetiredList tagged_;
Obj* unprotected_; // List of unprotected objects being reclaimed
ObjList children_; // Children of unprotected objects being reclaimed
Atom<uint64_t> tagged_sync_time_{0};
Atom<uint64_t> untagged_sync_time_{0};
Atom<ExecFn> exec_fn_{nullptr};
Atom<int> exec_backlog_{0};
public:
/** Constructor */
......@@ -148,6 +155,14 @@ class hazptr_domain {
hazptr_domain& operator=(const hazptr_domain&) = delete;
hazptr_domain& operator=(hazptr_domain&&) = delete;
void set_executor(ExecFn exfn) {
exec_fn_.store(exfn, std::memory_order_release);
}
void clear_executor() {
exec_fn_.store(nullptr, std::memory_order_release);
}
/** retire - nonintrusive - allocates memory */
template <typename T, typename D = std::default_delete<T>>
void retire(T* obj, D reclaim = {}) {
......@@ -287,9 +302,15 @@ class hazptr_domain {
if (!(lock && rlist.check_lock()) &&
(rlist.check_threshold_try_zero_count(threshold()) ||
check_sync_time(sync_time))) {
if (std::is_same<Atom<int>, std::atomic<int>>{} &&
this == &default_hazptr_domain<Atom>() &&
FLAGS_folly_hazptr_use_executor) {
invoke_reclamation_in_executor();
} else {
do_reclamation(rlist, lock);
}
}
}
/** check_sync_time_and_reclaim **/
void check_sync_time_and_reclaim() {
......@@ -302,6 +323,13 @@ class hazptr_domain {
/** do_reclamation */
void do_reclamation(RetiredList& rlist, bool lock) {
auto obj = rlist.pop_all(lock == RetiredList::kAlsoLock);
if (!obj) {
if (lock) {
ObjList l;
rlist.push_unlock(l);
}
return;
}
/*** Full fence ***/ asymmetricHeavyBarrier(AMBFlags::EXPEDITED);
auto hprec = hazptrs_.load(std::memory_order_acquire);
/* Read hazard pointer values into private search structure */
......@@ -583,6 +611,36 @@ class hazptr_domain {
hcount_.fetch_add(1);
return rec;
}
void invoke_reclamation_in_executor() {
auto fn = exec_fn_.load(std::memory_order_acquire);
auto ex = fn ? fn() : get_default_executor();
auto backlog = exec_backlog_.fetch_add(1, std::memory_order_relaxed);
if (ex) {
ex->add([this] {
exec_backlog_.store(0, std::memory_order_relaxed);
reclamation_by_executor();
});
} else {
LOG(INFO) << "Skip asynchronous reclamation by hazptr executor";
}
if (backlog >= 10) {
LOG(WARNING) << backlog
<< " request backlog for hazptr reclamation executora";
}
}
/** reclamation_by_executor */
void reclamation_by_executor() {
uint64_t time = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count() +
kSyncTimePeriod;
tagged_sync_time_.store(time, std::memory_order_relaxed);
untagged_sync_time_.store(time, std::memory_order_relaxed);
do_reclamation(tagged_, RetiredList::kAlsoLock);
do_reclamation(untagged_, RetiredList::kDontLock);
}
}; // hazptr_domain
/**
......
......@@ -20,6 +20,8 @@
#include <folly/synchronization/example/HazptrWideCAS.h>
#include <folly/synchronization/test/Barrier.h>
#include <folly/Singleton.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/portability/GFlags.h>
#include <folly/portability/GTest.h>
#include <folly/test/DeterministicSchedule.h>
......@@ -897,6 +899,49 @@ void recursive_destruction_test() {
ASSERT_EQ(c_.dtors(), total);
}
struct TPETag {};
folly::Singleton<folly::CPUThreadPoolExecutor, TPETag> cputpe_([] {
return new folly::CPUThreadPoolExecutor(1);
});
folly::Executor* get_cputpe() {
auto ex = cputpe_.try_get();
return ex ? ex.get() : nullptr;
}
void fork_test() {
folly::default_hazptr_domain().set_executor(&get_cputpe);
auto trigger_reclamation = [] {
hazptr_obj_batch b;
for (int i = 0; i < 2001; ++i) {
auto p = new Node;
p->set_batch_no_tag(&b);
p->retire();
}
};
std::thread t1(trigger_reclamation);
t1.join();
folly::SingletonVault::singleton()->destroyInstances();
auto pid = fork();
folly::SingletonVault::singleton()->reenableInstances();
if (pid > 0) {
// parent
int status = -1;
auto pid2 = waitpid(pid, &status, 0);
EXPECT_EQ(status, 0);
EXPECT_EQ(pid, pid2);
trigger_reclamation();
} else if (pid == 0) {
// child
c_.clear();
std::thread t2(trigger_reclamation);
t2.join();
exit(0); // Do not print gtest results
} else {
PLOG(FATAL) << "Failed to fork()";
}
}
template <template <typename> class Atom = std::atomic>
void lifo_test() {
for (int i = 0; i < FLAGS_num_reps; ++i) {
......@@ -1169,6 +1214,10 @@ TEST(HazptrTest, dsched_recursive_destruction) {
recursive_destruction_test<DeterministicAtomic>();
}
TEST(HazptrTest, fork) {
fork_test();
}
TEST(HazptrTest, lifo) {
lifo_test();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment