Commit 18633187 authored by Cedric Roux's avatar Cedric Roux

- Added api to subscribe/unsuscribe to new fd events for epoll_wait. Note that...

- Added api to subscribe/unsuscribe to new fd events for epoll_wait. Note that events added this way should be monitor by the task itself.

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4331 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 35985c44
...@@ -40,8 +40,6 @@ ...@@ -40,8 +40,6 @@
#include "queue.h" #include "queue.h"
#include "assertions.h" #include "assertions.h"
// #define ENABLE_EVENT_FD
#if defined(ENABLE_EVENT_FD) #if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h> # include <sys/epoll.h>
# include <sys/eventfd.h> # include <sys/eventfd.h>
...@@ -116,6 +114,8 @@ typedef struct task_desc_s { ...@@ -116,6 +114,8 @@ typedef struct task_desc_s {
* More events can be suscribed later by the task itself. * More events can be suscribed later by the task itself.
*/ */
struct epoll_event *events; struct epoll_event *events;
int epoll_nb_events;
#endif #endif
/* pthread associated with the task */ /* pthread associated with the task */
pthread_t task_thread; pthread_t task_thread;
...@@ -316,10 +316,73 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me ...@@ -316,10 +316,73 @@ int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *me
} }
#if defined(ENABLE_EVENT_FD) #if defined(ENABLE_EVENT_FD)
void itti_subscribe_event_fd(task_id_t task_id, int fd)
{
struct epoll_event event;
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
DevCheck(fd >= 0, fd, 0, 0);
itti_desc.tasks[thread_id].nb_events++;
/* Reallocate the events */
itti_desc.tasks[thread_id].events = realloc(
itti_desc.tasks[thread_id].events,
itti_desc.tasks[thread_id].nb_events * sizeof(struct epoll_event));
event.events = EPOLLIN;
event.data.fd = fd;
/* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.tasks[thread_id].epoll_fd, EPOLL_CTL_ADD, fd,
&event) != 0)
{
ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s\n",
itti_get_task_name(task_id), fd, strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
}
void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
{
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
DevCheck(fd >= 0, fd, 0, 0);
/* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.tasks[thread_id].epoll_fd, EPOLL_CTL_DEL, fd, NULL) != 0)
{
ITTI_ERROR("epoll_ctl (EPOLL_CTL_DEL) failed for task %s and fd %d: %s\n",
itti_get_task_name(task_id), fd, strerror(errno));
/* Always assert on this condition */
DevAssert(0 == 1);
}
itti_desc.tasks[thread_id].nb_events--;
itti_desc.tasks[thread_id].events = realloc(
itti_desc.tasks[thread_id].events,
itti_desc.tasks[thread_id].nb_events * sizeof(struct epoll_event));
}
int itti_get_events(task_id_t task_id, struct epoll_event **events)
{
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
DevCheck(thread_id < itti_desc.thread_max, thread_id, itti_desc.thread_max, 0);
*events = itti_desc.tasks[thread_id].events;
return itti_desc.tasks[thread_id].epoll_nb_events;
}
static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg) static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t polling, MessageDef **received_msg)
{ {
int epoll_ret = 0; int epoll_ret = 0;
int epoll_timeout = 0; int epoll_timeout = 0;
int i;
thread_id_t thread_id = TASK_GET_THREAD_ID(task_id); thread_id_t thread_id = TASK_GET_THREAD_ID(task_id);
...@@ -354,6 +417,12 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t ...@@ -354,6 +417,12 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t
return; return;
} }
itti_desc.tasks[thread_id].epoll_nb_events = epoll_ret;
for (i = 0; i < epoll_ret; i++) {
/* Check if there is an event for the ITTI for the event fd */
if ((itti_desc.tasks[thread_id].events[i].events & EPOLLIN) &&
(itti_desc.tasks[thread_id].events[i].data.fd == itti_desc.tasks[thread_id].task_event_fd))
{ {
struct message_list_s *message; struct message_list_s *message;
uint64_t sem_counter; uint64_t sem_counter;
...@@ -369,6 +438,8 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t ...@@ -369,6 +438,8 @@ static inline void itti_receive_msg_internal_event_fd(task_id_t task_id, uint8_t
} }
*received_msg = message->msg; *received_msg = message->msg;
free(message); free(message);
return;
}
} }
} }
#endif #endif
......
...@@ -34,6 +34,10 @@ ...@@ -34,6 +34,10 @@
* @{ * @{
*/ */
#if defined(ENABLE_EVENT_FD)
# include <sys/epoll.h>
#endif
#ifndef INTERTASK_INTERFACE_H_ #ifndef INTERTASK_INTERFACE_H_
#define INTERTASK_INTERFACE_H_ #define INTERTASK_INTERFACE_H_
...@@ -96,6 +100,28 @@ int itti_send_broadcast_message(MessageDef *message_p); ...@@ -96,6 +100,28 @@ int itti_send_broadcast_message(MessageDef *message_p);
**/ **/
int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message); int itti_send_msg_to_task(task_id_t task_id, instance_t instance, MessageDef *message);
#if defined(ENABLE_EVENT_FD)
/** \brief Add a new fd to monitor.
* NOTE: it is up to the user to read data associated with the fd
* \param task_id Task ID of the receiving task
* \param fd The file descriptor to monitor
**/
void itti_subscribe_event_fd(task_id_t task_id, int fd);
/** \brief Remove a fd from the list of fd to monitor
* \param task_id Task ID of the task
* \param fd The file descriptor to remove
**/
void itti_unsubscribe_event_fd(task_id_t task_id, int fd);
/** \brief Return the list of events excluding the fd associated with itti
* \param task_id Task ID of the task
* \param events events list
* @returns number of events to handle
**/
int itti_get_events(task_id_t task_id, struct epoll_event **events);
#endif
/** \brief Retrieves a message in the queue associated to task_id. /** \brief Retrieves a message in the queue associated to task_id.
* If the queue is empty, the thread is blocked till a new message arrives. * If the queue is empty, the thread is blocked till a new message arrives.
\param task_id Task ID of the receiving task \param task_id Task ID of the receiving task
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment