Commit 3dbfdf90 authored by lahiker42's avatar lahiker42

rpc works!


git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@126 00440858-1255-0410-a3e6-75ea37f81c3a
parent d2995318
/* NOTE: this may not work very well on windows, where i'm
not sure that "SOCKETs" are allocated nicely like
file-descriptors are */
#include <assert.h>
#include <alloca.h>
#include <sys/time.h>
......@@ -7,10 +10,14 @@
#include <sys/poll.h>
#include <limits.h>
#include <errno.h>
#include <signal.h>
#include "protobuf-c-dispatch.h"
#include "gskrbtreemacros.h"
#include "gsklistmacros.h"
#define DEBUG_DISPATCH_INTERNALS 0
#define DEBUG_DISPATCH 0
#define protobuf_c_assert(condition) assert(condition)
#define ALLOC_WITH_ALLOCATOR(allocator, size) ((allocator)->alloc ((allocator)->allocator_data, (size)))
......@@ -106,16 +113,30 @@ struct _ProtobufCDispatchIdle
ProtobufCDispatch *protobuf_c_dispatch_new (ProtobufCAllocator *allocator)
{
RealDispatch *rv = ALLOC (sizeof (RealDispatch));
struct timeval tv;
rv->base.n_changes = 0;
rv->notifies_desired_alloced = 8;
rv->base.notifies_desired = ALLOC (sizeof (ProtobufC_FDNotify) * rv->notifies_desired_alloced);
rv->base.n_notifies_desired = 0;
rv->callbacks = ALLOC (sizeof (Callback) * rv->notifies_desired_alloced);
rv->changes_alloced = 8;
rv->base.changes = ALLOC (sizeof (ProtobufC_FDNotify) * rv->changes_alloced);
rv->fd_map_size = 16;
rv->fd_map = ALLOC (sizeof (FDMap) * rv->fd_map_size);
rv->allocator = allocator;
rv->timer_tree = NULL;
rv->first_idle = rv->last_idle = NULL;
memset (rv->fd_map, 255, sizeof (FDMap) * rv->fd_map_size);
rv->recycled_idles = NULL;
rv->recycled_timeouts = NULL;
/* need to handle SIGPIPE more gracefully than default */
signal (SIGPIPE, SIG_IGN);
gettimeofday (&tv, NULL);
rv->base.last_dispatch_secs = tv.tv_sec;
rv->base.last_dispatch_usecs = tv.tv_usec;
return &rv->base;
}
......@@ -124,7 +145,20 @@ protobuf_c_dispatch_free(ProtobufCDispatch *dispatch)
{
RealDispatch *d = (RealDispatch *) dispatch;
ProtobufCAllocator *allocator = d->allocator;
while (d->recycled_timeouts != NULL)
{
ProtobufCDispatchTimer *t = d->recycled_timeouts;
d->recycled_timeouts = t->right;
FREE (t);
}
while (d->recycled_idles != NULL)
{
ProtobufCDispatchIdle *i = d->recycled_idles;
d->recycled_idles = i->next;
FREE (i);
}
FREE (d->base.notifies_desired);
FREE (d->base.changes);
FREE (d->callbacks);
FREE (d->fd_map);
FREE (d);
......@@ -138,9 +172,9 @@ protobuf_c_dispatch_peek_allocator (ProtobufCDispatch *dispatch)
}
/* TODO: perhaps thread-private dispatches make more sense? */
static ProtobufCDispatch *def = NULL;
ProtobufCDispatch *protobuf_c_dispatch_default (void)
{
static ProtobufCDispatch *def = NULL;
if (def == NULL)
def = protobuf_c_dispatch_new (&protobuf_c_default_allocator);
return def;
......@@ -187,6 +221,9 @@ allocate_notifies_desired_index (RealDispatch *d)
d->base.notifies_desired = n;
d->notifies_desired_alloced = new_size;
}
#if DEBUG_DISPATCH_INTERNALS
fprintf (stderr, "allocate_notifies_desired_index: returning %u\n", rv);
#endif
return rv;
}
static unsigned
......@@ -230,12 +267,16 @@ deallocate_notify_desired_index (RealDispatch *d,
unsigned nd_ind = d->fd_map[fd].notify_desired_index;
unsigned from = d->base.n_notifies_desired - 1;
unsigned from_fd;
#if DEBUG_DISPATCH_INTERNALS
fprintf (stderr, "deallocate_notify_desired_index: fd=%d, nd_ind=%u\n",fd,nd_ind);
#endif
d->fd_map[fd].notify_desired_index = -1;
if (nd_ind == from)
{
d->base.n_notifies_desired--;
return;
}
from_fd = d->base.notifies_desired[nd_ind].fd;
from_fd = d->base.notifies_desired[from].fd;
d->fd_map[from_fd].notify_desired_index = nd_ind;
d->base.notifies_desired[nd_ind] = d->base.notifies_desired[from];
d->base.n_notifies_desired--;
......@@ -253,6 +294,12 @@ protobuf_c_dispatch_watch_fd (ProtobufCDispatch *dispatch,
RealDispatch *d = (RealDispatch *) dispatch;
unsigned f = fd; /* avoid tiring compiler warnings: "comparison of signed versus unsigned" */
unsigned nd_ind, change_ind;
#if DEBUG_DISPATCH
fprintf (stderr, "dispatch: watch_fd: %d, %s%s\n",
fd,
(events&PROTOBUF_C_EVENT_READABLE)?"r":"",
(events&PROTOBUF_C_EVENT_WRITABLE)?"w":"");
#endif
if (callback == NULL)
assert (events == 0);
else
......@@ -260,12 +307,14 @@ protobuf_c_dispatch_watch_fd (ProtobufCDispatch *dispatch,
ensure_fd_map_big_enough (d, f);
if (d->fd_map[f].notify_desired_index == -1)
{
d->fd_map[f].notify_desired_index = allocate_notifies_desired_index (d);
if (callback != NULL)
nd_ind = d->fd_map[f].notify_desired_index = allocate_notifies_desired_index (d);
}
else
{
if (callback == NULL)
deallocate_notify_desired_index (d, f);
else
nd_ind = d->fd_map[f].notify_desired_index;
}
if (callback == NULL)
......@@ -304,6 +353,9 @@ protobuf_c_dispatch_fd_closed(ProtobufCDispatch *dispatch,
{
unsigned f = fd;
RealDispatch *d = (RealDispatch *) dispatch;
#if DEBUG_DISPATCH
fprintf (stderr, "dispatch: fd %d closed\n", fd);
#endif
ensure_fd_map_big_enough (d, f);
d->fd_map[fd].closed_since_notify_started = 1;
if (d->fd_map[f].change_index != -1)
......@@ -330,8 +382,6 @@ protobuf_c_dispatch_dispatch (ProtobufCDispatch *dispatch,
unsigned i;
FDMap *fd_map = d->fd_map;
struct timeval tv;
if (n_notifies == 0)
return;
fd_max = 0;
for (i = 0; i < n_notifies; i++)
if (fd_max < (unsigned) notifies[i].fd)
......@@ -352,6 +402,20 @@ protobuf_c_dispatch_dispatch (ProtobufCDispatch *dispatch,
}
}
/* handle idle functions */
while (d->first_idle != NULL)
{
ProtobufCDispatchIdle *idle = d->first_idle;
ProtobufCDispatchIdleFunc func = idle->func;
void *data = idle->func_data;
GSK_LIST_REMOVE_FIRST (GET_IDLE_LIST (d));
idle->func = NULL; /* set to NULL to render remove_idle a no-op */
func (dispatch, data);
idle->next = d->recycled_idles;
d->recycled_idles = idle;
}
/* handle timers */
gettimeofday (&tv, NULL);
......@@ -439,7 +503,9 @@ protobuf_c_dispatch_run (ProtobufCDispatch *dispatch)
/* compute timeout */
if (d->first_idle != NULL)
{
timeout = 0;
}
else if (d->timer_tree == NULL)
timeout = -1;
else
......@@ -516,6 +582,7 @@ protobuf_c_dispatch_add_timer(ProtobufCDispatch *dispatch,
{
RealDispatch *d = (RealDispatch *) dispatch;
ProtobufCDispatchTimer *rv;
ProtobufCDispatchTimer *at;
ProtobufCDispatchTimer *conflict;
protobuf_c_assert (func != NULL);
if (d->recycled_timeouts != NULL)
......@@ -533,6 +600,17 @@ protobuf_c_dispatch_add_timer(ProtobufCDispatch *dispatch,
rv->func_data = func_data;
rv->dispatch = d;
GSK_RBTREE_INSERT (GET_TIMER_TREE (d), rv, conflict);
/* is this the first element in the tree */
for (at = rv; at != NULL; at = at->parent)
if (at->parent && at->parent->right == at)
break;
if (at == NULL) /* yes, so set the public members */
{
dispatch->has_timeout = 1;
dispatch->timeout_secs = rv->timeout_secs;
dispatch->timeout_usecs = rv->timeout_usecs;
}
return rv;
}
......@@ -609,8 +687,20 @@ protobuf_c_dispatch_add_idle (ProtobufCDispatch *dispatch,
void
protobuf_c_dispatch_remove_idle (ProtobufCDispatchIdle *idle)
{
if (idle->func != NULL)
{
RealDispatch *d = idle->dispatch;
GSK_LIST_REMOVE (GET_IDLE_LIST (d), idle);
idle->next = d->recycled_idles;
d->recycled_idles = idle;
}
}
void protobuf_c_dispatch_destroy_default (void)
{
if (def)
{
ProtobufCDispatch *kill = def;
def = NULL;
protobuf_c_dispatch_free (kill);
}
}
......@@ -115,5 +115,6 @@ struct _ProtobufCDispatch
/* private data follows */
};
void protobuf_c_dispatch_destroy_default (void);
#endif
/* KNOWN DEFECTS:
- server does not obey max_pending_requests_per_connection
- no ipv6 support
*/
#include <string.h>
#include <assert.h>
......@@ -27,6 +28,9 @@
#undef FALSE
#define FALSE 0
/* enabled for efficiency, can be useful to disable for debugging */
#define RECYCLE_REQUESTS 1
#define UINT_TO_POINTER(ui) ((void*)(ui))
#define POINTER_TO_UINT(ptr) ((unsigned)(ptr))
......@@ -54,6 +58,15 @@ struct _Closure
void *closure_data;
};
static void
error_handler (ProtobufC_RPC_Error_Code code,
const char *message,
void *error_func_data)
{
fprintf (stderr, "*** error: %s: %s\n",
(char*) error_func_data, message);
}
struct _ProtobufC_RPC_Client
{
ProtobufCService base_service;
......@@ -67,6 +80,8 @@ struct _ProtobufC_RPC_Client
protobuf_c_boolean autoretry;
unsigned autoretry_millis;
ProtobufC_NameLookup_Func resolver;
ProtobufC_RPC_Error_Func error_handler;
void *error_handler_data;
ProtobufC_RPC_ClientState state;
union {
struct {
......@@ -109,7 +124,11 @@ static void
handle_autoretry_timeout (ProtobufCDispatch *dispatch,
void *func_data)
{
begin_name_lookup (func_data);
ProtobufC_RPC_Client *client = func_data;
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_FAILED_WAITING);
client->allocator->free (client->allocator,
client->info.failed_waiting.error_message);
begin_name_lookup (client);
}
static void
......@@ -416,6 +435,7 @@ grow_closure_array (ProtobufC_RPC_Client *client)
new_closures[i].closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id);
new_closures[i].response_type = NULL;
new_closures[i].closure = NULL;
client->info.connected.first_free_request_id = old_size + 1;
client->allocator->free (client->allocator, client->info.connected.closures);
client->info.connected.closures = new_closures;
......@@ -472,11 +492,11 @@ enqueue_request (ProtobufC_RPC_Client *client,
protobuf_c_message_pack (input, packed_data);
/* Append to buffer */
protobuf_c_assert (sizeof (header) == 16);
protobuf_c_assert (sizeof (header) == 12);
header.method_index = uint32_to_le (method_index);
header.packed_size = uint32_to_le (packed_size);
header.request_id = request_id;
protobuf_c_data_buffer_append (&client->outgoing, &header, 16);
protobuf_c_data_buffer_append (&client->outgoing, &header, 12);
protobuf_c_data_buffer_append (&client->outgoing, packed_data, packed_size);
/* Clean up if not using alloca() */
......@@ -554,7 +574,7 @@ handle_client_fd_events (int fd,
break;
/* lookup request by id */
if (request_id >= client->info.connected.closures_alloced
if (request_id > client->info.connected.closures_alloced
|| request_id == 0
|| client->info.connected.closures[request_id-1].response_type == NULL)
{
......@@ -575,6 +595,7 @@ handle_client_fd_events (int fd,
packed_data);
if (msg == NULL)
{
fprintf(stderr, "unable to unpack msg of length %u", message_length);
client_failed (client, "failed to unpack message");
client->allocator->free (client->allocator, packed_data);
return;
......@@ -582,6 +603,10 @@ handle_client_fd_events (int fd,
/* invoke closure */
closure->closure (msg, closure->closure_data);
closure->response_type = NULL;
closure->closure = NULL;
closure->closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id);
client->info.connected.first_free_request_id = request_id;
/* clean up */
protobuf_c_message_free_unpacked (msg, client->allocator);
......@@ -684,9 +709,11 @@ destroy_client_rpc (ProtobufCService *service)
protobuf_c_data_buffer_clear (&client->incoming);
protobuf_c_data_buffer_clear (&client->outgoing);
client->state = PROTOBUF_C_CLIENT_STATE_DESTROYED;
client->allocator->free (client->allocator, client->name);
/* free closures only once we are in the destroyed state */
for (i = 0; i < n_closures; i++)
if (closures[i].response_type != NULL)
closures[i].closure (NULL, closures[i].closure_data);
if (closures)
client->allocator->free (client->allocator, closures);
......@@ -731,6 +758,8 @@ ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
rv->autoretry = 1;
rv->autoretry_millis = 2*1000;
rv->resolver = trivial_sync_libc_resolver;
rv->error_handler = error_handler;
rv->error_handler_data = "protobuf-c rpc client";
rv->info.init.idle = protobuf_c_dispatch_add_idle (dispatch, handle_init_idle, rv);
return &rv->base_service;
}
......@@ -792,6 +821,7 @@ struct _ProtobufC_RPC_Server
ProtobufCService *underlying;
char *bind_name;
ServerConnection *first_connection, *last_connection;
ProtobufC_FD listening_fd;
ServerRequest *recycled_requests;
......@@ -810,7 +840,6 @@ struct _ProtobufC_RPC_Server
static void
server_connection_close (ServerConnection *conn)
{
ServerRequest *req;
ProtobufCAllocator *allocator = conn->server->allocator;
/* general cleanup */
......@@ -825,6 +854,7 @@ server_connection_close (ServerConnection *conn)
/* disassocate all the requests from the connection */
while (conn->first_pending_request != NULL)
{
ServerRequest *req = conn->first_pending_request;
conn->first_pending_request = req->info.alive.next;
req->conn = NULL;
req->info.defunct.allocator = allocator;
......@@ -934,6 +964,8 @@ create_server_request (ServerConnection *conn,
}
rv->conn = conn;
rv->request_id = request_id;
rv->method_index = method_index;
conn->n_pending_requests++;
GSK_LIST_APPEND (GET_PENDING_REQUEST_LIST (conn), rv);
return rv;
}
......@@ -953,8 +985,10 @@ server_connection_response_closure (const ProtobufCMessage *message,
/* defunct request */
ProtobufCAllocator *allocator = request->info.defunct.allocator;
allocator->free (allocator, request);
return;
}
else if (message == NULL)
if (message == NULL)
{
/* send failed status */
uint32_t header[4];
......@@ -988,6 +1022,18 @@ server_connection_response_closure (const ProtobufCMessage *message,
handle_server_connection_events,
conn);
GSK_LIST_REMOVE (GET_PENDING_REQUEST_LIST (conn), request);
conn->n_pending_requests--;
#if RECYCLE_REQUESTS
/* recycle request */
request->info.recycled.next = conn->server->recycled_requests;
conn->server->recycled_requests = request;
#else
/* free the request immediately */
conn->server->allocator->free (conn->server->allocator, request);
#endif
}
static void
......@@ -1115,6 +1161,7 @@ handle_server_listener_readable (int fd,
protobuf_c_data_buffer_init (&conn->incoming, server->allocator);
protobuf_c_data_buffer_init (&conn->outgoing, server->allocator);
conn->n_pending_requests = 0;
conn->first_pending_request = conn->last_pending_request = NULL;
conn->server = server;
GSK_LIST_APPEND (GET_CONNECTION_LIST (server), conn);
protobuf_c_dispatch_watch_fd (server->dispatch, conn->fd, PROTOBUF_C_EVENT_READABLE,
......@@ -1136,6 +1183,10 @@ server_new_from_fd (ProtobufC_FD listening_fd,
server->first_connection = server->last_connection = NULL;
server->max_pending_requests_per_connection = 32;
server->bind_name = allocator->alloc (allocator, strlen (bind_name) + 1);
server->error_handler = error_handler;
server->error_handler_data = "protobuf-c rpc server";
server->listening_fd = listening_fd;
server->recycled_requests = NULL;
strcpy (server->bind_name, bind_name);
set_fd_nonblocking (listening_fd);
protobuf_c_dispatch_watch_fd (dispatch, listening_fd, PROTOBUF_C_EVENT_READABLE,
......@@ -1143,6 +1194,60 @@ server_new_from_fd (ProtobufC_FD listening_fd,
return server;
}
/* this function is for handling the common problem
that we bind over-and-over again to the same
unix path.
ideally, you'd think the OS's SO_REUSEADDR flag would
cause this to happen, but it doesn't,
at least on my linux 2.6 box.
in fact, we really need a way to test without
actually connecting to the remote server,
which might annoy it.
XXX: we should survey what others do here... like x-windows...
*/
/* NOTE: stolen from gsk, obviously */
void
_gsk_socket_address_local_maybe_delete_stale_socket (const char *path,
struct sockaddr *addr,
unsigned addr_len)
{
int fd;
struct stat statbuf;
if (stat (path, &statbuf) < 0)
return;
if (!S_ISSOCK (statbuf.st_mode))
{
fprintf (stderr, "%s existed but was not a socket\n", path);
return;
}
fd = socket (PF_UNIX, SOCK_STREAM, 0);
if (fd < 0)
return;
set_fd_nonblocking (fd);
if (connect (fd, addr, addr_len) < 0)
{
if (errno == EINPROGRESS)
{
close (fd);
return;
}
}
else
{
close (fd);
return;
}
/* ok, we should delete the stale socket */
close (fd);
if (unlink (path) < 0)
fprintf (stderr, "unable to delete %s: %s\n",
path, strerror(errno));
}
ProtobufC_RPC_Server *
protobuf_c_rpc_server_new (ProtobufC_RPC_AddressType type,
const char *name,
......@@ -1164,6 +1269,9 @@ protobuf_c_rpc_server_new (ProtobufC_RPC_AddressType type,
strncpy (addr_un.sun_path, name, sizeof (addr_un.sun_path));
address_len = sizeof (addr_un);
address = (struct sockaddr *) (&addr_un);
_gsk_socket_address_local_maybe_delete_stale_socket (name,
address,
address_len);
break;
case PROTOBUF_C_RPC_ADDRESS_TCP:
protocol_family = PF_UNIX;
......@@ -1216,6 +1324,7 @@ protobuf_c_rpc_server_destroy (ProtobufC_RPC_Server *server,
server->recycled_requests = req->info.recycled.next;
server->allocator->free (server->allocator, req);
}
protobuf_c_dispatch_close_fd (server->dispatch, server->listening_fd);
if (destroy_underlying)
protobuf_c_service_destroy (server->underlying);
server->allocator->free (server->allocator, server);
......
......@@ -37,5 +37,5 @@ BUILT_SOURCES = generated-code/test.pb-c.c generated-code/test.pb-c.h \
generated-code/test-full.pb.cc generated-code/test-full.pb.h \
generated-code/test-full-cxx-output.inc
DISTCLEANFILES = $(BUILT_SOURCES)
TESTS = test-generated-code test-generated-code2
TESTS = test-generated-code test-generated-code2 test-rpc
EXTRA_DIST = test.proto test-full.proto common-test-arrays.h
#include <assert.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#include "generated-code/test.pb-c.h"
#include <google/protobuf-c/protobuf-c-rpc.h>
static void
message (const char *format, ...)
{
va_list args;
va_start (args, format);
vfprintf (stderr, format, args);
va_end (args);
fputc ('\n', stderr);
}
/* --- A local service --- */
static void
test__by_name (Foo__DirLookup_Service *service,
......@@ -35,6 +47,7 @@ test__by_name (Foo__DirLookup_Service *service,
if (number != NULL)
{
pn.number = number;
pn.has_type = 1;
pn.type = type;
person.n_phone = 1;
person.phone = pns;
......@@ -158,8 +171,10 @@ int main()
ProtobufC_RPC_Client *client;
ProtobufC_RPC_Server *server;
message ("testing local service");
test_service (local_service);
message ("creating client");
/* Create a client with no server. Verify that
the client returns a failure immediately */
remote_service = protobuf_c_rpc_client_new (PROTOBUF_C_RPC_ADDRESS_LOCAL,
......@@ -172,18 +187,23 @@ int main()
is_done = 0;
protobuf_c_dispatch_add_timer_millis (protobuf_c_dispatch_default (),
250, set_boolean_true, &is_done);
message ("verify client cannot connect");
while (!is_done)
{
protobuf_c_dispatch_run (protobuf_c_dispatch_default ());
assert (!protobuf_c_rpc_client_is_connected (client));
}
message ("testing unconnected client");
test_defunct_client (remote_service);
message ("creating server");
/* Create a server and wait for the client to connect. */
server = protobuf_c_rpc_server_new (PROTOBUF_C_RPC_ADDRESS_LOCAL,
"test.socket",
local_service,
NULL);
assert (server != NULL);
message ("waiting to connect");
is_done = 0;
protobuf_c_dispatch_add_timer_millis (protobuf_c_dispatch_default (),
250, set_boolean_true, &is_done);
......@@ -201,23 +221,25 @@ int main()
protobuf_c_dispatch_run (protobuf_c_dispatch_default ());
/* Test the client */
message ("testing client");
test_service (remote_service);
/* Destroy the server and ensure that a request is failed in
a timely fashion. */
message ("destroying server");
protobuf_c_rpc_server_destroy (server, 0);
server = NULL;
message ("test client has no data");
test_defunct_client (remote_service);
/* Create a server again and wait for the client to reconnect. */
message ("creating server again");
server = protobuf_c_rpc_server_new (PROTOBUF_C_RPC_ADDRESS_LOCAL,
"test.socket",
local_service,
NULL);
assert (server != NULL);
is_done = 0;
while (!is_done)
protobuf_c_dispatch_run (protobuf_c_dispatch_default ());
protobuf_c_dispatch_add_timer_millis (protobuf_c_dispatch_default (),
250, set_boolean_true, &is_done);
while (!is_done)
......@@ -225,14 +247,21 @@ int main()
assert (protobuf_c_rpc_client_is_connected (client));
/* Test the client again, for kicks. */
message ("testing client again");
test_service (remote_service);
/* Destroy the client */
message ("destroying client");
protobuf_c_service_destroy (remote_service);
/* Destroy the server */
message ("destroying server");
protobuf_c_rpc_server_destroy (server, 0);
protobuf_c_dispatch_destroy_default ();
unlink ("test.socket");
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment