Commit 9eb72144 authored by Cedric Roux's avatar Cedric Roux

- New core event dispatcher for ITTI: will allow a single task to monitor on...

- New core event dispatcher for ITTI: will allow a single task to monitor on multiple event sources via event_fd + epoll_wait apis
- Mutex per user for the ITTI dumper to avoid sending new signals when the XML is dumped to logger

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4328 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent d0104e39
......@@ -40,9 +40,20 @@
#include "queue.h"
#include "assertions.h"
// #define ENABLE_EVENT_FD
#if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h>
# include <sys/eventfd.h>
# include "liblfds611.h"
#endif
#include "intertask_interface.h"
#include "intertask_interface_dump.h"
/* Includes "intertask_interface_init.h" to check prototype coherence, but disable threads and messages information generation */
/* Includes "intertask_interface_init.h" to check prototype coherence, but
* disable threads and messages information generation.
*/
#define CHECK_PROTOTYPE_ONLY
#include "intertask_interface_init.h"
#undef CHECK_PROTOTYPE_ONLY
......@@ -66,7 +77,9 @@ typedef enum task_state_s {
/* This list acts as a FIFO of messages received by tasks (RRC, NAS, ...) */
struct message_list_s {
#if !defined(ENABLE_EVENT_FD)
STAILQ_ENTRY(message_list_s) next_element;
#endif
MessageDef *msg; ///< Pointer to the message
......@@ -76,6 +89,7 @@ struct message_list_s {
typedef struct task_desc_s {
/* Queue of messages belonging to the task */
#if !defined(ENABLE_EVENT_FD)
STAILQ_HEAD(message_queue_head, message_list_s) message_queue;
/* Number of messages in the queue */
......@@ -84,7 +98,28 @@ typedef struct task_desc_s {
pthread_mutex_t message_queue_mutex;
/* Conditional var for message queue and task synchro */
pthread_cond_t message_queue_cond_var;
pthread_t task_thread;
#else
struct lfds611_queue_state *message_queue;
/* This fd is used internally by ITTI. */
int epoll_fd;
/* The task fd */
int task_event_fd;
/* Number of events to monitor */
uint16_t nb_events;
/* Array of events monitored by the task.
* By default only one fd is monitored (the one used to received messages
* from other tasks).
* More events can be suscribed later by the task itself.
*/
struct epoll_event *events;
#endif
/* pthread associated with the task */
pthread_t task_thread;
/* State of the task */
volatile task_state_t task_state;
} task_desc_t;
......@@ -196,17 +231,19 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
priority = itti_get_message_priority (message_id);
/* Lock the mutex to get exclusive access to the list */
pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
/* We cannot send a message if the task is not running */
DevCheck(itti_desc.tasks[thread_id].task_state == TASK_STATE_READY, itti_desc.tasks[thread_id].task_state,
TASK_STATE_READY, thread_id);
#if !defined(ENABLE_EVENT_FD)
/* Lock the mutex to get exclusive access to the list */
pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
/* Check the number of messages in the queue */
DevCheck((itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)) < ITTI_QUEUE_SIZE_PER_TASK,
(itti_desc.tasks[thread_id].message_in_queue * sizeof(MessageDef)), ITTI_QUEUE_SIZE_PER_TASK,
itti_desc.tasks[thread_id].message_in_queue);
itti_desc.tasks[thread_id].message_in_queue);
#endif
/* Allocate new list element */
new = (struct message_list_s *) malloc (sizeof(struct message_list_s));
......@@ -223,6 +260,16 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
itti_dump_queue_message (message_number, message, itti_desc.messages_info[message_id].name,
MESSAGE_SIZE(message_id));
#if defined(ENABLE_EVENT_FD)
{
uint64_t message_ptr = 0;
/* Call to write for an event fd must be of 8 bytes */
write(itti_desc.tasks[thread_id].task_event_fd, &message_ptr, sizeof(message_ptr));
lfds611_queue_enqueue(itti_desc.tasks[thread_id].message_queue, new);
}
#else
if (STAILQ_EMPTY (&itti_desc.tasks[thread_id].message_queue)) {
STAILQ_INSERT_HEAD (&itti_desc.tasks[thread_id].message_queue, new, next_element);
}
......@@ -260,13 +307,77 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
}
/* Release the mutex */
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
#endif
ITTI_DEBUG(
"Message %s, number %lu with priority %d successfully sent to queue (%u:%s)\n",
itti_desc.messages_info[message_id].name, message_number, priority, thread_id, itti_desc.threads_name[thread_id]);
return 0;
}
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) {
#if defined(ENABLE_EVENT_FD)
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{
int epoll_ret = 0;
int epoll_timeout = 0;
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
DevAssert(received_msg != NULL);
*received_msg = NULL;
if (polling) {
/* In polling mode we set the timeout to 0 causing epoll_wait to return
* immediately.
*/
epoll_timeout = 0;
} else {
/* timeout = -1 causes the epoll_wait to wait indefinetely.
*/
epoll_timeout = -1;
}
epoll_ret = epoll_wait(itti_desc.tasks[thread_id].epoll_fd,
itti_desc.tasks[thread_id].events,
itti_desc.tasks[thread_id].nb_events,
epoll_timeout);
if (epoll_ret < 0) {
ITTI_ERROR("epoll_wait failed for task %s: %s\n",
itti_get_task_name(task_id), strerror(errno));
DevAssert(0 == 1);
}
if (epoll_ret == 0 && polling) {
/* No data to read -> return */
return;
}
{
struct message_list_s *message;
uint64_t sem_counter;
/* Read will always return 1 */
read(itti_desc.tasks[thread_id].task_event_fd, &sem_counter, sizeof(sem_counter));
if (lfds611_queue_dequeue(itti_desc.tasks[thread_id].message_queue,
(void **)&message) == 0)
{
/* No element in list -> this should not happen */
DevMessage("No element in message queue...");
}
*received_msg = message->msg;
free(message);
}
}
#endif
void itti_receive_msg(task_id_t task_id, MessageDef **received_msg)
{
#if defined(ENABLE_EVENT_FD)
itti_receive_msg_internal_event_fd(task_id, 0, received_msg);
#else
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
......@@ -297,6 +408,7 @@ void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) {
}
// Release the mutex
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
#endif
}
void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received_msg) {
......@@ -307,6 +419,9 @@ void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received
*received_msg = NULL;
#if defined(ENABLE_EVENT_FD)
itti_receive_msg_internal_event_fd(task_id, 1, received_msg);
#else
if (itti_desc.tasks[thread_id].message_in_queue != 0) {
struct message_list_s *temp;
......@@ -335,6 +450,7 @@ void itti_poll_msg(task_id_t task_id, instance_t instance, MessageDef **received
// Release the mutex
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
}
#endif
if (*received_msg == NULL) {
ITTI_DEBUG("No message in queue[(%u:%s)] for task %x\n", thread_id, itti_desc.threads_name[thread_id], task_id);
......@@ -366,12 +482,17 @@ void itti_mark_task_ready(task_id_t task_id) {
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
#if !defined(ENABLE_EVENT_FD)
// Lock the mutex to get exclusive access to the list
pthread_mutex_lock (&itti_desc.tasks[thread_id].message_queue_mutex);
#endif
itti_desc.tasks[thread_id].task_state = TASK_STATE_READY;
#if !defined(ENABLE_EVENT_FD)
// Release the mutex
pthread_mutex_unlock (&itti_desc.tasks[thread_id].message_queue_mutex);
#endif
}
void itti_exit_task(void) {
......@@ -394,7 +515,7 @@ int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char *
int i;
itti_desc.message_number = 1;
ITTI_DEBUG( "Init: %d threads, %d messages\n", thread_max, messages_id_max);
ITTI_DEBUG("Init: %d threads, %d messages\n", thread_max, messages_id_max);
CHECK_INIT_RETURN(signal_init());
......@@ -409,7 +530,48 @@ int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char *
itti_desc.tasks = calloc (itti_desc.thread_max, sizeof(task_desc_t));
/* Initializing each queue and related stuff */
for (i = THREAD_FIRST; i < itti_desc.thread_max; i++) {
for (i = THREAD_FIRST; i < itti_desc.thread_max; i++)
{
#if defined(ENABLE_EVENT_FD)
ITTI_DEBUG("Creating queue of message of size %u\n",
ITTI_QUEUE_SIZE_PER_TASK / (sizeof(MessageDef) + sizeof(struct message_list_s)));
if (lfds611_queue_new(&itti_desc.tasks[i].message_queue,
ITTI_QUEUE_SIZE_PER_TASK / (sizeof(MessageDef) + sizeof(struct message_list_s))) < 0)
{
ITTI_ERROR("lfds611_queue_new failed for task %u\n", i);
DevAssert(0 == 1);
}
itti_desc.tasks[i].epoll_fd = epoll_create1(0);
if (itti_desc.tasks[i].epoll_fd == -1) {
ITTI_ERROR("Failed to create new epoll fd: %s\n", strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
itti_desc.tasks[i].task_event_fd = eventfd(0, EFD_SEMAPHORE);
if (itti_desc.tasks[i].task_event_fd == -1) {
ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
itti_desc.tasks[i].nb_events = 1;
itti_desc.tasks[i].events = malloc(sizeof(struct epoll_event));
itti_desc.tasks[i].events->events = EPOLLIN;
itti_desc.tasks[i].events->data.fd = itti_desc.tasks[i].task_event_fd;
/* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.tasks[i].epoll_fd, EPOLL_CTL_ADD,
itti_desc.tasks[i].task_event_fd, itti_desc.tasks[i].events) != 0)
{
ITTI_ERROR("epoll_ctl failed: %s\n", strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
#else
STAILQ_INIT (&itti_desc.tasks[i].message_queue);
itti_desc.tasks[i].message_in_queue = 0;
......@@ -418,6 +580,8 @@ int itti_init(thread_id_t thread_max, MessagesIds messages_id_max, const char *
// Initialize Cond vars
pthread_cond_init (&itti_desc.tasks[i].message_queue_cond_var, NULL);
#endif
itti_desc.tasks[i].task_state = TASK_STATE_NOT_CONFIGURED;
}
itti_dump_init (messages_definition_xml, dump_file_name);
......
......@@ -76,6 +76,8 @@ typedef struct itti_queue_item_s {
typedef struct {
int sd;
uint32_t last_message_number;
pthread_mutex_t client_lock;
} itti_client_desc_t;
typedef struct itti_desc_s {
......@@ -289,6 +291,11 @@ int itti_dump_queue_message(message_number_t message_number,
pthread_mutex_unlock(&itti_queue.queue_mutex);
for (i = 0; i < ITTI_DUMP_MAX_CON; i++) {
if (pthread_mutex_trylock(&itti_queue.itti_clients[i].client_lock) == 0) {
pthread_mutex_unlock(&itti_queue.itti_clients[i].client_lock);
} else {
continue;
}
if (itti_queue.itti_clients[i].sd == -1)
continue;
itti_dump_send_message(itti_queue.itti_clients[i].sd, new);
......@@ -353,11 +360,11 @@ static void *itti_dump_socket(void *arg_p)
servaddr.sin_port = htons(ITTI_PORT);
if (bind(itti_listen_socket, (struct sockaddr *) &servaddr,
sizeof(servaddr)) < 0 ) {
sizeof(servaddr)) < 0) {
ITTI_ERROR("Bind failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL);
}
if (listen(itti_listen_socket, 5) < 0 ) {
if (listen(itti_listen_socket, 5) < 0) {
ITTI_ERROR("Listen failed (%d:%s)\n", errno, strerror(errno));
pthread_exit(NULL);
}
......@@ -376,7 +383,7 @@ static void *itti_dump_socket(void *arg_p)
memcpy(&working_set, &master_set, sizeof(master_set));
ITTI_DEBUG("Stuck on select\n");
// ITTI_DEBUG("Stuck on select\n");
/* No timeout: select blocks till a new event has to be handled
* on sd's.
......@@ -484,6 +491,9 @@ int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t
ITTI_DEBUG("Found place to store new connection: %d\n", i);
DevCheck(i < ITTI_DUMP_MAX_CON, i, ITTI_DUMP_MAX_CON, sd);
pthread_mutex_lock(&itti_queue.itti_clients[i].client_lock);
itti_queue.itti_clients[i].sd = sd;
itti_queue.nb_connected++;
......@@ -502,6 +512,7 @@ int itti_dump_handle_new_connection(int sd, const char *xml_definition, uint32_t
itti_dump_send_message(sd, item);
}
pthread_mutex_unlock(&itti_queue.queue_mutex);
pthread_mutex_unlock(&itti_queue.itti_clients[i].client_lock);
} else {
ITTI_DEBUG("Socket %d rejected\n", sd);
/* We have reached max number of users connected...
......@@ -549,6 +560,9 @@ int itti_dump_init(const char * const messages_definition_xml, const char * cons
for(i = 0; i < ITTI_DUMP_MAX_CON; i++) {
itti_queue.itti_clients[i].sd = -1;
itti_queue.itti_clients[i].last_message_number = 0;
/* Init per user lock */
pthread_mutex_init(&itti_queue.itti_clients[i].client_lock, NULL);
}
if (pthread_create(&itti_queue.itti_acceptor_thread, NULL, &itti_dump_socket,
(void *)messages_definition_xml) < 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment