Commit 1219e494 authored by Yedidya Feldblum's avatar Yedidya Feldblum Committed by Andrew Cox

EventBase::runInEventLoopThreadAndWait.

Summary:
[Folly] EventBase::runInEventLoopThreadAndWait.

Useful for when some code needs to be run in the event loop thread, but another thread needs to trigger the code and then wait for it to be done.

Test Plan:
Unit tests:
* `folly/io/async/test/EventBaseTest.cpp`

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, folly-diffs@, brettp, dougw

FB internal diff: D1810764

Signature: t1:1810764:1422900654:7ff0aa7feb2792266f620b344cf8a1110a09f7ef
parent 440b7da3
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <folly/io/async/EventBase.h> #include <folly/io/async/EventBase.h>
#include <folly/Baton.h>
#include <folly/ThreadName.h> #include <folly/ThreadName.h>
#include <folly/io/async/NotificationQueue.h> #include <folly/io/async/NotificationQueue.h>
...@@ -562,6 +563,40 @@ bool EventBase::runInEventBaseThread(const Cob& fn) { ...@@ -562,6 +563,40 @@ bool EventBase::runInEventBaseThread(const Cob& fn) {
return true; return true;
} }
bool EventBase::runInEventBaseThreadAndWait(void (*fn)(void*), void* arg) {
if (inRunningEventBaseThread()) {
LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
<< "allowed";
return false;
}
Baton<> ready;
runInEventBaseThread([&] {
fn(arg);
ready.post();
});
ready.wait();
return true;
}
bool EventBase::runInEventBaseThreadAndWait(const Cob& fn) {
if (inRunningEventBaseThread()) {
LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
<< "allowed";
return false;
}
Baton<> ready;
runInEventBaseThread([&] {
fn();
ready.post();
});
ready.wait();
return true;
}
bool EventBase::runAfterDelay(const Cob& cob, bool EventBase::runAfterDelay(const Cob& cob,
int milliseconds, int milliseconds,
TimeoutManager::InternalEnum in) { TimeoutManager::InternalEnum in) {
......
...@@ -346,6 +346,28 @@ class EventBase : private boost::noncopyable, ...@@ -346,6 +346,28 @@ class EventBase : private boost::noncopyable,
*/ */
bool runInEventBaseThread(const Cob& fn); bool runInEventBaseThread(const Cob& fn);
/*
* Like runInEventBaseThread, but the caller waits for the callback to be
* executed.
*/
template<typename T>
bool runInEventBaseThreadAndWait(void (*fn)(T*), T* arg) {
return runInEventBaseThreadAndWait(reinterpret_cast<void (*)(void*)>(fn),
reinterpret_cast<void*>(arg));
}
/*
* Like runInEventBaseThread, but the caller waits for the callback to be
* executed.
*/
bool runInEventBaseThreadAndWait(void (*fn)(void*), void* arg);
/*
* Like runInEventBaseThread, but the caller waits for the callback to be
* executed.
*/
bool runInEventBaseThreadAndWait(const Cob& fn);
/** /**
* Runs the given Cob at some time after the specified number of * Runs the given Cob at some time after the specified number of
* milliseconds. (No guarantees exactly when.) * milliseconds. (No guarantees exactly when.)
......
...@@ -22,14 +22,17 @@ ...@@ -22,14 +22,17 @@
#include <folly/io/async/test/SocketPair.h> #include <folly/io/async/test/SocketPair.h>
#include <folly/io/async/test/Util.h> #include <folly/io/async/test/Util.h>
#include <atomic>
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <memory> #include <memory>
#include <thread> #include <thread>
using std::atomic;
using std::deque; using std::deque;
using std::pair; using std::pair;
using std::vector; using std::vector;
using std::thread;
using std::make_pair; using std::make_pair;
using std::cerr; using std::cerr;
using std::endl; using std::endl;
...@@ -1171,6 +1174,45 @@ TEST(EventBaseTest, RunInThread) { ...@@ -1171,6 +1174,45 @@ TEST(EventBaseTest, RunInThread) {
} }
} }
// This test simulates some calls, and verifies that the waiting happens by
// triggering what otherwise would be race conditions, and trying to detect
// whether any of the race conditions happened.
TEST(EventBaseTest, RunInEventLoopThreadAndWait) {
const size_t c = 256;
vector<atomic<size_t>> atoms(c);
for (size_t i = 0; i < c; ++i) {
auto& atom = atoms.at(i);
atom = 0;
}
vector<thread> threads(c);
for (size_t i = 0; i < c; ++i) {
auto& atom = atoms.at(i);
auto& th = threads.at(i);
th = thread([&atom] {
EventBase eb;
auto ebth = thread([&]{ eb.loopForever(); });
eb.waitUntilRunning();
eb.runInEventBaseThreadAndWait([&] {
size_t x = 0;
atom.compare_exchange_weak(
x, 1, std::memory_order_release, std::memory_order_relaxed);
});
size_t x = 0;
atom.compare_exchange_weak(
x, 2, std::memory_order_release, std::memory_order_relaxed);
eb.terminateLoopSoon();
ebth.join();
});
}
for (size_t i = 0; i < c; ++i) {
auto& th = threads.at(i);
th.join();
}
size_t sum = 0;
for (auto& atom : atoms) sum += atom;
EXPECT_EQ(c, sum);
}
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
// Tests for runInLoop() // Tests for runInLoop()
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment