Commit cd22e69f authored by winckel's avatar winckel

Removed non ENABLE_EVENT_FD option in ITTI.

Modified message processing for RT task when RTAI is enabled.
Clean-up logs format in RRC.
Fixed some warnings.

pre-ci Ok.

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4550 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 9a30eabe
......@@ -41,14 +41,11 @@
# include <rtai_fifos.h>
#endif
#include "queue.h"
#include "assertions.h"
#if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h>
# include <sys/eventfd.h>
# include "liblfds611.h"
#endif
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "liblfds611.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
......@@ -92,10 +89,6 @@ typedef enum task_state_s {
/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
typedef struct message_list_s {
#if !defined(ENABLE_EVENT_FD)
STAILQ_ENTRY(message_list_s) next_element;
#endif
MessageDef *msg; ///< Pointer to the message
message_number_t message_number; ///< Unique message number
......@@ -108,26 +101,11 @@ typedef struct thread_desc_s {
/* State of the thread */
volatile task_state_t task_state;
} thread_desc_t;
typedef struct task_desc_s {
/* Queue of messages belonging to the task */
#if !defined(ENABLE_EVENT_FD)
STAILQ_HEAD(message_queue_head, message_list_s) message_queue;
/* Number of messages in the queue */
volatile uint32_t message_in_queue;
/* Mutex for the message queue */
pthread_mutex_t message_queue_mutex;
/* Conditional var for message queue and task synchro */
pthread_cond_t message_queue_cond_var;
#else
struct lfds611_queue_state *message_queue;
/* This fd is used internally by ITTI. */
int epoll_fd;
/* The task fd */
/* The thread fd */
int task_event_fd;
/* Number of events to monitor */
......@@ -141,7 +119,19 @@ typedef struct task_desc_s {
struct epoll_event *events;
int epoll_nb_events;
#ifdef RTAI
/* Flag to mark real time thread */
unsigned real_time;
/* Counter to indicate from RTAI threads that messages are pending for the thread */
unsigned messages_pending;
#endif
} thread_desc_t;
typedef struct task_desc_s {
/* Queue of messages belonging to the task */
struct lfds611_queue_state *message_queue;
} task_desc_t;
typedef struct itti_desc_s {
......@@ -161,6 +151,11 @@ typedef struct itti_desc_s {
const message_info_t *messages_info;
itti_lte_time_t lte_time;
int running;
#ifdef RTAI
pthread_t rt_relay_thread;
#endif
} itti_desc_t;
static itti_desc_t itti_desc;
......@@ -192,7 +187,7 @@ const char *itti_get_task_name(task_id_t task_id)
return (itti_desc.tasks_info[task_id].name);
}
static task_id_t itti_get_current_task_id()
static task_id_t itti_get_current_task_id(void)
{
task_id_t task_id;
thread_id_t thread_id;
......@@ -280,9 +275,9 @@ inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds
return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}
int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message)
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
thread_id_t destination_thread_id;
thread_id_t origin_task_id;
message_list_t *new;
uint32_t priority;
......@@ -290,9 +285,10 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
uint32_t message_id;
DevAssert(message != NULL);
DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
DevCheck(destination_task_id < itti_desc.task_max, destination_task_id, itti_desc.task_max, 0);
message->ittiMsgHeader.destinationTaskId = task_id;
destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
message->ittiMsgHeader.destinationTaskId = destination_task_id;
message->ittiMsgHeader.instance = instance;
message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
......@@ -303,7 +299,7 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
task_id);
destination_task_id);
#endif
priority = itti_get_message_priority (message_id);
......@@ -311,37 +307,23 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
/* Increment the global message number */
message_number = itti_increment_message_number ();
#ifdef RTAI
/*
*
#ifdef RTAI
if ((pthread_self() == itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].task_thread) ||
(task_id == TASK_UNKNOWN) ||
((TASK_GET_PARENT_TASK_ID(origin_task_id) != TASK_UNKNOWN) &&
(pthread_self() == itti_desc.threads[TASK_GET_PARENT_TASK_ID(origin_task_id)].task_thread)))
#endif
*/
itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);
ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
itti_desc.messages_info[message_id].name,
message_number,
priority,
itti_get_task_name(origin_task_id),
task_id,
itti_get_task_name(task_id));
if (task_id != TASK_UNKNOWN)
if (destination_task_id != TASK_UNKNOWN)
{
/* We cannot send a message if the task is not running */
DevCheck(itti_desc.threads[thread_id].task_state == TASK_STATE_READY, itti_desc.threads[thread_id].task_state,
TASK_STATE_READY, thread_id);
#if !defined(ENABLE_EVENT_FD)
/* Lock the mutex to get exclusive access to the list */
pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
/* Check the number of messages in the queue */
DevCheck(itti_desc.tasks[task_id].message_in_queue < itti_desc.tasks_info[task_id].queue_size,
task_id, itti_desc.tasks[task_id].message_in_queue, itti_desc.tasks_info[task_id].queue_size);
#endif
DevCheck(itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY, itti_desc.threads[destination_thread_id].task_state,
TASK_STATE_READY, destination_thread_id);
/* Allocate new list element */
new = (message_list_t *) malloc (sizeof(struct message_list_s));
......@@ -352,101 +334,68 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
new->message_number = message_number;
new->message_priority = priority;
#if defined(ENABLE_EVENT_FD)
# ifdef RTAI
if ((pthread_self() != itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].task_thread) &&
(TASK_GET_PARENT_TASK_ID(origin_task_id) != TASK_UNKNOWN))
{
/* This is the RT task, -> enqueue in the parent thread */
lfds611_queue_enqueue(itti_desc.tasks[TASK_GET_PARENT_TASK_ID(origin_task_id)].message_queue, new);
/* Enqueue message in destination task queue */
lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new);
/* Signal from RT thread */
// rtf_sem_post(56);
} else
# endif
/* No need to use a event fd for subtasks */
if (TASK_GET_PARENT_TASK_ID(task_id) == TASK_UNKNOWN)
#ifdef RTAI
if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time)
{
/* This is a RT task, increase destination task messages pending counter */
__sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
}
else
#endif
{
/* Only use event fd for tasks, subtasks will pool the queue */
if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN)
{
ssize_t write_ret;
uint64_t sem_counter = 1;
lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new);
/* Call to write for an event fd must be of 8 bytes */
write_ret = write(itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, task_id);
} else {
lfds611_queue_enqueue(itti_desc.tasks[task_id].message_queue, new);
write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, destination_thread_id);
}
#else
if (STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
STAILQ_INSERT_HEAD (&itti_desc.tasks[task_id].message_queue, new, next_element);
}
else {
// struct message_list_s *insert_after = NULL;
// struct message_list_s *temp;
//
// /* This method is inefficient... */
// STAILQ_FOREACH(temp, &itti_desc.tasks[task_id].message_queue, next_element) {
// struct message_list_s *next;
// next = STAILQ_NEXT(temp, next_element);
// /* Increment message priority to create a sort of
// * priority based scheduler */
// // if (temp->message_priority < TASK_PRIORITY_MAX) {
// // temp->message_priority++;
// // }
// if (next && next->message_priority < priority) {
// insert_after = temp;
// break;
// }
// }
// if (insert_after == NULL) {
STAILQ_INSERT_TAIL (&itti_desc.tasks[task_id].message_queue, new, next_element);
// } else {
// STAILQ_INSERT_AFTER(&itti_desc.tasks[task_id].message_queue, insert_after, new,
// next_element);
// }
}
/* Update the number of messages in the queue */
itti_desc.tasks[task_id].message_in_queue++;
if (itti_desc.tasks[task_id].message_in_queue == 1) {
/* Emit a signal to wake up target task thread */
pthread_cond_signal (&itti_desc.tasks[task_id].message_queue_cond_var);
}
/* Release the mutex */
pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
#endif
ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
itti_desc.messages_info[message_id].name,
message_number,
priority,
itti_get_task_name(origin_task_id),
destination_task_id,
itti_get_task_name(destination_task_id));
}
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG_END,
task_id);
destination_task_id);
#endif
return 0;
}
#if defined(ENABLE_EVENT_FD)
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
thread_id_t thread_id;
struct epoll_event event;
DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
DevCheck(fd >= 0, fd, 0, 0);
itti_desc.tasks[task_id].nb_events++;
thread_id = TASK_GET_THREAD_ID(task_id);
itti_desc.threads[thread_id].nb_events++;
/* Reallocate the events */
itti_desc.tasks[task_id].events = realloc(
itti_desc.tasks[task_id].events,
itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
itti_desc.threads[thread_id].events = realloc(
itti_desc.threads[thread_id].events,
itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
event.events = EPOLLIN | EPOLLERR;
event.data.fd = fd;
/* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, fd,
if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
&event) != 0)
{
ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
......@@ -460,11 +409,14 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd)
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
thread_id_t thread_id;
DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
DevCheck(fd >= 0, fd, 0, 0);
thread_id = TASK_GET_THREAD_ID(task_id);
/* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
{
ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
itti_get_task_name(task_id), fd, strerror(errno));
......@@ -472,23 +424,27 @@ void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
DevAssert(0 == 1);
}
itti_desc.tasks[task_id].nb_events--;
itti_desc.tasks[task_id].events = realloc(
itti_desc.tasks[task_id].events,
itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
itti_desc.threads[thread_id].nb_events--;
itti_desc.threads[thread_id].events = realloc(
itti_desc.threads[thread_id].events,
itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
}
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
thread_id_t thread_id;
DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
*events = itti_desc.tasks[task_id].events;
thread_id = TASK_GET_THREAD_ID(task_id);
*events = itti_desc.threads[thread_id].events;
return itti_desc.tasks[task_id].epoll_nb_events;
return itti_desc.threads[thread_id].epoll_nb_events;
}
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
thread_id_t thread_id;
int epoll_ret = 0;
int epoll_timeout = 0;
int i;
......@@ -496,6 +452,7 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t
DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
DevAssert(received_msg != NULL);
thread_id = TASK_GET_THREAD_ID(task_id);
*received_msg = NULL;
if (polling) {
......@@ -510,9 +467,9 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t
}
do {
epoll_ret = epoll_wait(itti_desc.tasks[task_id].epoll_fd,
itti_desc.tasks[task_id].events,
itti_desc.tasks[task_id].nb_events,
epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
itti_desc.threads[thread_id].events,
itti_desc.threads[thread_id].nb_events,
epoll_timeout);
} while (epoll_ret < 0 && errno == EINTR);
......@@ -526,19 +483,19 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t
return;
}
itti_desc.tasks[task_id].epoll_nb_events = epoll_ret;
itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;
for (i = 0; i < epoll_ret; i++) {
/* Check if there is an event for ITTI for the event fd */
if ((itti_desc.tasks[task_id].events[i].events & EPOLLIN) &&
(itti_desc.tasks[task_id].events[i].data.fd == itti_desc.tasks[task_id].task_event_fd))
if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
(itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd))
{
struct message_list_s *message;
uint64_t sem_counter;
ssize_t read_ret;
/* Read will always return 1 */
read_ret = read (itti_desc.tasks[task_id].task_event_fd, &sem_counter, sizeof(sem_counter));
read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);
if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
......@@ -551,7 +508,6 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t
}
}
}
#endif
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
......@@ -559,39 +515,9 @@ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
task_id);
#endif
#if defined(ENABLE_EVENT_FD)
itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
#else
DevCheck(task_id < itti_desc.task_max, task_id, itti_desc.task_max, 0);
DevAssert(received_msg != NULL);
// Lock the mutex to get exclusive access to the list
pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
if (itti_desc.tasks[task_id].message_in_queue == 0) {
ITTI_DEBUG("Message in queue[(%u:%s)] == 0, waiting\n", task_id, itti_get_task_name(task_id));
// Wait while list == 0
pthread_cond_wait (&itti_desc.tasks[task_id].message_queue_cond_var,
&itti_desc.tasks[task_id].message_queue_mutex);
ITTI_DEBUG("Receiver queue[(%u:%s)] got new message notification\n",
task_id, itti_get_task_name(task_id));
}
if (!STAILQ_EMPTY (&itti_desc.tasks[task_id].message_queue)) {
message_list_t *temp = STAILQ_FIRST (&itti_desc.tasks[task_id].message_queue);
/* Update received_msg reference */
*received_msg = temp->msg;
/* Remove message from queue */
STAILQ_REMOVE_HEAD (&itti_desc.tasks[task_id].message_queue, next_element);
free (temp);
itti_desc.tasks[task_id].message_in_queue--;
}
// Release the mutex
pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
#endif
#if defined(OAI_EMU) || defined(RTAI)
#if defined(OAI_EMU) || defined(RTAI)
vcd_signal_dumper_dump_variable_by_name(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG_END,
task_id);
#endif
......@@ -608,7 +534,6 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
task_id);
#endif
#if defined(ENABLE_EVENT_FD)
{
struct message_list_s *message;
......@@ -618,33 +543,6 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
free (message);
}
}
#else
if (itti_desc.tasks[task_id].message_in_queue != 0) {
message_list_t *temp;
// Lock the mutex to get exclusive access to the list
pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
STAILQ_FOREACH (temp, &itti_desc.tasks[task_id].message_queue, next_element)
{
/* Update received_msg reference */
*received_msg = temp->msg;
/* Remove message from queue */
STAILQ_REMOVE (&itti_desc.tasks[task_id].message_queue, temp, message_list_s, next_element);
free (temp);
itti_desc.tasks[task_id].message_in_queue--;
ITTI_DEBUG(
"Receiver queue[(%u:%s)] got new message %s, number %lu\n",
task_id, itti_get_task_name(task_id), itti_desc.messages_info[temp->msg->ittiMsgHeader.messageId].name, temp->message_number);
break;
}
// Release the mutex
pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
}
#endif
if ((itti_debug_poll) && (*received_msg == NULL)) {
ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
......@@ -678,6 +576,17 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar
return 0;
}
#ifdef RTAI
void itti_set_task_real_time(task_id_t task_id)
{
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
itti_desc.threads[thread_id].real_time = TRUE;
}
#endif
void itti_mark_task_ready(task_id_t task_id)
{
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
......@@ -696,20 +605,10 @@ void itti_mark_task_ready(task_id_t task_id)
/* Register the thread in itti dump */
itti_dump_thread_use_ring_buffer();
#if defined(ENABLE_EVENT_FD)
/* Mark the thread as using LFDS queue */
lfds611_queue_use(itti_desc.tasks[task_id].message_queue);
#else
// Lock the mutex to get exclusive access to the list
pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
#endif
itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
#if !defined(ENABLE_EVENT_FD)
// Release the mutex
pthread_mutex_unlock (&itti_desc.tasks[task_id].message_queue_mutex);
#endif
}
void itti_exit_task(void) {
......@@ -727,6 +626,40 @@ void itti_terminate_tasks(task_id_t task_id) {
pthread_exit (NULL);
}
#ifdef RTAI
static void *itti_rt_relay_thread(void *arg)
{
thread_id_t thread_id;
unsigned pending_messages;
while (itti_desc.running)
{
usleep (100);
/* Checks for all non real time tasks if they have pending messages */
for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
{
if ((itti_desc.threads[thread_id].task_state == TASK_STATE_READY)
&& (itti_desc.threads[thread_id].real_time == FALSE))
{
pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);
if (pending_messages > 0)
{
ssize_t write_ret;
uint64_t sem_counter = pending_messages;
/* Call to write for an event fd must be of 8 bytes */
write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
}
}
}
}
return NULL;
}
#endif
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name) {
task_id_t task_id;
......@@ -768,7 +701,6 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");
#if defined(ENABLE_EVENT_FD)
ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
......@@ -784,32 +716,38 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
ret = rtf_sem_init(56, 0);
}
# endif
}
itti_desc.tasks[task_id].epoll_fd = epoll_create1(0);
if (itti_desc.tasks[task_id].epoll_fd == -1) {
/* Initializing each thread */
for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
{
itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
if (itti_desc.threads[thread_id].epoll_fd == -1) {
ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
itti_desc.tasks[task_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
if (itti_desc.tasks[task_id].task_event_fd == -1)
itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
if (itti_desc.threads[thread_id].task_event_fd == -1)
{
ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
itti_desc.tasks[task_id].nb_events = 1;
itti_desc.threads[thread_id].nb_events = 1;
itti_desc.tasks[task_id].events = calloc(1, sizeof(struct epoll_event));
itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
itti_desc.tasks[task_id].events->events = EPOLLIN | EPOLLERR;
itti_desc.tasks[task_id].events->data.fd = itti_desc.tasks[task_id].task_event_fd;
itti_desc.threads[thread_id].events->events = EPOLLIN | EPOLLERR;
itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;
/* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD,
itti_desc.tasks[task_id].task_event_fd, itti_desc.tasks[task_id].events) != 0)
if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
{
ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
/* Always assert on this condition */
......@@ -817,24 +755,19 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
}
ITTI_DEBUG("Successfully subscribed fd %d for task %s\n",
itti_desc.tasks[task_id].task_event_fd, itti_get_task_name(task_id));
#else
STAILQ_INIT (&itti_desc.tasks[task_id].message_queue);
itti_desc.tasks[task_id].message_in_queue = 0;
itti_desc.threads[thread_id].task_event_fd, itti_get_task_name(task_id));
// Initialize mutexes
pthread_mutex_init (&itti_desc.tasks[task_id].message_queue_mutex, NULL);
// Initialize Cond vars
pthread_cond_init (&itti_desc.tasks[task_id].message_queue_cond_var, NULL);
#ifdef RTAI
itti_desc.threads[thread_id].real_time = FALSE;
itti_desc.threads[thread_id].messages_pending = 0;
#endif
}
/* Initializing each thread */
for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++)
{
itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
}
itti_desc.running = TRUE;
#ifdef RTAI
/* Start RT relay thread */
DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
#endif
itti_dump_init (messages_definition_xml, dump_file_name);
......@@ -891,6 +824,8 @@ void itti_wait_tasks_end(void) {
}
} while ((ready_tasks > 0) && (retries--));
itti_desc.running = FALSE;
if (ready_tasks > 0) {
ITTI_DEBUG("Some threads are still running, force exit\n");
exit (0);
......
......@@ -34,9 +34,7 @@
* @{
*/
#if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h>
#endif
#include <sys/epoll.h>
#ifdef RTAI
# include <rtai_sem.h>
......@@ -123,7 +121,6 @@ int itti_send_broadcast_message(MessageDef *message_p);
**/
int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message);
#if defined(ENABLE_EVENT_FD)
/** \brief Add a new fd to monitor.
* NOTE: it is up to the user to read data associated with the fd
* \param task_id Task ID of the receiving task
......@@ -143,7 +140,6 @@ void itti_unsubscribe_event_fd(task_id_t task_id, int fd);
* @returns number of events to handle
**/
int itti_get_events(task_id_t task_id, struct epoll_event **events);
#endif
/** \brief Retrieves a message in the queue associated to task_id.
* If the queue is empty, the thread is blocked till a new message arrives.
......@@ -168,6 +164,13 @@ int itti_create_task(task_id_t task_id,
void *(*start_routine) (void *),
void *args_p);
#ifdef RTAI
/** \brief Mark the task as a real time task
* \param task_id task to mark as real time
**/
void itti_set_task_real_time(task_id_t task_id);
#endif
/** \brief Mark the task as in ready state
* \param task_id task to mark as ready
**/
......
......@@ -69,6 +69,6 @@ int timer_remove(long timer_id);
* \param mme_config MME common configuration
* @returns -1 on failure, 0 otherwise
**/
int timer_init();
int timer_init(void);
#endif
......@@ -2355,12 +2355,12 @@ void *rrc_ue_task(void *args_p) {
break;
case MESSAGE_TEST:
LOG_I(RRC, "Received %s\n", msg_name);
LOG_I(RRC, "[UE %d] Received %s\n", Mod_id, msg_name);
break;
/* MAC messages */
case RRC_MAC_IN_SYNC_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance,
LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name,
RRC_MAC_IN_SYNC_IND (msg_p).frame, RRC_MAC_IN_SYNC_IND (msg_p).enb_index);
UE_rrc_inst[Mod_id].Info[RRC_MAC_IN_SYNC_IND (msg_p).enb_index].N310_cnt = 0;
......@@ -2369,14 +2369,14 @@ void *rrc_ue_task(void *args_p) {
break;
case RRC_MAC_OUT_OF_SYNC_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance,
LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name,
RRC_MAC_OUT_OF_SYNC_IND (msg_p).frame, RRC_MAC_OUT_OF_SYNC_IND (msg_p).enb_index);
UE_rrc_inst[Mod_id].Info[RRC_MAC_OUT_OF_SYNC_IND (msg_p).enb_index].N310_cnt ++;
break;
case RRC_MAC_BCCH_DATA_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance,
LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name,
RRC_MAC_BCCH_DATA_IND (msg_p).frame, RRC_MAC_BCCH_DATA_IND (msg_p).enb_index);
decode_BCCH_DLSCH_Message (Mod_id, RRC_MAC_BCCH_DATA_IND (msg_p).frame,
......@@ -2385,7 +2385,7 @@ void *rrc_ue_task(void *args_p) {
break;
case RRC_MAC_CCCH_DATA_CNF:
LOG_I(RRC, "Received %s: instance %d, eNB %d\n", msg_name, instance,
LOG_I(RRC, "[UE %d] Received %s: eNB %d\n", Mod_id, msg_name,
RRC_MAC_CCCH_DATA_CNF (msg_p).enb_index);
// reset the tx buffer to indicate RRC that ccch was successfully transmitted (for example if contention resolution succeeds)
......@@ -2393,7 +2393,7 @@ void *rrc_ue_task(void *args_p) {
break;
case RRC_MAC_CCCH_DATA_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d\n", msg_name, instance,
LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d\n", Mod_id, msg_name,
RRC_MAC_CCCH_DATA_IND (msg_p).frame, RRC_MAC_CCCH_DATA_IND (msg_p).enb_index);
srb_info_p = &UE_rrc_inst[Mod_id].Srb0[RRC_MAC_CCCH_DATA_IND (msg_p).enb_index];
......@@ -2407,7 +2407,7 @@ void *rrc_ue_task(void *args_p) {
#ifdef Rel10
case RRC_MAC_MCCH_DATA_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d, eNB %d, mbsfn SA %d\n", msg_name, instance,
LOG_I(RRC, "[UE %d] Received %s: frame %d, eNB %d, mbsfn SA %d\n", Mod_id, msg_name,
RRC_MAC_MCCH_DATA_IND (msg_p).frame, RRC_MAC_MCCH_DATA_IND (msg_p).enb_index, RRC_MAC_MCCH_DATA_IND (msg_p).mbsfn_sync_area);
decode_MCCH_Message (Mod_id, RRC_MAC_MCCH_DATA_IND (msg_p).frame, RRC_MAC_MCCH_DATA_IND (msg_p).enb_index,
......@@ -2418,7 +2418,7 @@ void *rrc_ue_task(void *args_p) {
/* PDCP messages */
case RRC_DCCH_DATA_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d, DCCH %d, UE %d\n", msg_name, instance,
LOG_I(RRC, "[UE %d] Received %s: frame %d, DCCH %d, UE %d\n", Mod_id, msg_name,
RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index, RRC_DCCH_DATA_IND (msg_p).ue_index);
rrc_ue_decode_dcch (Mod_id, RRC_DCCH_DATA_IND (msg_p).frame,
......@@ -2435,7 +2435,7 @@ void *rrc_ue_task(void *args_p) {
uint32_t length;
uint8_t *buffer;
LOG_I(RRC, "Received %s: instance %d, UEid %d\n", msg_name, instance, NAS_UPLINK_DATA_REQ (msg_p).UEid);
LOG_I(RRC, "[UE %d] Received %s: UEid %d\n", Mod_id, msg_name, NAS_UPLINK_DATA_REQ (msg_p).UEid);
/* Create message for PDCP (ULInformationTransfer_t) */
length = do_ULInformationTransfer(&buffer, NAS_UPLINK_DATA_REQ (msg_p).nasMsg.length, NAS_UPLINK_DATA_REQ (msg_p).nasMsg.data);
......@@ -2446,7 +2446,7 @@ void *rrc_ue_task(void *args_p) {
}
default:
LOG_E(RRC, "Received unexpected message %s\n", msg_name);
LOG_E(RRC, "[UE %d] Received unexpected message %s\n", Mod_id, msg_name);
break;
}
......
......@@ -459,7 +459,7 @@ static void rrc_lite_eNB_init_security(u8 Mod_id, u8 UE_index)
}
ascii_buffer[2 * i] = '\0';
LOG_T(RRC, "[OSA][MOD %02d][UE %02d] kenb = %s\n", Mod_id, UE_index, ascii_buffer);
LOG_T(RRC, "[OSA][eNB %d][UE %d] kenb = %s\n", Mod_id, UE_index, ascii_buffer);
#endif
}
......@@ -494,7 +494,7 @@ static uint8_t rrc_eNB_get_next_free_UE_index (uint8_t Mod_id, uint8_t *UE_ident
}
if (reg == 0) {
LOG_I(RRC, "Adding UE %d\n", first_index);
LOG_I(RRC, "[eNB %d] Adding UE %d\n", Mod_id, first_index);
return (first_index);
}
else {
......@@ -507,7 +507,7 @@ void rrc_eNB_free_UE_index (uint8_t Mod_id, uint8_t UE_id)
DevCheck(Mod_id < NB_eNB_INST, Mod_id, UE_id, NB_eNB_INST);
DevCheck(UE_id < NUMBER_OF_UE_MAX, Mod_id, UE_id, NUMBER_OF_UE_MAX);
LOG_I (RRC, "Removing UE %d 0x%" PRIx64 "\n", UE_id, eNB_rrc_inst[Mod_id].Info.UE_list[UE_id]);
LOG_I (RRC, "[eNB %d] Removing UE %d rv 0x%" PRIx64 "\n", Mod_id, UE_id, eNB_rrc_inst[Mod_id].Info.UE_list[UE_id]);
eNB_rrc_inst[Mod_id].Info.UE[UE_id].Status = RRC_IDLE;
eNB_rrc_inst[Mod_id].Info.UE_list[UE_id] = 0;
}
......@@ -3030,12 +3030,12 @@ void *rrc_enb_task(void *args_p) {
break;
case MESSAGE_TEST:
LOG_I(RRC, "Received %s\n", msg_name);
LOG_I(RRC, "[eNB %d] Received %s\n", instance, msg_name);
break;
/* Messages from MAC */
case RRC_MAC_CCCH_DATA_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d,\n", msg_name, instance,
LOG_I(RRC, "[eNB %d] Received %s: instance %d, frame %d,\n", instance, msg_name,
RRC_MAC_CCCH_DATA_IND (msg_p).frame);
srb_info_p = &eNB_rrc_inst[instance].Srb0;
......@@ -3048,8 +3048,8 @@ void *rrc_enb_task(void *args_p) {
/* Messages from PDCP */
case RRC_DCCH_DATA_IND:
LOG_I(RRC, "Received %s: instance %d, frame %d, DCCH %d, UE %d\n", msg_name, instance,
RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index, RRC_DCCH_DATA_IND (msg_p).ue_index);
LOG_I(RRC, "[eNB %d][UE %d] Received %s: instance %d, frame %d, DCCH %d\n", instance, RRC_DCCH_DATA_IND (msg_p).ue_index, msg_name,
RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index);
rrc_eNB_decode_dcch (instance, RRC_DCCH_DATA_IND (msg_p).frame, RRC_DCCH_DATA_IND (msg_p).dcch_index,
RRC_DCCH_DATA_IND (msg_p).ue_index, RRC_DCCH_DATA_IND (msg_p).sdu_p,
......@@ -3074,7 +3074,7 @@ void *rrc_enb_task(void *args_p) {
break;
case S1AP_PAGING_IND:
LOG_E(RRC, "Received not yet implemented message %s\n", msg_name);
LOG_E(RRC, "[eNB %d] Received not yet implemented message %s\n", instance, msg_name);
break;
case S1AP_UE_CONTEXT_RELEASE_REQ:
......@@ -3083,7 +3083,7 @@ void *rrc_enb_task(void *args_p) {
#endif
default:
LOG_E(RRC, "Received unexpected message %s\n", msg_name);
LOG_E(RRC, "[eNB %d] Received unexpected message %s\n", instance, msg_name);
break;
}
......
......@@ -102,7 +102,7 @@ static uint8_t get_UE_index_from_initial_id(uint8_t mod_id, uint16_t ue_initial_
for (ue_index = 0; ue_index < NUMBER_OF_UE_MAX; ue_index++) {
/* Check if this UE is in use */
LOG_D(RRC, "[eNB %d][UE %d] 0x%" PRIx64 " %d\n", mod_id, ue_index,
LOG_D(RRC, "[eNB %d][UE %d] UE rv 0x%" PRIx64 " %d\n", mod_id, ue_index,
eNB_rrc_inst[mod_id].Info.UE_list[ue_index], eNB_rrc_inst[mod_id].Info.UE[ue_index].ue_initial_id);
if (eNB_rrc_inst[mod_id].Info.UE_list[ue_index] != 0) {
......@@ -129,7 +129,7 @@ static uint8_t get_UE_index_from_eNB_ue_s1ap_id(uint8_t mod_id, uint32_t eNB_ue_
for (ue_index = 0; ue_index < NUMBER_OF_UE_MAX; ue_index++) {
/* Check if this UE is in use */
LOG_D(RRC, "[eNB %d][UE %d] 0x%" PRIx64 " %d\n", mod_id, ue_index,
LOG_D(RRC, "[eNB %d][UE %d] UE rv 0x%" PRIx64 " %d\n", mod_id, ue_index,
eNB_rrc_inst[mod_id].Info.UE_list[ue_index], eNB_rrc_inst[mod_id].Info.UE[ue_index].eNB_ue_s1ap_id);
if (eNB_rrc_inst[mod_id].Info.UE_list[ue_index] != 0) {
......
......@@ -460,24 +460,12 @@ void *emos_thread (void *arg)
#if defined(ENABLE_ITTI)
void *dummy_l2l1_task(void *arg)
{
ssize_t ret = 0;
MessageDef *received_msg;
itti_set_task_real_time(TASK_L2L1);
itti_mark_task_ready(TASK_L2L1);
while (!oai_exit)
{
usleep(100);
do {
itti_poll_msg(TASK_L2L1, &received_msg);
if (received_msg != NULL) {
itti_send_msg_to_task(ITTI_MSG_DESTINATION_ID(received_msg),
ITTI_MSG_INSTANCE(received_msg),
received_msg);
}
} while(received_msg != NULL);
usleep(500000);
}
return NULL;
}
......@@ -537,10 +525,10 @@ static void *eNB_thread(void *arg)
// at the eNB, even slots have double as much time since most of the processing is done here and almost nothing in odd slots
LOG_D(HW,"eNB Frame %d, time %llu: missed slot, proceeding with next one (slot %d, hw_slot %d, diff %d)\n",frame, rt_get_time_ns(), slot, hw_slot, diff);
slot++;
if (frame>0) {
oai_exit=1;
if (frame > 0) {
oai_exit = 1;
#if defined(ENABLE_ITTI)
itti_send_terminate_message(TASK_L2L1);
itti_send_terminate_message (TASK_L2L1);
#endif
}
if (slot==20){
......@@ -618,7 +606,7 @@ static void *eNB_thread(void *arg)
if (fs4_test==0)
{
phy_procedures_eNB_lte (last_slot, next_slot, PHY_vars_eNB_g[0], 0, 0,NULL);
phy_procedures_eNB_lte (last_slot, next_slot, PHY_vars_eNB_g[0], 0, no_relay,NULL);
#ifndef IFFT_FPGA
slot_offset_F = (next_slot)*
(PHY_vars_eNB_g[0]->lte_frame_parms.ofdm_symbol_size)*
......@@ -1234,6 +1222,15 @@ int main(int argc, char **argv) {
vcd_signal_dumper_init("/tmp/openair_dump_eNB.vcd");
}
#if defined(ENABLE_ITTI)
if (UE_flag == 1) {
log_set_instance_type (LOG_INSTANCE_UE);
}
else {
log_set_instance_type (LOG_INSTANCE_ENB);
}
#endif
#if defined(ENABLE_ITTI)
itti_init(TASK_MAX, THREAD_MAX, MESSAGES_ID_MAX, tasks_info, messages_info, messages_definition_xml, itti_dump_file);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment