Commit aa65305f authored by winckel's avatar winckel

Integrated memory pools into ITTI for OAISIM and lte-softmodem targets.

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4761 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent c4e45257
......@@ -37,20 +37,21 @@
#include <errno.h>
#include <signal.h>
#include "assertions.h"
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "liblfds611.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
#ifdef RTAI
# include <rtai_shm.h>
#endif
#include "liblfds611.h"
#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
#if defined(OAI_EMU) || defined(RTAI)
# include "memory_pools.h"
# include "vcd_signal_dumper.h"
#endif
......@@ -175,6 +176,8 @@ typedef struct itti_desc_s {
#endif
#if defined(OAI_EMU) || defined(RTAI)
memory_pools_handle_t memory_pools_handle;
uint64_t vcd_poll_msg;
uint64_t vcd_receive_msg;
uint64_t vcd_send_msg;
......@@ -183,18 +186,26 @@ typedef struct itti_desc_s {
static itti_desc_t itti_desc;
void *itti_malloc(task_id_t task_id, ssize_t size)
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
{
void *ptr = NULL;
#ifdef RTAI
// ptr = rt_malloc(size);
ptr = malloc(size);
#if defined(OAI_EMU) || defined(RTAI)
ptr = memory_pools_allocate (itti_desc.memory_pools_handle, size, origin_task_id, destination_task_id);
#else
ptr = malloc(size);
ptr = malloc (size);
#endif
DevCheck(ptr != NULL, size, task_id, 0);
DevCheck(ptr != NULL, size, origin_task_id, destination_task_id);
#if defined(OAI_EMU) || defined(RTAI)
if (ptr == NULL)
{
char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);
ITTI_ERROR ("\n%s", statistics);
free (statistics);
}
#endif
return ptr;
}
......@@ -202,10 +213,11 @@ void *itti_malloc(task_id_t task_id, ssize_t size)
void itti_free(task_id_t task_id, void *ptr)
{
DevAssert(ptr != NULL);
#ifdef RTAI
free(ptr);
#if defined(OAI_EMU) || defined(RTAI)
memory_pools_free (itti_desc.memory_pools_handle, ptr, task_id);
#else
free(ptr);
free (ptr);
#endif
}
......@@ -285,7 +297,7 @@ int itti_send_broadcast_message(MessageDef *message_p) {
if (thread_id != origin_thread_id) {
/* Skip tasks which are not running */
if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
new_message_p = itti_malloc (origin_task_id, sizeof(MessageDef));
new_message_p = itti_malloc (origin_task_id, destination_task_id, sizeof(MessageDef));
DevAssert(message_p != NULL);
memcpy (new_message_p, message_p, sizeof(MessageDef));
......@@ -294,7 +306,7 @@ int itti_send_broadcast_message(MessageDef *message_p) {
}
}
}
free (message_p);
itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);
return ret;
}
......@@ -315,7 +327,7 @@ inline MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, Messag
origin_task_id = itti_get_current_task_id();
}
temp = itti_malloc (origin_task_id, sizeof(MessageHeader) + size);
temp = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);
DevAssert(temp != NULL);
temp->ittiMsgHeader.messageId = message_id;
......@@ -392,7 +404,7 @@ int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, Me
itti_desc.threads[destination_thread_id].task_state, message_id);
/* Allocate new list element */
new = (message_list_t *) itti_malloc (origin_task_id, sizeof(struct message_list_s));
new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));
DevAssert(new != NULL);
/* Fill in members */
......@@ -610,7 +622,7 @@ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
}
DevAssert(message != NULL);
*received_msg = message->msg;
free (message);
itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
itti_desc.threads[task_id].sem_counter--;
} else
......@@ -640,7 +652,7 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1)
{
*received_msg = message->msg;
free (message);
itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
}
}
......@@ -907,6 +919,21 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
rt_global_heap_open();
#endif
#if defined(OAI_EMU) || defined(RTAI)
itti_desc.memory_pools_handle = memory_pools_create (4);
memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + ITTI_QUEUE_MAX_ELEMENTS, 50);
memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + (2 * ITTI_QUEUE_MAX_ELEMENTS), 100);
memory_pools_add_pool (itti_desc.memory_pools_handle, 1000, 1000);
memory_pools_add_pool (itti_desc.memory_pools_handle, 1000, 10000);
{
char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);
printf ("%s", statistics);
free (statistics);
}
#endif
#if defined(OAI_EMU) || defined(RTAI)
itti_desc.vcd_poll_msg = 0;
itti_desc.vcd_receive_msg = 0;
......@@ -968,6 +995,15 @@ void itti_wait_tasks_end(void) {
itti_desc.running = 0;
#if defined(OAI_EMU) || defined(RTAI)
{
char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);
printf ("%s", statistics);
free (statistics);
}
#endif
if (ready_tasks > 0) {
ITTI_DEBUG(" Some threads are still running, force exit\n");
exit (0);
......
......@@ -230,7 +230,7 @@ void itti_wait_tasks_end(void);
**/
void itti_send_terminate_message(task_id_t task_id);
void *itti_malloc(task_id_t task_id, ssize_t size);
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size);
void itti_free(task_id_t task_id, void *ptr);
......
......@@ -64,17 +64,17 @@
#define SIGNAL_NAME_LENGTH 48
static const int itti_dump_debug = 0;
static const int itti_dump_debug = 0; // 0x8 | 0x4 | 0x2;
#ifdef RTAI
# define ITTI_DUMP_DEBUG(x, args...) do { if (itti_dump_debug) rt_printk("[ITTI][D]"x, ##args); } \
# define ITTI_DUMP_DEBUG(m, x, args...) do { if (m & itti_dump_debug) rt_printk("[ITTI_DUMP][D]"x, ##args); } \
while(0)
# define ITTI_DUMP_ERROR(x, args...) do { rt_printk("[ITTI][E]"x, ##args); } \
# define ITTI_DUMP_ERROR(x, args...) do { rt_printk("[ITTI_DUMP][E]"x, ##args); } \
while(0)
#else
# define ITTI_DUMP_DEBUG(x, args...) do { if (itti_dump_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \
# define ITTI_DUMP_DEBUG(m, x, args...) do { if (m & itti_dump_debug) fprintf(stdout, "[ITTI_DUMP][D]"x, ##args); } \
while(0)
# define ITTI_DUMP_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \
# define ITTI_DUMP_ERROR(x, args...) do { fprintf(stdout, "[ITTI_DUMP][E]"x, ##args); } \
while(0)
#endif
......@@ -153,14 +153,10 @@ static itti_desc_t itti_dump_queue;
static FILE *dump_file = NULL;
static int itti_dump_running = 1;
static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message);
static int itti_dump_handle_new_connection(int sd, const char *xml_definition,
uint32_t xml_definition_length);
static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml,
const uint32_t message_definition_xml_length);
static volatile uint32_t pending_messages = 0;
static
int itti_dump_send_message(int sd, itti_dump_queue_item_t *message)
/*------------------------------------------------------------------------------*/
static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message)
{
itti_dump_message_t *new_message;
ssize_t bytes_sent = 0, total_sent = 0;
......@@ -202,11 +198,11 @@ int itti_dump_send_message(int sd, itti_dump_queue_item_t *message)
return total_sent;
}
static void itti_dump_fwrite_message(itti_dump_queue_item_t *message)
static int itti_dump_fwrite_message(itti_dump_queue_item_t *message)
{
itti_socket_header_t header;
if (dump_file != NULL && message) {
if ((dump_file != NULL) && (message != NULL)) {
header.message_size = message->message_size + sizeof(itti_dump_message_t);
header.message_type = message->message_type;
......@@ -218,7 +214,9 @@ static void itti_dump_fwrite_message(itti_dump_queue_item_t *message)
// #if !defined(RTAI)
fflush (dump_file);
// #endif
return (1);
}
return (0);
}
static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml,
......@@ -237,7 +235,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin
itti_dump_message = calloc(1, itti_dump_message_size);
ITTI_DUMP_DEBUG("[%d] Sending XML definition message of size %zu to observer peer\n",
ITTI_DUMP_DEBUG(0x2, "[%d] Sending XML definition message of size %zu to observer peer\n",
sd, itti_dump_message_size);
itti_dump_message->message_size = itti_dump_message_size;
......@@ -264,11 +262,33 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin
return 0;
}
static void itti_dump_user_data_delete_function(void *user_data, void *user_state)
{
if (user_data != NULL)
{
itti_dump_queue_item_t *item;
task_id_t task_id;
item = (itti_dump_queue_item_t *)user_data;
if (item->data != NULL)
{
task_id = ITTI_MSG_ORIGIN_ID(item->data);
itti_free(task_id, item->data);
}
else
{
task_id = TASK_UNKNOWN;
}
itti_free(task_id, item);
}
}
static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t message_size,
uint32_t message_type)
{
struct lfds611_freelist_element *new_queue_element = NULL;
int overwrite_flag;
DevAssert(new != NULL);
#if defined(OAI_EMU) || defined(RTAI)
......@@ -278,26 +298,39 @@ static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t messa
new->message_type = message_type;
new->message_size = message_size;
new_queue_element = lfds611_ringbuffer_get_write_element(
itti_dump_queue.itti_message_queue, &new_queue_element, NULL);
ITTI_DUMP_DEBUG (0x1, " itti_dump_enqueue_message: lfds611_ringbuffer_get_write_element\n");
new_queue_element = lfds611_ringbuffer_get_write_element (itti_dump_queue.itti_message_queue, &new_queue_element, &overwrite_flag);
if (overwrite_flag != 0)
{
void *old = NULL;
lfds611_freelist_set_user_data_in_element(new_queue_element, (void *)new);
lfds611_freelist_get_user_data_from_element(new_queue_element, &old);
ITTI_DUMP_DEBUG (0x4, " overwrite_flag set, freeing old data %p %p\n", new_queue_element, old);
itti_dump_user_data_delete_function (old, NULL);
}
lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue,
new_queue_element);
lfds611_freelist_set_user_data_in_element(new_queue_element, (void *) new);
lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue, new_queue_element);
if (overwrite_flag == 0)
{
#ifdef RTAI
__sync_fetch_and_add (&itti_dump_queue.messages_in_queue, 1);
__sync_fetch_and_add (&itti_dump_queue.messages_in_queue, 1);
#else
{
ssize_t write_ret;
eventfd_t sem_counter = 1;
{
ssize_t write_ret;
eventfd_t sem_counter = 1;
/* Call to write for an event fd must be of 8 bytes */
write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0);
}
/* Call to write for an event fd must be of 8 bytes */
write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0);
}
#endif
__sync_fetch_and_add (&pending_messages, 1);
}
ITTI_DUMP_DEBUG (0x2, " Added element to queue %p %p, pending %u, type %u\n", new_queue_element, new, pending_messages, message_type);
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
......@@ -306,111 +339,153 @@ static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t messa
return 0;
}
int itti_dump_queue_message(task_id_t sender_task,
message_number_t message_number,
MessageDef *message_p,
const char *message_name,
const uint32_t message_size)
static void itti_dump_socket_exit(void)
{
if (itti_dump_running)
{
itti_dump_queue_item_t *new;
size_t message_name_length;
DevAssert(message_name != NULL);
DevAssert(message_p != NULL);
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
new = itti_malloc(sender_task, sizeof(itti_dump_queue_item_t));
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
new->data = itti_malloc(sender_task, message_size);
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#ifndef RTAI
close(itti_dump_queue.event_fd);
#endif
close(itti_dump_queue.itti_listen_socket);
memcpy(new->data, message_p, message_size);
new->data_size = message_size;
new->message_number = message_number;
message_name_length = strlen(message_name) + 1;
DevCheck(message_name_length <= SIGNAL_NAME_LENGTH, message_name_length,
SIGNAL_NAME_LENGTH, 0);
memcpy(new->message_name, message_name, message_name_length);
itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE);
}
return 0;
/* Leave the thread as we detected end signal */
pthread_exit(NULL);
}
static void itti_dump_flush_ring_buffer(int flush_all)
static int itti_dump_flush_ring_buffer(int flush_all)
{
struct lfds611_freelist_element *element = NULL;
void *user_data;
int j;
void *user_data;
int j;
int consumer;
#ifdef RTAI
unsigned long number_of_messages;
#endif
number_of_messages = itti_dump_queue.messages_in_queue;
/* Check if there is a least one consumer */
consumer = 0;
if (dump_file != NULL)
{
consumer = 1;
}
else
{
for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
if (itti_dump_queue.itti_clients[j].sd > 0) {
consumer = 1;
break;
}
}
}
ITTI_DUMP_DEBUG("%lu elements in queue\n", number_of_messages);
if (consumer > 0)
{
#ifdef RTAI
number_of_messages = itti_dump_queue.messages_in_queue;
if (number_of_messages == 0) {
return;
}
ITTI_DUMP_DEBUG(0x4, "%lu elements in queue\n", number_of_messages);
if (number_of_messages == 0) {
return (consumer);
}
__sync_sub_and_fetch(&itti_dump_queue.messages_in_queue, number_of_messages);
__sync_sub_and_fetch(&itti_dump_queue.messages_in_queue, number_of_messages);
#endif
do {
/* Acquire the ring element */
lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element);
do {
/* Acquire the ring element */
lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element);
DevAssert(element != NULL);
__sync_fetch_and_sub (&pending_messages, 1);
/* Retrieve user part of the message */
lfds611_freelist_get_user_data_from_element(element, &user_data);
if (element == NULL)
{
if (flush_all != 0)
{
flush_all = 0;
}
else
{
ITTI_DUMP_DEBUG(0x8, " Dump event with no data\n");
DevMessage("Dump event with no data\n");
}
}
else
{
/* Retrieve user part of the message */
lfds611_freelist_get_user_data_from_element(element, &user_data);
if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL)
{
#ifndef RTAI
close(itti_dump_queue.event_fd);
#endif
close(itti_dump_queue.itti_listen_socket);
ITTI_DUMP_DEBUG (0x2, " removed element from queue %p %p, pending %u\n", element, user_data, pending_messages);
lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL)
{
lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
itti_dump_socket_exit();
}
/* Leave the thread as we detected end signal */
pthread_exit(NULL);
}
/* Write message to file */
itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data);
/* Write message to file */
itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data);
/* Send message to remote analyzer */
for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
if (itti_dump_queue.itti_clients[j].sd > 0) {
itti_dump_send_message(itti_dump_queue.itti_clients[j].sd,
(itti_dump_queue_item_t *)user_data);
}
}
/* Send message to remote analyzer */
for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
if (itti_dump_queue.itti_clients[j].sd > 0) {
itti_dump_send_message(itti_dump_queue.itti_clients[j].sd,
(itti_dump_queue_item_t *)user_data);
itti_dump_user_data_delete_function (user_data, NULL);
lfds611_freelist_set_user_data_in_element(element, NULL);
/* We have finished with this element, reinsert it in the ring buffer */
lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
}
} while(flush_all
#ifdef RTAI
&& --number_of_messages
#endif
);
}
return (consumer);
}
static int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length)
{
if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) {
uint8_t i;
for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
/* Let's find a place to store the new client */
if (itti_dump_queue.itti_clients[i].sd == -1) {
break;
}
}
/* We have finished with this element, reinsert it in the ring buffer */
lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
} while(flush_all
#ifdef RTAI
&& --number_of_messages
#endif
);
ITTI_DUMP_DEBUG(0x2, " Found place to store new connection: %d\n", i);
DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd);
ITTI_DUMP_DEBUG(0x2, " Socket %d accepted\n", sd);
/* Send the XML message definition */
if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) {
ITTI_DUMP_ERROR(" Failed to send XML definition\n");
close (sd);
return -1;
}
itti_dump_queue.itti_clients[i].sd = sd;
itti_dump_queue.nb_connected++;
} else {
ITTI_DUMP_DEBUG(0x2, " Socket %d rejected\n", sd);
/* We have reached max number of users connected...
* Reject the connection.
*/
close (sd);
return -1;
}
return 0;
}
static void *itti_dump_socket(void *arg_p)
......@@ -428,7 +503,7 @@ static void *itti_dump_socket(void *arg_p)
struct timeval timeout;
#endif
ITTI_DUMP_DEBUG("Creating TCP dump socket on port %u\n", ITTI_PORT);
ITTI_DUMP_DEBUG(0x2, " Creating TCP dump socket on port %u\n", ITTI_PORT);
message_definition_xml = (char *)arg_p;
DevAssert(message_definition_xml != NULL);
......@@ -436,7 +511,7 @@ static void *itti_dump_socket(void *arg_p)
message_definition_xml_length = strlen(message_definition_xml) + 1;
if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
ITTI_DUMP_ERROR("Socket creation failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" ocket creation failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL);
}
......@@ -444,7 +519,7 @@ static void *itti_dump_socket(void *arg_p)
rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR,
(char *)&on, sizeof(on));
if (rc < 0) {
ITTI_DUMP_ERROR("setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno));
close(itti_listen_socket);
pthread_exit(NULL);
}
......@@ -454,7 +529,7 @@ static void *itti_dump_socket(void *arg_p)
*/
rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on);
if (rc < 0) {
ITTI_DUMP_ERROR("ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno));
close(itti_listen_socket);
pthread_exit(NULL);
}
......@@ -466,11 +541,11 @@ static void *itti_dump_socket(void *arg_p)
if (bind(itti_listen_socket, (struct sockaddr *) &servaddr,
sizeof(servaddr)) < 0) {
ITTI_DUMP_ERROR("Bind failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" Bind failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL);
}
if (listen(itti_listen_socket, 5) < 0) {
ITTI_DUMP_ERROR("Listen failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" Listen failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL);
}
......@@ -515,11 +590,22 @@ static void *itti_dump_socket(void *arg_p)
rc = select(max_sd + 1, &working_set, NULL, NULL, timeout_p);
if (rc < 0) {
ITTI_DUMP_ERROR("select failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" select failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL);
} else if (rc == 0) {
/* Timeout */
itti_dump_flush_ring_buffer(1);
if (itti_dump_flush_ring_buffer(1) == 0)
{
if (itti_dump_running)
{
ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
usleep(100 * 1000);
}
else
{
itti_dump_socket_exit();
}
}
}
desc_ready = rc;
......@@ -538,16 +624,40 @@ static void *itti_dump_socket(void *arg_p)
/* Read will always return 1 for kernel versions > 2.6.30 */
read_ret = read (itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
if (read_ret < 0) {
ITTI_DUMP_ERROR("Failed read for semaphore: %s\n", strerror(errno));
ITTI_DUMP_ERROR(" Failed read for semaphore: %s\n", strerror(errno));
pthread_exit(NULL);
}
DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);
#if defined(KERNEL_VERSION_PRE_2_6_30)
itti_dump_flush_ring_buffer(1);
if (itti_dump_flush_ring_buffer(1) == 0)
#else
itti_dump_flush_ring_buffer(0);
if (itti_dump_flush_ring_buffer(0) == 0)
#endif
ITTI_DUMP_DEBUG("Write element to file\n");
{
if (itti_dump_running)
{
ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
usleep(100 * 1000);
#ifndef RTAI
{
ssize_t write_ret;
sem_counter = 1;
/* Call to write for an event fd must be of 8 bytes */
write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0);
}
#endif
}
else
{
itti_dump_socket_exit();
}
}
else
{
ITTI_DUMP_DEBUG(0x1, " Write element to file\n");
}
} else
#endif
if (i == itti_listen_socket) {
......@@ -556,10 +666,10 @@ static void *itti_dump_socket(void *arg_p)
if (client_socket < 0) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
/* No more new connection */
ITTI_DUMP_DEBUG("No more new connection\n");
ITTI_DUMP_DEBUG(0x2, " No more new connection\n");
continue;
} else {
ITTI_DUMP_ERROR("accept failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" accept failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL);
}
}
......@@ -580,7 +690,7 @@ static void *itti_dump_socket(void *arg_p)
*/
uint8_t j;
ITTI_DUMP_DEBUG("Socket %d disconnected\n", i);
ITTI_DUMP_DEBUG(0x2, " Socket %d disconnected\n", i);
/* Close the socket and update info related to this connection */
close(i);
......@@ -622,41 +732,47 @@ static void *itti_dump_socket(void *arg_p)
return NULL;
}
static
int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length)
/*------------------------------------------------------------------------------*/
int itti_dump_queue_message(task_id_t sender_task,
message_number_t message_number,
MessageDef *message_p,
const char *message_name,
const uint32_t message_size)
{
if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) {
uint8_t i;
if (itti_dump_running)
{
itti_dump_queue_item_t *new;
size_t message_name_length;
for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
/* Let's find a place to store the new client */
if (itti_dump_queue.itti_clients[i].sd == -1) {
break;
}
}
DevAssert(message_name != NULL);
DevAssert(message_p != NULL);
ITTI_DUMP_DEBUG("Found place to store new connection: %d\n", i);
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
new = itti_malloc(sender_task, TASK_MAX + 100, sizeof(itti_dump_queue_item_t));
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif
DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd);
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
new->data = itti_malloc(sender_task, TASK_MAX + 100, message_size);
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif
ITTI_DUMP_DEBUG("Socket %d accepted\n", sd);
memcpy(new->data, message_p, message_size);
new->data_size = message_size;
new->message_number = message_number;
/* Send the XML message definition */
if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) {
ITTI_DUMP_ERROR("Failed to send XML definition\n");
close (sd);
return -1;
}
message_name_length = strlen(message_name) + 1;
DevCheck(message_name_length <= SIGNAL_NAME_LENGTH, message_name_length,
SIGNAL_NAME_LENGTH, 0);
memcpy(new->message_name, message_name, message_name_length);
itti_dump_queue.itti_clients[i].sd = sd;
itti_dump_queue.nb_connected++;
} else {
ITTI_DUMP_DEBUG("Socket %d rejected\n", sd);
/* We have reached max number of users connected...
* Reject the connection.
*/
close (sd);
return -1;
itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE);
}
return 0;
......@@ -681,7 +797,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
if (dump_file == NULL)
{
ITTI_DUMP_ERROR("can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno));
ITTI_DUMP_ERROR(" can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno));
}
else
{
......@@ -700,7 +816,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
memset(&itti_dump_queue, 0, sizeof(itti_desc_t));
ITTI_DUMP_DEBUG("Creating new ring buffer for itti dump of %u elements\n",
ITTI_DUMP_DEBUG(0x2, " Creating new ring buffer for itti dump of %u elements\n",
ITTI_QUEUE_MAX_ELEMENTS);
if (lfds611_ringbuffer_new(&itti_dump_queue.itti_message_queue,
......@@ -708,7 +824,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
NULL,
NULL) != 1)
{
ITTI_DUMP_ERROR("Failed to create ring buffer...\n");
ITTI_DUMP_ERROR(" Failed to create ring buffer...\n");
/* Always assert on this condition */
DevAssert(0 == 1);
}
......@@ -723,7 +839,7 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
# endif
if (itti_dump_queue.event_fd == -1)
{
ITTI_DUMP_ERROR("eventfd failed: %s\n", strerror(errno));
ITTI_DUMP_ERROR(" eventfd failed: %s\n", strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
......@@ -739,59 +855,38 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
/* initialized with default attributes */
ret = pthread_attr_init(&itti_dump_queue.attr);
if (ret < 0) {
ITTI_DUMP_ERROR("pthread_attr_init failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" pthread_attr_init failed (%d:%s)\n", errno, strerror(errno));
DevAssert(0 == 1);
}
ret = pthread_attr_setschedpolicy(&itti_dump_queue.attr, SCHED_FIFO);
if (ret < 0) {
ITTI_DUMP_ERROR("pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)\n", errno, strerror(errno));
DevAssert(0 == 1);
}
ret = pthread_attr_setschedparam(&itti_dump_queue.attr, &scheduler_param);
if (ret < 0) {
ITTI_DUMP_ERROR("pthread_attr_setschedparam failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" pthread_attr_setschedparam failed (%d:%s)\n", errno, strerror(errno));
DevAssert(0 == 1);
}
ret = pthread_create(&itti_dump_queue.itti_acceptor_thread, &itti_dump_queue.attr,
&itti_dump_socket, (void *)messages_definition_xml);
if (ret < 0) {
ITTI_DUMP_ERROR("pthread_create failed (%d:%s)\n", errno, strerror(errno));
ITTI_DUMP_ERROR(" pthread_create failed (%d:%s)\n", errno, strerror(errno));
DevAssert(0 == 1);
}
return 0;
}
static void itti_dump_user_data_delete_function(void *user_data, void *user_state)
{
if (user_data != NULL)
{
itti_dump_queue_item_t *item;
task_id_t task_id;
item = (itti_dump_queue_item_t *)user_data;
if (item->data != NULL)
{
task_id = ITTI_MSG_ORIGIN_ID(item->data);
itti_free(task_id, item->data);
}
else
{
task_id = TASK_UNKNOWN;
}
itti_free(task_id, item);
}
}
void itti_dump_exit(void)
{
void *arg;
itti_dump_queue_item_t *new;
new = calloc(1, sizeof(itti_dump_queue_item_t));
new = itti_malloc(TASK_UNKNOWN, TASK_UNKNOWN, sizeof(itti_dump_queue_item_t));
memset(new, 0, sizeof(itti_dump_queue_item_t));
/* Set a flag to stop recording message */
itti_dump_running = 0;
......@@ -799,12 +894,12 @@ void itti_dump_exit(void)
/* Send the exit signal to other thread */
itti_dump_enqueue_message(new, 0, ITTI_DUMP_EXIT_SIGNAL);
ITTI_DUMP_DEBUG("waiting for dumper thread to finish\n");
ITTI_DUMP_DEBUG(0x2, " waiting for dumper thread to finish\n");
/* wait for the thread to terminate */
pthread_join(itti_dump_queue.itti_acceptor_thread, &arg);
ITTI_DUMP_DEBUG("dumper thread correctly exited\n");
ITTI_DUMP_DEBUG(0x2, " dumper thread correctly exited\n");
if (dump_file != NULL)
{
......
......@@ -230,7 +230,7 @@ void s1ap_eNB_handle_sctp_data_ind(sctp_data_ind_t *sctp_data_ind)
s1ap_eNB_handle_message(sctp_data_ind->assoc_id, sctp_data_ind->stream,
sctp_data_ind->buffer, sctp_data_ind->buffer_length);
free(sctp_data_ind->buffer);
itti_free(TASK_UNKNOWN, sctp_data_ind->buffer);
}
void *s1ap_eNB_task(void *arg)
......
......@@ -13,7 +13,7 @@ int sctp_itti_send_new_message_ind(task_id_t task_id, uint32_t assoc_id, uint8_t
sctp_data_ind_p = &message_p->ittiMsg.sctp_data_ind;
sctp_data_ind_p->buffer = malloc(sizeof(uint8_t) * buffer_length);
sctp_data_ind_p->buffer = itti_malloc(TASK_SCTP, task_id, sizeof(uint8_t) * buffer_length);
/* Copy the buffer */
memcpy((void *)sctp_data_ind_p->buffer, (void *)buffer, buffer_length);
......
......@@ -560,8 +560,7 @@ void *sctp_eNB_task(void *arg)
ITTI_MSG_ID(received_msg), ITTI_MSG_NAME(received_msg));
break;
}
itti_free(TASK_SCTP, received_msg);
itti_free(ITTI_MSG_ORIGIN_ID(received_msg), received_msg);
received_msg = NULL;
}
......
......@@ -45,7 +45,7 @@
#define ITTI_PORT (10006)
/* This is the queue size for signal dumper */
#define ITTI_QUEUE_MAX_ELEMENTS (200 * 1000)
#define ITTI_QUEUE_MAX_ELEMENTS (10 * 1000)
#define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */
#endif /* INTERTASK_INTERFACE_CONF_H_ */
......@@ -485,7 +485,7 @@ u8 rrc_lite_data_req(u8 eNB_id, u8 UE_id, u32 frame, u8 eNB_flag, unsigned int r
// Uses a new buffer to avoid issue with PDCP buffer content that could be changed by PDCP (asynchronous message handling).
u8 *message_buffer;
message_buffer = itti_malloc (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, sdu_size);
message_buffer = itti_malloc (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, sdu_size);
memcpy (message_buffer, Buffer, sdu_size);
message_p = itti_alloc_new_message (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, RRC_DCCH_DATA_REQ);
......@@ -531,7 +531,7 @@ void rrc_lite_data_ind(u8_t eNB_id, u8_t UE_id, u32 frame, u8 eNB_flag,u32 Srb_i
// Uses a new buffer to avoid issue with PDCP buffer content that could be changed by PDCP (asynchronous message handling).
u8 *message_buffer;
message_buffer = itti_malloc (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, sdu_size);
message_buffer = itti_malloc (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, sdu_size);
memcpy (message_buffer, Buffer, sdu_size);
message_p = itti_alloc_new_message (eNB_flag ? TASK_PDCP_ENB : TASK_PDCP_UE, RRC_DCCH_DATA_IND);
......@@ -542,7 +542,7 @@ void rrc_lite_data_ind(u8_t eNB_id, u8_t UE_id, u32 frame, u8 eNB_flag,u32 Srb_i
RRC_DCCH_DATA_IND (message_p).ue_index = UE_id;
RRC_DCCH_DATA_IND (message_p).eNB_index = eNB_id;
itti_send_msg_to_task ((eNB_flag == 1) ? TASK_RRC_ENB : TASK_RRC_UE, Mod_id, message_p);
itti_send_msg_to_task (eNB_flag ? TASK_RRC_ENB : TASK_RRC_UE, Mod_id, message_p);
}
#else
if (eNB_flag ==1) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment