Commit 69baad3b authored by Dan Melnic's avatar Dan Melnic Committed by Facebook Github Bot

EventBase test lib rework

Summary: EventBase test lib rework

Reviewed By: yfeldblum

Differential Revision: D19001877

fbshipit-source-id: d99ad41007012c7bd21ee259a0a0b5ee2b709512
parent 9cce3b90
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Memory.h>
#include <folly/ScopeGuard.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/test/SocketPair.h>
#include <folly/io/async/test/Util.h>
#include <folly/portability/Stdlib.h>
#include <folly/portability/Unistd.h>
#include <folly/futures/Promise.h>
#include <folly/io/async/test/EventBaseTestLib.h>
#include <atomic>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
using std::atomic;
using std::cerr;
using std::deque;
using std::endl;
using std::make_pair;
using std::pair;
using std::thread;
using std::unique_ptr;
using std::vector;
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::milliseconds;
namespace folly {
namespace test {
EventBaseBackendProvider::GetBackendFunc
EventBaseBackendProvider::getBackendFunc_;
}
} // namespace folly
using namespace std::chrono_literals;
using namespace folly;
///////////////////////////////////////////////////////////////////////////
// Tests for read and write events
///////////////////////////////////////////////////////////////////////////
namespace {
class BackendEventBase : public EventBase {
public:
BackendEventBase()
: EventBase(folly::test::EventBaseBackendProvider::getBackend()) {}
};
class EventBaseTest : public ::testing::Test {
public:
EventBaseTest() {
// libevent 2.x uses a coarse monotonic timer by default on Linux.
// This timer is imprecise enough to cause several of our tests to fail.
//
// Set an environment variable that causes libevent to use a non-coarse
// timer. This can be controlled programmatically by using the
// EVENT_BASE_FLAG_PRECISE_TIMER flag with event_base_new_with_config().
// However, this would require more compile-time #ifdefs to tell if we are
// using libevent 2.1+ or not. Simply using the environment variable is
// the easiest option for now.
setenv("EVENT_PRECISE_TIMER", "1", 1);
}
};
enum { BUF_SIZE = 4096 };
ssize_t writeToFD(int fd, size_t length) {
// write an arbitrary amount of data to the fd
auto bufv = vector<char>(length);
auto buf = bufv.data();
memset(buf, 'a', length);
ssize_t rc = write(fd, buf, length);
CHECK_EQ(rc, length);
return rc;
}
size_t writeUntilFull(int fd) {
// Write to the fd until EAGAIN is returned
size_t bytesWritten = 0;
char buf[BUF_SIZE];
memset(buf, 'a', sizeof(buf));
while (true) {
ssize_t rc = write(fd, buf, sizeof(buf));
if (rc < 0) {
CHECK_EQ(errno, EAGAIN);
break;
} else {
bytesWritten += rc;
}
}
return bytesWritten;
}
ssize_t readFromFD(int fd, size_t length) {
// write an arbitrary amount of data to the fd
auto buf = vector<char>(length);
return read(fd, buf.data(), length);
}
size_t readUntilEmpty(int fd) {
// Read from the fd until EAGAIN is returned
char buf[BUF_SIZE];
size_t bytesRead = 0;
while (true) {
int rc = read(fd, buf, sizeof(buf));
if (rc == 0) {
CHECK(false) << "unexpected EOF";
} else if (rc < 0) {
CHECK_EQ(errno, EAGAIN);
break;
} else {
bytesRead += rc;
}
}
return bytesRead;
}
void checkReadUntilEmpty(int fd, size_t expectedLength) {
ASSERT_EQ(readUntilEmpty(fd), expectedLength);
}
struct ScheduledEvent {
int milliseconds;
uint16_t events;
size_t length;
ssize_t result;
void perform(int fd) {
if (events & EventHandler::READ) {
if (length == 0) {
result = readUntilEmpty(fd);
} else {
result = readFromFD(fd, length);
}
}
if (events & EventHandler::WRITE) {
if (length == 0) {
result = writeUntilFull(fd);
} else {
result = writeToFD(fd, length);
}
}
}
};
void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
eventBase->tryRunAfterDelay(
std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
}
}
class TestHandler : public EventHandler {
public:
TestHandler(EventBase* eventBase, int fd)
: EventHandler(eventBase, NetworkSocket::fromFd(fd)), fd_(fd) {}
void handlerReady(uint16_t events) noexcept override {
ssize_t bytesRead = 0;
ssize_t bytesWritten = 0;
if (events & READ) {
// Read all available data, so EventBase will stop calling us
// until new data becomes available
bytesRead = readUntilEmpty(fd_);
}
if (events & WRITE) {
// Write until the pipe buffer is full, so EventBase will stop calling
// us until the other end has read some data
bytesWritten = writeUntilFull(fd_);
}
log.emplace_back(events, bytesRead, bytesWritten);
}
struct EventRecord {
EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
: events(events_),
timestamp(),
bytesRead(bytesRead_),
bytesWritten(bytesWritten_) {}
uint16_t events;
TimePoint timestamp;
ssize_t bytesRead;
ssize_t bytesWritten;
};
deque<EventRecord> log;
private:
int fd_;
};
} // namespace
/**
* Test a READ event
*/
TEST_F(EventBaseTest, ReadEvent) {
BackendEventBase eb;
SocketPair sp;
// Register for read events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ);
// Register timeouts to perform two write events
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{160, EventHandler::WRITE, 99, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have received the first read, then unregistered itself. Check that only
// the first chunk of data was received.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start,
handler.log[0].timestamp,
milliseconds(events[0].milliseconds),
milliseconds(90));
ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
T_CHECK_TIMEOUT(
start, end, milliseconds(events[1].milliseconds), milliseconds(30));
// Make sure the second chunk of data is still waiting to be read.
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[1].length);
}
/**
* Test (READ | PERSIST)
*/
TEST_F(EventBaseTest, ReadPersist) {
BackendEventBase eb;
SocketPair sp;
// Register for read events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
// Register several timeouts to perform writes
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 1024, 0},
{20, EventHandler::WRITE, 2211, 0},
{30, EventHandler::WRITE, 4096, 0},
{100, EventHandler::WRITE, 100, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler after the third write
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
// Loop
eb.loop();
TimePoint end;
// The handler should have received the first 3 events,
// then been unregistered after that.
ASSERT_EQ(handler.log.size(), 3);
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
// Make sure the data from the last write is still waiting to be read
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[3].length);
}
/**
* Test registering for READ when the socket is immediately readable
*/
TEST_F(EventBaseTest, ReadImmediate) {
BackendEventBase eb;
SocketPair sp;
// Write some data to the socket so the other end will
// be immediately readable
size_t dataLength = 1234;
writeToFD(sp[1], dataLength);
// Register for read events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
// Register a timeout to perform another write
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 2);
// There should have been 1 event for immediate readability
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, dataLength);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
// There should be another event after the timeout wrote more data
ASSERT_EQ(handler.log[1].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
ASSERT_EQ(handler.log[1].bytesWritten, 0);
T_CHECK_TIMEOUT(start, end, milliseconds(20));
}
/**
* Test a WRITE event
*/
TEST_F(EventBaseTest, WriteEvent) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::WRITE);
// Register timeouts to perform two reads
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{60, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have only been able to write once, then unregistered itself.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
ASSERT_EQ(events[0].result, initialBytesWritten);
ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
}
/**
* Test (WRITE | PERSIST)
*/
TEST_F(EventBaseTest, WritePersist) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// Register several timeouts to read from the socket at several intervals
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{40, EventHandler::READ, 0, 0},
{70, EventHandler::READ, 0, 0},
{100, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler after the third read
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
// Loop
eb.loop();
TimePoint end;
// The handler should have received the first 3 events,
// then been unregistered after that.
ASSERT_EQ(handler.log.size(), 3);
ASSERT_EQ(events[0].result, initialBytesWritten);
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_GT(handler.log[n].bytesWritten, 0);
ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
}
T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
}
/**
* Test registering for WRITE when the socket is immediately writable
*/
TEST_F(EventBaseTest, WriteImmediate) {
BackendEventBase eb;
SocketPair sp;
// Register for write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// Register a timeout to perform a read
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
int64_t unregisterTimeout = 40;
eb.tryRunAfterDelay(
std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 2);
// Since the socket buffer was initially empty,
// there should have been 1 event for immediate writability
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
// There should be another event after the timeout wrote more data
ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[1].bytesRead, 0);
ASSERT_GT(handler.log[1].bytesWritten, 0);
T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
}
/**
* Test (READ | WRITE) when the socket becomes readable first
*/
TEST_F(EventBaseTest, ReadWrite) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t sock0WriteLength = writeUntilFull(sp[0]);
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ_WRITE);
// Register timeouts to perform a write then a read.
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{40, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have only noticed readability, then unregistered itself. Check that only
// one event was logged.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
ASSERT_EQ(events[1].result, sock0WriteLength);
T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
}
/**
* Test (READ | WRITE) when the socket becomes writable first
*/
TEST_F(EventBaseTest, WriteRead) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t sock0WriteLength = writeUntilFull(sp[0]);
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ_WRITE);
// Register timeouts to perform a read then a write.
size_t sock1WriteLength = 2345;
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{40, EventHandler::WRITE, sock1WriteLength, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have only noticed writability, then unregistered itself. Check that only
// one event was logged.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
ASSERT_EQ(events[0].result, sock0WriteLength);
ASSERT_EQ(events[1].result, sock1WriteLength);
T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
// Make sure the written data is still waiting to be read.
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[1].length);
}
/**
* Test (READ | WRITE) when the socket becomes readable and writable
* at the same time.
*/
TEST_F(EventBaseTest, ReadWriteSimultaneous) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t sock0WriteLength = writeUntilFull(sp[0]);
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ_WRITE);
// Register a timeout to perform a read and write together
ScheduledEvent events[] = {
{10, EventHandler::READ | EventHandler::WRITE, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// It's not strictly required that the EventBase register us about both
// events in the same call or thw read/write notifications are delievered at
// the same. So, it's possible that if the EventBase implementation changes
// this test could start failing, and it wouldn't be considered breaking the
// API. However for now it's nice to exercise this code path.
ASSERT_EQ(handler.log.size(), 1);
if (handler.log[0].events & EventHandler::READ) {
ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
ASSERT_GT(handler.log[0].bytesWritten, 0);
}
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
}
/**
* Test (READ | WRITE | PERSIST)
*/
TEST_F(EventBaseTest, ReadWritePersist) {
BackendEventBase eb;
SocketPair sp;
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(
EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
// Register timeouts to perform several reads and writes
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{20, EventHandler::READ, 0, 0},
{35, EventHandler::WRITE, 200, 0},
{45, EventHandler::WRITE, 15, 0},
{55, EventHandler::READ, 0, 0},
{120, EventHandler::WRITE, 2345, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 6);
// Since we didn't fill up the write buffer immediately, there should
// be an immediate event for writability.
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
// Events 1 through 5 should correspond to the scheduled events
for (int n = 1; n < 6; ++n) {
ScheduledEvent* event = &events[n - 1];
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(event->milliseconds));
if (event->events == EventHandler::READ) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_GT(handler.log[n].bytesWritten, 0);
} else {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
ASSERT_EQ(handler.log[n].bytesRead, event->length);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
}
// The timeout should have unregistered the handler before the last write.
// Make sure that data is still waiting to be read
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[5].length);
}
namespace {
class PartialReadHandler : public TestHandler {
public:
PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
: TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
void handlerReady(uint16_t events) noexcept override {
assert(events == EventHandler::READ);
ssize_t bytesRead = readFromFD(fd_, readLength_);
log.emplace_back(events, bytesRead, 0);
}
private:
int fd_;
size_t readLength_;
};
} // namespace
/**
* Test reading only part of the available data when a read event is fired.
* When PERSIST is used, make sure the handler gets notified again the next
* time around the loop.
*/
TEST_F(EventBaseTest, ReadPartial) {
BackendEventBase eb;
SocketPair sp;
// Register for read events
size_t readLength = 100;
PartialReadHandler handler(&eb, sp[0], readLength);
handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
// Register a timeout to perform a single write,
// with more data than PartialReadHandler will read at once
ScheduledEvent events[] = {
{10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 4);
// The first 3 invocations should read readLength bytes each
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, readLength);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
// The last read only has readLength/2 bytes
ASSERT_EQ(handler.log[3].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[3].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
ASSERT_EQ(handler.log[3].bytesWritten, 0);
}
namespace {
class PartialWriteHandler : public TestHandler {
public:
PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
: TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
void handlerReady(uint16_t events) noexcept override {
assert(events == EventHandler::WRITE);
ssize_t bytesWritten = writeToFD(fd_, writeLength_);
log.emplace_back(events, 0, bytesWritten);
}
private:
int fd_;
size_t writeLength_;
};
} // namespace
/**
* Test writing without completely filling up the write buffer when the fd
* becomes writable. When PERSIST is used, make sure the handler gets
* notified again the next time around the loop.
*/
TEST_F(EventBaseTest, WritePartial) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
size_t writeLength = 100;
PartialWriteHandler handler(&eb, sp[0], writeLength);
handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// Register a timeout to read, so that more data can be written
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
// Loop
eb.loop();
TimePoint end;
// Depending on how big the socket buffer is, there will be multiple writes
// Only check the first 5
int numChecked = 5;
ASSERT_GE(handler.log.size(), numChecked);
ASSERT_EQ(events[0].result, initialBytesWritten);
// The first 3 invocations should read writeLength bytes each
for (int n = 0; n < numChecked; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
}
}
/**
* Test destroying a registered EventHandler
*/
TEST_F(EventBaseTest, DestroyHandler) {
class DestroyHandler : public AsyncTimeout {
public:
DestroyHandler(EventBase* eb, EventHandler* h)
: AsyncTimeout(eb), handler_(h) {}
void timeoutExpired() noexcept override {
delete handler_;
}
private:
EventHandler* handler_;
};
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
TestHandler* handler = new TestHandler(&eb, sp[0]);
handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// After 10ms, read some data, so that the handler
// will be notified that it can write.
eb.tryRunAfterDelay(
std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
// Start a timer to destroy the handler after 25ms
// This mainly just makes sure the code doesn't break or assert
DestroyHandler dh(&eb, handler);
dh.scheduleTimeout(25);
TimePoint start;
eb.loop();
TimePoint end;
// Make sure the EventHandler was uninstalled properly when it was
// destroyed, and the EventBase loop exited
T_CHECK_TIMEOUT(start, end, milliseconds(25));
// Make sure that the handler wrote data to the socket
// before it was destroyed
size_t bytesRemaining = readUntilEmpty(sp[1]);
ASSERT_GT(bytesRemaining, 0);
}
///////////////////////////////////////////////////////////////////////////
// Tests for timeout events
///////////////////////////////////////////////////////////////////////////
TEST_F(EventBaseTest, RunAfterDelay) {
BackendEventBase eb;
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
TimePoint start;
eb.tryRunAfterDelay(std::move(fn1), 10);
eb.tryRunAfterDelay(std::move(fn2), 20);
eb.tryRunAfterDelay(std::move(fn3), 40);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
T_CHECK_TIMEOUT(start, end, milliseconds(40));
}
/**
* Test the behavior of tryRunAfterDelay() when some timeouts are
* still scheduled when the EventBase is destroyed.
*/
TEST_F(EventBaseTest, RunAfterDelayDestruction) {
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
TimePoint timestamp4(false);
TimePoint start(false);
TimePoint end(false);
{
BackendEventBase eb;
start.reset();
// Run two normal timeouts
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
// Schedule a timeout to stop the event loop after 40ms
eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
// Schedule 2 timeouts that would fire after the event loop stops
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
eb.loop();
end.reset();
}
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
T_CHECK_TIMEOUT(start, end, milliseconds(40));
ASSERT_TRUE(timestamp3.isUnset());
ASSERT_TRUE(timestamp4.isUnset());
// Ideally this test should be run under valgrind to ensure that no
// memory is leaked.
}
namespace {
class TestTimeout : public AsyncTimeout {
public:
explicit TestTimeout(EventBase* eventBase)
: AsyncTimeout(eventBase), timestamp(false) {}
void timeoutExpired() noexcept override {
timestamp.reset();
}
TimePoint timestamp;
};
} // namespace
TEST_F(EventBaseTest, BasicTimeouts) {
BackendEventBase eb;
TestTimeout t1(&eb);
TestTimeout t2(&eb);
TestTimeout t3(&eb);
TimePoint start;
t1.scheduleTimeout(10);
t2.scheduleTimeout(20);
t3.scheduleTimeout(40);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
T_CHECK_TIMEOUT(start, end, milliseconds(40));
}
namespace {
class ReschedulingTimeout : public AsyncTimeout {
public:
ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
: AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
void start() {
reschedule();
}
void timeoutExpired() noexcept override {
timestamps.emplace_back();
reschedule();
}
void reschedule() {
if (iterator_ != timeouts_.end()) {
uint32_t timeout = *iterator_;
++iterator_;
scheduleTimeout(timeout);
}
}
vector<TimePoint> timestamps;
private:
vector<uint32_t> timeouts_;
vector<uint32_t>::const_iterator iterator_;
};
} // namespace
/**
* Test rescheduling the same timeout multiple times
*/
TEST_F(EventBaseTest, ReuseTimeout) {
BackendEventBase eb;
vector<uint32_t> timeouts;
timeouts.push_back(10);
timeouts.push_back(30);
timeouts.push_back(15);
ReschedulingTimeout t(&eb, timeouts);
TimePoint start;
t.start();
eb.loop();
TimePoint end;
// Use a higher tolerance than usual. We're waiting on 3 timeouts
// consecutively. In general, each timeout may go over by a few
// milliseconds, and we're tripling this error by witing on 3 timeouts.
milliseconds tolerance{6};
ASSERT_EQ(timeouts.size(), t.timestamps.size());
uint32_t total = 0;
for (size_t n = 0; n < timeouts.size(); ++n) {
total += timeouts[n];
T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
}
T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
}
/**
* Test rescheduling a timeout before it has fired
*/
TEST_F(EventBaseTest, RescheduleTimeout) {
BackendEventBase eb;
TestTimeout t1(&eb);
TestTimeout t2(&eb);
TestTimeout t3(&eb);
TimePoint start;
t1.scheduleTimeout(15);
t2.scheduleTimeout(30);
t3.scheduleTimeout(30);
auto f = static_cast<bool (AsyncTimeout::*)(uint32_t)>(
&AsyncTimeout::scheduleTimeout);
// after 10ms, reschedule t2 to run sooner than originally scheduled
eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
// after 10ms, reschedule t3 to run later than originally scheduled
eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
T_CHECK_TIMEOUT(start, end, milliseconds(50));
}
/**
* Test cancelling a timeout
*/
TEST_F(EventBaseTest, CancelTimeout) {
BackendEventBase eb;
vector<uint32_t> timeouts;
timeouts.push_back(10);
timeouts.push_back(30);
timeouts.push_back(25);
ReschedulingTimeout t(&eb, timeouts);
TimePoint start;
t.start();
eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
eb.loop();
TimePoint end;
ASSERT_EQ(t.timestamps.size(), 2);
T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
T_CHECK_TIMEOUT(start, end, milliseconds(50));
}
/**
* Test destroying a scheduled timeout object
*/
TEST_F(EventBaseTest, DestroyTimeout) {
class DestroyTimeout : public AsyncTimeout {
public:
DestroyTimeout(EventBase* eb, AsyncTimeout* t)
: AsyncTimeout(eb), timeout_(t) {}
void timeoutExpired() noexcept override {
delete timeout_;
}
private:
AsyncTimeout* timeout_;
};
BackendEventBase eb;
TestTimeout* t1 = new TestTimeout(&eb);
TimePoint start;
t1->scheduleTimeout(30);
DestroyTimeout dt(&eb, t1);
dt.scheduleTimeout(10);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, end, milliseconds(10));
}
/**
* Test the scheduled executor impl
*/
TEST_F(EventBaseTest, ScheduledFn) {
BackendEventBase eb;
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
TimePoint start;
eb.schedule(std::move(fn1), milliseconds(9));
eb.schedule(std::move(fn2), milliseconds(19));
eb.schedule(std::move(fn3), milliseconds(39));
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
T_CHECK_TIMEOUT(start, end, milliseconds(39));
}
TEST_F(EventBaseTest, ScheduledFnAt) {
BackendEventBase eb;
TimePoint timestamp0(false);
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
auto fn0 = std::bind(&TimePoint::reset, &timestamp0);
auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
TimePoint start;
eb.scheduleAt(fn0, eb.now() - milliseconds(5));
eb.scheduleAt(fn1, eb.now() + milliseconds(9));
eb.scheduleAt(fn2, eb.now() + milliseconds(19));
eb.scheduleAt(fn3, eb.now() + milliseconds(39));
TimePoint loopStart;
eb.loop();
TimePoint end;
// Even though we asked to schedule the first function in the past,
// in practice it doesn't run until after 1 iteration of the HHWheelTimer tick
// interval.
T_CHECK_TIMEOUT(start, timestamp0, eb.timer().getTickInterval());
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
T_CHECK_TIMEOUT(start, end, milliseconds(39));
}
///////////////////////////////////////////////////////////////////////////
// Test for runInThreadTestFunc()
///////////////////////////////////////////////////////////////////////////
namespace {
struct RunInThreadData {
RunInThreadData(int numThreads, int opsPerThread_)
: opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {}
BackendEventBase evb;
deque<pair<int, int>> values;
int opsPerThread;
int opsToGo;
};
struct RunInThreadArg {
RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
: data(data_), thread(threadId), value(value_) {}
RunInThreadData* data;
int thread;
int value;
};
void runInThreadTestFunc(RunInThreadArg* arg) {
arg->data->values.emplace_back(arg->thread, arg->value);
RunInThreadData* data = arg->data;
delete arg;
if (--data->opsToGo == 0) {
// Break out of the event base loop if we are the last thread running
data->evb.terminateLoopSoon();
}
}
} // namespace
TEST_F(EventBaseTest, RunInThread) {
constexpr uint32_t numThreads = 50;
constexpr uint32_t opsPerThread = 100;
RunInThreadData data(numThreads, opsPerThread);
deque<std::thread> threads;
SCOPE_EXIT {
// Wait on all of the threads.
for (auto& thread : threads) {
thread.join();
}
};
for (uint32_t i = 0; i < numThreads; ++i) {
threads.emplace_back([i, &data] {
for (int n = 0; n < data.opsPerThread; ++n) {
RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
usleep(10);
}
});
}
// Add a timeout event to run after 3 seconds.
// Otherwise loop() will return immediately since there are no events to run.
// Once the last thread exits, it will stop the loop(). However, this
// timeout also stops the loop in case there is a bug performing the normal
// stop.
data.evb.tryRunAfterDelay(
std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
TimePoint start;
data.evb.loop();
TimePoint end;
// Verify that the loop exited because all threads finished and requested it
// to stop. This should happen much sooner than the 3 second timeout.
// Assert that it happens in under a second. (This is still tons of extra
// padding.)
auto timeTaken =
std::chrono::duration_cast<milliseconds>(end.getTime() - start.getTime());
ASSERT_LT(timeTaken.count(), 1000);
VLOG(11) << "Time taken: " << timeTaken.count();
// Verify that we have all of the events from every thread
int expectedValues[numThreads];
for (uint32_t n = 0; n < numThreads; ++n) {
expectedValues[n] = 0;
}
for (deque<pair<int, int>>::const_iterator it = data.values.begin();
it != data.values.end();
++it) {
int threadID = it->first;
int value = it->second;
ASSERT_EQ(expectedValues[threadID], value);
++expectedValues[threadID];
}
for (uint32_t n = 0; n < numThreads; ++n) {
ASSERT_EQ(expectedValues[n], opsPerThread);
}
}
// This test simulates some calls, and verifies that the waiting happens by
// triggering what otherwise would be race conditions, and trying to detect
// whether any of the race conditions happened.
TEST_F(EventBaseTest, RunInEventBaseThreadAndWait) {
const size_t c = 256;
vector<unique_ptr<atomic<size_t>>> atoms(c);
for (size_t i = 0; i < c; ++i) {
auto& atom = atoms.at(i);
atom = std::make_unique<atomic<size_t>>(0);
}
vector<thread> threads;
for (size_t i = 0; i < c; ++i) {
threads.emplace_back([&atoms, i] {
BackendEventBase eb;
auto& atom = *atoms.at(i);
auto ebth = thread([&] { eb.loopForever(); });
eb.waitUntilRunning();
eb.runInEventBaseThreadAndWait([&] {
size_t x = 0;
atom.compare_exchange_weak(
x, 1, std::memory_order_release, std::memory_order_relaxed);
});
size_t x = 0;
atom.compare_exchange_weak(
x, 2, std::memory_order_release, std::memory_order_relaxed);
eb.terminateLoopSoon();
ebth.join();
});
}
for (size_t i = 0; i < c; ++i) {
auto& th = threads.at(i);
th.join();
}
size_t sum = 0;
for (auto& atom : atoms) {
sum += *atom;
}
EXPECT_EQ(c, sum);
}
TEST_F(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
BackendEventBase eb;
thread th(&EventBase::loopForever, &eb);
SCOPE_EXIT {
eb.terminateLoopSoon();
th.join();
};
auto mutated = false;
eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
EXPECT_TRUE(mutated);
}
TEST_F(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
BackendEventBase eb;
thread th(&EventBase::loopForever, &eb);
SCOPE_EXIT {
eb.terminateLoopSoon();
th.join();
};
eb.runInEventBaseThreadAndWait([&] {
auto mutated = false;
eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
EXPECT_TRUE(mutated);
});
}
TEST_F(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
BackendEventBase eb;
auto mutated = false;
eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
EXPECT_TRUE(mutated);
}
///////////////////////////////////////////////////////////////////////////
// Tests for runInLoop()
///////////////////////////////////////////////////////////////////////////
namespace {
class CountedLoopCallback : public EventBase::LoopCallback {
public:
CountedLoopCallback(
EventBase* eventBase,
unsigned int count,
std::function<void()> action = std::function<void()>())
: eventBase_(eventBase), count_(count), action_(action) {}
void runLoopCallback() noexcept override {
--count_;
if (count_ > 0) {
eventBase_->runInLoop(this);
} else if (action_) {
action_();
}
}
unsigned int getCount() const {
return count_;
}
private:
EventBase* eventBase_;
unsigned int count_;
std::function<void()> action_;
};
} // namespace
// Test that EventBase::loop() doesn't exit while there are
// still LoopCallbacks remaining to be invoked.
TEST_F(EventBaseTest, RepeatedRunInLoop) {
EventBase eventBase;
CountedLoopCallback c(&eventBase, 10);
eventBase.runInLoop(&c);
// The callback shouldn't have run immediately
ASSERT_EQ(c.getCount(), 10);
eventBase.loop();
// loop() should loop until the CountedLoopCallback stops
// re-installing itself.
ASSERT_EQ(c.getCount(), 0);
}
// Test that EventBase::loop() works as expected without time measurements.
TEST_F(EventBaseTest, RunInLoopNoTimeMeasurement) {
EventBase eventBase(false);
CountedLoopCallback c(&eventBase, 10);
eventBase.runInLoop(&c);
// The callback shouldn't have run immediately
ASSERT_EQ(c.getCount(), 10);
eventBase.loop();
// loop() should loop until the CountedLoopCallback stops
// re-installing itself.
ASSERT_EQ(c.getCount(), 0);
}
// Test runInLoop() calls with terminateLoopSoon()
TEST_F(EventBaseTest, RunInLoopStopLoop) {
EventBase eventBase;
CountedLoopCallback c1(&eventBase, 20);
CountedLoopCallback c2(
&eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
eventBase.runInLoop(&c1);
eventBase.runInLoop(&c2);
ASSERT_EQ(c1.getCount(), 20);
ASSERT_EQ(c2.getCount(), 10);
eventBase.loopForever();
// c2 should have stopped the loop after 10 iterations
ASSERT_EQ(c2.getCount(), 0);
// We allow the EventBase to run the loop callbacks in whatever order it
// chooses. We'll accept c1's count being either 10 (if the loop terminated
// after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
// before c1 ran).
//
// (With the current code, c1 will always run 10 times, but we don't consider
// this a hard API requirement.)
ASSERT_GE(c1.getCount(), 10);
ASSERT_LE(c1.getCount(), 11);
}
TEST_F(EventBaseTest, messageAvailableException) {
auto deadManWalking = [] {
EventBase eventBase;
std::thread t([&] {
// Call this from another thread to force use of NotificationQueue in
// runInEventBaseThread
eventBase.runInEventBaseThread(
[]() { throw std::runtime_error("boom"); });
});
t.join();
eventBase.loopForever();
};
EXPECT_DEATH(deadManWalking(), ".*");
}
TEST_F(EventBaseTest, TryRunningAfterTerminate) {
bool ran = false;
{
EventBase eventBase;
CountedLoopCallback c1(
&eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
eventBase.runInLoop(&c1);
eventBase.loopForever();
eventBase.runInEventBaseThread([&]() { ran = true; });
ASSERT_FALSE(ran);
}
// Loop callbacks are triggered on EventBase destruction
ASSERT_TRUE(ran);
}
// Test cancelling runInLoop() callbacks
TEST_F(EventBaseTest, CancelRunInLoop) {
EventBase eventBase;
CountedLoopCallback c1(&eventBase, 20);
CountedLoopCallback c2(&eventBase, 20);
CountedLoopCallback c3(&eventBase, 20);
std::function<void()> cancelC1Action =
std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
std::function<void()> cancelC2Action =
std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
// Install cancelC1 after c1
eventBase.runInLoop(&c1);
eventBase.runInLoop(&cancelC1);
// Install cancelC2 before c2
eventBase.runInLoop(&cancelC2);
eventBase.runInLoop(&c2);
// Install c3
eventBase.runInLoop(&c3);
ASSERT_EQ(c1.getCount(), 20);
ASSERT_EQ(c2.getCount(), 20);
ASSERT_EQ(c3.getCount(), 20);
ASSERT_EQ(cancelC1.getCount(), 10);
ASSERT_EQ(cancelC2.getCount(), 10);
// Run the loop
eventBase.loop();
// cancelC1 and cancelC2 should have both fired after 10 iterations and
// stopped re-installing themselves
ASSERT_EQ(cancelC1.getCount(), 0);
ASSERT_EQ(cancelC2.getCount(), 0);
// c3 should have continued on for the full 20 iterations
ASSERT_EQ(c3.getCount(), 0);
// c1 and c2 should have both been cancelled on the 10th iteration.
//
// Callbacks are always run in the order they are installed,
// so c1 should have fired 10 times, and been canceled after it ran on the
// 10th iteration. c2 should have only fired 9 times, because cancelC2 will
// have run before it on the 10th iteration, and cancelled it before it
// fired.
ASSERT_EQ(c1.getCount(), 10);
ASSERT_EQ(c2.getCount(), 11);
}
namespace {
class TerminateTestCallback : public EventBase::LoopCallback,
public EventHandler {
public:
TerminateTestCallback(EventBase* eventBase, int fd)
: EventHandler(eventBase, NetworkSocket::fromFd(fd)),
eventBase_(eventBase),
loopInvocations_(0),
maxLoopInvocations_(0),
eventInvocations_(0),
maxEventInvocations_(0) {}
void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
loopInvocations_ = 0;
maxLoopInvocations_ = maxLoopInvocations;
eventInvocations_ = 0;
maxEventInvocations_ = maxEventInvocations;
cancelLoopCallback();
unregisterHandler();
}
void handlerReady(uint16_t /* events */) noexcept override {
// We didn't register with PERSIST, so we will have been automatically
// unregistered already.
ASSERT_FALSE(isHandlerRegistered());
++eventInvocations_;
if (eventInvocations_ >= maxEventInvocations_) {
return;
}
eventBase_->runInLoop(this);
}
void runLoopCallback() noexcept override {
++loopInvocations_;
if (loopInvocations_ >= maxLoopInvocations_) {
return;
}
registerHandler(READ);
}
uint32_t getLoopInvocations() const {
return loopInvocations_;
}
uint32_t getEventInvocations() const {
return eventInvocations_;
}
private:
EventBase* eventBase_;
uint32_t loopInvocations_;
uint32_t maxLoopInvocations_;
uint32_t eventInvocations_;
uint32_t maxEventInvocations_;
};
} // namespace
/**
* Test that EventBase::loop() correctly detects when there are no more events
* left to run.
*
* This uses a single callback, which alternates registering itself as a loop
* callback versus a EventHandler callback. This exercises a regression where
* EventBase::loop() incorrectly exited if there were no more fd handlers
* registered, but a loop callback installed a new fd handler.
*/
TEST_F(EventBaseTest, LoopTermination) {
EventBase eventBase;
// Open a pipe and close the write end,
// so the read endpoint will be readable
int pipeFds[2];
int rc = pipe(pipeFds);
ASSERT_EQ(rc, 0);
close(pipeFds[1]);
TerminateTestCallback callback(&eventBase, pipeFds[0]);
// Test once where the callback will exit after a loop callback
callback.reset(10, 100);
eventBase.runInLoop(&callback);
eventBase.loop();
ASSERT_EQ(callback.getLoopInvocations(), 10);
ASSERT_EQ(callback.getEventInvocations(), 9);
// Test once where the callback will exit after an fd event callback
callback.reset(100, 7);
eventBase.runInLoop(&callback);
eventBase.loop();
ASSERT_EQ(callback.getLoopInvocations(), 7);
ASSERT_EQ(callback.getEventInvocations(), 7);
close(pipeFds[0]);
}
TEST_F(EventBaseTest, CallbackOrderTest) {
size_t num = 0;
BackendEventBase evb;
evb.runInEventBaseThread([&]() {
std::thread t([&]() {
evb.runInEventBaseThread([&]() {
num++;
EXPECT_EQ(num, 2);
});
});
t.join();
// this callback will run first
// even if it is scheduled after the first one
evb.runInEventBaseThread([&]() {
num++;
EXPECT_EQ(num, 1);
});
});
evb.loop();
EXPECT_EQ(num, 2);
}
TEST_F(EventBaseTest, AlwaysEnqueueCallbackOrderTest) {
size_t num = 0;
BackendEventBase evb;
evb.runInEventBaseThread([&]() {
std::thread t([&]() {
evb.runInEventBaseThreadAlwaysEnqueue([&]() {
num++;
EXPECT_EQ(num, 1);
});
});
t.join();
// this callback will run second
// since it was enqueued after the first one
evb.runInEventBaseThreadAlwaysEnqueue([&]() {
num++;
EXPECT_EQ(num, 2);
});
});
evb.loop();
EXPECT_EQ(num, 2);
}
///////////////////////////////////////////////////////////////////////////
// Tests for latency calculations
///////////////////////////////////////////////////////////////////////////
namespace {
class IdleTimeTimeoutSeries : public AsyncTimeout {
public:
explicit IdleTimeTimeoutSeries(
EventBase* base,
std::deque<std::size_t>& timeout)
: AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
scheduleTimeout(1);
}
~IdleTimeTimeoutSeries() override {}
void timeoutExpired() noexcept override {
++timeouts_;
if (timeout_.empty()) {
cancelTimeout();
} else {
std::size_t sleepTime = timeout_.front();
timeout_.pop_front();
if (sleepTime) {
usleep(sleepTime);
}
scheduleTimeout(1);
}
}
int getTimeouts() const {
return timeouts_;
}
private:
int timeouts_;
std::deque<std::size_t>& timeout_;
};
} // namespace
/**
* Verify that idle time is correctly accounted for when decaying our loop
* time.
*
* This works by creating a high loop time (via usleep), expecting a latency
* callback with known value, and then scheduling a timeout for later. This
* later timeout is far enough in the future that the idle time should have
* caused the loop time to decay.
*/
TEST_F(EventBaseTest, IdleTime) {
EventBase eventBase;
std::deque<std::size_t> timeouts0(4, 8080);
timeouts0.push_front(8000);
timeouts0.push_back(14000);
IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
std::deque<std::size_t> timeouts(20, 20);
std::unique_ptr<IdleTimeTimeoutSeries> tos;
bool hostOverloaded = false;
// Loop once before starting the main test. This will run NotificationQueue
// callbacks that get automatically installed when the EventBase is first
// created. We want to make sure they don't interfere with the timing
// operations below.
eventBase.loopOnce(EVLOOP_NONBLOCK);
eventBase.setLoadAvgMsec(1000ms);
eventBase.resetLoadAvg(5900.0);
auto testStart = std::chrono::steady_clock::now();
int latencyCallbacks = 0;
eventBase.setMaxLatency(6000us, [&]() {
++latencyCallbacks;
if (latencyCallbacks != 1) {
FAIL() << "Unexpected latency callback";
}
if (tos0.getTimeouts() < 6) {
// This could only happen if the host this test is running
// on is heavily loaded.
int64_t usElapsed = duration_cast<microseconds>(
std::chrono::steady_clock::now() - testStart)
.count();
EXPECT_LE(43800, usElapsed);
hostOverloaded = true;
return;
}
EXPECT_EQ(6, tos0.getTimeouts());
EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
});
// Kick things off with an "immediate" timeout
tos0.scheduleTimeout(1);
eventBase.loop();
if (hostOverloaded) {
SKIP() << "host too heavily loaded to execute test";
}
ASSERT_EQ(1, latencyCallbacks);
ASSERT_EQ(7, tos0.getTimeouts());
ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
ASSERT_TRUE(!!tos);
ASSERT_EQ(21, tos->getTimeouts());
}
/**
* Test that thisLoop functionality works with terminateLoopSoon
*/
TEST_F(EventBaseTest, ThisLoop) {
bool runInLoop = false;
bool runThisLoop = false;
{
BackendEventBase eb;
eb.runInLoop(
[&]() {
eb.terminateLoopSoon();
eb.runInLoop([&]() { runInLoop = true; });
eb.runInLoop([&]() { runThisLoop = true; }, true);
},
true);
eb.loopForever();
// Should not work
ASSERT_FALSE(runInLoop);
// Should work with thisLoop
ASSERT_TRUE(runThisLoop);
}
// Pending loop callbacks will be run when the EventBase is destroyed.
ASSERT_TRUE(runInLoop);
}
TEST_F(EventBaseTest, EventBaseThreadLoop) {
BackendEventBase base;
bool ran = false;
base.runInEventBaseThread([&]() { ran = true; });
base.loop();
ASSERT_TRUE(ran);
}
TEST_F(EventBaseTest, EventBaseThreadName) {
BackendEventBase base;
base.setName("foo");
base.loop();
#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
char name[16];
pthread_getname_np(pthread_self(), name, 16);
ASSERT_EQ(0, strcmp("foo", name));
#endif
}
TEST_F(EventBaseTest, RunBeforeLoop) {
BackendEventBase base;
CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
base.runBeforeLoop(&cb);
base.loopForever();
ASSERT_EQ(cb.getCount(), 0);
}
TEST_F(EventBaseTest, RunBeforeLoopWait) {
BackendEventBase base;
CountedLoopCallback cb(&base, 1);
base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
base.runBeforeLoop(&cb);
base.loopForever();
// Check that we only ran once, and did not loop multiple times.
ASSERT_EQ(cb.getCount(), 0);
}
namespace {
class PipeHandler : public EventHandler {
public:
PipeHandler(EventBase* eventBase, int fd)
: EventHandler(eventBase, NetworkSocket::fromFd(fd)) {}
void handlerReady(uint16_t /* events */) noexcept override {
abort();
}
};
} // namespace
TEST_F(EventBaseTest, StopBeforeLoop) {
BackendEventBase evb;
// Give the evb something to do.
int p[2];
ASSERT_EQ(0, pipe(p));
PipeHandler handler(&evb, p[0]);
handler.registerHandler(EventHandler::READ);
// It's definitely not running yet
evb.terminateLoopSoon();
// let it run, it should exit quickly.
std::thread t([&] { evb.loop(); });
t.join();
handler.unregisterHandler();
close(p[0]);
close(p[1]);
SUCCEED();
}
TEST_F(EventBaseTest, RunCallbacksOnDestruction) {
bool ran = false;
{
BackendEventBase base;
base.runInEventBaseThread([&]() { ran = true; });
}
ASSERT_TRUE(ran);
}
TEST_F(EventBaseTest, LoopKeepAlive) {
BackendEventBase evb;
bool done = false;
std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
});
evb.loop();
ASSERT_TRUE(done);
t.join();
}
TEST_F(EventBaseTest, LoopKeepAliveInLoop) {
BackendEventBase evb;
bool done = false;
std::thread t;
evb.runInEventBaseThread([&] {
t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
});
});
evb.loop();
ASSERT_TRUE(done);
t.join();
}
TEST_F(EventBaseTest, LoopKeepAliveWithLoopForever) {
std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
bool done = false;
std::thread evThread([&] {
evb->loopForever();
evb.reset();
done = true;
});
{
auto* ev = evb.get();
Executor::KeepAlive<EventBase> keepAlive;
ev->runInEventBaseThreadAndWait(
[&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
ASSERT_FALSE(done) << "Loop finished before we asked it to";
ev->terminateLoopSoon();
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(30));
ASSERT_FALSE(done) << "Loop terminated early";
ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
}
evThread.join();
ASSERT_TRUE(done);
}
TEST_F(EventBaseTest, LoopKeepAliveShutdown) {
auto evb = std::make_unique<EventBase>();
bool done = false;
std::thread t([&done,
loopKeepAlive = getKeepAliveToken(evb.get()),
evbPtr = evb.get()]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evbPtr->runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
});
evb.reset();
ASSERT_TRUE(done);
t.join();
}
TEST_F(EventBaseTest, LoopKeepAliveAtomic) {
auto evb = std::make_unique<EventBase>();
static constexpr size_t kNumThreads = 100;
static constexpr size_t kNumTasks = 100;
std::vector<std::thread> ts;
std::vector<std::unique_ptr<Baton<>>> batons;
size_t done{0};
for (size_t i = 0; i < kNumThreads; ++i) {
batons.emplace_back(std::make_unique<Baton<>>());
}
for (size_t i = 0; i < kNumThreads; ++i) {
ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] {
std::vector<Executor::KeepAlive<EventBase>> keepAlives;
for (size_t j = 0; j < kNumTasks; ++j) {
keepAlives.emplace_back(getKeepAliveToken(evbPtr));
}
batonPtr->post();
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
for (auto& keepAlive : keepAlives) {
evbPtr->runInEventBaseThread(
[&done, keepAlive = std::move(keepAlive)]() { ++done; });
}
});
}
for (auto& baton : batons) {
baton->wait();
}
evb.reset();
EXPECT_EQ(kNumThreads * kNumTasks, done);
for (auto& t : ts) {
t.join();
}
}
TEST_F(EventBaseTest, LoopKeepAliveCast) {
BackendEventBase evb;
Executor::KeepAlive<> keepAlive = getKeepAliveToken(evb);
}
TEST_F(EventBaseTest, DrivableExecutorTest) {
folly::Promise<bool> p;
auto f = p.getFuture();
BackendEventBase base;
bool finished = false;
std::thread t([&] {
/* sleep override */
std::this_thread::sleep_for(std::chrono::microseconds(10));
finished = true;
base.runInEventBaseThread([&]() { p.setValue(true); });
});
// Ensure drive does not busy wait
base.drive(); // TODO: fix notification queue init() extra wakeup
base.drive();
EXPECT_TRUE(finished);
folly::Promise<bool> p2;
auto f2 = p2.getFuture();
// Ensure waitVia gets woken up properly, even from
// a separate thread.
base.runAfterDelay([&]() { p2.setValue(true); }, 10);
f2.waitVia(&base);
EXPECT_TRUE(f2.isReady());
t.join();
}
TEST_F(EventBaseTest, IOExecutorTest) {
BackendEventBase base;
// Ensure EventBase manages itself as an IOExecutor.
EXPECT_EQ(base.getEventBase(), &base);
}
TEST_F(EventBaseTest, RequestContextTest) {
BackendEventBase evb;
auto defaultCtx = RequestContext::get();
std::weak_ptr<RequestContext> rctx_weak_ptr;
{
RequestContextScopeGuard rctx;
rctx_weak_ptr = RequestContext::saveContext();
auto context = RequestContext::get();
EXPECT_NE(defaultCtx, context);
evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
evb.loop();
}
// Ensure that RequestContext created for the scope has been released and
// deleted.
EXPECT_EQ(rctx_weak_ptr.expired(), true);
EXPECT_EQ(defaultCtx, RequestContext::get());
}
TEST_F(EventBaseTest, CancelLoopCallbackRequestContextTest) {
BackendEventBase evb;
CountedLoopCallback c(&evb, 1);
auto defaultCtx = RequestContext::get();
EXPECT_EQ(defaultCtx, RequestContext::get());
std::weak_ptr<RequestContext> rctx_weak_ptr;
{
RequestContextScopeGuard rctx;
rctx_weak_ptr = RequestContext::saveContext();
auto context = RequestContext::get();
EXPECT_NE(defaultCtx, context);
evb.runInLoop(&c);
c.cancelLoopCallback();
}
// Ensure that RequestContext created for the scope has been released and
// deleted.
EXPECT_EQ(rctx_weak_ptr.expired(), true);
EXPECT_EQ(defaultCtx, RequestContext::get());
}
TEST_F(EventBaseTest, TestStarvation) {
BackendEventBase evb;
std::promise<void> stopRequested;
std::promise<void> stopScheduled;
bool stopping{false};
std::thread t{[&] {
stopRequested.get_future().get();
evb.add([&]() { stopping = true; });
stopScheduled.set_value();
}};
size_t num{0};
std::function<void()> fn;
fn = [&]() {
if (stopping || num >= 2000) {
return;
}
if (++num == 1000) {
stopRequested.set_value();
stopScheduled.get_future().get();
}
evb.add(fn);
};
evb.add(fn);
evb.loop();
EXPECT_EQ(1000, num);
t.join();
}
TEST_F(EventBaseTest, RunOnDestructionBasic) {
bool ranOnDestruction = false;
{
BackendEventBase evb;
evb.runOnDestruction([&ranOnDestruction] { ranOnDestruction = true; });
}
EXPECT_TRUE(ranOnDestruction);
}
TEST_F(EventBaseTest, RunOnDestructionCancelled) {
struct Callback : EventBase::OnDestructionCallback {
bool ranOnDestruction{false};
void onEventBaseDestruction() noexcept final {
ranOnDestruction = true;
}
};
auto cb = std::make_unique<Callback>();
{
BackendEventBase evb;
evb.runOnDestruction(*cb);
EXPECT_TRUE(cb->cancel());
}
EXPECT_FALSE(cb->ranOnDestruction);
EXPECT_FALSE(cb->cancel());
}
TEST_F(EventBaseTest, RunOnDestructionAfterHandleDestroyed) {
BackendEventBase evb;
{
bool ranOnDestruction = false;
auto* cb = new EventBase::FunctionOnDestructionCallback(
[&ranOnDestruction] { ranOnDestruction = true; });
evb.runOnDestruction(*cb);
EXPECT_TRUE(cb->cancel());
delete cb;
}
}
TEST_F(EventBaseTest, RunOnDestructionAddCallbackWithinCallback) {
size_t callbacksCalled = 0;
{
BackendEventBase evb;
evb.runOnDestruction([&] {
++callbacksCalled;
evb.runOnDestruction([&] { ++callbacksCalled; });
});
}
EXPECT_EQ(2, callbacksCalled);
}
......@@ -14,29 +14,2208 @@
* limitations under the License.
*/
#pragma once
#include <folly/Function.h>
#include <folly/Memory.h>
#include <folly/ScopeGuard.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>
#include <folly/io/async/test/SocketPair.h>
#include <folly/io/async/test/Util.h>
#include <folly/portability/Stdlib.h>
#include <folly/portability/Unistd.h>
#include <folly/futures/Promise.h>
#include <folly/io/async/test/EventBaseTestLibProvider.h>
#include <atomic>
#include <future>
#include <iostream>
#include <memory>
#include <thread>
using std::atomic;
using std::cerr;
using std::deque;
using std::endl;
using std::make_pair;
using std::pair;
using std::thread;
using std::unique_ptr;
using std::vector;
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::milliseconds;
using folly::AsyncTimeout;
using folly::Baton;
using folly::EventBase;
using folly::EventHandler;
using folly::Executor;
using folly::NetworkSocket;
using folly::RequestContext;
using folly::RequestContextScopeGuard;
using folly::SocketPair;
using folly::TimePoint;
///////////////////////////////////////////////////////////////////////////
// Tests for read and write events
///////////////////////////////////////////////////////////////////////////
namespace folly {
namespace test {
class EventBaseBackendProvider {
class BackendEventBase : public EventBase {
public:
BackendEventBase()
: EventBase(folly::test::EventBaseBackendProvider::getBackend()) {}
};
class EventBaseTest : public ::testing::Test {
public:
EventBaseBackendProvider() = delete;
~EventBaseBackendProvider() = delete;
using GetBackendFunc =
folly::Function<std::unique_ptr<folly::EventBaseBackendBase>()>;
static std::unique_ptr<folly::EventBaseBackendBase> getBackend() {
CHECK(!!getBackendFunc_);
EventBaseTest() {
// libevent 2.x uses a coarse monotonic timer by default on Linux.
// This timer is imprecise enough to cause several of our tests to fail.
//
// Set an environment variable that causes libevent to use a non-coarse
// timer. This can be controlled programmatically by using the
// EVENT_BASE_FLAG_PRECISE_TIMER flag with event_base_new_with_config().
// However, this would require more compile-time #ifdefs to tell if we are
// using libevent 2.1+ or not. Simply using the environment variable is
// the easiest option for now.
setenv("EVENT_PRECISE_TIMER", "1", 1);
}
};
enum { BUF_SIZE = 4096 };
return getBackendFunc_();
FOLLY_ALWAYS_INLINE ssize_t writeToFD(int fd, size_t length) {
// write an arbitrary amount of data to the fd
auto bufv = vector<char>(length);
auto buf = bufv.data();
memset(buf, 'a', length);
ssize_t rc = write(fd, buf, length);
CHECK_EQ(rc, length);
return rc;
}
FOLLY_ALWAYS_INLINE size_t writeUntilFull(int fd) {
// Write to the fd until EAGAIN is returned
size_t bytesWritten = 0;
char buf[BUF_SIZE];
memset(buf, 'a', sizeof(buf));
while (true) {
ssize_t rc = write(fd, buf, sizeof(buf));
if (rc < 0) {
CHECK_EQ(errno, EAGAIN);
break;
} else {
bytesWritten += rc;
}
}
return bytesWritten;
}
FOLLY_ALWAYS_INLINE ssize_t readFromFD(int fd, size_t length) {
// write an arbitrary amount of data to the fd
auto buf = vector<char>(length);
return read(fd, buf.data(), length);
}
FOLLY_ALWAYS_INLINE size_t readUntilEmpty(int fd) {
// Read from the fd until EAGAIN is returned
char buf[BUF_SIZE];
size_t bytesRead = 0;
while (true) {
int rc = read(fd, buf, sizeof(buf));
if (rc == 0) {
CHECK(false) << "unexpected EOF";
} else if (rc < 0) {
CHECK_EQ(errno, EAGAIN);
break;
} else {
bytesRead += rc;
}
}
return bytesRead;
}
static void setGetBackendFunc(GetBackendFunc&& func) {
getBackendFunc_ = std::move(func);
FOLLY_ALWAYS_INLINE void checkReadUntilEmpty(int fd, size_t expectedLength) {
ASSERT_EQ(readUntilEmpty(fd), expectedLength);
}
struct ScheduledEvent {
int milliseconds;
uint16_t events;
size_t length;
ssize_t result;
void perform(int fd) {
if (events & EventHandler::READ) {
if (length == 0) {
result = readUntilEmpty(fd);
} else {
result = readFromFD(fd, length);
}
}
if (events & EventHandler::WRITE) {
if (length == 0) {
result = writeUntilFull(fd);
} else {
result = writeToFD(fd, length);
}
}
}
};
FOLLY_ALWAYS_INLINE void
scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
eventBase->tryRunAfterDelay(
std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
}
}
class TestHandler : public EventHandler {
public:
TestHandler(EventBase* eventBase, int fd)
: EventHandler(eventBase, NetworkSocket::fromFd(fd)), fd_(fd) {}
void handlerReady(uint16_t events) noexcept override {
ssize_t bytesRead = 0;
ssize_t bytesWritten = 0;
if (events & READ) {
// Read all available data, so EventBase will stop calling us
// until new data becomes available
bytesRead = readUntilEmpty(fd_);
}
if (events & WRITE) {
// Write until the pipe buffer is full, so EventBase will stop calling
// us until the other end has read some data
bytesWritten = writeUntilFull(fd_);
}
log.emplace_back(events, bytesRead, bytesWritten);
}
struct EventRecord {
EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
: events(events_),
timestamp(),
bytesRead(bytesRead_),
bytesWritten(bytesWritten_) {}
uint16_t events;
TimePoint timestamp;
ssize_t bytesRead;
ssize_t bytesWritten;
};
deque<EventRecord> log;
private:
static GetBackendFunc getBackendFunc_;
int fd_;
};
} // namespace test
} // namespace folly
using folly::test::BackendEventBase;
using folly::test::checkReadUntilEmpty;
using folly::test::EventBaseTest;
using folly::test::readFromFD;
using folly::test::readUntilEmpty;
using folly::test::ScheduledEvent;
using folly::test::TestHandler;
using folly::test::writeToFD;
using folly::test::writeUntilFull;
/**
* Test a READ event
*/
TEST_F(EventBaseTest, ReadEvent) {
BackendEventBase eb;
SocketPair sp;
// Register for read events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ);
// Register timeouts to perform two write events
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{160, EventHandler::WRITE, 99, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have received the first read, then unregistered itself. Check that only
// the first chunk of data was received.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start,
handler.log[0].timestamp,
milliseconds(events[0].milliseconds),
milliseconds(90));
ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
T_CHECK_TIMEOUT(
start, end, milliseconds(events[1].milliseconds), milliseconds(30));
// Make sure the second chunk of data is still waiting to be read.
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[1].length);
}
/**
* Test (READ | PERSIST)
*/
TEST_F(EventBaseTest, ReadPersist) {
BackendEventBase eb;
SocketPair sp;
// Register for read events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
// Register several timeouts to perform writes
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 1024, 0},
{20, EventHandler::WRITE, 2211, 0},
{30, EventHandler::WRITE, 4096, 0},
{100, EventHandler::WRITE, 100, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler after the third write
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
// Loop
eb.loop();
TimePoint end;
// The handler should have received the first 3 events,
// then been unregistered after that.
ASSERT_EQ(handler.log.size(), 3);
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
// Make sure the data from the last write is still waiting to be read
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[3].length);
}
/**
* Test registering for READ when the socket is immediately readable
*/
TEST_F(EventBaseTest, ReadImmediate) {
BackendEventBase eb;
SocketPair sp;
// Write some data to the socket so the other end will
// be immediately readable
size_t dataLength = 1234;
writeToFD(sp[1], dataLength);
// Register for read events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
// Register a timeout to perform another write
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 2);
// There should have been 1 event for immediate readability
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, dataLength);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
// There should be another event after the timeout wrote more data
ASSERT_EQ(handler.log[1].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
ASSERT_EQ(handler.log[1].bytesWritten, 0);
T_CHECK_TIMEOUT(start, end, milliseconds(20));
}
/**
* Test a WRITE event
*/
TEST_F(EventBaseTest, WriteEvent) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::WRITE);
// Register timeouts to perform two reads
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{60, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have only been able to write once, then unregistered itself.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
ASSERT_EQ(events[0].result, initialBytesWritten);
ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
}
/**
* Test (WRITE | PERSIST)
*/
TEST_F(EventBaseTest, WritePersist) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// Register several timeouts to read from the socket at several intervals
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{40, EventHandler::READ, 0, 0},
{70, EventHandler::READ, 0, 0},
{100, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler after the third read
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
// Loop
eb.loop();
TimePoint end;
// The handler should have received the first 3 events,
// then been unregistered after that.
ASSERT_EQ(handler.log.size(), 3);
ASSERT_EQ(events[0].result, initialBytesWritten);
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_GT(handler.log[n].bytesWritten, 0);
ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
}
T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
}
/**
* Test registering for WRITE when the socket is immediately writable
*/
TEST_F(EventBaseTest, WriteImmediate) {
BackendEventBase eb;
SocketPair sp;
// Register for write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// Register a timeout to perform a read
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
int64_t unregisterTimeout = 40;
eb.tryRunAfterDelay(
std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 2);
// Since the socket buffer was initially empty,
// there should have been 1 event for immediate writability
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
// There should be another event after the timeout wrote more data
ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[1].bytesRead, 0);
ASSERT_GT(handler.log[1].bytesWritten, 0);
T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
}
/**
* Test (READ | WRITE) when the socket becomes readable first
*/
TEST_F(EventBaseTest, ReadWrite) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t sock0WriteLength = writeUntilFull(sp[0]);
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ_WRITE);
// Register timeouts to perform a write then a read.
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{40, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have only noticed readability, then unregistered itself. Check that only
// one event was logged.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
ASSERT_EQ(handler.log[0].bytesWritten, 0);
ASSERT_EQ(events[1].result, sock0WriteLength);
T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
}
/**
* Test (READ | WRITE) when the socket becomes writable first
*/
TEST_F(EventBaseTest, WriteRead) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t sock0WriteLength = writeUntilFull(sp[0]);
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ_WRITE);
// Register timeouts to perform a read then a write.
size_t sock1WriteLength = 2345;
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{40, EventHandler::WRITE, sock1WriteLength, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// Since we didn't use the EventHandler::PERSIST flag, the handler should
// have only noticed writability, then unregistered itself. Check that only
// one event was logged.
ASSERT_EQ(handler.log.size(), 1);
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
ASSERT_EQ(events[0].result, sock0WriteLength);
ASSERT_EQ(events[1].result, sock1WriteLength);
T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
// Make sure the written data is still waiting to be read.
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[1].length);
}
/**
* Test (READ | WRITE) when the socket becomes readable and writable
* at the same time.
*/
TEST_F(EventBaseTest, ReadWriteSimultaneous) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t sock0WriteLength = writeUntilFull(sp[0]);
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(EventHandler::READ_WRITE);
// Register a timeout to perform a read and write together
ScheduledEvent events[] = {
{10, EventHandler::READ | EventHandler::WRITE, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Loop
eb.loop();
TimePoint end;
// It's not strictly required that the EventBase register us about both
// events in the same call or thw read/write notifications are delievered at
// the same. So, it's possible that if the EventBase implementation changes
// this test could start failing, and it wouldn't be considered breaking the
// API. However for now it's nice to exercise this code path.
ASSERT_EQ(handler.log.size(), 1);
if (handler.log[0].events & EventHandler::READ) {
ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
ASSERT_GT(handler.log[0].bytesWritten, 0);
}
T_CHECK_TIMEOUT(
start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
}
/**
* Test (READ | WRITE | PERSIST)
*/
TEST_F(EventBaseTest, ReadWritePersist) {
BackendEventBase eb;
SocketPair sp;
// Register for read and write events
TestHandler handler(&eb, sp[0]);
handler.registerHandler(
EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
// Register timeouts to perform several reads and writes
ScheduledEvent events[] = {
{10, EventHandler::WRITE, 2345, 0},
{20, EventHandler::READ, 0, 0},
{35, EventHandler::WRITE, 200, 0},
{45, EventHandler::WRITE, 15, 0},
{55, EventHandler::READ, 0, 0},
{120, EventHandler::WRITE, 2345, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 6);
// Since we didn't fill up the write buffer immediately, there should
// be an immediate event for writability.
ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
ASSERT_EQ(handler.log[0].bytesRead, 0);
ASSERT_GT(handler.log[0].bytesWritten, 0);
// Events 1 through 5 should correspond to the scheduled events
for (int n = 1; n < 6; ++n) {
ScheduledEvent* event = &events[n - 1];
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(event->milliseconds));
if (event->events == EventHandler::READ) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_GT(handler.log[n].bytesWritten, 0);
} else {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
ASSERT_EQ(handler.log[n].bytesRead, event->length);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
}
// The timeout should have unregistered the handler before the last write.
// Make sure that data is still waiting to be read
size_t bytesRemaining = readUntilEmpty(sp[0]);
ASSERT_EQ(bytesRemaining, events[5].length);
}
namespace {
class PartialReadHandler : public TestHandler {
public:
PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
: TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
void handlerReady(uint16_t events) noexcept override {
assert(events == EventHandler::READ);
ssize_t bytesRead = readFromFD(fd_, readLength_);
log.emplace_back(events, bytesRead, 0);
}
private:
int fd_;
size_t readLength_;
};
} // namespace
/**
* Test reading only part of the available data when a read event is fired.
* When PERSIST is used, make sure the handler gets notified again the next
* time around the loop.
*/
TEST_F(EventBaseTest, ReadPartial) {
BackendEventBase eb;
SocketPair sp;
// Register for read events
size_t readLength = 100;
PartialReadHandler handler(&eb, sp[0], readLength);
handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
// Register a timeout to perform a single write,
// with more data than PartialReadHandler will read at once
ScheduledEvent events[] = {
{10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
// Loop
eb.loop();
TimePoint end;
ASSERT_EQ(handler.log.size(), 4);
// The first 3 invocations should read readLength bytes each
for (int n = 0; n < 3; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, readLength);
ASSERT_EQ(handler.log[n].bytesWritten, 0);
}
// The last read only has readLength/2 bytes
ASSERT_EQ(handler.log[3].events, EventHandler::READ);
T_CHECK_TIMEOUT(
start, handler.log[3].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
ASSERT_EQ(handler.log[3].bytesWritten, 0);
}
namespace {
class PartialWriteHandler : public TestHandler {
public:
PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
: TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
void handlerReady(uint16_t events) noexcept override {
assert(events == EventHandler::WRITE);
ssize_t bytesWritten = writeToFD(fd_, writeLength_);
log.emplace_back(events, 0, bytesWritten);
}
private:
int fd_;
size_t writeLength_;
};
} // namespace
/**
* Test writing without completely filling up the write buffer when the fd
* becomes writable. When PERSIST is used, make sure the handler gets
* notified again the next time around the loop.
*/
TEST_F(EventBaseTest, WritePartial) {
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
size_t writeLength = 100;
PartialWriteHandler handler(&eb, sp[0], writeLength);
handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// Register a timeout to read, so that more data can be written
ScheduledEvent events[] = {
{10, EventHandler::READ, 0, 0},
{0, 0, 0, 0},
};
TimePoint start;
scheduleEvents(&eb, sp[1], events);
// Schedule a timeout to unregister the handler
eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
// Loop
eb.loop();
TimePoint end;
// Depending on how big the socket buffer is, there will be multiple writes
// Only check the first 5
int numChecked = 5;
ASSERT_GE(handler.log.size(), numChecked);
ASSERT_EQ(events[0].result, initialBytesWritten);
// The first 3 invocations should read writeLength bytes each
for (int n = 0; n < numChecked; ++n) {
ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
T_CHECK_TIMEOUT(
start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
ASSERT_EQ(handler.log[n].bytesRead, 0);
ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
}
}
/**
* Test destroying a registered EventHandler
*/
TEST_F(EventBaseTest, DestroyHandler) {
class DestroyHandler : public AsyncTimeout {
public:
DestroyHandler(EventBase* eb, EventHandler* h)
: AsyncTimeout(eb), handler_(h) {}
void timeoutExpired() noexcept override {
delete handler_;
}
private:
EventHandler* handler_;
};
BackendEventBase eb;
SocketPair sp;
// Fill up the write buffer before starting
size_t initialBytesWritten = writeUntilFull(sp[0]);
// Register for write events
TestHandler* handler = new TestHandler(&eb, sp[0]);
handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
// After 10ms, read some data, so that the handler
// will be notified that it can write.
eb.tryRunAfterDelay(
std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
// Start a timer to destroy the handler after 25ms
// This mainly just makes sure the code doesn't break or assert
DestroyHandler dh(&eb, handler);
dh.scheduleTimeout(25);
TimePoint start;
eb.loop();
TimePoint end;
// Make sure the EventHandler was uninstalled properly when it was
// destroyed, and the EventBase loop exited
T_CHECK_TIMEOUT(start, end, milliseconds(25));
// Make sure that the handler wrote data to the socket
// before it was destroyed
size_t bytesRemaining = readUntilEmpty(sp[1]);
ASSERT_GT(bytesRemaining, 0);
}
///////////////////////////////////////////////////////////////////////////
// Tests for timeout events
///////////////////////////////////////////////////////////////////////////
TEST_F(EventBaseTest, RunAfterDelay) {
BackendEventBase eb;
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
TimePoint start;
eb.tryRunAfterDelay(std::move(fn1), 10);
eb.tryRunAfterDelay(std::move(fn2), 20);
eb.tryRunAfterDelay(std::move(fn3), 40);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
T_CHECK_TIMEOUT(start, end, milliseconds(40));
}
/**
* Test the behavior of tryRunAfterDelay() when some timeouts are
* still scheduled when the EventBase is destroyed.
*/
TEST_F(EventBaseTest, RunAfterDelayDestruction) {
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
TimePoint timestamp4(false);
TimePoint start(false);
TimePoint end(false);
{
BackendEventBase eb;
start.reset();
// Run two normal timeouts
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
// Schedule a timeout to stop the event loop after 40ms
eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
// Schedule 2 timeouts that would fire after the event loop stops
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
eb.loop();
end.reset();
}
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
T_CHECK_TIMEOUT(start, end, milliseconds(40));
ASSERT_TRUE(timestamp3.isUnset());
ASSERT_TRUE(timestamp4.isUnset());
// Ideally this test should be run under valgrind to ensure that no
// memory is leaked.
}
namespace {
class TestTimeout : public AsyncTimeout {
public:
explicit TestTimeout(EventBase* eventBase)
: AsyncTimeout(eventBase), timestamp(false) {}
void timeoutExpired() noexcept override {
timestamp.reset();
}
TimePoint timestamp;
};
} // namespace
TEST_F(EventBaseTest, BasicTimeouts) {
BackendEventBase eb;
TestTimeout t1(&eb);
TestTimeout t2(&eb);
TestTimeout t3(&eb);
TimePoint start;
t1.scheduleTimeout(10);
t2.scheduleTimeout(20);
t3.scheduleTimeout(40);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
T_CHECK_TIMEOUT(start, end, milliseconds(40));
}
namespace {
class ReschedulingTimeout : public AsyncTimeout {
public:
ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
: AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
void start() {
reschedule();
}
void timeoutExpired() noexcept override {
timestamps.emplace_back();
reschedule();
}
void reschedule() {
if (iterator_ != timeouts_.end()) {
uint32_t timeout = *iterator_;
++iterator_;
scheduleTimeout(timeout);
}
}
vector<TimePoint> timestamps;
private:
vector<uint32_t> timeouts_;
vector<uint32_t>::const_iterator iterator_;
};
} // namespace
/**
* Test rescheduling the same timeout multiple times
*/
TEST_F(EventBaseTest, ReuseTimeout) {
BackendEventBase eb;
vector<uint32_t> timeouts;
timeouts.push_back(10);
timeouts.push_back(30);
timeouts.push_back(15);
ReschedulingTimeout t(&eb, timeouts);
TimePoint start;
t.start();
eb.loop();
TimePoint end;
// Use a higher tolerance than usual. We're waiting on 3 timeouts
// consecutively. In general, each timeout may go over by a few
// milliseconds, and we're tripling this error by witing on 3 timeouts.
milliseconds tolerance{6};
ASSERT_EQ(timeouts.size(), t.timestamps.size());
uint32_t total = 0;
for (size_t n = 0; n < timeouts.size(); ++n) {
total += timeouts[n];
T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
}
T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
}
/**
* Test rescheduling a timeout before it has fired
*/
TEST_F(EventBaseTest, RescheduleTimeout) {
BackendEventBase eb;
TestTimeout t1(&eb);
TestTimeout t2(&eb);
TestTimeout t3(&eb);
TimePoint start;
t1.scheduleTimeout(15);
t2.scheduleTimeout(30);
t3.scheduleTimeout(30);
auto f = static_cast<bool (AsyncTimeout::*)(uint32_t)>(
&AsyncTimeout::scheduleTimeout);
// after 10ms, reschedule t2 to run sooner than originally scheduled
eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
// after 10ms, reschedule t3 to run later than originally scheduled
eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
T_CHECK_TIMEOUT(start, end, milliseconds(50));
}
/**
* Test cancelling a timeout
*/
TEST_F(EventBaseTest, CancelTimeout) {
BackendEventBase eb;
vector<uint32_t> timeouts;
timeouts.push_back(10);
timeouts.push_back(30);
timeouts.push_back(25);
ReschedulingTimeout t(&eb, timeouts);
TimePoint start;
t.start();
eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
eb.loop();
TimePoint end;
ASSERT_EQ(t.timestamps.size(), 2);
T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
T_CHECK_TIMEOUT(start, end, milliseconds(50));
}
/**
* Test destroying a scheduled timeout object
*/
TEST_F(EventBaseTest, DestroyTimeout) {
class DestroyTimeout : public AsyncTimeout {
public:
DestroyTimeout(EventBase* eb, AsyncTimeout* t)
: AsyncTimeout(eb), timeout_(t) {}
void timeoutExpired() noexcept override {
delete timeout_;
}
private:
AsyncTimeout* timeout_;
};
BackendEventBase eb;
TestTimeout* t1 = new TestTimeout(&eb);
TimePoint start;
t1->scheduleTimeout(30);
DestroyTimeout dt(&eb, t1);
dt.scheduleTimeout(10);
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, end, milliseconds(10));
}
/**
* Test the scheduled executor impl
*/
TEST_F(EventBaseTest, ScheduledFn) {
BackendEventBase eb;
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
TimePoint start;
eb.schedule(std::move(fn1), milliseconds(9));
eb.schedule(std::move(fn2), milliseconds(19));
eb.schedule(std::move(fn3), milliseconds(39));
eb.loop();
TimePoint end;
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
T_CHECK_TIMEOUT(start, end, milliseconds(39));
}
TEST_F(EventBaseTest, ScheduledFnAt) {
BackendEventBase eb;
TimePoint timestamp0(false);
TimePoint timestamp1(false);
TimePoint timestamp2(false);
TimePoint timestamp3(false);
auto fn0 = std::bind(&TimePoint::reset, &timestamp0);
auto fn1 = std::bind(&TimePoint::reset, &timestamp1);
auto fn2 = std::bind(&TimePoint::reset, &timestamp2);
auto fn3 = std::bind(&TimePoint::reset, &timestamp3);
TimePoint start;
eb.scheduleAt(fn0, eb.now() - milliseconds(5));
eb.scheduleAt(fn1, eb.now() + milliseconds(9));
eb.scheduleAt(fn2, eb.now() + milliseconds(19));
eb.scheduleAt(fn3, eb.now() + milliseconds(39));
TimePoint loopStart;
eb.loop();
TimePoint end;
// Even though we asked to schedule the first function in the past,
// in practice it doesn't run until after 1 iteration of the HHWheelTimer tick
// interval.
T_CHECK_TIMEOUT(start, timestamp0, eb.timer().getTickInterval());
T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
T_CHECK_TIMEOUT(start, end, milliseconds(39));
}
///////////////////////////////////////////////////////////////////////////
// Test for runInThreadTestFunc()
///////////////////////////////////////////////////////////////////////////
namespace {
struct RunInThreadData {
RunInThreadData(int numThreads, int opsPerThread_)
: opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {}
BackendEventBase evb;
deque<pair<int, int>> values;
int opsPerThread;
int opsToGo;
};
struct RunInThreadArg {
RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
: data(data_), thread(threadId), value(value_) {}
RunInThreadData* data;
int thread;
int value;
};
void runInThreadTestFunc(RunInThreadArg* arg) {
arg->data->values.emplace_back(arg->thread, arg->value);
RunInThreadData* data = arg->data;
delete arg;
if (--data->opsToGo == 0) {
// Break out of the event base loop if we are the last thread running
data->evb.terminateLoopSoon();
}
}
} // namespace
TEST_F(EventBaseTest, RunInThread) {
constexpr uint32_t numThreads = 50;
constexpr uint32_t opsPerThread = 100;
RunInThreadData data(numThreads, opsPerThread);
deque<std::thread> threads;
SCOPE_EXIT {
// Wait on all of the threads.
for (auto& thread : threads) {
thread.join();
}
};
for (uint32_t i = 0; i < numThreads; ++i) {
threads.emplace_back([i, &data] {
for (int n = 0; n < data.opsPerThread; ++n) {
RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
usleep(10);
}
});
}
// Add a timeout event to run after 3 seconds.
// Otherwise loop() will return immediately since there are no events to run.
// Once the last thread exits, it will stop the loop(). However, this
// timeout also stops the loop in case there is a bug performing the normal
// stop.
data.evb.tryRunAfterDelay(
std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
TimePoint start;
data.evb.loop();
TimePoint end;
// Verify that the loop exited because all threads finished and requested it
// to stop. This should happen much sooner than the 3 second timeout.
// Assert that it happens in under a second. (This is still tons of extra
// padding.)
auto timeTaken =
std::chrono::duration_cast<milliseconds>(end.getTime() - start.getTime());
ASSERT_LT(timeTaken.count(), 1000);
VLOG(11) << "Time taken: " << timeTaken.count();
// Verify that we have all of the events from every thread
int expectedValues[numThreads];
for (uint32_t n = 0; n < numThreads; ++n) {
expectedValues[n] = 0;
}
for (deque<pair<int, int>>::const_iterator it = data.values.begin();
it != data.values.end();
++it) {
int threadID = it->first;
int value = it->second;
ASSERT_EQ(expectedValues[threadID], value);
++expectedValues[threadID];
}
for (uint32_t n = 0; n < numThreads; ++n) {
ASSERT_EQ(expectedValues[n], opsPerThread);
}
}
// This test simulates some calls, and verifies that the waiting happens by
// triggering what otherwise would be race conditions, and trying to detect
// whether any of the race conditions happened.
TEST_F(EventBaseTest, RunInEventBaseThreadAndWait) {
const size_t c = 256;
vector<unique_ptr<atomic<size_t>>> atoms(c);
for (size_t i = 0; i < c; ++i) {
auto& atom = atoms.at(i);
atom = std::make_unique<atomic<size_t>>(0);
}
vector<thread> threads;
for (size_t i = 0; i < c; ++i) {
threads.emplace_back([&atoms, i] {
BackendEventBase eb;
auto& atom = *atoms.at(i);
auto ebth = thread([&] { eb.loopForever(); });
eb.waitUntilRunning();
eb.runInEventBaseThreadAndWait([&] {
size_t x = 0;
atom.compare_exchange_weak(
x, 1, std::memory_order_release, std::memory_order_relaxed);
});
size_t x = 0;
atom.compare_exchange_weak(
x, 2, std::memory_order_release, std::memory_order_relaxed);
eb.terminateLoopSoon();
ebth.join();
});
}
for (size_t i = 0; i < c; ++i) {
auto& th = threads.at(i);
th.join();
}
size_t sum = 0;
for (auto& atom : atoms) {
sum += *atom;
}
EXPECT_EQ(c, sum);
}
TEST_F(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
BackendEventBase eb;
thread th(&EventBase::loopForever, &eb);
SCOPE_EXIT {
eb.terminateLoopSoon();
th.join();
};
auto mutated = false;
eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
EXPECT_TRUE(mutated);
}
TEST_F(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
BackendEventBase eb;
thread th(&EventBase::loopForever, &eb);
SCOPE_EXIT {
eb.terminateLoopSoon();
th.join();
};
eb.runInEventBaseThreadAndWait([&] {
auto mutated = false;
eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
EXPECT_TRUE(mutated);
});
}
TEST_F(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
BackendEventBase eb;
auto mutated = false;
eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
EXPECT_TRUE(mutated);
}
///////////////////////////////////////////////////////////////////////////
// Tests for runInLoop()
///////////////////////////////////////////////////////////////////////////
namespace {
class CountedLoopCallback : public EventBase::LoopCallback {
public:
CountedLoopCallback(
EventBase* eventBase,
unsigned int count,
std::function<void()> action = std::function<void()>())
: eventBase_(eventBase), count_(count), action_(action) {}
void runLoopCallback() noexcept override {
--count_;
if (count_ > 0) {
eventBase_->runInLoop(this);
} else if (action_) {
action_();
}
}
unsigned int getCount() const {
return count_;
}
private:
EventBase* eventBase_;
unsigned int count_;
std::function<void()> action_;
};
} // namespace
// Test that EventBase::loop() doesn't exit while there are
// still LoopCallbacks remaining to be invoked.
TEST_F(EventBaseTest, RepeatedRunInLoop) {
EventBase eventBase;
CountedLoopCallback c(&eventBase, 10);
eventBase.runInLoop(&c);
// The callback shouldn't have run immediately
ASSERT_EQ(c.getCount(), 10);
eventBase.loop();
// loop() should loop until the CountedLoopCallback stops
// re-installing itself.
ASSERT_EQ(c.getCount(), 0);
}
// Test that EventBase::loop() works as expected without time measurements.
TEST_F(EventBaseTest, RunInLoopNoTimeMeasurement) {
EventBase eventBase(false);
CountedLoopCallback c(&eventBase, 10);
eventBase.runInLoop(&c);
// The callback shouldn't have run immediately
ASSERT_EQ(c.getCount(), 10);
eventBase.loop();
// loop() should loop until the CountedLoopCallback stops
// re-installing itself.
ASSERT_EQ(c.getCount(), 0);
}
// Test runInLoop() calls with terminateLoopSoon()
TEST_F(EventBaseTest, RunInLoopStopLoop) {
EventBase eventBase;
CountedLoopCallback c1(&eventBase, 20);
CountedLoopCallback c2(
&eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
eventBase.runInLoop(&c1);
eventBase.runInLoop(&c2);
ASSERT_EQ(c1.getCount(), 20);
ASSERT_EQ(c2.getCount(), 10);
eventBase.loopForever();
// c2 should have stopped the loop after 10 iterations
ASSERT_EQ(c2.getCount(), 0);
// We allow the EventBase to run the loop callbacks in whatever order it
// chooses. We'll accept c1's count being either 10 (if the loop terminated
// after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
// before c1 ran).
//
// (With the current code, c1 will always run 10 times, but we don't consider
// this a hard API requirement.)
ASSERT_GE(c1.getCount(), 10);
ASSERT_LE(c1.getCount(), 11);
}
TEST_F(EventBaseTest, messageAvailableException) {
auto deadManWalking = [] {
EventBase eventBase;
std::thread t([&] {
// Call this from another thread to force use of NotificationQueue in
// runInEventBaseThread
eventBase.runInEventBaseThread(
[]() { throw std::runtime_error("boom"); });
});
t.join();
eventBase.loopForever();
};
EXPECT_DEATH(deadManWalking(), ".*");
}
TEST_F(EventBaseTest, TryRunningAfterTerminate) {
bool ran = false;
{
EventBase eventBase;
CountedLoopCallback c1(
&eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
eventBase.runInLoop(&c1);
eventBase.loopForever();
eventBase.runInEventBaseThread([&]() { ran = true; });
ASSERT_FALSE(ran);
}
// Loop callbacks are triggered on EventBase destruction
ASSERT_TRUE(ran);
}
// Test cancelling runInLoop() callbacks
TEST_F(EventBaseTest, CancelRunInLoop) {
EventBase eventBase;
CountedLoopCallback c1(&eventBase, 20);
CountedLoopCallback c2(&eventBase, 20);
CountedLoopCallback c3(&eventBase, 20);
std::function<void()> cancelC1Action =
std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
std::function<void()> cancelC2Action =
std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
// Install cancelC1 after c1
eventBase.runInLoop(&c1);
eventBase.runInLoop(&cancelC1);
// Install cancelC2 before c2
eventBase.runInLoop(&cancelC2);
eventBase.runInLoop(&c2);
// Install c3
eventBase.runInLoop(&c3);
ASSERT_EQ(c1.getCount(), 20);
ASSERT_EQ(c2.getCount(), 20);
ASSERT_EQ(c3.getCount(), 20);
ASSERT_EQ(cancelC1.getCount(), 10);
ASSERT_EQ(cancelC2.getCount(), 10);
// Run the loop
eventBase.loop();
// cancelC1 and cancelC2 should have both fired after 10 iterations and
// stopped re-installing themselves
ASSERT_EQ(cancelC1.getCount(), 0);
ASSERT_EQ(cancelC2.getCount(), 0);
// c3 should have continued on for the full 20 iterations
ASSERT_EQ(c3.getCount(), 0);
// c1 and c2 should have both been cancelled on the 10th iteration.
//
// Callbacks are always run in the order they are installed,
// so c1 should have fired 10 times, and been canceled after it ran on the
// 10th iteration. c2 should have only fired 9 times, because cancelC2 will
// have run before it on the 10th iteration, and cancelled it before it
// fired.
ASSERT_EQ(c1.getCount(), 10);
ASSERT_EQ(c2.getCount(), 11);
}
namespace {
class TerminateTestCallback : public EventBase::LoopCallback,
public EventHandler {
public:
TerminateTestCallback(EventBase* eventBase, int fd)
: EventHandler(eventBase, NetworkSocket::fromFd(fd)),
eventBase_(eventBase),
loopInvocations_(0),
maxLoopInvocations_(0),
eventInvocations_(0),
maxEventInvocations_(0) {}
void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
loopInvocations_ = 0;
maxLoopInvocations_ = maxLoopInvocations;
eventInvocations_ = 0;
maxEventInvocations_ = maxEventInvocations;
cancelLoopCallback();
unregisterHandler();
}
void handlerReady(uint16_t /* events */) noexcept override {
// We didn't register with PERSIST, so we will have been automatically
// unregistered already.
ASSERT_FALSE(isHandlerRegistered());
++eventInvocations_;
if (eventInvocations_ >= maxEventInvocations_) {
return;
}
eventBase_->runInLoop(this);
}
void runLoopCallback() noexcept override {
++loopInvocations_;
if (loopInvocations_ >= maxLoopInvocations_) {
return;
}
registerHandler(READ);
}
uint32_t getLoopInvocations() const {
return loopInvocations_;
}
uint32_t getEventInvocations() const {
return eventInvocations_;
}
private:
EventBase* eventBase_;
uint32_t loopInvocations_;
uint32_t maxLoopInvocations_;
uint32_t eventInvocations_;
uint32_t maxEventInvocations_;
};
} // namespace
/**
* Test that EventBase::loop() correctly detects when there are no more events
* left to run.
*
* This uses a single callback, which alternates registering itself as a loop
* callback versus a EventHandler callback. This exercises a regression where
* EventBase::loop() incorrectly exited if there were no more fd handlers
* registered, but a loop callback installed a new fd handler.
*/
TEST_F(EventBaseTest, LoopTermination) {
EventBase eventBase;
// Open a pipe and close the write end,
// so the read endpoint will be readable
int pipeFds[2];
int rc = pipe(pipeFds);
ASSERT_EQ(rc, 0);
close(pipeFds[1]);
TerminateTestCallback callback(&eventBase, pipeFds[0]);
// Test once where the callback will exit after a loop callback
callback.reset(10, 100);
eventBase.runInLoop(&callback);
eventBase.loop();
ASSERT_EQ(callback.getLoopInvocations(), 10);
ASSERT_EQ(callback.getEventInvocations(), 9);
// Test once where the callback will exit after an fd event callback
callback.reset(100, 7);
eventBase.runInLoop(&callback);
eventBase.loop();
ASSERT_EQ(callback.getLoopInvocations(), 7);
ASSERT_EQ(callback.getEventInvocations(), 7);
close(pipeFds[0]);
}
TEST_F(EventBaseTest, CallbackOrderTest) {
size_t num = 0;
BackendEventBase evb;
evb.runInEventBaseThread([&]() {
std::thread t([&]() {
evb.runInEventBaseThread([&]() {
num++;
EXPECT_EQ(num, 2);
});
});
t.join();
// this callback will run first
// even if it is scheduled after the first one
evb.runInEventBaseThread([&]() {
num++;
EXPECT_EQ(num, 1);
});
});
evb.loop();
EXPECT_EQ(num, 2);
}
TEST_F(EventBaseTest, AlwaysEnqueueCallbackOrderTest) {
size_t num = 0;
BackendEventBase evb;
evb.runInEventBaseThread([&]() {
std::thread t([&]() {
evb.runInEventBaseThreadAlwaysEnqueue([&]() {
num++;
EXPECT_EQ(num, 1);
});
});
t.join();
// this callback will run second
// since it was enqueued after the first one
evb.runInEventBaseThreadAlwaysEnqueue([&]() {
num++;
EXPECT_EQ(num, 2);
});
});
evb.loop();
EXPECT_EQ(num, 2);
}
///////////////////////////////////////////////////////////////////////////
// Tests for latency calculations
///////////////////////////////////////////////////////////////////////////
namespace {
class IdleTimeTimeoutSeries : public AsyncTimeout {
public:
explicit IdleTimeTimeoutSeries(
EventBase* base,
std::deque<std::size_t>& timeout)
: AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
scheduleTimeout(1);
}
~IdleTimeTimeoutSeries() override {}
void timeoutExpired() noexcept override {
++timeouts_;
if (timeout_.empty()) {
cancelTimeout();
} else {
std::size_t sleepTime = timeout_.front();
timeout_.pop_front();
if (sleepTime) {
usleep(sleepTime);
}
scheduleTimeout(1);
}
}
int getTimeouts() const {
return timeouts_;
}
private:
int timeouts_;
std::deque<std::size_t>& timeout_;
};
} // namespace
/**
* Verify that idle time is correctly accounted for when decaying our loop
* time.
*
* This works by creating a high loop time (via usleep), expecting a latency
* callback with known value, and then scheduling a timeout for later. This
* later timeout is far enough in the future that the idle time should have
* caused the loop time to decay.
*/
TEST_F(EventBaseTest, IdleTime) {
EventBase eventBase;
std::deque<std::size_t> timeouts0(4, 8080);
timeouts0.push_front(8000);
timeouts0.push_back(14000);
IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
std::deque<std::size_t> timeouts(20, 20);
std::unique_ptr<IdleTimeTimeoutSeries> tos;
bool hostOverloaded = false;
// Loop once before starting the main test. This will run NotificationQueue
// callbacks that get automatically installed when the EventBase is first
// created. We want to make sure they don't interfere with the timing
// operations below.
eventBase.loopOnce(EVLOOP_NONBLOCK);
eventBase.setLoadAvgMsec(milliseconds(1000));
eventBase.resetLoadAvg(5900.0);
auto testStart = std::chrono::steady_clock::now();
int latencyCallbacks = 0;
eventBase.setMaxLatency(microseconds(6000), [&]() {
++latencyCallbacks;
if (latencyCallbacks != 1) {
FAIL() << "Unexpected latency callback";
}
if (tos0.getTimeouts() < 6) {
// This could only happen if the host this test is running
// on is heavily loaded.
int64_t usElapsed = duration_cast<microseconds>(
std::chrono::steady_clock::now() - testStart)
.count();
EXPECT_LE(43800, usElapsed);
hostOverloaded = true;
return;
}
EXPECT_EQ(6, tos0.getTimeouts());
EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
});
// Kick things off with an "immediate" timeout
tos0.scheduleTimeout(1);
eventBase.loop();
if (hostOverloaded) {
SKIP() << "host too heavily loaded to execute test";
}
ASSERT_EQ(1, latencyCallbacks);
ASSERT_EQ(7, tos0.getTimeouts());
ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
ASSERT_TRUE(!!tos);
ASSERT_EQ(21, tos->getTimeouts());
}
/**
* Test that thisLoop functionality works with terminateLoopSoon
*/
TEST_F(EventBaseTest, ThisLoop) {
bool runInLoop = false;
bool runThisLoop = false;
{
BackendEventBase eb;
eb.runInLoop(
[&]() {
eb.terminateLoopSoon();
eb.runInLoop([&]() { runInLoop = true; });
eb.runInLoop([&]() { runThisLoop = true; }, true);
},
true);
eb.loopForever();
// Should not work
ASSERT_FALSE(runInLoop);
// Should work with thisLoop
ASSERT_TRUE(runThisLoop);
}
// Pending loop callbacks will be run when the EventBase is destroyed.
ASSERT_TRUE(runInLoop);
}
TEST_F(EventBaseTest, EventBaseThreadLoop) {
BackendEventBase base;
bool ran = false;
base.runInEventBaseThread([&]() { ran = true; });
base.loop();
ASSERT_TRUE(ran);
}
TEST_F(EventBaseTest, EventBaseThreadName) {
BackendEventBase base;
base.setName("foo");
base.loop();
#if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
char name[16];
pthread_getname_np(pthread_self(), name, 16);
ASSERT_EQ(0, strcmp("foo", name));
#endif
}
TEST_F(EventBaseTest, RunBeforeLoop) {
BackendEventBase base;
CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
base.runBeforeLoop(&cb);
base.loopForever();
ASSERT_EQ(cb.getCount(), 0);
}
TEST_F(EventBaseTest, RunBeforeLoopWait) {
BackendEventBase base;
CountedLoopCallback cb(&base, 1);
base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
base.runBeforeLoop(&cb);
base.loopForever();
// Check that we only ran once, and did not loop multiple times.
ASSERT_EQ(cb.getCount(), 0);
}
namespace {
class PipeHandler : public EventHandler {
public:
PipeHandler(EventBase* eventBase, int fd)
: EventHandler(eventBase, NetworkSocket::fromFd(fd)) {}
void handlerReady(uint16_t /* events */) noexcept override {
abort();
}
};
} // namespace
TEST_F(EventBaseTest, StopBeforeLoop) {
BackendEventBase evb;
// Give the evb something to do.
int p[2];
ASSERT_EQ(0, pipe(p));
PipeHandler handler(&evb, p[0]);
handler.registerHandler(EventHandler::READ);
// It's definitely not running yet
evb.terminateLoopSoon();
// let it run, it should exit quickly.
std::thread t([&] { evb.loop(); });
t.join();
handler.unregisterHandler();
close(p[0]);
close(p[1]);
SUCCEED();
}
TEST_F(EventBaseTest, RunCallbacksOnDestruction) {
bool ran = false;
{
BackendEventBase base;
base.runInEventBaseThread([&]() { ran = true; });
}
ASSERT_TRUE(ran);
}
TEST_F(EventBaseTest, LoopKeepAlive) {
BackendEventBase evb;
bool done = false;
std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
});
evb.loop();
ASSERT_TRUE(done);
t.join();
}
TEST_F(EventBaseTest, LoopKeepAliveInLoop) {
BackendEventBase evb;
bool done = false;
std::thread t;
evb.runInEventBaseThread([&] {
t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evb.runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
});
});
evb.loop();
ASSERT_TRUE(done);
t.join();
}
TEST_F(EventBaseTest, LoopKeepAliveWithLoopForever) {
std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
bool done = false;
std::thread evThread([&] {
evb->loopForever();
evb.reset();
done = true;
});
{
auto* ev = evb.get();
Executor::KeepAlive<EventBase> keepAlive;
ev->runInEventBaseThreadAndWait(
[&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
ASSERT_FALSE(done) << "Loop finished before we asked it to";
ev->terminateLoopSoon();
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(30));
ASSERT_FALSE(done) << "Loop terminated early";
ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
}
evThread.join();
ASSERT_TRUE(done);
}
TEST_F(EventBaseTest, LoopKeepAliveShutdown) {
auto evb = std::make_unique<EventBase>();
bool done = false;
std::thread t([&done,
loopKeepAlive = getKeepAliveToken(evb.get()),
evbPtr = evb.get()]() mutable {
/* sleep override */ std::this_thread::sleep_for(
std::chrono::milliseconds(100));
evbPtr->runInEventBaseThread(
[&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
});
evb.reset();
ASSERT_TRUE(done);
t.join();
}
TEST_F(EventBaseTest, LoopKeepAliveAtomic) {
auto evb = std::make_unique<EventBase>();
static constexpr size_t kNumThreads = 100;
static constexpr size_t kNumTasks = 100;
std::vector<std::thread> ts;
std::vector<std::unique_ptr<Baton<>>> batons;
size_t done{0};
for (size_t i = 0; i < kNumThreads; ++i) {
batons.emplace_back(std::make_unique<Baton<>>());
}
for (size_t i = 0; i < kNumThreads; ++i) {
ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] {
std::vector<Executor::KeepAlive<EventBase>> keepAlives;
for (size_t j = 0; j < kNumTasks; ++j) {
keepAlives.emplace_back(getKeepAliveToken(evbPtr));
}
batonPtr->post();
/* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
for (auto& keepAlive : keepAlives) {
evbPtr->runInEventBaseThread(
[&done, keepAlive = std::move(keepAlive)]() { ++done; });
}
});
}
for (auto& baton : batons) {
baton->wait();
}
evb.reset();
EXPECT_EQ(kNumThreads * kNumTasks, done);
for (auto& t : ts) {
t.join();
}
}
TEST_F(EventBaseTest, LoopKeepAliveCast) {
BackendEventBase evb;
Executor::KeepAlive<> keepAlive = getKeepAliveToken(evb);
}
TEST_F(EventBaseTest, DrivableExecutorTest) {
folly::Promise<bool> p;
auto f = p.getFuture();
BackendEventBase base;
bool finished = false;
std::thread t([&] {
/* sleep override */
std::this_thread::sleep_for(std::chrono::microseconds(10));
finished = true;
base.runInEventBaseThread([&]() { p.setValue(true); });
});
// Ensure drive does not busy wait
base.drive(); // TODO: fix notification queue init() extra wakeup
base.drive();
EXPECT_TRUE(finished);
folly::Promise<bool> p2;
auto f2 = p2.getFuture();
// Ensure waitVia gets woken up properly, even from
// a separate thread.
base.runAfterDelay([&]() { p2.setValue(true); }, 10);
f2.waitVia(&base);
EXPECT_TRUE(f2.isReady());
t.join();
}
TEST_F(EventBaseTest, IOExecutorTest) {
BackendEventBase base;
// Ensure EventBase manages itself as an IOExecutor.
EXPECT_EQ(base.getEventBase(), &base);
}
TEST_F(EventBaseTest, RequestContextTest) {
BackendEventBase evb;
auto defaultCtx = RequestContext::get();
std::weak_ptr<RequestContext> rctx_weak_ptr;
{
RequestContextScopeGuard rctx;
rctx_weak_ptr = RequestContext::saveContext();
auto context = RequestContext::get();
EXPECT_NE(defaultCtx, context);
evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
evb.loop();
}
// Ensure that RequestContext created for the scope has been released and
// deleted.
EXPECT_EQ(rctx_weak_ptr.expired(), true);
EXPECT_EQ(defaultCtx, RequestContext::get());
}
TEST_F(EventBaseTest, CancelLoopCallbackRequestContextTest) {
BackendEventBase evb;
CountedLoopCallback c(&evb, 1);
auto defaultCtx = RequestContext::get();
EXPECT_EQ(defaultCtx, RequestContext::get());
std::weak_ptr<RequestContext> rctx_weak_ptr;
{
RequestContextScopeGuard rctx;
rctx_weak_ptr = RequestContext::saveContext();
auto context = RequestContext::get();
EXPECT_NE(defaultCtx, context);
evb.runInLoop(&c);
c.cancelLoopCallback();
}
// Ensure that RequestContext created for the scope has been released and
// deleted.
EXPECT_EQ(rctx_weak_ptr.expired(), true);
EXPECT_EQ(defaultCtx, RequestContext::get());
}
TEST_F(EventBaseTest, TestStarvation) {
BackendEventBase evb;
std::promise<void> stopRequested;
std::promise<void> stopScheduled;
bool stopping{false};
std::thread t{[&] {
stopRequested.get_future().get();
evb.add([&]() { stopping = true; });
stopScheduled.set_value();
}};
size_t num{0};
std::function<void()> fn;
fn = [&]() {
if (stopping || num >= 2000) {
return;
}
if (++num == 1000) {
stopRequested.set_value();
stopScheduled.get_future().get();
}
evb.add(fn);
};
evb.add(fn);
evb.loop();
EXPECT_EQ(1000, num);
t.join();
}
TEST_F(EventBaseTest, RunOnDestructionBasic) {
bool ranOnDestruction = false;
{
BackendEventBase evb;
evb.runOnDestruction([&ranOnDestruction] { ranOnDestruction = true; });
}
EXPECT_TRUE(ranOnDestruction);
}
TEST_F(EventBaseTest, RunOnDestructionCancelled) {
struct Callback : EventBase::OnDestructionCallback {
bool ranOnDestruction{false};
void onEventBaseDestruction() noexcept final {
ranOnDestruction = true;
}
};
auto cb = std::make_unique<Callback>();
{
BackendEventBase evb;
evb.runOnDestruction(*cb);
EXPECT_TRUE(cb->cancel());
}
EXPECT_FALSE(cb->ranOnDestruction);
EXPECT_FALSE(cb->cancel());
}
TEST_F(EventBaseTest, RunOnDestructionAfterHandleDestroyed) {
BackendEventBase evb;
{
bool ranOnDestruction = false;
auto* cb = new EventBase::FunctionOnDestructionCallback(
[&ranOnDestruction] { ranOnDestruction = true; });
evb.runOnDestruction(*cb);
EXPECT_TRUE(cb->cancel());
delete cb;
}
}
TEST_F(EventBaseTest, RunOnDestructionAddCallbackWithinCallback) {
size_t callbacksCalled = 0;
{
BackendEventBase evb;
evb.runOnDestruction([&] {
++callbacksCalled;
evb.runOnDestruction([&] { ++callbacksCalled; });
});
}
EXPECT_EQ(2, callbacksCalled);
}
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/io/async/test/EventBaseTestLibProvider.h>
namespace folly {
namespace test {
EventBaseBackendProvider::GetBackendFunc
EventBaseBackendProvider::getBackendFunc_;
} // namespace test
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/Function.h>
#include <folly/Memory.h>
#include <folly/io/async/EventBase.h>
namespace folly {
namespace test {
class EventBaseBackendProvider {
public:
EventBaseBackendProvider() = delete;
~EventBaseBackendProvider() = delete;
using GetBackendFunc =
folly::Function<std::unique_ptr<folly::EventBaseBackendBase>()>;
static std::unique_ptr<folly::EventBaseBackendBase> getBackend() {
CHECK(!!getBackendFunc_);
return getBackendFunc_();
}
static void setGetBackendFunc(GetBackendFunc&& func) {
getBackendFunc_ = std::move(func);
}
private:
static GetBackendFunc getBackendFunc_;
};
} // namespace test
} // namespace folly
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment