/* * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The OpenAirInterface Software Alliance licenses this file to You under * the OAI Public License, Version 1.1 (the "License"); you may not use this file * except in compliance with the License. * You may obtain a copy of the License at * * http://www.openairinterface.org/?page_id=698 * * Author and copyright: Laurent Thomas, open-cells.com * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *------------------------------------------------------------------------------- * For more information about the OpenAirInterface (OAI) Software Alliance: * contact@openairinterface.org */ #include <vector> #include <map> #include <sys/eventfd.h> extern "C" { #include <intertask_interface.h> #include <common/utils/system.h> typedef struct timer_elm_s { timer_type_t type; ///< Timer type long instance; long duration; uint64_t timeout; void *timer_arg; ///< Optional argument that will be passed when timer expires } timer_elm_t ; typedef struct task_list_s { task_info_t admin; pthread_t thread; pthread_mutex_t queue_cond_lock; std::vector<MessageDef *> message_queue; std::map<long,timer_elm_t> timer_map; uint64_t next_timer=UINT64_MAX; struct epoll_event *events =NULL; int nb_fd_epoll=0; int nb_events=0; int epoll_fd=-1; int sem_fd=-1; } task_list_t; int timer_expired(int fd); task_list_t tasks[TASK_MAX]; void *pool_buffer_init (void) { return 0; } void *pool_buffer_clean (void *arg) { //----------------------------------------------------------------------------- return 0; } void free_mem_block (mem_block_t *leP, const char *caller) { AssertFatal(leP!=NULL,""); free(leP); } mem_block_t *get_free_mem_block (uint32_t sizeP, const char *caller) { mem_block_t *ptr=(mem_block_t *)malloc(sizeP+sizeof(mem_block_t)); ptr->next = NULL; ptr->previous = NULL; ptr->data=((unsigned char *)ptr)+sizeof(mem_block_t); ptr->size=sizeP; return ptr; } void *itti_malloc(task_id_t origin_task_id, task_id_t destination_task_id, ssize_t size) { void *ptr = NULL; AssertFatal ((ptr=calloc (size, 1)) != NULL, "Memory allocation of %zu bytes failed (%d -> %d)!\n", size, origin_task_id, destination_task_id); return ptr; } int itti_free(task_id_t task_id, void *ptr) { AssertFatal (ptr != NULL, "Trying to free a NULL pointer (%d)!\n", task_id); free (ptr); return (EXIT_SUCCESS); } // in the two following functions, the +32 in malloc is there to deal with gcc memory alignment // because a struct size can be larger than sum(sizeof(struct components)) // We should remove the itti principle of a huge union for all types of messages in paramter "msg_t ittiMsg" // to use a more C classical pointer casting "void * ittiMsg", later casted in the right struct // but we would have to change all legacy macros, as per this example // #define S1AP_REGISTER_ENB_REQ(mSGpTR) (mSGpTR)->ittiMsg.s1ap_register_enb_req // would become // #define S1AP_REGISTER_ENB_REQ(mSGpTR) (s1ap_register_enb_req) mSGpTR)->ittiMsg MessageDef *itti_alloc_new_message_sized(task_id_t origin_task_id, MessagesIds message_id, MessageHeaderSize size) { MessageDef *temp = (MessageDef *)itti_malloc (origin_task_id, TASK_UNKNOWN, sizeof(MessageHeader) +32 + size); temp->ittiMsgHeader.messageId = message_id; temp->ittiMsgHeader.originTaskId = origin_task_id; temp->ittiMsgHeader.ittiMsgSize = size; return temp; } MessageDef *itti_alloc_new_message(task_id_t origin_task_id, MessagesIds message_id) { int size=sizeof(MessageHeader) + 32 + messages_info[message_id].size; MessageDef *temp = (MessageDef *)itti_malloc (origin_task_id, TASK_UNKNOWN, size); temp->ittiMsgHeader.messageId = message_id; temp->ittiMsgHeader.originTaskId = origin_task_id; temp->ittiMsgHeader.ittiMsgSize = size; temp->ittiMsgHeader.destinationTaskId=TASK_UNKNOWN; temp->ittiMsgHeader.instance=0; temp->ittiMsgHeader.lte_time={0}; return temp; //return itti_alloc_new_message_sized(origin_task_id, message_id, messages_info[message_id].size); } static inline int itti_send_msg_to_task_locked(task_id_t destination_task_id, instance_t instance, MessageDef *message) { task_list_t *t=tasks+destination_task_id; message->ittiMsgHeader.destinationTaskId = destination_task_id; message->ittiMsgHeader.instance = instance; message->ittiMsgHeader.lte_time.frame = 0; message->ittiMsgHeader.lte_time.slot = 0; int message_id = message->ittiMsgHeader.messageId; size_t s=t->message_queue.size(); if ( s > t->admin.queue_size ) LOG_E(TMR,"Queue for %s task contains %ld messages\n", itti_get_task_name(destination_task_id), s ); if ( s > 50 ) LOG_I(TMR,"Queue for %s task size: %ld\n",itti_get_task_name(destination_task_id), s+1); t->message_queue.insert(t->message_queue.begin(), message); eventfd_t sem_counter = 1; AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), ""); LOG_D(TMR,"sent messages id=%d to %s\n",message_id, t->admin.name); return 0; } int itti_send_msg_to_task(task_id_t destination_task_id, instance_t instance, MessageDef *message) { task_list_t *t=&tasks[destination_task_id]; pthread_mutex_lock (&t->queue_cond_lock); int ret=itti_send_msg_to_task_locked(destination_task_id, instance, message); while ( t->message_queue.size()>0 && t->admin.func != NULL ) { if (t->message_queue.size()>1) LOG_W(TMR,"queue in no thread mode is %ld\n", t->message_queue.size()); pthread_mutex_unlock (&t->queue_cond_lock); t->admin.func(NULL); pthread_mutex_lock (&t->queue_cond_lock); } pthread_mutex_unlock (&t->queue_cond_lock); return ret; } void itti_subscribe_event_fd(task_id_t task_id, int fd) { struct epoll_event event; task_list_t *t=&tasks[task_id]; t->nb_fd_epoll++; t->events = (struct epoll_event *)realloc((void *)t->events, t->nb_fd_epoll * sizeof(struct epoll_event)); event.events = EPOLLIN | EPOLLERR; event.data.u64 = 0; event.data.fd = fd; AssertFatal(epoll_ctl(t->epoll_fd, EPOLL_CTL_ADD, fd, &event) == 0, "epoll_ctl (EPOLL_CTL_ADD) failed for task %s, fd %d: %s!\n", itti_get_task_name(task_id), fd, strerror(errno)); } void itti_unsubscribe_event_fd(task_id_t task_id, int fd) { task_list_t *t=&tasks[task_id]; AssertFatal (epoll_ctl(t->epoll_fd, EPOLL_CTL_DEL, fd, NULL) == 0, "epoll_ctl (EPOLL_CTL_DEL) failed for task %s, fd %d: %s!\n", itti_get_task_name(task_id), fd, strerror(errno)); t->nb_fd_epoll--; } static inline int itti_get_events_locked(task_id_t task_id, struct epoll_event **events) { task_list_t *t=&tasks[task_id]; uint64_t current_time=0; do { if ( t->next_timer != UINT64_MAX ) { struct timespec tp; clock_gettime(CLOCK_MONOTONIC, &tp); current_time=(uint64_t)tp.tv_sec*1000+tp.tv_nsec/(1000*1000); if ( t->next_timer < current_time) { t->next_timer=UINT64_MAX; // Proceed expired timer for ( auto it=t->timer_map.begin() , next_it = it; it != t->timer_map.end() ; it = next_it ) { ++next_it; if ( it->second.timeout < current_time ) { MessageDef *message = itti_alloc_new_message(TASK_TIMER, TIMER_HAS_EXPIRED); message->ittiMsg.timer_has_expired.timer_id=it->first; message->ittiMsg.timer_has_expired.arg=it->second.timer_arg; if (itti_send_msg_to_task_locked(task_id, it->second.instance, message) < 0) { LOG_W(TMR,"Failed to send msg TIMER_HAS_EXPIRED to task %u\n", task_id); free(message); t->timer_map.erase(it); return -1; } if ( it->second.type==TIMER_PERIODIC ) { it->second.timeout+=it->second.duration; if (it->second.timeout < t->next_timer) t->next_timer=it->second.timeout; } else t->timer_map.erase(it); } else if (it->second.timeout < t->next_timer) t->next_timer=it->second.timeout; } } } int epoll_timeout = -1; if ( t->next_timer != UINT64_MAX ) epoll_timeout = t->next_timer-current_time; pthread_mutex_unlock(&t->queue_cond_lock); LOG_D(TMR,"enter blocking wait for %s\n", itti_get_task_name(task_id)); t->nb_events = epoll_wait(t->epoll_fd,t->events,t->nb_fd_epoll, epoll_timeout); if ( t->nb_events < 0 && (errno == EINTR || errno == EAGAIN ) ) pthread_mutex_lock(&t->queue_cond_lock); } while (t->nb_events < 0 && (errno == EINTR || errno == EAGAIN ) ); AssertFatal (t->nb_events >=0, "epoll_wait failed for task %s, nb fds %d, timeout %lu: %s!\n", itti_get_task_name(task_id), t->nb_fd_epoll, t->next_timer != UINT64_MAX ? t->next_timer-current_time : -1, strerror(errno)); LOG_D(TMR,"receive on %d descriptors for %s\n", t->nb_events, itti_get_task_name(task_id)); if (t->nb_events == 0) /* No data to read -> return */ return 0; for (int i = 0; i < t->nb_events; i++) { /* Check if there is an event for ITTI for the event fd */ if ((t->events[i].events & EPOLLIN) && (t->events[i].data.fd == t->sem_fd)) { eventfd_t sem_counter; /* Read will always return 1 */ AssertFatal( sizeof(sem_counter) == read (t->sem_fd, &sem_counter, sizeof(sem_counter)), ""); /* Mark that the event has been processed */ t->events[i].events &= ~EPOLLIN; } } *events = t->events; return t->nb_events; } int itti_get_events(task_id_t task_id, struct epoll_event **events) { pthread_mutex_lock(&tasks[task_id].queue_cond_lock); return itti_get_events_locked(task_id, events); } void itti_receive_msg(task_id_t task_id, MessageDef **received_msg) { // Reception of one message, blocking caller task_list_t *t=&tasks[task_id]; pthread_mutex_lock(&t->queue_cond_lock); // Weird condition to deal with crap legacy itti interface if ( t->nb_fd_epoll == 1 ) { while (t->message_queue.empty()) { itti_get_events_locked(task_id, &t->events); pthread_mutex_lock(&t->queue_cond_lock); } } else { if (t->message_queue.empty()) { itti_get_events_locked(task_id, &t->events); pthread_mutex_lock(&t->queue_cond_lock); } } // Legacy design: we return even if we have no message // in this case, *received_msg is NULL if (t->message_queue.empty()) { *received_msg=NULL; LOG_D(TMR,"task %s received even from other fd (total fds: %d), returning msg NULL\n",t->admin.name, t->nb_fd_epoll); } else { *received_msg=t->message_queue.back(); t->message_queue.pop_back(); LOG_D(TMR,"task %s received a message\n",t->admin.name); } pthread_mutex_unlock (&t->queue_cond_lock); } void itti_poll_msg(task_id_t task_id, MessageDef **received_msg) { //reception of one message, non-blocking task_list_t *t=&tasks[task_id]; pthread_mutex_lock(&t->queue_cond_lock); if (!t->message_queue.empty()) { LOG_D(TMR,"task %s received a message in polling mode\n",t->admin.name); *received_msg=t->message_queue.back(); t->message_queue.pop_back(); } else *received_msg=NULL; pthread_mutex_unlock (&t->queue_cond_lock); } int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *args_p) { task_list_t *t=&tasks[task_id]; threadCreate (&t->thread, start_routine, args_p, (char*)itti_get_task_name(task_id),-1,OAI_PRIORITY_RT); LOG_I(TMR,"Created Posix thread %s\n", itti_get_task_name(task_id) ); return 0; } void itti_exit_task(void) { pthread_exit (NULL); } void itti_terminate_tasks(task_id_t task_id) { // Sends Terminate signals to all tasks. itti_send_terminate_message (task_id); usleep(100*1000); // Allow the tasks to receive the message before going returning to main thread } int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_id_max, const task_info_t *tasks_info, const message_info_t *messages_info) { AssertFatal(TASK_MAX<UINT16_MAX, "Max itti tasks"); for(int i=0; i<task_max; ++i) { LOG_I(TMR,"Starting itti queue: %s as task %d\n", tasks_info[i].name, i); pthread_mutex_init(&tasks[i].queue_cond_lock, NULL); memcpy(&tasks[i].admin, &tasks_info[i], sizeof(task_info_t)); AssertFatal( ( tasks[i].epoll_fd = epoll_create1(0) ) >=0, ""); AssertFatal( ( tasks[i].sem_fd = eventfd(0, EFD_SEMAPHORE) ) >=0, ""); itti_subscribe_event_fd((task_id_t)i, tasks[i].sem_fd); if (tasks[i].admin.threadFunc != NULL) itti_create_task((task_id_t)i, tasks[i].admin.threadFunc, NULL); } return 0; } int timer_setup( uint32_t interval_sec, uint32_t interval_us, task_id_t task_id, int32_t instance, timer_type_t type, void *timer_arg, long *timer_id) { task_list_t *t=&tasks[task_id]; do { // set the taskid in the timer id to keep compatible with the legacy API // timer_remove() takes only the timer id as parameter *timer_id=(random()%UINT16_MAX) << 16 | task_id ; } while ( t->timer_map.find(*timer_id) != t->timer_map.end()); /* Allocate new timer list element */ timer_elm_t timer; struct timespec tp; clock_gettime(CLOCK_MONOTONIC, &tp); if (interval_us%1000 != 0) LOG_W(TMR, "Can't set timer precision below 1ms, rounding it\n"); timer.duration = interval_sec*1000+interval_us/1000; timer.timeout= ((uint64_t)tp.tv_sec*1000+tp.tv_nsec/(1000*1000)+timer.duration); timer.instance = instance; timer.type = type; timer.timer_arg = timer_arg; pthread_mutex_lock (&t->queue_cond_lock); t->timer_map[*timer_id]= timer; if (timer.timeout < t->next_timer) t->next_timer=timer.timeout; eventfd_t sem_counter = 1; AssertFatal ( sizeof(sem_counter) == write(t->sem_fd, &sem_counter, sizeof(sem_counter)), ""); pthread_mutex_unlock (&t->queue_cond_lock); return 0; } int timer_remove(long timer_id) { task_id_t task_id=(task_id_t)(timer_id&0xffff); int ret; pthread_mutex_lock (&tasks[task_id].queue_cond_lock); ret=tasks[task_id].timer_map.erase(timer_id); pthread_mutex_unlock (&tasks[task_id].queue_cond_lock); if (ret==1) return 0; else { LOG_W(TMR, "tried to remove a non existing timer\n"); return 1; } } const char *itti_get_message_name(MessagesIds message_id) { return messages_info[message_id].name; } const char *itti_get_task_name(task_id_t task_id) { return tasks[task_id].admin.name; } // void for compatibility void itti_send_terminate_message(task_id_t task_id) { } void itti_wait_tasks_end(void) { while(1) sleep(24*3600); } void itti_update_lte_time(uint32_t frame, uint8_t slot) {} void itti_set_task_real_time(task_id_t task_id) {} void itti_mark_task_ready(task_id_t task_id) { // Function meaning is clear, but legacy implementation is wrong // keep it void is fine: today implementation accepts messages in the queue before task is ready } void itti_wait_ready(int wait_tasks) { // Stupid function, kept for compatibility (the parameter is meaningless!!!) } int signal_mask(void) { return 0;} }