/* * Licensed to the OpenAirInterface (OAI) Software Alliance under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The OpenAirInterface Software Alliance licenses this file to You under * the OAI Public License, Version 1.1 (the "License"); you may not use this file * except in compliance with the License. * You may obtain a copy of the License at * * http://www.openairinterface.org/?page_id=698 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *------------------------------------------------------------------------------- * For more information about the OpenAirInterface (OAI) Software Alliance: * contact@openairinterface.org */ /*! \file enb_agent.h * \brief top level enb agent receive thread and itti task * \author Navid Nikaein and Xenofon Foukas * \date 2016 * \version 0.1 */ #define _GNU_SOURCE #include "proto_agent_common.h" #include "common/utils/LOG/log.h" #include "proto_agent.h" #include "assertions.h" #include "proto_agent_net_comm.h" #include "proto_agent_async.h" #include <common/utils/system.h> #include <pthread.h> #define ENB_AGENT_MAX 9 proto_agent_instance_t proto_agent[MAX_DU]; //pthread_t new_thread(void *(*f)(void *), void *b); Protocol__FlexsplitMessage *proto_agent_timeout_fsp(void *args); #define TEST_MOD 0 #define ECHO /* Server side function; upon a new connection reception, sends the hello packets */ int proto_agent_start(mod_id_t mod_id, const cudu_params_t *p) { int channel_id; // RS: CUDU does not work! //DevAssert(p->local_interface); //DevAssert(p->local_ipv4_address); //DevAssert(p->local_port > 1024); // "unprivileged" port //DevAssert(p->remote_ipv4_address); //DevAssert(p->remote_port > 1024); // "unprivileged" port proto_agent[mod_id].mod_id = mod_id; proto_agent[mod_id].exit = 0; /* Initialize the channel container */ /* TODO only initialize the first time */ proto_agent_init_channel_container(); /*Create the async channel info*/ proto_agent_async_channel_t *channel_info; channel_info = proto_agent_async_channel_info(mod_id, p->local_ipv4_address, p->local_port, p->remote_ipv4_address, p->remote_port); if (!channel_info) goto error; /* Create a channel using the async channel info */ channel_id = proto_agent_create_channel((void *) channel_info, proto_agent_async_msg_send, proto_agent_async_msg_recv, proto_agent_async_release); if (channel_id <= 0) goto error; proto_agent_channel_t *channel = proto_agent_get_channel(channel_id); if (!channel) goto error; proto_agent[mod_id].channel = channel; /* Register the channel for all underlying agents (use ENB_AGENT_MAX) */ proto_agent_register_channel(mod_id, channel, ENB_AGENT_MAX); // Code for sending the HELLO/ECHO_REQ message once a connection is established //uint8_t *msg = NULL; //Protocol__FlexsplitMessage *init_msg=NULL; //if (udp == 0) //{ // // If the comm is not UDP, allow the server to send the first packet over the channel // //printf( "Proto agent Server: Calling the echo_request packet constructor\n"); // msg_flag = proto_agent_echo_request(mod_id, NULL, &init_msg); // if (msg_flag != 0) // goto error; // // int msgsize = 0; // if (init_msg != NULL) // msg = proto_agent_pack_message(init_msg, &msgsize); // if (msg!= NULL) // { // LOG_D(PROTO_AGENT, "Server sending the message over the async channel\n"); // proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info); // } // /* After sending the message, wait for any replies; // the server thread blocks until it reads any data // over the channel // */ //} //proto_agent[mod_id].recv_thread = new_thread(proto_agent_receive, &proto_agent[mod_id]); threadCreate(&proto_agent[mod_id].recv_thread, proto_agent_receive, &proto_agent[mod_id], "proto", -1, OAI_PRIORITY_RT_LOW); fprintf(stderr, "[PROTO_AGENT] server started at port %s:%d\n", p->local_ipv4_address, p->local_port); return 0; error: LOG_E(PROTO_AGENT, "there was an error\n"); return 1; } void proto_agent_stop(mod_id_t mod_id) { if (!proto_agent[mod_id].channel) return; /* unlock the independent read thread proto_agent_receive() */ proto_agent[mod_id].exit = 1; proto_agent_async_msg_recv_unlock(proto_agent[mod_id].channel->channel_info); proto_agent_async_release(proto_agent[mod_id].channel); proto_agent_destroy_channel(proto_agent[mod_id].channel->channel_id); free(proto_agent[mod_id].channel); proto_agent[mod_id].channel = NULL; LOG_W(PROTO_AGENT, "server stopped\n"); } //void //proto_agent_send_hello(void) //{ // uint8_t *msg = NULL; // Protocol__FlexsplitMessage *init_msg=NULL; // int msg_flag = 0; // // // //printf( "PDCP agent: Calling the HELLO packet constructor\n"); // msg_flag = proto_agent_hello(proto_agent[TEST_MOD].mod_id, NULL, &init_msg); // // int msgsize = 0; // if (msg_flag == 0) // { // proto_agent_serialize_message(init_msg, &msg, &msgsize); // } // // LOG_D(PROTO_AGENT, "Agent sending the message over the async channel\n"); // proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) client_channel[TEST_MOD]); //} rlc_op_status_t proto_agent_send_rlc_data_req(const protocol_ctxt_t *const ctxt_pP, const srb_flag_t srb_flagP, const MBMS_flag_t MBMS_flagP, const rb_id_t rb_idP, const mui_t muiP, confirm_t confirmP, sdu_size_t sdu_sizeP, mem_block_t *sdu_pP) { uint8_t *msg = NULL; Protocol__FlexsplitMessage *init_msg=NULL; int msg_flag = 0; int msgsize = 0; mod_id_t mod_id = ctxt_pP->module_id; data_req_args args; DevAssert(proto_agent[mod_id].channel); DevAssert(proto_agent[mod_id].channel->channel_info); args.ctxt = ctxt_pP; args.srb_flag = srb_flagP; args.MBMS_flag = MBMS_flagP; args.rb_id = rb_idP; args.mui = muiP; args.confirm = confirmP; args.sdu_size = sdu_sizeP; args.sdu_p = sdu_pP; msg_flag = proto_agent_pdcp_data_req(mod_id, (void *) &args, &init_msg); if (msg_flag != 0 || !init_msg) goto error; msg = proto_agent_pack_message(init_msg, &msgsize); if (!msg) goto error; proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, proto_agent[mod_id].channel->channel_info); free_mem_block(sdu_pP, __func__); return RLC_OP_STATUS_OK; error: LOG_E(PROTO_AGENT, "PROTO_AGENT there was an error\n"); return RLC_OP_STATUS_INTERNAL_ERROR; } boolean_t proto_agent_send_pdcp_data_ind(const protocol_ctxt_t *const ctxt_pP, const srb_flag_t srb_flagP, const MBMS_flag_t MBMS_flagP, const rb_id_t rb_idP, sdu_size_t sdu_sizeP, mem_block_t *sdu_pP) { uint8_t *msg = NULL; Protocol__FlexsplitMessage *init_msg = NULL; int msg_flag = 0; int msgsize = 0; mod_id_t mod_id = ctxt_pP->module_id; data_req_args args; DevAssert(proto_agent[mod_id].channel); DevAssert(proto_agent[mod_id].channel->channel_info); args.ctxt = ctxt_pP; args.srb_flag = srb_flagP; args.MBMS_flag = MBMS_flagP; args.rb_id = rb_idP; args.sdu_size = sdu_sizeP; args.sdu_p = sdu_pP; msg_flag = proto_agent_pdcp_data_ind(mod_id, (void *) &args, &init_msg); if (msg_flag != 0 || !init_msg) goto error; msg = proto_agent_pack_message(init_msg, &msgsize); if (!msg) goto error; proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, proto_agent[mod_id].channel->channel_info); free_mem_block(sdu_pP, __func__); return TRUE; error: LOG_E(PROTO_AGENT, "there was an error in %s\n", __func__); return FALSE; } void * proto_agent_receive(void *args) { proto_agent_instance_t *inst = args; void *data = NULL; int size; int priority; err_code_t err_code; pthread_setname_np(pthread_self(), "proto_rx"); Protocol__FlexsplitMessage *msg; uint8_t *ser_msg; while (1) { msg = NULL; ser_msg = NULL; if ((size = proto_agent_async_msg_recv(&data, &priority, inst->channel->channel_info)) < 0) { err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING; goto error; } if (inst->exit) break; LOG_D(PROTO_AGENT, "Server side Received message with size %d and priority %d, calling message handle\n", size, priority); msg = proto_agent_handle_message(inst->mod_id, data, size); if (!msg) { LOG_D(PROTO_AGENT, "msg to send back is NULL\n"); continue; } ser_msg = proto_agent_pack_message(msg, &size); if (!ser_msg) { continue; } LOG_D(PROTO_AGENT, "Server sending the reply message over the async channel\n"); if (proto_agent_async_msg_send((void *)ser_msg, (int) size, 1, inst->channel->channel_info)) { err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING; goto error; } LOG_D(PROTO_AGENT, "sent message with size %d\n", size); } return NULL; error: LOG_E(PROTO_AGENT, "proto_agent_receive(): error %d occured\n",err_code); return NULL; }