Commit 8e16a2eb authored by Mainak Mandal's avatar Mainak Mandal Committed by Facebook Github Bot

fix memory leak in case of large number of retries

Summary: Infinite retries is something that is often needed for read-modify-write like workflows. The current implementation was creating a nested chain of implicit promises. This manifests as a memory leak after some time. Worse yet, even if it succeeds, it will take a long time to churn thru the chain of promises.

Reviewed By: yfeldblum

Differential Revision: D4770335

fbshipit-source-id: 44b8d6df1084de4514b66919a9838cf2322d6dce
parent f1a4f400
......@@ -176,6 +176,7 @@ nobase_follyinclude_HEADERS = \
futures/detail/Core.h \
futures/detail/FSM.h \
futures/detail/Types.h \
futures/test/TestExecutor.h \
gen/Base.h \
gen/Base-inl.h \
gen/Combine.h \
......@@ -434,6 +435,7 @@ libfolly_la_SOURCES = \
futures/ManualExecutor.cpp \
futures/QueuedImmediateExecutor.cpp \
futures/ThreadWheelTimekeeper.cpp \
futures/test/TestExecutor.cpp \
detail/Futex.cpp \
detail/StaticSingletonManager.cpp \
detail/ThreadLocalDetail.cpp \
......
......@@ -1229,23 +1229,49 @@ struct retrying_policy_traits {
is_fut::value, retrying_policy_fut_tag, void>::type>::type;
};
template <class Policy, class FF, class Prom>
void retryingImpl(size_t k, Policy&& p, FF&& ff, Prom prom) {
using F = typename std::result_of<FF(size_t)>::type;
using T = typename F::value_type;
auto f = ff(k++);
f.then([
k,
prom = std::move(prom),
pm = std::forward<Policy>(p),
ffm = std::forward<FF>(ff)
](Try<T> && t) mutable {
if (t.hasValue()) {
prom.setValue(std::move(t).value());
return;
}
auto& x = t.exception();
auto q = pm(k, x);
q.then([
k,
prom = std::move(prom),
xm = std::move(x),
pm = std::move(pm),
ffm = std::move(ffm)
](bool shouldRetry) mutable {
if (shouldRetry) {
retryingImpl(k, std::move(pm), std::move(ffm), std::move(prom));
} else {
prom.setException(std::move(xm));
};
});
});
}
template <class Policy, class FF>
typename std::result_of<FF(size_t)>::type
retrying(size_t k, Policy&& p, FF&& ff) {
using F = typename std::result_of<FF(size_t)>::type;
using T = typename F::value_type;
auto f = ff(k++);
return f.onError(
[ k, pm = std::forward<Policy>(p), ffm = std::forward<FF>(ff) ](
exception_wrapper x) mutable {
auto q = pm(k, x);
return q.then(
[ k, xm = std::move(x), pm = std::move(pm), ffm = std::move(ffm) ](
bool r) mutable {
return r ? retrying(k, std::move(pm), std::move(ffm))
: makeFuture<T>(std::move(xm));
});
});
auto prom = Promise<T>();
auto f = prom.getFuture();
retryingImpl(
k, std::forward<Policy>(p), std::forward<FF>(ff), std::move(prom));
return f;
}
template <class Policy, class FF>
......
......@@ -363,6 +363,9 @@ namespace futures {
* indicating that the failure was transitory.
*
* Cancellation is not supported.
*
* If both FF and Policy inline executes, then it is possible to hit a stack
* overflow due to the recursive nature of the retry implementation
*/
template <class Policy, class FF>
typename std::result_of<FF(size_t)>::type
......
......@@ -20,6 +20,8 @@
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>
#include <folly/portability/SysResource.h>
#include "TestExecutor.h"
using namespace std;
using namespace std::chrono;
......@@ -137,6 +139,45 @@ TEST(RetryingTest, policy_sleep_defaults) {
});
}
TEST(RetryingTest, large_retries) {
rlimit oldMemLimit;
PCHECK(getrlimit(RLIMIT_AS, &oldMemLimit) == 0);
rlimit newMemLimit;
newMemLimit.rlim_cur = std::min(1UL << 30, oldMemLimit.rlim_max);
newMemLimit.rlim_max = oldMemLimit.rlim_max;
PCHECK(setrlimit(RLIMIT_AS, &newMemLimit) == 0);
SCOPE_EXIT {
PCHECK(setrlimit(RLIMIT_AS, &oldMemLimit) == 0);
};
TestExecutor executor;
// size of implicit promise is at least the size of the return.
using LargeReturn = array<uint64_t, 16000>;
auto func = [&executor](size_t retryNum) -> Future<LargeReturn> {
return via(&executor).then([retryNum] {
return retryNum < 10000
? makeFuture<LargeReturn>(
make_exception_wrapper<std::runtime_error>("keep trying"))
: makeFuture<LargeReturn>(LargeReturn());
});
};
vector<Future<LargeReturn>> futures;
for (auto idx = 0; idx < 40; ++idx) {
futures.emplace_back(futures::retrying(
[&executor](size_t, const exception_wrapper&) {
return via(&executor).then([] { return true; });
},
func));
}
for (auto& f : futures) {
f.wait();
EXPECT_TRUE(f.hasValue());
}
}
/*
TEST(RetryingTest, policy_sleep_cancel) {
multiAttemptExpectDurationWithin(5, milliseconds(0), milliseconds(10), []{
......
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TestExecutor.h"
using namespace std;
namespace folly {
TestExecutor::TestExecutor() {
const auto kWorkers = std::max(1U, thread::hardware_concurrency());
for (auto idx = 0U; idx < kWorkers; ++idx) {
workers_.emplace_back([this] {
while (true) {
Func work;
{
unique_lock<mutex> lk(m_);
cv_.wait(lk, [this] { return !workItems_.empty(); });
work = std::move(workItems_.front());
workItems_.pop();
}
if (!work) {
break;
}
work();
}
});
}
}
TestExecutor::~TestExecutor() {
for (auto& worker : workers_) {
addImpl({});
}
for (auto& worker : workers_) {
worker.join();
}
}
void TestExecutor::add(Func f) {
if (f) {
addImpl(std::move(f));
}
}
uint32_t TestExecutor::numThreads() const {
return workers_.size();
}
void TestExecutor::addImpl(Func f) {
{
lock_guard<mutex> g(m_);
workItems_.push(std::move(f));
}
cv_.notify_one();
}
} // folly
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <condition_variable>
#include <queue>
#include <thread>
#include <folly/Executor.h>
namespace folly {
/**
* A simple multithreaded executor for use in tests etc
*/
class TestExecutor : public Executor {
public:
TestExecutor();
~TestExecutor() override;
void add(Func f) override;
uint32_t numThreads() const;
private:
void addImpl(Func f);
std::mutex m_;
std::queue<Func> workItems_;
std::condition_variable cv_;
std::vector<std::thread> workers_;
};
} // folly
/*
* Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/portability/GTest.h>
#include "TestExecutor.h"
using namespace std;
using namespace std::chrono;
using namespace folly;
TEST(TestExecutor, parallel_run) {
mutex m;
set<thread::id> ids;
auto executor = std::make_unique<TestExecutor>();
const auto numThreads = executor->numThreads();
for (auto idx = 0U; idx < numThreads * 10; ++idx) {
executor->add([&m, &ids]() mutable {
/* sleep override */ this_thread::sleep_for(milliseconds(100));
lock_guard<mutex> lg(m);
ids.insert(this_thread::get_id());
});
}
executor = nullptr;
EXPECT_EQ(ids.size(), numThreads);
}
......@@ -280,6 +280,7 @@ futures_test_SOURCES = \
../futures/test/RetryingTest.cpp \
../futures/test/SelfDestructTest.cpp \
../futures/test/SharedPromiseTest.cpp \
../futures/test/TestExecutorTest.cpp \
../futures/test/ThenCompileTest.cpp \
../futures/test/ThenTest.cpp \
../futures/test/TimekeeperTest.cpp \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment