Commit 98715f43 authored by Tatsuhiro Tsujikawa's avatar Tatsuhiro Tsujikawa

python: Add HTTP/2 server using asyncio

parent 9cc7f9fb
...@@ -24,12 +24,216 @@ from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t ...@@ -24,12 +24,216 @@ from libc.stdint cimport uint8_t, uint16_t, uint32_t, int32_t
cdef extern from 'nghttp2/nghttp2.h': cdef extern from 'nghttp2/nghttp2.h':
const char NGHTTP2_PROTO_VERSION_ID[]
const char NGHTTP2_CLIENT_CONNECTION_HEADER[]
const size_t NGHTTP2_INITIAL_WINDOW_SIZE
ctypedef struct nghttp2_session:
pass
ctypedef enum nghttp2_error:
NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE
ctypedef enum nghttp2_flag:
NGHTTP2_FLAG_NONE
NGHTTP2_FLAG_END_STREAM
NGHTTP2_FLAG_ACK
ctypedef enum nghttp2_error_code:
NGHTTP2_NO_ERROR
NGHTTP2_PROTOCOL_ERROR
NGHTTP2_INTERNAL_ERROR
NGHTTP2_SETTINGS_TIMEOUT
ctypedef enum nghttp2_frame_type:
NGHTTP2_DATA
NGHTTP2_HEADERS
NGHTTP2_RST_STREAM
NGHTTP2_SETTINGS
NGHTTP2_PUSH_PROMISE
NGHTTP2_GOAWAY
ctypedef struct nghttp2_nv: ctypedef struct nghttp2_nv:
uint8_t *name uint8_t *name
uint8_t *value uint8_t *value
uint16_t namelen uint16_t namelen
uint16_t valuelen uint16_t valuelen
ctypedef enum nghttp2_settings_id:
SETTINGS_HEADER_TABLE_SIZE
NGHTTP2_SETTINGS_HEADER_TABLE_SIZE
NGHTTP2_SETTINGS_ENABLE_PUSH
NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
ctypedef struct nghttp2_settings_entry:
int32_t settings_id
uint32_t value
ctypedef struct nghttp2_frame_hd:
size_t length
int32_t stream_id
uint8_t type
uint8_t flags
ctypedef struct nghttp2_data:
nghttp2_frame_hd hd
size_t padlen
ctypedef enum nghttp2_headers_category:
NGHTTP2_HCAT_REQUEST
NGHTTP2_HCAT_RESPONSE
NGHTTP2_HCAT_PUSH_RESPONSE
NGHTTP2_HCAT_HEADERS
ctypedef struct nghttp2_headers:
nghttp2_frame_hd hd
size_t padlen
nghttp2_nv *nva
size_t nvlen
nghttp2_headers_category cat
int32_t pri
ctypedef struct nghttp2_rst_stream:
nghttp2_frame_hd hd
nghttp2_error_code error_code
ctypedef struct nghttp2_push_promise:
nghttp2_frame_hd hd
nghttp2_nv *nva
size_t nvlen
int32_t promised_stream_id
ctypedef struct nghttp2_goaway:
nghttp2_frame_hd hd
int32_t last_stream_id
nghttp2_error_code error_code
uint8_t *opaque_data
size_t opaque_data_len
ctypedef union nghttp2_frame:
nghttp2_frame_hd hd
nghttp2_data data
nghttp2_headers headers
nghttp2_rst_stream rst_stream
nghttp2_push_promise push_promise
nghttp2_goaway goaway
ctypedef ssize_t (*nghttp2_send_callback)\
(nghttp2_session *session, const uint8_t *data, size_t length,
int flags, void *user_data)
ctypedef int (*nghttp2_on_frame_recv_callback)\
(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
ctypedef int (*nghttp2_on_data_chunk_recv_callback)\
(nghttp2_session *session, uint8_t flags, int32_t stream_id,
const uint8_t *data, size_t length, void *user_data)
ctypedef int (*nghttp2_before_frame_send_callback)\
(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
ctypedef int (*nghttp2_on_stream_close_callback)\
(nghttp2_session *session, int32_t stream_id,
nghttp2_error_code error_code, void *user_data)
ctypedef int (*nghttp2_on_begin_headers_callback)\
(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
ctypedef int (*nghttp2_on_header_callback)\
(nghttp2_session *session,
const nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen,
void *user_data)
ctypedef int (*nghttp2_on_frame_send_callback)\
(nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
ctypedef int (*nghttp2_on_frame_not_send_callback)\
(nghttp2_session *session, const nghttp2_frame *frame,
int lib_error_code, void *user_data)
ctypedef struct nghttp2_session_callbacks:
nghttp2_send_callback send_callback
nghttp2_on_frame_recv_callback on_frame_recv_callback
nghttp2_on_data_chunk_recv_callback on_data_chunk_recv_callback
nghttp2_before_frame_send_callback before_frame_send_callback
nghttp2_on_frame_send_callback on_frame_send_callback
nghttp2_on_frame_not_send_callback on_frame_not_send_callback
nghttp2_on_stream_close_callback on_stream_close_callback
nghttp2_on_begin_headers_callback on_begin_headers_callback
nghttp2_on_header_callback on_header_callback
int nghttp2_session_client_new(nghttp2_session **session_ptr,
const nghttp2_session_callbacks *callbacks,
void *user_data)
int nghttp2_session_server_new(nghttp2_session **session_ptr,
const nghttp2_session_callbacks *callbacks,
void *user_data)
void nghttp2_session_del(nghttp2_session *session)
ssize_t nghttp2_session_mem_recv(nghttp2_session *session,
const uint8_t *data, size_t datalen)
ssize_t nghttp2_session_mem_send(nghttp2_session *session,
const uint8_t **data_ptr)
int nghttp2_session_send(nghttp2_session *session)
int nghttp2_session_want_read(nghttp2_session *session)
int nghttp2_session_want_write(nghttp2_session *session)
ctypedef union nghttp2_data_source:
int fd
void *ptr
ctypedef ssize_t (*nghttp2_data_source_read_callback)\
(nghttp2_session *session, int32_t stream_id,
uint8_t *buf, size_t length, int *eof,
nghttp2_data_source *source, void *user_data)
ctypedef struct nghttp2_data_provider:
nghttp2_data_source source
nghttp2_data_source_read_callback read_callback
int nghttp2_submit_request(nghttp2_session *session, int32_t pri,
const nghttp2_nv *nva, size_t nvlen,
const nghttp2_data_provider *data_prd,
void *stream_user_data)
int nghttp2_submit_response(nghttp2_session *session,
int32_t stream_id,
const nghttp2_nv *nva, size_t nvlen,
const nghttp2_data_provider *data_prd)
int nghttp2_submit_push_promise(nghttp2_session *session, uint8_t flags,
int32_t stream_id,
const nghttp2_nv *nva, size_t nvlen,
void *stream_user_data)
int nghttp2_submit_settings(nghttp2_session *session, uint8_t flags,
const nghttp2_settings_entry *iv, size_t niv)
int nghttp2_submit_rst_stream(nghttp2_session *session, uint8_t flags,
int32_t stream_id,
nghttp2_error_code error_code)
void* nghttp2_session_get_stream_user_data(nghttp2_session *session,
uint32_t stream_id)
int nghttp2_session_set_stream_user_data(nghttp2_session *session,
uint32_t stream_id,
void *stream_user_data)
int nghttp2_session_terminate_session(nghttp2_session *session,
nghttp2_error_code error_code)
const char* nghttp2_strerror(int lib_error_code) const char* nghttp2_strerror(int lib_error_code)
cdef extern from 'nghttp2_helper.h': cdef extern from 'nghttp2_helper.h':
......
...@@ -239,3 +239,719 @@ def print_hd_table(hdtable): ...@@ -239,3 +239,719 @@ def print_hd_table(hdtable):
'y' if entry.ref else 'n', 'y' if entry.ref else 'n',
entry.name.decode('utf-8'), entry.name.decode('utf-8'),
entry.value.decode('utf-8'))) entry.value.decode('utf-8')))
try:
import socket
import io
import asyncio
import traceback
import sys
import email.utils
import datetime
import time
except ImportError:
pass
cdef _get_stream_user_data(cnghttp2.nghttp2_session *session,
int32_t stream_id):
cdef void *stream_user_data
stream_user_data = cnghttp2.nghttp2_session_get_stream_user_data\
(session, stream_id)
if stream_user_data == NULL:
return None
return <object>stream_user_data
cdef size_t _make_nva(cnghttp2.nghttp2_nv **nva_ptr, headers):
cdef cnghttp2.nghttp2_nv *nva
cdef size_t nvlen
nvlen = len(headers)
nva = <cnghttp2.nghttp2_nv*>malloc(sizeof(cnghttp2.nghttp2_nv) * nvlen)
for i, (k, v) in enumerate(headers):
nva[i].name = k
nva[i].namelen = len(k)
nva[i].value = v
nva[i].valuelen = len(v)
nva_ptr[0] = nva
return nvlen
cdef int server_on_header(cnghttp2.nghttp2_session *session,
const cnghttp2.nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen,
void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
handler = _get_stream_user_data(session, frame.hd.stream_id)
if not handler:
return 0
key = name[:namelen]
values = value[:valuelen].split(b'\x00')
if key == b':scheme':
handler.scheme = values[0]
elif key == b':method':
handler.method = values[0]
elif key == b':authority' or key == b'host':
handler.host = values[0]
elif key == b':path':
handler.path = values[0]
if key == b'cookie':
handler.cookies.extend(values)
else:
for v in values:
handler.headers.append((key, v))
return 0
cdef int server_on_begin_request_headers(cnghttp2.nghttp2_session *session,
const cnghttp2.nghttp2_frame *frame,
void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
handler = http2._make_handler(frame.hd.stream_id)
cnghttp2.nghttp2_session_set_stream_user_data(session, frame.hd.stream_id,
<void*>handler)
return 0
cdef int server_on_begin_headers(cnghttp2.nghttp2_session *session,
const cnghttp2.nghttp2_frame *frame,
void *user_data):
if frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST:
return server_on_begin_request_headers(session, frame, user_data)
return 0
cdef int server_on_frame_recv(cnghttp2.nghttp2_session *session,
const cnghttp2.nghttp2_frame *frame,
void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
if frame.hd.type == cnghttp2.NGHTTP2_DATA:
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
handler = _get_stream_user_data(session, frame.hd.stream_id)
if not handler:
return 0
try:
handler.on_request_done()
except:
sys.stderr.write(traceback.format_exc())
return http2._rst_stream(frame.hd.stream_id)
elif frame.hd.type == cnghttp2.NGHTTP2_HEADERS:
if frame.headers.cat == cnghttp2.NGHTTP2_HCAT_REQUEST:
handler = _get_stream_user_data(session, frame.hd.stream_id)
if not handler:
return 0
# Check required header fields. We expect that :authority
# or host header field.
if handler.scheme is None or handler.method is None or\
handler.host is None or handler.path is None:
return http2._rst_stream(frame.hd.stream_id,
cnghttp2.NGHTTP2_PROTOCOL_ERROR)
if handler.cookies:
handler.headers.append((b'cookie',
b'; '.join(handler.cookies)))
handler.cookies = None
try:
handler.on_headers()
if frame.hd.flags & cnghttp2.NGHTTP2_FLAG_END_STREAM:
handler.on_request_done()
except:
sys.stderr.write(traceback.format_exc())
return http2._rst_stream(frame.hd.stream_id)
elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK):
http2._stop_settings_timer()
return 0
cdef int server_on_data_chunk_recv(cnghttp2.nghttp2_session *session,
uint8_t flags,
int32_t stream_id, const uint8_t *data,
size_t length, void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
handler = _get_stream_user_data(session, stream_id)
if not handler:
return 0
try:
handler.on_data(data[:length])
except:
sys.stderr.write(traceback.format_exc())
return http2._rst_stream(stream_id)
return 0
cdef int server_on_frame_send(cnghttp2.nghttp2_session *session,
const cnghttp2.nghttp2_frame *frame,
void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
# For PUSH_PROMISE, send push response immediately
handler = _get_stream_user_data\
(session, frame.push_promise.promised_stream_id)
if not handler:
return 0
handler.stream_id = frame.push_promise.promised_stream_id
http2.send_response(handler)
elif frame.hd.type == cnghttp2.NGHTTP2_SETTINGS:
if (frame.hd.flags & cnghttp2.NGHTTP2_FLAG_ACK) == 0:
return 0
http2._start_settings_timer()
cdef int server_on_frame_not_send(cnghttp2.nghttp2_session *session,
const cnghttp2.nghttp2_frame *frame,
int lib_error_code,
void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
if frame.hd.type == cnghttp2.NGHTTP2_PUSH_PROMISE:
# We have to remove handler here. Without this, it is not
# removed until session is terminated.
handler = _get_stream_user_data\
(session, frame.push_promise.promised_stream_id)
if not handler:
return 0
http2._remove_handler(handler)
cdef int server_on_stream_close(cnghttp2.nghttp2_session *session,
int32_t stream_id,
cnghttp2.nghttp2_error_code error_code,
void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
handler = _get_stream_user_data(session, stream_id)
if not handler:
return 0
try:
handler.on_close(error_code)
except:
sys.stderr.write(traceback.format_exc())
http2._remove_handler(handler)
return 0
cdef ssize_t server_data_source_read(cnghttp2.nghttp2_session *session,
int32_t stream_id,
uint8_t *buf, size_t length, int *eof,
cnghttp2.nghttp2_data_source *source,
void *user_data):
cdef http2 = <_HTTP2SessionCore>user_data
handler = <object>source.ptr
try:
data = handler.response_body.read(length)
except:
sys.stderr.write(traceback.format_exc())
return cnghttp2.NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
if data:
nread = len(data)
memcpy(buf, <uint8_t*>data, nread)
return nread
eof[0] = 1
return 0
cdef class _HTTP2SessionCore:
cdef cnghttp2.nghttp2_session *session
cdef transport
cdef handler_class
cdef handlers
cdef settings_timer
def __cinit__(self, transport, handler_class):
cdef cnghttp2.nghttp2_session_callbacks callbacks
cdef cnghttp2.nghttp2_settings_entry iv[2]
cdef int rv
self.session = NULL
self.transport = transport
self.handler_class = handler_class
self.handlers = set()
self.settings_timer = None
memset(&callbacks, 0, sizeof(callbacks))
callbacks.on_header_callback = server_on_header
callbacks.on_begin_headers_callback = server_on_begin_headers
callbacks.on_frame_recv_callback = server_on_frame_recv
callbacks.on_stream_close_callback = server_on_stream_close
callbacks.on_frame_send_callback = server_on_frame_send
callbacks.on_frame_not_send_callback = server_on_frame_not_send
callbacks.on_data_chunk_recv_callback = server_on_data_chunk_recv
rv = cnghttp2.nghttp2_session_server_new(&self.session, &callbacks,
<void*>self)
if rv != 0:
raise Exception('nghttp2_session_server_new failed: {}'.format\
(_strerror(rv)))
iv[0].settings_id = cnghttp2.NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
iv[0].value = 100
iv[1].settings_id = cnghttp2.NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
iv[1].value = cnghttp2.NGHTTP2_INITIAL_WINDOW_SIZE
rv = cnghttp2.nghttp2_submit_settings(self.session,
cnghttp2.NGHTTP2_FLAG_NONE,
iv, sizeof(iv) / sizeof(iv[0]))
if rv != 0:
raise Exception('nghttp2_submit_settings failed: {}'.format\
(_strerror(rv)))
def __dealloc__(self):
cnghttp2.nghttp2_session_del(self.session)
def data_received(self, data):
cdef ssize_t rv
rv = cnghttp2.nghttp2_session_mem_recv(self.session, data, len(data))
if rv < 0:
raise Exception('nghttp2_session_mem_recv failed: {}'.format\
(_strerror(rv)))
self.send_data()
OUTBUF_MAX = 65535
SETTINGS_TIMEOUT = 5.0
def send_data(self):
cdef ssize_t outbuflen
cdef const uint8_t *outbuf
while True:
if self.transport.get_write_buffer_size() > self.OUTBUF_MAX:
break
outbuflen = cnghttp2.nghttp2_session_mem_send(self.session, &outbuf)
if outbuflen == 0:
break
if outbuflen < 0:
raise Exception('nghttp2_session_mem_send faild: {}'.format\
(_strerror(outbuflen)))
self.transport.write(outbuf[:outbuflen])
if self.transport.get_write_buffer_size() == 0 and \
cnghttp2.nghttp2_session_want_read(self.session) == 0 and \
cnghttp2.nghttp2_session_want_write(self.session) == 0:
self.transport.close()
def _make_handler(self, stream_id):
handler = self.handler_class(self, stream_id)
self.handlers.add(handler)
return handler
def _remove_handler(self, handler):
self.handlers.remove(handler)
def send_response(self, handler):
cdef cnghttp2.nghttp2_data_provider prd
cdef cnghttp2.nghttp2_data_provider *prd_ptr
cdef cnghttp2.nghttp2_nv *nva
cdef size_t nvlen
cdef int rv
nva = NULL
nvlen = _make_nva(&nva, handler.response_headers)
if handler.response_body:
prd.source.ptr = <void*>handler
prd.read_callback = server_data_source_read
prd_ptr = &prd
else:
prd_ptr = NULL
rv = cnghttp2.nghttp2_submit_response(self.session, handler.stream_id,
nva, nvlen, prd_ptr)
free(nva)
if rv != 0:
# TODO Ignore return value
self._rst_stream(handler.stream_id)
raise Exception('nghttp2_submit_response failed: {}'.format\
(_strerror(rv)))
self._log_request(handler)
def push(self, handler, promised_handler):
cdef cnghttp2.nghttp2_nv *nva
cdef size_t nvlen
self.handlers.add(promised_handler)
nva = NULL
nvlen = _make_nva(&nva, promised_handler.headers)
rv = cnghttp2.nghttp2_submit_push_promise(self.session,
cnghttp2.NGHTTP2_FLAG_NONE,
handler.stream_id,
nva, nvlen,
<void*>promised_handler)
if rv != 0:
raise Exception('nghttp2_submit_push_promise failed: {}'.format\
(_strerror(rv)))
def _rst_stream(self, stream_id,
error_code=cnghttp2.NGHTTP2_INTERNAL_ERROR):
cdef int rv
rv = cnghttp2.nghttp2_submit_rst_stream\
(self.session, cnghttp2.NGHTTP2_FLAG_NONE,
stream_id, error_code)
return rv
def _start_settings_timer(self):
loop = asyncio.get_event_loop()
self.settings_timer = loop.call_later(self.SETTINGS_TIMEOUT,
self._settings_timeout)
def _stop_settings_timer(self):
if self.settings_timer:
self.settings_timer.cancel()
self.settings_timer = None
def _settings_timeout(self):
cdef int rv
self.settings_timer = None
rv = cnghttp2.nghttp2_session_terminate_session\
(self.session, cnghttp2.NGHTTP2_SETTINGS_TIMEOUT)
try:
self.send_data()
except Exception as err:
sys.stderr.write(traceback.format_exc())
self.transport.close()
return
def _get_client_address(self):
return self.transport.get_extra_info('peername')
def _log_request(self, handler):
now = datetime.datetime.now()
tv = time.mktime(now.timetuple())
datestr = email.utils.formatdate(timeval=tv, localtime=False,
usegmt=True)
try:
method = handler.method.decode('utf-8')
except:
method = handler.method
try:
path = handler.path.decode('utf-8')
except:
path = handler.path
sys.stderr.write('{} - - [{}] "{} {} HTTP/2.0" {} - {}\n'.format\
(handler.client_address[0],
datestr, method, path, handler.status,
'P' if handler.pushed else '-'))
class BaseRequestHandler:
"""HTTP/2 request (stream) handler base class.
The class is used to handle the HTTP/2 stream. By default, it does
not nothing. It must be subclassed to handle each event callback
method.
The first callback method invoked is on_headers(). It is called
when HEADERS frame, which includes request header fields, is
arrived.
If request has request body, on_data(data) is invoked for each
chunk of received data.
When whole request is received, on_request_done() is invoked.
When stream is closed, on_close(error_code) is called.
The application can send response using send_response() method. It
can be used in on_headers(), on_data() or on_request_done().
The application can push resource using push() method. It must be
used before send_response() call.
The following instance variables are available:
client_address
Contains a tuple of the form (host, port) referring to the client's
address.
stream_id
Stream ID of this stream
scheme
Scheme of the request URI. This is a value of :scheme header field.
method
Method of this stream. This is a value of :method header field.
host
This is a value of :authority or host header field.
path
This is a value of :path header field.
"""
def __init__(self, http2, stream_id):
self.headers = []
self.cookies = []
# Stream ID. For promised stream, it is initially -1.
self.stream_id = stream_id
self.http2 = http2
# address of the client
self.client_address = self.http2._get_client_address()
# :scheme header field in request
self.scheme = None
# :method header field in request
self.method = None
# :authority or host header field in request
self.host = None
# :path header field in request
self.path = None
# HTTP status
self.status = None
# True if this is a handler for pushed resource
self.pushed = False
def on_headers(self):
'''Called when request HEADERS is arrived.
'''
pass
def on_data(self, data):
'''Called when a chunk of request body is arrived. This method will be
called multiple times until all data are received.
'''
pass
def on_request_done(self):
'''Called when whole request was received
'''
pass
def on_close(self, error_code):
'''Called when stream is about to close.
'''
pass
def send_response(self, status=200, headers=None, body=None):
'''Send response. The status is HTTP status code. The headers is
additional response headers. The :status header field is
appended by the library. The body is the response body. It
could be None if response body is empty. Or it must be
instance of either str, bytes or io.IOBase. If instance of str
is specified, it is encoded using UTF-8.
The headers is a list of tuple of the form (name, value). The
name and value are byte string.
On error, exception was thrown.
'''
if self.status is not None:
raise Exception('response has already been sent')
if not status:
raise Exception('status must not be empty')
body = self._wrap_body(body)
self._set_response_prop(status, headers, body)
self.http2.send_response(self)
def push(self, path, method='GET', request_headers=None,
status=200, headers=None, body=None):
'''Push a resource. The path is a path portion of request URI for this
resource. The method is a method to access this resource. The
request_headers is additional request headers to access this
resource. The :scheme, :method, :authority and :path are
appended by the library. The :scheme and :authority are
inherited from the request (not request_headers parameter).
The status is HTTP status code. The headers is additional
response headers. The :status header field is appended by the
library. The body is the response body. It could be None if
response body is empty. Or it must be instance of either str,
bytes or io.IOBase. If instance of str is specified, it is
encoded using UTF-8.
The headers and request_headers are a list of tuple of the
form (name, value). The name and value are byte string.
On error, exception was thrown.
'''
if not status:
raise Exception('status must not be empty')
if not method:
raise Exception('method must not be empty')
if not path:
raise Exception('path must not be empty')
body = self._wrap_body(body)
promised_handler = self.http2._make_handler(-1)
promised_handler.pushed = True
promised_handler.scheme = self.scheme
promised_handler.method = method.encode('utf-8')
promised_handler.host = self.host
promised_handler.path = path.encode('utf-8')
promised_handler._set_response_prop(status, headers, body)
if request_headers is None:
request_headers = []
request_headers.append((b':scheme', promised_handler.scheme))
request_headers.append((b':method', promised_handler.method))
request_headers.append((b':authority', promised_handler.host))
request_headers.append((b':path', promised_handler.path))
promised_handler.headers = request_headers
self.http2.push(self, promised_handler)
def _set_response_prop(self, status, headers, body):
self.status = status
self.response_headers = headers
if self.response_headers is None:
self.response_headers = []
self.response_headers.append((b':status', str(status).encode('utf-8')))
if body:
self.response_body = io.BufferedReader(body)
else:
self.response_body = None
def _wrap_body(self, body):
if body is None:
return body
elif isinstance(body, str):
return io.BytesIO(body.encode('utf-8'))
elif isinstance(body, bytes):
return io.BytesIO(body)
elif isinstance(body, io.IOBase):
return body
else:
raise Exception(('body must be None or instance of str or bytes '
'or io.BytesIO'))
class _HTTP2Session(asyncio.Protocol):
def __init__(self, RequestHandlerClass):
asyncio.Protocol.__init__(self)
self.RequestHandlerClass = RequestHandlerClass
self.http2 = None
def connection_made(self, transport):
self.transport = transport
self.connection_header = cnghttp2.NGHTTP2_CLIENT_CONNECTION_HEADER
sock = self.transport.get_extra_info('socket')
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
ssl_ctx = self.transport.get_extra_info('sslcontext')
if ssl_ctx:
if sock.selected_npn_protocol().encode('utf-8') != \
cnghttp2.NGHTTP2_PROTO_VERSION_ID:
self.transport.abort()
def connection_lost(self, exc):
if self.http2:
self.http2 = None
def data_received(self, data):
nread = min(len(data), len(self.connection_header))
if self.connection_header.startswith(data[:nread]):
data = data[nread:]
self.connection_header = self.connection_header[nread:]
if len(self.connection_header) == 0:
try:
self.http2 = _HTTP2SessionCore(self.transport,
self.RequestHandlerClass)
except Exception as err:
sys.stderr.write(traceback.format_exc())
self.transport.abort()
return
self.data_received = self.data_received2
self.resume_writing = self.resume_writing2
self.data_received(data)
else:
self.transport.abort()
def data_received2(self, data):
try:
self.http2.data_received(data)
except Exception as err:
sys.stderr.write(traceback.format_exc())
self.transport.close()
return
def resume_writing2(self):
try:
self.http2.send_data()
except Exception as err:
sys.stderr.write(traceback.format_exc())
self.transport.close()
return
class HTTP2Server:
'''HTTP/2 server.
This class builds on top of the asyncio event loop. On
construction, RequestHandlerClass must be given, which must be a
subclass of BaseRequestHandler class.
'''
def __init__(self, address, RequestHandlerClass, ssl=None):
'''address is a tuple of the listening address and port (e.g.,
('127.0.0.1', 8080)). RequestHandlerClass must be a subclass
of BaseRequestHandler class to handle a HTTP/2 stream. The
ssl can be ssl.SSLContext instance. If it is not None, the
resulting server is SSL/TLS capable.
'''
def session_factory():
return _HTTP2Session(RequestHandlerClass)
self.loop = asyncio.get_event_loop()
coro = self.loop.create_server(session_factory,
host=address[0], port=address[1],
ssl=ssl)
self.server = self.loop.run_until_complete(coro)
def serve_forever(self):
try:
self.loop.run_forever()
finally:
self.server.close()
self.loop.close()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment