Commit c82e22f5 authored by Ahmed Soliman's avatar Ahmed Soliman Committed by Facebook Github Bot

Move iobuf Python wrapper from thrift to folly

Summary:
We expose iobuf to Python via the folly.iobuf Python extension. The code for this extension has historically been in the fbthrift source tree. Move it to folly so that folly produces folly extensions and thrift produces thrift extensions.

Without this change we end up with the folly.iobuf Cython extension being packaged with thrift rather than folly, leading to the need for a number of work-arounds to allow Python and Cython to find folly.iobuf without stopping their searches on folly.executor.

The tests for the IOBuf Python extension have been added to folly, while the thrift-specific IOBuf tests are maintained in thrift.

Reviewed By: vitaut

Differential Revision: D14405645

fbshipit-source-id: 662a4e32c219f6d044bb6e903a2525ec1b4059c5
parent 51a8d664
/*
* Copyright 2018-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Python.h>
#include <folly/Executor.h>
#include <folly/Function.h>
#include <folly/ScopeGuard.h>
#include <folly/io/IOBuf.h>
#if PY_VERSION_HEX < 0x03040000
#define PyGILState_Check() (true)
#endif
namespace thrift {
namespace py3 {
/**
 * Bookkeeping passed to folly::IOBuf as the opaque user-data pointer of the
 * free callback installed by iobuf_from_python().
 *
 * Both members are non-owning raw pointers; they default to null so that a
 * default-initialized instance never carries indeterminate values.
 */
struct PyBufferData {
  // Executor used to release the Python reference when the free callback
  // runs on a thread that does not hold the GIL; may be null.
  folly::Executor* executor{nullptr};
  // Python object whose reference count was incremented on behalf of the
  // IOBuf and must be decremented when the buffer is freed.
  PyObject* py_object{nullptr};
};
/**
 * Wrap memory owned by a Python object in a folly::IOBuf without copying.
 *
 * A reference to `py_object` is taken here and dropped again by the free
 * callback handed to folly::IOBuf::takeOwnership(), so the Python object
 * outlives the buffer that points into it.
 *
 * The free callback may run on any thread. It releases the reference:
 *   1. directly, when the calling thread holds the GIL;
 *   2. otherwise by scheduling the release on `executor`, when one was given
 *      (presumably an executor whose tasks run with the GIL held);
 *   3. otherwise through Py_AddPendingCall() as a best-effort fallback.
 *
 * Defined `inline` because this header provides the definition itself; a
 * non-inline definition would violate the one-definition rule as soon as two
 * translation units of the same binary include it.
 *
 * @param executor  Executor used for case 2 above; may be null.
 * @param py_object Python object that owns the memory at `buf`.
 * @param buf       Start of the memory to wrap.
 * @param length    Number of bytes available at `buf`.
 * @return          An IOBuf referring to `buf`.
 */
inline std::unique_ptr<folly::IOBuf> iobuf_from_python(
    folly::Executor* executor,
    PyObject* py_object,
    void* buf,
    uint64_t length) {
  // Allocate before touching the reference count so that a failed allocation
  // cannot leave a dangling extra reference on py_object.
  auto* userData = new PyBufferData();
  userData->executor = executor;
  userData->py_object = py_object;
  Py_INCREF(py_object);
  return folly::IOBuf::takeOwnership(
      buf,
      length,
      [](void* /* buf */, void* userData) {
        // Re-own the bookkeeping struct so it is released on every path out
        // of this callback.
        const std::unique_ptr<PyBufferData> py_data(
            static_cast<PyBufferData*>(userData));
        auto* py_object = py_data->py_object;
        if (PyGILState_Check()) {
          Py_DECREF(py_object);
        } else if (py_data->executor) {
          py_data->executor->add(
              [py_object]() mutable { Py_DECREF(py_object); });
        } else {
          /*
            This is the last ditch effort. We don't have the GIL and we have no
            asyncio executor. In this case we will attempt to use the
            pendingCall interface to cpython. This is likely to fail under
            heavy load due to lock contention.
          */
          int ret = Py_AddPendingCall(
              [](void* userData) {
                Py_DECREF(static_cast<PyObject*>(userData));
                return 0;
              },
              static_cast<void*>(py_object));
          if (ret != 0) {
            LOG(ERROR)
                << "an IOBuf was created from a non-asyncio thread, and all attempts "
                << "to free the underlying buffer has failed, memory has leaked!";
          } else {
            LOG(WARNING)
                << "an IOBuf was created from a non-asyncio thread, and we successful "
                << "handled cleanup but this is not a reliable interface, it will fail "
                << "under heavy load, do not create IOBufs from non-asyncio threads. ";
          }
        }
      },
      userData);
}
/**
 * Compare two IOBufs for equality using folly::IOBufEqualTo.
 *
 * Defined `inline` because the definition lives in a header; without it,
 * including this header from two translation units of one binary produces
 * duplicate symbols.
 */
inline bool check_iobuf_equal(const folly::IOBuf* a, const folly::IOBuf* b) {
  return folly::IOBufEqualTo{}(a, b);
}
/**
 * Return whether `a` orders before `b` according to folly::IOBufLess.
 *
 * Defined `inline` because the definition lives in a header; without it,
 * including this header from two translation units of one binary produces
 * duplicate symbols.
 */
inline bool check_iobuf_less(const folly::IOBuf* a, const folly::IOBuf* b) {
  return folly::IOBufLess{}(a, b);
}
} // namespace py3
} // namespace thrift
# distutils: language = c++
from libcpp.string cimport string
from libc.stdint cimport uint64_t
from libcpp.memory cimport unique_ptr
from libc.string cimport const_uchar
from folly cimport cFollyExecutor
from cpython.ref cimport PyObject
from cython.view cimport memoryview
# Cython view of the C++ class folly::IOBuf. Only the member functions that
# the Python extension calls are declared here.
cdef extern from "folly/io/IOBuf.h" namespace "folly":
    cdef cppclass cIOBuf "folly::IOBuf":
        # Accessors for this single buffer.
        uint64_t length()
        const_uchar* data()
        bint empty()
        # Queries and operations on the chain this buffer belongs to.
        bint isChained()
        size_t countChainElements()
        uint64_t computeChainDataLength()
        unique_ptr[cIOBuf] clone()
        # Neighbouring chain elements; the wrapper treats a result equal to
        # the buffer itself as "no neighbour".
        cIOBuf* prev()
        cIOBuf* next()
        void appendChain(unique_ptr[cIOBuf]&& ciobuf)
# Functions looked up inside the folly::IOBuf scope; using "folly::IOBuf" as
# the namespace lets Cython call them like free functions.
cdef extern from "folly/io/IOBuf.h" namespace "folly::IOBuf":
    unique_ptr[cIOBuf] wrapBuffer(const_uchar* buf, uint64_t capacity)
    unique_ptr[cIOBuf] createChain(size_t totalCapacity, size_t maxBufCapacity)
# Names nested inside folly::IOBufQueue: the opaque Options type and the
# cacheChainLength() function that returns one.
cdef extern from "folly/io/IOBufQueue.h" namespace "folly::IOBufQueue":
    cdef cppclass cIOBufQueueOptions "folly::IOBufQueue::Options":
        pass
    cIOBufQueueOptions cacheChainLength()
# Cython view of the C++ class folly::IOBufQueue, constructible with or
# without an Options value.
cdef extern from "folly/io/IOBufQueue.h" namespace "folly":
    cdef cppclass cIOBufQueue "folly::IOBufQueue":
        cIOBufQueue(cIOBufQueueOptions)
        cIOBufQueue()
        # Member function move(); distinct from the std::move declared
        # separately in this file.
        unique_ptr[cIOBuf] move()
        void append(unique_ptr[cIOBuf]&& buf)
# std::move, declared only for unique_ptr[cIOBuf] so ownership of an IOBuf
# can be transferred from Cython code.
cdef extern from '<utility>' namespace 'std':
    unique_ptr[cIOBuf] move(unique_ptr[cIOBuf])
# C++ helpers backing this extension. The header is included from
# folly/python, but the functions are looked up in namespace thrift::py3.
cdef extern from "folly/python/iobuf.h" namespace "thrift::py3":
    # (executor, owning Python object, buffer start, length) -> IOBuf
    unique_ptr[cIOBuf] iobuf_from_python(cFollyExecutor*, PyObject*, void*, uint64_t)
    bint check_iobuf_equal(cIOBuf*, cIOBuf*)
    bint check_iobuf_less(cIOBuf*, cIOBuf*)
# Buffer-protocol request flag from the CPython C API; passed when building
# memoryviews over buffers.
cdef extern from "Python.h":
    cdef int PyBUF_C_CONTIGUOUS
# C-level layout of the Python IOBuf wrapper (implemented in the .pyx).
cdef class IOBuf:
    # Enables weak references, which the module-level WeakValueDictionary
    # cache of wrappers relies on.
    cdef object __weakref__
    # The C++ buffer this wrapper refers to (not necessarily owned by it).
    cdef cIOBuf* _this
    # Wrapper this one was created from; the wrapper itself when it owns
    # its buffer.
    cdef object _parent
    # Cached result of __hash__, or None when not computed yet.
    cdef object _hash
    # Owning pointer; set only on wrappers that own their C++ buffer.
    cdef unique_ptr[cIOBuf] _ours
    # Storage for the one-dimensional shape/strides exposed by __getbuffer__.
    cdef Py_ssize_t shape[1]
    cdef Py_ssize_t strides[1]
    # Return the wrapper for `this` under `parent`, creating it if needed.
    @staticmethod
    cdef IOBuf create(cIOBuf* this, object parent)
    # C-level clone that returns the C++ object instead of a Python wrapper.
    cdef unique_ptr[cIOBuf] c_clone(self)
# Module-level helper: build a C++ IOBuf over the memory exposed by `view`.
cdef unique_ptr[cIOBuf] from_python_buffer(memoryview view)
# Module-level helper: wrap an owned C++ IOBuf in a new Python IOBuf object.
cdef IOBuf from_unique_ptr(unique_ptr[cIOBuf] iobuf)
from typing import Any, Union, Optional, Iterator, Hashable, Iterable
# Type stub for the Cython extension class folly.iobuf.IOBuf.
class IOBuf(Hashable):
    # Accepts any object usable as a buffer source.
    def __init__(self, buffer: Union[IOBuf, bytes, bytearray, memoryview]) -> None: ...
    # Returns a new IOBuf holding a clone of the chain.
    def clone(self) -> IOBuf: ...
    # Neighbouring chain elements; None when there is no other element.
    @property
    def next(self) -> Optional[IOBuf]: ...
    @property
    def prev(self) -> Optional[IOBuf]: ...
    @property
    def is_chained(self) -> bool: ...
    # Total data length across the chain, and number of elements in it.
    def chain_size(self) -> int: ...
    def chain_count(self) -> int: ...
    # bytes(), bool() and len() describe this element only, not the chain.
    def __bytes__(self) -> bytes: ...
    def __bool__(self) -> bool: ...
    def __len__(self) -> int: ...
    # At runtime this yields one memoryview per chain element.
    def __iter__(
        self
    ) -> Iterator[bytes]: ...  # Lies, but typing likes it for b''.join()
    def __hash__(self) -> int: ...
    # Equality against a non-IOBuf is always False (and != always True).
    def __eq__(self, other: Any) -> bool: ...
    def __ne__(self, other: Any) -> bool: ...
    # Ordering is only defined between IOBuf instances.
    def __lt__(self, other: IOBuf) -> bool: ...
    def __gt__(self, other: IOBuf) -> bool: ...
    def __le__(self, other: IOBuf) -> bool: ...
    def __ge__(self, other: IOBuf) -> bool: ...
