Commit de821c22 authored by Andrii Grynenko's avatar Andrii Grynenko Committed by Facebook Github Bot

Default VirtualEventBase

Summary:
Make each EventBase have a defuault VirtualEventBase which is attached to it and is joined on destruction. Default VirtualEventBase is lazily created on first request.
This makes it trivial to use code switched to VirtualEventBase both with VirtualEventBase and EventBase.

Reviewed By: yfeldblum

Differential Revision: D4644639

fbshipit-source-id: cf28a3632463a1c61404c225ce1186f5a4a062a3
parent d433d90a
......@@ -2042,34 +2042,36 @@ TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
}
TEST(FiberManager, VirtualEventBase) {
folly::ScopedEventBaseThread thread;
auto evb1 =
folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
auto evb2 =
folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
bool done1{false};
bool done2{false};
{
folly::ScopedEventBaseThread thread;
getFiberManager(*evb1).addTaskRemote([&] {
Baton baton;
baton.timed_wait(std::chrono::milliseconds{100});
auto evb1 =
folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
auto& evb2 = thread.getEventBase()->getVirtualEventBase();
done1 = true;
});
getFiberManager(*evb1).addTaskRemote([&] {
Baton baton;
baton.timed_wait(std::chrono::milliseconds{100});
getFiberManager(*evb2).addTaskRemote([&] {
Baton baton;
baton.timed_wait(std::chrono::milliseconds{200});
done1 = true;
});
done2 = true;
});
getFiberManager(evb2).addTaskRemote([&] {
Baton baton;
baton.timed_wait(std::chrono::milliseconds{200});
evb1.reset();
EXPECT_TRUE(done1);
done2 = true;
});
EXPECT_FALSE(done1);
EXPECT_FALSE(done2);
evb2.reset();
evb1.reset();
EXPECT_TRUE(done1);
EXPECT_FALSE(done2);
}
EXPECT_TRUE(done2);
}
......
......@@ -19,6 +19,7 @@
#endif
#include <folly/io/async/EventBase.h>
#include <folly/io/async/VirtualEventBase.h>
#include <folly/ThreadName.h>
#include <folly/io/async/NotificationQueue.h>
......@@ -154,6 +155,11 @@ EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
}
EventBase::~EventBase() {
std::future<void> virtualEventBaseDestroyFuture;
if (virtualEventBase_) {
virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
}
// Keep looping until all keep-alive handles are released. Each keep-alive
// handle signals that some external code will still schedule some work on
// this EventBase (so it's not safe to destroy it).
......@@ -162,6 +168,10 @@ EventBase::~EventBase() {
loopOnce();
}
if (virtualEventBaseDestroyFuture.valid()) {
virtualEventBaseDestroyFuture.get();
}
// Call all destruction callbacks, before we start cleaning up our state.
while (!onDestructionCallbacks_.empty()) {
LoopCallback* callback = &onDestructionCallbacks_.front();
......@@ -736,4 +746,12 @@ const std::string& EventBase::getName() {
const char* EventBase::getLibeventVersion() { return event_get_version(); }
const char* EventBase::getLibeventMethod() { return event_get_method(); }
VirtualEventBase& EventBase::getVirtualEventBase() {
folly::call_once(virtualEventBaseInitFlag_, [&] {
virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
});
return *virtualEventBase_;
}
} // folly
......@@ -34,6 +34,7 @@
#include <boost/intrusive/list.hpp>
#include <boost/utility.hpp>
#include <folly/CallOnce.h>
#include <folly/Executor.h>
#include <folly/Function.h>
#include <folly/Portability.h>
......@@ -629,6 +630,15 @@ class EventBase : private boost::noncopyable,
return isInEventBaseThread();
}
// Returns a VirtualEventBase attached to this EventBase. Can be used to
// pass to APIs which expect VirtualEventBase. This VirtualEventBase will be
// destroyed together with the EventBase.
//
// Any number of VirtualEventBases instances may be independently constructed,
// which are backed by this EventBase. This method should be only used if you
// don't need to manage the life time of the VirtualEventBase used.
folly::VirtualEventBase& getVirtualEventBase();
protected:
void keepAliveRelease() override {
DCHECK(isInEventBaseThread());
......@@ -736,6 +746,9 @@ class EventBase : private boost::noncopyable,
std::mutex localStorageMutex_;
std::unordered_map<uint64_t, std::shared_ptr<void>> localStorage_;
std::unordered_set<detail::EventBaseLocalBaseBase*> localStorageToDtor_;
folly::once_flag virtualEventBaseInitFlag_;
std::unique_ptr<VirtualEventBase> virtualEventBase_;
};
template <typename T>
......
......@@ -22,13 +22,16 @@ VirtualEventBase::VirtualEventBase(EventBase& evb) : evb_(evb) {
loopKeepAlive_ = getKeepAliveToken();
}
VirtualEventBase::~VirtualEventBase() {
CHECK(!evb_.inRunningEventBaseThread());
std::future<void> VirtualEventBase::destroy() {
CHECK(evb_.runInEventBaseThread([this] { loopKeepAlive_.reset(); }));
CHECK(evb_.runInEventBaseThread([&] { loopKeepAlive_.reset(); }));
loopKeepAliveBaton_.wait();
return std::move(destroyFuture_);
}
CHECK(evb_.runInEventBaseThreadAndWait([&] {
void VirtualEventBase::destroyImpl() {
// Make sure we release EventBase KeepAlive token even if exception occurs
auto evbLoopKeepAlive = std::move(evbLoopKeepAlive_);
try {
clearCobTimeouts();
onDestructionCallbacks_.withWLock([&](LoopCallbackList& callbacks) {
......@@ -39,8 +42,18 @@ VirtualEventBase::~VirtualEventBase() {
}
});
evbLoopKeepAlive_.reset();
}));
destroyPromise_.set_value();
} catch (...) {
destroyPromise_.set_exception(std::current_exception());
}
}
VirtualEventBase::~VirtualEventBase() {
if (!destroyFuture_.valid()) {
return;
}
CHECK(!evb_.inRunningEventBaseThread());
destroy().get();
}
void VirtualEventBase::runOnDestruction(EventBase::LoopCallback* callback) {
......
......@@ -16,6 +16,8 @@
#pragma once
#include <future>
#include <folly/Baton.h>
#include <folly/Executor.h>
#include <folly/io/async/EventBase.h>
......@@ -147,18 +149,24 @@ class VirtualEventBase : public folly::Executor, public folly::TimeoutManager {
}
DCHECK(loopKeepAliveCount_ > 0);
if (--loopKeepAliveCount_ == 0) {
loopKeepAliveBaton_.post();
destroyImpl();
}
}
private:
friend class EventBase;
std::future<void> destroy();
void destroyImpl();
using LoopCallbackList = EventBase::LoopCallback::List;
EventBase& evb_;
ssize_t loopKeepAliveCount_{0};
std::atomic<ssize_t> loopKeepAliveCountAtomic_{0};
folly::Baton<> loopKeepAliveBaton_;
std::promise<void> destroyPromise_;
std::future<void> destroyFuture_{destroyPromise_.get_future()};
KeepAlive loopKeepAlive_;
KeepAlive evbLoopKeepAlive_;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment