/*
 * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The OpenAirInterface Software Alliance licenses this file to You under
 * the OAI Public License, Version 1.1  (the "License"); you may not use this file
 * except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.openairinterface.org/?page_id=698
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *-------------------------------------------------------------------------------
 * For more information about the OpenAirInterface (OAI) Software Alliance:
 *      contact@openairinterface.org
 */

#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <signal.h>

#include <sys/epoll.h>
#include <sys/eventfd.h>

#if !defined(TRUE)
#define TRUE 1
#endif

#include "liblfds611.h"

#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

#if T_TRACER
#include "T.h"
#endif

/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

#include "signals.h"
#include "timer.h"

/* ITTI DEBUG groups */
#define ITTI_DEBUG_POLL             (1<<0)
#define ITTI_DEBUG_SEND             (1<<1)
#define ITTI_DEBUG_EVEN_FD          (1<<2)
#define ITTI_DEBUG_INIT             (1<<3)
#define ITTI_DEBUG_EXIT             (1<<4)
#define ITTI_DEBUG_ISSUES           (1<<5)
#define ITTI_DEBUG_MP_STATISTICS    (1<<6)

const int itti_debug = (ITTI_DEBUG_ISSUES | ITTI_DEBUG_MP_STATISTICS);

# define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) {fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout);} } while(0);
#define ITTI_ERROR(x, args...)      do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } while(0);

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

typedef enum task_state_s {
  TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
typedef struct message_list_s {
  MessageDef *msg; ///< Pointer to the message

  message_number_t message_number; ///< Unique message number
  uint32_t message_priority; ///< Message priority
} message_list_t;

typedef struct thread_desc_s {
  /* pthread associated with the thread */
  pthread_t task_thread;

  /* State of the thread */
  volatile task_state_t task_state;

  /* This fd is used internally by ITTI. */
  int epoll_fd;

  /* The thread fd */
  int task_event_fd;

  /* Number of events to monitor */
  uint16_t nb_events;


  /* Array of events monitored by the task.
   * By default only one fd is monitored (the one used to received messages
   * from other tasks).
   * More events can be suscribed later by the task itself.
   */
  struct epoll_event *events;

  int epoll_nb_events;

  /* Flag to mark real time thread */
  unsigned real_time;

  /* Counter to indicate that messages are pending for the thread */
  unsigned messages_pending;
} thread_desc_t;

typedef struct task_desc_s {
  /* Queue of messages belonging to the task */
  struct lfds611_queue_state *message_queue;
} task_desc_t;

typedef struct itti_desc_s {
  thread_desc_t *threads;
  task_desc_t   *tasks;

  /* Current message number. Incremented every call to send_msg_to_task */
  message_number_t message_number __attribute__((aligned(8)));

  thread_id_t thread_max;
  task_id_t task_max;
  MessagesIds messages_id_max;

  boolean_t thread_handling_signals;
  pthread_t thread_ref;

  const task_info_t *tasks_info;
  const message_info_t *messages_info;

  itti_lte_time_t lte_time;

  int running;

  volatile uint32_t created_tasks;
  volatile uint32_t ready_tasks;
  volatile int      wait_tasks;
} itti_desc_t;

static itti_desc_t itti_desc;

void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
{
  void *ptr = NULL;

  ptr = malloc (size);
  if (ptr) memset(ptr,0,size);

  AssertFatal (ptr != NULL, "Memory allocation of %d bytes failed (%d -> %d)!\n", (int) size, origin_task_id, destination_task_id);

  return ptr;
}

int itti_free(task_id_t task_id, void *ptr)
{
  int result = EXIT_SUCCESS;
  AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id);

  free (ptr);

  return (result);
}

static inline message_number_t itti_increment_message_number(void)
{
  /* Atomic operation supported by GCC: returns the current message number
   * and then increment it by 1.
   * This can be done without mutex.
   */
  return __sync_fetch_and_add (&itti_desc.message_number, 1);
}

static inline uint32_t itti_get_message_priority(MessagesIds message_id)
{
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  return (itti_desc.messages_info[message_id].priority);
}

const char *itti_get_message_name(MessagesIds message_id)
{
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  return (itti_desc.messages_info[message_id].name);
}

const char *itti_get_task_name(task_id_t task_id)
{
  if (itti_desc.task_max > 0) {
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  } else {
    return ("ITTI NOT INITIALIZED !!!");
  }

  return (itti_desc.tasks_info[task_id].name);
}

static task_id_t itti_get_current_task_id(void)
{
  task_id_t task_id;
  thread_id_t thread_id;
  pthread_t thread = pthread_self ();

  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    thread_id = TASK_GET_THREAD_ID(task_id);

    if (itti_desc.threads[thread_id].task_thread == thread) {
      return task_id;
    }
  }

  return TASK_UNKNOWN;
}

void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
  itti_desc.lte_time.frame = frame;
  itti_desc.lte_time.slot = slot;
}

int itti_send_broadcast_message(MessageDef *message_p)
{
  task_id_t destination_task_id;
  task_id_t origin_task_id;
  thread_id_t origin_thread_id;
  uint32_t thread_id;
  int ret = 0;
  int result;

  AssertFatal (message_p != NULL, "Trying to broadcast a NULL message!\n");

  origin_task_id = message_p->ittiMsgHeader.originTaskId;
  origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);

  destination_task_id = TASK_FIRST;

  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    MessageDef *new_message_p;

    while (thread_id != TASK_GET_THREAD_ID(destination_task_id)) {
      destination_task_id++;
    }

    /* Skip task that broadcast the message */
    if (thread_id != origin_thread_id) {
      /* Skip tasks which are not running */
      if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
        size_t size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;
        new_message_p = itti_malloc( origin_task_id, destination_task_id, size );
        AssertFatal (new_message_p != NULL, "New message allocation failed!\n");

        memcpy( new_message_p, message_p, size );
        result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
        AssertFatal (result >= 0, "Failed to send message %d to thread %d (task %d)!\n", message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
      }
    }
  }

  result = itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);
  AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

  return ret;
}

MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
{
  MessageDef *temp = NULL;

  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  if (origin_task_id == TASK_UNKNOWN) {
    /* Try to identify real origin task ID */
    origin_task_id = itti_get_current_task_id();
  }

  temp = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);

  temp->ittiMsgHeader.messageId = message_id;
  temp->ittiMsgHeader.originTaskId = origin_task_id;
  temp->ittiMsgHeader.ittiMsgSize = size;

  return temp;
}

MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
{
  return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
}

int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
  thread_id_t destination_thread_id;
  task_id_t origin_task_id;
  message_list_t *new;
  uint32_t priority;
  message_number_t message_number;
  uint32_t message_id;

  AssertFatal (message != NULL, "Message is NULL!\n");
  AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max);

  destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
  message->ittiMsgHeader.destinationTaskId = destination_task_id;
  message->ittiMsgHeader.instance = instance;
  message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
  message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
  message_id = message->ittiMsgHeader.messageId;
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  origin_task_id = ITTI_MSG_ORIGIN_ID(message);

  priority = itti_get_message_priority (message_id);

  /* Increment the global message number */
  message_number = itti_increment_message_number ();

#if 0
  /* itti dump is disabled */
  itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                           sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);
#endif

  if (destination_task_id != TASK_UNKNOWN) {

    if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED) {
      ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
    } else {
      /* We cannot send a message if the task is not running */
      AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY,
                   "Task %s Cannot send message %s (%d) to thread %d, it is not in ready state (%d)!\n",
                   itti_get_task_name(origin_task_id),
                   itti_desc.messages_info[message_id].name,
                   message_id,
                   destination_thread_id,
                   itti_desc.threads[destination_thread_id].task_state);

      /* Allocate new list element */
      new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));

      /* Fill in members */
      new->msg = message;
      new->message_number = message_number;
      new->message_priority = priority;

      /* Enqueue message in destination task queue */
      if (lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new) == 0) {
        AssertFatal(0, "Error: lfds611_queue_enqueue returns 0, queue is full, exiting\n");
      }

      {
        /* Only use event fd for tasks, subtasks will pool the queue */
        if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN) {
          ssize_t write_ret;
          eventfd_t sem_counter = 1;

          /* Call to write for an event fd must be of 8 bytes */
          write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
          AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n",
                       destination_thread_id, (int) write_ret, (int) sizeof(sem_counter));
        }
      }

      ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
    }
  } else {
    /* This is a debug message to TASK_UNKNOWN, we can release safely release it */
    int result = itti_free(origin_task_id, message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
  }

  return 0;
}

/* same as itti_send_msg_to_task but returns -1 in case of failure instead of crashing */
/* TODO: this is a hack - the whole logic needs a proper rework. */
/* look for HACK_RLC_UM_LIMIT for others places related to the hack. Please do not remove this comment. */
int itti_try_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
{
  thread_id_t destination_thread_id;
  task_id_t origin_task_id;
  message_list_t *new;
  uint32_t priority;
  message_number_t message_number;
  uint32_t message_id;

  AssertFatal (message != NULL, "Message is NULL!\n");
  AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max);

  destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
  message->ittiMsgHeader.destinationTaskId = destination_task_id;
  message->ittiMsgHeader.instance = instance;
  message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
  message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
  message_id = message->ittiMsgHeader.messageId;
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);

  origin_task_id = ITTI_MSG_ORIGIN_ID(message);

  priority = itti_get_message_priority (message_id);

  /* Increment the global message number */
  message_number = itti_increment_message_number ();

#if 0
  /* itti dump is disabled */
  itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                           sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);
#endif

  if (destination_task_id != TASK_UNKNOWN) {

    if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED) {
      ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
    } else {
      /* We cannot send a message if the task is not running */
      AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY,
                   "Task %s Cannot send message %s (%d) to thread %d, it is not in ready state (%d)!\n",
                   itti_get_task_name(origin_task_id),
                   itti_desc.messages_info[message_id].name,
                   message_id,
                   destination_thread_id,
                   itti_desc.threads[destination_thread_id].task_state);

      /* Allocate new list element */
      new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));

      /* Fill in members */
      new->msg = message;
      new->message_number = message_number;
      new->message_priority = priority;

      /* Enqueue message in destination task queue */
      if (lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new) == 0) {
        itti_free(origin_task_id, new);
        return -1;
      }

      {
        /* Only use event fd for tasks, subtasks will pool the queue */
        if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN) {
          ssize_t write_ret;
          eventfd_t sem_counter = 1;

          /* Call to write for an event fd must be of 8 bytes */
          write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
          AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n",
                       destination_thread_id, (int) write_ret, (int) sizeof(sem_counter));
        }
      }

      ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
    }
  } else {
    /* This is a debug message to TASK_UNKNOWN, we can release safely release it */
    int result = itti_free(origin_task_id, message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
  }

  return 0;
}

void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
  thread_id_t thread_id;
  struct epoll_event event;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);

  thread_id = TASK_GET_THREAD_ID(task_id);
  itti_desc.threads[thread_id].nb_events++;

  /* Reallocate the events */
  itti_desc.threads[thread_id].events = realloc(
                                          itti_desc.threads[thread_id].events,
                                          itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));

  event.events  = EPOLLIN | EPOLLERR;
  event.data.u64 = 0;
  event.data.fd  = fd;

  /* Add the event fd to the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
                &event) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }

  ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
}

void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
  thread_id_t thread_id;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (fd >= 0, "File descriptor (%d) is invalid!\n", fd);

  thread_id = TASK_GET_THREAD_ID(task_id);

  /* Add the event fd to the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }

  itti_desc.threads[thread_id].nb_events--;
  itti_desc.threads[thread_id].events = realloc(
                                          itti_desc.threads[thread_id].events,
                                          itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
}

int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
  thread_id_t thread_id;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);

  thread_id = TASK_GET_THREAD_ID(task_id);
  *events = itti_desc.threads[thread_id].events;

  return itti_desc.threads[thread_id].epoll_nb_events;
}

static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
  thread_id_t thread_id;
  int epoll_ret = 0;
  int epoll_timeout = 0;
  int i;

  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (received_msg != NULL, "Received message is NULL!\n");

  thread_id = TASK_GET_THREAD_ID(task_id);
  *received_msg = NULL;

  if (polling) {
    /* In polling mode we set the timeout to 0 causing epoll_wait to return
     * immediately.
     */
    epoll_timeout = 0;
  } else {
    /* timeout = -1 causes the epoll_wait to wait indefinitely.
     */
    epoll_timeout = -1;
  }

  do {
    epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                           itti_desc.threads[thread_id].events,
                           itti_desc.threads[thread_id].nb_events,
                           epoll_timeout);
  } while (epoll_ret < 0 && errno == EINTR);

  if (epoll_ret < 0) {
    AssertFatal (0, "epoll_wait failed for task %s: %s!\n", itti_get_task_name(task_id), strerror(errno));
  }

  if (epoll_ret == 0 && polling) {
    /* No data to read -> return */
    return;
  }

  itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

  for (i = 0; i < epoll_ret; i++) {
    /* Check if there is an event for ITTI for the event fd */
    if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
        (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd)) {
      struct message_list_s *message = NULL;
      eventfd_t   sem_counter;
      ssize_t     read_ret;
      int         result;

      /* Read will always return 1 */
      read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
      AssertFatal (read_ret == sizeof(sem_counter), "Read from task message FD (%d) failed (%d/%d)!\n", thread_id, (int) read_ret, (int) sizeof(sem_counter));

      if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
        /* No element in list -> this should not happen */
        AssertFatal (0, "No message in queue for task %d while there are %d events and some for the messages queue!\n", task_id, epoll_ret);
      }

      AssertFatal(message != NULL, "Message from message queue is NULL!\n");
      *received_msg = message->msg;


      result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
      AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);


      /* Mark that the event has been processed */
      itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
      return;
    }
  }
}

void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{

  itti_receive_msg_internal_event_fd(task_id, 0, received_msg);

}

void itti_poll_msg(task_id_t task_id, MessageDef **received_msg)
{
  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);

  *received_msg = NULL;

  {
    struct message_list_s *message;

    if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1) {
      int result;

      *received_msg = message->msg;
      result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
      AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
    }
  }

  if (*received_msg == NULL) {
    ITTI_DEBUG(ITTI_DEBUG_POLL, " No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
  }

}

int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
  int result;

  AssertFatal (start_routine != NULL, "Start routine is NULL!\n");
  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);
  AssertFatal (itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, "Task %d, thread %d state is not correct (%d)!\n",
               task_id, thread_id, itti_desc.threads[thread_id].task_state);

  itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;

  ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating thread for task %s ...\n", itti_get_task_name(task_id));

  result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
  AssertFatal (result >= 0, "Thread creation for task %d, thread %d failed (%d)!\n", task_id, thread_id, result);
  char name[16];
  snprintf( name, sizeof(name), "ITTI %d", thread_id );
  pthread_setname_np( itti_desc.threads[thread_id].task_thread, name );

  itti_desc.created_tasks ++;

  /* Wait till the thread is completely ready */
  while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
    usleep (1000);

  return 0;
}

void itti_set_task_real_time(task_id_t task_id)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

  DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);

  itti_desc.threads[thread_id].real_time = TRUE;
}

void itti_wait_ready(int wait_tasks)
{
  itti_desc.wait_tasks = wait_tasks;

  ITTI_DEBUG(ITTI_DEBUG_INIT,
             " wait for tasks: %s, created tasks %d, ready tasks %d\n",
             itti_desc.wait_tasks ? "yes" : "no",
             itti_desc.created_tasks,
             itti_desc.ready_tasks);

  AssertFatal (itti_desc.created_tasks == itti_desc.ready_tasks, "Number of created tasks (%d) does not match ready tasks (%d), wait task %d!\n",
               itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
}

void itti_mark_task_ready(task_id_t task_id)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);

#if 0
  /* itti dump is disabled */
  /* Register the thread in itti dump */
  itti_dump_thread_use_ring_buffer();
#endif

  /* Mark the thread as using LFDS queue */
  lfds611_queue_use(itti_desc.tasks[task_id].message_queue);

  itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
  itti_desc.ready_tasks ++;

  while (itti_desc.wait_tasks != 0) {
    usleep (10000);
  }

  ITTI_DEBUG(ITTI_DEBUG_INIT, " task %s started\n", itti_get_task_name(task_id));
}

void itti_exit_task(void)
{
  task_id_t task_id = itti_get_current_task_id();
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);

#if defined(OAI_EMU) || defined(RTAI)
  if (task_id > TASK_UNKNOWN) {
    VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
  }
#endif

  itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
  itti_desc.created_tasks--;
  ITTI_DEBUG(ITTI_DEBUG_EXIT, "Thread for task %s (%d) exits\n", itti_get_task_name(task_id), task_id);
  pthread_exit (NULL);
}

void itti_terminate_tasks(task_id_t task_id)
{
  // Sends Terminate signals to all tasks.
  itti_send_terminate_message (task_id);

  if (itti_desc.thread_handling_signals) {
    pthread_kill (itti_desc.thread_ref, SIGUSR1);
  }

  pthread_exit (NULL);
}

int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name)
{
  task_id_t task_id;
  thread_id_t thread_id;
  int ret;

  itti_desc.message_number = 1;

  ITTI_DEBUG(ITTI_DEBUG_INIT, " Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);

  CHECK_INIT_RETURN(signal_mask());

  /* Saves threads and messages max values */
  itti_desc.task_max = task_max;
  itti_desc.thread_max = thread_max;
  itti_desc.messages_id_max = messages_id_max;
  itti_desc.thread_handling_signals = FALSE;
  itti_desc.tasks_info = tasks_info;
  itti_desc.messages_info = messages_info;

  /* Allocates memory for tasks info */
  itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

  /* Allocates memory for threads info */
  itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));

  /* Initializing each queue and related stuff */
  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Initializing %stask %s%s%s\n",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
               itti_desc.tasks_info[task_id].name,
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
               itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

    ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);

    ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);

    if (0 == ret) {
      AssertFatal (0, "lfds611_queue_new failed for task %s!\n", itti_get_task_name(task_id));
    }
  }

  /* Initializing each thread */
  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;

    itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);

    if (itti_desc.threads[thread_id].epoll_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, "Failed to create new epoll fd: %s!\n", strerror(errno));
    }

    itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);

    if (itti_desc.threads[thread_id].task_event_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, " eventfd failed: %s!\n", strerror(errno));
    }

    itti_desc.threads[thread_id].nb_events = 1;

    itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));

    itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
    itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
                  itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0) {
      /* Always assert on this condition */
      AssertFatal (0, " epoll_ctl (EPOLL_CTL_ADD) failed: %s!\n", strerror(errno));
    }

    ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for thread %d\n",
               itti_desc.threads[thread_id].task_event_fd, thread_id);

  }

  itti_desc.running = 1;
  itti_desc.wait_tasks = 0;
  itti_desc.created_tasks = 0;
  itti_desc.ready_tasks = 0;
#if 0
  /* itti dump is disabled */
  itti_dump_init (messages_definition_xml, dump_file_name);
#endif

  CHECK_INIT_RETURN(timer_init ());

  return 0;
}

void itti_wait_tasks_end(void)
{
  int end = 0;
  int thread_id;
  task_id_t task_id;
  int ready_tasks;
  int result;
  int retries = 10;

  itti_desc.thread_handling_signals = TRUE;
  itti_desc.thread_ref=pthread_self ();

  /* Handle signals here */
  while (end == 0) {
    signal_handle (&end);
  }

  printf("closing all tasks\n");
  sleep(1);

  do {
    ready_tasks = 0;

    task_id = TASK_FIRST;

    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
      /* Skip tasks which are not running */
      if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
        while (thread_id != TASK_GET_THREAD_ID(task_id)) {
          task_id++;
        }

        result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);

        ITTI_DEBUG(ITTI_DEBUG_EXIT, " Thread %s join status %d\n", itti_get_task_name(task_id), result);

        if (result == 0) {
          /* Thread has terminated */
          itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
        } else {
          /* Thread is still running, count it */
          ready_tasks++;
        }
      }
    }

    if (ready_tasks > 0) {
      usleep (100 * 1000);
    }
  } while ((ready_tasks > 0) && (retries--)&& (!end) );

  printf("ready_tasks %d\n",ready_tasks);

  itti_desc.running = 0;

  if (ready_tasks > 0) {
    ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Some threads are still running, force exit\n");
    exit (0);
  }

#if 0
  /* itti dump is disabled */
  itti_dump_exit();
#endif
}

void itti_send_terminate_message(task_id_t task_id)
{
  MessageDef *terminate_message_p;

  terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);

  itti_send_broadcast_message (terminate_message_p);
}