Commit 4349899e authored by lahiker42's avatar lahiker42

more rpc fun


git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@122 00440858-1255-0410-a3e6-75ea37f81c3a
parent ccdeb8b8
...@@ -533,19 +533,20 @@ handle_client_fd_events (int fd, ...@@ -533,19 +533,20 @@ handle_client_fd_events (int fd,
else else
{ {
/* try processing buffer */ /* try processing buffer */
while (client->incoming.size >= 12) while (client->incoming.size >= 16)
{ {
uint32_t header[3]; uint32_t header[4];
unsigned service_index, message_length, request_id; unsigned status_code, method_index, message_length, request_id;
Closure *closure; Closure *closure;
uint8_t *packed_data; uint8_t *packed_data;
ProtobufCMessage *msg; ProtobufCMessage *msg;
protobuf_c_data_buffer_peek (&client->incoming, header, sizeof (header)); protobuf_c_data_buffer_peek (&client->incoming, header, sizeof (header));
service_index = uint32_from_le (header[0]); status_code = uint32_from_le (header[0]);
message_length = uint32_from_le (header[1]); method_index = uint32_from_le (header[1]);
request_id = header[2]; /* already native-endian */ message_length = uint32_from_le (header[2]);
request_id = header[3]; /* already native-endian */
if (12 + message_length > client->incoming.size) if (16 + message_length > client->incoming.size)
break; break;
/* lookup request by id */ /* lookup request by id */
...@@ -559,7 +560,7 @@ handle_client_fd_events (int fd, ...@@ -559,7 +560,7 @@ handle_client_fd_events (int fd,
closure = client->info.connected.closures + (request_id - 1); closure = client->info.connected.closures + (request_id - 1);
/* read message and unpack */ /* read message and unpack */
protobuf_c_data_buffer_discard (&client->incoming, 12); protobuf_c_data_buffer_discard (&client->incoming, 16);
packed_data = client->allocator->alloc (client->allocator, message_length); packed_data = client->allocator->alloc (client->allocator, message_length);
protobuf_c_data_buffer_read (&client->incoming, packed_data, message_length); protobuf_c_data_buffer_read (&client->incoming, packed_data, message_length);
...@@ -719,7 +720,16 @@ struct _ServerRequest ...@@ -719,7 +720,16 @@ struct _ServerRequest
{ {
uint32_t request_id; uint32_t request_id;
ServerConnection *conn; ServerConnection *conn;
ServerRequest *prev, *next; union {
/* if conn != NULL, then the request is alive: */
struct { ServerRequest *prev, *next; } alive;
/* if conn == NULL, then the request is defunct: */
struct { ProtobufCAllocator *allocator; } defunct;
/* well, if it is in the recycled list, then it's recycled :/ */
struct { ServerRequest *next; } recycled;
} info;
}; };
struct _ServerConnection struct _ServerConnection
{ {
...@@ -740,12 +750,17 @@ struct _ProtobufC_RPC_Server ...@@ -740,12 +750,17 @@ struct _ProtobufC_RPC_Server
char *bind_name; char *bind_name;
ServerConnection *first_connection, *last_connection; ServerConnection *first_connection, *last_connection;
ServerRequest *recycled_requests;
ProtobufC_RPC_Error_Func error_handler;
void *error_handler_data;
/* configuration */ /* configuration */
unsigned max_pending_requests_per_connection; unsigned max_pending_requests_per_connection;
}; };
#define GET_PENDING_REQUEST_LIST(conn) \ #define GET_PENDING_REQUEST_LIST(conn) \
ServerRequest *, conn->first_pending_request, server->last_pending_request, prev, next ServerRequest *, conn->first_pending_request, conn->last_pending_request, info.alive.prev, info.alive.next
#define GET_CONNECTION_LIST(server) \ #define GET_CONNECTION_LIST(server) \
ServerConnection *, server->first_connection, server->last_connection, prev, next ServerConnection *, server->first_connection, server->last_connection, prev, next
...@@ -753,6 +768,7 @@ static void ...@@ -753,6 +768,7 @@ static void
server_connection_close (ServerConnection *conn) server_connection_close (ServerConnection *conn)
{ {
ServerRequest *req; ServerRequest *req;
ProtobufCAllocator *allocator = conn->server->allocator;
/* general cleanup */ /* general cleanup */
protobuf_c_dispatch_close_fd (conn->server->dispatch, conn->fd); protobuf_c_dispatch_close_fd (conn->server->dispatch, conn->fd);
...@@ -764,51 +780,151 @@ server_connection_close (ServerConnection *conn) ...@@ -764,51 +780,151 @@ server_connection_close (ServerConnection *conn)
GSK_LIST_REMOVE (GET_CONNECTION_LIST (conn->server), conn); GSK_LIST_REMOVE (GET_CONNECTION_LIST (conn->server), conn);
/* disassocate all the requests from the connection */ /* disassocate all the requests from the connection */
for (req = conn->first_pending_request; req; req = req->next) while (conn->first_pending_request != NULL)
{
conn->first_pending_request = req->info.alive.next;
req->conn = NULL; req->conn = NULL;
req->info.defunct.allocator = allocator;
}
/* free the connection itself */ /* free the connection itself */
conn->server->allocator->free (conn->server->allocator, conn); allocator->free (allocator, conn);
}
static void
server_failed_literal (ProtobufC_RPC_Server *server,
ProtobufC_RPC_Error_Code code,
const char *msg)
{
if (server->error_handler != NULL)
server->error_handler (code, msg, server->error_handler_data);
} }
static void static void
server_failed (ProtobufC_RPC_Server *server, server_failed (ProtobufC_RPC_Server *server,
ProtobufC_RPC_Error_Code code,
const char *format, const char *format,
...) ...)
{ {
... va_list args;
char buf[MAX_FAILED_MSG_LENGTH];
va_start (args, format);
vsnprintf (buf, sizeof (buf), format, args);
buf[sizeof(buf)-1] = 0;
va_end (args);
server_failed_literal (server, code, buf);
}
static protobuf_c_boolean
address_to_name (const struct sockaddr *addr,
unsigned addr_len,
char *name_out,
unsigned name_out_buf_length)
{
if (addr->sa_family == PF_INET)
{
/* convert to dotted address + port */
const struct sockaddr_in *addr_in = (const struct sockaddr_in *) addr;
const uint8_t *addr = (const uint8_t *) &(addr_in->sin_addr);
uint16_t port = htons (addr_in->sin_port);
snprintf (name_out, name_out_buf_length,
"%u.%u.%u.%u:%u",
addr[0], addr[1], addr[2], addr[3], port);
return TRUE;
}
return FALSE;
} }
static void static void
server_connection_failed (ServerConnection *conn, server_connection_failed (ServerConnection *conn,
ProtobufC_RPC_Error_Code code,
const char *format, const char *format,
...) ...)
{ {
/* do vsnprintf() */ char remote_addr_name[64];
... char msg[MAX_FAILED_MSG_LENGTH];
char *msg_end = msg + sizeof (msg);
char *msg_at;
struct sockaddr addr;
socklen_t addr_len = sizeof (addr);
va_list args;
/* if we can, find the remote name of this connection */ /* if we can, find the remote name of this connection */
... if (getpeername (conn->fd, &addr, &addr_len) == 0
&& address_to_name (&addr, addr_len, remote_addr_name, sizeof (remote_addr_name)))
snprintf (msg, sizeof (msg), "connection to %s from %s: ",
conn->server->bind_name, remote_addr_name);
else
snprintf (msg, sizeof (msg), "connection to %s: ",
conn->server->bind_name);
msg[sizeof(msg)-1] = 0;
msg_at = strchr (msg, 0);
/* do vsnprintf() */
va_start (args, format);
vsnprintf(msg_at, msg_end - msg_at, format, args);
va_end (args);
msg[sizeof(msg)-1] = 0;
/* invoke server error hook */ /* invoke server error hook */
if (remote_addr_name == NULL) server_failed_literal (conn->server, code, msg);
server_failed (conn->server,
"connection to %s: %s",
conn->server->bind_name, err_msg);
else
server_failed (conn->server,
"connection to %s from %s: %s",
conn->server->bind_name, remote_addr_name, err_msg);
server_connection_close (conn); server_connection_close (conn);
} }
static ServerRequest *
create_server_request (ServerConnection *conn,
uint32_t request_id)
{
ServerRequest *rv;
if (conn->server->recycled_requests != NULL)
{
rv = conn->server->recycled_requests;
conn->server->recycled_requests = rv->info.recycled.next;
}
else
{
ProtobufCAllocator *allocator = conn->server->allocator;
rv = allocator->alloc (allocator, sizeof (ServerRequest));
}
rv->conn = conn;
rv->request_id = request_id;
GSK_LIST_APPEND (GET_PENDING_REQUEST_LIST (conn), rv);
return rv;
}
static void
server_connection_response_closure (const ProtobufCMessage *message,
void *closure_data)
{
ServerRequest *request = closure_data;
if (request->conn == NULL)
{
/* defunct request */
ProtobufCAllocator *allocator = request->info.defunct.allocator;
allocator->free (allocator, request);
}
else if (message == NULL)
{
/* send failed status */
...
}
else
{
/* send success response */
...
}
}
static void static void
handle_server_connection_events (int fd, handle_server_connection_events (int fd,
unsigned events, unsigned events,
void *data) void *data)
{ {
ServerConnection *conn = data; ServerConnection *conn = data;
ProtobufCService *service = conn->server->underlying;
ProtobufCAllocator *allocator = conn->server->allocator;
if (events & PROTOBUF_C_EVENT_READABLE) if (events & PROTOBUF_C_EVENT_READABLE)
{ {
int read_rv = protobuf_c_data_buffer_read_in_fd (&conn->incoming, fd); int read_rv = protobuf_c_data_buffer_read_in_fd (&conn->incoming, fd);
...@@ -816,7 +932,9 @@ handle_server_connection_events (int fd, ...@@ -816,7 +932,9 @@ handle_server_connection_events (int fd,
{ {
if (!errno_is_ignorable (errno)) if (!errno_is_ignorable (errno))
{ {
server_connection_failed (conn, "reading from file-descriptor: %s", server_connection_failed (conn,
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
"reading from file-descriptor: %s",
strerror (errno)); strerror (errno));
return; return;
} }
...@@ -824,7 +942,9 @@ handle_server_connection_events (int fd, ...@@ -824,7 +942,9 @@ handle_server_connection_events (int fd,
else if (read_rv == 0) else if (read_rv == 0)
{ {
if (conn->first_pending_request != NULL) if (conn->first_pending_request != NULL)
server_connection_failed (conn, "closed while calls pending"); server_connection_failed (conn,
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
"closed while calls pending");
else else
server_connection_close (conn); server_connection_close (conn);
return; return;
...@@ -833,31 +953,48 @@ handle_server_connection_events (int fd, ...@@ -833,31 +953,48 @@ handle_server_connection_events (int fd,
while (conn->incoming.size >= 12) while (conn->incoming.size >= 12)
{ {
uint32_t header[3]; uint32_t header[3];
uint32_t service_index, message_length, request_id; uint32_t method_index, message_length, request_id;
uint8_t *packed_data;
ProtobufCMessage *message;
ServerRequest *server_request;
protobuf_c_data_buffer_peek (&conn->incoming, header, 12); protobuf_c_data_buffer_peek (&conn->incoming, header, 12);
service_index = uint32_from_le (header[0]); method_index = uint32_from_le (header[0]);
message_length = uint32_from_le (header[1]); message_length = uint32_from_le (header[1]);
request_id = header[2]; /* store in whatever endianness it comes in */ request_id = header[2]; /* store in whatever endianness it comes in */
if (conn->incoming.size < 12 + message_length) if (conn->incoming.size < 12 + message_length)
break; break;
if (service_index >= conn->server->service->descriptor->n_methods) if (method_index >= conn->server->underlying->descriptor->n_methods)
{ {
server_connection_failed (conn, "bad service_index %u", service_index); server_connection_failed (conn,
PROTOBUF_C_ERROR_CODE_BAD_REQUEST,
"bad method_index %u", method_index);
return; return;
} }
/* Read message */ /* Read message */
protobuf_c_data_buffer_discard (&conn->incoming, 12); protobuf_c_data_buffer_discard (&conn->incoming, 12);
... packed_data = allocator->alloc (allocator, message_length);
protobuf_c_data_buffer_read (&conn->incoming, packed_data, message_length);
/* Unpack message */ /* Unpack message */
... message = protobuf_c_message_unpack (service->descriptor->methods[method_index].input,
allocator, message_length, packed_data);
allocator->free (allocator, packed_data);
if (message == NULL)
{
server_connection_failed (conn,
PROTOBUF_C_ERROR_CODE_BAD_REQUEST,
"error unpacking message");
return;
}
/* Invoke service (note that it may call back immediately) */ /* Invoke service (note that it may call back immediately) */
server_request = ...; server_request = create_server_request (conn, request_id);
... service->invoke (service, method_index, message,
server_connection_response_closure, server_request);
protobuf_c_message_free_unpacked (message, allocator);
} }
} }
if ((events & PROTOBUF_C_EVENT_WRITABLE) != 0 if ((events & PROTOBUF_C_EVENT_WRITABLE) != 0
......
...@@ -3,11 +3,12 @@ ...@@ -3,11 +3,12 @@
/* Protocol is: /* Protocol is:
* client issues request with header: * client issues request with header:
* service_index 32-bit little-endian * method_index 32-bit little-endian
* message_length 32-bit little-endian * message_length 32-bit little-endian
* request_id 32-bit any-endian * request_id 32-bit any-endian
* server responds with header: * server responds with header:
* service_index 32-bit little-endian * status_code 32-bit little-endian
* method_index 32-bit little-endian
* message_length 32-bit little-endian * message_length 32-bit little-endian
* request_id 32-bit any-endian * request_id 32-bit any-endian
*/ */
...@@ -22,9 +23,17 @@ typedef enum ...@@ -22,9 +23,17 @@ typedef enum
typedef enum typedef enum
{ {
PROTOBUF_C_ERROR_CODE_HOST_NOT_FOUND, PROTOBUF_C_ERROR_CODE_HOST_NOT_FOUND,
PROTOBUF_C_ERROR_CODE_CONNECTION_REFUSED PROTOBUF_C_ERROR_CODE_CONNECTION_REFUSED,
PROTOBUF_C_ERROR_CODE_CLIENT_TERMINATED,
PROTOBUF_C_ERROR_CODE_BAD_REQUEST,
} ProtobufC_RPC_Error_Code; } ProtobufC_RPC_Error_Code;
typedef enum
{
PROTOBUF_C_STATUS_CODE_SUCCESS,
PROTOBUF_C_STATUS_CODE_TOO_MANY_PENDING
} ProtobufC_RPC_Status_Code;
typedef void (*ProtobufC_RPC_Error_Func) (ProtobufC_RPC_Error_Code code, typedef void (*ProtobufC_RPC_Error_Func) (ProtobufC_RPC_Error_Code code,
const char *message, const char *message,
void *error_func_data); void *error_func_data);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment