Commit 51ae3ffb authored by Dan Melnic's avatar Dan Melnic Committed by Facebook GitHub Bot

Add sendmsg/recmsg io_uring support

Summary: Add sendmsg/recmsg io_uring support

Reviewed By: yfeldblum

Differential Revision: D27148068

fbshipit-source-id: efb9174fb40f5f5bf0d0318db12cb61075866381
parent 653f9938
...@@ -1297,6 +1297,24 @@ void IoUringBackend::queueFallocate( ...@@ -1297,6 +1297,24 @@ void IoUringBackend::queueFallocate(
submitImmediateIoSqe(*ioSqe); submitImmediateIoSqe(*ioSqe);
} }
void IoUringBackend::queueSendmsg(
int fd, const struct msghdr* msg, unsigned int flags, FileOpCallback&& cb) {
auto* ioSqe = new SendmsgIoSqe(this, fd, msg, flags, std::move(cb));
ioSqe->backendCb_ = processFileOpCB;
incNumIoSqeInUse();
submitImmediateIoSqe(*ioSqe);
}
void IoUringBackend::queueRecvmsg(
int fd, struct msghdr* msg, unsigned int flags, FileOpCallback&& cb) {
auto* ioSqe = new RecvmsgIoSqe(this, fd, msg, flags, std::move(cb));
ioSqe->backendCb_ = processFileOpCB;
incNumIoSqeInUse();
submitImmediateIoSqe(*ioSqe);
}
void IoUringBackend::processFileOp(IoSqe* sqe, int64_t res) noexcept { void IoUringBackend::processFileOp(IoSqe* sqe, int64_t res) noexcept {
auto* ioSqe = reinterpret_cast<FileOpIoSqe*>(sqe); auto* ioSqe = reinterpret_cast<FileOpIoSqe*>(sqe);
// save the res // save the res
......
...@@ -236,6 +236,16 @@ class IoUringBackend : public EventBaseBackendBase { ...@@ -236,6 +236,16 @@ class IoUringBackend : public EventBaseBackendBase {
void queueFallocate( void queueFallocate(
int fd, int mode, off_t offset, off_t len, FileOpCallback&& cb); int fd, int mode, off_t offset, off_t len, FileOpCallback&& cb);
// sendmgs/recvmsg
void queueSendmsg(
int fd,
const struct msghdr* msg,
unsigned int flags,
FileOpCallback&& cb);
void queueRecvmsg(
int fd, struct msghdr* msg, unsigned int flags, FileOpCallback&& cb);
protected: protected:
enum class WaitForEventsMode { WAIT, DONT_WAIT }; enum class WaitForEventsMode { WAIT, DONT_WAIT };
...@@ -817,6 +827,46 @@ class IoUringBackend : public EventBaseBackendBase { ...@@ -817,6 +827,46 @@ class IoUringBackend : public EventBaseBackendBase {
off_t len_; off_t len_;
}; };
struct SendmsgIoSqe : public FileOpIoSqe {
SendmsgIoSqe(
IoUringBackend* backend,
int fd,
const struct msghdr* msg,
unsigned int flags,
FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)), msg_(msg), flags_(flags) {}
~SendmsgIoSqe() override = default;
void processSubmit(struct io_uring_sqe* sqe) override {
::io_uring_prep_sendmsg(sqe, fd_, msg_, flags_);
::io_uring_sqe_set_data(sqe, this);
}
const struct msghdr* msg_;
unsigned int flags_;
};
struct RecvmsgIoSqe : public FileOpIoSqe {
RecvmsgIoSqe(
IoUringBackend* backend,
int fd,
struct msghdr* msg,
unsigned int flags,
FileOpCallback&& cb)
: FileOpIoSqe(backend, fd, std::move(cb)), msg_(msg), flags_(flags) {}
~RecvmsgIoSqe() override = default;
void processSubmit(struct io_uring_sqe* sqe) override {
::io_uring_prep_recvmsg(sqe, fd_, msg_, flags_);
::io_uring_sqe_set_data(sqe, this);
}
struct msghdr* msg_;
unsigned int flags_;
};
int getActiveEvents(WaitForEventsMode waitForEvents); int getActiveEvents(WaitForEventsMode waitForEvents);
size_t submitList(IoSqeList& ioSqes, WaitForEventsMode waitForEvents); size_t submitList(IoSqeList& ioSqes, WaitForEventsMode waitForEvents);
int submitOne(); int submitOne();
......
...@@ -1047,6 +1047,110 @@ TEST(IoUringBackend, FileWriteMany) { ...@@ -1047,6 +1047,110 @@ TEST(IoUringBackend, FileWriteMany) {
EXPECT_EQ(bFsync, true); EXPECT_EQ(bFsync, true);
} }
TEST(IoUringBackend, SendmsgRecvmsg) {
static constexpr size_t kBackendCapacity = 256;
static constexpr size_t kBackendMaxSubmit = 32;
static constexpr size_t kBackendMaxGet = 32;
folly::PollIoBackend::Options options;
options.setCapacity(kBackendCapacity)
.setMaxSubmit(kBackendMaxSubmit)
.setMaxGet(kBackendMaxGet)
.setUseRegisteredFds(false);
auto evbPtr = getEventBase(options);
SKIP_IF(!evbPtr) << "Backend not available";
auto* backendPtr = dynamic_cast<folly::IoUringBackend*>(evbPtr->getBackend());
CHECK(!!backendPtr);
// we want raw sockets
auto sendFd = ::socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
CHECK_GT(sendFd, 0);
auto recvFd = ::socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
CHECK_GT(sendFd, 0);
folly::SocketAddress addr("::1", 0);
sockaddr_storage addrStorage;
addr.getAddress(&addrStorage);
auto& saddr = reinterpret_cast<sockaddr&>(addrStorage);
auto ret = ::bind(sendFd, &saddr, addr.getActualSize());
CHECK_EQ(ret, 0);
ret = ::bind(recvFd, &saddr, addr.getActualSize());
CHECK_EQ(ret, 0);
folly::SocketAddress sendAddr;
folly::SocketAddress recvAddr;
sendAddr.setFromLocalAddress(folly::NetworkSocket(sendFd));
recvAddr.setFromLocalAddress(folly::NetworkSocket(recvFd));
bool sendDone = false;
bool recvDone = false;
static constexpr size_t kNumBytes = 64;
static std::array<char, kNumBytes> sendBuf, recvBuf;
folly::IoUringBackend::FileOpCallback sendCb = [&](int res) {
CHECK_EQ(res, kNumBytes);
CHECK(!sendDone);
sendDone = true;
if (recvDone) {
evbPtr->terminateLoopSoon();
}
};
folly::IoUringBackend::FileOpCallback recvCb = [&](int res) {
CHECK_EQ(res, kNumBytes);
CHECK(!recvDone);
recvDone = true;
CHECK_EQ(::memcmp(sendBuf.data(), recvBuf.data(), kNumBytes), 0);
if (sendDone) {
evbPtr->terminateLoopSoon();
}
};
struct msghdr sendMsg = {};
struct msghdr recvMsg = {};
struct iovec sendIov, recvIov;
sendIov.iov_base = sendBuf.data();
sendIov.iov_len = sendBuf.size();
recvIov.iov_base = recvBuf.data();
recvIov.iov_len = recvBuf.size();
recvAddr.getAddress(&addrStorage);
sendMsg.msg_iov = &sendIov;
sendMsg.msg_iovlen = 1;
sendMsg.msg_name = reinterpret_cast<void*>(&addrStorage);
sendMsg.msg_namelen = recvAddr.getActualSize();
sendMsg.msg_iov = &sendIov;
sendMsg.msg_iovlen = 1;
recvMsg.msg_iov = &recvIov;
recvMsg.msg_iovlen = 1;
::memset(sendBuf.data(), 0xAB, sendBuf.size());
::memset(recvBuf.data(), 0x0, recvBuf.size());
CHECK_NE(::memcmp(sendBuf.data(), recvBuf.data(), kNumBytes), 0);
backendPtr->queueRecvmsg(recvFd, &recvMsg, 0, std::move(recvCb));
backendPtr->queueSendmsg(sendFd, &sendMsg, 0, std::move(sendCb));
evbPtr->loopForever();
CHECK(sendDone && recvDone);
::close(sendFd);
::close(recvFd);
}
namespace folly { namespace folly {
namespace test { namespace test {
static constexpr size_t kCapacity = 32; static constexpr size_t kCapacity = 32;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment