Commit 068807e1 authored by winckel's avatar winckel

Added ITTI function to synchronize task start when they are all ready.

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4729 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent bb8e616d
...@@ -166,6 +166,10 @@ typedef struct itti_desc_s { ...@@ -166,6 +166,10 @@ typedef struct itti_desc_s {
itti_lte_time_t lte_time; itti_lte_time_t lte_time;
int running; int running;
volatile uint32_t created_tasks;
volatile uint32_t ready_tasks;
volatile int wait_tasks;
#ifdef RTAI #ifdef RTAI
pthread_t rt_relay_thread; pthread_t rt_relay_thread;
#endif #endif
...@@ -373,7 +377,7 @@ int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, Me ...@@ -373,7 +377,7 @@ int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, Me
if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED) if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED)
{ {
ITTI_DEBUG("Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n", ITTI_DEBUG(" Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
itti_desc.messages_info[message_id].name, itti_desc.messages_info[message_id].name,
message_number, message_number,
priority, priority,
...@@ -424,7 +428,7 @@ int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, Me ...@@ -424,7 +428,7 @@ int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, Me
} }
} }
ITTI_DEBUG("Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n", ITTI_DEBUG(" Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
itti_desc.messages_info[message_id].name, itti_desc.messages_info[message_id].name,
message_number, message_number,
priority, priority,
...@@ -469,13 +473,13 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd) ...@@ -469,13 +473,13 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd)
if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd, if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
&event) != 0) &event) != 0)
{ {
ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n", ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
itti_get_task_name(task_id), fd, strerror(errno)); itti_get_task_name(task_id), fd, strerror(errno));
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
} }
ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id)); ITTI_DEBUG(" Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
} }
void itti_unsubscribe_event_fd(task_id_t task_id, int fd) void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
...@@ -489,7 +493,7 @@ void itti_unsubscribe_event_fd(task_id_t task_id, int fd) ...@@ -489,7 +493,7 @@ void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
/* Add the event fd to the list of monitored events */ /* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
{ {
ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n", ITTI_ERROR(" epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
itti_get_task_name(task_id), fd, strerror(errno)); itti_get_task_name(task_id), fd, strerror(errno));
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
...@@ -545,7 +549,7 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t ...@@ -545,7 +549,7 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t
} while (epoll_ret < 0 && errno == EINTR); } while (epoll_ret < 0 && errno == EINTR);
if (epoll_ret < 0) { if (epoll_ret < 0) {
ITTI_ERROR("epoll_wait failed for task %s: %s\n", ITTI_ERROR(" epoll_wait failed for task %s: %s\n",
itti_get_task_name(task_id), strerror(errno)); itti_get_task_name(task_id), strerror(errno));
DevAssert(0 == 1); DevAssert(0 == 1);
} }
...@@ -641,7 +645,7 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) { ...@@ -641,7 +645,7 @@ void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) {
} }
if ((itti_debug_poll) && (*received_msg == NULL)) { if ((itti_debug_poll) && (*received_msg == NULL)) {
ITTI_DEBUG("No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id)); ITTI_DEBUG(" No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
} }
#if defined(OAI_EMU) || defined(RTAI) #if defined(OAI_EMU) || defined(RTAI)
...@@ -661,14 +665,17 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar ...@@ -661,14 +665,17 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar
itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING; itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
ITTI_DEBUG("Create thread for task %s\n", itti_get_task_name(task_id)); ITTI_DEBUG(" Creating thread for task %s ...\n", itti_get_task_name(task_id));
result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p); result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
DevCheck(result >= 0, task_id, thread_id, result); DevCheck(result >= 0, task_id, thread_id, result);
itti_desc.created_tasks ++;
/* Wait till the thread is completely ready */ /* Wait till the thread is completely ready */
while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
; usleep (1000);
return 0; return 0;
} }
...@@ -683,6 +690,16 @@ void itti_set_task_real_time(task_id_t task_id) ...@@ -683,6 +690,16 @@ void itti_set_task_real_time(task_id_t task_id)
} }
#endif #endif
void itti_wait_ready(int wait_tasks)
{
itti_desc.wait_tasks = wait_tasks;
ITTI_DEBUG(" wait for tasks: %s, created tasks %d, ready tasks %d\n", itti_desc.wait_tasks ? "yes" : "no",
itti_desc.created_tasks, itti_desc.ready_tasks);
DevCheck(itti_desc.created_tasks == itti_desc.ready_tasks, itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
}
void itti_mark_task_ready(task_id_t task_id) void itti_mark_task_ready(task_id_t task_id)
{ {
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
...@@ -705,6 +722,14 @@ void itti_mark_task_ready(task_id_t task_id) ...@@ -705,6 +722,14 @@ void itti_mark_task_ready(task_id_t task_id)
#endif #endif
itti_desc.threads[thread_id].task_state = TASK_STATE_READY; itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
itti_desc.ready_tasks ++;
while (itti_desc.wait_tasks != 0)
{
usleep (10000);
}
ITTI_DEBUG(" task %s started\n", itti_get_task_name(task_id));
} }
void itti_exit_task(void) { void itti_exit_task(void) {
...@@ -773,7 +798,7 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -773,7 +798,7 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
itti_desc.message_number = 1; itti_desc.message_number = 1;
ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max); ITTI_DEBUG(" Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
CHECK_INIT_RETURN(signal_mask()); CHECK_INIT_RETURN(signal_mask());
...@@ -794,19 +819,19 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -794,19 +819,19 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
/* Initializing each queue and related stuff */ /* Initializing each queue and related stuff */
for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++)
{ {
ITTI_DEBUG("Initializing %stask %s%s%s\n", ITTI_DEBUG(" Initializing %stask %s%s%s\n",
itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "", itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
itti_desc.tasks_info[task_id].name, itti_desc.tasks_info[task_id].name,
itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "", itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : ""); itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");
ITTI_DEBUG("Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size); ITTI_DEBUG(" Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);
ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size); ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);
if (ret < 0) if (ret < 0)
{ {
ITTI_ERROR("lfds611_queue_new failed for task %u\n", task_id); ITTI_ERROR(" lfds611_queue_new failed for task %u\n", task_id);
DevAssert(0 == 1); DevAssert(0 == 1);
} }
} }
...@@ -818,7 +843,7 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -818,7 +843,7 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
itti_desc.threads[thread_id].epoll_fd = epoll_create1(0); itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
if (itti_desc.threads[thread_id].epoll_fd == -1) { if (itti_desc.threads[thread_id].epoll_fd == -1) {
ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno)); ITTI_ERROR(" Failed to create new epoll fd: %s\n", strerror(errno));
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
} }
...@@ -833,7 +858,7 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -833,7 +858,7 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
# endif # endif
if (itti_desc.threads[thread_id].task_event_fd == -1) if (itti_desc.threads[thread_id].task_event_fd == -1)
{ {
ITTI_ERROR("eventfd failed: %s\n", strerror(errno)); ITTI_ERROR(" eventfd failed: %s\n", strerror(errno));
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
} }
...@@ -849,13 +874,13 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -849,13 +874,13 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0) itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0)
{ {
ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno)); ITTI_ERROR(" epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
} }
ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", ITTI_DEBUG(" Successfully subscribed fd %d for thread %d\n",
itti_desc.threads[thread_id].task_event_fd, itti_get_task_name(task_id)); itti_desc.threads[thread_id].task_event_fd, thread_id);
#ifdef RTAI #ifdef RTAI
itti_desc.threads[thread_id].real_time = FALSE; itti_desc.threads[thread_id].real_time = FALSE;
...@@ -864,6 +889,9 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -864,6 +889,9 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
} }
itti_desc.running = 1; itti_desc.running = 1;
itti_desc.wait_tasks = 0;
itti_desc.created_tasks = 0;
itti_desc.ready_tasks = 0;
#ifdef RTAI #ifdef RTAI
/* Start RT relay thread */ /* Start RT relay thread */
DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0); DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
...@@ -913,7 +941,7 @@ void itti_wait_tasks_end(void) { ...@@ -913,7 +941,7 @@ void itti_wait_tasks_end(void) {
result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL); result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);
ITTI_DEBUG("Thread %s join status %d\n", itti_get_task_name(task_id), result); ITTI_DEBUG(" Thread %s join status %d\n", itti_get_task_name(task_id), result);
if (result == 0) { if (result == 0) {
/* Thread has terminated */ /* Thread has terminated */
...@@ -933,7 +961,7 @@ void itti_wait_tasks_end(void) { ...@@ -933,7 +961,7 @@ void itti_wait_tasks_end(void) {
itti_desc.running = 0; itti_desc.running = 0;
if (ready_tasks > 0) { if (ready_tasks > 0) {
ITTI_DEBUG("Some threads are still running, force exit\n"); ITTI_DEBUG(" Some threads are still running, force exit\n");
exit (0); exit (0);
} }
......
...@@ -171,6 +171,11 @@ int itti_create_task(task_id_t task_id, ...@@ -171,6 +171,11 @@ int itti_create_task(task_id_t task_id,
void itti_set_task_real_time(task_id_t task_id); void itti_set_task_real_time(task_id_t task_id);
#endif #endif
/** \brief Indicates to ITTI if newly created tasks should wait for all tasks to be ready
* \param wait_tasks non 0 to make new created tasks to wait, 0 to let created tasks to run
**/
void itti_wait_ready(int wait_tasks);
/** \brief Mark the task as in ready state /** \brief Mark the task as in ready state
* \param task_id task to mark as ready * \param task_id task to mark as ready
**/ **/
......
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
int create_tasks(uint32_t enb_nb, uint32_t ue_nb) int create_tasks(uint32_t enb_nb, uint32_t ue_nb)
{ {
itti_wait_ready(1);
# ifdef OPENAIR2 # ifdef OPENAIR2
{ {
# if defined(ENABLE_USE_MME) # if defined(ENABLE_USE_MME)
...@@ -110,6 +111,8 @@ int create_tasks(uint32_t enb_nb, uint32_t ue_nb) ...@@ -110,6 +111,8 @@ int create_tasks(uint32_t enb_nb, uint32_t ue_nb)
return -1; return -1;
} }
} }
itti_wait_ready(0);
return 0; return 0;
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment