Commit f4e009d4 authored by lahiker42's avatar lahiker42

...


git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@118 00440858-1255-0410-a3e6-75ea37f81c3a
parent e135ebd2
This diff is collapsed.
......@@ -119,17 +119,19 @@ protobuf_c_data_buffer_cleanup_recycling_bin ()
/* --- Public methods --- */
/**
* protobuf_c_data_buffer_construct:
* protobuf_c_data_buffer_init:
* @buffer: buffer to initialize (as empty).
*
* Construct an empty buffer out of raw memory.
* (This is equivalent to filling the buffer with 0s)
*/
void
protobuf_c_data_buffer_construct(ProtobufCDataBuffer *buffer)
protobuf_c_data_buffer_init(ProtobufCDataBuffer *buffer,
ProtobufCAllocator *allocator)
{
buffer->first_frag = buffer->last_frag = NULL;
buffer->size = 0;
buffer->allocator = allocator;
}
#if defined(GSK_DEBUG) || GSK_DEBUG_BUFFER_ALLOCATIONS
......@@ -708,7 +710,7 @@ protobuf_c_data_buffer_clear(ProtobufCDataBuffer *to_destroy)
* returns: its index in the buffer, or -1 if the character
* is not in the buffer.
*/
ssize_t
int
protobuf_c_data_buffer_index_of(ProtobufCDataBuffer *buffer,
char char_to_find)
{
......@@ -1103,242 +1105,3 @@ protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
}
#endif
/* --- ProtobufCDataBufferIterator --- */
/**
* protobuf_c_data_buffer_iterator_construct:
* @iterator: to initialize.
* @to_iterate: the buffer to walk through.
*
* Initialize a new #ProtobufCDataBufferIterator.
*/
void
protobuf_c_data_buffer_iterator_construct (ProtobufCDataBufferIterator *iterator,
ProtobufCDataBuffer *to_iterate)
{
iterator->fragment = to_iterate->first_frag;
if (iterator->fragment != NULL)
{
iterator->in_cur = 0;
iterator->cur_data = (uint8_t*)protobuf_c_data_buffer_fragment_start (iterator->fragment);
iterator->cur_length = iterator->fragment->buf_length;
}
else
{
iterator->in_cur = 0;
iterator->cur_data = NULL;
iterator->cur_length = 0;
}
iterator->offset = 0;
}
/**
* protobuf_c_data_buffer_iterator_peek:
* @iterator: to peek data from.
* @out: to copy data into.
* @max_length: maximum number of bytes to write to @out.
*
* Peek data from the current position of an iterator.
* The iterator's position is not changed.
*
* returns: number of bytes peeked into @out.
*/
size_t
protobuf_c_data_buffer_iterator_peek (ProtobufCDataBufferIterator *iterator,
void *out,
size_t max_length)
{
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t out_remaining = max_length;
uint8_t *out_at = out;
while (fragment != NULL)
{
size_t frag_remaining = frag_length - in_frag;
if (out_remaining <= frag_remaining)
{
memcpy (out_at, frag_data + in_frag, out_remaining);
out_remaining = 0;
break;
}
memcpy (out_at, frag_data + in_frag, frag_remaining);
out_remaining -= frag_remaining;
out_at += frag_remaining;
fragment = fragment->next;
if (fragment != NULL)
{
frag_data = (uint8_t *) protobuf_c_data_buffer_fragment_start (fragment);
frag_length = fragment->buf_length;
}
in_frag = 0;
}
return max_length - out_remaining;
}
/**
* protobuf_c_data_buffer_iterator_read:
* @iterator: to read data from.
* @out: to copy data into.
* @max_length: maximum number of bytes to write to @out.
*
* Peek data from the current position of an iterator.
* The iterator's position is updated to be at the end of
* the data read.
*
* returns: number of bytes read into @out.
*/
size_t
protobuf_c_data_buffer_iterator_read (ProtobufCDataBufferIterator *iterator,
void *out,
size_t max_length)
{
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t out_remaining = max_length;
uint8_t *out_at = out;
while (fragment != NULL)
{
size_t frag_remaining = frag_length - in_frag;
if (out_remaining <= frag_remaining)
{
memcpy (out_at, frag_data + in_frag, out_remaining);
in_frag += out_remaining;
out_remaining = 0;
break;
}
memcpy (out_at, frag_data + in_frag, frag_remaining);
out_remaining -= frag_remaining;
out_at += frag_remaining;
fragment = fragment->next;
if (fragment != NULL)
{
frag_data = (uint8_t *) protobuf_c_data_buffer_fragment_start (fragment);
frag_length = fragment->buf_length;
}
in_frag = 0;
}
iterator->in_cur = in_frag;
iterator->fragment = fragment;
iterator->cur_length = frag_length;
iterator->cur_data = frag_data;
iterator->offset += max_length - out_remaining;
return max_length - out_remaining;
}
/**
* protobuf_c_data_buffer_iterator_find_char:
* @iterator: to advance.
* @c: the character to look for.
*
* If it exists,
* skip forward to the next instance of @c and return TRUE.
* Otherwise, do nothing and return FALSE.
*
* returns: whether the character was found.
*/
protobuf_c_boolean
protobuf_c_data_buffer_iterator_find_char (ProtobufCDataBufferIterator *iterator,
char c)
{
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t new_offset = iterator->offset;
if (fragment == NULL)
return -1;
for (;;)
{
size_t frag_remaining = frag_length - in_frag;
const uint8_t * ptr = memchr (frag_data + in_frag, c, frag_remaining);
if (ptr != NULL)
{
iterator->offset = (ptr - frag_data) - in_frag + new_offset;
iterator->fragment = fragment;
iterator->in_cur = ptr - frag_data;
iterator->cur_length = frag_length;
iterator->cur_data = frag_data;
return TRUE;
}
fragment = fragment->next;
if (fragment == NULL)
return FALSE;
new_offset += frag_length - in_frag;
in_frag = 0;
frag_length = fragment->buf_length;
frag_data = protobuf_c_data_buffer_fragment_start (fragment);
}
}
/**
* protobuf_c_data_buffer_iterator_skip:
* @iterator: to advance.
* @max_length: maximum number of bytes to skip forward.
*
* Advance an iterator forward in the buffer,
* returning the number of bytes skipped.
*
* returns: number of bytes skipped forward.
*/
size_t
protobuf_c_data_buffer_iterator_skip (ProtobufCDataBufferIterator *iterator,
size_t max_length)
{
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t out_remaining = max_length;
while (fragment != NULL)
{
size_t frag_remaining = frag_length - in_frag;
if (out_remaining <= frag_remaining)
{
in_frag += out_remaining;
out_remaining = 0;
break;
}
out_remaining -= frag_remaining;
fragment = fragment->next;
if (fragment != NULL)
{
frag_data = (uint8_t *) protobuf_c_data_buffer_fragment_start (fragment);
frag_length = fragment->buf_length;
}
else
{
frag_data = NULL;
frag_length = 0;
}
in_frag = 0;
}
iterator->in_cur = in_frag;
iterator->fragment = fragment;
iterator->cur_length = frag_length;
iterator->cur_data = frag_data;
iterator->offset += max_length - out_remaining;
return max_length - out_remaining;
}
......@@ -45,6 +45,8 @@ char *protobuf_c_data_buffer_parse_string0 (ProtobufCDataBuffer *buf
int protobuf_c_data_buffer_peek_char (const ProtobufCDataBuffer *buffer);
int protobuf_c_data_buffer_read_char (ProtobufCDataBuffer *buffer);
int protobuf_c_data_buffer_index_of(ProtobufCDataBuffer *buffer,
char char_to_find);
/*
* Appending to the buffer.
*/
......@@ -72,13 +74,6 @@ void protobuf_c_data_buffer_append_string0 (ProtobufCDataBuffer *buf
const char *string);
void protobuf_c_data_buffer_printf (ProtobufCDataBuffer *buffer,
const char *format,
...) PROTOBUF_C_GNUC_PRINTF(2,3);
void protobuf_c_data_buffer_vprintf (ProtobufCDataBuffer *buffer,
const char *format,
va_list args);
/* Take all the contents from src and append
* them to dst, leaving src empty.
*/
......@@ -99,16 +94,6 @@ int protobuf_c_data_buffer_writev_len (ProtobufCDataBuffer *
int protobuf_c_data_buffer_read_in_fd (ProtobufCDataBuffer *write_to,
int read_from);
/*
* Scanning the buffer.
*/
int protobuf_c_data_buffer_index_of (ProtobufCDataBuffer *buffer,
char char_to_find);
int protobuf_c_data_buffer_str_index_of (ProtobufCDataBuffer *buffer,
const char *str_to_find);
int protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
char **strings);
/* This deallocates memory used by the buffer-- you are responsible
* for the allocation and deallocation of the ProtobufCDataBuffer itself. */
void protobuf_c_data_buffer_destruct (ProtobufCDataBuffer *to_destroy);
......@@ -116,32 +101,4 @@ void protobuf_c_data_buffer_destruct (ProtobufCDataBuffer *to_
/* Free all unused buffer fragments. */
void protobuf_c_data_buffer_cleanup_recycling_bin ();
/* intended for use on the stack */
typedef struct _ProtobufCDataBufferIterator ProtobufCDataBufferIterator;
struct _ProtobufCDataBufferIterator
{
ProtobufCDataBufferFragment *fragment;
size_t in_cur;
size_t cur_length;
const uint8_t *cur_data;
size_t offset;
};
#define protobuf_c_data_buffer_iterator_offset(iterator) ((iterator)->offset)
void protobuf_c_data_buffer_iterator_construct (ProtobufCDataBufferIterator *iterator,
ProtobufCDataBuffer *to_iterate);
unsigned protobuf_c_data_buffer_iterator_peek (ProtobufCDataBufferIterator *iterator,
void *out,
unsigned max_length);
unsigned protobuf_c_data_buffer_iterator_read (ProtobufCDataBufferIterator *iterator,
void *out,
unsigned max_length);
unsigned protobuf_c_data_buffer_iterator_skip (ProtobufCDataBufferIterator *iterator,
unsigned max_length);
protobuf_c_boolean protobuf_c_data_buffer_iterator_find_char (ProtobufCDataBufferIterator *iterator,
char c);
#endif
......@@ -9,6 +9,7 @@
#include <errno.h>
#include "protobuf-c-dispatch.h"
#include "gskrbtreemacros.h"
#include "gsklistmacros.h"
#define protobuf_c_assert(condition) assert(condition)
......@@ -70,6 +71,16 @@ struct _ProtobufCDispatchTimer
void *func_data;
};
struct _ProtobufCDispatchIdle
{
RealDispatch *dispatch;
ProtobufCDispatchIdle *prev, *next;
/* user callback */
ProtobufCDispatchIdleFunc func;
void *func_data;
};
/* Define the tree of timers, as per gskrbtreemacros.h */
#define TIMER_GET_IS_RED(n) ((n)->is_red)
#define TIMER_SET_IS_RED(n,v) ((n)->is_red = (v))
......@@ -87,6 +98,10 @@ struct _ProtobufCDispatchTimer
parent, left, right, \
TIMERS_COMPARE
/* declare the idle-handler list */
#define GET_IDLE_LIST(d) \
ProtobufCDispatchIdle *, d->first_idle, d->last_idle, prev, next
/* Create or destroy a Dispatch */
ProtobufCDispatch *protobuf_c_dispatch_new (ProtobufCAllocator *allocator)
{
......@@ -114,6 +129,13 @@ protobuf_c_dispatch_free(ProtobufCDispatch *dispatch)
FREE (d);
}
ProtobufCAllocator *
protobuf_c_dispatch_peek_allocator (ProtobufCDispatch *dispatch)
{
RealDispatch *d = (RealDispatch *) dispatch;
return d->allocator;
}
static void
enlarge_fd_map (RealDispatch *d,
unsigned fd)
......@@ -550,3 +572,35 @@ void protobuf_c_dispatch_remove_timer (ProtobufCDispatchTimer *timer)
}
}
}
ProtobufCDispatchIdle *
protobuf_c_dispatch_add_idle (ProtobufCDispatch *dispatch,
ProtobufCDispatchIdleFunc func,
void *func_data)
{
RealDispatch *d = (RealDispatch *) dispatch;
ProtobufCDispatchIdle *rv;
if (d->recycled_idles != NULL)
{
rv = d->recycled_idles;
d->recycled_idles = rv->next;
}
else
{
ProtobufCAllocator *allocator = d->allocator;
rv = ALLOC (sizeof (ProtobufCDispatchIdle));
}
GSK_LIST_APPEND (GET_IDLE_LIST (d), rv);
rv->func = func;
rv->func_data = func_data;
rv->dispatch = d;
return rv;
}
void
protobuf_c_dispatch_remove_idle (ProtobufCDispatchIdle *idle)
{
RealDispatch *d = idle->dispatch;
GSK_LIST_REMOVE (GET_IDLE_LIST (d), idle);
idle->next = d->recycled_idles;
d->recycled_idles = idle;
}
......@@ -34,7 +34,8 @@ typedef enum
PROTOBUF_C_CLIENT_STATE_CONNECTING,
PROTOBUF_C_CLIENT_STATE_CONNECTED,
PROTOBUF_C_CLIENT_STATE_FAILED_WAITING,
PROTOBUF_C_CLIENT_STATE_FAILED /* if no autoretry */
PROTOBUF_C_CLIENT_STATE_FAILED, /* if no autoretry */
PROTOBUF_C_CLIENT_STATE_DESTROYED
} ProtobufC_RPC_ClientState;
typedef struct _Closure Closure;
......@@ -68,18 +69,19 @@ struct _ProtobufC_RPC_Client
} init;
struct {
protobuf_c_boolean pending;
protobuf_c_boolean destroyed_while_pending;
uint16_t port;
} name_lookup;
struct {
ProtobufCDispatchTimer *timer;
char *error_message;
} failed_waiting;
struct {
unsigned closures_alloced;
unsigned first_free_request_id;
/* indexed by (request_id-1) */
Closure *closures;
} connected;
struct {
ProtobufCDispatchTimer *timer;
char *error_message;
} failed_waiting;
struct {
char *error_message;
} failed;
......@@ -87,6 +89,7 @@ struct _ProtobufC_RPC_Client
};
static void begin_name_lookup (ProtobufC_RPC_Client *client);
static void destroy_client_rpc (ProtobufCService *service);
static void
......@@ -132,6 +135,7 @@ client_failed (ProtobufC_RPC_Client *client,
case PROTOBUF_C_CLIENT_STATE_INIT:
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
case PROTOBUF_C_CLIENT_STATE_FAILED:
case PROTOBUF_C_CLIENT_STATE_DESTROYED:
protobuf_c_assert (FALSE);
break;
}
......@@ -292,6 +296,11 @@ handle_name_lookup_success (const uint8_t *address,
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
protobuf_c_assert (client->info.name_lookup.pending);
client->info.name_lookup.pending = 0;
if (client->info.name_lookup.destroyed_while_pending)
{
destroy_client_rpc (&client->base_service);
return;
}
addr.sin_family = PF_INET;
memcpy (&addr.sin_addr, address, 4);
addr.sin_port = htons (client->info.name_lookup.port);
......@@ -306,6 +315,11 @@ handle_name_lookup_failure (const char *error_message,
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
protobuf_c_assert (client->info.name_lookup.pending);
client->info.name_lookup.pending = 0;
if (client->info.name_lookup.destroyed_while_pending)
{
destroy_client_rpc (&client->base_service);
return;
}
client_failed (client, "name lookup failed (for name from %s): %s", client->name, error_message);
}
......@@ -348,6 +362,7 @@ begin_name_lookup (ProtobufC_RPC_Client *client)
port = atoi (colon + 1);
client->info.name_lookup.pending = 1;
client->info.name_lookup.destroyed_while_pending = 0;
client->info.name_lookup.port = port;
client->resolver (client->dispatch,
host,
......@@ -521,16 +536,50 @@ handle_client_fd_events (int fd,
{
uint32_t header[3];
unsigned service_index, message_length, request_id;
Closure *closure;
uint8_t *packed_data;
ProtobufCMessage *msg;
protobuf_c_data_buffer_peek (&client->incoming, header, sizeof (header));
service_index = uint32_from_le (header[0]);
message_length = uint32_from_le (header[1]);
request_id = header[2]; /* already native-endian */
if (12 + message_length > client.incoming.size)
if (12 + message_length > client->incoming.size)
break;
/* lookup request by id */
...
if (request_id >= client->info.connected.closures_alloced
|| request_id == 0
|| client->info.connected.closures[request_id-1].response_type == NULL)
{
client_failed (client, "bad request-id in response from server");
return;
}
closure = client->info.connected.closures + (request_id - 1);
/* read message and unpack */
protobuf_c_data_buffer_discard (&client->incoming, 12);
packed_data = client->allocator->alloc (client->allocator, message_length);
protobuf_c_data_buffer_read (&client->incoming, packed_data, message_length);
/* TODO: use fast temporary allocator */
msg = protobuf_c_message_unpack (closure->response_type,
client->allocator,
message_length,
packed_data);
if (msg == NULL)
{
client_failed (client, "failed to unpack message");
client->allocator->free (client->allocator, packed_data);
return;
}
/* invoke closure */
closure->closure (msg, closure->closure_data);
/* clean up */
protobuf_c_message_free_unpacked (msg, client->allocator);
client->allocator->free (client->allocator, packed_data);
}
}
}
......@@ -577,7 +626,8 @@ invoke_client_rpc (ProtobufCService *service,
break;
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
case PROTOBUF_C_CLIENT_STATE_FAILED: /* if no autoretry */
case PROTOBUF_C_CLIENT_STATE_FAILED:
case PROTOBUF_C_CLIENT_STATE_DESTROYED:
closure (NULL, closure_data);
break;
}
......@@ -587,7 +637,55 @@ static void
destroy_client_rpc (ProtobufCService *service)
{
ProtobufC_RPC_Client *client = (ProtobufC_RPC_Client *) service;
...
ProtobufC_RPC_ClientState state = client->state;
unsigned i;
unsigned n_closures = 0;
Closure *closures = NULL;
switch (state)
{
case PROTOBUF_C_CLIENT_STATE_INIT:
protobuf_c_dispatch_remove_idle (client->info.init.idle);
break;
case PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP:
if (client->info.name_lookup.pending)
{
client->info.name_lookup.destroyed_while_pending = 1;
return;
}
break;
case PROTOBUF_C_CLIENT_STATE_CONNECTING:
break;
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
n_closures = client->info.connected.closures_alloced;
closures = client->info.connected.closures;
break;
case PROTOBUF_C_CLIENT_STATE_FAILED_WAITING:
protobuf_c_dispatch_remove_timer (client->info.failed_waiting.timer);
client->allocator->free (client->allocator, client->info.failed_waiting.error_message);
break;
case PROTOBUF_C_CLIENT_STATE_FAILED:
client->allocator->free (client->allocator, client->info.failed.error_message);
break;
case PROTOBUF_C_CLIENT_STATE_DESTROYED:
protobuf_c_assert (0);
break;
}
if (client->fd >= 0)
{
protobuf_c_dispatch_close_fd (client->dispatch, client->fd);
client->fd = -1;
}
protobuf_c_data_buffer_clear (&client->incoming);
protobuf_c_data_buffer_clear (&client->outgoing);
client->state = PROTOBUF_C_CLIENT_STATE_DESTROYED;
/* free closures only once we are in the destroyed state */
for (i = 0; i < n_closures; i++)
closures[i].closure (NULL, closures[i].closure_data);
if (closures)
client->allocator->free (client->allocator, closures);
client->allocator->free (client->allocator, client);
}
ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment