Commit 34db5f20 authored by Xenofon Foukas's avatar Xenofon Foukas

Moved agent async interface to lock-free implementation using lock-free ringbuffer

parent 8b283444
...@@ -832,6 +832,7 @@ if (ENB_AGENT_SB_IF) ...@@ -832,6 +832,7 @@ if (ENB_AGENT_SB_IF)
${OPENAIR2_DIR}/UTIL/ASYNC_IF/socket_link.c ${OPENAIR2_DIR}/UTIL/ASYNC_IF/socket_link.c
${OPENAIR2_DIR}/UTIL/ASYNC_IF/link_manager.c ${OPENAIR2_DIR}/UTIL/ASYNC_IF/link_manager.c
${OPENAIR2_DIR}/UTIL/ASYNC_IF/message_queue.c ${OPENAIR2_DIR}/UTIL/ASYNC_IF/message_queue.c
${OPENAIR2_DIR}/UTIL/ASYNC_IF/ringbuffer_queue.c
) )
set(ASYNC_IF_LIB ASYNC_IF) set(ASYNC_IF_LIB ASYNC_IF)
include_directories(${OPENAIR2_DIR}/UTIL/ASYNC_IF) include_directories(${OPENAIR2_DIR}/UTIL/ASYNC_IF)
......
...@@ -135,32 +135,34 @@ void *receive_thread(void *args) { ...@@ -135,32 +135,34 @@ void *receive_thread(void *args) {
Protocol__ProgranMessage *msg; Protocol__ProgranMessage *msg;
while (1) { while (1) {
if (enb_agent_msg_recv(d->enb_id, ENB_AGENT_DEFAULT, &data, &size, &priority)) { //if (enb_agent_msg_recv(d->enb_id, ENB_AGENT_DEFAULT, &data, &size, &priority)) {
err_code = PROTOCOL__PROGRAN_ERR__MSG_DEQUEUING; // err_code = PROTOCOL__PROGRAN_ERR__MSG_DEQUEUING;
goto error; // goto error;
} //}
LOG_D(ENB_AGENT,"received message with size %d\n", size); while (enb_agent_msg_recv(d->enb_id, ENB_AGENT_DEFAULT, &data, &size, &priority) == 0) {
LOG_D(ENB_AGENT,"received message with size %d\n", size);
msg=enb_agent_handle_message(d->enb_id, data, size); msg=enb_agent_handle_message(d->enb_id, data, size);
free(data); free(data);
// check if there is something to send back to the controller // check if there is something to send back to the controller
if (msg != NULL){ if (msg != NULL){
data=enb_agent_pack_message(msg,&size); data=enb_agent_pack_message(msg,&size);
if (enb_agent_msg_send(d->enb_id, ENB_AGENT_DEFAULT, data, size, priority)) { if (enb_agent_msg_send(d->enb_id, ENB_AGENT_DEFAULT, data, size, priority)) {
err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING; err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING;
goto error; goto error;
} }
LOG_D(ENB_AGENT,"sent message with size %d\n", size); LOG_D(ENB_AGENT,"sent message with size %d\n", size);
}
} }
} }
return NULL; return NULL;
error: error:
......
...@@ -60,10 +60,10 @@ enb_agent_async_channel_t * enb_agent_async_channel_info(mid_t mod_id, char *dst ...@@ -60,10 +60,10 @@ enb_agent_async_channel_t * enb_agent_async_channel_info(mid_t mod_id, char *dst
/* /*
* create a message queue * create a message queue
*/ */
// Set size of queues statically for now
channel->send_queue = new_message_queue(); channel->send_queue = new_message_queue(500);
if (channel->send_queue == NULL) goto error; if (channel->send_queue == NULL) goto error;
channel->receive_queue = new_message_queue(); channel->receive_queue = new_message_queue(500);
if (channel->receive_queue == NULL) goto error; if (channel->receive_queue == NULL) goto error;
/* /*
......
...@@ -53,11 +53,15 @@ static void *link_manager_sender_thread(void *_manager) ...@@ -53,11 +53,15 @@ static void *link_manager_sender_thread(void *_manager)
LOG_D(MAC, "starting link manager sender thread\n"); LOG_D(MAC, "starting link manager sender thread\n");
while (manager->run) { while (manager->run) {
if (message_get(manager->send_queue, &data, &size, &priority)) while (message_get(manager->send_queue, &data, &size, &priority) == 0) {
goto error; link_send_packet(manager->socket_link, data, size);
if (link_send_packet(manager->socket_link, data, size)) free(data);
goto error; }
free(data); // if (message_get(manager->send_queue, &data, &size, &priority))
// goto error;
//if (link_send_packet(manager->socket_link, data, size))
// goto error;
//free(data);
} }
LOG_D(MAC, "link manager sender thread quits\n"); LOG_D(MAC, "link manager sender thread quits\n");
......
...@@ -38,7 +38,8 @@ ...@@ -38,7 +38,8 @@
#ifndef LINK_MANAGER_H #ifndef LINK_MANAGER_H
#define LINK_MANAGER_H #define LINK_MANAGER_H
#include "message_queue.h" //#include "message_queue.h"
#include "ringbuffer_queue.h"
#include "socket_link.h" #include "socket_link.h"
#include <pthread.h> #include <pthread.h>
......
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file ringbuffer_queue.c
* \brief Lock-free ringbuffer used for async message passing of agent
* \author Xenofon Foukas
* \date March 2016
* \version 1.0
* \email: x.foukas@sms.ed.ac.uk
* @ingroup _mac
*/
#include "ringbuffer_queue.h"
#include "log.h"
message_queue_t * new_message_queue(int size) {
message_queue_t *ret = NULL;
ret = calloc(1, sizeof(message_queue_t));
if (ret == NULL)
goto error;
lfds700_misc_library_init_valid_on_current_logical_core();
lfds700_misc_prng_init(&(ret->ps));
ret->ringbuffer_array = malloc(sizeof(struct lfds700_ringbuffer_element) * size);
lfds700_ringbuffer_init_valid_on_current_logical_core(&(ret->ringbuffer_state),
ret->ringbuffer_array,
size,
&(ret->ps),
NULL);
return ret;
error:
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
if (ret != NULL) {
free(ret->ringbuffer_array);
memset(ret, 0, sizeof(message_queue_t));
free(ret);
}
return NULL;
}
int message_put(message_queue_t *queue, void *data, int size, int priority) {
struct lfds700_misc_prng_state ls;
enum lfds700_misc_flag overwrite_occurred_flag;
message_t *overwritten_msg;
message_t *m = NULL;
LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
lfds700_misc_prng_init(&ls);
m = calloc(1, sizeof(message_t));
if (m == NULL)
goto error;
m->data = data;
m->size = size;
m->priority = priority;
lfds700_ringbuffer_write(&(queue->ringbuffer_state),
NULL,
(void *) m,
&overwrite_occurred_flag,
NULL,
(void **) &overwritten_msg,
&ls);
if (overwrite_occurred_flag == LFDS700_MISC_FLAG_RAISED) {
free(overwritten_msg->data);
free(overwritten_msg);
}
return 0;
error:
free(m);
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
return -1;
}
int message_get(message_queue_t *queue, void **data, int *size, int *priority) {
message_t *m;
struct lfds700_misc_prng_state ls;
LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
lfds700_misc_prng_init(&ls);
if (lfds700_ringbuffer_read(&(queue->ringbuffer_state), NULL, (void **) &m, &ls) == 0) {
return -1;
}
*data = m->data;
*size = m->size;
*priority = m->priority;
free(m);
return 0;
}
message_queue_t destroy_message_queue(message_queue_t *queue) {
struct lfds700_misc_prng_state ls;
message_t *m;
LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
lfds700_misc_prng_init(&ls);
while (lfds700_ringbuffer_read(&(queue->ringbuffer_state), NULL, (void **) &m, &ls) != 0) {
free(m->data);
memset(m, 0, sizeof(message_t));
free(m);
}
free(queue->ringbuffer_array);
memset(queue, 0, sizeof(message_queue_t));
free(queue);
}
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file ringbuffer_queue.h
* \brief Lock-free ringbuffer used for async message passing of agent
* \author Xenofon Foukas
* \date March 2016
* \version 1.0
* \email: x.foukas@sms.ed.ac.uk
* @ingroup _mac
*/
#ifndef RINGBUFFER_QUEUE_H
#define RINGBUFFER_QUEUE_H
#include "liblfds700.h"
typedef struct message_s {
void *data;
int size;
int priority;
} message_t;
typedef struct {
struct lfds700_misc_prng_state ps;
struct lfds700_ringbuffer_element *ringbuffer_array;
struct lfds700_ringbuffer_state ringbuffer_state;
} message_queue_t;
message_queue_t * new_message_queue(int size);
int message_put(message_queue_t *queue, void *data, int size, int priority);
int message_get(message_queue_t *queue, void **data, int *size, int *priority);
message_queue_t destroy_message_queue(message_queue_t *queue);
#endif /* RINGBUFFER_QUEUE_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment