Commit 0643d6b5 authored by Robert Schmidt's avatar Robert Schmidt

Make queue_recv() functions return size, not as out-param

parent 63c4a90e
...@@ -115,7 +115,7 @@ void *receive_thread(void *args) { ...@@ -115,7 +115,7 @@ void *receive_thread(void *args) {
while (1) { while (1) {
while (flexran_agent_msg_recv(d->mod_id, FLEXRAN_AGENT_DEFAULT, &data, &size, &priority) == 0) { while ((size = flexran_agent_msg_recv(d->mod_id, FLEXRAN_AGENT_DEFAULT, &data, &priority)) > 0) {
LOG_D(FLEXRAN_AGENT,"received message with size %d\n", size); LOG_D(FLEXRAN_AGENT,"received message with size %d\n", size);
......
...@@ -94,11 +94,11 @@ int flexran_agent_async_msg_send(void *data, int size, int priority, void *chann ...@@ -94,11 +94,11 @@ int flexran_agent_async_msg_send(void *data, int size, int priority, void *chann
return message_put(channel->send_queue, data, size, priority); return message_put(channel->send_queue, data, size, priority);
} }
int flexran_agent_async_msg_recv(void **data, int *size, int *priority, void *channel_info) { int flexran_agent_async_msg_recv(void **data, int *priority, void *channel_info) {
flexran_agent_async_channel_t *channel; flexran_agent_async_channel_t *channel;
channel = (flexran_agent_async_channel_t *)channel_info; channel = (flexran_agent_async_channel_t *)channel_info;
return message_get(channel->receive_queue, data, size, priority); return message_get(channel->receive_queue, data, priority);
} }
void flexran_agent_async_release(flexran_agent_channel_t *channel) { void flexran_agent_async_release(flexran_agent_channel_t *channel) {
......
...@@ -46,7 +46,7 @@ flexran_agent_async_channel_t * flexran_agent_async_channel_info(mid_t mod_id, c ...@@ -46,7 +46,7 @@ flexran_agent_async_channel_t * flexran_agent_async_channel_info(mid_t mod_id, c
int flexran_agent_async_msg_send(void *data, int size, int priority, void *channel_info); int flexran_agent_async_msg_send(void *data, int size, int priority, void *channel_info);
/* Receive a message from a given channel */ /* Receive a message from a given channel */
int flexran_agent_async_msg_recv(void **data, int *size, int *priority, void *channel_info); int flexran_agent_async_msg_recv(void **data, int *priority, void *channel_info);
/* Release a channel */ /* Release a channel */
void flexran_agent_async_release(flexran_agent_channel_t *channel); void flexran_agent_async_release(flexran_agent_channel_t *channel);
......
...@@ -53,7 +53,7 @@ int flexran_agent_msg_send(mid_t mod_id, agent_id_t agent_id, void *data, int si ...@@ -53,7 +53,7 @@ int flexran_agent_msg_send(mid_t mod_id, agent_id_t agent_id, void *data, int si
return -1; return -1;
} }
int flexran_agent_msg_recv(mid_t mod_id, agent_id_t agent_id, void **data, int *size, int *priority) { int flexran_agent_msg_recv(mid_t mod_id, agent_id_t agent_id, void **data, int *priority) {
/*Check if agent id is valid*/ /*Check if agent id is valid*/
if (agent_id >= FLEXRAN_AGENT_MAX || agent_id < 0) { if (agent_id >= FLEXRAN_AGENT_MAX || agent_id < 0) {
goto error; goto error;
...@@ -66,7 +66,7 @@ int flexran_agent_msg_recv(mid_t mod_id, agent_id_t agent_id, void **data, int * ...@@ -66,7 +66,7 @@ int flexran_agent_msg_recv(mid_t mod_id, agent_id_t agent_id, void **data, int *
goto error; goto error;
} }
return channel->msg_recv(data, size, priority, channel->channel_info); return channel->msg_recv(data, priority, channel->channel_info);
error: error:
LOG_E(FLEXRAN_AGENT, "No channel registered for agent with id %d\n", agent_id); LOG_E(FLEXRAN_AGENT, "No channel registered for agent with id %d\n", agent_id);
...@@ -104,7 +104,7 @@ void flexran_agent_unregister_channel(mid_t mod_id, agent_id_t agent_id) { ...@@ -104,7 +104,7 @@ void flexran_agent_unregister_channel(mid_t mod_id, agent_id_t agent_id) {
int flexran_agent_create_channel(void *channel_info, int flexran_agent_create_channel(void *channel_info,
int (*msg_send)(void *data, int size, int priority, void *channel_info), int (*msg_send)(void *data, int size, int priority, void *channel_info),
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info), int (*msg_recv)(void **data, int *priority, void *channel_info),
void (*release)(flexran_agent_channel_t *channel)) { void (*release)(flexran_agent_channel_t *channel)) {
int channel_id = ++flexran_agent_channel_id; int channel_id = ++flexran_agent_channel_id;
......
...@@ -39,7 +39,7 @@ int channel_id; ...@@ -39,7 +39,7 @@ int channel_id;
void *channel_info; void *channel_info;
/*Callbacks for channel message Tx and Rx*/ /*Callbacks for channel message Tx and Rx*/
int (*msg_send)(void *data, int size, int priority, void *channel_info); int (*msg_send)(void *data, int size, int priority, void *channel_info);
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info); int (*msg_recv)(void **data, int *priority, void *channel_info);
void (*release)(struct flexran_agent_channel_s *channel); void (*release)(struct flexran_agent_channel_s *channel);
} flexran_agent_channel_t; } flexran_agent_channel_t;
...@@ -49,7 +49,7 @@ typedef struct flexran_agent_channel_instance_s{ ...@@ -49,7 +49,7 @@ typedef struct flexran_agent_channel_instance_s{
/*Send and receive messages using the channel registered for a specific agent*/ /*Send and receive messages using the channel registered for a specific agent*/
int flexran_agent_msg_send(mid_t mod_id, agent_id_t agent_id, void *data, int size, int priority); int flexran_agent_msg_send(mid_t mod_id, agent_id_t agent_id, void *data, int size, int priority);
int flexran_agent_msg_recv(mid_t mod_id, agent_id_t agent_id, void **data, int *size, int *priority); int flexran_agent_msg_recv(mid_t mod_id, agent_id_t agent_id, void **data, int *priority);
/*Register a channel to an agent. Use FLEXRAN_AGENT_MAX to register the /*Register a channel to an agent. Use FLEXRAN_AGENT_MAX to register the
*same channel to all agents*/ *same channel to all agents*/
...@@ -61,7 +61,7 @@ void flexran_agent_unregister_channel(mid_t mod_id, agent_id_t agent_id); ...@@ -61,7 +61,7 @@ void flexran_agent_unregister_channel(mid_t mod_id, agent_id_t agent_id);
/*Create a new channel. Returns the id of the new channel or negative number otherwise*/ /*Create a new channel. Returns the id of the new channel or negative number otherwise*/
int flexran_agent_create_channel(void *channel_info, int flexran_agent_create_channel(void *channel_info,
int (*msg_send)(void *data, int size, int priority, void *channel_info), int (*msg_send)(void *data, int size, int priority, void *channel_info),
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info), int (*msg_recv)(void **data, int *priority, void *channel_info),
void (*release)(flexran_agent_channel_t *channel)); void (*release)(flexran_agent_channel_t *channel));
/*Unregister a channel from all agents and destroy it. Returns 0 in case of success*/ /*Unregister a channel from all agents and destroy it. Returns 0 in case of success*/
......
...@@ -266,7 +266,7 @@ proto_agent_receive(void *args) ...@@ -266,7 +266,7 @@ proto_agent_receive(void *args)
msg = NULL; msg = NULL;
ser_msg = NULL; ser_msg = NULL;
if (proto_agent_async_msg_recv(&data, &size, &priority, inst->channel->channel_info)){ if ((size = proto_agent_async_msg_recv(&data, &priority, inst->channel->channel_info)) <= 0){
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING; err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
goto error; goto error;
} }
......
...@@ -78,10 +78,10 @@ int proto_agent_async_msg_send(void *data, int size, int priority, void *channel ...@@ -78,10 +78,10 @@ int proto_agent_async_msg_send(void *data, int size, int priority, void *channel
return message_put(channel->send_queue, data, size, priority); return message_put(channel->send_queue, data, size, priority);
} }
int proto_agent_async_msg_recv(void **data, int *size, int *priority, void *channel_info) int proto_agent_async_msg_recv(void **data, int *priority, void *channel_info)
{ {
proto_agent_async_channel_t *channel = channel_info; proto_agent_async_channel_t *channel = channel_info;
return message_get(channel->receive_queue, data, size, priority); return message_get(channel->receive_queue, data, priority);
} }
void proto_agent_async_release(proto_agent_channel_t *channel) void proto_agent_async_release(proto_agent_channel_t *channel)
......
...@@ -53,7 +53,7 @@ proto_agent_async_channel_info(mod_id_t mod_id, const char *bind_ip, uint16_t bi ...@@ -53,7 +53,7 @@ proto_agent_async_channel_info(mod_id_t mod_id, const char *bind_ip, uint16_t bi
int proto_agent_async_msg_send(void *data, int size, int priority, void *channel_info); int proto_agent_async_msg_send(void *data, int size, int priority, void *channel_info);
int proto_agent_async_msg_recv(void **data, int *size, int *priority, void *channel_info); int proto_agent_async_msg_recv(void **data, int *priority, void *channel_info);
void proto_agent_async_release(proto_agent_channel_t *channel); void proto_agent_async_release(proto_agent_channel_t *channel);
......
...@@ -72,7 +72,7 @@ void proto_agent_unregister_channel(mod_id_t mod_id, proto_agent_id_t agent_id) ...@@ -72,7 +72,7 @@ void proto_agent_unregister_channel(mod_id_t mod_id, proto_agent_id_t agent_id)
int proto_agent_create_channel(void *channel_info, int proto_agent_create_channel(void *channel_info,
int (*msg_send)(void *data, int size, int priority, void *channel_info), int (*msg_send)(void *data, int size, int priority, void *channel_info),
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info), int (*msg_recv)(void **data, int *priority, void *channel_info),
void (*release)(proto_agent_channel_t *channel)) { void (*release)(proto_agent_channel_t *channel)) {
int channel_id = ++proto_agent_channel_id; int channel_id = ++proto_agent_channel_id;
......
...@@ -51,7 +51,7 @@ typedef struct proto_agent_channel_s { ...@@ -51,7 +51,7 @@ typedef struct proto_agent_channel_s {
struct proto_agent_async_channel_s *channel_info; struct proto_agent_async_channel_s *channel_info;
/*Callbacks for channel message Tx and Rx*/ /*Callbacks for channel message Tx and Rx*/
int (*msg_send)(void *data, int size, int priority, void *channel_info); int (*msg_send)(void *data, int size, int priority, void *channel_info);
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info); int (*msg_recv)(void **data, int *priority, void *channel_info);
void (*release)(struct proto_agent_channel_s *channel); void (*release)(struct proto_agent_channel_s *channel);
} proto_agent_channel_t; } proto_agent_channel_t;
...@@ -70,7 +70,7 @@ void proto_agent_unregister_channel(mod_id_t mod_id, proto_agent_id_t agent_id); ...@@ -70,7 +70,7 @@ void proto_agent_unregister_channel(mod_id_t mod_id, proto_agent_id_t agent_id);
/*Create a new channel. Returns the id of the new channel or negative number otherwise*/ /*Create a new channel. Returns the id of the new channel or negative number otherwise*/
int proto_agent_create_channel(void *channel_info, int proto_agent_create_channel(void *channel_info,
int (*msg_send)(void *data, int size, int priority, void *channel_info), int (*msg_send)(void *data, int size, int priority, void *channel_info),
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info), int (*msg_recv)(void **data, int *priority, void *channel_info),
void (*release)(proto_agent_channel_t *channel)); void (*release)(proto_agent_channel_t *channel));
/*Unregister a channel from all agents and destroy it. Returns 0 in case of success*/ /*Unregister a channel from all agents and destroy it. Returns 0 in case of success*/
......
...@@ -46,7 +46,7 @@ static void *link_manager_sender_thread(void *_manager) ...@@ -46,7 +46,7 @@ static void *link_manager_sender_thread(void *_manager)
LOG_D(MAC, "starting link manager sender thread\n"); LOG_D(MAC, "starting link manager sender thread\n");
while (manager->run) { while (manager->run) {
while (message_get(manager->send_queue, &data, &size, &priority) == 0) { while ((size = message_get(manager->send_queue, &data, &priority)) > 0) {
link_send_packet(manager->socket_link, data, size, manager->peer_addr, manager->peer_port); link_send_packet(manager->socket_link, data, size, manager->peer_addr, manager->peer_port);
free(data); free(data);
} }
...@@ -184,7 +184,7 @@ int main(void) ...@@ -184,7 +184,7 @@ int main(void)
data = strdup("hello"); if (data == NULL) goto error; data = strdup("hello"); if (data == NULL) goto error;
if (message_put(send_queue, data, 6, 100)) goto error; if (message_put(send_queue, data, 6, 100)) goto error;
if (message_get(receive_queue, &data, &size, &priority)) goto error; if ((size = message_get(receive_queue, &data, &priority)) <= 0) goto error;
printf("received message:\n"); printf("received message:\n");
printf(" data: %s\n", (char *)data); printf(" data: %s\n", (char *)data);
printf(" size: %d\n", size); printf(" size: %d\n", size);
...@@ -228,7 +228,7 @@ int main(void) ...@@ -228,7 +228,7 @@ int main(void)
manager = create_link_manager(send_queue, receive_queue, link); manager = create_link_manager(send_queue, receive_queue, link);
if (manager == NULL) goto error; if (manager == NULL) goto error;
if (message_get(receive_queue, &data, &size, &priority)) goto error; if ((size = message_get(receive_queue, &data, &priority)) <= 0) goto error;
printf("received message:\n"); printf("received message:\n");
printf(" data: %s\n", (char *)data); printf(" data: %s\n", (char *)data);
printf(" size: %d\n", size); printf(" size: %d\n", size);
......
...@@ -72,6 +72,8 @@ error: ...@@ -72,6 +72,8 @@ error:
int message_put(message_queue_t *queue, void *data, int size, int priority) int message_put(message_queue_t *queue, void *data, int size, int priority)
{ {
message_t *m = NULL; message_t *m = NULL;
if (size <= 0)
goto error;
m = calloc(1, sizeof(message_t)); m = calloc(1, sizeof(message_t));
if (m == NULL) if (m == NULL)
...@@ -106,12 +108,12 @@ int message_put(message_queue_t *queue, void *data, int size, int priority) ...@@ -106,12 +108,12 @@ int message_put(message_queue_t *queue, void *data, int size, int priority)
return 0; return 0;
error: error:
free(m); if (m) free(m);
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__); LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
return -1; return -1;
} }
int message_get(message_queue_t *queue, void **data, int *size, int *priority) int message_get(message_queue_t *queue, void **data, int *priority)
{ {
message_t *m; message_t *m;
...@@ -136,12 +138,11 @@ int message_get(message_queue_t *queue, void **data, int *size, int *priority) ...@@ -136,12 +138,11 @@ int message_get(message_queue_t *queue, void **data, int *size, int *priority)
goto error; goto error;
*data = m->data; *data = m->data;
*size = m->size; const int size = m->size;
*priority = m->priority; *priority = m->priority;
free(m); free(m);
return 0; return size;
error: error:
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__); LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
return -1; return -1;
...@@ -181,10 +182,10 @@ int main(void) ...@@ -181,10 +182,10 @@ int main(void)
if (message_put(q, "hello", 6, 0)) goto error; if (message_put(q, "hello", 6, 0)) goto error;
if (message_put(q, "world", 6, 1)) goto error; if (message_put(q, "world", 6, 1)) goto error;
if (message_get(q, &data, &size, &priority)) goto error; if ((size = message_get(q, &data, &priority)) <= 0) goto error;
printf("message:\n data: '%s'\n size: %d\n priority: %d\n", printf("message:\n data: '%s'\n size: %d\n priority: %d\n",
(char *)data, size, priority); (char *)data, size, priority);
if (message_get(q, &data, &size, &priority)) goto error; if ((size = message_get(q, &data, &priority)) <= 0) goto error;
printf("message:\n data: '%s'\n size: %d\n priority: %d\n", printf("message:\n data: '%s'\n size: %d\n priority: %d\n",
(char *)data, size, priority); (char *)data, size, priority);
......
...@@ -55,7 +55,7 @@ typedef struct { ...@@ -55,7 +55,7 @@ typedef struct {
message_queue_t *new_message_queue(void); message_queue_t *new_message_queue(void);
int message_put(message_queue_t *queue, void *data, int size, int priority); int message_put(message_queue_t *queue, void *data, int size, int priority);
int message_get(message_queue_t *queue, void **data, int *size, int *priority); int message_get(message_queue_t *queue, void **data, int *priority);
void destroy_message_queue(message_queue_t *queue); void destroy_message_queue(message_queue_t *queue);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -67,6 +67,9 @@ int message_put(message_queue_t *queue, void *data, int size, int priority) { ...@@ -67,6 +67,9 @@ int message_put(message_queue_t *queue, void *data, int size, int priority) {
message_t *overwritten_msg; message_t *overwritten_msg;
message_t *m = NULL; message_t *m = NULL;
if (size <= 0)
goto error;
LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE; LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
lfds700_misc_prng_init(&ls); lfds700_misc_prng_init(&ls);
...@@ -94,12 +97,11 @@ int message_put(message_queue_t *queue, void *data, int size, int priority) { ...@@ -94,12 +97,11 @@ int message_put(message_queue_t *queue, void *data, int size, int priority) {
return 0; return 0;
error: error:
free(m);
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__); LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
return -1; return -1;
} }
int message_get(message_queue_t *queue, void **data, int *size, int *priority) { int message_get(message_queue_t *queue, void **data, int *priority) {
message_t *m; message_t *m;
struct lfds700_misc_prng_state ls; struct lfds700_misc_prng_state ls;
...@@ -111,10 +113,10 @@ int message_get(message_queue_t *queue, void **data, int *size, int *priority) { ...@@ -111,10 +113,10 @@ int message_get(message_queue_t *queue, void **data, int *size, int *priority) {
} }
*data = m->data; *data = m->data;
*size = m->size; const int size = m->size;
*priority = m->priority; *priority = m->priority;
free(m); free(m);
return 0; return size;
} }
void destroy_message_queue(message_queue_t *queue) { void destroy_message_queue(message_queue_t *queue) {
......
...@@ -47,7 +47,7 @@ typedef struct { ...@@ -47,7 +47,7 @@ typedef struct {
message_queue_t * new_message_queue(int size); message_queue_t * new_message_queue(int size);
int message_put(message_queue_t *queue, void *data, int size, int priority); int message_put(message_queue_t *queue, void *data, int size, int priority);
int message_get(message_queue_t *queue, void **data, int *size, int *priority); int message_get(message_queue_t *queue, void **data, int *priority);
void destroy_message_queue(message_queue_t *queue); void destroy_message_queue(message_queue_t *queue);
#endif /* RINGBUFFER_QUEUE_H */ #endif /* RINGBUFFER_QUEUE_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment