Commit 8045a2a0 authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook GitHub Bot

Add Support for nested MasterPtr and Cleanup tasks

Summary:
Allow a T that derives from EnableMasterFromThis<T> to use masterLockFromThis() to get a non-owning shared_ptr to this and to use masterRefFromThis() to get a MasterPtrRef<> from this.

Adds MasterPtr::cleanup() that returns SemiFuture<Unit>. join() just does a blocking wait on the SemiFuture returned from cleanup().

Allows a T to provide a T::cleanup() method that will be composed into the MasterPtr::cleanup() work.

MasterPtr now uses SemiFuture<Unit> instead of Baton. This allows users of MasterPtr::cleanup() to compose cleanup work with other tasks.

Andrii suggested that the cleanup feature be extracted out of MasterPtr

Adds cleanup traits (that MasterPtr satisfies) and a Cleanup type that satisfies the cleanup traits and allows objects that are not heap-allocated to participate in structured concurrency by deriving from Cleanup.

Reviewed By: andriigrynenko

Differential Revision: D19584561

fbshipit-source-id: aa2d608effe613ec84b08f902a1c61561f3458bb
parent 4bce7f6d
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <mutex>
#include <folly/futures/Future.h>
#include <glog/logging.h>
namespace folly {
// Structured Async Cleanup
//
// Structured Async Cleanup - traits
//
namespace detail {
struct cleanup_fn {
template <
class T,
class R = decltype(std::declval<T>().cleanup()),
std::enable_if_t<std::is_same_v<R, folly::SemiFuture<folly::Unit>>, int> =
0>
R operator()(T&& t) const {
return ((T &&) t).cleanup();
}
};
} // namespace detail
template <class T>
constexpr bool is_cleanup_v = folly::is_invocable_v<detail::cleanup_fn, T>;
template <typename T>
using is_cleanup = std::bool_constant<is_cleanup_v<T>>;
// Structured Async Cleanup
//
// This implementation is a base class that collects a set of cleanup tasks
// and runs them in reverse order.
//
// A class derived from Cleanup
// - only allows cleanup to be run once
// - is required to complete cleanup before running the destructor
// - *should not* run cleanup tasks. Running the cleanup task should be
// delegated to the owner of the derived class
// - *should not* be owned by a shared_ptr. Cleanup is intended to remove
// shared ownership.
//
class Cleanup {
public:
Cleanup() : safe_to_destruct_(false), cleanup_(folly::makeSemiFuture()) {}
~Cleanup() {
if (!safe_to_destruct_) {
LOG(FATAL) << "Cleanup must complete before it is destructed.";
}
}
// Returns: a SemiFuture that, just like destructors, sequences the cleanup
// tasks added in reverse of the order they were added.
//
// calls to cleanup() do not mutate state. The returned SemiFuture, once
// it has been given an executor, does mutate state and must not overlap with
// any calls to addCleanup().
//
folly::SemiFuture<folly::Unit> cleanup() {
return folly::makeSemiFuture()
.deferValue([this](folly::Unit) {
if (!cleanup_.valid()) {
LOG(FATAL) << "cleanup already run - cleanup task invalid.";
}
return std::move(cleanup_);
})
.defer([this](folly::Try<folly::Unit> t) {
if (t.hasException()) {
LOG(FATAL) << "Cleanup actions must be noexcept.";
}
this->safe_to_destruct_ = true;
});
}
protected:
// includes the provided SemiFuture under the scope of this.
//
// when the cleanup() for this started it will get this SemiFuture first.
//
// order matters, just like destructors, cleanup tasks will be run in reverse
// of the order they were added.
//
// all gets will use the Executor provided to the SemiFuture returned by
// cleanup()
//
// calls to addCleanup() must not overlap with each other and must not overlap
// with a running SemiFuture returned from addCleanup().
//
void addCleanup(folly::SemiFuture<folly::Unit> c) {
if (!cleanup_.valid()) {
LOG(FATAL)
<< "Cleanup::addCleanup must not be called after Cleanup::cleanup.";
}
cleanup_ = std::move(c).deferValue(
[nested = std::move(cleanup_)](folly::Unit) mutable {
return std::move(nested);
});
}
// includes the provided model of Cleanup under the scope of this
//
// when the cleanup() for this started it will cleanup this first.
//
// order matters, just like destructors, cleanup tasks will be run in reverse
// of the order they were added.
//
// all gets will use the Executor provided to the SemiFuture returned by
// cleanup()
//
// calls to addCleanup() must not overlap with each other and must not overlap
// with a running SemiFuture returned from addCleanup().
//
template <
class OtherCleanup,
std::enable_if_t<is_cleanup_v<OtherCleanup>, int> = 0>
void addCleanup(OtherCleanup&& c) {
addCleanup(((OtherCleanup &&) c).cleanup());
}
private:
bool safe_to_destruct_;
folly::SemiFuture<folly::Unit> cleanup_;
};
} // namespace folly
This diff is collapsed.
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/Cleanup.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/portability/GTest.h>
using namespace std::literals::chrono_literals;
class Cleaned : public folly::Cleanup {
folly::CPUThreadPoolExecutor pool_;
public:
Cleaned() : pool_(4) {
addCleanup(
folly::makeSemiFuture().defer([this](auto&&) { this->pool_.join(); }));
}
using folly::Cleanup::addCleanup;
};
TEST(CleanupTest, Basic) {
EXPECT_TRUE(folly::is_cleanup_v<Cleaned>);
Cleaned cleaned;
int phase = 0;
int index = 0;
cleaned.addCleanup(
folly::makeSemiFuture().deferValue([&, expected = index++](folly::Unit) {
EXPECT_EQ(phase, 1);
EXPECT_EQ(--index, expected);
}));
cleaned.addCleanup(
folly::makeSemiFuture().deferValue([&, expected = index++](folly::Unit) {
EXPECT_EQ(phase, 1);
EXPECT_EQ(--index, expected);
}));
EXPECT_EQ(index, 2);
folly::ManualExecutor exec;
phase = 1;
cleaned.cleanup()
.within(1s)
.via(folly::getKeepAliveToken(exec))
.getVia(&exec);
phase = 2;
EXPECT_EQ(index, 0);
}
TEST(CleanupTest, Errors) {
auto cleaned = std::make_unique<Cleaned>();
cleaned->addCleanup(folly::makeSemiFuture().deferValue(
[](folly::Unit) { EXPECT_TRUE(false); }));
cleaned->addCleanup(
folly::makeSemiFuture<folly::Unit>(std::runtime_error("failed cleanup")));
folly::ManualExecutor exec;
EXPECT_EXIT(
cleaned->cleanup()
.within(1s)
.via(folly::getKeepAliveToken(exec))
.getVia(&exec),
testing::KilledBySignal(SIGABRT),
".*noexcept.*");
EXPECT_EXIT(
cleaned.reset(), testing::KilledBySignal(SIGABRT), ".*destructed.*");
// must leak the Cleaned as its destructor will abort.
(void)cleaned.release();
}
TEST(CleanupTest, Invariants) {
Cleaned cleaned;
auto ranCleanup = false;
cleaned.addCleanup(folly::makeSemiFuture().deferValue(
[&](folly::Unit) { ranCleanup = true; }));
EXPECT_FALSE(ranCleanup);
{
folly::ManualExecutor exec;
cleaned.cleanup()
.within(1s)
.via(folly::getKeepAliveToken(exec))
.getVia(&exec);
}
EXPECT_TRUE(ranCleanup);
EXPECT_EXIT(
cleaned.addCleanup(folly::makeSemiFuture().deferValue(
[](folly::Unit) { EXPECT_TRUE(false); })),
testing::KilledBySignal(SIGABRT),
".*addCleanup.*");
{
folly::ManualExecutor exec;
EXPECT_EXIT(
cleaned.cleanup().via(folly::getKeepAliveToken(exec)).getVia(&exec),
testing::KilledBySignal(SIGABRT),
".*already.*");
}
}
...@@ -16,16 +16,23 @@ ...@@ -16,16 +16,23 @@
#include <future> #include <future>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/experimental/MasterPtr.h> #include <folly/experimental/MasterPtr.h>
#include <folly/portability/GTest.h> #include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h> #include <folly/synchronization/Baton.h>
using namespace std::literals::chrono_literals;
TEST(MasterPtrTest, Basic) { TEST(MasterPtrTest, Basic) {
EXPECT_TRUE(folly::is_cleanup_v<folly::MasterPtr<int>>);
auto ptr = std::make_unique<int>(42); auto ptr = std::make_unique<int>(42);
auto rawPtr = ptr.get(); auto rawPtr = ptr.get();
folly::MasterPtr<int> masterPtr(std::move(ptr)); folly::MasterPtr<int> masterPtr(std::move(ptr));
auto masterPtrRef = masterPtr.ref(); auto masterPtrRef = masterPtr.ref();
EXPECT_TRUE(!!masterPtr);
auto lockedPtr1 = masterPtr.lock(); auto lockedPtr1 = masterPtr.lock();
auto lockedPtr2 = masterPtrRef.lock(); auto lockedPtr2 = masterPtrRef.lock();
...@@ -35,7 +42,12 @@ TEST(MasterPtrTest, Basic) { ...@@ -35,7 +42,12 @@ TEST(MasterPtrTest, Basic) {
EXPECT_EQ(lockedPtr1.use_count(), 3); EXPECT_EQ(lockedPtr1.use_count(), 3);
EXPECT_EQ(lockedPtr2.use_count(), 3); EXPECT_EQ(lockedPtr2.use_count(), 3);
auto joinFuture = std::async(std::launch::async, [&] { masterPtr.join(); }); EXPECT_TRUE(!!masterPtr);
auto joinFuture = std::async(std::launch::async, [&] {
masterPtr.join();
EXPECT_TRUE(!masterPtr);
});
auto lockFailFuture = std::async(std::launch::async, [&] { auto lockFailFuture = std::async(std::launch::async, [&] {
while (masterPtr.lock()) { while (masterPtr.lock()) {
...@@ -63,14 +75,145 @@ TEST(MasterPtrTest, Basic) { ...@@ -63,14 +75,145 @@ TEST(MasterPtrTest, Basic) {
EXPECT_EQ( EXPECT_EQ(
joinFuture.wait_for(std::chrono::milliseconds{100}), joinFuture.wait_for(std::chrono::milliseconds{100}),
std::future_status::ready); std::future_status::ready);
EXPECT_TRUE(!masterPtr);
ptr = std::make_unique<int>(42);
rawPtr = ptr.get();
masterPtr.set(std::move(ptr));
EXPECT_TRUE(!!masterPtr);
lockedPtr1 = masterPtr.lock();
EXPECT_EQ(lockedPtr1.get(), rawPtr);
lockedPtr1.reset();
masterPtr.join();
EXPECT_EQ(masterPtr.lock().get(), nullptr);
EXPECT_TRUE(!masterPtr);
} }
struct Mastered : folly::EnableMasterFromThis<Mastered> { struct Mastered : folly::Cleanup, folly::EnableMasterFromThis<Mastered> {
folly::MasterPtr<int> nested_;
folly::CPUThreadPoolExecutor pool_;
Mastered() : nested_(std::make_unique<int>(42)), pool_(4) {
addCleanup(nested_);
addCleanup(
folly::makeSemiFuture().defer([this](auto&&) { this->pool_.join(); }));
}
using folly::Cleanup::addCleanup;
std::shared_ptr<Mastered> get_shared() { std::shared_ptr<Mastered> get_shared() {
return masterLockFromThis(); return masterLockFromThis();
} }
}; };
TEST(MasterPtrTest, BasicCleanup) {
auto ptr = std::make_unique<Mastered>();
folly::MasterPtr<Mastered> masterPtr(std::move(ptr));
int phase = 0;
int index = 0;
masterPtr.lock()->addCleanup(
folly::makeSemiFuture().deferValue([&, expected = index++](folly::Unit) {
EXPECT_EQ(phase, 1);
EXPECT_EQ(--index, expected);
}));
masterPtr.lock()->addCleanup(
folly::makeSemiFuture().deferValue([&, expected = index++](folly::Unit) {
EXPECT_EQ(phase, 1);
EXPECT_EQ(--index, expected);
}));
EXPECT_EQ(index, 2);
folly::ManualExecutor exec;
phase = 1;
masterPtr.cleanup()
.within(1s)
.via(folly::getKeepAliveToken(exec))
.getVia(&exec);
phase = 2;
EXPECT_EQ(index, 0);
}
#if defined(__has_feature)
#if !__has_feature(address_sanitizer)
TEST(MasterPtrTest, Errors) {
auto ptr = std::make_unique<Mastered>();
auto masterPtr = std::make_unique<folly::MasterPtr<Mastered>>(std::move(ptr));
masterPtr->lock()->addCleanup(folly::makeSemiFuture().deferValue(
[](folly::Unit) { EXPECT_TRUE(false); }));
masterPtr->lock()->addCleanup(
folly::makeSemiFuture<folly::Unit>(std::runtime_error("failed cleanup")));
EXPECT_EXIT(
masterPtr->set(std::unique_ptr<Mastered>{}),
testing::KilledBySignal(SIGABRT),
".*joined before.*");
folly::ManualExecutor exec;
EXPECT_EXIT(
masterPtr->cleanup()
.within(1s)
.via(folly::getKeepAliveToken(exec))
.getVia(&exec),
testing::KilledBySignal(SIGABRT),
".*noexcept.*");
EXPECT_EXIT(
masterPtr.reset(), testing::KilledBySignal(SIGABRT), ".*MasterPtr.*");
// must leak the MasterPtr as its destructor will abort.
(void)masterPtr.release();
}
#endif
#endif
TEST(MasterPtrTest, Invariants) {
struct BadDerived : Mastered {
~BadDerived() {
EXPECT_EXIT(
addCleanup(folly::makeSemiFuture().deferValue(
[](folly::Unit) { EXPECT_TRUE(false); })),
testing::KilledBySignal(SIGABRT),
".*addCleanup.*");
EXPECT_EXIT(
addCleanup(folly::makeSemiFuture().deferValue(
[](folly::Unit) { EXPECT_TRUE(false); })),
testing::KilledBySignal(SIGABRT),
".*addCleanup.*");
}
};
auto ptr = std::make_unique<BadDerived>();
folly::MasterPtr<Mastered> masterPtr(std::move(ptr));
auto ranCleanup = false;
masterPtr.lock()->addCleanup(folly::makeSemiFuture().deferValue(
[&](folly::Unit) { ranCleanup = true; }));
EXPECT_FALSE(ranCleanup);
{
folly::ManualExecutor exec;
masterPtr.cleanup()
.within(1s)
.via(folly::getKeepAliveToken(exec))
.getVia(&exec);
}
EXPECT_TRUE(ranCleanup);
{
folly::ManualExecutor exec;
EXPECT_EXIT(
masterPtr.cleanup().via(folly::getKeepAliveToken(exec)).getVia(&exec),
testing::KilledBySignal(SIGABRT),
".*already.*");
}
}
struct Derived : Mastered {}; struct Derived : Mastered {};
TEST(MasterPtrTest, EnableMasterFromThis) { TEST(MasterPtrTest, EnableMasterFromThis) {
...@@ -92,17 +235,18 @@ TEST(MasterPtrTest, EnableMasterFromThis) { ...@@ -92,17 +235,18 @@ TEST(MasterPtrTest, EnableMasterFromThis) {
EXPECT_EQ(lockedPtr3.use_count(), 4); EXPECT_EQ(lockedPtr3.use_count(), 4);
EXPECT_EQ(lockedPtr3.get(), rawPtr); EXPECT_EQ(lockedPtr3.get(), rawPtr);
auto joinFuture = std::async(std::launch::async, [&] { masterPtr.join(); }); auto cleanupFuture = std::async(std::launch::async, [&] {
folly::ManualExecutor exec;
auto lockFailFuture = std::async(std::launch::async, [&] { masterPtr.cleanup()
while (masterPtr.lock()) { .within(1s)
std::this_thread::yield(); .via(folly::getKeepAliveToken(exec))
} .getVia(&exec);
EXPECT_TRUE(!masterPtr);
}); });
EXPECT_EQ( EXPECT_EQ(
lockFailFuture.wait_for(std::chrono::milliseconds{100}), cleanupFuture.wait_for(std::chrono::milliseconds{100}),
std::future_status::ready); std::future_status::timeout);
EXPECT_EQ(lockedPtr1.use_count(), 3); EXPECT_EQ(lockedPtr1.use_count(), 3);
EXPECT_EQ(lockedPtr2.use_count(), 3); EXPECT_EQ(lockedPtr2.use_count(), 3);
...@@ -112,7 +256,7 @@ TEST(MasterPtrTest, EnableMasterFromThis) { ...@@ -112,7 +256,7 @@ TEST(MasterPtrTest, EnableMasterFromThis) {
EXPECT_EQ(masterPtrRef.lock().get(), nullptr); EXPECT_EQ(masterPtrRef.lock().get(), nullptr);
EXPECT_EQ( EXPECT_EQ(
joinFuture.wait_for(std::chrono::milliseconds{100}), cleanupFuture.wait_for(std::chrono::milliseconds{100}),
std::future_status::timeout); std::future_status::timeout);
lockedPtr1.reset(); lockedPtr1.reset();
...@@ -120,6 +264,8 @@ TEST(MasterPtrTest, EnableMasterFromThis) { ...@@ -120,6 +264,8 @@ TEST(MasterPtrTest, EnableMasterFromThis) {
lockedPtr3.reset(); lockedPtr3.reset();
EXPECT_EQ( EXPECT_EQ(
joinFuture.wait_for(std::chrono::milliseconds{100}), cleanupFuture.wait_for(std::chrono::milliseconds{100}),
std::future_status::ready); std::future_status::ready);
EXPECT_TRUE(!masterPtr);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment