Commit df177680 authored by Cedric Roux's avatar Cedric Roux

- Put itti writes to file in another thread to avoid locks when flushing the file

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4522 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 508a1df2
...@@ -601,6 +601,9 @@ void itti_mark_task_ready(task_id_t task_id) { ...@@ -601,6 +601,9 @@ void itti_mark_task_ready(task_id_t task_id) {
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0); DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
/* Register the thread in itti dump */
itti_dump_thread_use_ring_buffer();
#if !defined(ENABLE_EVENT_FD) #if !defined(ENABLE_EVENT_FD)
// Lock the mutex to get exclusive access to the list // Lock the mutex to get exclusive access to the list
pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex); pthread_mutex_lock (&itti_desc.tasks[task_id].message_queue_mutex);
...@@ -638,6 +641,9 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -638,6 +641,9 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max); ITTI_DEBUG("Init: %d tasks, %d threads, %d messages\n", task_max, thread_max, messages_id_max);
#if !defined(RTAI) #if !defined(RTAI)
/* SR: disable signals module for RTAI (need to harmonize management
* between softmodem and oaisim).
*/
CHECK_INIT_RETURN(signal_init()); CHECK_INIT_RETURN(signal_init());
#endif #endif
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <errno.h> #include <errno.h>
#include <error.h> #include <error.h>
#include <sched.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <sys/socket.h> #include <sys/socket.h>
...@@ -49,61 +50,65 @@ ...@@ -49,61 +50,65 @@
#include <sys/types.h> #include <sys/types.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/eventfd.h>
#include "assertions.h" #include "assertions.h"
#include "queue.h" #include "liblfds611.h"
#include "intertask_interface.h" #include "intertask_interface.h"
#include "intertask_interface_dump.h" #include "intertask_interface_dump.h"
#define SIGNAL_NAME_LENGTH 48 #define SIGNAL_NAME_LENGTH 48
/* Declared in intertask_interface.c */ static const int itti_dump_debug = 0;
extern int itti_debug;
#define ITTI_DEBUG(x, args...) do { if (itti_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \ #define ITTI_DUMP_DEBUG(x, args...) do { if (itti_dump_debug) fprintf(stdout, "[ITTI][D]"x, ##args); } \
while(0) while(0)
#define ITTI_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \ #define ITTI_DUMP_ERROR(x, args...) do { fprintf(stdout, "[ITTI][E]"x, ##args); } \
while(0) while(0)
/* Message sent is an intertask dump type */ /* Message sent is an intertask dump type */
#define ITTI_DUMP_MESSAGE_TYPE 0x1 #define ITTI_DUMP_MESSAGE_TYPE 0x1
#define ITTI_STATISTIC_MESSAGE_TYPE 0x2 #define ITTI_STATISTIC_MESSAGE_TYPE 0x2
#define ITTI_DUMP_XML_DEFINITION 0x3 #define ITTI_DUMP_XML_DEFINITION 0x3
/* This signal is not meant to be used by remote analyzer */
#define ITTI_DUMP_EXIT_SIGNAL 0x4
typedef struct itti_queue_item_s { typedef struct itti_dump_queue_item_s {
STAILQ_ENTRY(itti_queue_item_s) entry;
void *data; void *data;
uint32_t data_size; uint32_t data_size;
uint32_t message_number; uint32_t message_number;
char message_name[SIGNAL_NAME_LENGTH]; char message_name[SIGNAL_NAME_LENGTH];
} itti_queue_item_t; uint32_t message_type;
uint32_t message_size;
} itti_dump_queue_item_t;
typedef struct { typedef struct {
int sd; int sd;
uint32_t last_message_number; uint32_t last_message_number;
pthread_mutex_t client_lock;
} itti_client_desc_t; } itti_client_desc_t;
typedef struct itti_desc_s { typedef struct itti_desc_s {
/* The acceptor thread */ /* Asynchronous thread that write to file/accept new clients */
pthread_t itti_acceptor_thread; pthread_t itti_acceptor_thread;
pthread_t itti_write_thread; pthread_attr_t attr;
/* Protect the circular queue */
pthread_mutex_t queue_mutex;
/* List of messages to dump. /* List of messages to dump.
* NOTE: we limit the size of this queue to retain only the last exchanged * NOTE: we limit the size of this queue to retain only the last exchanged
* messages. The size can be increased by setting up the ITTI_QUEUE_SIZE_MAX * messages. The size can be increased by setting up the ITTI_QUEUE_MAX_ELEMENTS
* in mme_default_values.h or by putting a custom in the configuration file. * in mme_default_values.h or by putting a custom in the configuration file.
*/ */
STAILQ_HEAD(itti_queue_s, itti_queue_item_s) itti_message_queue; struct lfds611_ringbuffer_state *itti_message_queue;
struct itti_queue_item_s *itti_queue_last;
uint32_t queue_size; uint32_t queue_size;
int nb_connected; int nb_connected;
/* Event fd used to notify new messages (semaphore) */
int event_fd;
int itti_listen_socket;
itti_client_desc_t itti_clients[ITTI_DUMP_MAX_CON]; itti_client_desc_t itti_clients[ITTI_DUMP_MAX_CON];
} itti_desc_t; } itti_desc_t;
...@@ -127,17 +132,17 @@ typedef struct { ...@@ -127,17 +132,17 @@ typedef struct {
} itti_statistic_message_t; } itti_statistic_message_t;
static itti_desc_t itti_queue; static itti_desc_t itti_dump_queue;
static FILE *dump_file; static FILE *dump_file;
static int itti_dump_send_message(int sd, itti_queue_item_t *message); static int itti_dump_send_message(int sd, itti_dump_queue_item_t *message);
static int itti_dump_handle_new_connection(int sd, const char *xml_definition, static int itti_dump_handle_new_connection(int sd, const char *xml_definition,
uint32_t xml_definition_length); uint32_t xml_definition_length);
static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml, static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml,
const uint32_t message_definition_xml_length); const uint32_t message_definition_xml_length);
static static
int itti_dump_send_message(int sd, itti_queue_item_t *message) int itti_dump_send_message(int sd, itti_dump_queue_item_t *message)
{ {
itti_dump_message_t *new_message; itti_dump_message_t *new_message;
ssize_t bytes_sent = 0, total_sent = 0; ssize_t bytes_sent = 0, total_sent = 0;
...@@ -167,7 +172,7 @@ int itti_dump_send_message(int sd, itti_queue_item_t *message) ...@@ -167,7 +172,7 @@ int itti_dump_send_message(int sd, itti_queue_item_t *message)
do { do {
bytes_sent = send(sd, &data_ptr[total_sent], size - total_sent, 0); bytes_sent = send(sd, &data_ptr[total_sent], size - total_sent, 0);
if (bytes_sent < 0) { if (bytes_sent < 0) {
ITTI_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n", ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
sd, size, errno, strerror(errno)); sd, size, errno, strerror(errno));
free(new_message); free(new_message);
return -1; return -1;
...@@ -179,6 +184,22 @@ int itti_dump_send_message(int sd, itti_queue_item_t *message) ...@@ -179,6 +184,22 @@ int itti_dump_send_message(int sd, itti_queue_item_t *message)
return total_sent; return total_sent;
} }
static void itti_dump_fwrite_message(itti_dump_queue_item_t *message)
{
itti_socket_header_t header;
if (dump_file != NULL && message) {
header.message_size = message->message_size + sizeof(itti_dump_message_t);
header.message_type = message->message_type;
fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file);
fwrite (&message->message_number, sizeof(message->message_number), 1, dump_file);
fwrite (message->message_name, sizeof(message->message_name), 1, dump_file);
fwrite (message->data, message->data_size, 1, dump_file);
}
}
static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml, static int itti_dump_send_xml_definition(const int sd, const char *message_definition_xml,
const uint32_t message_definition_xml_length) const uint32_t message_definition_xml_length)
{ {
...@@ -195,7 +216,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin ...@@ -195,7 +216,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin
itti_dump_message = calloc(1, itti_dump_message_size); itti_dump_message = calloc(1, itti_dump_message_size);
ITTI_DEBUG("[%d] Sending XML definition message of size %zu to observer peer\n", ITTI_DUMP_DEBUG("[%d] Sending XML definition message of size %zu to observer peer\n",
sd, itti_dump_message_size); sd, itti_dump_message_size);
itti_dump_message->message_size = itti_dump_message_size; itti_dump_message->message_size = itti_dump_message_size;
...@@ -209,7 +230,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin ...@@ -209,7 +230,7 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin
do { do {
bytes_sent = send(sd, &data_ptr[total_sent], itti_dump_message_size - total_sent, 0); bytes_sent = send(sd, &data_ptr[total_sent], itti_dump_message_size - total_sent, 0);
if (bytes_sent < 0) { if (bytes_sent < 0) {
ITTI_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n", ITTI_DUMP_ERROR("[%d] Failed to send %zu bytes to socket (%d:%s)\n",
sd, itti_dump_message_size, errno, strerror(errno)); sd, itti_dump_message_size, errno, strerror(errno));
free(itti_dump_message); free(itti_dump_message);
return -1; return -1;
...@@ -222,53 +243,30 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin ...@@ -222,53 +243,30 @@ static int itti_dump_send_xml_definition(const int sd, const char *message_defin
return 0; return 0;
} }
static int itti_enqueue_message(itti_queue_item_t *new, uint32_t message_size, static int itti_dump_enqueue_message(itti_dump_queue_item_t *new, uint32_t message_size,
uint32_t message_type) uint32_t message_type)
{ {
itti_queue_item_t *head = NULL; ssize_t write_ret;
uint64_t sem_counter = 1;
DevAssert(new != NULL); struct lfds611_freelist_element *new_queue_element = NULL;
/* Lock the queue mutex for writing to insert the new element */
pthread_mutex_lock(&itti_queue.queue_mutex);
/* We reached the maximum size for the queue of messages -> remove the head */ DevAssert(new != NULL);
if (itti_queue.queue_size + message_size > ITTI_QUEUE_SIZE_MAX) {
head = STAILQ_FIRST(&itti_queue.itti_message_queue);
/* Remove the head */
STAILQ_REMOVE_HEAD(&itti_queue.itti_message_queue, entry);
} else {
itti_queue.queue_size += message_size;
}
/* Insert the packet at tail */
STAILQ_INSERT_TAIL(&itti_queue.itti_message_queue, new, entry);
itti_queue.itti_queue_last = new;
if (dump_file != NULL) new->message_type = message_type;
{ new->message_size = message_size;
itti_socket_header_t header;
header.message_size = message_size + sizeof(itti_dump_message_t); new_queue_element = lfds611_ringbuffer_get_write_element(
header.message_type = message_type; itti_dump_queue.itti_message_queue, &new_queue_element, NULL);
fwrite (&header, sizeof(itti_socket_header_t), 1, dump_file); lfds611_freelist_set_user_data_in_element(new_queue_element, (void *)new);
fwrite (&new->message_number, sizeof(new->message_number), 1, dump_file);
fwrite (new->message_name, sizeof(new->message_name), 1, dump_file);
fwrite (new->data, new->data_size, 1, dump_file);
fflush (dump_file);
}
/* Release the mutex */ lfds611_ringbuffer_put_write_element(itti_dump_queue.itti_message_queue,
pthread_mutex_unlock(&itti_queue.queue_mutex); new_queue_element);
/* No need to have the mutex locked to free data as at this point the message /* Call to write for an event fd must be of 8 bytes */
* is no more in the list. write_ret = write(itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
*/ DevCheck(write_ret == sizeof(sem_counter), write_ret, sem_counter, 0);
if (head) {
free(head->data);
free(head);
head = NULL;
}
return 0; return 0;
} }
...@@ -278,17 +276,17 @@ int itti_dump_queue_message(message_number_t message_number, ...@@ -278,17 +276,17 @@ int itti_dump_queue_message(message_number_t message_number,
const char *message_name, const char *message_name,
const uint32_t message_size) const uint32_t message_size)
{ {
itti_queue_item_t *new; itti_dump_queue_item_t *new;
size_t message_name_length; size_t message_name_length;
int i; int i;
DevAssert(message_name != NULL); DevAssert(message_name != NULL);
DevAssert(message_p != NULL); DevAssert(message_p != NULL);
new = calloc(1, sizeof(itti_queue_item_t)); new = calloc(1, sizeof(itti_dump_queue_item_t));
if (new == NULL) { if (new == NULL) {
ITTI_ERROR("Failed to allocate memory (%s:%d)\n", ITTI_DUMP_ERROR("Failed to allocate memory (%s:%d)\n",
__FILE__, __LINE__); __FILE__, __LINE__);
return -1; return -1;
} }
...@@ -296,7 +294,7 @@ int itti_dump_queue_message(message_number_t message_number, ...@@ -296,7 +294,7 @@ int itti_dump_queue_message(message_number_t message_number,
new->data = malloc(message_size); new->data = malloc(message_size);
if (new->data == NULL) { if (new->data == NULL) {
ITTI_ERROR("Failed to allocate memory (%s:%d)\n", ITTI_DUMP_ERROR("Failed to allocate memory (%s:%d)\n",
__FILE__, __LINE__); __FILE__, __LINE__);
return -1; return -1;
} }
...@@ -309,17 +307,12 @@ int itti_dump_queue_message(message_number_t message_number, ...@@ -309,17 +307,12 @@ int itti_dump_queue_message(message_number_t message_number,
SIGNAL_NAME_LENGTH, 0); SIGNAL_NAME_LENGTH, 0);
memcpy(new->message_name, message_name, message_name_length); memcpy(new->message_name, message_name, message_name_length);
itti_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE); itti_dump_enqueue_message(new, message_size, ITTI_DUMP_MESSAGE_TYPE);
for (i = 0; i < ITTI_DUMP_MAX_CON; i++) { for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
if (pthread_mutex_trylock(&itti_queue.itti_clients[i].client_lock) == 0) { if (itti_dump_queue.itti_clients[i].sd == -1)
pthread_mutex_unlock(&itti_queue.itti_clients[i].client_lock);
} else {
continue;
}
if (itti_queue.itti_clients[i].sd == -1)
continue; continue;
itti_dump_send_message(itti_queue.itti_clients[i].sd, new); itti_dump_send_message(itti_dump_queue.itti_clients[i].sd, new);
} }
return 0; return 0;
...@@ -332,10 +325,10 @@ static void *itti_dump_socket(void *arg_p) ...@@ -332,10 +325,10 @@ static void *itti_dump_socket(void *arg_p)
int rc; int rc;
int itti_listen_socket, max_sd; int itti_listen_socket, max_sd;
int on = 1; int on = 1;
fd_set master_set, working_set; fd_set read_set, working_set;
struct sockaddr_in servaddr; /* socket address structure */ struct sockaddr_in servaddr; /* socket address structure */
ITTI_DEBUG("Creating TCP dump socket on port %u\n", ITTI_PORT); ITTI_DUMP_DEBUG("Creating TCP dump socket on port %u\n", ITTI_PORT);
message_definition_xml = (char *)arg_p; message_definition_xml = (char *)arg_p;
DevAssert(message_definition_xml != NULL); DevAssert(message_definition_xml != NULL);
...@@ -343,7 +336,7 @@ static void *itti_dump_socket(void *arg_p) ...@@ -343,7 +336,7 @@ static void *itti_dump_socket(void *arg_p)
message_definition_xml_length = strlen(message_definition_xml) + 1; message_definition_xml_length = strlen(message_definition_xml) + 1;
if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((itti_listen_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
ITTI_ERROR("Socket creation failed (%d:%s)\n", errno, strerror(errno)); ITTI_DUMP_ERROR("Socket creation failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL); pthread_exit(NULL);
} }
...@@ -351,7 +344,7 @@ static void *itti_dump_socket(void *arg_p) ...@@ -351,7 +344,7 @@ static void *itti_dump_socket(void *arg_p)
rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR, rc = setsockopt(itti_listen_socket, SOL_SOCKET, SO_REUSEADDR,
(char *)&on, sizeof(on)); (char *)&on, sizeof(on));
if (rc < 0) { if (rc < 0) {
ITTI_ERROR("setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno)); ITTI_DUMP_ERROR("setsockopt SO_REUSEADDR failed (%d:%s)\n", errno, strerror(errno));
close(itti_listen_socket); close(itti_listen_socket);
pthread_exit(NULL); pthread_exit(NULL);
} }
...@@ -361,7 +354,7 @@ static void *itti_dump_socket(void *arg_p) ...@@ -361,7 +354,7 @@ static void *itti_dump_socket(void *arg_p)
*/ */
rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on); rc = ioctl(itti_listen_socket, FIONBIO, (char *)&on);
if (rc < 0) { if (rc < 0) {
ITTI_ERROR("ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno)); ITTI_DUMP_ERROR("ioctl FIONBIO (non-blocking) failed (%d:%s)\n", errno, strerror(errno));
close(itti_listen_socket); close(itti_listen_socket);
pthread_exit(NULL); pthread_exit(NULL);
} }
...@@ -373,17 +366,26 @@ static void *itti_dump_socket(void *arg_p) ...@@ -373,17 +366,26 @@ static void *itti_dump_socket(void *arg_p)
if (bind(itti_listen_socket, (struct sockaddr *) &servaddr, if (bind(itti_listen_socket, (struct sockaddr *) &servaddr,
sizeof(servaddr)) < 0) { sizeof(servaddr)) < 0) {
ITTI_ERROR("Bind failed (%d:%s)\n", errno, strerror(errno)); ITTI_DUMP_ERROR("Bind failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL); pthread_exit(NULL);
} }
if (listen(itti_listen_socket, 5) < 0) { if (listen(itti_listen_socket, 5) < 0) {
ITTI_ERROR("Listen failed (%d:%s)\n", errno, strerror(errno)); ITTI_DUMP_ERROR("Listen failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL); pthread_exit(NULL);
} }
FD_ZERO(&master_set); FD_ZERO(&read_set);
max_sd = itti_listen_socket;
FD_SET(itti_listen_socket, &master_set); /* Add the listener */
FD_SET(itti_listen_socket, &read_set);
/* Add the event fd */
FD_SET(itti_dump_queue.event_fd, &read_set);
/* Max of both sd */
max_sd = itti_listen_socket > itti_dump_queue.event_fd ? itti_listen_socket : itti_dump_queue.event_fd;
itti_dump_queue.itti_listen_socket = itti_listen_socket;
/* Loop waiting for incoming connects or for incoming data /* Loop waiting for incoming connects or for incoming data
* on any of the connected sockets. * on any of the connected sockets.
...@@ -393,9 +395,7 @@ static void *itti_dump_socket(void *arg_p) ...@@ -393,9 +395,7 @@ static void *itti_dump_socket(void *arg_p)
int client_socket = -1; int client_socket = -1;
int i; int i;
memcpy(&working_set, &master_set, sizeof(master_set)); memcpy(&working_set, &read_set, sizeof(read_set));
// ITTI_DEBUG("Stuck on select\n");
/* No timeout: select blocks till a new event has to be handled /* No timeout: select blocks till a new event has to be handled
* on sd's. * on sd's.
...@@ -403,37 +403,88 @@ static void *itti_dump_socket(void *arg_p) ...@@ -403,37 +403,88 @@ static void *itti_dump_socket(void *arg_p)
rc = select(max_sd + 1, &working_set, NULL, NULL, NULL); rc = select(max_sd + 1, &working_set, NULL, NULL, NULL);
if (rc < 0) { if (rc < 0) {
ITTI_ERROR("select failed (%d:%s)\n", errno, strerror(errno)); ITTI_DUMP_ERROR("select failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL); pthread_exit(NULL);
} }
desc_ready = rc; desc_ready = rc;
for (i = 0; i <= max_sd && desc_ready > 0; i++) { for (i = 0; i <= max_sd && desc_ready > 0; i++)
if (FD_ISSET(i, &working_set)) { {
ITTI_DEBUG("Handling socket %d\n", i); if (FD_ISSET(i, &working_set))
{
desc_ready -= 1; desc_ready -= 1;
/* Check if the socket where data available is the listening
* socket. if (i == itti_dump_queue.event_fd) {
*/ /* Notification of new element to dump from other tasks */
if (i == itti_listen_socket) { uint64_t sem_counter;
ssize_t read_ret;
void *user_data;
int j;
struct lfds611_freelist_element *element;
/* Read will always return 1 */
read_ret = read (itti_dump_queue.event_fd, &sem_counter, sizeof(sem_counter));
if (read_ret < 0) {
ITTI_DUMP_ERROR("Failed read for semaphore: %s\n", strerror(errno));
pthread_exit(NULL);
}
DevCheck(read_ret == sizeof(sem_counter), read_ret, sizeof(sem_counter), 0);
/* Acquire the ring element */
lfds611_ringbuffer_get_read_element(itti_dump_queue.itti_message_queue, &element);
DevAssert(element != NULL);
/* Retrieve user part of the message */
lfds611_freelist_get_user_data_from_element(element, &user_data);
if (((itti_dump_queue_item_t *)user_data)->message_type == ITTI_DUMP_EXIT_SIGNAL)
{
close(itti_dump_queue.event_fd);
close(itti_dump_queue.itti_listen_socket);
lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
/* Leave the thread as we detected end signal */
pthread_exit(NULL);
}
/* Write message to file */
itti_dump_fwrite_message((itti_dump_queue_item_t *)user_data);
/* Send message to remote analyzer */
for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
if (itti_dump_queue.itti_clients[i].sd > 0) {
itti_dump_send_message(itti_dump_queue.itti_clients[i].sd,
(itti_dump_queue_item_t *)user_data);
}
}
/* We have finished with this element, reinsert it in the ring buffer */
lfds611_ringbuffer_put_read_element(itti_dump_queue.itti_message_queue, element);
ITTI_DUMP_DEBUG("Write element to file\n");
} else if (i == itti_listen_socket) {
do { do {
client_socket = accept(itti_listen_socket, NULL, NULL); client_socket = accept(itti_listen_socket, NULL, NULL);
if (client_socket < 0) { if (client_socket < 0) {
if (errno == EWOULDBLOCK || errno == EAGAIN) { if (errno == EWOULDBLOCK || errno == EAGAIN) {
/* No more new connection */ /* No more new connection */
ITTI_DEBUG("No more new connection\n"); ITTI_DUMP_DEBUG("No more new connection\n");
continue; continue;
} else { } else {
ITTI_ERROR("accept failed (%d:%s)\n", errno, strerror(errno)); ITTI_DUMP_ERROR("accept failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL); pthread_exit(NULL);
} }
} }
if (itti_dump_handle_new_connection(client_socket, message_definition_xml, if (itti_dump_handle_new_connection(client_socket, message_definition_xml,
message_definition_xml_length) == 0) { message_definition_xml_length) == 0)
{
/* The socket has been accepted. /* The socket has been accepted.
* We have to update the set to include this new sd. * We have to update the set to include this new sd.
*/ */
FD_SET(client_socket, &master_set); FD_SET(client_socket, &read_set);
if (client_socket > max_sd) if (client_socket > max_sd)
max_sd = client_socket; max_sd = client_socket;
} }
...@@ -444,13 +495,13 @@ static void *itti_dump_socket(void *arg_p) ...@@ -444,13 +495,13 @@ static void *itti_dump_socket(void *arg_p)
*/ */
uint8_t j; uint8_t j;
ITTI_DEBUG("Socket %d disconnected\n", i); ITTI_DUMP_DEBUG("Socket %d disconnected\n", i);
/* Close the socket and update info related to this connection */ /* Close the socket and update info related to this connection */
close(i); close(i);
for (j = 0; j < ITTI_DUMP_MAX_CON; j++) { for (j = 0; j < ITTI_DUMP_MAX_CON; j++) {
if (itti_queue.itti_clients[j].sd == i) if (itti_dump_queue.itti_clients[j].sd == i)
break; break;
} }
...@@ -462,19 +513,19 @@ static void *itti_dump_socket(void *arg_p) ...@@ -462,19 +513,19 @@ static void *itti_dump_socket(void *arg_p)
/* Re-initialize the socket to -1 so we can accept new /* Re-initialize the socket to -1 so we can accept new
* incoming connections. * incoming connections.
*/ */
itti_queue.itti_clients[j].sd = -1; itti_dump_queue.itti_clients[j].sd = -1;
itti_queue.itti_clients[j].last_message_number = 0; itti_dump_queue.itti_clients[j].last_message_number = 0;
itti_queue.nb_connected--; itti_dump_queue.nb_connected--;
/* Remove the socket from the FD set and update the max sd */ /* Remove the socket from the FD set and update the max sd */
FD_CLR(i, &master_set); FD_CLR(i, &read_set);
if (i == max_sd) if (i == max_sd)
{ {
if (itti_queue.nb_connected == 0) { if (itti_dump_queue.nb_connected == 0) {
/* No more new connection max_sd = itti_listen_socket */ /* No more new connection max_sd = itti_listen_socket */
max_sd = itti_listen_socket; max_sd = itti_listen_socket;
} else { } else {
while (FD_ISSET(max_sd, &master_set) == 0) { while (FD_ISSET(max_sd, &read_set) == 0) {
max_sd -= 1; max_sd -= 1;
} }
} }
...@@ -489,44 +540,33 @@ static void *itti_dump_socket(void *arg_p) ...@@ -489,44 +540,33 @@ static void *itti_dump_socket(void *arg_p)
static static
int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length) int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t xml_definition_length)
{ {
if (itti_queue.nb_connected < ITTI_DUMP_MAX_CON) { if (itti_dump_queue.nb_connected < ITTI_DUMP_MAX_CON) {
itti_queue_item_t *item;
uint8_t i; uint8_t i;
for (i = 0; i < ITTI_DUMP_MAX_CON; i++) { for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
/* Let's find a place to store the new client */ /* Let's find a place to store the new client */
if (itti_queue.itti_clients[i].sd == -1) { if (itti_dump_queue.itti_clients[i].sd == -1) {
break; break;
} }
} }
ITTI_DEBUG("Found place to store new connection: %d\n", i); ITTI_DUMP_DEBUG("Found place to store new connection: %d\n", i);
DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd); DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd);
pthread_mutex_lock(&itti_queue.itti_clients[i].client_lock); ITTI_DUMP_DEBUG("Socket %d accepted\n", sd);
itti_queue.itti_clients[i].sd = sd;
itti_queue.nb_connected++;
ITTI_DEBUG("Socket %d accepted\n", sd);
/* Send the XML message definition */ /* Send the XML message definition */
if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) { if (itti_dump_send_xml_definition(sd, xml_definition, xml_definition_length) < 0) {
ITTI_ERROR("Failed to send XML definition\n"); ITTI_DUMP_ERROR("Failed to send XML definition\n");
close (sd); close (sd);
return -1; return -1;
} }
/* At this point we have to dump the complete list */ itti_dump_queue.itti_clients[i].sd = sd;
pthread_mutex_lock(&itti_queue.queue_mutex); itti_dump_queue.nb_connected++;
STAILQ_FOREACH(item, &itti_queue.itti_message_queue, entry) {
itti_dump_send_message(sd, item);
}
pthread_mutex_unlock(&itti_queue.queue_mutex);
pthread_mutex_unlock(&itti_queue.itti_clients[i].client_lock);
} else { } else {
ITTI_DEBUG("Socket %d rejected\n", sd); ITTI_DUMP_DEBUG("Socket %d rejected\n", sd);
/* We have reached max number of users connected... /* We have reached max number of users connected...
* Reject the connection. * Reject the connection.
*/ */
...@@ -537,9 +577,20 @@ int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t ...@@ -537,9 +577,20 @@ int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t
return 0; return 0;
} }
int itti_dump_user_data_init_function(void **user_data, void *user_state)
{
return 0;
}
/* This function should be called by each thread that will use the ring buffer */
void itti_dump_thread_use_ring_buffer(void)
{
lfds611_ringbuffer_use(itti_dump_queue.itti_message_queue);
}
int itti_dump_init(const char * const messages_definition_xml, const char * const dump_file_name) int itti_dump_init(const char * const messages_definition_xml, const char * const dump_file_name)
{ {
int i; int i, ret;
if (dump_file_name != NULL) if (dump_file_name != NULL)
{ {
...@@ -547,10 +598,11 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons ...@@ -547,10 +598,11 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
if (dump_file == NULL) if (dump_file == NULL)
{ {
ITTI_ERROR("can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno)); ITTI_DUMP_ERROR("can not open dump file \"%s\" (%d:%s)\n", dump_file_name, errno, strerror(errno));
} }
else else
{ {
/* Output the XML to file */
uint32_t message_size = strlen(messages_definition_xml) + 1; uint32_t message_size = strlen(messages_definition_xml) + 1;
itti_socket_header_t header; itti_socket_header_t header;
...@@ -563,34 +615,97 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons ...@@ -563,34 +615,97 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
} }
} }
memset(&itti_queue, 0, sizeof(itti_desc_t)); memset(&itti_dump_queue, 0, sizeof(itti_desc_t));
ITTI_DUMP_DEBUG("Creating new ring buffer for itti dump of %u elements\n",
ITTI_QUEUE_MAX_ELEMENTS);
if (lfds611_ringbuffer_new(&itti_dump_queue.itti_message_queue,
ITTI_QUEUE_MAX_ELEMENTS,
NULL,
NULL) != 1)
{
ITTI_DUMP_ERROR("Failed to create ring buffer...\n");
/* Always assert on this condition */
DevAssert(0 == 1);
}
itti_dump_queue.event_fd = eventfd(0, EFD_SEMAPHORE);
if (itti_dump_queue.event_fd == -1)
{
ITTI_DUMP_ERROR("eventfd failed: %s\n", strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
pthread_mutex_init(&itti_queue.queue_mutex, NULL); itti_dump_queue.queue_size = 0;
STAILQ_INIT(&itti_queue.itti_message_queue); itti_dump_queue.nb_connected = 0;
itti_queue.queue_size = 0;
itti_queue.nb_connected = 0;
for(i = 0; i < ITTI_DUMP_MAX_CON; i++) { for(i = 0; i < ITTI_DUMP_MAX_CON; i++) {
itti_queue.itti_clients[i].sd = -1; itti_dump_queue.itti_clients[i].sd = -1;
itti_queue.itti_clients[i].last_message_number = 0; itti_dump_queue.itti_clients[i].last_message_number = 0;
}
/* Init per user lock */ /* initialized with default attributes */
pthread_mutex_init(&itti_queue.itti_clients[i].client_lock, NULL); ret = pthread_attr_init(&itti_dump_queue.attr);
if (ret < 0) {
ITTI_DUMP_ERROR("pthread_attr_init failed (%d:%s)\n", errno, strerror(errno));
return -1;
} }
if (pthread_create(&itti_queue.itti_acceptor_thread, NULL, &itti_dump_socket,
(void *)messages_definition_xml) < 0) { ret = pthread_attr_setschedpolicy(&itti_dump_queue.attr, SCHED_RR);
ITTI_ERROR("pthread_create failed (%d:%s)\n", errno, strerror(errno)); if (ret < 0) {
ITTI_DUMP_ERROR("pthread_attr_setschedpolicy (SCHED_IDLE) failed (%d:%s)\n", errno, strerror(errno));
return -1; return -1;
} }
ret = pthread_create(&itti_dump_queue.itti_acceptor_thread, &itti_dump_queue.attr,
&itti_dump_socket, (void *)messages_definition_xml);
if (ret < 0) {
ITTI_DUMP_ERROR("pthread_create failed (%d:%s)\n", errno, strerror(errno));
return -1;
}
return 0; return 0;
} }
void itti_dump_user_data_delete_function(void *user_data, void *user_state)
{
if (user_data != NULL)
{
itti_dump_queue_item_t *item;
item = (itti_dump_queue_item_t *)user_data;
free(item->data);
free(item);
}
}
void itti_dump_exit(void) void itti_dump_exit(void)
{ {
void *arg;
itti_dump_queue_item_t new;
/* Send the exit signal to other thread */
itti_dump_enqueue_message(&new, 0, ITTI_DUMP_EXIT_SIGNAL);
ITTI_DUMP_DEBUG("waiting for dumper thread to finish\n");
/* wait for the thread to terminate */
pthread_join(itti_dump_queue.itti_acceptor_thread, &arg);
ITTI_DUMP_DEBUG("dumper thread correctly exited\n");
if (dump_file != NULL) if (dump_file != NULL)
{ {
/* Synchronise file and then close it */
fclose(dump_file); fclose(dump_file);
dump_file = NULL; dump_file = NULL;
} }
}
if (itti_dump_queue.itti_message_queue)
{
lfds611_ringbuffer_delete(itti_dump_queue.itti_message_queue,
itti_dump_user_data_delete_function, NULL);
}
}
...@@ -42,4 +42,6 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons ...@@ -42,4 +42,6 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
void itti_dump_exit(void); void itti_dump_exit(void);
void itti_dump_thread_use_ring_buffer(void);
#endif /* INTERTASK_INTERFACE_DUMP_H_ */ #endif /* INTERTASK_INTERFACE_DUMP_H_ */
...@@ -45,7 +45,7 @@ ...@@ -45,7 +45,7 @@
#define ITTI_PORT (10007) #define ITTI_PORT (10007)
/* This is the queue size for signal dumper */ /* This is the queue size for signal dumper */
#define ITTI_QUEUE_SIZE_MAX (1 * 1024 * 1024) /* 1 MBytes */ #define ITTI_QUEUE_MAX_ELEMENTS (200 * 1024)
#define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */ #define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */
#endif /* INTERTASK_INTERFACE_CONF_H_ */ #endif /* INTERTASK_INTERFACE_CONF_H_ */
...@@ -45,7 +45,7 @@ ...@@ -45,7 +45,7 @@
#define ITTI_PORT (10006) #define ITTI_PORT (10006)
/* This is the queue size for signal dumper */ /* This is the queue size for signal dumper */
#define ITTI_QUEUE_SIZE_MAX (1 * 1024 * 1024) /* 1 MBytes */ #define ITTI_QUEUE_MAX_ELEMENTS (200 * 1000)
#define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */ #define ITTI_DUMP_MAX_CON (5) /* Max connections in parallel */
#endif /* INTERTASK_INTERFACE_CONF_H_ */ #endif /* INTERTASK_INTERFACE_CONF_H_ */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment