Commit bf66d627 authored by Kevin Vigor's avatar Kevin Vigor Committed by Facebook GitHub Bot

co-routine interface

Summary: Add co-routine interface to SImpleAsyncIO.

Reviewed By: iahs

Differential Revision: D25932969

fbshipit-source-id: df14e25a0fe293a218d6bef0c4bd1bc8e690efba
parent 448228ba
......@@ -30,6 +30,9 @@
#error "Cannot build without at least one of AsyncIO.h and IoUring.h"
#endif
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/Baton.h>
#endif
#include <folly/portability/Sockets.h>
namespace folly {
......@@ -207,4 +210,30 @@ void SimpleAsyncIO::pwrite(
std::move(completor));
}
#if FOLLY_HAS_COROUTINES
folly::coro::Task<int>
SimpleAsyncIO::co_pwrite(int fd, const void* buf, size_t size, off_t start) {
folly::coro::Baton done;
int result;
pwrite(fd, buf, size, start, [&done, &result](int rc) {
result = rc;
done.post();
});
co_await done;
co_return result;
}
folly::coro::Task<int>
SimpleAsyncIO::co_pread(int fd, void* buf, size_t size, off_t start) {
folly::coro::Baton done;
int result;
pread(fd, buf, size, start, [&done, &result](int rc) {
result = rc;
done.post();
});
co_await done;
co_return result;
}
#endif // FOLLY_HAS_COROUTINES
} // namespace folly
......@@ -23,6 +23,10 @@
#include <folly/io/async/ScopedEventBaseThread.h>
#include <queue>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/Task.h>
#endif
namespace folly {
class SimpleAsyncIO : public EventHandler {
......@@ -106,6 +110,14 @@ class SimpleAsyncIO : public EventHandler {
using SimpleAsyncIOCompletor = Function<void(int rc)>;
/* Initiate an asynchronous read request.
*
* Parameters and return value are same as pread(2).
*
* Completion is indicated by an asynchronous call to the given completor
* callback. The sole parameter to the callback is the result of the
* operation.
*/
void pread(
int fd,
void* buf,
......@@ -113,6 +125,14 @@ class SimpleAsyncIO : public EventHandler {
off_t start,
SimpleAsyncIOCompletor completor);
/* Initiate an asynchronous write request.
*
* Parameters and return value are same as pwrite(2).
*
* Completion is indicated by an asynchronous call to the given completor
* callback. The sole parameter to the callback is the result of the
* operation.
*/
void pwrite(
int fd,
const void* data,
......@@ -120,6 +140,22 @@ class SimpleAsyncIO : public EventHandler {
off_t offset,
SimpleAsyncIOCompletor completor);
#if FOLLY_HAS_COROUTINES
/* Coroutine version of pread().
*
* Identical to pread() except that result is obtained by co_await instead of
* callback.
*/
folly::coro::Task<int> co_pread(int fd, void* buf, size_t size, off_t start);
/* Coroutine version of pwrite().
*
* Identical to pwrite() except that result is obtained by co_await instead of
* callback.
*/
folly::coro::Task<int>
co_pwrite(int fd, const void* buf, size_t size, off_t start);
#endif
private:
std::unique_ptr<AsyncBaseOp> getOp();
void putOp(std::unique_ptr<AsyncBaseOp>&&);
......
......@@ -17,6 +17,8 @@
#include <folly/experimental/io/SimpleAsyncIO.h>
#include <folly/File.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/synchronization/Baton.h>
#include <glog/logging.h>
......@@ -158,6 +160,61 @@ TEST_P(SimpleAsyncIOTest, DestroyWithPendingIO) {
ASSERT_EQ(completed, numWrites);
}
#if FOLLY_HAS_COROUTINES
static folly::coro::Task<folly::Unit> doCoAsyncWrites(
SimpleAsyncIO& aio,
int fd,
std::string const& data,
int copies) {
std::vector<folly::coro::Task<int>> writes;
for (int i = 0; i < copies; ++i) {
writes.emplace_back(
aio.co_pwrite(fd, data.data(), data.length(), data.length() * i));
}
auto results = co_await folly::coro::collectAllRange(std::move(writes));
for (int result : results) {
EXPECT_EQ(result, data.length());
}
co_return Unit{};
}
static folly::coro::Task<folly::Unit> doCoAsyncReads(
SimpleAsyncIO& aio,
int fd,
std::string const& data,
int copies) {
std::vector<std::unique_ptr<char[]>> buffers;
std::vector<folly::coro::Task<int>> reads;
for (int i = 0; i < copies; ++i) {
buffers.emplace_back(std::make_unique<char[]>(data.length()));
reads.emplace_back(
aio.co_pread(fd, buffers[i].get(), data.length(), data.length() * i));
}
auto results = co_await folly::coro::collectAllRange(std::move(reads));
for (int i = 0; i < copies; ++i) {
EXPECT_EQ(results[i], data.length());
EXPECT_EQ(::memcmp(data.data(), buffers[i].get(), data.length()), 0);
}
co_return Unit{};
}
TEST_P(SimpleAsyncIOTest, CoroutineReadWrite) {
auto tmpfile = File::temporary();
int fd = tmpfile.fd();
SimpleAsyncIO aio(config_);
std::string testStr = "Uncle Touchy goes to college";
folly::coro::blockingWait(doCoAsyncWrites(aio, fd, testStr, 10));
folly::coro::blockingWait(doCoAsyncReads(aio, fd, testStr, 10));
}
#endif // FOLLY_HAS_COROUTINES
INSTANTIATE_TEST_CASE_P(
SimpleAsyncIOTests,
SimpleAsyncIOTest,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment