Commit 462a6843 authored by lahiker42's avatar lahiker42

misc progress


git-svn-id: https://protobuf-c.googlecode.com/svn/trunk@116 00440858-1255-0410-a3e6-75ea37f81c3a
parent bc20a9a3
......@@ -23,6 +23,7 @@ protoc_c_LDADD = \
libprotobuf_c_la_SOURCES = \
google/protobuf-c/protobuf-c-dispatch.c \
google/protobuf-c/protobuf-c-data-buffer.c \
google/protobuf-c/protobuf-c-rpc.c \
google/protobuf-c/protobuf-c.c
......
......@@ -16,15 +16,21 @@
*/
#define GSK_DEBUG_BUFFER_ALLOCATIONS 0
#define BUFFER_RECYCLING 0
#include <sys/types.h>
#if HAVE_WRITEV
#include <sys/uio.h>
#endif
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <alloca.h>
#include "protobuf-c-data-buffer.h"
#undef TRUE
#define TRUE 1
#undef FALSE
#define FALSE 0
#define PROTOBUF_C_FRAGMENT_DATA_SIZE 4096
#define PROTOBUF_C_FRAGMENT_DATA(frag) ((uint8_t*)(((ProtobufCDataBufferFragment*)(frag))+1))
......@@ -46,101 +52,44 @@ protobuf_c_data_buffer_fragment_end (ProtobufCDataBufferFragment *frag)
}
/* --- ProtobufCDataBufferFragment recycling --- */
#if !GSK_DEBUG_BUFFER_ALLOCATIONS
#if BUFFER_RECYCLING
static int num_recycled = 0;
static ProtobufCDataBufferFragment* recycling_stack = 0;
G_LOCK_DEFINE_STATIC (recycling_stack);
/* Foreign fragments are of a different size, and have a different
* pool accordingly.
*/
static GMemChunk *foreign_mem_chunk = NULL;
G_LOCK_DEFINE_STATIC (foreign_mem_chunk);
#endif
static ProtobufCDataBufferFragment *
new_native_fragment()
new_native_fragment(ProtobufCAllocator *allocator)
{
ProtobufCDataBufferFragment *frag;
#if GSK_DEBUG_BUFFER_ALLOCATIONS
frag = (ProtobufCDataBufferFragment *) g_malloc (BUF_CHUNK_SIZE);
#if !BUFFER_RECYCLING
frag = (ProtobufCDataBufferFragment *) allocator->alloc (allocator, BUF_CHUNK_SIZE);
#else /* optimized (?) */
G_LOCK (recycling_stack);
if (recycling_stack)
{
frag = recycling_stack;
recycling_stack = recycling_stack->next;
num_recycled--;
G_UNLOCK (recycling_stack);
}
else
{
G_UNLOCK (recycling_stack);
frag = (ProtobufCDataBufferFragment *) g_malloc (BUF_CHUNK_SIZE);
}
#endif /* !GSK_DEBUG_BUFFER_ALLOCATIONS */
frag->buf_start = frag->buf_length = 0;
frag->next = 0;
frag->is_foreign = 0;
return frag;
}
static ProtobufCDataBufferFragment *
new_foreign_fragment (gconstpointer ptr,
int length,
GDestroyNotify destroy,
gpointer ddata)
{
ProtobufCDataBufferFragment *fragment;
#if GSK_DEBUG_BUFFER_ALLOCATIONS
fragment = g_malloc (sizeof (ProtobufCDataBufferFragment));
#else
G_LOCK (foreign_mem_chunk);
if (foreign_mem_chunk == NULL)
foreign_mem_chunk = g_mem_chunk_create (ProtobufCDataBufferFragment, 16,
G_ALLOC_AND_FREE);
fragment = g_mem_chunk_alloc (foreign_mem_chunk);
G_UNLOCK (foreign_mem_chunk);
#endif
fragment->is_foreign = 1;
fragment->buf_start = 0;
fragment->buf_length = length;
fragment->next = NULL;
fragment->buf = (char *) ptr;
fragment->destroy = destroy;
fragment->destroy_data = ddata;
return fragment;
}
#if GSK_DEBUG_BUFFER_ALLOCATIONS
#define recycle(frag) g_free(frag)
#if GSK_DEBUG_BUFFER_ALLOCATIONS || !BUFFER_RECYCLING
#define recycle(allocator, frag) allocator->free (allocator, frag)
#else /* optimized (?) */
static void
recycle(ProtobufCDataBufferFragment* frag)
recycle(ProtobufCDataBufferFragment* frag,
ProtobufCAllocator *allocator)
{
if (frag->is_foreign)
{
if (frag->destroy != NULL)
(*frag->destroy) (frag->destroy_data);
G_LOCK (foreign_mem_chunk);
g_mem_chunk_free (foreign_mem_chunk, frag);
G_UNLOCK (foreign_mem_chunk);
return;
}
G_LOCK (recycling_stack);
#if defined(MAX_RECYCLED)
if (num_recycled >= MAX_RECYCLED)
{
g_free (frag);
G_UNLOCK (recycling_stack);
return;
}
#endif
frag->next = recycling_stack;
recycling_stack = frag;
num_recycled++;
G_UNLOCK (recycling_stack);
}
#endif /* !GSK_DEBUG_BUFFER_ALLOCATIONS */
......@@ -154,7 +103,7 @@ recycle(ProtobufCDataBufferFragment* frag)
void
protobuf_c_data_buffer_cleanup_recycling_bin ()
{
#if !GSK_DEBUG_BUFFER_ALLOCATIONS
#if !GSK_DEBUG_BUFFER_ALLOCATIONS && BUFFER_RECYCLING
G_LOCK (recycling_stack);
while (recycling_stack != NULL)
{
......@@ -208,7 +157,7 @@ verify_buffer (const ProtobufCDataBuffer *buffer)
*/
void
protobuf_c_data_buffer_append(ProtobufCDataBuffer *buffer,
gconstpointer data,
const void *data,
size_t length)
{
CHECK_INTEGRITY (buffer);
......@@ -218,7 +167,7 @@ protobuf_c_data_buffer_append(ProtobufCDataBuffer *buffer,
size_t avail;
if (!buffer->last_frag)
{
buffer->last_frag = buffer->first_frag = new_native_fragment ();
buffer->last_frag = buffer->first_frag = new_native_fragment (buffer->allocator);
avail = protobuf_c_data_buffer_fragment_avail (buffer->last_frag);
}
else
......@@ -226,7 +175,7 @@ protobuf_c_data_buffer_append(ProtobufCDataBuffer *buffer,
avail = protobuf_c_data_buffer_fragment_avail (buffer->last_frag);
if (avail <= 0)
{
buffer->last_frag->next = new_native_fragment ();
buffer->last_frag->next = new_native_fragment (buffer->allocator);
avail = protobuf_c_data_buffer_fragment_avail (buffer->last_frag);
buffer->last_frag = buffer->last_frag->next;
}
......@@ -244,7 +193,7 @@ protobuf_c_data_buffer_append(ProtobufCDataBuffer *buffer,
void
protobuf_c_data_buffer_append_repeated_char (ProtobufCDataBuffer *buffer,
char character,
gsize count)
size_t count)
{
CHECK_INTEGRITY (buffer);
buffer->size += count;
......@@ -253,7 +202,7 @@ protobuf_c_data_buffer_append_repeated_char (ProtobufCDataBuffer *buffer,
size_t avail;
if (!buffer->last_frag)
{
buffer->last_frag = buffer->first_frag = new_native_fragment ();
buffer->last_frag = buffer->first_frag = new_native_fragment (buffer->allocator);
avail = protobuf_c_data_buffer_fragment_avail (buffer->last_frag);
}
else
......@@ -261,7 +210,7 @@ protobuf_c_data_buffer_append_repeated_char (ProtobufCDataBuffer *buffer,
avail = protobuf_c_data_buffer_fragment_avail (buffer->last_frag);
if (avail <= 0)
{
buffer->last_frag->next = new_native_fragment ();
buffer->last_frag->next = new_native_fragment (buffer->allocator);
avail = protobuf_c_data_buffer_fragment_avail (buffer->last_frag);
buffer->last_frag = buffer->last_frag->next;
}
......@@ -298,7 +247,7 @@ void
protobuf_c_data_buffer_append_string(ProtobufCDataBuffer *buffer,
const char *string)
{
g_return_if_fail (string != NULL);
assert (string != NULL);
protobuf_c_data_buffer_append (buffer, string, strlen (string));
}
......@@ -345,7 +294,7 @@ protobuf_c_data_buffer_append_string0 (ProtobufCDataBuffer *buffer,
*/
size_t
protobuf_c_data_buffer_read(ProtobufCDataBuffer *buffer,
gpointer data,
void *data,
size_t max_length)
{
size_t rv = 0;
......@@ -363,7 +312,7 @@ protobuf_c_data_buffer_read(ProtobufCDataBuffer *buffer,
buffer->first_frag = first->next;
if (!buffer->first_frag)
buffer->last_frag = NULL;
recycle (first);
recycle (buffer->allocator, first);
}
else
{
......@@ -376,7 +325,7 @@ protobuf_c_data_buffer_read(ProtobufCDataBuffer *buffer,
}
}
buffer->size -= rv;
g_assert (rv == orig_max_length || buffer->size == 0);
assert (rv == orig_max_length || buffer->size == 0);
CHECK_INTEGRITY (buffer);
return rv;
}
......@@ -399,7 +348,7 @@ protobuf_c_data_buffer_read(ProtobufCDataBuffer *buffer,
*/
size_t
protobuf_c_data_buffer_peek (const ProtobufCDataBuffer *buffer,
gpointer data,
void *data,
size_t max_length)
{
int rv = 0;
......@@ -448,8 +397,8 @@ protobuf_c_data_buffer_read_line(ProtobufCDataBuffer *buffer)
CHECK_INTEGRITY (buffer);
for (at = buffer->first_frag; at; at = at->next)
{
char *start = protobuf_c_data_buffer_fragment_start (at);
char *got;
uint8_t *start = protobuf_c_data_buffer_fragment_start (at);
uint8_t *got;
got = memchr (start, '\n', at->buf_length);
if (got)
{
......@@ -460,7 +409,7 @@ protobuf_c_data_buffer_read_line(ProtobufCDataBuffer *buffer)
}
if (at == NULL)
return NULL;
rv = g_new (char, len + 1);
rv = buffer->allocator->alloc (buffer->allocator, len + 1);
/* If we found a newline, read it out, truncating
* it with NUL before we return from the function... */
if (at)
......@@ -491,7 +440,7 @@ protobuf_c_data_buffer_parse_string0(ProtobufCDataBuffer *buffer)
char *rv;
if (index0 < 0)
return NULL;
rv = g_new (char, index0 + 1);
rv = buffer->allocator->alloc (buffer->allocator, index0 + 1);
protobuf_c_data_buffer_read (buffer, rv, index0 + 1);
return rv;
}
......@@ -517,7 +466,7 @@ protobuf_c_data_buffer_peek_char(const ProtobufCDataBuffer *buffer)
for (frag = buffer->first_frag; frag; frag = frag->next)
if (frag->buf_length > 0)
break;
return * (const unsigned char *) (protobuf_c_data_buffer_fragment_start ((ProtobufCDataBufferFragment*)frag));
return * protobuf_c_data_buffer_fragment_start ((ProtobufCDataBufferFragment*)frag);
}
/**
......@@ -536,7 +485,7 @@ protobuf_c_data_buffer_read_char (ProtobufCDataBuffer *buffer)
char c;
if (protobuf_c_data_buffer_read (buffer, &c, 1) == 0)
return -1;
return (int) (guint8) c;
return (int) (uint8_t) c;
}
/**
......@@ -549,7 +498,7 @@ protobuf_c_data_buffer_read_char (ProtobufCDataBuffer *buffer)
*
* returns: number of bytes discarded.
*/
int
size_t
protobuf_c_data_buffer_discard(ProtobufCDataBuffer *buffer,
size_t max_discard)
{
......@@ -565,7 +514,7 @@ protobuf_c_data_buffer_discard(ProtobufCDataBuffer *buffer,
buffer->first_frag = first->next;
if (!buffer->first_frag)
buffer->last_frag = NULL;
recycle (first);
recycle (buffer->allocator, first);
}
else
{
......@@ -580,6 +529,16 @@ protobuf_c_data_buffer_discard(ProtobufCDataBuffer *buffer,
return rv;
}
static inline protobuf_c_boolean
errno_is_ignorable (int e)
{
#ifdef EWOULDBLOCK /* for windows */
if (e == EWOULDBLOCK)
return 1;
#endif
return e == EINTR || e == EAGAIN;
}
/**
* protobuf_c_data_buffer_writev:
* @read_from: buffer to take data from.
......@@ -617,7 +576,7 @@ protobuf_c_data_buffer_writev (ProtobufCDataBuffer *read_from,
frag_at = frag_at->next;
}
rv = writev (fd, iov, nfrag);
if (rv < 0 && gsk_errno_is_ignorable (errno))
if (rv < 0 && errno_is_ignorable (errno))
return 0;
if (rv <= 0)
return rv;
......@@ -639,6 +598,8 @@ protobuf_c_data_buffer_writev (ProtobufCDataBuffer *read_from,
* returns: the number of bytes transferred,
* or -1 on a write error (consult errno).
*/
#undef MIN
#define MIN(a,b) ((a) < (b) ? (a) : (b))
int
protobuf_c_data_buffer_writev_len (ProtobufCDataBuffer *read_from,
int fd,
......@@ -670,7 +631,7 @@ protobuf_c_data_buffer_writev_len (ProtobufCDataBuffer *read_from,
bytes -= frag_bytes;
}
rv = writev (fd, iov, i);
if (rv < 0 && gsk_errno_is_ignorable (errno))
if (rv < 0 && errno_is_ignorable (errno))
return 0;
if (rv <= 0)
return rv;
......@@ -711,20 +672,33 @@ protobuf_c_data_buffer_read_in_fd(ProtobufCDataBuffer *write_to,
* but it also is allowed to start using it again.
*/
void
protobuf_c_data_buffer_destruct(ProtobufCDataBuffer *to_destroy)
protobuf_c_data_buffer_reset(ProtobufCDataBuffer *to_destroy)
{
ProtobufCDataBufferFragment *at = to_destroy->first_frag;
CHECK_INTEGRITY (to_destroy);
while (at)
{
ProtobufCDataBufferFragment *next = at->next;
recycle (at);
recycle (to_destroy->allocator, at);
at = next;
}
to_destroy->first_frag = to_destroy->last_frag = NULL;
to_destroy->size = 0;
}
void
protobuf_c_data_buffer_clear(ProtobufCDataBuffer *to_destroy)
{
ProtobufCDataBufferFragment *at = to_destroy->first_frag;
CHECK_INTEGRITY (to_destroy);
while (at)
{
ProtobufCDataBufferFragment *next = at->next;
recycle (to_destroy->allocator, at);
at = next;
}
}
/**
* protobuf_c_data_buffer_index_of:
* @buffer: buffer to scan.
......@@ -734,7 +708,7 @@ protobuf_c_data_buffer_destruct(ProtobufCDataBuffer *to_destroy)
* returns: its index in the buffer, or -1 if the character
* is not in the buffer.
*/
int
ssize_t
protobuf_c_data_buffer_index_of(ProtobufCDataBuffer *buffer,
char char_to_find)
{
......@@ -742,8 +716,8 @@ protobuf_c_data_buffer_index_of(ProtobufCDataBuffer *buffer,
int rv = 0;
while (at)
{
char *start = protobuf_c_data_buffer_fragment_start (at);
char *saught = memchr (start, char_to_find, at->buf_length);
uint8_t *start = protobuf_c_data_buffer_fragment_start (at);
uint8_t *saught = memchr (start, char_to_find, at->buf_length);
if (saught)
return (saught - start) + rv;
else
......@@ -775,10 +749,10 @@ protobuf_c_data_buffer_str_index_of (ProtobufCDataBuffer *buffer,
while (frag_rem > 0)
{
ProtobufCDataBufferFragment *subfrag;
const char *subfrag_at;
const uint8_t *subfrag_at;
size_t subfrag_rem;
const char *str_at;
if (G_LIKELY (*frag_at != str_to_find[0]))
if (*frag_at != str_to_find[0])
{
frag_at++;
frag_rem--;
......@@ -798,7 +772,7 @@ protobuf_c_data_buffer_str_index_of (ProtobufCDataBuffer *buffer,
subfrag = subfrag->next;
if (subfrag == NULL)
goto bad_guess;
subfrag_at = subfrag->buf + subfrag->buf_start;
subfrag_at = protobuf_c_data_buffer_fragment_start (subfrag);
subfrag_rem = subfrag->buf_length;
}
while (*str_at != '\0' && subfrag_rem != 0)
......@@ -963,45 +937,7 @@ protobuf_c_data_buffer_transfer(ProtobufCDataBuffer *dst,
}
#endif /* !GSK_DEBUG_BUFFER_ALLOCATIONS */
/* --- foreign data --- */
/**
* protobuf_c_data_buffer_append_foreign:
* @buffer: the buffer to append into.
* @data: the data to append.
* @length: length of @data.
* @destroy: optional method to call when the data is no longer needed.
* @destroy_data: the argument to the destroy method.
*
* This function allows data to be placed in a buffer without
* copying. It is the callers' responsibility to ensure that
* @data will remain valid until the destroy method is called.
* @destroy may be omitted if @data is permanent, for example,
* if appended a static string into a buffer.
*/
void protobuf_c_data_buffer_append_foreign (ProtobufCDataBuffer *buffer,
gconstpointer data,
int length,
GDestroyNotify destroy,
gpointer destroy_data)
{
ProtobufCDataBufferFragment *fragment;
CHECK_INTEGRITY (buffer);
fragment = new_foreign_fragment (data, length, destroy, destroy_data);
fragment->next = NULL;
if (buffer->last_frag == NULL)
buffer->first_frag = fragment;
else
buffer->last_frag->next = fragment;
buffer->last_frag = fragment;
buffer->size += length;
CHECK_INTEGRITY (buffer);
}
#if 0
/**
* protobuf_c_data_buffer_printf:
* @buffer: the buffer to append to.
......@@ -1096,7 +1032,7 @@ int
protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
char **strings)
{
guint8 init_char_map[16];
uint8_t init_char_map[16];
int num_strings;
int num_bits = 0;
int total_index = 0;
......@@ -1104,9 +1040,9 @@ protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
memset (init_char_map, 0, sizeof (init_char_map));
for (num_strings = 0; strings[num_strings] != NULL; num_strings++)
{
guint8 c = strings[num_strings][0];
guint8 mask = (1 << (c % 8));
guint8 *rack = init_char_map + (c / 8);
uint8_t c = strings[num_strings][0];
uint8_t mask = (1 << (c % 8));
uint8_t *rack = init_char_map + (c / 8);
if ((*rack & mask) == 0)
{
*rack |= mask;
......@@ -1137,7 +1073,7 @@ protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
{
while (remaining > 0)
{
guint8 i = (guint8) (*at);
uint8_t i = (uint8_t) (*at);
if (init_char_map[i / 8] & (1 << (i % 8)))
break;
remaining--;
......@@ -1165,6 +1101,7 @@ protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
}
return -1;
}
#endif
/* --- ProtobufCDataBufferIterator --- */
......@@ -1183,7 +1120,7 @@ protobuf_c_data_buffer_iterator_construct (ProtobufCDataBufferIterator *iterator
if (iterator->fragment != NULL)
{
iterator->in_cur = 0;
iterator->cur_data = (guint8*)protobuf_c_data_buffer_fragment_start (iterator->fragment);
iterator->cur_data = (uint8_t*)protobuf_c_data_buffer_fragment_start (iterator->fragment);
iterator->cur_length = iterator->fragment->buf_length;
}
else
......@@ -1208,17 +1145,17 @@ protobuf_c_data_buffer_iterator_construct (ProtobufCDataBufferIterator *iterator
*/
size_t
protobuf_c_data_buffer_iterator_peek (ProtobufCDataBufferIterator *iterator,
gpointer out,
void *out,
size_t max_length)
{
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const guint8 *frag_data = iterator->cur_data;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t out_remaining = max_length;
guint8 *out_at = out;
uint8_t *out_at = out;
while (fragment != NULL)
{
......@@ -1237,7 +1174,7 @@ protobuf_c_data_buffer_iterator_peek (ProtobufCDataBufferIterator *iterator
fragment = fragment->next;
if (fragment != NULL)
{
frag_data = (guint8 *) protobuf_c_data_buffer_fragment_start (fragment);
frag_data = (uint8_t *) protobuf_c_data_buffer_fragment_start (fragment);
frag_length = fragment->buf_length;
}
in_frag = 0;
......@@ -1259,17 +1196,17 @@ protobuf_c_data_buffer_iterator_peek (ProtobufCDataBufferIterator *iterator
*/
size_t
protobuf_c_data_buffer_iterator_read (ProtobufCDataBufferIterator *iterator,
gpointer out,
void *out,
size_t max_length)
{
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const guint8 *frag_data = iterator->cur_data;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t out_remaining = max_length;
guint8 *out_at = out;
uint8_t *out_at = out;
while (fragment != NULL)
{
......@@ -1289,7 +1226,7 @@ protobuf_c_data_buffer_iterator_read (ProtobufCDataBufferIterator *iterator
fragment = fragment->next;
if (fragment != NULL)
{
frag_data = (guint8 *) protobuf_c_data_buffer_fragment_start (fragment);
frag_data = (uint8_t *) protobuf_c_data_buffer_fragment_start (fragment);
frag_length = fragment->buf_length;
}
in_frag = 0;
......@@ -1314,14 +1251,14 @@ protobuf_c_data_buffer_iterator_read (ProtobufCDataBufferIterator *iterator
* returns: whether the character was found.
*/
gboolean
protobuf_c_boolean
protobuf_c_data_buffer_iterator_find_char (ProtobufCDataBufferIterator *iterator,
char c)
{
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const guint8 *frag_data = iterator->cur_data;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t new_offset = iterator->offset;
......@@ -1331,7 +1268,7 @@ protobuf_c_data_buffer_iterator_find_char (ProtobufCDataBufferIterator *iterator
for (;;)
{
size_t frag_remaining = frag_length - in_frag;
const guint8 * ptr = memchr (frag_data + in_frag, c, frag_remaining);
const uint8_t * ptr = memchr (frag_data + in_frag, c, frag_remaining);
if (ptr != NULL)
{
iterator->offset = (ptr - frag_data) - in_frag + new_offset;
......@@ -1347,7 +1284,7 @@ protobuf_c_data_buffer_iterator_find_char (ProtobufCDataBufferIterator *iterator
new_offset += frag_length - in_frag;
in_frag = 0;
frag_length = fragment->buf_length;
frag_data = (guint8 *) fragment->buf + fragment->buf_start;
frag_data = protobuf_c_data_buffer_fragment_start (fragment);
}
}
......@@ -1368,7 +1305,7 @@ protobuf_c_data_buffer_iterator_skip (ProtobufCDataBufferIterator *iterator
ProtobufCDataBufferFragment *fragment = iterator->fragment;
size_t frag_length = iterator->cur_length;
const guint8 *frag_data = iterator->cur_data;
const uint8_t *frag_data = iterator->cur_data;
size_t in_frag = iterator->in_cur;
size_t out_remaining = max_length;
......@@ -1388,7 +1325,7 @@ protobuf_c_data_buffer_iterator_skip (ProtobufCDataBufferIterator *iterator
fragment = fragment->next;
if (fragment != NULL)
{
frag_data = (guint8 *) protobuf_c_data_buffer_fragment_start (fragment);
frag_data = (uint8_t *) protobuf_c_data_buffer_fragment_start (fragment);
frag_length = fragment->buf_length;
}
else
......
......@@ -22,12 +22,13 @@ struct _ProtobufCDataBuffer
ProtobufCDataBufferFragment *first_frag;
ProtobufCDataBufferFragment *last_frag;
ProtobufCAllocator *allocator;
};
#define PROTOBUF_C_DATA_BUFFER_STATIC_INIT { 0, NULL, NULL }
void protobuf_c_data_buffer_construct (ProtobufCDataBuffer *buffer);
void protobuf_c_data_buffer_init (ProtobufCDataBuffer *buffer,
ProtobufCAllocator *allocator);
void protobuf_c_data_buffer_clear (ProtobufCDataBuffer *buffer);
void protobuf_c_data_buffer_reset (ProtobufCDataBuffer *buffer);
size_t protobuf_c_data_buffer_read (ProtobufCDataBuffer *buffer,
void* data,
......@@ -101,11 +102,11 @@ int protobuf_c_data_buffer_read_in_fd (ProtobufCDataBuffer *
/*
* Scanning the buffer.
*/
ssize_t protobuf_c_data_buffer_index_of (ProtobufCDataBuffer *buffer,
int protobuf_c_data_buffer_index_of (ProtobufCDataBuffer *buffer,
char char_to_find);
ssize_t protobuf_c_data_buffer_str_index_of (ProtobufCDataBuffer *buffer,
int protobuf_c_data_buffer_str_index_of (ProtobufCDataBuffer *buffer,
const char *str_to_find);
ssize_t protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
int protobuf_c_data_buffer_polystr_index_of (ProtobufCDataBuffer *buffer,
char **strings);
/* This deallocates memory used by the buffer-- you are responsible
......
......@@ -17,6 +17,8 @@ typedef enum
ProtobufCDispatch *protobuf_c_dispatch_new (ProtobufCAllocator *allocator);
void protobuf_c_dispatch_free(ProtobufCDispatch *dispatch);
ProtobufCDispatch *protobuf_c_dispatch_default (void);
ProtobufCAllocator *protobuf_c_dispatch_peek_allocator (ProtobufCDispatch *);
typedef void (*ProtobufCDispatchCallback) (int fd,
......
#include <string.h>
#include <assert.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "protobuf-c-rpc.h"
#include "protobuf-c-data-buffer.h"
typedef struct _ProtobufC_RPC_Client ProtobufC_RPC_Client;
#define protobuf_c_assert(x) assert(x)
#undef TRUE
#define TRUE 1
#undef FALSE
#define FALSE 0
#define UINT_TO_POINTER(ui) ((void*)(ui))
#define POINTER_TO_UINT(ptr) ((unsigned)(ptr))
#define MAX_FAILED_MSG_LENGTH 512
typedef enum
{
......@@ -11,8 +35,9 @@ typedef enum
PROTOBUF_C_CLIENT_STATE_CONNECTED,
PROTOBUF_C_CLIENT_STATE_FAILED_WAITING,
PROTOBUF_C_CLIENT_STATE_FAILED /* if no autoretry */
} ProtobufC_ClientState;
} ProtobufC_RPC_ClientState;
typedef struct _Closure Closure;
struct _Closure
{
/* these will be NULL for unallocated request ids */
......@@ -32,17 +57,18 @@ struct _ProtobufC_RPC_Client
ProtobufCDispatch *dispatch;
ProtobufC_RPC_AddressType address_type;
char *name;
ProtobufC_ClientState client_state;
ProtobufC_FD fd;
protobuf_c_boolean autoretry;
unsigned autoretry_millis;
ProtobufC_NameLookup_Func resolver;
ProtobufC_RPC_ClientState state;
union {
struct {
ProtobufCDispatch_Idle *idle;
ProtobufCDispatchIdle *idle;
} init;
struct {
protobuf_c_boolean pending;
uint16_t port;
} name_lookup;
struct {
ProtobufCDispatchTimer *timer;
......@@ -57,9 +83,12 @@ struct _ProtobufC_RPC_Client
struct {
char *error_message;
} failed;
};
} info;
};
static void begin_name_lookup (ProtobufC_RPC_Client *client);
static void
set_fd_nonblocking(int fd)
{
......@@ -68,12 +97,24 @@ set_fd_nonblocking(int fd)
fcntl (fd, F_SETFL, flags | O_NONBLOCK);
}
static void
handle_autoretry_timeout (ProtobufCDispatch *dispatch,
void *func_data)
{
begin_name_lookup (func_data);
}
static void
client_failed (ProtobufC_RPC_Client *client,
const char *format_str,
...)
{
va_list args;
char buf[MAX_FAILED_MSG_LENGTH];
size_t msg_len;
char *msg;
size_t n_closures = 0;
Closure *closures = NULL;
switch (client->state)
{
case PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP:
......@@ -83,7 +124,8 @@ client_failed (ProtobufC_RPC_Client *client,
/* nothing to do */
break;
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
/* nothing to do */
n_closures = client->info.connected.closures_alloced;
closures = client->info.connected.closures;
break;
/* should not get here */
......@@ -95,7 +137,7 @@ client_failed (ProtobufC_RPC_Client *client,
}
if (client->fd >= 0)
{
protobuf_c_dispatch_close (client->dispatch, client->fd);
protobuf_c_dispatch_close_fd (client->dispatch, client->fd);
client->fd = -1;
}
protobuf_c_data_buffer_reset (&client->incoming);
......@@ -126,11 +168,89 @@ client_failed (ProtobufC_RPC_Client *client,
client->state = PROTOBUF_C_CLIENT_STATE_FAILED;
client->info.failed.error_message = msg;
}
/* we defer calling the closures to avoid
any re-entrancy issues (e.g. people further RPC should
not see a socket in the "connected" state-- at least,
it shouldn't be accessing the array of closures that we are considering */
if (closures != NULL)
{
unsigned i;
for (i = 0; i < n_closures; i++)
if (closures[i].response_type != NULL)
closures[i].closure (NULL, closures[i].closure_data);
client->allocator->free (client->allocator, closures);
}
}
static inline protobuf_c_boolean
errno_is_ignorable (int e)
{
#ifdef EWOULDBLOCK /* for windows */
if (e == EWOULDBLOCK)
return 1;
#endif
return e == EINTR || e == EAGAIN;
}
static void
set_state_connected (ProtobufC_RPC_Client *client)
{
client->state = PROTOBUF_C_CLIENT_STATE_CONNECTED;
client->info.connected.closures_alloced = 1;
client->info.connected.first_free_request_id = 1;
client->info.connected.closures = client->allocator->alloc (client->allocator, sizeof (Closure));
client->info.connected.closures[0].closure = NULL;
client->info.connected.closures[0].response_type = NULL;
client->info.connected.closures[0].closure_data = UINT_TO_POINTER (0);
}
static void
handle_client_fd_connect_events (int fd,
unsigned events,
void *callback_data)
{
ProtobufC_RPC_Client *client = callback_data;
socklen_t size_int = sizeof (int);
int fd_errno = EINVAL;
if (getsockopt (fd, SOL_SOCKET, SO_ERROR, &fd_errno, &size_int) < 0)
{
/* Note: this behavior is vaguely hypothetically broken,
* in terms of ignoring getsockopt's error;
* however, this shouldn't happen, and EINVAL is ok if it does.
* Furthermore some broken OS's return an error code when
* fetching SO_ERROR!
*/
}
if (fd_errno == 0)
{
/* goto state CONNECTED */
protobuf_c_dispatch_watch_fd (client->dispatch,
client->fd,
0, NULL, NULL);
set_state_connected (client);
}
else if (errno_is_ignorable (fd_errno))
{
/* remain in CONNECTING state */
return;
}
else
{
/* Call error handler */
protobuf_c_dispatch_close_fd (client->dispatch, client->fd);
client_failed (client,
"failed connecting to server: %s",
strerror (fd_errno));
}
}
static void
begin_connecting (ProtobufC_RPC_Client *client,
struct sockaddr_t *address,
struct sockaddr *address,
size_t addr_len)
{
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
......@@ -161,28 +281,21 @@ begin_connecting (ProtobufC_RPC_Client *client,
return;
}
client->state = PROTOBUF_C_CLIENT_STATE_CONNECTED;
client->info.connected.closures_alloced = 1;
client->info.connected.first_free_request_id = 1;
client->info.connected.closures = client->allocator->alloc (client->allocator, sizeof (Closure));
client->info.connected.closures[0].closure = NULL;
client->info.connected.closures[0].response_type = NULL;
client->info.connected.closures[0].closure_data = UINT_TO_POINTER (0);
set_state_connected (client);
}
static void
handle_name_lookup_success (const uint8_t *address,
void *callback_data)
{
ProtobufC_RPC_Client *client = callback_data;
struct sockaddr_in address;
struct sockaddr_in addr;
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
protobuf_c_assert (client->info.name_lookup.pending);
client->info.name_lookup.pending = 0;
address.sin_family = PF_INET;
memcpy (address.sin_addr, address, 4);
address.sin_port = htons (client->info.name_lookup.port);
begin_connecting (client, (struct sockaddr *) &address, sizeof (address));
addr.sin_family = PF_INET;
memcpy (&addr.sin_addr, address, 4);
addr.sin_port = htons (client->info.name_lookup.port);
begin_connecting (client, (struct sockaddr *) &addr, sizeof (addr));
}
static void
......@@ -193,7 +306,7 @@ handle_name_lookup_failure (const char *error_message,
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_NAME_LOOKUP);
protobuf_c_assert (client->info.name_lookup.pending);
client->info.name_lookup.pending = 0;
client_failed ("name lookup failed (for name from %s): %s", client->name, error_message);
client_failed (client, "name lookup failed (for name from %s): %s", client->name, error_message);
}
static void
......@@ -211,7 +324,8 @@ begin_name_lookup (ProtobufC_RPC_Client *client)
struct sockaddr_un addr;
addr.sun_family = AF_UNIX;
strncpy (addr.sun_path, client->name, sizeof (addr.sun_path));
begin_connecting (client, (struct sockaddr *) &addr);
begin_connecting (client, (struct sockaddr *) &addr,
sizeof (addr));
return;
}
......@@ -236,7 +350,7 @@ begin_name_lookup (ProtobufC_RPC_Client *client)
client->info.name_lookup.pending = 1;
client->info.name_lookup.port = port;
client->resolver (client->dispatch,
hostname,
host,
handle_name_lookup_success,
handle_name_lookup_failure,
client);
......@@ -256,6 +370,8 @@ handle_init_idle (ProtobufCDispatch *dispatch,
void *data)
{
ProtobufC_RPC_Client *client = data;
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_INIT);
begin_name_lookup (client);
}
static void
......@@ -264,6 +380,7 @@ grow_closure_array (ProtobufC_RPC_Client *client)
/* resize array */
unsigned old_size = client->info.connected.closures_alloced;
unsigned new_size = old_size * 2;
unsigned i;
Closure *new_closures = client->allocator->alloc (client->allocator, sizeof (Closure) * new_size);
memcpy (new_closures,
client->info.connected.closures,
......@@ -276,7 +393,7 @@ grow_closure_array (ProtobufC_RPC_Client *client)
new_closures[i].closure = NULL;
new_closures[i].closure_data = UINT_TO_POINTER (i+2);
}
new_closures[i].closure_data = client->info.connected.first_free_request_id;
new_closures[i].closure_data = UINT_TO_POINTER (client->info.connected.first_free_request_id);
new_closures[i].response_type = NULL;
new_closures[i].closure = NULL;
......@@ -284,6 +401,17 @@ grow_closure_array (ProtobufC_RPC_Client *client)
client->info.connected.closures = new_closures;
client->info.connected.closures_alloced = new_size;
}
static uint32_t
uint32_to_le (uint32_t le)
{
#if IS_LITTLE_ENDIAN
return le;
#else
return (le << 24) | (le >> 24)
| ((le >> 8) & 0xff0000)
| ((le << 8) & 0xff00);
#endif
}
static void
enqueue_request (ProtobufC_RPC_Client *client,
......@@ -300,9 +428,9 @@ enqueue_request (ProtobufC_RPC_Client *client,
} header;
size_t packed_size;
uint8_t *packed_data;
Closure *closure;
Closure *cl;
const ProtobufCServiceDescriptor *desc = client->base_service.descriptor;
const ProtobufCMethodDescriptor *method = descriptor->methods + method_index;
const ProtobufCMethodDescriptor *method = desc->methods + method_index;
protobuf_c_assert (method_index < desc->n_methods);
......@@ -311,8 +439,8 @@ enqueue_request (ProtobufC_RPC_Client *client,
if (client->info.connected.first_free_request_id == 0)
grow_closure_array (client);
request_id = client->info.connected.first_free_request_id;
closure = client->info.connected.closures + (request_id - 1);
client->info.connected.first_free_request_id = POINTER_TO_UINT (closure->closure_data);
cl = client->info.connected.closures + (request_id - 1);
client->info.connected.first_free_request_id = POINTER_TO_UINT (cl->closure_data);
/* Pack message */
packed_size = protobuf_c_message_get_packed_size (input);
......@@ -335,9 +463,77 @@ enqueue_request (ProtobufC_RPC_Client *client,
client->allocator->free (client->allocator, packed_data);
/* Add closure to request-tree */
client->info.connected.closures[request_id-1].response_type = client->descriptor->methods[method_index].output;
client->info.connected.closures[request_id-1].closure = closure;
client->info.connected.closures[request_id-1].closure_data = closure_data;
cl->response_type = method->output;
cl->closure = closure;
cl->closure_data = closure_data;
}
static void
handle_client_fd_events (int fd,
unsigned events,
void *func_data)
{
ProtobufC_RPC_Client *client = func_data;
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_CONNECTED);
if (events & PROTOBUF_C_EVENT_WRITABLE)
{
int write_rv = protobuf_c_data_buffer_writev (&client->outgoing,
client->fd);
if (write_rv < 0 && !errno_is_ignorable (errno))
{
client_failed (client,
"writing to file-descriptor: %s",
strerror (errno));
return;
}
if (client->outgoing.size == 0)
protobuf_c_dispatch_watch_fd (client->dispatch, client->fd,
PROTOBUF_C_EVENT_READABLE,
handle_client_fd_events, client);
}
if (events & PROTOBUF_C_EVENT_READABLE)
{
/* do read */
int read_rv = protobuf_c_data_buffer_read_in_fd (&client->incoming,
client->fd);
if (read_rv < 0)
{
if (!errno_is_ignorable (errno))
{
client_failed (client,
"reading from file-descriptor: %s",
strerror (errno));
}
}
else if (read_rv == 0)
{
/* handle eof */
...
}
else
{
/* try processing buffer */
while (client->incoming.size >= 12)
{
...
}
}
}
}
static void
update_connected_client_watch (ProtobufC_RPC_Client *client)
{
unsigned events = PROTOBUF_C_EVENT_READABLE;
protobuf_c_assert (client->state == PROTOBUF_C_CLIENT_STATE_CONNECTED);
protobuf_c_assert (client->fd >= 0);
if (client->outgoing.size > 0)
events |= PROTOBUF_C_EVENT_WRITABLE;
protobuf_c_dispatch_watch_fd (client->dispatch,
client->fd,
events,
handle_client_fd_events, client);
}
static void
......@@ -359,7 +555,7 @@ invoke_client_rpc (ProtobufCService *service,
case PROTOBUF_C_CLIENT_STATE_CONNECTED:
{
int had_outgoing = (client->first_outgoing_request != NULL);
int had_outgoing = (client->outgoing.size > 0);
enqueue_request (client, method_index, input, closure, closure_data);
if (!had_outgoing)
update_connected_client_watch (client);
......@@ -373,25 +569,32 @@ invoke_client_rpc (ProtobufCService *service,
}
}
static void
destroy_client_rpc (ProtobufCService *service)
{
ProtobufC_RPC_Client *client = (ProtobufC_RPC_Client *) service;
...
}
ProtobufCService *protobuf_c_rpc_client_new (ProtobufC_RPC_AddressType type,
const char *name,
const ProtobufCServiceDescriptor *descriptor,
ProtobufCDispatch *dispatch);
ProtobufCDispatch *orig_dispatch)
{
ProtobufCDispatch *dispatch = options->dispatch ? options->dispatch : protobuf_c_dispatch_default ();
ProtobufCDispatch *dispatch = orig_dispatch ? orig_dispatch : protobuf_c_dispatch_default ();
ProtobufCAllocator *allocator = protobuf_c_dispatch_peek_allocator (dispatch);
ProtobufC_RPC_Client *rv = allocator->alloc (allocator, sizeof (ProtobufC_RPC_Client));
rv->base.descriptor = descriptor;
rv->base.invoke = invoke_client_rpc;
rv->base.destroy = destroy_client_rpc;
protobuf_c_data_buffer_init (&rv->incoming);
protobuf_c_data_buffer_init (&rv->outgoing);
rv->base_service.descriptor = descriptor;
rv->base_service.invoke = invoke_client_rpc;
rv->base_service.destroy = destroy_client_rpc;
protobuf_c_data_buffer_init (&rv->incoming, allocator);
protobuf_c_data_buffer_init (&rv->outgoing, allocator);
rv->allocator = allocator;
rv->dispatch = dispatch;
rv->address_type = type;
rv->name = strcpy (allocator->alloc (allocator, strlen (name) + 1), name);
rv->client_state = PROTOBUF_C_CLIENT_STATE_INIT;
rv->state = PROTOBUF_C_CLIENT_STATE_INIT;
rv->fd = -1;
rv->info.init = protobuf_c_dispatch_add_idle (dispatch, handle_init_idle, rv);
return &rv->base;
rv->info.init.idle = protobuf_c_dispatch_add_idle (dispatch, handle_init_idle, rv);
return &rv->base_service;
}
......@@ -313,6 +313,9 @@ struct _ProtobufCBufferSimple
/* ====== private ====== */
#include "protobuf-c-private.h"
/* TODO: crib from glib */
#define PROTOBUF_C_GNUC_PRINTF(format_argno, ellipsis_argno)
PROTOBUF_C_END_DECLS
#endif /* __PROTOBUF_C_RUNTIME_H_ */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment