Commit 64058698 authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

processSubmit rework

Summary: processSubmit rework

Reviewed By: mokomull

Differential Revision: D22405528

fbshipit-source-id: 99db524e62264d80556e6ac771f6d504cb31bd37
parent fb4af641
......@@ -111,9 +111,6 @@ IoUringBackend::IoUringBackend(Options options)
throw NotAvailable("io_uring_queue_init error");
}
sqRingMask_ = *ioRing_.sq.kring_mask;
cqRingMask_ = *ioRing_.cq.kring_mask;
numEntries_ *= 2;
entries_.reset(new IoSqe[numEntries_]);
......@@ -307,48 +304,9 @@ size_t IoUringBackend::submitList(
auto* sqe = ::io_uring_get_sqe(&ioRing_);
CHECK(sqe); // this should not happen
auto* ev = entry->event_->getEvent();
if (ev) {
const auto& cb = entry->event_->getCallback();
bool processed = false;
switch (cb.type_) {
case EventCallback::Type::TYPE_NONE:
processed = entry->processSubmit(sqe);
break;
case EventCallback::Type::TYPE_READ:
if (auto* iov = cb.readCb_->allocateData()) {
processed = true;
entry->prepRead(
sqe,
ev->ev_fd,
&iov->data_,
0,
(ev->ev_events & EV_PERSIST) != 0);
entry->cbData_.set(iov);
}
break;
case EventCallback::Type::TYPE_RECVMSG:
if (auto* msg = cb.recvmsgCb_->allocateData()) {
processed = true;
entry->prepRecvmsg(
sqe, ev->ev_fd, &msg->data_, (ev->ev_events & EV_PERSIST) != 0);
entry->cbData_.set(msg);
}
break;
}
if (!processed) {
entry->cbData_.reset();
entry->prepPollAdd(
sqe,
ev->ev_fd,
getPollFlags(ev->ev_events),
(ev->ev_events & EV_PERSIST) != 0);
}
} else {
entry->processSubmit(sqe);
}
entry->processSubmit(sqe);
i++;
if (ioCbs.empty()) {
int num = submitBusyCheck(i, waitForEvents);
CHECK_EQ(num, i);
......
......@@ -126,6 +126,48 @@ class IoUringBackend : public PollIoBackend {
: PollIoBackend::IoCb(backend, poolAlloc) {}
~IoSqe() override = default;
void processSubmit(void* entry) override {
auto* ev = event_->getEvent();
if (ev) {
struct io_uring_sqe* sqe =
reinterpret_cast<struct io_uring_sqe*>(entry);
const auto& cb = event_->getCallback();
switch (cb.type_) {
case EventCallback::Type::TYPE_NONE:
break;
case EventCallback::Type::TYPE_READ:
if (auto* iov = cb.readCb_->allocateData()) {
prepRead(
sqe,
ev->ev_fd,
&iov->data_,
0,
(ev->ev_events & EV_PERSIST) != 0);
cbData_.set(iov);
return;
}
break;
case EventCallback::Type::TYPE_RECVMSG:
if (auto* msg = cb.recvmsgCb_->allocateData()) {
prepRecvmsg(
sqe,
ev->ev_fd,
&msg->data_,
(ev->ev_events & EV_PERSIST) != 0);
cbData_.set(msg);
return;
}
break;
}
prepPollAdd(
sqe,
ev->ev_fd,
getPollFlags(ev->ev_events),
(ev->ev_events & EV_PERSIST) != 0);
}
}
void prepPollAdd(void* entry, int fd, uint32_t events, bool registerFd)
override {
CHECK(entry);
......@@ -148,7 +190,7 @@ class IoUringBackend : public PollIoBackend {
int fd,
const struct iovec* iov,
off_t offset,
bool registerFd) override {
bool registerFd) {
CHECK(entry);
struct io_uring_sqe* sqe = reinterpret_cast<struct io_uring_sqe*>(entry);
if (registerFd && !fdRecord_) {
......@@ -170,7 +212,7 @@ class IoUringBackend : public PollIoBackend {
int fd,
const struct iovec* iov,
off_t offset,
bool registerFd) override {
bool registerFd) {
CHECK(entry);
struct io_uring_sqe* sqe = reinterpret_cast<struct io_uring_sqe*>(entry);
if (registerFd && !fdRecord_) {
......@@ -187,8 +229,7 @@ class IoUringBackend : public PollIoBackend {
::io_uring_sqe_set_data(sqe, this);
}
void prepRecvmsg(void* entry, int fd, struct msghdr* msg, bool registerFd)
override {
void prepRecvmsg(void* entry, int fd, struct msghdr* msg, bool registerFd) {
CHECK(entry);
struct io_uring_sqe* sqe = reinterpret_cast<struct io_uring_sqe*>(entry);
if (registerFd && !fdRecord_) {
......@@ -264,9 +305,8 @@ class IoUringBackend : public PollIoBackend {
~ReadIoSqe() override = default;
bool processSubmit(void* entry) override {
void processSubmit(void* entry) override {
prepRead(entry, fd_, iov_.data(), offset_, false);
return true;
}
};
......@@ -274,9 +314,8 @@ class IoUringBackend : public PollIoBackend {
using ReadWriteIoSqe::ReadWriteIoSqe;
~WriteIoSqe() override = default;
bool processSubmit(void* entry) override {
void processSubmit(void* entry) override {
prepWrite(entry, fd_, iov_.data(), offset_, false);
return true;
}
};
......@@ -285,12 +324,10 @@ class IoUringBackend : public PollIoBackend {
~ReadvIoSqe() override = default;
bool processSubmit(void* entry) override {
void processSubmit(void* entry) override {
struct io_uring_sqe* sqe = reinterpret_cast<struct io_uring_sqe*>(entry);
::io_uring_prep_readv(sqe, fd_, iov_.data(), iov_.size(), offset_);
::io_uring_sqe_set_data(sqe, this);
return true;
}
};
......@@ -298,11 +335,10 @@ class IoUringBackend : public PollIoBackend {
using ReadWriteIoSqe::ReadWriteIoSqe;
~WritevIoSqe() override = default;
bool processSubmit(void* entry) override {
void processSubmit(void* entry) override {
struct io_uring_sqe* sqe = reinterpret_cast<struct io_uring_sqe*>(entry);
::io_uring_prep_writev(sqe, fd_, iov_.data(), iov_.size(), offset_);
::io_uring_sqe_set_data(sqe, this);
return true;
}
};
......@@ -321,7 +357,7 @@ class IoUringBackend : public PollIoBackend {
~FSyncIoSqe() override = default;
bool processSubmit(void* entry) override {
void processSubmit(void* entry) override {
struct io_uring_sqe* sqe = reinterpret_cast<struct io_uring_sqe*>(entry);
unsigned int fsyncFlags = 0;
......@@ -336,7 +372,6 @@ class IoUringBackend : public PollIoBackend {
::io_uring_prep_fsync(sqe, fd_, fsyncFlags);
::io_uring_sqe_set_data(sqe, this);
return true;
}
FSyncFlags flags_;
......@@ -367,9 +402,6 @@ class IoUringBackend : public PollIoBackend {
struct io_uring_params params_;
struct io_uring ioRing_;
uint32_t sqRingMask_{0};
uint32_t cqRingMask_{0};
FdRegistry fdRegistry_;
};
} // namespace folly
......@@ -110,9 +110,7 @@ class PollIoBackend : public EventBaseBackendBase {
: backend_(backend), poolAlloc_(poolAlloc) {}
virtual ~IoCb() = default;
virtual bool processSubmit(void* /*entry*/) {
return false;
}
virtual void processSubmit(void* entry) = 0;
virtual void processActive() {}
......@@ -136,26 +134,6 @@ class PollIoBackend : public EventBaseBackendBase {
virtual void
prepPollAdd(void* entry, int fd, uint32_t events, bool registerFd) = 0;
virtual void prepRead(
void* /*entry*/,
int /*fd*/,
const struct iovec* /*iov*/,
off_t /*offset*/,
bool /*registerFd*/) {}
virtual void prepWrite(
void* /*entry*/,
int /*fd*/,
const struct iovec* /*iov*/,
off_t /*offset*/,
bool /*registerFd*/) {}
virtual void prepRecvmsg(
void* /*entry*/,
int /*fd*/,
struct msghdr* /*msg*/,
bool /*registerFd*/) {}
struct EventCallbackData {
EventCallback::Type type_{EventCallback::Type::TYPE_NONE};
union {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment