Commit 1d3ca26b authored by Jason Fried's avatar Jason Fried Committed by Facebook Github Bot

Folly Futures to Python Asyncio Futures Bridge

Summary:
folly/python/futures.h provides some helper functions for bridging folly::future to asyncio.Future.

folly/python/NotificationQueueExecutor.h is a Driveable executor that has a fileno() method that can be monitored using (select, epoll) to determine if the drive method should be called.

folly/python/executor.pyx is an implementation of a "driver" for the NotificationQueueExecutor from the python asyncio side. It tracks also keeps track of asyncio eventloops to Executor mappings.

the getExecutor() from folly/python/futures.h uses that mapping to return the correct executor for this python thread.

Reviewed By: andriigrynenko, yfeldblum

Differential Revision: D4687029

fbshipit-source-id: e79314606ffa18cb6933fe6b749991bfea646cde
parent 5d9dbb73
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/ExceptionString.h>
#include <folly/Function.h>
#include <folly/futures/DrivableExecutor.h>
#include <folly/io/async/NotificationQueue.h>
namespace folly {
namespace python {
class NotificationQueueExecutor : public folly::DrivableExecutor {
public:
using Func = folly::Func;
void add(Func func) override {
queue_.putMessage(std::move(func));
}
int fileno() const {
return consumer_.getFd();
}
void drive() noexcept override {
Func func;
while (queue_.tryConsume(func)) {
try {
func();
} catch (const std::exception& ex) {
LOG(ERROR) << "Exception thrown by NotificationQueueExecutor task."
<< "Exception message: " << folly::exceptionStr(ex);
} catch (...) {
LOG(ERROR) << "Unknown Exception thrown "
<< "by NotificationQueueExecutor task.";
}
}
}
private:
folly::NotificationQueue<Func> queue_;
folly::NotificationQueue<Func>::SimpleConsumer consumer_{queue_};
}; // NotificationQueueExecutor
} // python
} // folly
from libcpp cimport bool as cbool
cdef extern from "folly/ExceptionWrapper.h" namespace "folly":
cdef cppclass cFollyExceptionWrapper "folly::exception_wrapper":
void throwException() except +
cdef extern from "folly/Try.h" namespace "folly" nogil:
cdef cppclass cFollyTry "folly::Try"[T]:
T value() except+
cbool hasException[T]()
cbool hasException()
cFollyExceptionWrapper exception()
cdef extern from "folly/futures/Future.h" namespace "folly" nogil:
cdef cppclass cFollyFuture "folly::Future"[T]:
pass
#TODO add via and then
cdef extern from "folly/Unit.h" namespace "folly":
struct cFollyUnit "folly::Unit":
pass
cFollyUnit c_unit "folly::unit"
cdef extern from "folly/futures/Promise.h" namespace "folly":
cdef cppclass cFollyPromise "folly::Promise"[T]:
void setValue[M](M& value)
void setException[E](E& value)
cdef extern from "folly/Executor.h" namespace "folly":
cdef cppclass cFollyExecutor "folly::Executor":
pass
from libcpp.memory cimport unique_ptr
from folly cimport cFollyExecutor
cdef extern from "folly/python/NotificationQueueExecutor.h" namespace "folly::python":
cdef cppclass cNotificationQueueExecutor "folly::python::NotificationQueueExecutor"(cFollyExecutor):
int fileno()
void drive()
cdef class NotificationQueueExecutor:
cdef unique_ptr[cNotificationQueueExecutor] cQ
cdef api cFollyExecutor* get_executor()
import asyncio
from folly cimport cFollyExecutor
from folly.executor cimport cNotificationQueueExecutor
from libcpp.memory cimport make_unique, unique_ptr
from cython.operator cimport dereference as deref
#asynico Loops to NotificationQueueExecutor
loop_to_q = {}
cdef class NotificationQueueExecutor:
def __cinit__(self):
self.cQ = make_unique[cNotificationQueueExecutor]();
def fileno(NotificationQueueExecutor self):
return deref(self.cQ).fileno()
def drive(NotificationQueueExecutor self):
deref(self.cQ).drive()
def __dealloc__(NotificationQueueExecutor self):
# We drive it one last time
deref(self.cQ).drive()
cdef cFollyExecutor* get_executor():
loop = asyncio.get_event_loop()
try:
Q = <NotificationQueueExecutor>(loop_to_q[loop])
except KeyError:
Q = NotificationQueueExecutor()
loop.add_reader(Q.fileno(), Q.drive)
loop_to_q[loop] = Q
return Q.cQ.get()
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This file serves as a helper for bridging folly::future and python
* asyncio.future.
*/
#pragma once
#include <Python.h>
#include <folly/Executor.h>
#include <folly/ScopeGuard.h>
#include <folly/futures/Future.h>
#include <folly/python/executor_api.h>
namespace folly {
namespace python {
class PyGILStateGuard {
public:
~PyGILStateGuard() {
PyGILState_Release(gstate);
}
private:
PyGILState_STATE gstate{PyGILState_Ensure()};
};
inline folly::Executor* getExecutor() {
PyGILStateGuard gstate;
import_folly__executor();
return get_executor();
}
template <typename T>
void bridgeFuture(
folly::Executor* executor,
folly::Future<T>&& futureFrom,
folly::Function<void(folly::Try<T>&&, PyObject*)> callback,
PyObject* userData) {
// We are handing over a pointer to a python object to c++ and need
// to make sure it isn't removed by python in that time.
Py_INCREF(userData);
auto guard = folly::makeGuard([=] { Py_DECREF(userData); });
// Handle the lambdas for cython
// run callback from our Q
futureFrom.via(executor).then(
[ callback = std::move(callback), userData, guard = std::move(guard) ](
folly::Try<T> && res) mutable {
// This will run from inside the gil, called by the asyncio add_reader
callback(std::move(res), userData);
// guard goes out of scope here, and its stored function is called
});
}
template <typename T>
void bridgeFuture(
folly::Future<T>&& futureFrom,
folly::Function<void(folly::Try<T>&&, PyObject*)> callback,
PyObject* userData) {
bridgeFuture(
getExecutor(), std::move(futureFrom), std::move(callback), userData);
}
} // python
} // folly
from cpython.ref cimport PyObject
from folly cimport cFollyTry, cFollyFuture, cFollyExecutor
cdef extern from "folly/python/futures.h" namespace "folly::python":
void bridgeFuture[T](
cFollyFuture[T]&& fut,
void(*)(cFollyTry[T]&&, PyObject*),
PyObject* pyFuture
)
# No clue but cython overloading is getting confused so we alias
void bridgeFutureWith "folly::python::bridgeFuture"[T](
cFollyExecutor* executor,
cFollyFuture[T]&& fut,
void(*)(cFollyTry[T]&&, PyObject*),
PyObject* pyFuture
)
#!/usr/bin/env python3
import asyncio
import unittest
from . import simplebridge
class Futures(unittest.TestCase):
def test_bridge(self):
val = 1337
loop = asyncio.get_event_loop()
res = loop.run_until_complete(simplebridge.get_value_x5(val))
self.assertEqual(val * 5, res)
def test_bridge_exception(self):
loop = asyncio.get_event_loop()
with self.assertRaises(ValueError, msg="0 is not allowed"):
loop.run_until_complete(simplebridge.get_value_x5(0))
/*
* Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <cstdint>
namespace folly {
namespace python {
namespace test {
folly::Future<uint64_t> future_getValueX5(uint64_t val) {
folly::Promise<uint64_t> p;
auto f = p.getFuture();
p.setWith([val] {
if (val == 0) {
throw std::invalid_argument("0 is not allowed");
}
return val * 5;
});
return f;
}
}
}
}
import asyncio
from folly.futures cimport bridgeFuture
from folly cimport cFollyFuture, cFollyTry
from libc.stdint cimport uint64_t
from cpython.ref cimport PyObject
from cython.operator cimport dereference as deref
cdef extern from "folly/python/test/simple.h" namespace "folly::python::test":
cdef cFollyFuture[uint64_t] future_getValueX5(uint64_t val)
def get_value_x5(int val):
loop = asyncio.get_event_loop()
fut = loop.create_future()
bridgeFuture[uint64_t](
future_getValueX5(val),
handle_uint64_t,
<PyObject *>fut
)
return fut
cdef void handle_uint64_t(cFollyTry[uint64_t]&& res, PyObject* userData):
future = <object> userData
if res.hasException():
try:
res.exception().throwException()
except Exception as ex:
future.set_exception(ex)
else:
future.set_result(res.value())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment