Commit b963f391 authored by Kirk Shoop's avatar Kirk Shoop Committed by Facebook Github Bot

Add folly::coro::multiplex() algorithm

Summary: This algorithm multiplexes values from a stream-of-streams into a single stream of events that identifies the stream index the event originated from and also the end-of-stream and stream-error events.

Reviewed By: lewissbaker

Differential Revision: D15643142

fbshipit-source-id: a309a485f11465d9068ea81f4bc32823dd34291f
parent 600a1046
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/experimental/coro/Materialize.h>
#include <folly/experimental/coro/Merge.h>
#include <folly/experimental/coro/Transform.h>
namespace folly {
namespace coro {
template <
typename SelectIdFn,
typename Reference,
typename Value,
typename KeyType>
AsyncGenerator<
Enumerated<KeyType, CallbackRecord<Reference>>,
Enumerated<KeyType, CallbackRecord<Value>>>
multiplex(
folly::Executor::KeepAlive<> exec,
AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources,
SelectIdFn&& selectId) {
using EventType = CallbackRecord<Reference>;
using ReferenceType = Enumerated<KeyType, EventType>;
using ValueType = Enumerated<KeyType, CallbackRecord<Value>>;
return merge(
std::move(exec),
transform(
std::move(sources),
[selectId = std::forward<SelectIdFn>(selectId)](
AsyncGenerator<Reference, Value>&& item) mutable {
KeyType id = invoke(selectId, item);
return transform(
materialize(std::move(item)),
[id = std::move(id)](EventType&& event) {
return ReferenceType{id, std::move(event)};
});
}));
}
struct MultiplexIdcountFn {
size_t n = 0;
template <typename Inner>
size_t operator()(Inner&&) noexcept {
return n++;
}
};
template <typename Reference, typename Value>
AsyncGenerator<
Enumerated<size_t, CallbackRecord<Reference>>,
Enumerated<size_t, CallbackRecord<Value>>>
multiplex(
folly::Executor::KeepAlive<> exec,
AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources) {
return multiplex(std::move(exec), std::move(sources), MultiplexIdcountFn{});
}
} // namespace coro
} // namespace folly
/*
* Copyright 2019-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/ExceptionWrapper.h>
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/Materialize.h>
#include <tuple>
#include <variant>
namespace folly {
namespace coro {
template <typename Id, typename Value>
using Enumerated = std::tuple<Id, Value>;
// Multiplex the results of multiple streams into a single stream that
// contains all of the events of the input stream.
//
// The output is a stream of std::tuple<Id, Event> where the first tuple
// element is the result of a call to selectId(innerStream). The default
// selectId returns a size_t set to the index of the stream that the event
// came from and where Event is the CallbackRecord result of
// materialize(innerStream).
//
// Example:
// AsyncGenerator<AsyncGenerator<int>&&> streams();
//
// Task<void> consumer() {
// auto events = multiplex(streams());
// while (auto item != co_await events.next()) {
// auto&& [index, event] = *item;
// if (event.index() == 0) {
// // Value
// int value = std::get<0>(event);
// std::cout << index << " value " << value << "\n";
// } else if (event.index() == 1) {
// // End Of Stream
// std::cout << index << " end\n";
// } else {
// // Exception
// folly::exception_wrapper error = std::get<2>(event);
// std::cout << index << " error " << error.what() << "\n";
// }
// }
// }
template <
typename SelectIdFn,
typename Reference,
typename Value,
typename KeyType = std::decay_t<
invoke_result_t<SelectIdFn&, const AsyncGenerator<Reference, Value>&>>>
AsyncGenerator<
Enumerated<KeyType, CallbackRecord<Reference>>,
Enumerated<KeyType, CallbackRecord<Value>>>
multiplex(
folly::Executor::KeepAlive<> exec,
AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources,
SelectIdFn&& selectId);
template <typename Reference, typename Value>
AsyncGenerator<
Enumerated<size_t, CallbackRecord<Reference>>,
Enumerated<size_t, CallbackRecord<Value>>>
multiplex(
folly::Executor::KeepAlive<> exec,
AsyncGenerator<AsyncGenerator<Reference, Value>>&& sources);
} // namespace coro
} // namespace folly
#include <folly/experimental/coro/Multiplex-inl.h>
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/AsyncGenerator.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/CurrentExecutor.h>
#include <folly/experimental/coro/Multiplex.h>
#include <folly/experimental/coro/Task.h>
#include <folly/portability/GTest.h>
using namespace folly::coro;
TEST(Multiplex, SimpleStream) {
struct MyError : std::exception {};
blockingWait([]() -> Task<void> {
auto makeStreamOfStreams = []() -> AsyncGenerator<AsyncGenerator<int>> {
auto ints = [](int n) -> AsyncGenerator<int> {
for (int i = 0; i <= n; ++i) {
co_await co_reschedule_on_current_executor;
co_yield i;
}
};
auto failing = []() -> AsyncGenerator<int> {
co_yield 0;
co_yield 1;
throw MyError{};
};
co_yield ints(4);
co_yield ints(3);
co_yield failing();
};
std::vector<int> lastSeen(3, -1);
const int seenEndOfStream = 100;
const int seenError = 200;
auto generator =
multiplex(co_await co_current_executor, makeStreamOfStreams());
while (auto item = co_await generator.next()) {
auto [index, event] = std::move(*item);
CHECK(index >= 0 && index <= 2);
if (event.hasValue()) {
auto value = std::move(event).value();
CHECK_EQ(lastSeen[index] + 1, value);
lastSeen[index] = value;
} else if (event.hasNone()) {
// None
if (index == 0) {
CHECK_EQ(4, lastSeen[index]);
} else if (index == 1) {
CHECK_EQ(3, lastSeen[index]);
} else {
// Stream 2 should have completed with an error not EndOfStream.
CHECK(false);
}
lastSeen[index] = seenEndOfStream;
} else if (event.hasError()) {
// Error
CHECK_EQ(2, index);
CHECK_EQ(1, lastSeen[index]);
CHECK(std::move(event).error().is_compatible_with<MyError>());
lastSeen[index] = seenError;
}
}
CHECK_EQ(seenEndOfStream, lastSeen[0]);
CHECK_EQ(seenEndOfStream, lastSeen[1]);
CHECK_EQ(seenError, lastSeen[2]);
}());
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment