Commit c3637eb8 authored by Sarang Masti's avatar Sarang Masti Committed by Jordan DeLong

Add futexTimedWait

Summary:
Add futexTimedWait to Futex which allows callers to wait on the futex
for a specified max duration.

Test Plan: -- Ran all unitests

Reviewed By: ngbronson@fb.com

FB internal diff: D1090115
parent 79062841
/*
* Copyright 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/detail/Futex.h>
namespace folly { namespace detail {
/* see Futex.h */
FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno) {
if (returnVal == 0) {
return FutexResult::AWOKEN;
}
switch(futexErrno) {
case ETIMEDOUT:
return FutexResult::TIMEDOUT;
case EINTR:
return FutexResult::INTERRUPTED;
case EWOULDBLOCK:
return FutexResult::VALUE_CHANGED;
default:
assert(false);
/* Shouldn't reach here. Just return one of the FutexResults */
return FutexResult::VALUE_CHANGED;
}
}
}}
......@@ -17,6 +17,7 @@
#pragma once
#include <atomic>
#include <chrono>
#include <limits>
#include <assert.h>
#include <errno.h>
......@@ -25,8 +26,22 @@
#include <unistd.h>
#include <boost/noncopyable.hpp>
using std::chrono::steady_clock;
using std::chrono::system_clock;
using std::chrono::time_point;
namespace folly { namespace detail {
enum class FutexResult {
VALUE_CHANGED, /* Futex value didn't match expected */
AWOKEN, /* futex wait matched with a futex wake */
INTERRUPTED, /* Spurious wake-up or signal caused futex wait failure */
TIMEDOUT
};
/* Converts return value and errno from a futex syscall to a FutexResult */
FutexResult futexErrnoToFutexResult(int returnVal, int futexErrno);
/**
* Futex is an atomic 32 bit unsigned integer that provides access to the
* futex() syscall on that value. It is templated in such a way that it
......@@ -46,25 +61,69 @@ struct Futex : Atom<uint32_t>, boost::noncopyable {
* other return (signal, this->load() != expected, or spurious wakeup). */
bool futexWait(uint32_t expected, uint32_t waitMask = -1);
/** Similar to futexWait but also accepts a timeout that gives the time until
* when the call can block (time is the absolute time i.e time since epoch).
* Allowed clock types: std::chrono::system_clock, std::chrono::steady_clock.
* Returns one of FutexResult values.
*
* NOTE: On some systems steady_clock is just an alias for system_clock,
* and is not actually steady.*/
template <class Clock, class Duration = typename Clock::duration>
FutexResult futexWaitUntil(uint32_t expected,
const time_point<Clock, Duration>& absTime,
uint32_t waitMask = -1);
/** Wakens up to count waiters where (waitMask & wakeMask) != 0,
* returning the number of awoken threads. */
int futexWake(int count = std::numeric_limits<int>::max(),
uint32_t wakeMask = -1);
private:
/** Futex wait implemented via syscall SYS_futex. absTimeout gives
* time till when the wait can block. If it is nullptr the call will
* block until a matching futex wake is received. extraOpFlags can be
* used to specify addtional flags to add to the futex operation (by
* default only FUTEX_WAIT_BITSET and FUTEX_PRIVATE_FLAG are included).
* Returns 0 on success or -1 on error, with errno set to one of the
* values listed in futex(2). */
int futexWaitImpl(uint32_t expected,
const struct timespec* absTimeout,
int extraOpFlags,
uint32_t waitMask);
};
template <>
inline bool Futex<std::atomic>::futexWait(uint32_t expected,
inline int
Futex<std::atomic>::futexWaitImpl(uint32_t expected,
const struct timespec* absTimeout,
int extraOpFlags,
uint32_t waitMask) {
assert(sizeof(*this) == sizeof(int));
int rv = syscall(SYS_futex,
/* Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
* value - http://locklessinc.com/articles/futex_cheat_sheet/ */
int rv = syscall(
SYS_futex,
this, /* addr1 */
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG, /* op */
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | extraOpFlags, /* op */
expected, /* val */
nullptr, /* timeout */
absTimeout, /* timeout */
nullptr, /* addr2 */
waitMask); /* val3 */
assert(rv == 0 || (errno == EWOULDBLOCK || errno == EINTR));
return rv == 0;
assert(rv == 0 ||
errno == EWOULDBLOCK ||
errno == EINTR ||
(absTimeout != nullptr && errno == ETIMEDOUT));
return rv;
}
template <>
inline bool Futex<std::atomic>::futexWait(uint32_t expected,
uint32_t waitMask) {
return futexWaitImpl(expected, nullptr, 0 /* extraOpFlags */, waitMask) == 0;
}
template <>
......@@ -81,4 +140,47 @@ inline int Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
return rv;
}
/* Convert std::chrono::time_point to struct timespec */
template <class Clock, class Duration = typename Clock::Duration>
struct timespec timePointToTimeSpec(const time_point<Clock, Duration>& tp) {
using std::chrono::nanoseconds;
using std::chrono::seconds;
using std::chrono::duration_cast;
struct timespec ts;
auto duration = tp.time_since_epoch();
auto secs = duration_cast<seconds>(duration);
auto nanos = duration_cast<nanoseconds>(duration - secs);
ts.tv_sec = secs.count();
ts.tv_nsec = nanos.count();
return ts;
}
template <template<typename> class Atom> template<class Clock, class Duration>
inline FutexResult
Futex<Atom>::futexWaitUntil(
uint32_t expected,
const time_point<Clock, Duration>& absTime,
uint32_t waitMask) {
static_assert(std::is_same<Clock,system_clock>::value ||
std::is_same<Clock,steady_clock>::value,
"Only std::system_clock or std::steady_clock supported");
struct timespec absTimeSpec = timePointToTimeSpec(absTime);
int extraOpFlags = 0;
/* We must use FUTEX_CLOCK_REALTIME flag if we are getting the time_point
* from the system clock (CLOCK_REALTIME). This check also works correctly for
* broken glibc in which steady_clock is a typedef to system_clock.*/
if (std::is_same<Clock,system_clock>::value) {
extraOpFlags = FUTEX_CLOCK_REALTIME;
} else {
assert(Clock::is_steady);
}
const int rv = futexWaitImpl(expected, &absTimeSpec, extraOpFlags, waitMask);
return futexErrnoToFutexResult(rv, errno);
}
}}
......@@ -134,6 +134,14 @@ DeterministicSchedule::afterSharedAccess() {
sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
}
int
DeterministicSchedule::getRandNumber(int n) {
if (tls_sched) {
return tls_sched->scheduler_(n);
}
return std::rand() % n;
}
sem_t*
DeterministicSchedule::beforeThreadCreate() {
sem_t* s = new sem_t;
......@@ -247,6 +255,49 @@ bool Futex<DeterministicAtomic>::futexWait(uint32_t expected,
return rv;
}
FutexResult futexWaitUntilImpl(Futex<DeterministicAtomic>* futex,
uint32_t expected, uint32_t waitMask) {
if (futex == nullptr) {
return FutexResult::VALUE_CHANGED;
}
bool rv = false;
int futexErrno = 0;
DeterministicSchedule::beforeSharedAccess();
futexLock.lock();
if (futex->data == expected) {
auto& queue = futexQueues[futex];
queue.push_back(std::make_pair(waitMask, &rv));
auto ours = queue.end();
ours--;
while (!rv) {
futexLock.unlock();
DeterministicSchedule::afterSharedAccess();
DeterministicSchedule::beforeSharedAccess();
futexLock.lock();
// Simulate spurious wake-ups, timeouts each time with
// a 10% probability
if (DeterministicSchedule::getRandNumber(100) < 10) {
queue.erase(ours);
if (queue.empty()) {
futexQueues.erase(futex);
}
rv = false;
// Simulate ETIMEDOUT 90% of the time and other failures
// remaining time
futexErrno =
DeterministicSchedule::getRandNumber(100) >= 10 ? ETIMEDOUT : EINTR;
break;
}
}
}
futexLock.unlock();
DeterministicSchedule::afterSharedAccess();
return futexErrnoToFutexResult(rv ? 0 : -1, futexErrno);
}
template<>
int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
int rv = 0;
......
......@@ -124,6 +124,10 @@ class DeterministicSchedule : boost::noncopyable {
/** Calls sem_wait(sem) as part of a deterministic schedule. */
static void wait(sem_t* sem);
/** Used scheduler_ to get a random number b/w [0, n). If tls_sched is
* not set-up it falls back to std::rand() */
static int getRandNumber(int n);
private:
static __thread sem_t* tls_sem;
static __thread DeterministicSchedule* tls_sched;
......@@ -274,6 +278,20 @@ template<>
bool Futex<test::DeterministicAtomic>::futexWait(uint32_t expected,
uint32_t waitMask);
/// This function ignores the time bound, and instead pseudo-randomly chooses
/// whether the timeout was reached. To do otherwise would not be deterministic.
FutexResult futexWaitUntilImpl(Futex<test::DeterministicAtomic> *futex,
uint32_t expected, uint32_t waitMask);
template<> template<class Clock, class Duration>
FutexResult
Futex<test::DeterministicAtomic>::futexWaitUntil(
uint32_t expected,
const time_point<Clock, Duration>& absTimeUnused,
uint32_t waitMask) {
return futexWaitUntilImpl(this, expected, waitMask);
}
template<>
int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
......
......@@ -17,13 +17,17 @@
#include "folly/detail/Futex.h"
#include "folly/test/DeterministicSchedule.h"
#include <chrono>
#include <thread>
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include <common/logging/logging.h>
#include <time.h>
using namespace folly::detail;
using namespace folly::test;
using namespace std::chrono;
typedef DeterministicSchedule DSched;
......@@ -45,14 +49,133 @@ void run_basic_tests() {
DSched::join(thr);
}
template<template<typename> class Atom>
void run_wait_until_tests();
template <typename Clock>
void stdAtomicWaitUntilTests() {
Futex<std::atomic> f(0);
auto thrA = DSched::thread([&]{
while (true) {
typename Clock::time_point nowPlus2s = Clock::now() + seconds(2);
auto res = f.futexWaitUntil(0, nowPlus2s);
EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
if (res == FutexResult::AWOKEN) {
break;
}
}
});
while (f.futexWake() != 1) {
std::this_thread::yield();
}
DSched::join(thrA);
auto start = Clock::now();
EXPECT_EQ(f.futexWaitUntil(0, start + milliseconds(100)),
FutexResult::TIMEDOUT);
LOG(INFO) << "Futex wait timed out after waiting for "
<< duration_cast<milliseconds>(Clock::now() - start).count()
<< "ms";
}
template <typename Clock>
void deterministicAtomicWaitUntilTests() {
Futex<DeterministicAtomic> f(0);
// Futex wait must eventually fail with either FutexResult::TIMEDOUT or
// FutexResult::INTERRUPTED
auto res = f.futexWaitUntil(0, Clock::now() + milliseconds(100));
EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::INTERRUPTED);
}
template <>
void run_wait_until_tests<std::atomic>() {
stdAtomicWaitUntilTests<system_clock>();
stdAtomicWaitUntilTests<steady_clock>();
}
template <>
void run_wait_until_tests<DeterministicAtomic>() {
deterministicAtomicWaitUntilTests<system_clock>();
deterministicAtomicWaitUntilTests<steady_clock>();
}
uint64_t diff(uint64_t a, uint64_t b) {
return a > b ? a - b : b - a;
}
void run_system_clock_test() {
/* Test to verify that system_clock uses clock_gettime(CLOCK_REALTIME, ...)
* for the time_points */
struct timespec ts;
const int maxIters = 1000;
int iter = 0;
uint64_t delta = 10000000 /* 10 ms */;
/** The following loop is only to make the test more robust in the presence of
* clock adjustments that can occur. We just run the loop maxIter times and
* expect with very high probability that there will be atleast one iteration
* of the test during which clock adjustments > delta have not occurred. */
while (iter < maxIters) {
uint64_t a = duration_cast<nanoseconds>(system_clock::now()
.time_since_epoch()).count();
clock_gettime(CLOCK_REALTIME, &ts);
uint64_t b = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
uint64_t c = duration_cast<nanoseconds>(system_clock::now()
.time_since_epoch()).count();
if (diff(a, b) <= delta &&
diff(b, c) <= delta &&
diff(a, c) <= 2 * delta) {
/* Success! system_clock uses CLOCK_REALTIME for time_points */
break;
}
iter++;
}
EXPECT_TRUE(iter < maxIters);
}
void run_steady_clock_test() {
/* Test to verify that steady_clock uses clock_gettime(CLOCK_MONOTONIC, ...)
* for the time_points */
EXPECT_TRUE(steady_clock::is_steady);
uint64_t A = duration_cast<nanoseconds>(steady_clock::now()
.time_since_epoch()).count();
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
uint64_t B = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
uint64_t C = duration_cast<nanoseconds>(steady_clock::now()
.time_since_epoch()).count();
EXPECT_TRUE(A <= B && B <= C);
}
TEST(Futex, clock_source) {
run_system_clock_test();
/* On some systems steady_clock is just an alias for system_clock. So,
* we must skip run_steady_clock_test if the two clocks are the same. */
if (!std::is_same<system_clock,steady_clock>::value) {
run_steady_clock_test();
}
}
TEST(Futex, basic_live) {
run_basic_tests<std::atomic>();
run_wait_until_tests<std::atomic>();
}
TEST(Futex, basic_deterministic) {
DSched sched(DSched::uniform(0));
run_basic_tests<DeterministicAtomic>();
run_wait_until_tests<DeterministicAtomic>();
}
int main(int argc, char ** argv) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment