Commit c049b564 authored by James Sedgwick's avatar James Sedgwick Committed by Anton Likhtarov

move rx to folly/experimental

Summary: As above. I want to use this for the thread pools and it probably belongs in folly long-term anywya (if we stick with it)

Test Plan: compiled the one user

Reviewed By: hans@fb.com

Subscribers: fugalh, mwa, jgehring, fuegen, njormrod

FB internal diff: D1560578
parent 260f12ae
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "types.h"
#include "Subject.h"
#include "Subscription.h"
#include <folly/wangle/Executor.h>
#include <list>
#include <memory>
namespace folly { namespace wangle {
template <class T>
struct Observable {
virtual ~Observable() = default;
/// Subscribe the given Observer to this Observable.
// Eventually this will return a Subscription object of some kind, in order
// to support cancellation. This is kinda really important. Maybe I should
// just do it now, using an dummy Subscription object.
virtual Subscription subscribe(ObserverPtr<T> o) {
observers_.push_back(o);
return Subscription();
}
/// Returns a new Observable that will call back on the given Scheduler.
/// The returned Observable must outlive the parent Observable.
// This and subscribeOn should maybe just be a first-class feature of an
// Observable, rather than making new ones whose lifetimes are tied to their
// parents. In that case it'd return a reference to this object for
// chaining.
ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
// you're right Hannes, if we have Observable::create we don't need this
// helper class.
struct ViaSubject : public Observable<T>
{
ViaSubject(SchedulerPtr scheduler,
Observable* obs)
: scheduler_(scheduler), observable_(obs)
{}
Subscription subscribe(ObserverPtr<T> o) override {
return observable_->subscribe(
Observer<T>::create(
[=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
[=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
[=]() { scheduler_->add([o] { o->onCompleted(); }); }));
}
protected:
SchedulerPtr scheduler_;
Observable* observable_;
};
return std::make_shared<ViaSubject>(scheduler, this);
}
/// Returns a new Observable that will subscribe to this parent Observable
/// via the given Scheduler. This can be subtle and confusing at first, see
/// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
struct Subject_ : public Subject<T> {
public:
Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
}
Subscription subscribe(ObserverPtr<T> o) {
scheduler_->add([=] {
observable_->subscribe(o);
});
return Subscription();
}
protected:
SchedulerPtr scheduler_;
Observable* observable_;
};
return folly::make_unique<Subject_>(scheduler, this);
}
protected:
std::list<ObserverPtr<T>> observers_;
};
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "types.h"
#include <functional>
#include <memory>
#include <stdexcept>
#include <folly/Memory.h>
namespace folly { namespace wangle {
template <class T> class FunctionObserver;
/// Observer interface. You can subclass it, or you can just use create()
/// to use std::functions.
template <class T>
struct Observer {
// These are what it means to be an Observer.
virtual void onNext(T) = 0;
virtual void onError(Error) = 0;
virtual void onCompleted() = 0;
virtual ~Observer() = default;
/// Create an Observer with std::function callbacks. Handy to make ad-hoc
/// Observers with lambdas.
///
/// Templated for maximum perfect forwarding flexibility, but ultimately
/// whatever you pass in has to implicitly become a std::function for the
/// same signature as onNext(), onError(), and onCompleted() respectively.
/// (see the FunctionObserver typedefs)
template <class N, class E, class C>
static std::unique_ptr<Observer> create(
N&& onNextFn, E&& onErrorFn, C&& onCompletedFn)
{
return folly::make_unique<FunctionObserver<T>>(
std::forward<N>(onNextFn),
std::forward<E>(onErrorFn),
std::forward<C>(onCompletedFn));
}
/// Create an Observer with only onNext and onError callbacks.
/// onCompleted will just be a no-op.
template <class N, class E>
static std::unique_ptr<Observer> create(N&& onNextFn, E&& onErrorFn) {
return folly::make_unique<FunctionObserver<T>>(
std::forward<N>(onNextFn),
std::forward<E>(onErrorFn),
nullptr);
}
/// Create an Observer with only an onNext callback.
/// onError and onCompleted will just be no-ops.
template <class N>
static std::unique_ptr<Observer> create(N&& onNextFn) {
return folly::make_unique<FunctionObserver<T>>(
std::forward<N>(onNextFn),
nullptr,
nullptr);
}
};
/// An observer that uses std::function callbacks. You don't really want to
/// make one of these directly - instead use the Observer::create() methods.
template <class T>
struct FunctionObserver : public Observer<T> {
typedef std::function<void(T)> OnNext;
typedef std::function<void(Error)> OnError;
typedef std::function<void()> OnCompleted;
/// We don't need any fancy overloads of this constructor because that's
/// what Observer::create() is for.
template <class N = OnNext, class E = OnError, class C = OnCompleted>
FunctionObserver(N&& n, E&& e, C&& c)
: onNext_(std::forward<N>(n)),
onError_(std::forward<E>(e)),
onCompleted_(std::forward<C>(c))
{}
void onNext(T val) override {
if (onNext_) onNext_(val);
}
void onError(Error e) override {
if (onError_) onError_(e);
}
void onCompleted() override {
if (onCompleted_) onCompleted_();
}
protected:
OnNext onNext_;
OnError onError_;
OnCompleted onCompleted_;
};
}}
Rx is a pattern for "functional reactive programming" that started at
Microsoft in C#, and has been reimplemented in various languages, notably
RxJava for JVM languages.
It is basically the plural of Futures (a la Wangle).
singular | plural
+---------------------------------+-----------------------------------
sync | Foo getData() | std::vector<Foo> getData()
async | wangle::Future<Foo> getData() | wangle::Observable<Foo> getData()
For more on Rx, I recommend these resources:
Netflix blog post (RxJava): http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
Introduction to Rx eBook (C#): http://www.introtorx.com/content/v1.0.10621.0/01_WhyRx.html
The RxJava wiki: https://github.com/Netflix/RxJava/wiki
Netflix QCon presentation: http://www.infoq.com/presentations/netflix-functional-rx
https://rx.codeplex.com/
There are open source C++ implementations, I haven't looked at them. They
might be the best way to go rather than writing it NIH-style. I mostly did it
as an exercise, to think through how closely we might want to integrate
something like this with Wangle, and to get a feel for how it works in C++.
I haven't even tried to support move-only data in this version. I'm on the
fence about the usage of shared_ptr. Subject is underdeveloped. A whole rich
set of operations is obviously missing. I haven't decided how to handle
subscriptions (and therefore cancellation), but I'm pretty sure C#'s
"Disposable" is thoroughly un-C++ (opposite of RAII). So for now subscribe
returns nothing at all and you can't cancel anything ever. The whole thing is
probably riddled with lifetime corner case bugs that will come out like a
swarm of angry bees as soon as someone tries an infinite sequence, or tries to
partially observe a long sequence. I'm pretty sure subscribeOn has a bug that
I haven't tracked down yet.
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "Observable.h"
#include "Observer.h"
namespace folly { namespace wangle {
/// Subject interface. A Subject is both an Observable and an Observer. There
/// is a default implementation of the Observer methods that just forwards the
/// observed events to the Subject's observers.
template <class T>
struct Subject : public Observable<T>, public Observer<T> {
void onNext(T val) override {
for (auto& o : this->observers_)
o->onNext(val);
}
void onError(Error e) override {
for (auto& o : this->observers_)
o->onError(e);
}
void onCompleted() override {
for (auto& o : this->observers_)
o->onCompleted();
}
};
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly { namespace wangle {
// TODO
struct Subscription {
};
}}
/*
* Copyright 2014 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/ExceptionWrapper.h>
namespace folly { namespace wangle {
typedef folly::exception_wrapper Error;
// The wangle::Executor is basically an rx Scheduler (by design). So just
// alias it.
typedef std::shared_ptr<folly::wangle::Executor> SchedulerPtr;
template <class T> struct Observable;
template <class T> struct Observer;
template <class T> struct Subject;
template <class T> using ObservablePtr = std::shared_ptr<Observable<T>>;
template <class T> using ObserverPtr = std::shared_ptr<Observer<T>>;
template <class T> using SubjectPtr = std::shared_ptr<Subject<T>>;
}}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment