intertask_interface_dump.c 30.3 KB
Newer Older
Cedric Roux's avatar
 
Cedric Roux committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
/*******************************************************************************

  Eurecom OpenAirInterface
  Copyright(c) 1999 - 2013 Eurecom

  This program is free software; you can redistribute it and/or modify it
  under the terms and conditions of the GNU General Public License,
  version 2, as published by the Free Software Foundation.

  This program is distributed in the hope it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
  more details.

  You should have received a copy of the GNU General Public License along with
  this program; if not, write to the Free Software Foundation, Inc.,
  51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.

  The full GNU General Public License is included in this distribution in
  the file called "COPYING".

  Contact Information
  Openair Admin: openair_admin@eurecom.fr
  Openair Tech : openair_tech@eurecom.fr
  Forums       : http://forums.eurecom.fr/openairinterface
  Address      : EURECOM, Campus SophiaTech, 450 Route des Chappes
                 06410 Biot FRANCE

*******************************************************************************/

/** @brief Intertask Interface Signal Dumper
32
 * Allows users to connect their itti_analyzer to this process and dump
Cedric Roux's avatar
 
Cedric Roux committed
33 34 35 36 37 38 39 40 41 42 43 44
 * signals exchanged between tasks.
 * @author Sebastien Roux <sebastien.roux@eurecom.fr>
 */

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <error.h>
45
#include <sched.h>
Cedric Roux's avatar
 
Cedric Roux committed
46 47 48 49 50 51 52

#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <arpa/inet.h>

53 54
#include <sys/eventfd.h>

Cedric Roux's avatar
 
Cedric Roux committed
55
#include "assertions.h"
56
#include "liblfds611.h"
Cedric Roux's avatar
 
Cedric Roux committed
57 58 59 60

#include "intertask_interface.h"
#include "intertask_interface_dump.h"

61
#if defined(OAI_EMU) || defined(RTAI)
62 63 64
#include "vcd_signal_dumper.h"
#endif

65
#define SIGNAL_NAME_LENGTH  48
Cedric Roux's avatar
 
Cedric Roux committed
66

67
static const int itti_dump_debug = 0; // 0x8 | 0x4 | 0x2;
Cedric Roux's avatar
 
Cedric Roux committed
68

69
#ifdef RTAI
70
# define ITTI_DUMP_DEBUG(m, x, args...) do { if (m & itti_dump_debug) rt_printk("[ITTI_DUMP][D]"x, ##args); } \
71
    while(0)
72
# define ITTI_DUMP_ERROR(x, args...) do { rt_printk("[ITTI_DUMP][E]"x, ##args); } \
Cedric Roux's avatar
 
Cedric Roux committed
73
    while(0)
74
#else
75
# define ITTI_DUMP_DEBUG(m, x, args...) do { if (m & itti_dump_debug) fprintf(stdout, "[ITTI_DUMP][D]"x, ##args); } \
Cedric Roux's avatar
 
Cedric Roux committed
76
    while(0)
77
# define ITTI_DUMP_ERROR(x, args...) do { fprintf(stdout, "[ITTI_DUMP][E]"x, ##args); } \
78 79
    while(0)
#endif
Cedric Roux's avatar
 
Cedric Roux committed
80

81 82 83 84
#ifndef EFD_SEMAPHORE
# define KERNEL_VERSION_PRE_2_6_30 1
#endif

Cedric Roux's avatar
Cedric Roux committed
85 86 87 88
/* Message sent is an intertask dump type */
#define ITTI_DUMP_MESSAGE_TYPE      0x1
#define ITTI_STATISTIC_MESSAGE_TYPE 0x2
#define ITTI_DUMP_XML_DEFINITION    0x3
89 90
/* This signal is not meant to be used by remote analyzer */
#define ITTI_DUMP_EXIT_SIGNAL       0x4
Cedric Roux's avatar
Cedric Roux committed
91

92
typedef struct itti_dump_queue_item_s {
93 94 95 96 97 98
    MessageDef *data;
    uint32_t    data_size;
    uint32_t    message_number;
    char        message_name[SIGNAL_NAME_LENGTH];
    uint32_t    message_type;
    uint32_t    message_size;
99
} itti_dump_queue_item_t;
Cedric Roux's avatar
 
Cedric Roux committed
100 101 102 103 104 105 106

typedef struct {
    int      sd;
    uint32_t last_message_number;
} itti_client_desc_t;

typedef struct itti_desc_s {
107 108 109
    /* Asynchronous thread that write to file/accept new clients */
    pthread_t      itti_acceptor_thread;
    pthread_attr_t attr;
Cedric Roux's avatar
 
Cedric Roux committed
110 111 112

    /* List of messages to dump.
     * NOTE: we limit the size of this queue to retain only the last exchanged
113
     * messages. The size can be increased by setting up the ITTI_QUEUE_MAX_ELEMENTS
Cedric Roux's avatar
 
Cedric Roux committed
114 115
     * in mme_default_values.h or by putting a custom in the configuration file.
     */
116 117
    struct lfds611_ringbuffer_state *itti_message_queue;

Cedric Roux's avatar
 
Cedric Roux committed
118 119
    int nb_connected;

120
#ifndef RTAI
121 122
    /* Event fd used to notify new messages (semaphore) */
    int event_fd;
123 124 125
#else
    unsigned long messages_in_queue __attribute__((aligned(8)));
#endif
126 127 128

    int itti_listen_socket;

Cedric Roux's avatar
 
Cedric Roux committed
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
    itti_client_desc_t itti_clients[ITTI_DUMP_MAX_CON];
} itti_desc_t;

typedef struct {
    /* The size of this structure */
    uint32_t message_size;
    uint32_t message_type;
} itti_socket_header_t;

typedef struct {
    itti_socket_header_t header;

    uint32_t message_number;
    char signal_name[SIGNAL_NAME_LENGTH];

    /* Message payload is added here, this struct is used as an header */
} itti_dump_message_t;

typedef struct {
    itti_socket_header_t header;
    
} itti_statistic_message_t;

152
static itti_desc_t itti_dump_queue;
153
static FILE *dump_file = NULL;
154
static int itti_dump_running = 1;
155

156
static volatile uint32_t pending_messages = 0;
Cedric Roux's avatar
 
Cedric Roux committed
157

158 159
/*------------------------------------------------------------------------------*/
static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message)
Cedric Roux's avatar
 
Cedric Roux committed
160 161
{
    itti_dump_message_t *new_message;
162 163
    ssize_t bytes_sent = 0, total_sent = 0;
    uint8_t *data_ptr;
Cedric Roux's avatar
 
Cedric Roux committed
164 165

    /* Allocate memory for message header and payload */
166
    size_t size = sizeof(itti_dump_message_t) + message->data_size;
Cedric Roux's avatar
 
Cedric Roux committed
167

168 169
    AssertFatal (sd > 0, "Socket descriptor (%d) is invalid\n", sd);
    AssertFatal (message != NULL, "Message is NULL\n");
Cedric Roux's avatar
 
Cedric Roux committed
170 171

    new_message = calloc(1, size);
172
    AssertFatal (new_message != NULL, "New message allocation failed\n");
Cedric Roux's avatar
 
Cedric Roux committed
173 174 175 176 177 178 179 180 181 182 183

    /* Preparing the header */
    new_message->header.message_size = size;
    new_message->header.message_type = ITTI_DUMP_MESSAGE_TYPE;

    new_message->message_number = message->message_number;
    /* Copy the name, but leaves last byte set to 0 in case name is too long */
    memcpy(new_message->signal_name, message->message_name, SIGNAL_NAME_LENGTH - 1);
    /* Appends message payload */
    memcpy(&new_message[1], message->data, message->data_size);

184 185 186 187 188
    data_ptr = (uint8_t *)&new_message[0];

    do {
        bytes_sent = send(sd, &data_ptr[total_sent], size - total_sent, 0);
        if (bytes_sent < 0) {
189
            ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
190 191 192 193 194 195
                       sd, size, errno, strerror(errno));
            free(new_message);
            return -1;
        }
        total_sent += bytes_sent;
    } while (total_sent != size);
Cedric Roux's avatar
 
Cedric Roux committed
196 197

    free(new_message);
198
    return total_sent;
Cedric Roux's avatar
 
Cedric Roux committed
199 200
}

201
static int itti_dump_fwrite_message(itti_dump_queue_item_t *message)
202 203 204
{
    itti_socket_header_t header;

205
    if ((dump_file != NULL) && (message != NULL)) {
206 207 208 209 210 211 212 213

        header.message_size = message->message_size + sizeof(itti_dump_message_t);
        header.message_type = message->message_type;

        fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file);
        fwrite (&message->message_number, sizeof(message->message_number), 1, dump_file);
        fwrite (message->message_name, sizeof(message->message_name), 1, dump_file);
        fwrite (message->data, message->data_size, 1, dump_file);
214 215 216
// #if !defined(RTAI)
        fflush (dump_file);
// #endif
217
        return (1);
218
    }
219
    return (0);
220 221
}

Cedric Roux's avatar
 
Cedric Roux committed
222 223 224
static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml,
                                         const uint32_t message_definition_xml_length)
{
225 226 227 228 229
    itti_socket_header_t *itti_dump_message;
    /* Allocate memory for message header and payload */
    size_t itti_dump_message_size;
    ssize_t bytes_sent = 0, total_sent = 0;
    uint8_t *data_ptr;
Cedric Roux's avatar
 
Cedric Roux committed
230

231 232
    AssertFatal (sd > 0, "Socket descriptor (%d) is invalid\n", sd);
    AssertFatal (message_definition_xml != NULL, "Message definition XML is NULL\n");
Cedric Roux's avatar
 
Cedric Roux committed
233

234 235 236 237
    itti_dump_message_size = sizeof(itti_socket_header_t) + message_definition_xml_length;

    itti_dump_message = calloc(1, itti_dump_message_size);

238
    ITTI_DUMP_DEBUG(0x2, "[%d] Sending XML definition message of size %zu to observer peer\n",
239 240 241 242 243 244 245 246 247 248 249 250 251
               sd, itti_dump_message_size);

    itti_dump_message->message_size = itti_dump_message_size;
    itti_dump_message->message_type = ITTI_DUMP_XML_DEFINITION;

    /* Copying message definition */
    memcpy(&itti_dump_message[1], message_definition_xml, message_definition_xml_length);

    data_ptr = (uint8_t *)&itti_dump_message[0];

    do {
        bytes_sent = send(sd, &data_ptr[total_sent], itti_dump_message_size - total_sent, 0);
        if (bytes_sent < 0) {
252
            ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
253 254 255 256 257 258 259 260 261
                       sd, itti_dump_message_size, errno, strerror(errno));
            free(itti_dump_message);
            return -1;
        }
        total_sent += bytes_sent;
    } while (total_sent != itti_dump_message_size);

    free(itti_dump_message);

Cedric Roux's avatar
 
Cedric Roux committed
262 263 264
    return 0;
}

265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
static void itti_dump_user_data_delete_function(void *user_data, void *user_state)
{
    if (user_data != NULL)
    {
        itti_dump_queue_item_t *item;
        task_id_t task_id;

        item = (itti_dump_queue_item_t *)user_data;

        if (item->data != NULL)
        {
            task_id = ITTI_MSG_ORIGIN_ID(item->data);
            itti_free(task_id, item->data);
        }
        else
        {
            task_id = TASK_UNKNOWN;
        }
        itti_free(task_id, item);
    }
}

287
static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t message_size,
288
                                     uint32_t message_type)
Cedric Roux's avatar
Cedric Roux committed
289
{
290
    struct lfds611_freelist_element *new_queue_element = NULL;
291
    int overwrite_flag;
292
    AssertFatal (new != NULL, "Message to queue is NULL\n");
Cedric Roux's avatar
Cedric Roux committed
293

294
#if defined(OAI_EMU) || defined(RTAI)
295 296 297
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_IN);
#endif

298 299
    new->message_type = message_type;
    new->message_size = message_size;
Cedric Roux's avatar
Cedric Roux committed
300

301 302 303 304 305 306
    ITTI_DUMP_DEBUG (0x1, " itti_dump_enqueue_message: lfds611_ringbuffer_get_write_element\n");
    new_queue_element = lfds611_ringbuffer_get_write_element (itti_dump_queue.itti_message_queue, &new_queue_element, &overwrite_flag);

    if (overwrite_flag != 0)
    {
        void *old = NULL;
Cedric Roux's avatar
Cedric Roux committed
307

308 309 310 311
        lfds611_freelist_get_user_data_from_element(new_queue_element, &old);
        ITTI_DUMP_DEBUG (0x4, " overwrite_flag set, freeing old data %p %p\n", new_queue_element, old);
        itti_dump_user_data_delete_function (old, NULL);
    }
Cedric Roux's avatar
Cedric Roux committed
312

313 314
    lfds611_freelist_set_user_data_in_element(new_queue_element, (void *) new);
    lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue, new_queue_element);
Cedric Roux's avatar
Cedric Roux committed
315

316 317
    if (overwrite_flag == 0)
    {
318
#ifdef RTAI
319
        __sync_fetch_and_add (&itti_dump_queue.messages_in_queue, 1);
320
#else
321 322 323
        {
            ssize_t   write_ret;
            eventfd_t sem_counter = 1;
324

325 326
            /* Call to write for an event fd must be of 8 bytes */
            write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
327
            AssertFatal (write_ret == sizeof(sem_counter), "Write to dump event failed (%ld/%ld)\n",  write_ret, sizeof(sem_counter));
328
        }
329
#endif
330 331 332 333
        __sync_fetch_and_add (&pending_messages, 1);
    }

    ITTI_DUMP_DEBUG (0x2, " Added element to queue %p %p, pending %u, type %u\n", new_queue_element, new, pending_messages, message_type);
334

335
#if defined(OAI_EMU) || defined(RTAI)
336 337
    vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE, VCD_FUNCTION_OUT);
#endif
Cedric Roux's avatar
Cedric Roux committed
338 339 340 341

    return 0;
}

342
static void itti_dump_socket_exit(void)
Cedric Roux's avatar
 
Cedric Roux committed
343
{
344 345
#ifndef RTAI
    close(itti_dump_queue.event_fd);
346
#endif
347
    close(itti_dump_queue.itti_listen_socket);
Cedric Roux's avatar
 
Cedric Roux committed
348

349 350
    /* Leave the thread as we detected end signal */
    pthread_exit(NULL);
Cedric Roux's avatar
 
Cedric Roux committed
351 352
}

353
static int itti_dump_flush_ring_buffer(int flush_all)
354 355
{
    struct lfds611_freelist_element *element = NULL;
356 357 358
    void   *user_data;
    int     j;
    int     consumer;
359 360 361

#ifdef RTAI
    unsigned long number_of_messages;
362
#endif
363

364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
    /* Check if there is a least one consumer */
    consumer = 0;
    if (dump_file != NULL)
    {
        consumer = 1;
    }
    else
    {
        for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
            if (itti_dump_queue.itti_clients[j].sd > 0) {
                consumer = 1;
                break;
            }
        }
    }
379

380 381 382 383
    if (consumer > 0)
    {
#ifdef RTAI
        number_of_messages = itti_dump_queue.messages_in_queue;
384

385 386 387 388 389
        ITTI_DUMP_DEBUG(0x4, "%lu elements in queue\n", number_of_messages);

        if (number_of_messages == 0) {
            return (consumer);
        }
390

391
        __sync_sub_and_fetch(&itti_dump_queue.messages_in_queue, number_of_messages);
392 393
#endif

394 395 396
        do {
            /* Acquire the ring element */
            lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element);
397

398
            __sync_fetch_and_sub (&pending_messages, 1);
399

400 401 402 403 404 405 406 407
            if (element == NULL)
            {
                if (flush_all != 0)
                {
                    flush_all = 0;
                }
                else
                {
408
                    AssertFatal (0, "Dump event with no data\n");
409 410 411 412 413 414
                }
            }
            else
            {
                /* Retrieve user part of the message */
                lfds611_freelist_get_user_data_from_element(element, &user_data);
415

416
                ITTI_DUMP_DEBUG (0x2, " removed element from queue %p %p, pending %u\n", element, user_data, pending_messages);
417

418 419 420 421 422
                if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL)
                {
                    lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
                    itti_dump_socket_exit();
                }
423

424 425
                /* Write message to file */
                itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data);
426

427 428 429 430 431 432 433
                /* Send message to remote analyzer */
                for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
                    if (itti_dump_queue.itti_clients[j].sd > 0) {
                        itti_dump_send_message(itti_dump_queue.itti_clients[j].sd,
                                                (itti_dump_queue_item_t *)user_data);
                    }
                }
434

435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
                itti_dump_user_data_delete_function (user_data, NULL);
                lfds611_freelist_set_user_data_in_element(element, NULL);

                /* We have finished with this element, reinsert it in the ring buffer */
                lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
            }
        } while(flush_all
    #ifdef RTAI
                && --number_of_messages
    #endif
                );
    }

    return (consumer);
}

static int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length)
{
    if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) {
        uint8_t i;

        for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
            /* Let's find a place to store the new client */
            if (itti_dump_queue.itti_clients[i].sd == -1) {
                break;
460 461 462
            }
        }

463 464
        ITTI_DUMP_DEBUG(0x2, " Found place to store new connection: %d\n", i);

465
        AssertFatal (i < ITTI_DUMP_MAX_CON, "No more connection available (%d/%d) for socked %d\n", i, ITTI_DUMP_MAX_CON, sd);
466 467 468 469 470

        ITTI_DUMP_DEBUG(0x2, " Socket %d accepted\n", sd);

        /* Send the XML message definition */
        if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) {
471
            AssertError (0, " Failed to send XML definition\n");
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
            close (sd);
            return -1;
        }

        itti_dump_queue.itti_clients[i].sd = sd;
        itti_dump_queue.nb_connected++;
    } else {
        ITTI_DUMP_DEBUG(0x2, " Socket %d rejected\n", sd);
        /* We have reached max number of users connected...
         * Reject the connection.
         */
        close (sd);
        return -1;
    }

    return 0;
488 489
}

Cedric Roux's avatar
 
Cedric Roux committed
490 491 492 493 494 495 496
static void *itti_dump_socket(void *arg_p)
{
    uint32_t message_definition_xml_length;
    char *message_definition_xml;
    int rc;
    int itti_listen_socket, max_sd;
    int on = 1;
497
    fd_set read_set, working_set;
Cedric Roux's avatar
 
Cedric Roux committed
498 499
    struct sockaddr_in servaddr; /* socket address structure */

500 501 502 503 504
    struct timeval *timeout_p = NULL;
#ifdef RTAI
    struct timeval  timeout;
#endif

505
    ITTI_DUMP_DEBUG(0x2, " Creating TCP dump socket on port %u\n", ITTI_PORT);
Cedric Roux's avatar
 
Cedric Roux committed
506 507

    message_definition_xml = (char *)arg_p;
508
    AssertFatal (message_definition_xml != NULL, "Message definition XML is NULL\n");
Cedric Roux's avatar
 
Cedric Roux committed
509 510 511 512

    message_definition_xml_length = strlen(message_definition_xml) + 1;

    if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
513
        ITTI_DUMP_ERROR(" ocket creation failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
514 515 516 517 518 519 520
        pthread_exit(NULL);
    }

    /* Allow socket reuse */
    rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR,
                    (char *)&on, sizeof(on));
    if (rc < 0) {
521
        ITTI_DUMP_ERROR(" setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
522 523 524 525 526 527 528 529 530
        close(itti_listen_socket);
        pthread_exit(NULL);
    }

    /* Set socket to be non-blocking.
     * NOTE: sockets accepted will inherit this option.
     */
    rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on);
    if (rc < 0) {
531
        ITTI_DUMP_ERROR(" ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
532 533 534 535 536 537 538 539 540 541
        close(itti_listen_socket);
        pthread_exit(NULL);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family      = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port        = htons(ITTI_PORT);

    if (bind(itti_listen_socket, (struct sockaddr *) &servaddr,
542
             sizeof(servaddr)) < 0) {
543
        ITTI_DUMP_ERROR(" Bind failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
544 545
        pthread_exit(NULL);
    }
546
    if (listen(itti_listen_socket, 5) < 0) {
547
        ITTI_DUMP_ERROR(" Listen failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
548 549 550
        pthread_exit(NULL);
    }

551 552 553 554 555
    FD_ZERO(&read_set);

    /* Add the listener */
    FD_SET(itti_listen_socket, &read_set);

556
#ifndef RTAI
557 558 559 560 561
    /* Add the event fd */
    FD_SET(itti_dump_queue.event_fd, &read_set);

    /* Max of both sd */
    max_sd = itti_listen_socket > itti_dump_queue.event_fd ? itti_listen_socket : itti_dump_queue.event_fd;
562 563 564
#else
    max_sd = itti_listen_socket;
#endif
565 566

    itti_dump_queue.itti_listen_socket = itti_listen_socket;
Cedric Roux's avatar
 
Cedric Roux committed
567 568 569 570 571 572 573 574 575

    /* Loop waiting for incoming connects or for incoming data
     * on any of the connected sockets.
     */
    while (1) {
        int desc_ready;
        int client_socket = -1;
        int i;

576
        memcpy(&working_set, &read_set, sizeof(read_set));
577 578 579 580 581 582 583 584
#ifdef RTAI
        timeout.tv_sec  = 0;
        timeout.tv_usec = 100000;

        timeout_p = &timeout;
#else
        timeout_p = NULL;
#endif
Cedric Roux's avatar
 
Cedric Roux committed
585 586 587 588

        /* No timeout: select blocks till a new event has to be handled
         * on sd's.
         */
589
        rc = select(max_sd + 1, &working_set, NULL, NULL, timeout_p);
Cedric Roux's avatar
 
Cedric Roux committed
590 591

        if (rc < 0) {
592
            ITTI_DUMP_ERROR(" select failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
593
            pthread_exit(NULL);
594 595
        } else if (rc == 0) {
            /* Timeout */
596 597 598 599 600 601 602 603 604 605 606 607
            if (itti_dump_flush_ring_buffer(1) == 0)
            {
                if (itti_dump_running)
                {
                    ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
                    usleep(100 * 1000);
                }
                else
                {
                    itti_dump_socket_exit();
                }
            }
Cedric Roux's avatar
 
Cedric Roux committed
608 609 610
        }

        desc_ready = rc;
611 612 613 614
        for (i = 0; i <= max_sd && desc_ready > 0; i++)
        {
            if (FD_ISSET(i, &working_set))
            {
Cedric Roux's avatar
 
Cedric Roux committed
615
                desc_ready -= 1;
616

617
#ifndef RTAI
618 619
                if (i == itti_dump_queue.event_fd) {
                    /* Notification of new element to dump from other tasks */
620 621
                    eventfd_t sem_counter;
                    ssize_t   read_ret;
622

623
                    /* Read will always return 1 for kernel versions > 2.6.30 */
624 625
                    read_ret = read (itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
                    if (read_ret < 0) {
626
                        ITTI_DUMP_ERROR(" Failed read for semaphore: %s\n", strerror(errno));
627 628
                        pthread_exit(NULL);
                    }
629
                    AssertFatal (read_ret == sizeof(sem_counter), "Failed to read from dump event FD (%ld/%ld)\n", read_ret, sizeof(sem_counter));
630
#if defined(KERNEL_VERSION_PRE_2_6_30)
631
                    if (itti_dump_flush_ring_buffer(1) == 0)
632
#else
633
                    if (itti_dump_flush_ring_buffer(0) == 0)
634
#endif
635 636 637 638 639 640 641 642 643 644 645 646
                    {
                        if (itti_dump_running)
                        {
                            ITTI_DUMP_DEBUG (0x4, " No messages consumers, waiting ...\n");
                            usleep(100 * 1000);
#ifndef RTAI
                            {
                                ssize_t   write_ret;

                                sem_counter = 1;
                                /* Call to write for an event fd must be of 8 bytes */
                                write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
647
                                AssertFatal (write_ret == sizeof(sem_counter), "Failed to write to dump event FD (%ld/%ld)\n", write_ret, sem_counter);
648 649 650 651 652 653 654 655 656 657 658 659
                            }
#endif
                        }
                        else
                        {
                            itti_dump_socket_exit();
                        }
                    }
                    else
                    {
                        ITTI_DUMP_DEBUG(0x1, " Write element to file\n");
                    }
660 661 662
                } else
#endif
                if (i == itti_listen_socket) {
Cedric Roux's avatar
 
Cedric Roux committed
663 664 665 666 667
                    do {
                        client_socket = accept(itti_listen_socket, NULL, NULL);
                        if (client_socket < 0) {
                            if (errno == EWOULDBLOCK || errno == EAGAIN) {
                                /* No more new connection */
668
                                ITTI_DUMP_DEBUG(0x2, " No more new connection\n");
Cedric Roux's avatar
 
Cedric Roux committed
669 670
                                continue;
                            } else {
671
                                ITTI_DUMP_ERROR(" accept failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
672 673 674 675
                                pthread_exit(NULL);
                            }
                        }
                        if (itti_dump_handle_new_connection(client_socket, message_definition_xml,
676 677
                            message_definition_xml_length) == 0)
                        {
Cedric Roux's avatar
 
Cedric Roux committed
678 679 680
                            /* The socket has been accepted.
                             * We have to update the set to include this new sd.
                             */
681
                            FD_SET(client_socket, &read_set);
Cedric Roux's avatar
 
Cedric Roux committed
682 683 684 685 686 687 688 689 690 691
                            if (client_socket > max_sd)
                                max_sd = client_socket;
                        }
                    } while(client_socket != -1);
                } else {
                    /* For now the MME itti dumper should not receive data
                     * other than connection oriented (CLOSE).
                     */
                    uint8_t j;

692
                    ITTI_DUMP_DEBUG(0x2, " Socket %d disconnected\n", i);
Cedric Roux's avatar
 
Cedric Roux committed
693 694 695 696 697

                    /* Close the socket and update info related to this connection */
                    close(i);

                    for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
698
                        if (itti_dump_queue.itti_clients[j].sd == i)
Cedric Roux's avatar
 
Cedric Roux committed
699 700 701 702 703 704
                            break;
                    }

                    /* In case we don't find the matching sd in list of known
                     * connections -> assert.
                     */
705
                    AssertFatal (j < ITTI_DUMP_MAX_CON, "Connection index not found (%d/%d) for socked %d\n", j, ITTI_DUMP_MAX_CON, i);
Cedric Roux's avatar
 
Cedric Roux committed
706 707 708 709

                    /* Re-initialize the socket to -1 so we can accept new
                     * incoming connections.
                     */
710 711 712
                    itti_dump_queue.itti_clients[j].sd                  = -1;
                    itti_dump_queue.itti_clients[j].last_message_number = 0;
                    itti_dump_queue.nb_connected--;
Cedric Roux's avatar
 
Cedric Roux committed
713 714

                    /* Remove the socket from the FD set and update the max sd */
715
                    FD_CLR(i, &read_set);
Cedric Roux's avatar
 
Cedric Roux committed
716 717
                    if (i == max_sd)
                    {
718
                        if (itti_dump_queue.nb_connected == 0) {
Cedric Roux's avatar
 
Cedric Roux committed
719 720 721
                            /* No more new connection max_sd = itti_listen_socket */
                            max_sd = itti_listen_socket;
                        } else {
722
                            while (FD_ISSET(max_sd, &read_set) == 0) {
Cedric Roux's avatar
 
Cedric Roux committed
723 724 725 726 727 728 729 730 731 732 733
                                max_sd -= 1;
                            }
                        }
                    }
                }
            }
        }
    }
    return NULL;
}

734 735 736 737 738 739
/*------------------------------------------------------------------------------*/
int itti_dump_queue_message(task_id_t sender_task,
                            message_number_t message_number,
                            MessageDef *message_p,
                            const char *message_name,
                            const uint32_t message_size)
Cedric Roux's avatar
 
Cedric Roux committed
740
{
741 742 743 744
    if (itti_dump_running)
    {
        itti_dump_queue_item_t *new;
        size_t message_name_length;
Cedric Roux's avatar
 
Cedric Roux committed
745

746 747
        AssertFatal (message_name != NULL, "Message name is NULL\n");
        AssertFatal (message_p != NULL, "Message is NULL\n");
Cedric Roux's avatar
 
Cedric Roux committed
748

749 750 751 752 753 754 755
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
        new = itti_malloc(sender_task, TASK_MAX + 100, sizeof(itti_dump_queue_item_t));
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif
Cedric Roux's avatar
 
Cedric Roux committed
756

757 758 759 760 761 762 763
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_IN);
#endif
        new->data = itti_malloc(sender_task, TASK_MAX + 100, message_size);
#if defined(OAI_EMU) || defined(RTAI)
        vcd_signal_dumper_dump_function_by_name(VCD_SIGNAL_DUMPER_FUNCTIONS_ITTI_DUMP_ENQUEUE_MESSAGE_MALLOC, VCD_FUNCTION_OUT);
#endif
764

765 766 767
        memcpy(new->data, message_p, message_size);
        new->data_size       = message_size;
        new->message_number  = message_number;
Cedric Roux's avatar
 
Cedric Roux committed
768

769
        message_name_length = strlen(message_name) + 1;
770
        AssertError (message_name_length <= SIGNAL_NAME_LENGTH, "Message name too long (%ld/%d)\n", message_name_length, SIGNAL_NAME_LENGTH);
771
        memcpy(new->message_name, message_name, message_name_length);
Cedric Roux's avatar
 
Cedric Roux committed
772

773
        itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE);
Cedric Roux's avatar
 
Cedric Roux committed
774 775 776 777 778
    }

    return 0;
}

779 780 781 782 783 784
/* This function should be called by each thread that will use the ring buffer */
void itti_dump_thread_use_ring_buffer(void)
{
    lfds611_ringbuffer_use(itti_dump_queue.itti_message_queue);
}

785
int itti_dump_init(const char * const messages_definition_xml, const char * const dump_file_name)
Cedric Roux's avatar
 
Cedric Roux committed
786
{
787
    int i, ret;
788 789
    struct sched_param scheduler_param;

790
    scheduler_param.sched_priority = sched_get_priority_min(SCHED_FIFO) + 1;
Cedric Roux's avatar
 
Cedric Roux committed
791

792 793
    if (dump_file_name != NULL)
    {
winckel's avatar
winckel committed
794
        dump_file = fopen(dump_file_name, "wb");
795 796 797

        if (dump_file == NULL)
        {
798
            ITTI_DUMP_ERROR(" can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno));
799 800 801
        }
        else
        {
802
            /* Output the XML to file */
winckel's avatar
winckel committed
803
            uint32_t message_size = strlen(messages_definition_xml) + 1;
804 805
            itti_socket_header_t header;

winckel's avatar
winckel committed
806
            header.message_size = sizeof(itti_socket_header_t) + message_size;
807 808 809 810
            header.message_type = ITTI_DUMP_XML_DEFINITION;

            fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file);
            fwrite (messages_definition_xml, message_size, 1, dump_file);
811
            fflush (dump_file);
812 813 814
        }
    }

815
    memset(&itti_dump_queue, 0, sizeof(itti_desc_t));
Cedric Roux's avatar
 
Cedric Roux committed
816

817
    ITTI_DUMP_DEBUG(0x2, " Creating new ring buffer for itti dump of %u elements\n",
818 819 820 821 822 823 824 825
                    ITTI_QUEUE_MAX_ELEMENTS);

    if (lfds611_ringbuffer_new(&itti_dump_queue.itti_message_queue,
                               ITTI_QUEUE_MAX_ELEMENTS,
                               NULL,
                               NULL) != 1)
    {
        /* Always assert on this condition */
826
        AssertFatal (0, " Failed to create ring buffer...\n");
827 828
    }

829 830 831
#ifdef RTAI
    itti_dump_queue.messages_in_queue = 0;
#else
832 833 834
# if defined(KERNEL_VERSION_PRE_2_6_30)
    itti_dump_queue.event_fd = eventfd(0, 0);
# else
835
    itti_dump_queue.event_fd = eventfd(0, EFD_SEMAPHORE);
836
# endif
837
    if (itti_dump_queue.event_fd == -1) {
838
        /* Always assert on this condition */
839
        AssertFatal (0, "eventfd failed: %s\n", strerror(errno));
840
    }
841
#endif
842 843

    itti_dump_queue.nb_connected = 0;
Cedric Roux's avatar
 
Cedric Roux committed
844 845

    for(i = 0; i < ITTI_DUMP_MAX_CON; i++) {
846 847 848 849 850 851 852
        itti_dump_queue.itti_clients[i].sd = -1;
        itti_dump_queue.itti_clients[i].last_message_number = 0;
    }

    /* initialized with default attributes */
    ret = pthread_attr_init(&itti_dump_queue.attr);
    if (ret < 0) {
853
        AssertFatal (0, "pthread_attr_init failed (%d:%s)\n", errno, strerror(errno));
854
    }
855

856
    ret = pthread_attr_setschedpolicy(&itti_dump_queue.attr, SCHED_FIFO);
857
    if (ret < 0) {
858
        AssertFatal (0, "pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)\n", errno, strerror(errno));
859 860 861
    }
    ret = pthread_attr_setschedparam(&itti_dump_queue.attr, &scheduler_param);
    if (ret < 0) {
862
        AssertFatal (0, "pthread_attr_setschedparam failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
863
    }
864 865 866 867

    ret = pthread_create(&itti_dump_queue.itti_acceptor_thread, &itti_dump_queue.attr,
                         &itti_dump_socket, (void *)messages_definition_xml);
    if (ret < 0) {
868
        AssertFatal (0, "pthread_create failed (%d:%s)\n", errno, strerror(errno));
Cedric Roux's avatar
 
Cedric Roux committed
869
    }
870

Cedric Roux's avatar
 
Cedric Roux committed
871 872
    return 0;
}
873 874 875

void itti_dump_exit(void)
{
876
    void *arg;
winckel's avatar
winckel committed
877 878
    itti_dump_queue_item_t *new;

879 880
    new = itti_malloc(TASK_UNKNOWN, TASK_UNKNOWN, sizeof(itti_dump_queue_item_t));
    memset(new, 0, sizeof(itti_dump_queue_item_t));
881

882 883 884
    /* Set a flag to stop recording message */
    itti_dump_running = 0;

885
    /* Send the exit signal to other thread */
winckel's avatar
winckel committed
886
    itti_dump_enqueue_message(new, 0, ITTI_DUMP_EXIT_SIGNAL);
887

888
    ITTI_DUMP_DEBUG(0x2, " waiting for dumper thread to finish\n");
889 890 891 892

    /* wait for the thread to terminate */
    pthread_join(itti_dump_queue.itti_acceptor_thread, &arg);

893
    ITTI_DUMP_DEBUG(0x2, " dumper thread correctly exited\n");
894

895 896
    if (dump_file != NULL)
    {
897
        /* Synchronise file and then close it */
898
        fclose(dump_file);
899
        dump_file = NULL;
900 901
    }

902 903 904 905 906 907
    if (itti_dump_queue.itti_message_queue)
    {
        lfds611_ringbuffer_delete(itti_dump_queue.itti_message_queue,
                                  itti_dump_user_data_delete_function, NULL);
    }
}