Commit 27cfd48e authored by Dan Melnic, committed by Facebook GitHub Bot

Add support for io_uring based async read/recvmsg

Summary:
Add support for io_uring based async read/recvmsg

(Note: this ignores all push blocking failures!)

Reviewed By: kevin-vigor

Differential Revision: D21208891

fbshipit-source-id: 21b3b5e0f201e982ecb50a284d03ab53413ec3e3
parent e8a222d4
......@@ -220,7 +220,7 @@ int IoUringBackend::submitOne(IoCb* /*unused*/) {
}
int IoUringBackend::cancelOne(IoCb* ioCb) {
auto* rentry = static_cast<IoSqe*>(allocIoCb());
auto* rentry = static_cast<IoSqe*>(allocIoCb(EventCallback()));
if (!rentry) {
return 0;
}
......@@ -228,7 +228,7 @@ int IoUringBackend::cancelOne(IoCb* ioCb) {
auto* sqe = ::io_uring_get_sqe(&ioRing_);
CHECK(sqe);
rentry->prepPollRemove(sqe, ioCb); // prev entry
rentry->prepCancel(sqe, ioCb); // prev entry
int ret = submitBusyCheck(1, WaitForEventsMode::DONT_WAIT);
......@@ -305,11 +305,37 @@ size_t IoUringBackend::submitList(
CHECK(sqe); // this should not happen
auto* ev = entry->event_->getEvent();
entry->prepPollAdd(
sqe,
ev->ev_fd,
getPollFlags(ev->ev_events),
(ev->ev_events & EV_PERSIST) != 0);
const auto& cb = entry->event_->getCallback();
bool processed = false;
switch (cb.type_) {
case EventCallback::Type::TYPE_NONE:
break;
case EventCallback::Type::TYPE_READ:
if (auto* iov = cb.readCb_->allocateData()) {
processed = true;
entry->prepRead(
sqe, ev->ev_fd, &iov->data_, (ev->ev_events & EV_PERSIST) != 0);
entry->cbData_.set(iov);
}
break;
case EventCallback::Type::TYPE_RECVMSG:
if (auto* msg = cb.recvmsgCb_->allocateData()) {
processed = true;
entry->prepRecvmsg(
sqe, ev->ev_fd, &msg->data_, (ev->ev_events & EV_PERSIST) != 0);
entry->cbData_.set(msg);
}
break;
}
if (!processed) {
entry->cbData_.reset();
entry->prepPollAdd(
sqe,
ev->ev_fd,
getPollFlags(ev->ev_events),
(ev->ev_events & EV_PERSIST) != 0);
}
i++;
if (ioCbs.empty()) {
int num = submitBusyCheck(i, waitForEvents);
......
......@@ -104,16 +104,52 @@ class IoUringBackend : public PollIoBackend {
::io_uring_sqe_set_data(sqe, this);
}
FOLLY_ALWAYS_INLINE void prepPollRemove(
// Fills the submission entry `entry` with a read on `fd` into the buffer
// described by `iov` (base pointer + length, offset 0) and tags the entry with
// this IoSqe as its user data.
//
// If `registerFd` is set and no fd record exists yet, the fd is first
// registered with the backend. Whenever a record is present afterwards, the
// read targets the record's index and the entry is flagged IOSQE_FIXED_FILE;
// otherwise the raw `fd` is used.
void prepRead(void* entry, int fd, const struct iovec* iov, bool registerFd)
    override {
  CHECK(entry);
  auto* const ringEntry = static_cast<struct io_uring_sqe*>(entry);

  const bool wantsRegistration = registerFd && !fdRecord_;
  if (wantsRegistration) {
    fdRecord_ = backend_->registerFd(fd);
  }

  // Re-test the record after the optional registration attempt.
  const bool useFixedFile = fdRecord_ ? true : false;
  const int targetFd = useFixedFile ? static_cast<int>(fdRecord_->idx_) : fd;

  ::io_uring_prep_read(
      ringEntry, targetFd, iov->iov_base, iov->iov_len, 0 /*offset*/);
  // The flag is OR-ed in only after the prep call, same ordering as before.
  if (useFixedFile) {
    ringEntry->flags |= IOSQE_FIXED_FILE;
  }

  ::io_uring_sqe_set_data(ringEntry, this);
}
// Fills the submission entry `entry` with a recvmsg on `fd` using the caller
// supplied `msg` header and tags the entry with this IoSqe as its user data.
//
// If `registerFd` is set and no fd record exists yet, the fd is first
// registered with the backend. Whenever a record is present afterwards, the
// request targets the record's index and is flagged IOSQE_FIXED_FILE;
// otherwise the raw `fd` is used.
//
// MSG_TRUNC is passed on BOTH paths. Previously only the registered-fd path
// used it while the plain-fd path passed 0, so the meaning of the completion
// result (real datagram length vs. bytes copied) depended on whether the fd
// happened to be registered.
void prepRecvmsg(void* entry, int fd, struct msghdr* msg, bool registerFd)
    override {
  CHECK(entry);
  struct io_uring_sqe* sqe = reinterpret_cast<struct io_uring_sqe*>(entry);
  if (registerFd && !fdRecord_) {
    fdRecord_ = backend_->registerFd(fd);
  }

  if (fdRecord_) {
    ::io_uring_prep_recvmsg(sqe, fdRecord_->idx_, msg, MSG_TRUNC);
    // set after the prep call, as in the read path
    sqe->flags |= IOSQE_FIXED_FILE;
  } else {
    ::io_uring_prep_recvmsg(sqe, fd, msg, MSG_TRUNC);
  }

  ::io_uring_sqe_set_data(sqe, this);
}
// Prepares `sqe` as a cancel request for the previously submitted operation
// identified by `user_data` (cancelOne passes the IoCb of the earlier entry),
// and tags the cancel entry itself with this IoSqe as its user data.
//
// Only io_uring_prep_cancel is issued: a preceding io_uring_prep_poll_remove
// on the same sqe would be overwritten by it and has no effect.
FOLLY_ALWAYS_INLINE void prepCancel(
    struct io_uring_sqe* sqe,
    void* user_data) {
  CHECK(sqe);
  ::io_uring_prep_cancel(sqe, user_data, 0);
  ::io_uring_sqe_set_data(sqe, this);
}
};
PollIoBackend::IoCb* allocNewIoCb() override {
PollIoBackend::IoCb* allocNewIoCb(const EventCallback& /*cb*/) override {
auto* ret = new IoSqe(this, false);
ret->backendCb_ = PollIoBackend::processPollIoCb;
......
......@@ -337,9 +337,9 @@ size_t PollIoBackend::processSignals() {
return ret;
}
PollIoBackend::IoCb* PollIoBackend::allocIoCb() {
PollIoBackend::IoCb* PollIoBackend::allocIoCb(const EventCallback& cb) {
// try to allocate from the pool first
if (FOLLY_LIKELY(freeHead_ != nullptr)) {
if ((cb.type_ == EventCallback::Type::TYPE_NONE) && (freeHead_ != nullptr)) {
auto* ret = freeHead_;
freeHead_ = freeHead_->next_;
numIoCbInUse_++;
......@@ -347,7 +347,7 @@ PollIoBackend::IoCb* PollIoBackend::allocIoCb() {
}
// alloc a new IoCb
auto* ret = allocNewIoCb();
auto* ret = allocNewIoCb(cb);
if (FOLLY_LIKELY(!!ret)) {
numIoCbInUse_++;
}
......@@ -357,6 +357,7 @@ PollIoBackend::IoCb* PollIoBackend::allocIoCb() {
void PollIoBackend::releaseIoCb(PollIoBackend::IoCb* aioIoCb) {
numIoCbInUse_--;
aioIoCb->cbData_.releaseData();
// unregister the file descriptor record
if (aioIoCb->fdRecord_) {
unregisterFd(aioIoCb->fdRecord_);
......@@ -388,7 +389,7 @@ void PollIoBackend::processPollIo(IoCb* ioCb, int64_t res) noexcept {
// add it to the active list
event_ref_flags(ev) |= EVLIST_ACTIVE;
ev->ev_res = getPollEvents(res, ev->ev_events);
ev->ev_res = res;
activeEvents_.push_back(*ioCb);
} else {
releaseIoCb(ioCb);
......@@ -413,11 +414,15 @@ size_t PollIoBackend::processActiveEvents() {
// prevent the callback from freeing the aioIoCb
ioCb->useCount_++;
// handle spurious poll events that return 0
// this can happen during high load on process startup
if (ev->ev_res) {
(*event_ref_callback(ev))(
(int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
if (!ioCb->cbData_.processCb(ev->ev_res)) {
// adjust the ev_res for the poll case
ev->ev_res = getPollEvents(ev->ev_res, ev->ev_events);
// handle spurious poll events that return 0
// this can happen during high load on process startup
if (ev->ev_res) {
(*event_ref_callback(ev))(
(int)ev->ev_fd, ev->ev_res, event_ref_arg(ev));
}
}
// get the event again
event = ioCb->event_;
......@@ -535,7 +540,7 @@ int PollIoBackend::eb_event_add(Event& event, const struct timeval* timeout) {
if ((ev->ev_events & (EV_READ | EV_WRITE)) &&
!(event_ref_flags(ev) & (EVLIST_INSERTED | EVLIST_ACTIVE))) {
auto* iocb = allocIoCb();
auto* iocb = allocIoCb(event.getCallback());
CHECK(iocb);
iocb->event_ = &event;
......
......@@ -96,6 +96,83 @@ class PollIoBackend : public EventBaseBackendBase {
// Backend-specific hook that prepares the submission entry `entry` as a poll
// request on `fd` for `events`. IoUringBackend::submitList falls back to this
// when no read/recvmsg buffer was prepared for the event, passing the poll
// flags derived from the event and whether the event is EV_PERSIST as
// `registerFd`.
virtual void
prepPollAdd(void* entry, int fd, uint32_t events, bool registerFd) = 0;
// Optional hook for preparing a read request on a submission entry.
// The default implementation is intentionally empty; the io_uring IoSqe
// overrides it to issue an actual read.
virtual void prepRead(
void* /*entry*/,
int /*fd*/,
const struct iovec* /*iov*/,
bool /*registerFd*/) {}
// Optional hook for preparing a recvmsg request on a submission entry.
// The default implementation is intentionally empty; the io_uring IoSqe
// overrides it to issue an actual recvmsg.
virtual void prepRecvmsg(
void* /*entry*/,
int /*fd*/,
struct msghdr* /*msg*/,
bool /*registerFd*/) {}
// Tagged holder for the buffer object attached to a read/recvmsg request.
// `type_` records which union member, if any, is currently meaningful:
// TYPE_READ -> ioVec_, TYPE_RECVMSG -> msgHdr_, TYPE_NONE -> neither.
struct EventCallbackData {
  EventCallback::Type type_{EventCallback::Type::TYPE_NONE};
  union {
    EventReadCallback::IoVec* ioVec_;
    EventRecvmsgCallback::MsgHdr* msgHdr_;
  };

  // Stores a read buffer and tags the holder as TYPE_READ.
  void set(EventReadCallback::IoVec* ioVec) {
    ioVec_ = ioVec;
    type_ = EventCallback::Type::TYPE_READ;
  }

  // Stores a recvmsg header and tags the holder as TYPE_RECVMSG.
  void set(EventRecvmsgCallback::MsgHdr* msgHdr) {
    msgHdr_ = msgHdr;
    type_ = EventCallback::Type::TYPE_RECVMSG;
  }

  // Clears the tag only; the stored object's free function is NOT invoked.
  void reset() {
    type_ = EventCallback::Type::TYPE_NONE;
  }

  // Delivers the completion result `res` to the stored object's callback
  // (cbFunc_), if one is held, and then clears the tag.
  // Returns true when a read/recvmsg callback was invoked, false when the
  // holder was empty (the caller then falls back to poll-style handling).
  bool processCb(int res) {
    bool handled = false;
    if (type_ == EventCallback::Type::TYPE_READ) {
      handled = true;
      auto deliver = ioVec_->cbFunc_;
      deliver(ioVec_, res);
    } else if (type_ == EventCallback::Type::TYPE_RECVMSG) {
      handled = true;
      auto deliver = msgHdr_->cbFunc_;
      deliver(msgHdr_, res);
    }
    // the tag is cleared only after the callback has returned
    type_ = EventCallback::Type::TYPE_NONE;
    return handled;
  }

  // Hands a still-held object to its own free function (freeFunc_) and
  // clears the tag. A no-op, apart from clearing the tag, when empty.
  void releaseData() {
    if (type_ == EventCallback::Type::TYPE_READ) {
      auto dispose = ioVec_->freeFunc_;
      dispose(ioVec_);
    } else if (type_ == EventCallback::Type::TYPE_RECVMSG) {
      auto dispose = msgHdr_->freeFunc_;
      dispose(msgHdr_);
    }
    type_ = EventCallback::Type::TYPE_NONE;
  }
};
EventCallbackData cbData_;
};
using IoCbList =
......@@ -231,10 +308,10 @@ class PollIoBackend : public EventBaseBackendBase {
void processPollIo(IoCb* ioCb, int64_t res) noexcept;
IoCb* FOLLY_NULLABLE allocIoCb();
IoCb* FOLLY_NULLABLE allocIoCb(const EventCallback& cb);
void releaseIoCb(IoCb* aioIoCb);
virtual IoCb* allocNewIoCb() = 0;
virtual IoCb* allocNewIoCb(const EventCallback& cb) = 0;
virtual void* allocSubmissionEntry() = 0;
virtual int getActiveEvents(WaitForEventsMode waitForEvents) = 0;
......
......@@ -117,6 +117,8 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
validateSocketOptions(
options, addy.getFamily(), SocketOptionKey::ApplyPos::POST_BIND),
SocketOptionKey::ApplyPos::POST_BIND);
applyEventCallback();
}
void setReusePort(bool reusePort) {
......@@ -208,6 +210,11 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
}
}
// Remembers `cb` as the recvmsg event callback and immediately applies it to
// the underlying socket if one exists (see applyEventCallback). Passing
// nullptr causes the socket's event callback to be reset instead.
void setEventCallback(EventRecvmsgCallback* cb) {
eventCb_ = cb;
applyEventCallback();
}
private:
// AsyncUDPSocket::ReadCallback
void getReadBuffer(void** buf, size_t* len) noexcept override {
......@@ -283,6 +290,16 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
}
}
// Pushes the currently stored event callback down to the socket.
// Does nothing while there is no socket; otherwise installs eventCb_ when it
// is set and resets the socket's event callback when it is null.
void applyEventCallback() {
  if (!socket_) {
    return;
  }

  if (!eventCb_) {
    socket_->resetEventCallback();
    return;
  }

  socket_->setEventCallback(eventCb_);
}
EventBase* const evb_;
const size_t packetSize_;
......@@ -302,6 +319,8 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback,
bool reusePort_{false};
bool reuseAddr_{false};
EventRecvmsgCallback* eventCb_{nullptr};
};
} // namespace folly
......@@ -19,11 +19,72 @@
#include <memory>
#include <folly/io/async/EventUtil.h>
#include <folly/net/NetOps.h>
#include <folly/portability/Event.h>
#include <folly/portability/IOVec.h>
namespace folly {
class EventBase;
// Interface implemented by consumers that want a backend to perform the read
// itself and hand back the filled buffer, instead of only signalling
// readiness.
class EventReadCallback {
 public:
  // Buffer descriptor handed to the backend for one read.
  struct IoVec {
    virtual ~IoVec() = default;

    // Invoked to dispose of the descriptor when it is released unused.
    using FreeFunc = void (*)(IoVec*);
    // Invoked with the descriptor and the result of the read.
    using CallbackFunc = void (*)(IoVec*, int);

    // Opaque user pointer; not interpreted by the code shown here.
    void* arg_{nullptr};
    // Target buffer for the read. Value-initialized so that a freshly
    // constructed descriptor never exposes an indeterminate base/length.
    struct iovec data_{};
    FreeFunc freeFunc_{nullptr};
    CallbackFunc cbFunc_{nullptr};
  };

  EventReadCallback() = default;
  virtual ~EventReadCallback() = default;

  // Returns a descriptor for the next read, or nullptr; on nullptr the
  // io_uring backend falls back to a plain poll request.
  virtual IoVec* allocateData() = 0;
};
// Interface implemented by consumers that want a backend to perform the
// recvmsg itself and hand back the filled message header, instead of only
// signalling readiness.
class EventRecvmsgCallback {
 public:
  // Message descriptor handed to the backend for one recvmsg.
  struct MsgHdr {
    virtual ~MsgHdr() = default;

    // Invoked to dispose of the descriptor when it is released unused.
    using FreeFunc = void (*)(MsgHdr*);
    // Invoked with the descriptor and the result of the recvmsg.
    using CallbackFunc = void (*)(MsgHdr*, int);

    // Opaque user pointer; not interpreted by the code shown here.
    void* arg_{nullptr};
    // Header passed to recvmsg. Value-initialized so that a freshly
    // constructed descriptor never exposes indeterminate pointers/lengths.
    struct msghdr data_{};
    FreeFunc freeFunc_{nullptr};
    CallbackFunc cbFunc_{nullptr};
  };

  EventRecvmsgCallback() = default;
  virtual ~EventRecvmsgCallback() = default;

  // Returns a descriptor for the next recvmsg, or nullptr; on nullptr the
  // io_uring backend falls back to a plain poll request.
  virtual MsgHdr* allocateData() = 0;
};
// Tagged, non-owning reference to the optional read/recvmsg callback attached
// to an event. `type_` tells which union member is meaningful:
// TYPE_READ -> readCb_, TYPE_RECVMSG -> recvmsgCb_, TYPE_NONE -> neither.
struct EventCallback {
  enum class Type { TYPE_NONE = 0, TYPE_READ = 1, TYPE_RECVMSG = 2 };
  Type type_{Type::TYPE_NONE};
  union {
    // Null-initialized so that a default-constructed (and subsequently
    // copied) EventCallback never carries an indeterminate pointer.
    EventReadCallback* readCb_{nullptr};
    EventRecvmsgCallback* recvmsgCb_;
  };

  // Attaches a read callback and tags this object as TYPE_READ.
  void set(EventReadCallback* cb) {
    type_ = Type::TYPE_READ;
    readCb_ = cb;
  }

  // Attaches a recvmsg callback and tags this object as TYPE_RECVMSG.
  void set(EventRecvmsgCallback* cb) {
    type_ = Type::TYPE_RECVMSG;
    recvmsgCb_ = cb;
  }

  // Detaches any callback: clears the tag and drops the stale pointer.
  void reset() {
    type_ = Type::TYPE_NONE;
    readCb_ = nullptr;
  }
};
class EventBaseEvent {
public:
EventBaseEvent() = default;
......@@ -75,6 +136,22 @@ class EventBaseEvent {
freeFn_ = freeFn;
}
// Attaches a read callback to this event; the stored EventCallback becomes
// TYPE_READ.
void setCallback(EventReadCallback* cb) {
cb_.set(cb);
}
// Attaches a recvmsg callback to this event; the stored EventCallback becomes
// TYPE_RECVMSG.
void setCallback(EventRecvmsgCallback* cb) {
cb_.set(cb);
}
// Detaches any read/recvmsg callback; the stored EventCallback becomes
// TYPE_NONE.
void resetCallback() {
cb_.reset();
}
// Returns the callback descriptor currently attached to this event. The
// reference is to the member itself, so it reflects later set/reset calls.
const EventCallback& getCallback() const {
return cb_;
}
void eb_event_set(
libevent_fd_t fd,
short events,
......@@ -110,6 +187,7 @@ class EventBaseEvent {
EventBase* evb_{nullptr};
void* userData_{nullptr};
FreeFunction freeFn_{nullptr};
EventCallback cb_;
};
class EventBaseBackendBase {
......
......@@ -179,6 +179,18 @@ class EventHandler {
bool isPending() const;
// Forwards a read callback to the underlying event (event_.setCallback).
void setEventCallback(EventReadCallback* cb) {
event_.setCallback(cb);
}
// Forwards a recvmsg callback to the underlying event (event_.setCallback).
void setEventCallback(EventRecvmsgCallback* cb) {
event_.setCallback(cb);
}
// Removes any read/recvmsg callback from the underlying event
// (event_.resetCallback).
void resetEventCallback() {
event_.resetCallback();
}
private:
bool registerImpl(uint16_t events, bool internal);
void ensureNotRegistered(const char* fn);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment