Commit 5698ad1d authored by lahiker42's avatar lahiker42

rpc impl


git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@114 00440858-1255-0410-a3e6-75ea37f81c3a
parent db15bf48
......@@ -22,6 +22,8 @@ protoc_c_LDADD = \
-lprotoc
libprotobuf_c_la_SOURCES = \
google/protobuf-c/protobuf-c-dispatch.c \
google/protobuf-c/protobuf-c-rpc.c \
google/protobuf-c/protobuf-c.c
noinst_HEADERS = \
......
......@@ -136,7 +136,7 @@
G_STMT_START{ \
type _gsk_last = NULL; \
type _gsk_at = (top); \
gboolean _gsk_last_was_left = FALSE; \
protobuf_c_boolean _gsk_last_was_left = 0; \
collision_node = NULL; \
while (_gsk_at != NULL) \
{ \
......@@ -145,12 +145,12 @@ G_STMT_START{ \
comparator(_gsk_at, (node), _gsk_compare_rv); \
if (_gsk_compare_rv > 0) \
{ \
_gsk_last_was_left = TRUE; \
_gsk_last_was_left = 1; \
_gsk_at = _gsk_at->left; \
} \
else if (_gsk_compare_rv < 0) \
{ \
_gsk_last_was_left = FALSE; \
_gsk_last_was_left = 0; \
_gsk_at = _gsk_at->right; \
} \
else \
......@@ -241,7 +241,7 @@ G_STMT_START{ \
type _gsk_rb_del_x; \
type _gsk_rb_del_y; \
type _gsk_rb_del_nullpar = NULL; /* Used to emulate sentinel nodes */ \
gboolean _gsk_rb_del_fixup; \
protobuf_c_boolean _gsk_rb_del_fixup; \
if (_gsk_rb_del_z->left == NULL || _gsk_rb_del_z->right == NULL) \
_gsk_rb_del_y = _gsk_rb_del_z; \
else \
......@@ -502,7 +502,7 @@ G_STMT_START{ \
#define GSK_RBTREE_NEXT_(top,type,is_red,set_is_red,parent,left,right,comparator, in, out) \
G_STMT_START{ \
type _gsk_next_at = (in); \
g_assert (_gsk_next_at != NULL); \
protobuf_c_assert (_gsk_next_at != NULL); \
if (_gsk_next_at->right != NULL) \
{ \
_gsk_next_at = _gsk_next_at->right; \
......@@ -585,7 +585,7 @@ G_STMT_START{ \
G_STMT_START{ \
type _gsk_last = NULL; \
type _gsk_at = (top); \
gboolean _gsk_last_was_left = FALSE; \
protobuf_c_boolean _gsk_last_was_left = 0; \
collision_node = NULL; \
while (_gsk_at != NULL) \
{ \
......@@ -594,12 +594,12 @@ G_STMT_START{ \
comparator(_gsk_at, (node), _gsk_compare_rv); \
if (_gsk_compare_rv > 0) \
{ \
_gsk_last_was_left = TRUE; \
_gsk_last_was_left = 1; \
_gsk_at = _gsk_at->left; \
} \
else if (_gsk_compare_rv < 0) \
{ \
_gsk_last_was_left = FALSE; \
_gsk_last_was_left = 0; \
_gsk_at = _gsk_at->right; \
} \
else \
......@@ -699,7 +699,7 @@ G_STMT_START{ \
type _gsk_rb_del_x; \
type _gsk_rb_del_y; \
type _gsk_rb_del_nullpar = NULL; /* Used to emulate sentinel nodes */ \
gboolean _gsk_rb_del_fixup; \
protobuf_c_boolean _gsk_rb_del_fixup; \
if (_gsk_rb_del_z->left == NULL || _gsk_rb_del_z->right == NULL) \
_gsk_rb_del_y = _gsk_rb_del_z; \
else \
......
This diff is collapsed.
#ifndef __PROTOBUF_C_DISPATCH_H_
#define __PROTOBUF_C_DISPATCH_H_
typedef struct _ProtobufCDispatch ProtobufCDispatch;
typedef struct _ProtobufCDispatchTimer ProtobufCDispatchTimer;
typedef struct _ProtobufCDispatchIdle ProtobufCDispatchIdle;
#include "protobuf-c.h"
typedef enum
{
PROTOBUF_C_EVENT_READABLE = (1<<0),
......@@ -5,7 +14,7 @@ typedef enum
} ProtobufC_Events;
/* Create or destroy a Dispatch */
ProtobufCDispatch *protobuf_c_dispatch_new (ProtobufCAllocator *allocator);
ProtobufCDispatch *protobuf_c_dispatch_new (ProtobufCAllocator *allocator);
void protobuf_c_dispatch_free(ProtobufCDispatch *dispatch);
ProtobufCAllocator *protobuf_c_dispatch_peek_allocator (ProtobufCDispatch *);
......@@ -34,6 +43,12 @@ ProtobufCDispatchTimer *
unsigned timeout_usecs,
ProtobufCDispatchTimerFunc func,
void *func_data);
ProtobufCDispatchTimer *
protobuf_c_dispatch_add_timer_millis
(ProtobufCDispatch *dispatch,
unsigned milliseconds,
ProtobufCDispatchTimerFunc func,
void *func_data);
void protobuf_c_dispatch_remove_timer (ProtobufCDispatchTimer *);
/* Idle functions */
......@@ -56,11 +71,6 @@ void protobuf_c_dispatch_run (ProtobufCDispatch *dispatch);
/* --- API for those who want to embed a dispatch into their own main-loop --- */
void protobuf_c_dispatch_dispatch (ProtobufCDispatch *dispatch,
size_t n_notifies,
ProtobufC_FDNotify *notifies);
void protobuf_c_dispatch_clear_changes (ProtobufCDispatch *);
#ifdef WIN32
typedef SOCKET ProtobufC_FD;
#else
......@@ -72,6 +82,11 @@ typedef struct {
ProtobufC_Events events;
} ProtobufC_FDNotify;
void protobuf_c_dispatch_dispatch (ProtobufCDispatch *dispatch,
size_t n_notifies,
ProtobufC_FDNotify *notifies);
void protobuf_c_dispatch_clear_changes (ProtobufCDispatch *);
struct _ProtobufCDispatch
{
......@@ -92,6 +107,11 @@ struct _ProtobufCDispatch
timeout 0 is appropriate */
protobuf_c_boolean has_idle;
unsigned long last_dispatch_secs;
unsigned last_dispatch_usecs;
/* private data follows */
};
#endif
......@@ -13,6 +13,16 @@ typedef enum
PROTOBUF_C_CLIENT_STATE_FAILED /* if no autoretry */
} ProtobufC_ClientState;
struct _Closure
{
/* these will be NULL for unallocated request ids */
const ProtobufCMessageDescriptor *response_type;
ProtobufCClosure closure;
/* this is the next request id, or 0 for none */
void *closure_data;
};
struct _ProtobufC_RPC_Client
{
ProtobufCService base_service;
......@@ -32,9 +42,18 @@ struct _ProtobufC_RPC_Client
ProtobufCDispatch_Idle *idle;
} init;
struct {
ProtobufCDispatch_Timer *timer;
protobuf_c_boolean pending;
} name_lookup;
struct {
ProtobufCDispatchTimer *timer;
char *error_message;
} failed_waiting;
struct {
unsigned closures_alloced;
unsigned first_free_request_id;
/* indexed by (request_id-1) */
Closure *closures;
} connected;
struct {
char *error_message;
} failed;
......@@ -54,7 +73,59 @@ client_failed (ProtobufC_RPC_Client *client,
const char *format_str,
...)
{
...
va_list args;
switch (client->state)
{
case PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP:
protobuf_c_assert (!client->info.name_lookup.pending);
break;
case PROTOBUF_C_CLIENT_STATE_CONNECTING:
/* nothing to do */
break;
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
/* nothing to do */
break;
/* should not get here */
case PROTOBUF_C_CLIENT_STATE_INIT:
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
case PROTOBUF_C_CLIENT_STATE_FAILED:
protobuf_c_assert (FALSE);
break;
}
if (client->fd >= 0)
{
protobuf_c_dispatch_close (client->dispatch, client->fd);
client->fd = -1;
}
protobuf_c_data_buffer_reset (&client->incoming);
protobuf_c_data_buffer_reset (&client->outgoing);
/* Compute the message */
va_start (args, format_str);
vsnprintf (buf, sizeof (buf), format_str, args);
va_end (args);
buf[sizeof(buf)-1] = 0;
msg_len = strlen (buf);
msg = client->allocator->alloc (client->allocator, msg_len + 1);
strcpy (msg, buf);
/* go to one of the failed states */
if (client->autoretry)
{
client->state = PROTOBUF_C_CLIENT_STATE_FAILED_WAITING;
client->info.failed_waiting.timer
= protobuf_c_dispatch_add_timer_millis (client->dispatch,
client->autoretry_millis,
handle_autoretry_timeout,
client);
client->info.failed_waiting.error_message = msg;
}
else
{
client->state = PROTOBUF_C_CLIENT_STATE_FAILED;
client->info.failed.error_message = msg;
}
}
static void
......@@ -89,17 +160,15 @@ begin_connecting (ProtobufC_RPC_Client *client,
client_failed (client, "error connecting to remote host: %s", strerror (errno));
return;
}
client->state = PROTOBUF_C_CLIENT_STATE_CONNECTED;
if (client->first_outgoing_request != NULL)
{
/* register interest in writing to fd (there can be no pending requests, of course,
since we just connected) */
protobuf_c_dispatch_watch_fd (client->dispatch,
client->fd,
PROTOBUF_C_EVENT_WRITABLE,
handle_connected_client_fd_events,
client);
}
client->info.connected.closures_alloced = 1;
client->info.connected.first_free_request_id = 1;
client->info.connected.closures = client->allocator->alloc (client->allocator, sizeof (Closure));
client->info.connected.closures[0].closure = NULL;
client->info.connected.closures[0].response_type = NULL;
client->info.connected.closures[0].closure_data = UINT_TO_POINTER (0);
}
static void
handle_name_lookup_success (const uint8_t *address,
......@@ -189,6 +258,33 @@ handle_init_idle (ProtobufCDispatch *dispatch,
ProtobufC_RPC_Client *client = data;
}
static void
grow_closure_array (ProtobufC_RPC_Client *client)
{
/* resize array */
unsigned old_size = client->info.connected.closures_alloced;
unsigned new_size = old_size * 2;
Closure *new_closures = client->allocator->alloc (client->allocator, sizeof (Closure) * new_size);
memcpy (new_closures,
client->info.connected.closures,
sizeof (Closure) * old_size);
/* build new free list */
for (i = old_size; i < new_size - 1; i++)
{
new_closures[i].response_type = NULL;
new_closures[i].closure = NULL;
new_closures[i].closure_data = UINT_TO_POINTER (i+2);
}
new_closures[i].closure_data = client->info.connected.first_free_request_id;
new_closures[i].response_type = NULL;
new_closures[i].closure = NULL;
client->allocator->free (client->allocator, client->info.connected.closures);
client->info.connected.closures = new_closures;
client->info.connected.closures_alloced = new_size;
}
static void
enqueue_request (ProtobufC_RPC_Client *client,
unsigned method_index,
......@@ -196,7 +292,52 @@ enqueue_request (ProtobufC_RPC_Client *client,
ProtobufCClosure closure,
void *closure_data)
{
...
uint32_t request_id;
struct {
uint32_t method_index;
uint32_t packed_size;
uint32_t request_id;
} header;
size_t packed_size;
uint8_t *packed_data;
Closure *closure;
const ProtobufCServiceDescriptor *desc = client->base_service.descriptor;
const ProtobufCMethodDescriptor *method = descriptor->methods + method_index;
protobuf_c_assert (method_index < desc->n_methods);
/* Allocate request_id */
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_CONNECTED);
if (client->info.connected.first_free_request_id == 0)
grow_closure_array (client);
request_id = client->info.connected.first_free_request_id;
closure = client->info.connected.closures + (request_id - 1);
client->info.connected.first_free_request_id = POINTER_TO_UINT (closure->closure_data);
/* Pack message */
packed_size = protobuf_c_message_get_packed_size (input);
if (packed_size < client->allocator->max_alloca)
packed_data = alloca (packed_size);
else
packed_data = client->allocator->alloc (client->allocator, packed_size);
protobuf_c_message_pack (input, packed_data);
/* Append to buffer */
protobuf_c_assert (sizeof (header) == 16);
header.method_index = uint32_to_le (method_index);
header.packed_size = uint32_to_le (packed_size);
header.request_id = request_id;
protobuf_c_data_buffer_append (&client->outgoing, &header, 16);
protobuf_c_data_buffer_append (&client->outgoing, packed_data, packed_size);
/* Clean up if not using alloca() */
if (packed_size >= client->allocator->max_alloca)
client->allocator->free (client->allocator, packed_data);
/* Add closure to request-tree */
client->info.connected.closures[request_id-1].response_type = client->descriptor->methods[method_index].output;
client->info.connected.closures[request_id-1].closure = closure;
client->info.connected.closures[request_id-1].closure_data = closure_data;
}
static void
......
......@@ -3,15 +3,15 @@
/* Protocol is:
* client issues request with header:
* service_index
* request_id
* message_length
* service_index 32-bit little-endian
* message_length 32-bit little-endian
* request_id 64-bit any-endian
* server responds with header:
* service_index
* request_id
* message_length
* service_index 32-bit little-endian
* message_length 32-bit little-endian
* request_id 64-bit any-endian
*/
typedef struct _ProtobufC_Dispatch ProtobufC_Dispatch;
#include "protobuf-c-dispatch.h"
typedef enum
{
......@@ -19,6 +19,12 @@ typedef enum
PROTOBUF_C_RPC_ADDRESS_TCP /* host/port tcp socket */
} ProtobufC_RPC_AddressType;
typedef enum
{
PROTOBUF_C_ERROR_CODE_HOST_NOT_FOUND,
PROTOBUF_C_ERROR_CODE_CONNECTION_REFUSED
} ProtobufC_RPC_Error_Code;
typedef void (*ProtobufC_RPC_Error_Func) (ProtobufC_RPC_Error_Code code,
const char *message,
void *error_func_data);
......@@ -29,7 +35,7 @@ typedef void (*ProtobufC_RPC_Error_Func) (ProtobufC_RPC_Error_Code code,
ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
const char *name,
const ProtobufCServiceDescriptor *descriptor,
ProtobufC_Dispatch *dispatch);
ProtobufCDispatch *dispatch);
/* --- configuring the client */
typedef struct _ProtobufC_RPC_Client ProtobufC_RPC_Client;
......@@ -45,7 +51,7 @@ typedef void (*ProtobufC_NameLookup_Func) (ProtobufCDispatch *dispatch,
const char *name,
ProtobufC_NameLookup_Found found_func,
ProtobufC_NameLookup_Failed failed_func,
gpointer callback_data);
void *callback_data);
void protobuf_c_rpc_client_set_name_resolver (ProtobufC_RPC_Client *client,
ProtobufC_NameLookup_Func resolver);
......@@ -66,11 +72,12 @@ void protobuf_c_rpc_client_set_autoretry_period (ProtobufC_RPC_Client *client,
so you may configure the client immediately after creation */
/* --- Server API --- */
typedef struct _ProtobufC_RPC_Server ProtobufC_RPC_Server;
ProtobufC_RPC_Server *
protobuf_c_rpc_server_new (ProtobufC_RPC_AddressType type,
const char *name,
ProtobufCService *service,
ProtobufC_Dispatch *dispatch);
ProtobufCDispatch *dispatch);
ProtobufCService *
protobuf_c_rpc_server_destroy (ProtobufC_RPC_Server *server,
protobuf_c_boolean free_underlying_service);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment