Commit ced29774 authored by Matthieu Martin's avatar Matthieu Martin Committed by Facebook Github Bot

Allow python asyncio Future to wrap folly Fibers

Summary:
This implements a new AsyncioLoopController for FiberManager, wrapping AsyncioExecutor.
Thanks to recent simplication of LoopController, if I haven't missed anything, code is fairly simple, though not optimizer for thread-local.

fiber_manager.pxd|pyx allow to link the life-time of AsyncioLoopController/FiberManager to the asyncio event loop.
Code is similar to (and rely on) executor.pxd|pyd, which does the same for AsyncioExecutor.

fibers.h|pxd provide a helper function for C++/python callers.
Code is similar to futures.h|pyd, which does the same for folly Future.
This is the main reason to keep a separate function/callback parameters. Not necessary for fibers, but a generic callback implementation can work for both fibers/future, which is convenient.

There are a few lines that feel duplicated between future/fibers implementation, but it felt overkill to abstract further.

Reviewed By: andriigrynenko

Differential Revision: D7811432

fbshipit-source-id: af0a07d7554acbab1fac44b7a9a6a98340501ef4
parent 4e033e0c
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
namespace folly {
namespace python {
inline AsyncioLoopController::AsyncioLoopController(AsyncioExecutor* executor)
: executor_(executor) {}
inline AsyncioLoopController::~AsyncioLoopController() {}
inline void AsyncioLoopController::setFiberManager(fibers::FiberManager* fm) {
fm_ = fm;
}
inline void AsyncioLoopController::schedule() {
// add() is thread-safe, so this isn't properly optimized for addTask()
executor_->add([this]() { return runLoop(); });
}
inline void AsyncioLoopController::runLoop() {
if (fm_->hasTasks()) {
fm_->loopUntilNoReadyImpl();
}
}
inline void AsyncioLoopController::scheduleThreadSafe() {
executor_->add([this]() {
if (fm_->shouldRunLoopRemote()) {
return runLoop();
}
});
}
inline void AsyncioLoopController::timedSchedule(
std::function<void()>,
TimePoint) {
throw std::logic_error("Time schedule isn't supported by asyncio executor");
}
} // namespace python
} // namespace folly
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/LoopController.h>
#include <folly/python/AsyncioExecutor.h>
namespace folly {
namespace python {
class AsyncioLoopController : public fibers::LoopController {
public:
explicit AsyncioLoopController(AsyncioExecutor* executor);
~AsyncioLoopController() override;
private:
// TODO: KeepAlive token to guarantee lifetime
AsyncioExecutor* executor_;
fibers::FiberManager* fm_{nullptr};
void setFiberManager(fibers::FiberManager* fm) override;
void schedule() override;
void runLoop() override;
void scheduleThreadSafe() override;
void timedSchedule(std::function<void()> func, TimePoint time) override;
friend class fibers::FiberManager;
};
} // namespace python
} // namespace folly
#include <folly/python/AsyncioLoopController-inl.h>
...@@ -9,4 +9,4 @@ cdef extern from "folly/python/AsyncioExecutor.h" namespace "folly::python": ...@@ -9,4 +9,4 @@ cdef extern from "folly/python/AsyncioExecutor.h" namespace "folly::python":
cdef class AsyncioExecutor: cdef class AsyncioExecutor:
cdef unique_ptr[cAsyncioExecutor] cQ cdef unique_ptr[cAsyncioExecutor] cQ
cdef api cFollyExecutor* get_executor() cdef api cAsyncioExecutor* get_executor()
import asyncio import asyncio
from folly cimport cFollyExecutor
from folly.executor cimport cAsyncioExecutor from folly.executor cimport cAsyncioExecutor
from libcpp.memory cimport make_unique, unique_ptr from libcpp.memory cimport make_unique, unique_ptr
from cython.operator cimport dereference as deref from cython.operator cimport dereference as deref
...@@ -24,7 +23,7 @@ cdef class AsyncioExecutor: ...@@ -24,7 +23,7 @@ cdef class AsyncioExecutor:
deref(self.cQ).drive() deref(self.cQ).drive()
cdef cFollyExecutor* get_executor(): cdef cAsyncioExecutor* get_executor():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
Q = <AsyncioExecutor>(loop_to_q[loop]) Q = <AsyncioExecutor>(loop_to_q[loop])
......
from libcpp.memory cimport unique_ptr
from folly.executor cimport cAsyncioExecutor
cdef extern from "folly/fibers/LoopController.h" namespace "folly::fibers":
cdef cppclass cLoopController "folly::fibers::LoopController":
pass
cdef extern from "folly/fibers/FiberManagerInternal.h" namespace "folly::fibers":
cdef cppclass cFiberManager "folly::fibers::FiberManager":
cFiberManager(unique_ptr[cLoopController])
cdef extern from "folly/python/AsyncioLoopController.h" namespace "folly::python":
cdef cppclass cAsyncioLoopController "folly::python::AsyncioLoopController"(cLoopController):
cAsyncioLoopController(cAsyncioExecutor*)
cdef class FiberManager:
cdef unique_ptr[cFiberManager] cManager
cdef api cFiberManager* get_fiber_manager()
import asyncio
from libcpp.memory cimport unique_ptr
from folly.executor cimport get_executor
from folly.fiber_manager cimport cFiberManager, cLoopController, cAsyncioLoopController
from weakref import WeakKeyDictionary
#asynico Loops to FiberManager
loop_to_controller = WeakKeyDictionary()
cdef class FiberManager:
def __cinit__(self):
self.cManager.reset(new cFiberManager(
unique_ptr[cLoopController](new cAsyncioLoopController(
get_executor()))));
cdef cFiberManager* get_fiber_manager():
loop = asyncio.get_event_loop()
try:
manager = <FiberManager>(loop_to_controller[loop])
except KeyError:
manager = FiberManager()
loop_to_controller[loop] = manager
return manager.cManager.get()
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* This file serves as a helper for bridging folly::future and python
* asyncio.future.
*/
#pragma once
#include <Python.h>
#include <folly/Function.h>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/python/fiber_manager_api.h>
namespace folly {
namespace python {
inline folly::fibers::FiberManager* getFiberManager() {
import_folly__fiber_manager();
return get_fiber_manager();
}
/**
* Helper function with similar callback/userData parameters as bridgeFuture.
* This can be convenient in code that calls both (notably our tests),
* but most callsites should directly use getFiberManager().
*/
template <typename T>
void bridgeFibers(
folly::Function<T()>&& function,
folly::Function<void(folly::Try<T>&&, PyObject*)> callback,
PyObject* userData) {
auto* fiberManager = getFiberManager();
// We are handing over a pointer to a python object to c++ and need
// to make sure it isn't removed by python in that time.
Py_INCREF(userData);
auto guard = folly::makeGuard([=] { Py_DECREF(userData); });
fiberManager->addTask([function = std::move(function),
callback = std::move(callback),
userData,
guard = std::move(guard)]() mutable {
// This will run from inside the gil, called by the asyncio add_reader
auto res = folly::makeTryWith([&] { return function(); });
callback(std::move(res), userData);
// guard goes out of scope here, and its stored function is called
});
}
} // namespace python
} // namespace folly
from cpython.ref cimport PyObject
from folly cimport cFollyTry
cdef extern from "folly/python/fibers.h" namespace "folly::python":
void bridgeFibers "folly::python::bridgeFibers"[T](
T(*)(),
void(*)(cFollyTry[T]&&, PyObject*),
PyObject* pyFuture
)
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
#include <Python.h> #include <Python.h>
#include <folly/Executor.h> #include <folly/Executor.h>
#include <folly/ScopeGuard.h>
#include <folly/futures/Future.h> #include <folly/futures/Future.h>
#include <folly/python/AsyncioExecutor.h>
#include <folly/python/executor_api.h> #include <folly/python/executor_api.h>
namespace folly { namespace folly {
...@@ -47,8 +47,8 @@ void bridgeFuture( ...@@ -47,8 +47,8 @@ void bridgeFuture(
// Handle the lambdas for cython // Handle the lambdas for cython
// run callback from our Q // run callback from our Q
futureFrom.via(executor).then( futureFrom.via(executor).then(
[ callback = std::move(callback), userData, guard = std::move(guard) ]( [callback = std::move(callback), userData, guard = std::move(guard)](
folly::Try<T> && res) mutable { folly::Try<T>&& res) mutable {
// This will run from inside the gil, called by the asyncio add_reader // This will run from inside the gil, called by the asyncio add_reader
callback(std::move(res), userData); callback(std::move(res), userData);
// guard goes out of scope here, and its stored function is called // guard goes out of scope here, and its stored function is called
......
...@@ -16,3 +16,9 @@ class Futures(unittest.TestCase): ...@@ -16,3 +16,9 @@ class Futures(unittest.TestCase):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
with self.assertRaises(ValueError, msg="0 is not allowed"): with self.assertRaises(ValueError, msg="0 is not allowed"):
loop.run_until_complete(simplebridge.get_value_x5(0)) loop.run_until_complete(simplebridge.get_value_x5(0))
def test_bridge_fibers(self):
val = 1337
loop = asyncio.get_event_loop()
res = loop.run_until_complete(simplebridge.get_value_x5_fibers(val))
self.assertEqual(val * 5, res)
...@@ -33,6 +33,16 @@ folly::Future<uint64_t> future_getValueX5(uint64_t val) { ...@@ -33,6 +33,16 @@ folly::Future<uint64_t> future_getValueX5(uint64_t val) {
}); });
return f; return f;
} }
folly::Function<uint64_t()> getValueX5Fibers(uint64_t val) {
return [val]() {
if (val == 0) {
throw std::invalid_argument("0 is not allowed");
}
return val * 5;
};
} }
}
} } // namespace test
} // namespace python
} // namespace folly
import asyncio import asyncio
from folly.futures cimport bridgeFuture from folly.futures cimport bridgeFuture
from folly.fibers cimport bridgeFibers
from folly cimport cFollyFuture, cFollyTry from folly cimport cFollyFuture, cFollyTry
from libc.stdint cimport uint64_t from libc.stdint cimport uint64_t
from cpython.ref cimport PyObject from cpython.ref cimport PyObject
...@@ -7,6 +8,7 @@ from cython.operator cimport dereference as deref ...@@ -7,6 +8,7 @@ from cython.operator cimport dereference as deref
cdef extern from "folly/python/test/simple.h" namespace "folly::python::test": cdef extern from "folly/python/test/simple.h" namespace "folly::python::test":
cdef cFollyFuture[uint64_t] future_getValueX5(uint64_t val) cdef cFollyFuture[uint64_t] future_getValueX5(uint64_t val)
cdef (uint64_t(*)()) getValueX5Fibers(uint64_t val)
def get_value_x5(int val): def get_value_x5(int val):
...@@ -20,6 +22,17 @@ def get_value_x5(int val): ...@@ -20,6 +22,17 @@ def get_value_x5(int val):
return fut return fut
def get_value_x5_fibers(int val):
loop = asyncio.get_event_loop()
fut = loop.create_future()
bridgeFibers[uint64_t](
getValueX5Fibers(val),
handle_uint64_t,
<PyObject *>fut
)
return fut
cdef void handle_uint64_t(cFollyTry[uint64_t]&& res, PyObject* userData): cdef void handle_uint64_t(cFollyTry[uint64_t]&& res, PyObject* userData):
future = <object> userData future = <object> userData
if res.hasException(): if res.hasException():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment