intertask_interface.c 35.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright (c) 2015, EURECOM (www.eurecom.fr)
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *    list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are those
 * of the authors and should not be interpreted as representing official policies,
 * either expressed or implied, of the FreeBSD Project.
 */
Cedric Roux's avatar
 
Cedric Roux committed
29

30
#define _GNU_SOURCE
Cedric Roux's avatar
 
Cedric Roux committed
31 32 33 34 35 36
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
37
#include <signal.h>
Cedric Roux's avatar
 
Cedric Roux committed
38

39 40
#include <sys/epoll.h>
#include <sys/eventfd.h>
41

42 43 44 45
#ifdef RTAI
# include <rtai_shm.h>
#endif

Lionel Gauthier's avatar
Lionel Gauthier committed
46 47 48 49
#if !defined(TRUE)
#define TRUE 1
#endif

50 51 52 53 54 55
#include "liblfds611.h"

#include "assertions.h"
#include "intertask_interface.h"
#include "intertask_interface_dump.h"

56
#if defined(OAI_EMU) || defined(RTAI)
57
# include "memory_pools.h"
58 59 60
# include "vcd_signal_dumper.h"
#endif

61 62 63 64
#if T_TRACER
#include "T.h"
#endif

65 66 67
/* Includes "intertask_interface_init.h" to check prototype coherence, but
 * disable threads and messages information generation.
 */
Cedric Roux's avatar
 
Cedric Roux committed
68 69 70 71
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY

72
#include "signals.h"
Cedric Roux's avatar
 
Cedric Roux committed
73 74
#include "timer.h"

75 76 77 78 79 80 81 82
#ifdef RTAI
# include <rtai.h>
# include <rtai_fifos.h>
#    define FIFO_PRINTF_MAX_STRING_SIZE 1000
#    define FIFO_PRINTF_NO              62
#    define FIFO_PRINTF_SIZE            65536
#endif

83 84 85 86 87 88 89 90 91 92
/* ITTI DEBUG groups */
#define ITTI_DEBUG_POLL             (1<<0)
#define ITTI_DEBUG_SEND             (1<<1)
#define ITTI_DEBUG_EVEN_FD          (1<<2)
#define ITTI_DEBUG_INIT             (1<<3)
#define ITTI_DEBUG_EXIT             (1<<4)
#define ITTI_DEBUG_ISSUES           (1<<5)
#define ITTI_DEBUG_MP_STATISTICS    (1<<6)

const int itti_debug = ITTI_DEBUG_ISSUES | ITTI_DEBUG_MP_STATISTICS;
Cedric Roux's avatar
 
Cedric Roux committed
93

94 95
/* Don't flush if using RTAI */
#ifdef RTAI
96
# define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) rt_log_debug (x, ##args); } while(0);
97
#else
98
# define ITTI_DEBUG(m, x, args...)  do { if ((m) & itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); fflush (stdout); } while(0);
99
#endif
100
#define ITTI_ERROR(x, args...)      do { fprintf(stdout, "[ITTI][E]"x, ##args); fflush (stdout); } while(0);
Cedric Roux's avatar
 
Cedric Roux committed
101 102 103 104

/* Global message size */
#define MESSAGE_SIZE(mESSAGEiD) (sizeof(MessageHeader) + itti_desc.messages_info[mESSAGEiD].size)

105 106 107 108 109
#ifdef RTAI
# define ITTI_MEM_PAGE_SIZE (1024)
# define ITTI_MEM_SIZE      (16 * 1024 * 1024)
#endif

Cedric Roux's avatar
 
Cedric Roux committed
110
typedef enum task_state_s {
111
  TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
Cedric Roux's avatar
 
Cedric Roux committed
112 113 114
} task_state_t;

/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
115
typedef struct message_list_s {
116
  MessageDef *msg; ///< Pointer to the message
Cedric Roux's avatar
 
Cedric Roux committed
117

118 119
  message_number_t message_number; ///< Unique message number
  uint32_t message_priority; ///< Message priority
120
} message_list_t;
Cedric Roux's avatar
 
Cedric Roux committed
121

122
typedef struct thread_desc_s {
123 124
  /* pthread associated with the thread */
  pthread_t task_thread;
125

126 127
  /* State of the thread */
  volatile task_state_t task_state;
128

129 130
  /* This fd is used internally by ITTI. */
  int epoll_fd;
131

132 133
  /* The thread fd */
  int task_event_fd;
134

135 136
  /* Number of events to monitor */
  uint16_t nb_events;
137

138

139 140 141 142 143 144
  /* Array of events monitored by the task.
   * By default only one fd is monitored (the one used to received messages
   * from other tasks).
   * More events can be suscribed later by the task itself.
   */
  struct epoll_event *events;
145

146
  int epoll_nb_events;
147

148 149 150
  //#ifdef RTAI
  /* Flag to mark real time thread */
  unsigned real_time;
151

152 153 154
  /* Counter to indicate from RTAI threads that messages are pending for the thread */
  unsigned messages_pending;
  //#endif
155 156 157
} thread_desc_t;

typedef struct task_desc_s {
158 159
  /* Queue of messages belonging to the task */
  struct lfds611_queue_state *message_queue;
Cedric Roux's avatar
 
Cedric Roux committed
160 161
} task_desc_t;

162
typedef struct itti_desc_s {
163 164
  thread_desc_t *threads;
  task_desc_t   *tasks;
165

166 167
  /* Current message number. Incremented every call to send_msg_to_task */
  message_number_t message_number __attribute__((aligned(8)));
Cedric Roux's avatar
 
Cedric Roux committed
168

169 170 171
  thread_id_t thread_max;
  task_id_t task_max;
  MessagesIds messages_id_max;
Cedric Roux's avatar
 
Cedric Roux committed
172

173 174
  boolean_t thread_handling_signals;
  pthread_t thread_ref;
175

176 177
  const task_info_t *tasks_info;
  const message_info_t *messages_info;
Cedric Roux's avatar
 
Cedric Roux committed
178

179
  itti_lte_time_t lte_time;
180

181
  int running;
182

183 184 185
  volatile uint32_t created_tasks;
  volatile uint32_t ready_tasks;
  volatile int      wait_tasks;
186
#ifdef RTAI
187
  pthread_t rt_relay_thread;
188
#endif
189 190

#if defined(OAI_EMU) || defined(RTAI)
191
  memory_pools_handle_t memory_pools_handle;
192

193 194 195
  uint64_t vcd_poll_msg;
  uint64_t vcd_receive_msg;
  uint64_t vcd_send_msg;
196
#endif
197 198 199
} itti_desc_t;

static itti_desc_t itti_desc;
Cedric Roux's avatar
 
Cedric Roux committed
200

201
void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size)
202
{
203
  void *ptr = NULL;
204

205
#if defined(OAI_EMU) || defined(RTAI)
206 207 208 209 210 211 212 213
  ptr = memory_pools_allocate (itti_desc.memory_pools_handle, size, origin_task_id, destination_task_id);

  if (ptr == NULL) {
    char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

    ITTI_ERROR (" Memory pools statistics:\n%s", statistics);
    free (statistics);
  }
214

winckel's avatar
winckel committed
215
#else
216
  ptr = malloc (size);
217
#endif
winckel's avatar
winckel committed
218

219
  AssertFatal (ptr != NULL, "Memory allocation of %d bytes failed (%d -> %d)!\n", (int) size, origin_task_id, destination_task_id);
220

221
  return ptr;
222 223
}

224
int itti_free(task_id_t task_id, void *ptr)
225
{
226 227
  int result = EXIT_SUCCESS;
  AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id);
228 229

#if defined(OAI_EMU) || defined(RTAI)
230
  result = memory_pools_free (itti_desc.memory_pools_handle, ptr, task_id);
231

232
  AssertError (result == EXIT_SUCCESS, {}, "Failed to free memory at %p (%d)!\n", ptr, task_id);
233
#else
234
  free (ptr);
235
#endif
236

237
  return (result);
238 239
}

240 241 242 243 244 245 246
static inline message_number_t itti_increment_message_number(void)
{
  /* Atomic operation supported by GCC: returns the current message number
   * and then increment it by 1.
   * This can be done without mutex.
   */
  return __sync_fetch_and_add (&itti_desc.message_number, 1);
Cedric Roux's avatar
 
Cedric Roux committed
247 248
}

249 250 251
static inline uint32_t itti_get_message_priority(MessagesIds message_id)
{
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
 
Cedric Roux committed
252

253
  return (itti_desc.messages_info[message_id].priority);
Cedric Roux's avatar
 
Cedric Roux committed
254 255
}

256 257 258
const char *itti_get_message_name(MessagesIds message_id)
{
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
 
Cedric Roux committed
259

260
  return (itti_desc.messages_info[message_id].name);
Cedric Roux's avatar
 
Cedric Roux committed
261 262
}

263
const char *itti_get_task_name(task_id_t task_id)
Cedric Roux's avatar
Cedric Roux committed
264
{
265 266 267 268 269
  if (itti_desc.task_max > 0) {
    AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  } else {
    return ("ITTI NOT INITIALIZED !!!");
  }
Cedric Roux's avatar
Cedric Roux committed
270

271
  return (itti_desc.tasks_info[task_id].name);
Cedric Roux's avatar
Cedric Roux committed
272 273
}

274
static task_id_t itti_get_current_task_id(void)
275
{
276 277 278 279 280 281 282 283 284
  task_id_t task_id;
  thread_id_t thread_id;
  pthread_t thread = pthread_self ();

  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    thread_id = TASK_GET_THREAD_ID(task_id);

    if (itti_desc.threads[thread_id].task_thread == thread) {
      return task_id;
285
    }
286
  }
287

288
  return TASK_UNKNOWN;
289 290
}

291 292 293
#ifdef RTAI
static void rt_log_debug(char *format, ...)
{
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
  task_id_t   task_id;
  va_list     args;
  char        log_buffer[FIFO_PRINTF_MAX_STRING_SIZE];
  int         len;

  task_id = itti_get_current_task_id ();
  len = snprintf(log_buffer, FIFO_PRINTF_MAX_STRING_SIZE-1, "[ITTI][D][%s]", itti_get_task_name(task_id));
  va_start(args, format);
  len += vsnprintf(&log_buffer[len], FIFO_PRINTF_MAX_STRING_SIZE-1-len, format, args);
  va_end (args);

  if (task_id != TASK_UNKNOWN)
    fwrite(log_buffer, len, 1, stdout);
  else
    rtf_put (FIFO_PRINTF_NO, log_buffer, len);
309 310 311
}
#endif

312 313
void itti_update_lte_time(uint32_t frame, uint8_t slot)
{
314 315
  itti_desc.lte_time.frame = frame;
  itti_desc.lte_time.slot = slot;
316 317
}

318 319 320 321 322 323 324 325 326 327
int itti_send_broadcast_message(MessageDef *message_p)
{
  task_id_t destination_task_id;
  task_id_t origin_task_id;
  thread_id_t origin_thread_id;
  uint32_t thread_id;
  int ret = 0;
  int result;

  AssertFatal (message_p != NULL, "Trying to broadcast a NULL message!\n");
Cedric Roux's avatar
 
Cedric Roux committed
328

329 330
  origin_task_id = message_p->ittiMsgHeader.originTaskId;
  origin_thread_id = TASK_GET_THREAD_ID(origin_task_id);
Cedric Roux's avatar
 
Cedric Roux committed
331

332
  destination_task_id = TASK_FIRST;
Cedric Roux's avatar
 
Cedric Roux committed
333

334 335
  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    MessageDef *new_message_p;
336

337 338
    while (thread_id != TASK_GET_THREAD_ID(destination_task_id)) {
      destination_task_id++;
Cedric Roux's avatar
 
Cedric Roux committed
339 340
    }

341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
    /* Skip task that broadcast the message */
    if (thread_id != origin_thread_id) {
      /* Skip tasks which are not running */
      if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
        size_t size = sizeof(MessageHeader) + message_p->ittiMsgHeader.ittiMsgSize;
        new_message_p = itti_malloc( origin_task_id, destination_task_id, size );
        AssertFatal (new_message_p != NULL, "New message allocation failed!\n");

        memcpy( new_message_p, message_p, size );
        result = itti_send_msg_to_task (destination_task_id, INSTANCE_DEFAULT, new_message_p);
        AssertFatal (result >= 0, "Failed to send message %d to thread %d (task %d)!\n", message_p->ittiMsgHeader.messageId, thread_id, destination_task_id);
      }
    }
  }

  result = itti_free (ITTI_MSG_ORIGIN_ID(message_p), message_p);
  AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

  return ret;
Cedric Roux's avatar
 
Cedric Roux committed
360 361
}

362
MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size)
Cedric Roux's avatar
Cedric Roux committed
363
{
364
  MessageDef *temp = NULL;
Cedric Roux's avatar
 
Cedric Roux committed
365

366
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
 
Cedric Roux committed
367

368
#if defined(OAI_EMU) || defined(RTAI)
369
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, size);
370 371
#endif

372 373 374 375
  if (origin_task_id == TASK_UNKNOWN) {
    /* Try to identify real origin task ID */
    origin_task_id = itti_get_current_task_id();
  }
376

377
  temp = itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) + size);
Cedric Roux's avatar
 
Cedric Roux committed
378

379 380 381
  temp->ittiMsgHeader.messageId = message_id;
  temp->ittiMsgHeader.originTaskId = origin_task_id;
  temp->ittiMsgHeader.ittiMsgSize = size;
Cedric Roux's avatar
 
Cedric Roux committed
382

383
#if defined(OAI_EMU) || defined(RTAI)
384
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_ALLOC_MSG, 0);
385 386
#endif

387
  return temp;
Cedric Roux's avatar
 
Cedric Roux committed
388 389
}

390
MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id)
Cedric Roux's avatar
Cedric Roux committed
391
{
392
  return itti_alloc_new_message_sized(origin_task_id, message_id, itti_desc.messages_info[message_id].size);
Cedric Roux's avatar
Cedric Roux committed
393 394
}

395
int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message)
Cedric Roux's avatar
Cedric Roux committed
396
{
397 398 399 400 401 402
  thread_id_t destination_thread_id;
  task_id_t origin_task_id;
  message_list_t *new;
  uint32_t priority;
  message_number_t message_number;
  uint32_t message_id;
Cedric Roux's avatar
 
Cedric Roux committed
403

winckel's avatar
winckel committed
404
#if defined(OAI_EMU) || defined(RTAI)
405
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
406
                                          __sync_or_and_fetch (&itti_desc.vcd_send_msg, 1L << destination_task_id));
winckel's avatar
winckel committed
407 408
#endif

409 410
  AssertFatal (message != NULL, "Message is NULL!\n");
  AssertFatal (destination_task_id < itti_desc.task_max, "Destination task id (%d) is out of range (%d)\n", destination_task_id, itti_desc.task_max);
Cedric Roux's avatar
 
Cedric Roux committed
411

412 413 414 415 416 417 418
  destination_thread_id = TASK_GET_THREAD_ID(destination_task_id);
  message->ittiMsgHeader.destinationTaskId = destination_task_id;
  message->ittiMsgHeader.instance = instance;
  message->ittiMsgHeader.lte_time.frame = itti_desc.lte_time.frame;
  message->ittiMsgHeader.lte_time.slot = itti_desc.lte_time.slot;
  message_id = message->ittiMsgHeader.messageId;
  AssertFatal (message_id < itti_desc.messages_id_max, "Message id (%d) is out of range (%d)!\n", message_id, itti_desc.messages_id_max);
Cedric Roux's avatar
 
Cedric Roux committed
419

420
  origin_task_id = ITTI_MSG_ORIGIN_ID(message);
421

422
  priority = itti_get_message_priority (message_id);
Cedric Roux's avatar
 
Cedric Roux committed
423

424 425
  /* Increment the global message number */
  message_number = itti_increment_message_number ();
Cedric Roux's avatar
 
Cedric Roux committed
426

427 428
  itti_dump_queue_message (origin_task_id, message_number, message, itti_desc.messages_info[message_id].name,
                           sizeof(MessageHeader) + message->ittiMsgHeader.ittiMsgSize);
429

430
  if (destination_task_id != TASK_UNKNOWN) {
431
#if defined(OAI_EMU) || defined(RTAI)
432
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
433

434
    memory_pools_set_info (itti_desc.memory_pools_handle, message, 1, destination_task_id);
winckel's avatar
winckel committed
435 436
#endif

437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
    if (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_ENDED) {
      ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Message %s, number %lu with priority %d can not be sent from %s to queue (%u:%s), ended destination task!\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
    } else {
      /* We cannot send a message if the task is not running */
      AssertFatal (itti_desc.threads[destination_thread_id].task_state == TASK_STATE_READY,
                   "Task %s Cannot send message %s (%d) to thread %d, it is not in ready state (%d)!\n",
                   itti_get_task_name(origin_task_id),
                   itti_desc.messages_info[message_id].name,
                   message_id,
                   destination_thread_id,
                   itti_desc.threads[destination_thread_id].task_state);

      /* Allocate new list element */
      new = (message_list_t *) itti_malloc (origin_task_id, destination_task_id, sizeof(struct message_list_s));

      /* Fill in members */
      new->msg = message;
      new->message_number = message_number;
      new->message_priority = priority;

      /* Enqueue message in destination task queue */
464 465 466
      if (lfds611_queue_enqueue(itti_desc.tasks[destination_task_id].message_queue, new) == 0) {
        AssertFatal(0, "Error: lfds611_queue_enqueue returns 0, queue is full, exiting\n");
      }
467

468
#if defined(OAI_EMU) || defined(RTAI)
469
      VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
winckel's avatar
winckel committed
470 471
#endif

472
#ifdef RTAI
473 474 475 476 477

      if (itti_desc.threads[TASK_GET_THREAD_ID(origin_task_id)].real_time) {
        /* This is a RT task, increase destination task messages pending counter */
        __sync_fetch_and_add (&itti_desc.threads[destination_thread_id].messages_pending, 1);
      } else
478
#endif
479 480 481 482 483 484 485 486 487 488
      {
        /* Only use event fd for tasks, subtasks will pool the queue */
        if (TASK_GET_PARENT_TASK_ID(destination_task_id) == TASK_UNKNOWN) {
          ssize_t write_ret;
          eventfd_t sem_counter = 1;

          /* Call to write for an event fd must be of 8 bytes */
          write_ret = write (itti_desc.threads[destination_thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
          AssertFatal (write_ret == sizeof(sem_counter), "Write to task message FD (%d) failed (%d/%d)\n",
                       destination_thread_id, (int) write_ret, (int) sizeof(sem_counter));
489
        }
490 491 492 493 494 495 496 497 498
      }

      ITTI_DEBUG(ITTI_DEBUG_SEND, " Message %s, number %lu with priority %d successfully sent from %s to queue (%u:%s)\n",
                 itti_desc.messages_info[message_id].name,
                 message_number,
                 priority,
                 itti_get_task_name(origin_task_id),
                 destination_task_id,
                 itti_get_task_name(destination_task_id));
499
    }
500 501 502 503 504
  } else {
    /* This is a debug message to TASK_UNKNOWN, we can release safely release it */
    int result = itti_free(origin_task_id, message);
    AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
  }
505

506
#if defined(OAI_EMU) || defined(RTAI)
507
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_SEND_MSG,
508
                                          __sync_and_and_fetch (&itti_desc.vcd_send_msg, ~(1L << destination_task_id)));
509 510
#endif

511
  return 0;
Cedric Roux's avatar
 
Cedric Roux committed
512 513
}

514 515
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
516 517
  thread_id_t thread_id;
  struct epoll_event event;
518

519
  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
520

521 522
  thread_id = TASK_GET_THREAD_ID(task_id);
  itti_desc.threads[thread_id].nb_events++;
523

524 525 526 527
  /* Reallocate the events */
  itti_desc.threads[thread_id].events = realloc(
                                          itti_desc.threads[thread_id].events,
                                          itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
528

529 530 531
  event.events  = EPOLLIN | EPOLLERR;
  event.data.u64 = 0;
  event.data.fd  = fd;
532

533 534 535 536 537 538 539
  /* Add the event fd to the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
                &event) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }
540

541
  ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
542 543 544 545
}

void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
546
  thread_id_t thread_id;
547

548 549
  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (fd >= 0, "File descriptor (%d) is invalid!\n", fd);
550

551 552 553 554 555 556 557 558
  thread_id = TASK_GET_THREAD_ID(task_id);

  /* Add the event fd to the list of monitored events */
  if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0) {
    /* Always assert on this condition */
    AssertFatal (0, "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n",
                 itti_get_task_name(task_id), fd, strerror(errno));
  }
559

560 561 562 563
  itti_desc.threads[thread_id].nb_events--;
  itti_desc.threads[thread_id].events = realloc(
                                          itti_desc.threads[thread_id].events,
                                          itti_desc.threads[thread_id].nb_events * sizeof(struct epoll_event));
564 565 566 567
}

int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
568
  thread_id_t thread_id;
569

570
  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)\n", task_id, itti_desc.task_max);
571

572 573
  thread_id = TASK_GET_THREAD_ID(task_id);
  *events = itti_desc.threads[thread_id].events;
574

575
  return itti_desc.threads[thread_id].epoll_nb_events;
576 577
}

578 579
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
580 581 582 583
  thread_id_t thread_id;
  int epoll_ret = 0;
  int epoll_timeout = 0;
  int i;
584

585 586
  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
  AssertFatal (received_msg != NULL, "Received message is NULL!\n");
587

588 589
  thread_id = TASK_GET_THREAD_ID(task_id);
  *received_msg = NULL;
590

591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646
  if (polling) {
    /* In polling mode we set the timeout to 0 causing epoll_wait to return
     * immediately.
     */
    epoll_timeout = 0;
  } else {
    /* timeout = -1 causes the epoll_wait to wait indefinitely.
     */
    epoll_timeout = -1;
  }

  do {
    epoll_ret = epoll_wait(itti_desc.threads[thread_id].epoll_fd,
                           itti_desc.threads[thread_id].events,
                           itti_desc.threads[thread_id].nb_events,
                           epoll_timeout);
  } while (epoll_ret < 0 && errno == EINTR);

  if (epoll_ret < 0) {
    AssertFatal (0, "epoll_wait failed for task %s: %s!\n", itti_get_task_name(task_id), strerror(errno));
  }

  if (epoll_ret == 0 && polling) {
    /* No data to read -> return */
    return;
  }

  itti_desc.threads[thread_id].epoll_nb_events = epoll_ret;

  for (i = 0; i < epoll_ret; i++) {
    /* Check if there is an event for ITTI for the event fd */
    if ((itti_desc.threads[thread_id].events[i].events & EPOLLIN) &&
        (itti_desc.threads[thread_id].events[i].data.fd == itti_desc.threads[thread_id].task_event_fd)) {
      struct message_list_s *message = NULL;
      eventfd_t   sem_counter;
      ssize_t     read_ret;
      int         result;

      /* Read will always return 1 */
      read_ret = read (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
      AssertFatal (read_ret == sizeof(sem_counter), "Read from task message FD (%d) failed (%d/%d)!\n", thread_id, (int) read_ret, (int) sizeof(sem_counter));


      if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 0) {
        /* No element in list -> this should not happen */
        AssertFatal (0, "No message in queue for task %d while there are %d events and some for the messages queue!\n", task_id, epoll_ret);
      }

      AssertFatal(message != NULL, "Message from message queue is NULL!\n");
      *received_msg = message->msg;
      result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
      AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);

      /* Mark that the event has been processed */
      itti_desc.threads[thread_id].events[i].events &= ~EPOLLIN;
      return;
647
    }
648
  }
649 650 651 652
}

void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
653
#if defined(OAI_EMU) || defined(RTAI)
654
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
655
                                          __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
656
#endif
657

658
  itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
Cedric Roux's avatar
 
Cedric Roux committed
659

winckel's avatar
winckel committed
660
#if defined(OAI_EMU) || defined(RTAI)
661
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
662
                                          __sync_or_and_fetch (&itti_desc.vcd_receive_msg, 1L << task_id));
663
#endif
Cedric Roux's avatar
 
Cedric Roux committed
664 665
}

666 667 668
void itti_poll_msg(task_id_t task_id, MessageDef **received_msg)
{
  AssertFatal (task_id < itti_desc.task_max, "Task id (%d) is out of range (%d)!\n", task_id, itti_desc.task_max);
Cedric Roux's avatar
 
Cedric Roux committed
669

670
  *received_msg = NULL;
Cedric Roux's avatar
 
Cedric Roux committed
671

672
#if defined(OAI_EMU) || defined(RTAI)
673
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
674
                                          __sync_or_and_fetch (&itti_desc.vcd_poll_msg, 1L << task_id));
675 676
#endif

677 678
  {
    struct message_list_s *message;
679

680 681
    if (lfds611_queue_dequeue (itti_desc.tasks[task_id].message_queue, (void **) &message) == 1) {
      int result;
682

683 684 685
      *received_msg = message->msg;
      result = itti_free (ITTI_MSG_ORIGIN_ID(*received_msg), message);
      AssertFatal (result == EXIT_SUCCESS, "Failed to free memory (%d)!\n", result);
686
    }
687
  }
Cedric Roux's avatar
 
Cedric Roux committed
688

689 690 691
  if (*received_msg == NULL) {
    ITTI_DEBUG(ITTI_DEBUG_POLL, " No message in queue[(%u:%s)]\n", task_id, itti_get_task_name(task_id));
  }
692 693

#if defined(OAI_EMU) || defined(RTAI)
694
  VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_POLL_MSG,
695
                                          __sync_and_and_fetch (&itti_desc.vcd_poll_msg, ~(1L << task_id)));
696
#endif
Cedric Roux's avatar
 
Cedric Roux committed
697 698
}

699 700 701 702
int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p)
{
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
  int result;
Cedric Roux's avatar
 
Cedric Roux committed
703

704 705 706 707
  AssertFatal (start_routine != NULL, "Start routine is NULL!\n");
  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);
  AssertFatal (itti_desc.threads[thread_id].task_state == TASK_STATE_NOT_CONFIGURED, "Task %d, thread %d state is not correct (%d)!\n",
               task_id, thread_id, itti_desc.threads[thread_id].task_state);
Cedric Roux's avatar
 
Cedric Roux committed
708

709
  itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
Cedric Roux's avatar
 
Cedric Roux committed
710

711
  ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating thread for task %s ...\n", itti_get_task_name(task_id));
712

713 714 715 716 717
  result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
  AssertFatal (result >= 0, "Thread creation for task %d, thread %d failed (%d)!\n", task_id, thread_id, result);
  char name[16];
  snprintf( name, sizeof(name), "ITTI %d", thread_id );
  pthread_setname_np( itti_desc.threads[thread_id].task_thread, name );
Cedric Roux's avatar
 
Cedric Roux committed
718

719
  itti_desc.created_tasks ++;
720

721 722 723
  /* Wait till the thread is completely ready */
  while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
    usleep (1000);
724

725
  return 0;
Cedric Roux's avatar
 
Cedric Roux committed
726 727
}

728
//#ifdef RTAI
729 730
void itti_set_task_real_time(task_id_t task_id)
{
731
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
732

733
  DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
734

735
  itti_desc.threads[thread_id].real_time = TRUE;
736
}
Raymond Knopp's avatar
 
Raymond Knopp committed
737
//#endif
738

739 740
void itti_wait_ready(int wait_tasks)
{
741
  itti_desc.wait_tasks = wait_tasks;
742

743 744 745 746 747
  ITTI_DEBUG(ITTI_DEBUG_INIT,
             " wait for tasks: %s, created tasks %d, ready tasks %d\n",
             itti_desc.wait_tasks ? "yes" : "no",
             itti_desc.created_tasks,
             itti_desc.ready_tasks);
748

749 750
  AssertFatal (itti_desc.created_tasks == itti_desc.ready_tasks, "Number of created tasks (%d) does not match ready tasks (%d), wait task %d!\n",
               itti_desc.created_tasks, itti_desc.ready_tasks, itti_desc.wait_tasks);
751 752
}

753 754
void itti_mark_task_ready(task_id_t task_id)
{
755
  thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
Cedric Roux's avatar
 
Cedric Roux committed
756

757
  AssertFatal (thread_id < itti_desc.thread_max, "Thread id (%d) is out of range (%d)!\n", thread_id, itti_desc.thread_max);
758

759 760
  /* Register the thread in itti dump */
  itti_dump_thread_use_ring_buffer();
761

762 763
  /* Mark the thread as using LFDS queue */
  lfds611_queue_use(itti_desc.tasks[task_id].message_queue);
764

765
#ifdef RTAI
766 767 768 769 770 771
  /* Assign low priority to created threads */
  {
    struct sched_param sched_param;
    sched_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
    sched_setscheduler(0, SCHED_FIFO, &sched_param);
  }
772 773
#endif

774 775
  itti_desc.threads[thread_id].task_state = TASK_STATE_READY;
  itti_desc.ready_tasks ++;
776

777 778 779
  while (itti_desc.wait_tasks != 0) {
    usleep (10000);
  }
780

781
  ITTI_DEBUG(ITTI_DEBUG_INIT, " task %s started\n", itti_get_task_name(task_id));
782 783
}

784 785
void itti_exit_task(void)
{
786
#if defined(OAI_EMU) || defined(RTAI)
787 788 789
  task_id_t task_id = itti_get_current_task_id();

  if (task_id > TASK_UNKNOWN) {
790
    VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME(VCD_SIGNAL_DUMPER_VARIABLE_ITTI_RECV_MSG,
791 792
                                            __sync_and_and_fetch (&itti_desc.vcd_receive_msg, ~(1L << task_id)));
  }
793 794

#endif
795
  pthread_exit (NULL);
796 797
}

798 799 800 801
void itti_terminate_tasks(task_id_t task_id)
{
  // Sends Terminate signals to all tasks.
  itti_send_terminate_message (task_id);
802

803 804 805
  if (itti_desc.thread_handling_signals) {
    pthread_kill (itti_desc.thread_ref, SIGUSR1);
  }
806

807
  pthread_exit (NULL);
Cedric Roux's avatar
 
Cedric Roux committed
808 809
}

810 811 812
#ifdef RTAI
static void *itti_rt_relay_thread(void *arg)
{
813 814
  thread_id_t thread_id;
  unsigned pending_messages;
815

816 817
  while (itti_desc.running) {
    usleep (200); // Poll for messages a little more than 2 time by slot to get a small latency between RT and other tasks
818 819

#if defined(OAI_EMU) || defined(RTAI)
820
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_IN);
821
#endif
822

823 824 825 826 827 828 829 830 831 832 833 834 835
    /* Checks for all non real time tasks if they have pending messages */
    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
      if ((itti_desc.threads[thread_id].task_state == TASK_STATE_READY)
          && (itti_desc.threads[thread_id].real_time == FALSE)) {
        pending_messages = __sync_fetch_and_and (&itti_desc.threads[thread_id].messages_pending, 0);

        if (pending_messages > 0) {
          ssize_t write_ret;
          eventfd_t sem_counter = pending_messages;

          /* Call to write for an event fd must be of 8 bytes */
          write_ret = write (itti_desc.threads[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
          DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, thread_id);
836
        }
837 838
      }
    }
839 840

#if defined(OAI_EMU) || defined(RTAI)
841
    VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_RELAY_THREAD, VCD_FUNCTION_OUT);
842
#endif
843 844 845
  }

  return NULL;
846 847 848
}
#endif

849
int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info,
850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888
              const message_info_t *messages_info, const char * const messages_definition_xml, const char * const dump_file_name)
{
  task_id_t task_id;
  thread_id_t thread_id;
  int ret;

  itti_desc.message_number = 1;

  ITTI_DEBUG(ITTI_DEBUG_INIT, " Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);

  CHECK_INIT_RETURN(signal_mask());

  /* Saves threads and messages max values */
  itti_desc.task_max = task_max;
  itti_desc.thread_max = thread_max;
  itti_desc.messages_id_max = messages_id_max;
  itti_desc.thread_handling_signals = FALSE;
  itti_desc.tasks_info = tasks_info;
  itti_desc.messages_info = messages_info;

  /* Allocates memory for tasks info */
  itti_desc.tasks = calloc (itti_desc.task_max, sizeof(task_desc_t));

  /* Allocates memory for threads info */
  itti_desc.threads = calloc (itti_desc.thread_max, sizeof(thread_desc_t));

  /* Initializing each queue and related stuff */
  for (task_id = TASK_FIRST; task_id < itti_desc.task_max; task_id++) {
    ITTI_DEBUG(ITTI_DEBUG_INIT, " Initializing %stask %s%s%s\n",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? "sub-" : "",
               itti_desc.tasks_info[task_id].name,
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ? " with parent " : "",
               itti_desc.tasks_info[task_id].parent_task != TASK_UNKNOWN ?
               itti_get_task_name(itti_desc.tasks_info[task_id].parent_task) : "");

    ITTI_DEBUG(ITTI_DEBUG_INIT, " Creating queue of message of size %u\n", itti_desc.tasks_info[task_id].queue_size);

    ret = lfds611_queue_new(&itti_desc.tasks[task_id].message_queue, itti_desc.tasks_info[task_id].queue_size);

889
    if (0 == ret) {
890
      AssertFatal (0, "lfds611_queue_new failed for task %s!\n", itti_get_task_name(task_id));
891
    }
892
  }
893

894 895 896
  /* Initializing each thread */
  for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
    itti_desc.threads[thread_id].task_state = TASK_STATE_NOT_CONFIGURED;
897

898
    itti_desc.threads[thread_id].epoll_fd = epoll_create1(0);
899

900 901 902 903
    if (itti_desc.threads[thread_id].epoll_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, "Failed to create new epoll fd: %s!\n", strerror(errno));
    }
904

905
    itti_desc.threads[thread_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
906

907 908 909 910
    if (itti_desc.threads[thread_id].task_event_fd == -1) {
      /* Always assert on this condition */
      AssertFatal (0, " eventfd failed: %s!\n", strerror(errno));
    }
911

912
    itti_desc.threads[thread_id].nb_events = 1;
913

914
    itti_desc.threads[thread_id].events = calloc(1, sizeof(struct epoll_event));
915

916 917 918 919 920 921 922 923 924 925 926 927
    itti_desc.threads[thread_id].events->events  = EPOLLIN | EPOLLERR;
    itti_desc.threads[thread_id].events->data.fd = itti_desc.threads[thread_id].task_event_fd;

    /* Add the event fd to the list of monitored events */
    if (epoll_ctl(itti_desc.threads[thread_id].epoll_fd, EPOLL_CTL_ADD,
                  itti_desc.threads[thread_id].task_event_fd, itti_desc.threads[thread_id].events) != 0) {
      /* Always assert on this condition */
      AssertFatal (0, " epoll_ctl (EPOLL_CTL_ADD) failed: %s!\n", strerror(errno));
    }

    ITTI_DEBUG(ITTI_DEBUG_EVEN_FD, " Successfully subscribed fd %d for thread %d\n",
               itti_desc.threads[thread_id].task_event_fd, thread_id);
928

929
#ifdef RTAI
930 931
    itti_desc.threads[thread_id].real_time = FALSE;
    itti_desc.threads[thread_id].messages_pending = 0;
932
#endif
933
  }
934

935 936 937 938
  itti_desc.running = 1;
  itti_desc.wait_tasks = 0;
  itti_desc.created_tasks = 0;
  itti_desc.ready_tasks = 0;
939
#ifdef RTAI
940 941
  /* Start RT relay thread */
  DevAssert(pthread_create (&itti_desc.rt_relay_thread, NULL, itti_rt_relay_thread, NULL) >= 0);
942

943
  rt_global_heap_open();
944
#endif
945

946
#if defined(OAI_EMU) || defined(RTAI)
947 948 949 950 951 952 953 954 955 956 957 958 959
  itti_desc.memory_pools_handle = memory_pools_create (5);
  memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + ITTI_QUEUE_MAX_ELEMENTS,       50);
  memory_pools_add_pool (itti_desc.memory_pools_handle, 1000 + (2 * ITTI_QUEUE_MAX_ELEMENTS), 100);
  memory_pools_add_pool (itti_desc.memory_pools_handle, 10000,                                1000);
  memory_pools_add_pool (itti_desc.memory_pools_handle,  400,                                 20050);
  memory_pools_add_pool (itti_desc.memory_pools_handle,  100,                                 30050);

  {
    char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);

    ITTI_DEBUG(ITTI_DEBUG_MP_STATISTICS, " Memory pools statistics:\n%s", statistics);
    free (statistics);
  }
960 961
#endif

962
#if defined(OAI_EMU) || defined(RTAI)
963 964 965
  itti_desc.vcd_poll_msg = 0;
  itti_desc.vcd_receive_msg = 0;
  itti_desc.vcd_send_msg = 0;
966 967
#endif

968
  itti_dump_init (messages_definition_xml, dump_file_name);
Cedric Roux's avatar
 
Cedric Roux committed
969

970
  CHECK_INIT_RETURN(timer_init ());
Cedric Roux's avatar
 
Cedric Roux committed
971

972
  return 0;
Cedric Roux's avatar
 
Cedric Roux committed
973 974
}

975 976 977 978 979 980 981 982
void itti_wait_tasks_end(void)
{
  int end = 0;
  int thread_id;
  task_id_t task_id;
  int ready_tasks;
  int result;
  int retries = 10;
983

984 985
  itti_desc.thread_handling_signals = TRUE;
  itti_desc.thread_ref=pthread_self ();
986

987 988 989 990 991 992
  /* Handle signals here */
  while (end == 0) {
    signal_handle (&end);
  }

  printf("closing all tasks\n");
993
  sleep(1);
994

995 996 997 998 999 1000 1001 1002 1003 1004
  do {
    ready_tasks = 0;

    task_id = TASK_FIRST;

    for (thread_id = THREAD_FIRST; thread_id < itti_desc.thread_max; thread_id++) {
      /* Skip tasks which are not running */
      if (itti_desc.threads[thread_id].task_state == TASK_STATE_READY) {
        while (thread_id != TASK_GET_THREAD_ID(task_id)) {
          task_id++;
1005
        }
1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016

        result = pthread_tryjoin_np (itti_desc.threads[thread_id].task_thread, NULL);

        ITTI_DEBUG(ITTI_DEBUG_EXIT, " Thread %s join status %d\n", itti_get_task_name(task_id), result);

        if (result == 0) {
          /* Thread has terminated */
          itti_desc.threads[thread_id].task_state = TASK_STATE_ENDED;
        } else {
          /* Thread is still running, count it */
          ready_tasks++;
1017
        }
1018 1019
      }
    }
1020

1021 1022 1023 1024
    if (ready_tasks > 0) {
      usleep (100 * 1000);
    }
  } while ((ready_tasks > 0) && (retries--)&& (!end) );
1025

1026 1027 1028
  printf("ready_tasks %d\n",ready_tasks);

  itti_desc.running = 0;
1029

1030
#if defined(OAI_EMU) || defined(RTAI)
1031 1032
  {
    char *statistics = memory_pools_statistics (itti_desc.memory_pools_handle);
1033

1034 1035 1036
    ITTI_DEBUG(ITTI_DEBUG_MP_STATISTICS, " Memory pools statistics:\n%s", statistics);
    free (statistics);
  }
1037 1038
#endif

1039 1040 1041 1042
  if (ready_tasks > 0) {
    ITTI_DEBUG(ITTI_DEBUG_ISSUES, " Some threads are still running, force exit\n");
    exit (0);
  }
1043

1044
  itti_dump_exit();
1045 1046
}

1047 1048 1049
void itti_send_terminate_message(task_id_t task_id)
{
  MessageDef *terminate_message_p;
Cedric Roux's avatar
 
Cedric Roux committed
1050

1051
  terminate_message_p = itti_alloc_new_message (task_id, TERMINATE_MESSAGE);
Cedric Roux's avatar
 
Cedric Roux committed
1052

1053
  itti_send_broadcast_message (terminate_message_p);
Cedric Roux's avatar
 
Cedric Roux committed
1054
}