from folly.executor cimport get_executor
from cpython cimport Py_buffer
from weakref import WeakValueDictionary
from cpython.object cimport Py_LT, Py_LE, Py_EQ, Py_NE, Py_GT, Py_GE
from cython.operator cimport dereference as deref
from cython.view cimport memoryview
__cache = WeakValueDictionary()
__all__ = ['IOBuf']
cdef unique_ptr[cIOBuf] from_python_buffer(memoryview view):
    """Wrap the memory exposed by `view` in a C++ IOBuf without copying.

    The memoryview object itself is handed to iobuf_from_python() as the
    owner, so it stays referenced for as long as the IOBuf needs the memory.

    The length passed is the buffer's total size in bytes (Py_buffer.len),
    not shape[0]: shape[0] counts items along the first dimension and is
    smaller than the byte size whenever the exporter's itemsize is greater
    than one or the buffer has more than one dimension.
    """
    return move(
        iobuf_from_python(
            get_executor(),
            <PyObject*>view,
            view.view.buf,
            view.view.len,
        )
    )
cdef IOBuf from_unique_ptr(unique_ptr[cIOBuf] ciobuf):
    """Create a Python IOBuf that takes ownership of `ciobuf`.

    __init__ is bypassed (it would require a Python buffer); the wrapper is
    its own parent and is registered in the module-level wrapper cache.
    """
    cdef IOBuf wrapper = <IOBuf>IOBuf.__new__(IOBuf)
    wrapper._parent = wrapper
    # Take ownership first, then point _this at the object we now own.
    wrapper._ours = move(ciobuf)
    wrapper._this = wrapper._ours.get()
    cache_key = (<unsigned long>wrapper._this, id(wrapper))
    __cache[cache_key] = wrapper
    return wrapper
cdef class IOBuf:
    """Python wrapper around one element of a folly::IOBuf chain.

    An instance either owns its C++ buffer (`_ours` is set and `_parent` is
    the instance itself) or refers to an element reached through another
    wrapper, in which case `_parent` holds a reference to that wrapper.
    Wrappers are cached per (buffer address, id(parent)) in a
    WeakValueDictionary, so navigating to the same element twice returns the
    same Python object while it is alive.

    len(), bytes(), bool() and the buffer protocol describe this element
    only; iteration, hashing and comparisons cover the chain.
    """

    def __init__(self, buffer not None):
        """Wrap `buffer` (any C-contiguous buffer exporter) without copying."""
        cdef memoryview view = memoryview(buffer, PyBUF_C_CONTIGUOUS)
        self._ours = move(from_python_buffer(view))
        self._this = self._ours.get()
        self._parent = self
        self._hash = None
        __cache[(<unsigned long>self._this, id(self))] = self

    @staticmethod
    cdef IOBuf create(cIOBuf* this, object parent):
        """Return the cached wrapper for `this` under `parent`, or make one.

        The new wrapper does not own `this`; it keeps a reference to
        `parent` instead.
        """
        key = (<unsigned long>this, id(parent))
        cdef IOBuf inst = __cache.get(key)
        if inst is None:
            inst = <IOBuf>IOBuf.__new__(IOBuf)
            inst._this = this
            inst._parent = parent
            __cache[key] = inst
        return inst

    cdef unique_ptr[cIOBuf] c_clone(self):
        """Clone at the C++ level, returning the owning pointer."""
        return move(self._this.clone())

    def clone(self):
        """ Clone the iobuf chain """
        return from_unique_ptr(self._this.clone())

    @property
    def next(self):
        """The following chain element, or None if this is the only one."""
        _next = self._this.next()
        if _next == self._this:
            return None
        return IOBuf.create(_next, self._parent)

    @property
    def prev(self):
        """The preceding chain element, or None if this is the only one."""
        _prev = self._this.prev()
        if _prev == self._this:
            return None
        return IOBuf.create(_prev, self._parent)

    @property
    def is_chained(self):
        return self._this.isChained()

    def chain_size(self):
        """Total number of data bytes across the chain."""
        return self._this.computeChainDataLength()

    def chain_count(self):
        """Number of elements in the chain."""
        return self._this.countChainElements()

    def __bytes__(self):
        # Copies the data of this element only.
        return <bytes>self._this.data()[:self._this.length()]

    def __bool__(self):
        return not self._this.empty()

    def __getbuffer__(self, Py_buffer *buffer, int flags):
        # Expose this element as a read-only, one-dimensional byte buffer.
        self.shape[0] = self._this.length()
        self.strides[0] = 1
        buffer.buf = <void *>self._this.data()
        buffer.format = NULL
        buffer.internal = NULL
        buffer.itemsize = 1
        buffer.len = self.shape[0]
        buffer.ndim = 1
        buffer.obj = self
        buffer.readonly = 1
        buffer.shape = self.shape
        buffer.strides = self.strides
        buffer.suboffsets = NULL

    def __releasebuffer__(self, Py_buffer *buffer):
        # Read-only means we need no logic here
        pass

    def __len__(self):
        return self._this.length()

    def __iter__(self):
        "Iterates through the chain of buffers returning a memory view for each"
        yield memoryview(self, PyBUF_C_CONTIGUOUS)
        element = self.next
        # Stop once navigation wraps around to this wrapper. The check must
        # be by identity: `!=` goes through __richcmp__ and compares chain
        # contents, which ends the loop early for chains whose contents look
        # the same from two different starting elements.
        while element is not None and element is not self:
            yield memoryview(element, PyBUF_C_CONTIGUOUS)
            element = element.next

    def __hash__(self):
        # `is None` rather than truthiness, so a hash value of 0 is cached
        # like any other.
        if self._hash is None:
            self._hash = hash(b''.join(self))
        return self._hash

    def __richcmp__(self, other, op):
        cdef int cop = op
        if not (isinstance(self, IOBuf) and isinstance(other, IOBuf)):
            if cop == Py_EQ:  # different types are never equal
                return False
            elif cop == Py_NE:  # different types are always notequal
                return True
            else:
                return NotImplemented
        cdef cIOBuf* othis = (<IOBuf>other)._this
        # Every ordering operator is derived from the C++ less-than helper.
        if cop == Py_EQ:
            return check_iobuf_equal(self._this, othis)
        elif cop == Py_NE:
            return not(check_iobuf_equal(self._this, othis))
        elif cop == Py_LT:
            return check_iobuf_less(self._this, othis)
        elif cop == Py_LE:
            return not(check_iobuf_less(othis, self._this))
        elif cop == Py_GT:
            return check_iobuf_less(othis, self._this)
        elif cop == Py_GE:
            return not(check_iobuf_less(self._this, othis))
        else:
            return NotImplemented
#!/usr/bin/env python3
import unittest
from folly.iobuf import IOBuf
from .iobuf_helper import get_empty_chain, make_chain
class IOBufTests(unittest.TestCase):
    """Behavioural tests for the folly.iobuf.IOBuf Python wrapper."""

    def test_empty_chain(self) -> None:
        """A chain of empty buffers is falsy and zero-length but still chained."""
        empty = get_empty_chain()
        self.assertFalse(empty)
        self.assertTrue(empty.is_chained)
        self.assertEqual(len(empty), 0)
        self.assertEqual(empty.chain_size(), 0)
        self.assertEqual(empty.chain_count(), 8)
        self.assertEqual(b''.join(empty), b'')
        self.assertEqual(b'', bytes(empty))

    def test_chain(self) -> None:
        """Head-level accessors see the first element; chain-level ones see all."""
        parts = [b'facebook', b'thrift', b'python3', b'cython']
        first = parts[0]
        chain = make_chain([IOBuf(part) for part in parts])
        self.assertTrue(chain.is_chained)
        self.assertTrue(chain)
        self.assertEqual(bytes(chain), first)
        self.assertEqual(len(chain), len(first))
        self.assertEqual(chain.chain_size(), sum(len(part) for part in parts))
        self.assertEqual(chain.chain_count(), len(parts))
        self.assertEqual(memoryview(chain.next), parts[1])  # type: ignore
        self.assertEqual(b''.join(chain), b''.join(parts))

    def test_hash(self) -> None:
        """Hashes follow content: equal bytes hash equal, different bytes differ."""
        first_payload = b"omg"
        second_payload = b"wtf"
        first_buf = IOBuf(first_payload)
        second_buf = IOBuf(second_payload)
        # Hash once up front so the comparisons below hit the cached value.
        hash(first_buf)
        self.assertNotEqual(hash(first_buf), hash(second_buf))
        self.assertEqual(hash(first_buf), hash(IOBuf(first_payload)))

    def test_empty(self) -> None:
        """An IOBuf over empty bytes is falsy and has length zero."""
        payload = b""
        buf = IOBuf(payload)
        self.assertEqual(memoryview(buf), payload)  # type: ignore
        self.assertEqual(bytes(buf), payload)
        self.assertFalse(buf)
        self.assertEqual(len(buf), len(payload))

    def test_iter(self) -> None:
        """Joining the iterated views of a single buffer gives back its bytes."""
        payload = b"testtest"
        buf = IOBuf(payload)
        self.assertEqual(b''.join(iter(buf)), payload)

    def test_bytes(self) -> None:
        """bytes() round-trips the wrapped data."""
        payload = b"omgwtfbbq"
        buf = IOBuf(payload)
        self.assertEqual(bytes(buf), payload)

    def test_cmp(self) -> None:
        """Rich comparisons between IOBufs follow their contents."""
        low = IOBuf(b"abc")
        high = IOBuf(b"def")
        low_twin = IOBuf(b"abc")
        self.assertEqual(low, low_twin)
        self.assertNotEqual(low, high)
        self.assertLess(low, high)
        self.assertLessEqual(low, high)
        self.assertLessEqual(low, low_twin)
        self.assertGreater(high, low)
        self.assertGreaterEqual(high, low)
from folly.iobuf import IOBuf
from typing import List
# Test helper (implemented in Cython): returns a chained IOBuf with no data.
def get_empty_chain() -> IOBuf: ...
# Test helper (implemented in Cython): links clones of the given IOBufs into
# one chain and returns its head.
def make_chain(data: List[IOBuf]) -> IOBuf: ...
# distutils: language=c++
from folly.iobuf cimport from_unique_ptr, createChain, IOBuf
def get_empty_chain():
    """Return an IOBuf chain built by createChain() that holds no data."""
    # Arguments in createChain's declared order: total capacity, then the
    # maximum capacity of each buffer in the chain.
    total_capacity = 1024
    max_buf_capacity = 128
    return from_unique_ptr(createChain(total_capacity, max_buf_capacity))
def make_chain(data):
    """Build a new IOBuf chain from clones of the buffers in `data`.

    `data` is a non-empty sequence of IOBuf objects; an empty sequence raises
    IndexError. Neither the sequence nor the buffers in it are modified: the
    head is read with indexing rather than pop(), and every buffer is cloned
    before it is linked.

    Returns the head of the new chain, with the inputs in their given order.
    """
    cdef IOBuf first = data[0]
    # Make a new chain
    cdef IOBuf head = from_unique_ptr(first.c_clone())
    cdef IOBuf tbuf
    for tbuf in data[1:]:
        # head.prev() is the current tail of the chain (or head itself while
        # it is the only element), so appending after it keeps the order even
        # when an input is itself a multi-element chain.
        head._this.prev().appendChain(tbuf.c_clone())
    return head
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment