diff --git a/common/utils/itti/intertask_interface.c b/common/utils/itti/intertask_interface.c index 1780a9b967a6e8ad0c1332608308a6191f1db55a..22690ceeecb904942402a7eb0f7694e36db6476b 100644 --- a/common/utils/itti/intertask_interface.c +++ b/common/utils/itti/intertask_interface.c @@ -50,7 +50,7 @@ #include "signals.h" #include "timer.h" -int itti_debug = 1; +int itti_debug = 0; #define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \ while(0) @@ -131,10 +131,7 @@ int itti_send_broadcast_message(MessageDef *message_p) { int ret = 0; int result; - if (message_p == NULL) { - ITTI_ERROR("Message to broadcast is NULL (%s:%d)\n", __FILE__, __LINE__); - return -1; - } + DevAssert(message_p != NULL); origin_thread_id = TASK_GET_THREAD_ID(message_p->header.originTaskId); @@ -146,19 +143,11 @@ int itti_send_broadcast_message(MessageDef *message_p) { /* Skip tasks which are not running */ if (itti_desc.tasks[i].task_state == TASK_STATE_READY) { new_message_p = malloc (sizeof(MessageDef)); + DevAssert(message_p != NULL); - if (new_message_p == NULL) { - ITTI_ERROR("Failed to allocate memory (%s:%d)\n", __FILE__, __LINE__); - return -1; - } memcpy (new_message_p, message_p, sizeof(MessageDef)); result = itti_send_msg_to_task (TASK_SHIFT_THREAD_ID(i), INSTANCE_DEFAULT, new_message_p); - if (result < 0) { - ITTI_ERROR("Failed to send broadcast message (%s) to queue (%u:%s)\n", - itti_desc.messages_info[message_p->header.messageId].name, i, itti_desc.threads_name[i]); - ret = result; - free (new_message_p); - } + DevCheck(result >= 0, message_p->header.messageId, i, 0); } } } @@ -170,17 +159,10 @@ int itti_send_broadcast_message(MessageDef *message_p) { inline MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id) { MessageDef *temp = NULL; - if (message_id >= itti_desc.messages_id_max) { - ITTI_ERROR("Invalid message id %d (%s:%d)\n", message_id, __FILE__, __LINE__); - return NULL; - } + DevCheck(message_id < itti_desc.messages_id_max, message_id, itti_desc.messages_id_max, 0); temp = calloc (1, MESSAGE_SIZE(message_id)); - - if (temp == NULL) { - ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n", __FILE__, __LINE__); - return NULL; - } + DevAssert(temp != NULL); temp->header.messageId = message_id; temp->header.originTaskId = origin_task_id; @@ -196,16 +178,12 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me message_number_t message_number; uint32_t message_id; - if (thread_id >= itti_desc.thread_max) { - return -1; - } + DevAssert(message != NULL); + DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); message->header.destinationTaskId = task_id; message->header.instance = instance; message_id = message->header.messageId; - - DevAssert(message != NULL); - DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0); DevCheck(message_id < itti_desc.messages_id_max, itti_desc.messages_id_max, message_id, 0); priority = itti_get_message_priority (message_id); @@ -223,10 +201,8 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me itti_desc.tasks[thread_id].message_in_queue); /* Allocate new list element */ - if ((new = (struct message_list_s *) malloc (sizeof(struct message_list_s))) == NULL) { - ITTI_ERROR("Cannot allocate memory for new message (%s:%d)\n", __FILE__, __LINE__); - return -1; - } + new = (struct message_list_s *) malloc (sizeof(struct message_list_s)); + DevAssert(new != NULL); /* Increment the global message number */ message_number = itti_increment_message_number (); @@ -285,7 +261,7 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) { thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0); + DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); DevAssert(received_msg != NULL); // Lock the mutex to get exclusive access to the list @@ -318,7 +294,7 @@ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) { void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg) { thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0); + DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); DevAssert(received_msg != NULL); *received_msg = NULL; @@ -359,24 +335,17 @@ void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) { thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); + int result; DevAssert(start_routine != NULL); - DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0); - - if (itti_desc.tasks[thread_id].task_state != TASK_STATE_NOT_CONFIGURED) { - ITTI_ERROR("You are attempting to start an already configured thread" - " for %s thread\n", - itti_desc.threads_name[thread_id]); - return -1; - } + DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, task_id, thread_id, + itti_desc.tasks[thread_id].task_state); itti_desc.tasks[thread_id].task_state = TASK_STATE_STARTING; - if (pthread_create (&itti_desc.tasks[thread_id].task_thread, NULL, start_routine, args_p) < 0) { - ITTI_ERROR("Failed to initialize %s thread: %s:%d\n", - itti_desc.threads_name[thread_id], strerror(errno), errno); - return -1; - } + result = pthread_create (&itti_desc.tasks[thread_id].task_thread, NULL, start_routine, args_p); + DevCheck(result>= 0, task_id, thread_id, result); /* Wait till the thread is completely ready */ while (itti_desc.tasks[thread_id].task_state != TASK_STATE_READY) @@ -387,10 +356,12 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar void itti_mark_task_ready(task_id_t task_id) { thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); - DevCheck(thread_id < itti_desc.thread_max, thread_id, 0, 0); + DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); + // Lock the mutex to get exclusive access to the list pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex); itti_desc.tasks[thread_id].task_state = TASK_STATE_READY; + // Release the mutex pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex); } @@ -400,7 +371,7 @@ void itti_exit_task(void) { } void itti_terminate_tasks(task_id_t task_id) { - // Sends Terminate signals to all tasks. +// Sends Terminate signals to all tasks. itti_send_terminate_message (task_id); if (itti_desc.thread_handling_signals >= 0) { @@ -415,6 +386,8 @@ int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * int i; itti_desc.message_number = 0; + ITTI_DEBUG( "Init: %d threads, %d messages\n", thread_max, messages_id_max); + CHECK_INIT_RETURN(signal_init()); /* Saves threads and messages max values */ @@ -431,8 +404,10 @@ int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char * for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) { STAILQ_INIT (&itti_desc.tasks[i].message_queue); itti_desc.tasks[i].message_in_queue = 0; + // Initialize mutexes pthread_mutex_init (&itti_desc.tasks[i].message_queue_mutex, NULL); + // Initialize Cond vars pthread_cond_init (&itti_desc.tasks[i].message_queue_cond_var, NULL); itti_desc.tasks[i].task_state = TASK_STATE_NOT_CONFIGURED; diff --git a/common/utils/itti/intertask_interface.h b/common/utils/itti/intertask_interface.h index 7856cee9efcd577f8e5ec86a897d4d7c37cd7fbb..3533136c3b0cbb8ff2f258f7f85085d91bfdc882 100644 --- a/common/utils/itti/intertask_interface.h +++ b/common/utils/itti/intertask_interface.h @@ -44,6 +44,9 @@ #include "intertask_interface_conf.h" #include "intertask_interface_types.h" +#define ITTI_MSG_NAME(mSGpTR) itti_get_message_name((mSGpTR)->header.messageId) +#define ITTI_MSG_INSTANCE(mSGpTR) (mSGpTR)->header.instance + /* Make the message number platform specific */ typedef unsigned long message_number_t; #define MESSAGE_NUMBER_SIZE (sizeof(unsigned long)) diff --git a/common/utils/itti/intertask_interface_dump.c b/common/utils/itti/intertask_interface_dump.c index 023c6b9ae55df8eaa753adb1b4a2889a362b4162..3721589ae12bd5461e41d4a4cb7521d9cf222208 100644 --- a/common/utils/itti/intertask_interface_dump.c +++ b/common/utils/itti/intertask_interface_dump.c @@ -60,7 +60,7 @@ /* Declared in intertask_interface.c */ extern int itti_debug; -#define ITTI_DEBUG(x, args...) do { fprintf(stdout, "[ITTI][D]"x, ##args); } \ +#define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \ while(0) #define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \ while(0)