Commit 9549eeb5 authored by Jacob Bower's avatar Jacob Bower Committed by Facebook GitHub Bot

Allow custom global executor for native callbacks

Summary:
Makes `folly::AsyncioExecutor` an abstract class and adds new `folly::python::setExecutorForLoop()` (C++) and `folly.executor.set_executor_for_loop()` (Python) functions. These control what is returned by `folly::python::getExecutor()` and `folly.executor.get_executor()` and hence allow installing a custom system for handling native callbacks on the Python event loop.

In this implementation we only allow the executor for a given Python event loop to be set _once_ before being explicitly cleared. An alternative which was discussed was returning the previous executor when setting a new one, allowing the caller to stack them. My rationale for not doing this is I think this makes it too easy to accidentally "lose" callbacks registered with an existing executor.

Reviewed By: vladima, pranavtbhat

Differential Revision: D32486564

fbshipit-source-id: 4c9ad4e5f2b7b89f1edafdb4eddd9ccacb23635f
parent 91a18af1
......@@ -34,22 +34,23 @@ namespace python {
class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor {
public:
using Func = folly::Func;
~AsyncioExecutor() override {
virtual ~AsyncioExecutor() override {
keepAliveRelease();
while (keepAliveCounter_ > 0) {
drive();
}
}
void add(Func func) override { queue_.putMessage(std::move(func)); }
int fileno() const { return consumer_.getFd(); }
void drive() noexcept override { driveImpl(/* canDiscard = */ true); }
void drive() noexcept final {
if (FOLLY_DETAIL_PY_ISFINALIZING()) {
// if Python is finalizing calling scheduled functions MAY segfault.
// any code that could have been called is now inconsequential.
return;
}
driveNoDiscard();
}
void driveNoDiscard() noexcept { driveImpl(/* canDiscard = */ false); }
virtual void driveNoDiscard() noexcept = 0;
protected:
bool keepAliveAcquire() noexcept override {
......@@ -66,13 +67,19 @@ class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor {
}
private:
void driveImpl(bool canDiscard) noexcept {
std::atomic<size_t> keepAliveCounter_{1};
};
class NotificationQueueAsyncioExecutor : public AsyncioExecutor {
public:
using Func = folly::Func;
void add(Func func) override { queue_.putMessage(std::move(func)); }
int fileno() const { return consumer_.getFd(); }
void driveNoDiscard() noexcept override {
consumer_.consume([&](Func&& func) {
if (canDiscard && FOLLY_DETAIL_PY_ISFINALIZING()) {
// if Python is finalizing calling scheduled functions MAY segfault.
// any code that could have been called is now inconsequential.
return;
}
try {
func();
} catch (...) {
......@@ -85,8 +92,7 @@ class AsyncioExecutor : public DrivableExecutor, public SequencedExecutor {
folly::NotificationQueue<Func> queue_;
folly::NotificationQueue<Func>::SimpleConsumer consumer_{queue_};
std::atomic<size_t> keepAliveCounter_{1};
}; // AsyncioExecutor
}; // NotificationQueueAsyncioExecutor
} // namespace python
} // namespace folly
......@@ -19,7 +19,6 @@
#include <stdexcept>
#include <folly/CppAttributes.h>
#include <folly/python/AsyncioExecutor.h>
#include <folly/python/executor_api.h> // @manual
namespace folly {
......@@ -27,18 +26,27 @@ namespace python {
namespace {
void do_import() {
void ensure_imported() {
static bool imported = false;
if (!imported) {
if (0 != import_folly__executor()) {
throw std::runtime_error("import_folly__executor failed");
}
imported = true;
}
}
} // namespace
folly::Executor* getExecutor() {
FOLLY_MAYBE_UNUSED static bool done = (do_import(), false);
ensure_imported();
return get_running_executor(false); // TODO: fried set this to true
}
int setExecutorForLoop(PyObject* loop, AsyncioExecutor* executor) {
ensure_imported();
return set_executor_for_loop(loop, executor);
}
} // namespace python
} // namespace folly
......@@ -17,11 +17,17 @@
#pragma once
#include <folly/Executor.h>
#include <folly/python/AsyncioExecutor.h>
namespace folly {
namespace python {
folly::Executor* getExecutor();
// Returns -1 if an executor was already set for loop, 0 otherwise. A NULL
// executor clears the current executor (caller is responsible for freeing
// any existing executor).
int setExecutorForLoop(PyObject* loop, AsyncioExecutor* executor);
} // namespace python
} // namespace folly
......@@ -19,12 +19,18 @@ from folly cimport cFollyExecutor
cdef extern from "folly/python/AsyncioExecutor.h" namespace "folly::python":
cdef cppclass cAsyncioExecutor "folly::python::AsyncioExecutor"(cFollyExecutor):
int fileno()
void drive()
void driveNoDiscard()
cdef cppclass cNotificationQueueAsyncioExecutor "folly::python::NotificationQueueAsyncioExecutor"(cAsyncioExecutor):
int fileno()
cdef class AsyncioExecutor:
cdef unique_ptr[cAsyncioExecutor] cQ
cdef cAsyncioExecutor* _executor
cdef class NotificationQueueAsyncioExecutor(AsyncioExecutor):
cdef unique_ptr[cNotificationQueueAsyncioExecutor] cQ
cdef api cAsyncioExecutor* get_executor()
cdef api int set_executor_for_loop(loop, cAsyncioExecutor* executor)
cdef api cAsyncioExecutor* get_running_executor(bint running)
......@@ -14,7 +14,7 @@
import asyncio
from folly.executor cimport cAsyncioExecutor
from folly.executor cimport cAsyncioExecutor, cNotificationQueueAsyncioExecutor
from libcpp.memory cimport make_unique, unique_ptr
from cython.operator cimport dereference as deref
from weakref import WeakKeyDictionary
......@@ -24,16 +24,21 @@ loop_to_q = WeakKeyDictionary()
cdef class AsyncioExecutor:
pass
cdef class NotificationQueueAsyncioExecutor(AsyncioExecutor):
def __cinit__(self):
self.cQ = make_unique[cAsyncioExecutor]()
self.cQ = make_unique[cNotificationQueueAsyncioExecutor]()
self._executor = self.cQ.get()
def fileno(AsyncioExecutor self):
def fileno(NotificationQueueAsyncioExecutor self):
return deref(self.cQ).fileno()
def drive(AsyncioExecutor self):
def drive(NotificationQueueAsyncioExecutor self):
deref(self.cQ).drive()
def __dealloc__(AsyncioExecutor self):
def __dealloc__(NotificationQueueAsyncioExecutor self):
# We drive it one last time
deref(self.cQ).drive()
# We are Explicitly reset here, otherwise it is possible
......@@ -58,12 +63,26 @@ cdef cAsyncioExecutor* get_running_executor(bint running):
except RuntimeError:
return NULL
try:
Q = <AsyncioExecutor>(loop_to_q[loop])
executor = <AsyncioExecutor>(loop_to_q[loop])
except KeyError:
Q = AsyncioExecutor()
loop.add_reader(Q.fileno(), Q.drive)
loop_to_q[loop] = Q
return Q.cQ.get()
executor = NotificationQueueAsyncioExecutor()
loop.add_reader(executor.fileno(), executor.drive)
loop_to_q[loop] = executor
return executor._executor
cdef int set_executor_for_loop(loop, cAsyncioExecutor* c_executor):
if c_executor == NULL:
del loop_to_q[loop]
return 0
if loop in loop_to_q:
return -1
executor = AsyncioExecutor()
executor._executor = c_executor
loop_to_q[loop] = executor
return 0
cdef cAsyncioExecutor* get_executor():
return get_running_executor(False)
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <folly/python/AsyncioExecutor.h>
namespace folly {
namespace python {
namespace test {
class TestAsyncioExecutor : public AsyncioExecutor {
public:
void add(Func) override {}
void driveNoDiscard() noexcept override {}
};
inline std::unique_ptr<TestAsyncioExecutor> makeTestAsyncioExecutor() {
return std::make_unique<TestAsyncioExecutor>();
}
} // namespace test
} // namespace python
} // namespace folly
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from . import test_set_executor_cython
class TestSetExecutor(unittest.TestCase):
def test_set_custom_executor(self):
test_set_executor_cython.test_set_custom_executor(self)
def test_cannot_override_existing_loop(self):
test_set_executor_cython.test_cannot_override_existing_loop(self)
def test_clear_existing_loop(self):
test_set_executor_cython.test_clear_existing_loop(self)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from libcpp.memory cimport unique_ptr
from folly.executor cimport set_executor_for_loop, get_executor, cAsyncioExecutor
cdef extern from "folly/python/test/test_set_executor.h" namespace "folly::python::test":
cdef cppclass cTestAsyncioExecutor "folly::python::test::TestAsyncioExecutor"(cAsyncioExecutor):
pass
cdef unique_ptr[cTestAsyncioExecutor] makeTestAsyncioExecutor()
def test_set_custom_executor(test):
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
executor = makeTestAsyncioExecutor()
test.assertEqual(set_executor_for_loop(loop, executor.get()), 0)
test.assertTrue(get_executor() == executor.get())
finally:
loop.close()
def test_cannot_override_existing_loop(test):
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
# Creates a default executor
executor = get_executor()
test.assertEqual(set_executor_for_loop(loop, executor), -1)
finally:
loop.close()
def test_clear_existing_loop(test):
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
# Creates a default executor
executor = get_executor()
test.assertEqual(set_executor_for_loop(loop, NULL), 0)
# This would return -1 if an executor were set
# (see test_cannot_override_existing_loop)
test.assertEqual(set_executor_for_loop(loop, executor), 0)
finally:
loop.close()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment