Commit 278fc6d7 authored by Robert Schmidt's avatar Robert Schmidt

Improve PROTO_AGENT

* unified CU/DU in one PROTO_AGENT instance since they are symmetric from UDP POV
* delete a lot of unnecessary code
* better error handling
* can reciprocally send data
parent f060385b
......@@ -79,19 +79,13 @@ typedef struct mme_ip_address_s {
char *ipv6_address;
} mme_ip_address_t;
typedef struct du_params {
const char *remote_ipv4_address;
const int16_t remote_port;
} du_params_t;
typedef struct cu_params {
const char *local_interface;
const char *local_ipv4_address;
const uint16_t local_port;
} cu_params_t;
const char *remote_ipv4_address;
const int16_t remote_port;
} cudu_params_t;
typedef struct ru_config_s {
// indicates if local or remote rf is used (1 == LOCAL)
......
......@@ -43,7 +43,6 @@
#define ENB_AGENT_MAX 9
proto_agent_instance_t proto_agent[MAX_DU];
proto_agent_instance_t proto_server[MAX_DU];
pthread_t new_thread(void *(*f)(void *), void *b);
......@@ -56,15 +55,17 @@ Protocol__FlexsplitMessage *proto_agent_timeout_fsp(void* args);
/* Server side function; upon a new connection
reception, sends the hello packets
*/
int proto_server_start(mod_id_t mod_id, const cu_params_t *cu)
int proto_agent_start(mod_id_t mod_id, const cudu_params_t *p)
{
int channel_id;
DevAssert(cu->local_interface);
DevAssert(cu->local_ipv4_address);
DevAssert(cu->local_port > 1024); // "unprivileged" port
DevAssert(p->local_interface);
DevAssert(p->local_ipv4_address);
DevAssert(p->local_port > 1024); // "unprivileged" port
DevAssert(p->remote_ipv4_address);
DevAssert(p->remote_port > 1024); // "unprivileged" port
proto_server[mod_id].mod_id = mod_id;
proto_agent[mod_id].mod_id = mod_id;
/* Initialize the channel container */
......@@ -73,7 +74,9 @@ int proto_server_start(mod_id_t mod_id, const cu_params_t *cu)
/*Create the async channel info*/
proto_agent_async_channel_t *channel_info;
channel_info = proto_server_async_channel_info(mod_id, cu->local_ipv4_address, cu->local_port);
channel_info = proto_agent_async_channel_info(mod_id, p->local_ipv4_address, p->local_port,
p->remote_ipv4_address, p->remote_port);
if (!channel_info) goto error;
/* Create a channel using the async channel info */
channel_id = proto_agent_create_channel((void *) channel_info,
......@@ -84,7 +87,7 @@ int proto_server_start(mod_id_t mod_id, const cu_params_t *cu)
proto_agent_channel_t *channel = proto_agent_get_channel(channel_id);
if (!channel) goto error;
proto_server[mod_id].channel = channel;
proto_agent[mod_id].channel = channel;
/* Register the channel for all underlying agents (use ENB_AGENT_MAX) */
proto_agent_register_channel(mod_id, channel, ENB_AGENT_MAX);
......@@ -117,8 +120,9 @@ int proto_server_start(mod_id_t mod_id, const cu_params_t *cu)
//}
proto_server[mod_id].recv_thread = new_thread(proto_server_receive, &proto_server[mod_id]);
proto_agent[mod_id].recv_thread = new_thread(proto_agent_receive, &proto_agent[mod_id]);
fprintf(stderr, "[PROTO_AGENT] server started at port %s:%d\n", p->local_ipv4_address, p->local_port);
return 0;
error:
......@@ -127,75 +131,6 @@ error:
}
int proto_agent_start(mod_id_t mod_id, const du_params_t *du)
{
int channel_id;
DevAssert(du->remote_ipv4_address);
DevAssert(du->remote_port > 1024); // "unprivileged" port
proto_agent[mod_id].mod_id = mod_id;
/* TODO only initialize the first time */
proto_agent_init_channel_container();
/*Create the async channel info*/
proto_agent_async_channel_t *channel_info = proto_agent_async_channel_info(mod_id, du->remote_ipv4_address, du->remote_port);
if (!channel_info) goto error;
/*Create a channel using the async channel info*/
channel_id = proto_agent_create_channel((void *) channel_info,
proto_agent_async_msg_send,
proto_agent_async_msg_recv,
proto_agent_async_release);
if (channel_id <= 0) goto error;
proto_agent_channel_t *channel = proto_agent_get_channel(channel_id);
if (!channel) goto error;
proto_agent[mod_id].channel = channel;
/* Register the channel for all underlying agents (use ENB_AGENT_MAX) */
proto_agent_register_channel(mod_id, channel, ENB_AGENT_MAX);
//uint8_t *msg = NULL;
//Protocol__FlexsplitMessage *init_msg=NULL;
//int msg_flag;
// In the case of UDP comm, start the echo request from the client side; the server thread should be blocked until it reads the SRC port of the 1st packet
//if (udp == 1)
//{
// msg_flag = proto_agent_echo_request(cu_id, NULL, &init_msg);
// if (msg_flag != 0)
// goto error;
//
// int msgsize = 0;
// if (init_msg != NULL)
// msg = proto_agent_pack_message(init_msg, &msgsize);
// if (msg!= NULL)
// {
// LOG_D(PROTO_AGENT, "Client sending the ECHO_REQUEST message over the async channel\n");
// proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
// }
//}
//
/* After sending the message, wait for any replies;
the server thread blocks until it reads any data
over the channel
*/
proto_agent[mod_id].recv_thread = new_thread(proto_client_receive, &proto_agent[mod_id]);
return 0;
error:
LOG_E(PROTO_AGENT, "there was an error in proto_agent_start()\n");
return 1;
}
//void
//proto_agent_send_hello(void)
//{
......@@ -224,16 +159,14 @@ proto_agent_send_rlc_data_req(const protocol_ctxt_t* const ctxt_pP,
const rb_id_t rb_idP, const mui_t muiP,
confirm_t confirmP, sdu_size_t sdu_sizeP, mem_block_t *sdu_pP)
{
//LOG_D(PROTO_AGENT, "PROTOPDCP: sending the data req over the async channel\n");
uint8_t *msg = NULL;
Protocol__FlexsplitMessage *init_msg=NULL;
int msg_flag = 0;
int msgsize = 0;
mod_id_t mod_id = ctxt_pP->module_id;
//printf( "PDCP agent: Calling the PDCP DATA REQ constructor\n");
DevAssert(proto_agent[mod_id].channel);
DevAssert(proto_agent[mod_id].channel->channel_info);
data_req_args *args = malloc(sizeof(data_req_args));
......@@ -249,27 +182,12 @@ proto_agent_send_rlc_data_req(const protocol_ctxt_t* const ctxt_pP,
memcpy(args->sdu_p, sdu_pP->data, sdu_sizeP);
msg_flag = proto_agent_pdcp_data_req(mod_id, (void *) args, &init_msg);
if (msg_flag != 0)
goto error;
int msgsize = 0;
if (init_msg != NULL)
{
if (msg_flag != 0 || !init_msg) goto error;
msg = proto_agent_pack_message(init_msg, &msgsize);
if (!msg) goto error;
LOG_D(PROTO_AGENT, "Sending the pdcp data_req message over the async channel\n");
if (msg!=NULL)
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1,
proto_agent[mod_id].channel->channel_info);
}
else
{
goto error;
}
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, proto_agent[mod_id].channel->channel_info);
return;
error:
......@@ -283,16 +201,14 @@ void
proto_agent_send_pdcp_data_ind(const protocol_ctxt_t* const ctxt_pP, const srb_flag_t srb_flagP,
const MBMS_flag_t MBMS_flagP, const rb_id_t rb_idP, sdu_size_t sdu_sizeP, mem_block_t *sdu_pP)
{
//LOG_D(PROTO_AGENT, "PROTOPDCP: Sending Data Indication over the async channel\n");
uint8_t *msg = NULL;
Protocol__FlexsplitMessage *init_msg = NULL;
int msg_flag = 0;
int msgsize = 0;
mod_id_t mod_id = ctxt_pP->module_id;
//printf( "PDCP agent: Calling the PDCP_DATA_IND constructor\n");
DevAssert(proto_agent[mod_id].channel);
DevAssert(proto_agent[mod_id].channel->channel_info);
data_req_args *args = malloc(sizeof(data_req_args));
......@@ -306,25 +222,13 @@ proto_agent_send_pdcp_data_ind(const protocol_ctxt_t* const ctxt_pP, const srb_f
memcpy(args->sdu_p, sdu_pP->data, sdu_sizeP);
msg_flag = proto_agent_pdcp_data_ind(mod_id, (void *) args, &init_msg);
if (msg_flag != 0)
goto error;
int msgsize = 0;
if (msg_flag != 0 || !init_msg) goto error;
if (init_msg != NULL)
{
msg = proto_agent_pack_message(init_msg, &msgsize);
if (!msg) goto error;
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, proto_agent[mod_id].channel->channel_info);
if (msg!=NULL)
{
LOG_D(PROTO_AGENT, "Sending the pdcp data_ind message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, proto_server[mod_id].channel->channel_info);
}
}
else
{
goto error;
}
return;
error:
......@@ -333,11 +237,8 @@ error:
}
void *
proto_server_receive(void *args)
proto_agent_receive(void *args)
{
proto_agent_instance_t *inst = args;
void *data = NULL;
......@@ -360,82 +261,21 @@ proto_server_receive(void *args)
LOG_D(PROTO_AGENT, "Server side Received message with size %d and priority %d, calling message handle\n", size, priority);
msg=proto_agent_handle_message(inst->mod_id, data, size);
if (msg == NULL)
{
LOG_D(PROTO_AGENT, "msg to send back is NULL\n");
}
else
{
ser_msg = proto_agent_pack_message(msg, &size);
}
LOG_D(PROTO_AGENT, "Server sending the reply message over the async channel\n");
if (ser_msg != NULL){
if (proto_agent_async_msg_send((void *)ser_msg, (int) size, 1, inst->channel->channel_info)){
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
goto error;
}
LOG_D(PROTO_AGENT, "sent message with size %d\n", size);
}
}
return NULL;
error:
LOG_E(PROTO_AGENT, "server_receive_thread: error %d occured\n",err_code);
return NULL;
}
void *
proto_client_receive(void *args)
{
AssertFatal(0, "check proto_client_receive\n");
proto_agent_instance_t *inst = args;
void *data = NULL;
int size;
int priority;
err_code_t err_code;
Protocol__FlexsplitMessage *msg;
uint8_t *ser_msg;
while (1) {
msg = NULL;
ser_msg = NULL;
if (proto_agent_async_msg_recv(&data, &size, &priority, inst->channel->channel_info)){
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
goto error;
}
LOG_D(PROTO_AGENT,
"Client Received message with size %d and priority %d, calling message handle with mod_id %u\n",
size, priority, inst->mod_id);
msg = proto_agent_handle_message(inst->mod_id, data, size);
if (msg == NULL)
{
if (!msg) {
LOG_D(PROTO_AGENT, "msg to send back is NULL\n");
continue;
}
else
{
ser_msg = proto_agent_pack_message(msg, &size);
if (!ser_msg) {
continue;
}
LOG_D(PROTO_AGENT, "Server sending the reply message over the async channel\n");
if (ser_msg != NULL){
LOG_D(PROTO_AGENT, "Server sending the reply message over the async channel\n");
if (proto_agent_async_msg_send((void *)ser_msg, (int) size, 1, inst->channel->channel_info)){
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
goto error;
}
LOG_D(PROTO_AGENT, "sent message with size %d\n", size);
}
......@@ -444,7 +284,6 @@ proto_client_receive(void *args)
return NULL;
error:
LOG_E(PROTO_AGENT, " client_receive_thread: error %d occured\n",err_code);
LOG_E(PROTO_AGENT, "proto_agent_receive(): error %d occured\n",err_code);
return NULL;
}
......@@ -41,16 +41,9 @@
#include "ENB_APP/enb_config.h" // for enb properties
void * proto_server_init(void *args);
void * proto_server_receive(void *args);
void * proto_client_receive(void *args);
void * proto_agent_receive(void *args);
int proto_agent_start(mod_id_t mod_id, const du_params_t *du);
int proto_server_start(mod_id_t mod_id, const cu_params_t* cu);
int proto_agent_stop(mod_id_t mod_id);
void *proto_agent_task(void *args);
int proto_agent_start(mod_id_t mod_id, const cudu_params_t *p);
void proto_agent_send_rlc_data_req( const protocol_ctxt_t* const ctxt_pP,
const srb_flag_t srb_flagP, const MBMS_flag_t MBMS_flagP,
......
......@@ -35,20 +35,20 @@
#include "common/utils/LOG/log.h"
proto_agent_async_channel_t *
proto_server_async_channel_info(mod_id_t mod_id, const char *ip, uint16_t port)
proto_agent_async_channel_info(mod_id_t mod_id, const char *bind_ip, uint16_t bind_port,
const char* peer_ip, uint16_t peer_port)
{
LOG_E(PROTO_AGENT, "does not bind to specific address at the moment, ignoring %s\n", ip);
proto_agent_async_channel_t *channel;
channel = malloc(sizeof(proto_agent_channel_t));
if (channel == NULL)
goto error;
channel->port = port;
channel->peer_addr = NULL;
channel->peer_port = peer_port;
channel->peer_addr = peer_ip;
channel->enb_id = mod_id;
channel->link = new_link_udp_server(port);
channel->link = new_link_udp_server(bind_ip, bind_port);
if (channel->link == NULL) goto error;
......@@ -62,51 +62,14 @@ proto_server_async_channel_info(mod_id_t mod_id, const char *ip, uint16_t port)
channel->link,
CHANNEL_UDP,
channel->peer_addr,
channel->port);
if (channel->manager == NULL) goto error;
return channel;
error:
LOG_E(PROTO_AGENT,"there was an error\n");
return NULL;
}
proto_agent_async_channel_t *
proto_agent_async_channel_info(mod_id_t mod_id, const char *dst_ip, uint16_t dst_port)
{
proto_agent_async_channel_t *channel;
channel = (proto_agent_async_channel_t *) malloc(sizeof(proto_agent_channel_t));
if (channel == NULL)
goto error;
channel->port = dst_port;
channel->peer_addr = dst_ip;
channel->enb_id = mod_id;
channel->link = new_link_udp_client(channel->peer_addr, channel->port);
if (channel->link == NULL) goto error;
channel->send_queue = new_message_queue();
if (channel->send_queue == NULL) goto error;
channel->receive_queue = new_message_queue();
if (channel->receive_queue == NULL) goto error;
channel->manager = create_link_manager(channel->send_queue,
channel->receive_queue,
channel->link,
CHANNEL_UDP,
channel->peer_addr,
channel->port);
channel->peer_port);
if (channel->manager == NULL) goto error;
return channel;
error:
LOG_E(PROTO_AGENT,"there was an error\n");
fprintf(stderr, "error creating proto_agent_async_channel_t\n");
return NULL;
}
......
......@@ -42,15 +42,16 @@
typedef struct proto_agent_async_channel_s {
mod_id_t enb_id;
const char *peer_addr;
int port;
int peer_port;
socket_link_t *link;
message_queue_t *send_queue;
message_queue_t *receive_queue;
link_manager_t *manager;
} proto_agent_async_channel_t;
proto_agent_async_channel_t * proto_agent_async_channel_info(mod_id_t mod_id, const char *dst_ip, uint16_t dst_port);
proto_agent_async_channel_t * proto_server_async_channel_info(mod_id_t mod_id, const char *ip, uint16_t _port);
proto_agent_async_channel_t *
proto_agent_async_channel_info(mod_id_t mod_id, const char *bind_ip, uint16_t bind_port,
const char *peer_ip, uint16_t peer_port);
int proto_agent_async_msg_send(void *data, int size, int priority, void *channel_info);
......
......@@ -82,6 +82,8 @@ Protocol__FlexsplitMessage* proto_agent_handle_message (mod_id_t mod_id,
err_code= PROTOCOL__FLEXSPLIT_ERR__MSG_DECODING;
goto error;
}
/* after deserialization, we don't need the original data memory anymore */
free(data);
Protocol__FspHeader *header = (Protocol__FspHeader*) decoded_message;
if (header->has_type)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment