Commit 37a33e67 authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook Github Bot

Add folly::coro::materialize() & folly::coro::dematerialize() algorithms

Summary:
The `materialize()` algorithm meterializes values from a stream into a stream of events that represent each value and also the end-of-stream and stream-error events.

`dematerialize()` reverses a materialized stream by replaying the events recorded in the input stream to an output stream.

Reviewed By: lewissbaker

Differential Revision: D16772236

fbshipit-source-id: 27de543ea2d02691a376fd0f11d0113c5c39287d
parent f3352d86
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/coro/Materialize.h>
namespace folly {
namespace coro {
template <typename Reference, typename Value>
AsyncGenerator<Reference, Value> dematerialize(
AsyncGenerator<CallbackRecord<Reference>, CallbackRecord<Value>> source) {
while (auto item = co_await source.next()) {
if (item->hasValue()) {
// Value
co_yield std::move(*item).value();
} else if (item->hasNone()) {
// None
co_return;
} else if (item->hasError()) {
// Exception
std::move(*item).error().throw_exception();
} else {
DCHECK(false);
std::terminate();
}
}
}
} // namespace coro
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/ExceptionWrapper.h>
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Materialize.h>
#include <variant>
namespace folly {
namespace coro {
// Dematerialize the CallbackRecords from an input stream into a stream that
// replays all of the events materialized in the input stream.
//
// The input is a stream of CallbackRecord.
//
// The output is a stream of Value.
//
// Example:
// AsyncGenerator<CallbackRecord<int>> stream();
//
// Task<void> consumer() {
// auto events = dematerialize(stream());
// try {
// while (auto item = co_await events.next()) {
// // Value
// auto&& value = *item;
// std::cout << "value " << value << "\n";
// }
// // None
// std::cout << "end\n";
// } catch (const std::exception& error) {
// // Exception
// std::cout << "error " << error.what() << "\n";
// }
// }
template <typename Reference, typename Value>
AsyncGenerator<Reference, Value> dematerialize(
AsyncGenerator<CallbackRecord<Reference>, CallbackRecord<Value>> source);
} // namespace coro
} // namespace folly
#include <folly/experimental/coro/Dematerialize-inl.h>
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace folly {
namespace coro {
template <typename Reference, typename Value>
AsyncGenerator<CallbackRecord<Reference>, CallbackRecord<Value>> materialize(
AsyncGenerator<Reference, Value> source) {
using EventType = CallbackRecord<Reference>;
folly::exception_wrapper ex;
try {
while (auto item = co_await source.next()) {
co_yield EventType{callback_record_value, *std::move(item)};
}
} catch (const std::exception& e) {
ex = folly::exception_wrapper{std::current_exception(), e};
} catch (...) {
ex = folly::exception_wrapper{std::current_exception()};
}
if (ex) {
co_yield EventType{callback_record_error, std::move(ex)};
} else {
co_yield EventType{callback_record_none};
}
}
} // namespace coro
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/ExceptionWrapper.h>
#include <folly/experimental/coro/AsyncGenerator.h>
#include <variant>
namespace folly {
namespace coro {
enum class CallbackRecordSelector { Invalid, Value, None, Error };
constexpr inline std::in_place_index_t<0> const callback_record_value{};
constexpr inline std::in_place_index_t<1> const callback_record_none{};
constexpr inline std::in_place_index_t<2> const callback_record_error{};
//
// CallbackRecord records the result of a single invocation of a callback.
//
// This is very related to Try and expected, but this also records None in
// addition to Value and Error results.
//
// When the callback supports multiple overloads of Value then T would be
// something like a variant<tuple<..>, ..>
//
// When the callback supports multiple overloads of Error then all the errors
// are coerced to folly::exception_wrapper
//
template <class T>
class CallbackRecord {
static void clear(CallbackRecord* that) {
auto selector =
std::exchange(that->selector_, CallbackRecordSelector::Invalid);
if (selector == CallbackRecordSelector::Value) {
that->value_.destruct();
} else if (selector == CallbackRecordSelector::Error) {
that->error_.destruct();
}
}
template <class OtherReference>
static void convert_variant(
CallbackRecord* that,
const CallbackRecord<OtherReference>& other) {
if (other.hasValue()) {
that->value_.construct(other.value_.get());
} else if (other.hasError()) {
that->error_.construct(other.error_.get());
}
that->selector_ = other.selector_;
}
template <class OtherReference>
static void convert_variant(
CallbackRecord* that,
CallbackRecord<OtherReference>&& other) {
if (other.hasValue()) {
that->value_.construct(std::move(other.value_).get());
} else if (other.hasError()) {
that->error_.construct(std::move(other.error_).get());
}
that->selector_ = other.selector_;
}
public:
~CallbackRecord() {
clear(this);
}
CallbackRecord() noexcept : selector_(CallbackRecordSelector::Invalid) {}
template <class V>
CallbackRecord(const std::in_place_index_t<0>&, V&& v) noexcept(
std::is_nothrow_constructible_v<T, V>)
: CallbackRecord() {
value_.construct(std::forward<V>(v));
selector_ = CallbackRecordSelector::Value;
}
explicit CallbackRecord(const std::in_place_index_t<1>&) noexcept
: selector_(CallbackRecordSelector::None) {}
CallbackRecord(
const std::in_place_index_t<2>&,
folly::exception_wrapper e) noexcept
: CallbackRecord() {
error_.construct(std::move(e));
selector_ = CallbackRecordSelector::Error;
}
CallbackRecord(CallbackRecord&& other) noexcept(
std::is_nothrow_move_constructible_v<T>)
: CallbackRecord() {
convert_variant(this, std::move(other));
}
CallbackRecord& operator=(CallbackRecord&& other) noexcept(
std::is_nothrow_move_constructible_v<T>) {
if (&other != this) {
clear(this);
convert_variant(this, std::move(other));
}
return *this;
}
template <class U>
CallbackRecord(CallbackRecord<U>&& other) noexcept(
std::is_nothrow_constructible_v<T, U>)
: CallbackRecord() {
convert_variant(this, std::move(other));
}
bool hasNone() const noexcept {
return selector_ == CallbackRecordSelector::None;
}
bool hasError() const noexcept {
return selector_ == CallbackRecordSelector::Error;
}
decltype(auto) error() & {
DCHECK(hasError());
return error_.get();
}
decltype(auto) error() && {
DCHECK(hasError());
return std::move(error_).get();
}
decltype(auto) error() const& {
DCHECK(hasError());
return error_.get();
}
decltype(auto) error() const&& {
DCHECK(hasError());
return std::move(error_).get();
}
bool hasValue() const noexcept {
return selector_ == CallbackRecordSelector::Value;
}
decltype(auto) value() & {
DCHECK(hasValue());
return value_.get();
}
decltype(auto) value() && {
DCHECK(hasValue());
return std::move(value_).get();
}
decltype(auto) value() const& {
DCHECK(hasValue());
return value_.get();
}
decltype(auto) value() const&& {
DCHECK(hasValue());
return std::move(value_).get();
}
explicit operator bool() const noexcept {
return selector_ != CallbackRecordSelector::Invalid;
}
private:
union {
detail::ManualLifetime<T> value_;
detail::ManualLifetime<folly::exception_wrapper> error_;
};
CallbackRecordSelector selector_;
};
// Materialize the results an input stream into a stream that
// contains all of the events of the input stream.
//
// The output is a stream of CallbackRecord.
//
// Example:
// AsyncGenerator<int> stream();
//
// Task<void> consumer() {
// auto events = materialize(stream());
// while (auto item = co_await events.next()) {
// auto&& event = *item;
// if (event.hasValue()) {
// // Value
// int value = std::move(event).value();
// std::cout << "value " << value << "\n";
// } else if (event.hasNone()) {
// // None
// std::cout << "end\n";
// } else {
// // Exception
// folly::exception_wrapper error = std::move(event).error();
// std::cout << "error " << error.what() << "\n";
// }
// }
// }
template <typename Reference, typename Value>
AsyncGenerator<CallbackRecord<Reference>, CallbackRecord<Value>> materialize(
AsyncGenerator<Reference, Value> source);
} // namespace coro
} // namespace folly
#include <folly/experimental/coro/Materialize-inl.h>
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/Dematerialize.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
using namespace folly::coro;
TEST(Dematerialize, SimpleStream) {
struct MyError : std::exception {};
const int seenEndOfStream = 100;
const int seenError = 200;
std::vector<int> lastSeen(3, -1);
std::vector<int> expectedSeen = {
{seenEndOfStream, seenEndOfStream, seenError}};
auto selectStream = [](int index) -> AsyncGenerator<int> {
auto ints = [](int n) -> AsyncGenerator<int> {
for (int i = 0; i <= n; ++i) {
co_await co_reschedule_on_current_executor;
co_yield i;
}
};
auto failing = []() -> AsyncGenerator<int> {
co_yield 0;
co_yield 1;
throw MyError{};
};
if (index == 0) {
return ints(4);
} else if (index == 1) {
return ints(3);
}
return failing();
};
auto test = [&](int index) -> Task<void> {
auto generator = dematerialize(materialize(selectStream(index)));
try {
while (auto item = co_await generator.next()) {
auto value = *item;
CHECK(index >= 0 && index <= 2);
CHECK_EQ(lastSeen[index] + 1, value);
lastSeen[index] = value;
}
// None
if (index == 0) {
CHECK_EQ(4, lastSeen[index]);
} else if (index == 1) {
CHECK_EQ(3, lastSeen[index]);
} else {
// Stream 2 should have completed with an error not EndOfStream.
CHECK(false);
}
lastSeen[index] = seenEndOfStream;
} catch (const MyError&) {
// Error
CHECK_EQ(2, index);
CHECK_EQ(1, lastSeen[index]);
lastSeen[index] = seenError;
}
CHECK_EQ(expectedSeen[index], lastSeen[index]);
};
blockingWait(test(0));
blockingWait(test(1));
blockingWait(test(2));
}
TEST(Dematerialize, GeneratorOfRValueReference) {
auto makeGenerator =
[]() -> folly::coro::AsyncGenerator<std::unique_ptr<int>&&> {
co_yield std::make_unique<int>(10);
auto ptr = std::make_unique<int>(20);
co_yield std::move(ptr);
CHECK(ptr == nullptr);
};
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
auto gen = dematerialize(materialize(makeGenerator()));
auto result = co_await gen.next();
CHECK_EQ(10, **result);
// Don't move it to a local var.
result = co_await gen.next();
CHECK_EQ(20, **result);
auto ptr = *result; // Move it to a local var.
result = co_await gen.next();
CHECK(!result);
}());
}
struct MoveOnly {
explicit MoveOnly(int value) : value_(value) {}
MoveOnly(MoveOnly&& other) noexcept
: value_(std::exchange(other.value_, -1)) {}
~MoveOnly() {}
MoveOnly& operator=(MoveOnly&&) = delete;
int value() const {
return value_;
}
private:
int value_;
};
TEST(Dematerialize, GeneratorOfMoveOnlyType) {
auto makeGenerator = []() -> folly::coro::AsyncGenerator<MoveOnly> {
MoveOnly rvalue(1);
co_yield std::move(rvalue);
CHECK_EQ(-1, rvalue.value());
co_yield MoveOnly(2);
};
folly::coro::blockingWait([&]() -> folly::coro::Task<void> {
auto gen = dematerialize(materialize(makeGenerator()));
auto result = co_await gen.next();
// NOTE: It's an error to dereference using '*it' as this returns a copy
// of the iterator's reference type, which in this case is 'MoveOnly'.
CHECK_EQ(1, result->value());
result = co_await gen.next();
CHECK_EQ(2, result->value());
result = co_await gen.next();
CHECK(!result);
}());
}
#endif
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/Materialize.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
using namespace folly::coro;
TEST(Materialize, SimpleStream) {
struct MyError : std::exception {};
const int seenEndOfStream = 100;
const int seenError = 200;
std::vector<int> lastSeen(3, -1);
std::vector<int> expectedSeen = {
{seenEndOfStream, seenEndOfStream, seenError}};
auto selectStream = [](int index) -> AsyncGenerator<int> {
auto ints = [](int n) -> AsyncGenerator<int> {
for (int i = 0; i <= n; ++i) {
co_await co_reschedule_on_current_executor;
co_yield i;
}
};
auto failing = []() -> AsyncGenerator<int> {
co_yield 0;
co_yield 1;
throw MyError{};
};
if (index == 0) {
return ints(4);
} else if (index == 1) {
return ints(3);
}
return failing();
};
auto test = [&](int index) -> Task<void> {
auto generator = materialize(selectStream(index));
while (auto item = co_await generator.next()) {
auto event = std::move(*item);
CHECK(index >= 0 && index <= 2);
if (event.hasValue()) {
auto value = std::move(event).value();
CHECK_EQ(lastSeen[index] + 1, value);
lastSeen[index] = value;
} else if (event.hasNone()) {
// None
if (index == 0) {
CHECK_EQ(4, lastSeen[index]);
} else if (index == 1) {
CHECK_EQ(3, lastSeen[index]);
} else if (event.hasError()) {
// Stream 2 should have completed with an error not EndOfStream.
CHECK(false);
}
lastSeen[index] = seenEndOfStream;
} else {
// Error
CHECK_EQ(2, index);
CHECK_EQ(1, lastSeen[index]);
CHECK(std::move(event).error().is_compatible_with<MyError>());
lastSeen[index] = seenError;
}
}
CHECK_EQ(expectedSeen[index], lastSeen[index]);
};
blockingWait(test(0));
blockingWait(test(1));
blockingWait(test(2));
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment