Commit 925ab325 authored by Sergey Anpilov's avatar Sergey Anpilov Committed by Facebook GitHub Bot

Add folly::coro::filter(AsyncGenerator)

Summary: Filtering is one of the basic stream operations and it would be convenient to have it for AsyncGenerators

Reviewed By: yfeldblum

Differential Revision: D22136080

fbshipit-source-id: ca6a233a8c1aeddb63899aeaba8ca4c1b6dfa535
parent c9c55642
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace folly {
namespace coro {
template <typename FilterFn, typename Reference, typename Value>
AsyncGenerator<Reference, Value> filter(
AsyncGenerator<Reference, Value> source,
FilterFn filterFn) {
while (auto item = co_await source.next()) {
if (invoke(filterFn, item.value())) {
co_yield std::move(item).value();
}
}
}
} // namespace coro
} // namespace folly
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/experimental/coro/AsyncGenerator.h>
namespace folly {
namespace coro {
// Filter the Values from an input stream using an unary predicate.
//
// The input is a stream of Values.
//
// The output is a stream of Values that satisfy the predicate.
//
// Example:
// AsyncGenerator<int> getAllNumbers();
//
// AsyncGenerator<int> getEvenNumbers(AsyncGenerator<int> allNumbers) {
// return filter(getAllNumbers(), [](int i){ return i % 2 == 0; });
// }
template <typename FilterFn, typename Reference, typename Value>
AsyncGenerator<Reference, Value> filter(
AsyncGenerator<Reference, Value> source,
FilterFn filterFn);
} // namespace coro
} // namespace folly
#include <folly/experimental/coro/Filter-inl.h>
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Portability.h>
#if FOLLY_HAS_COROUTINES
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Filter.h>
#include <folly/portability/GTest.h>
using namespace folly::coro;
class FilterTest : public testing::Test {};
TEST_F(FilterTest, SimpleStream) {
const auto allNumbers = []() -> AsyncGenerator<int> {
for (int i = 0; i < 10; ++i) {
co_yield i;
}
};
auto evenNumbers = filter(allNumbers(), [](int i) { return i % 2 == 0; });
EXPECT_EQ(0, blockingWait(evenNumbers.next()).value());
EXPECT_EQ(2, blockingWait(evenNumbers.next()).value());
EXPECT_EQ(4, blockingWait(evenNumbers.next()).value());
EXPECT_EQ(6, blockingWait(evenNumbers.next()).value());
EXPECT_EQ(8, blockingWait(evenNumbers.next()).value());
EXPECT_FALSE(blockingWait(evenNumbers.next()));
}
TEST_F(FilterTest, EmptyInputStream) {
const auto emptyStream = []() -> AsyncGenerator<int> { co_return; };
auto emptyStreamFiltered = filter(emptyStream(), [](int) { return true; });
EXPECT_FALSE(blockingWait(emptyStreamFiltered.next()));
}
TEST_F(FilterTest, EmptyOutputStream) {
const auto nonEmptyStream = []() -> AsyncGenerator<int> {
co_yield 0;
co_yield 1;
co_yield 2;
};
auto emptyOutputStream = filter(nonEmptyStream(), [](int) { return false; });
EXPECT_FALSE(blockingWait(emptyOutputStream.next()));
}
TEST_F(FilterTest, ThrowingStream) {
struct Exception : std::exception {};
const auto throwingStream = []() -> AsyncGenerator<int> {
co_yield 0;
co_yield 1;
throw Exception{};
};
auto throwingStreamFiltered =
filter(throwingStream(), [](int) { return true; });
EXPECT_EQ(0, blockingWait(throwingStreamFiltered.next()).value());
EXPECT_EQ(1, blockingWait(throwingStreamFiltered.next()).value());
EXPECT_THROW(blockingWait(throwingStreamFiltered.next()), Exception);
}
TEST_F(FilterTest, ThrowingPredicate) {
struct Exception : std::exception {};
auto nonThrowingStream = []() -> AsyncGenerator<int> {
co_yield 0;
co_yield 1;
co_yield 2;
};
auto evenNumbers = filter(nonThrowingStream(), [](int i) {
if (i == 1) {
throw Exception{};
};
return true;
});
EXPECT_EQ(0, blockingWait(evenNumbers.next()).value());
EXPECT_THROW(blockingWait(evenNumbers.next()), Exception);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment