Commit 3b68376a authored by Robert Schmidt's avatar Robert Schmidt

PROTO_AGENT: Restructure/simplify code, UDP only, no own configuration

parent cf55a513
......@@ -103,8 +103,6 @@ typedef struct {
pthread_mutex_t ru_mutex;
/// condition variable for signaling setup completion of an RU
pthread_cond_t ru_cond;
struct cudu_params_s cudu;
} RAN_CONTEXT_t;
......
......@@ -416,132 +416,6 @@ void RCconfig_macrlc(int macrlc_has_f1[MAX_MAC_INST]) {
}
void RCconfig_cudu() {
int j;
char *transport_type;
char *du_type;
char *balancing;
pthread_t myid = pthread_self();
printf("CONFIG my id is %u\n", myid);
paramdef_t DU_Params[] = DUPARAMS_DESC;
paramdef_t CU_Params[] = CUPARAMS_DESC;
paramlist_def_t DU_ParamList = {CONFIG_STRING_DU_LIST,NULL,0};
paramlist_def_t CU_ParamList = {CONFIG_STRING_CU_LIST,NULL,0};
config_getlist( &DU_ParamList,DU_Params,sizeof(DU_Params)/sizeof(paramdef_t), NULL);
config_getlist( &CU_ParamList,CU_Params,sizeof(CU_Params)/sizeof(paramdef_t), NULL);
paramdef_t CU_Bal[] = CU_BAL_DESC;
paramlist_def_t CU_BalList = {CONFIG_STRING_CU_BALANCING, NULL, 0};
config_get(&CU_Bal, sizeof(CU_Bal)/sizeof(paramdef_t),NULL);
//printf("%s\n", strdup(*(CU_Bal[0].strptr)));
balancing = strdup(*(CU_Bal[0].strptr));
if (strcmp(balancing, "ALL") == 0)
{
RC.cudu.cu_balancing = CU_BALANCING_ALL;
}
else if (strcmp(balancing, "ROUND_ROBIN") == 0)
{
RC.cudu.cu_balancing = CU_BALANCING_ROUND_ROBIN;
}
else
{
RC.cudu.cu_balancing = atoi(balancing) - 1;
}
// DU Parameters
RC.cudu.local_du.du_interface = strdup(*(DU_ParamList.paramarray[0][DU_INTERFACE_F1U].strptr));
RC.cudu.local_du.du_ipv4_address = strdup(*(DU_ParamList.paramarray[0][DU_ADDRESS_F1U].strptr));
RC.cudu.local_du.du_port = *(DU_ParamList.paramarray[0][DU_PORT_F1U].iptr);
transport_type = strdup(*(DU_ParamList.paramarray[0][DU_TYPE_F1U].strptr));
if (strcmp(transport_type, "TCP") == 0)
{
RC.cudu.local_du.tcp = 1;
RC.cudu.local_du.udp = 0;
RC.cudu.local_du.sctp = 0;
}
else if (strcmp(transport_type, "UDP") == 0)
{
RC.cudu.local_du.tcp = 0;
RC.cudu.local_du.udp = 1;
RC.cudu.local_du.sctp = 0;
}
else if (strcmp(transport_type, "SCTP") == 0)
{
RC.cudu.local_du.tcp = 0;
RC.cudu.local_du.udp = 0;
RC.cudu.local_du.sctp = 1;
}
else
{
RC.cudu.local_du.tcp = 1;
RC.cudu.local_du.udp = 0;
RC.cudu.local_du.sctp = 0;
}
//CU Parameters
j = CU_ParamList.numelt;
RC.cudu.serving_dus = j;
for (int k=0; k<j; k++)
{
RC.cudu.cu[k].cu_interface = strdup(*(CU_ParamList.paramarray[k][CU_INTERFACE_F1U].strptr));
RC.cudu.cu[k].cu_ipv4_address = strdup(*(CU_ParamList.paramarray[k][CU_ADDRESS_F1U].strptr));
RC.cudu.cu[k].cu_port = *(CU_ParamList.paramarray[k][CU_PORT_F1U].iptr);
RC.cudu.cu[k].cu_id = k;
du_type = strdup(*(CU_ParamList.paramarray[k][DU_TECH].strptr));
transport_type = strdup(*(CU_ParamList.paramarray[k][CU_TYPE_F1U].strptr));
if (strcmp(transport_type, "TCP") == 0)
{
RC.cudu.cu[k].tcp = 1;
RC.cudu.cu[k].udp = 0;
RC.cudu.cu[k].sctp = 0;
}
else if (strcmp(transport_type, "UDP") == 0)
{
RC.cudu.cu[k].tcp = 0;
RC.cudu.cu[k].udp = 1;
RC.cudu.cu[k].sctp = 0;
}
else if (strcmp(transport_type, "SCTP") == 0)
{
RC.cudu.cu[k].tcp = 0;
RC.cudu.cu[k].udp = 0;
RC.cudu.cu[k].sctp = 1;
}
else
{
RC.cudu.cu[k].tcp = 1;
RC.cudu.cu[k].udp = 0;
RC.cudu.cu[k].sctp = 0;
}
if (strcmp(du_type, "LTE") == 0)
{
RC.cudu.cu[k].du_type = DU_TYPE_LTE;
}
else if (strcmp(du_type, "WiFi") == 0)
{
RC.cudu.cu[k].du_type = DU_TYPE_WIFI;
}
else
{
RC.cudu.cu[k].du_type = DU_TYPE_LTE;
}
}
}
cudu_params_t *get_cudu_config()
{
return &(RC.cudu);
}
int RCconfig_RRC(uint32_t i, eNB_RRC_INST *rrc) {
int num_enbs = 0;
......@@ -2828,8 +2702,6 @@ void RCConfig(void) {
char aprefix[MAX_OPTNAME_SIZE*2 + 8];
// RCconfig_cudu();
/* get global parameters, defined outside any section in the config file */
printf("Getting ENBSParams\n");
......
......@@ -82,37 +82,16 @@ typedef struct mme_ip_address_s {
typedef struct du_interfaces {
char *du_interface;
char *du_ipv4_address;
uint16_t du_port;
typedef struct du_params {
const char *remote_ipv4_address;
const int16_t remote_port;
} du_params_t;
unsigned tcp:1;
unsigned udp:1;
unsigned sctp:1;
}du_interfaces_t;
typedef struct cu_interfaces {
char *cu_interface;
char *cu_ipv4_address;
uint16_t cu_port;
uint8_t du_type;
uint8_t cu_id;
unsigned tcp:1;
unsigned udp:1;
unsigned sctp:1;
}cu_interfaces_t;
typedef struct cudu_params_s {
du_interfaces_t local_du;
cu_interfaces_t cu[MAX_DU];
uint8_t serving_dus;
uint8_t cu_balancing;
}cudu_params_t;
typedef struct cu_params {
const char *local_interface;
const char *local_ipv4_address;
const uint16_t local_port;
} cu_params_t;
typedef struct ru_config_s {
// indicates if local or remote rf is used (1 == LOCAL)
......@@ -145,11 +124,7 @@ void ru_config_display(void);
int RCconfig_RRC(uint32_t i, eNB_RRC_INST *rrc);
int RCconfig_S1(MessageDef *msg_p, uint32_t i);
int RCconfig_DU_F1(MessageDef *msg_p, uint32_t i);
int RCconfig_CU_F1(uint32_t i);
void RCconfig_cudu(void);
cudu_params_t *get_cudu_config(void);
void read_config_and_init(void);
int RCconfig_X2(MessageDef *msg_p, uint32_t i);
......
......@@ -45,251 +45,84 @@
proto_agent_instance_t proto_agent[MAX_DU];
proto_agent_instance_t proto_server[MAX_DU];
char in_ip[40];
static uint16_t in_port;
char local_cache[40];
void *send_thread(void *args);
//void *receive_thread(void *args);
pthread_t new_thread(void *(*f)(void *), void *b);
pthread_t cu_thread[MAX_DU], du_thread;
Protocol__FlexsplitMessage *proto_agent_timeout_fsp(void* args);
mod_id_t client_mod[MAX_DU], server_mod;
proto_agent_async_channel_t *client_channel[MAX_DU], *server_channel;
proto_recv_t client_info[MAX_DU];
#define TEST_MOD 0
uint8_t tcp = 0;
uint8_t udp = 0;
uint8_t sctp = 0;
char *link_type = NULL;
#define ECHO
/* Thread continuously listening for incomming packets */
/*
void *receive_thread(void *args) {
proto_agent_instance_t *d = args;
void *data;
int size;
int priority;
err_code_t err_code = 0;
Protocol__FlexsplitMessage *msg;
while (1) {
if (proto_agent_msg_recv(d->enb_id, PROTO_AGENT_DEFAULT, &data, &size, &priority)) {
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
goto error;
}
LOG_D(PROTO_AGENT, "Received message with size %d and priority %d, calling message handle\n", size, priority);
msg=proto_agent_handle_message(d->enb_id, data, size);
if (msg == NULL)
{
LOG_D(PROTO_AGENT, "msg to send back is NULL\n");
}
if (msg != NULL){
if (proto_agent_msg_send(d->enb_id, PROTO_AGENT_DEFAULT, msg, size, priority)) {
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
goto error;
}
LOG_D(PROTO_AGENT, "sent message with size %d\n", size);
}
}
return NULL;
error:
LOG_E(PROTO_AGENT, "receive_thread: error %d occured\n",err_code);
return NULL;
}
*/
/* utility function to create a thread */
/*
pthread_t new_thread(void *(*f)(void *), void *b) {
pthread_t t;
pthread_attr_t att;
if (pthread_attr_init(&att)){
fprintf(stderr, "pthread_attr_init err\n");
exit(1);
}
if (pthread_attr_setdetachstate(&att, PTHREAD_CREATE_DETACHED)) {
fprintf(stderr, "pthread_attr_setdetachstate err\n");
exit(1);
}
if (pthread_create(&t, &att, f, b)) {
fprintf(stderr, "pthread_create err\n");
exit(1);
}
if (pthread_attr_destroy(&att)) {
fprintf(stderr, "pthread_attr_destroy err\n");
exit(1);
}
return t;
}
*/
/* Function to be called as a thread:
it is calling the proto_agent_server with
the appropriate arguments
*/
void * proto_server_init(void *args)
{
//printf( "Initializing server thread for listening connections\n");
mod_id_t mod_id = (mod_id_t) 0;
cudu_params_t* cudu = NULL;
cudu = get_cudu_config();
proto_server_start(mod_id, (const cudu_params_t*) cudu);
return NULL;
}
/* Server side function; upon a new connection
reception, sends the hello packets
*/
int proto_server_start(mod_id_t mod_id, const cudu_params_t* cudu){
int proto_server_start(mod_id_t mod_id, const cu_params_t *cu)
{
int channel_id;
char *peer_address = NULL;
proto_server[mod_id].enb_id = mod_id;
server_mod = mod_id;
if (cudu->local_du.du_ipv4_address != NULL)
{
//LOG_D(PROTO_AGENT, "DU ADDRESS IS %s\n",cudu->local_du.du_ipv4_address);
peer_address = strdup(cudu->local_du.du_ipv4_address);
strcpy(in_ip, cudu->local_du.du_ipv4_address);
}
else
{
strcpy(in_ip, DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
//LOG_D(PROTO_AGENT, "Cannot read DU address from conf file, setting the default (%s)\n", DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
}
if (cudu->local_du.du_port != 0)
in_port = cudu->local_du.du_port;
else
{
in_port = DEFAULT_PROTO_AGENT_PORT;
//LOG_D(PROTO_AGENT, "Cannot read DU port from conf file, setting the default (%d)\n", DEFAULT_PROTO_AGENT_PORT);
}
DevAssert(cu->local_interface);
DevAssert(cu->local_ipv4_address);
DevAssert(cu->local_port > 1024); // "unprivileged" port
if(cudu->local_du.tcp == 1)
{
tcp = 1;
link_type = strdup("TCP");
LOG_D(PROTO_AGENT, "Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
else if(cudu->local_du.udp == 1)
{
udp = 1;
link_type = strdup("UDP");
LOG_D(PROTO_AGENT, "Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over UDP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
else if(cudu->local_du.sctp == 1)
{
sctp = 1;
link_type = strdup("SCTP");
LOG_D(PROTO_AGENT, "Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over SCTP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
else
{
tcp = 1;
link_type = strdup("TCP");
LOG_D(PROTO_AGENT, "Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
proto_server[mod_id].mod_id = mod_id;
}
/* Initialize the channel container */
/*
* Initialize the channel container
*/
/* TODO only initialize the first time */
proto_agent_init_channel_container();
/*Create the async channel info*/
proto_agent_async_channel_t *channel_info = proto_server_async_channel_info(mod_id, in_ip, in_port, link_type, peer_address);
proto_agent_async_channel_t *channel_info;
channel_info = proto_server_async_channel_info(mod_id, cu->local_ipv4_address, cu->local_port);
server_channel = channel_info;
/*Create a channel using the async channel info*/
/* Create a channel using the async channel info */
channel_id = proto_agent_create_channel((void *) channel_info,
proto_agent_async_msg_send,
proto_agent_async_msg_recv,
proto_agent_async_release);
if (channel_id <= 0) {
goto error;
}
if (channel_id <= 0) goto error;
proto_agent_channel_t *channel = proto_agent_get_channel(channel_id);
if (!channel) goto error;
proto_server[mod_id].channel = channel;
if (tcp == 1) channel->type = 0;
else if (udp == 1) channel->type = 1;
else if (sctp == 1) channel->type = 2;
else channel->type = 0;
if (channel == NULL) {
goto error;
}
/*Register the channel for all underlying agents (use ENB_AGENT_MAX)*/
/* Register the channel for all underlying agents (use ENB_AGENT_MAX) */
proto_agent_register_channel(mod_id, channel, ENB_AGENT_MAX);
// Code for sending the HELLO/ECHO_REQ message once a connection is established
uint8_t *msg = NULL;
Protocol__FlexsplitMessage *init_msg=NULL;
int msg_flag = 0;
if (udp == 0)
{
// If the comm is not UDP, allow the server to send the first packet over the channel
//printf( "Proto agent Server: Calling the echo_request packet constructor\n");
msg_flag = proto_agent_echo_request(mod_id, NULL, &init_msg);
if (msg_flag != 0)
goto error;
int msgsize = 0;
if (init_msg != NULL)
msg = proto_agent_pack_message(init_msg, &msgsize);
if (msg!= NULL)
{
LOG_D(PROTO_AGENT, "Server sending the message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
}
/* After sending the message, wait for any replies;
the server thread blocks until it reads any data
over the channel
*/
}
//uint8_t *msg = NULL;
//Protocol__FlexsplitMessage *init_msg=NULL;
//if (udp == 0)
//{
// // If the comm is not UDP, allow the server to send the first packet over the channel
// //printf( "Proto agent Server: Calling the echo_request packet constructor\n");
// msg_flag = proto_agent_echo_request(mod_id, NULL, &init_msg);
// if (msg_flag != 0)
// goto error;
//
// int msgsize = 0;
// if (init_msg != NULL)
// msg = proto_agent_pack_message(init_msg, &msgsize);
// if (msg!= NULL)
// {
// LOG_D(PROTO_AGENT, "Server sending the message over the async channel\n");
// proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
// }
// /* After sending the message, wait for any replies;
// the server thread blocks until it reads any data
// over the channel
// */
//}
proto_server[mod_id].recv_thread = new_thread(proto_server_receive, &proto_server[mod_id]);
du_thread=new_thread(proto_server_receive, &proto_server[mod_id]);
LOG_D(PROTO_AGENT, "server ends with thread_id %lu\n",du_thread);
return 0;
error:
......@@ -298,139 +131,67 @@ error:
}
int proto_agent_start(uint8_t enb_id, mod_id_t cu_id, uint8_t type_id, cudu_params_t *cudu){
int proto_agent_start(mod_id_t mod_id, const du_params_t *du)
{
int channel_id;
char *peer_address = NULL;
// cudu_params_t *cudu = get_cudu_config();
proto_agent[cu_id].enb_id = cu_id;
client_mod[cu_id] = cu_id; // FIXME: Allow for multiple types, now it will allow for DUs of different type per mod_id
client_info[cu_id].type_id = type_id;
client_info[cu_id].mod_id = cu_id;
/*
* check the configuration - Getting all the values from the config file
*/
if (cudu->cu[cu_id].cu_ipv4_address != NULL)
{
strcpy(in_ip, cudu->cu[cu_id].cu_ipv4_address);
peer_address = strdup(cudu->cu[cu_id].cu_ipv4_address);
}
else
{
strcpy(in_ip, DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
LOG_D(PROTO_AGENT, "Cannot read DU address from conf file, setting the default (%s)\n", DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
}
if (cudu->cu[cu_id].cu_port != 0)
in_port = cudu->cu[cu_id].cu_port;
else
{
in_port = DEFAULT_PROTO_AGENT_PORT;
LOG_D(PROTO_AGENT, "Cannot read DU port from conf file, setting the default (%d)\n", DEFAULT_PROTO_AGENT_PORT);
}
if(cudu->cu[cu_id].tcp == 1)
{
tcp = 1;
link_type = strdup("TCP");
LOG_D(PROTO_AGENT, "Starting PROTO agent client for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[cu_id].enb_id,
in_ip,
in_port);
}
else if(cudu->cu[cu_id].udp == 1)
{
udp = 1;
link_type = strdup("UDP");
LOG_D(PROTO_AGENT, "Starting PROTO agent client for module id %d on ipv4 %s, port %d over UDP\n",
proto_server[cu_id].enb_id,
in_ip,
in_port);
}
else if(cudu->cu[cu_id].sctp == 1)
{
sctp = 1;
link_type = strdup("SCTP");
LOG_D(PROTO_AGENT, "Starting PROTO agent client for module id %d on ipv4 %s, port %d over SCTP\n",
proto_server[cu_id].enb_id,
in_ip,
in_port);
}
else
{
tcp = 1;
link_type = strdup("TCP");
LOG_D(PROTO_AGENT, "Starting PROTO agent client for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[cu_id].enb_id,
in_ip,
in_port);
DevAssert(du->remote_ipv4_address);
DevAssert(du->remote_port > 1024); // "unprivileged" port
}
proto_agent[mod_id].mod_id = mod_id;
/*
* Initialize the channel container
*/
/* TODO only initialize the first time */
proto_agent_init_channel_container();
/*Create the async channel info*/
proto_agent_async_channel_t *channel_info = proto_agent_async_channel_info(cu_id, in_ip, in_port, link_type, peer_address);
client_channel[cu_id] = channel_info;
proto_agent_async_channel_t *channel_info = proto_agent_async_channel_info(mod_id, du->remote_ipv4_address, du->remote_port);
if (!channel_info) goto error;
/*Create a channel using the async channel info*/
channel_id = proto_agent_create_channel((void *) channel_info,
proto_agent_async_msg_send,
proto_agent_async_msg_recv,
proto_agent_async_release);
if (channel_id <= 0) {
goto error;
}
if (channel_id <= 0) goto error;
proto_agent_channel_t *channel = proto_agent_get_channel(channel_id);
if (!channel) goto error;
proto_agent[mod_id].channel = channel;
if (channel == NULL) {
goto error;
}
if (tcp == 1) channel->type = 0;
else if (udp == 1) channel->type = 1;
else if (sctp == 1) channel->type = 2;
else channel->type = 0;
/*Register the channel for all underlying agents (use ENB_AGENT_MAX)*/
proto_agent_register_channel(cu_id, channel, ENB_AGENT_MAX);
/* Register the channel for all underlying agents (use ENB_AGENT_MAX) */
proto_agent_register_channel(mod_id, channel, ENB_AGENT_MAX);
uint8_t *msg = NULL;
Protocol__FlexsplitMessage *init_msg=NULL;
int msg_flag;
//uint8_t *msg = NULL;
//Protocol__FlexsplitMessage *init_msg=NULL;
//int msg_flag;
// In the case of UDP comm, start the echo request from the client side; the server thread should be blocked until it reads the SRC port of the 1st packet
if (udp == 1)
{
msg_flag = proto_agent_echo_request(cu_id, NULL, &init_msg);
if (msg_flag != 0)
goto error;
int msgsize = 0;
if (init_msg != NULL)
msg = proto_agent_pack_message(init_msg, &msgsize);
if (msg!= NULL)
{
LOG_D(PROTO_AGENT, "Client sending the ECHO_REQUEST message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
}
}
//if (udp == 1)
//{
// msg_flag = proto_agent_echo_request(cu_id, NULL, &init_msg);
// if (msg_flag != 0)
// goto error;
//
// int msgsize = 0;
// if (init_msg != NULL)
// msg = proto_agent_pack_message(init_msg, &msgsize);
// if (msg!= NULL)
// {
// LOG_D(PROTO_AGENT, "Client sending the ECHO_REQUEST message over the async channel\n");
// proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
// }
//}
//
/* After sending the message, wait for any replies;
the server thread blocks until it reads any data
over the channel
*/
cu_thread[cu_id]=new_thread(proto_client_receive, (void *) &client_info[cu_id]);
proto_agent[mod_id].recv_thread = new_thread(proto_client_receive, &proto_agent[mod_id]);
return 0;
error:
......@@ -439,26 +200,26 @@ error:
}
void
proto_agent_send_hello(void)
{
uint8_t *msg = NULL;
Protocol__FlexsplitMessage *init_msg=NULL;
int msg_flag = 0;
//printf( "PDCP agent: Calling the HELLO packet constructor\n");
msg_flag = proto_agent_hello(proto_agent[TEST_MOD].enb_id, NULL, &init_msg);
int msgsize = 0;
if (msg_flag == 0)
{
proto_agent_serialize_message(init_msg, &msg, &msgsize);
}
LOG_D(PROTO_AGENT, "Agent sending the message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) client_channel[TEST_MOD]);
}
//void
//proto_agent_send_hello(void)
//{
// uint8_t *msg = NULL;
// Protocol__FlexsplitMessage *init_msg=NULL;
// int msg_flag = 0;
//
//
// //printf( "PDCP agent: Calling the HELLO packet constructor\n");
// msg_flag = proto_agent_hello(proto_agent[TEST_MOD].mod_id, NULL, &init_msg);
//
// int msgsize = 0;
// if (msg_flag == 0)
// {
// proto_agent_serialize_message(init_msg, &msg, &msgsize);
// }
//
// LOG_D(PROTO_AGENT, "Agent sending the message over the async channel\n");
// proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) client_channel[TEST_MOD]);
//}
void
......@@ -543,7 +304,8 @@ proto_agent_send_pdcp_data_ind(const protocol_ctxt_t* const ctxt_pP, const srb_f
args->sdu_p = malloc(sdu_sizeP);
memcpy(args->sdu_p, sdu_pP->data, sdu_sizeP);
msg_flag = proto_agent_pdcp_data_ind(proto_server[server_mod].enb_id, (void *) args, &init_msg);
AssertFatal(0, "need mod_id here\n");
msg_flag = proto_agent_pdcp_data_ind(proto_server[0].mod_id, (void *) args, &init_msg);
if (msg_flag != 0)
goto error;
......@@ -598,7 +360,7 @@ proto_server_receive(void *args)
LOG_D(PROTO_AGENT, "Server side Received message with size %d and priority %d, calling message handle\n", size, priority);
msg=proto_agent_handle_message(d->enb_id, data, size);
msg=proto_agent_handle_message(d->mod_id, data, size);
if (msg == NULL)
{
......@@ -631,9 +393,8 @@ error:
void *
proto_client_receive(void *args)
{
proto_recv_t* recv = args;
mod_id_t recv_mod = recv->mod_id;
AssertFatal(0, "check proto_client_receive\n");
mod_id_t recv_mod = 0;
LOG_D(PROTO_AGENT, "\n\nrecv mod is %u\n\n",recv_mod);
//proto_agent_instance_t *d = &proto_agent[TEST_MOD];
......@@ -692,19 +453,3 @@ error:
return NULL;
}
uint8_t select_du(uint8_t max_dus)
{
static uint8_t selected = 0;
if (selected < max_dus -1 )
{
selected++;
}
else
{
selected = 0;
}
return selected;
}
......@@ -45,20 +45,13 @@ void * proto_server_init(void *args);
void * proto_server_receive(void *args);
void * proto_client_receive(void *args);
int proto_agent_start(uint8_t enb_id, mod_id_t mod_id, uint8_t type_id, cudu_params_t *cudu);
int proto_server_start(mod_id_t mod_id, const cudu_params_t* cudu);
int proto_agent_start(mod_id_t mod_id, const du_params_t *du);
int proto_server_start(mod_id_t mod_id, const cu_params_t* cu);
int proto_agent_stop(mod_id_t mod_id);
void *proto_agent_task(void *args);
uint8_t select_du(uint8_t max_dus);
typedef struct
{
mod_id_t mod_id;
uint8_t type_id;
}proto_recv_t;
void proto_agent_send_rlc_data_req(uint8_t mod_id, uint8_t type_id,
const protocol_ctxt_t* const ctxt_pP, const srb_flag_t srb_flagP,
const MBMS_flag_t MBMS_flagP, const rb_id_t rb_idP, const mui_t muiP,
......
......@@ -34,61 +34,35 @@
#include "common/utils/LOG/log.h"
uint16_t proto_udp = 0;
uint16_t proto_tcp = 0;
uint16_t proto_sctp = 0;
proto_agent_async_channel_t * proto_server_async_channel_info(mod_id_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr) {
proto_agent_async_channel_t *
proto_server_async_channel_info(mod_id_t mod_id, const char *ip, uint16_t port)
{
LOG_E(PROTO_AGENT, "does not bind to specific address at the moment, ignoring %s\n", ip);
proto_agent_async_channel_t *channel;
channel = (proto_agent_async_channel_t *) malloc(sizeof(proto_agent_channel_t));
channel->port = dst_port;
channel->peer_addr = NULL;
channel = malloc(sizeof(proto_agent_channel_t));
if (channel == NULL)
goto error;
channel->port = port;
channel->peer_addr = NULL;
channel->enb_id = mod_id;
/*Create a socket*/
if (strcmp(type, "TCP") == 0)
{
proto_tcp = 1;
printf("PROTO_AGENT: sTARTING TCP SERVER\n");
channel->link = new_link_server(dst_port);
channel->type = 0;
}
else if (strcmp(type, "UDP") == 0)
{
proto_udp = 1;
//channel->link = new_udp_link_server(dst_port);
channel->link = new_link_udp_server(dst_port);
channel->type = 1;
channel->peer_addr = peer_addr;
}
else if (strcmp(type, "SCTP") == 0)
{
proto_sctp = 1;
//channel->link = new_sctp_link_server(dst_port);
channel->link = new_link_sctp_server(dst_port);
channel->type = 2;
}
channel->link = new_link_udp_server(port);
if (channel->link == NULL) goto error;
/*
* create a message queue
*/
channel->send_queue = new_message_queue();
if (channel->send_queue == NULL) goto error;
channel->receive_queue = new_message_queue();
if (channel->receive_queue == NULL) goto error;
/*
* create a link manager
*/
channel->manager = create_link_manager(channel->send_queue, channel->receive_queue, channel->link, channel->type, channel->peer_addr, channel->port);
channel->manager = create_link_manager(channel->send_queue,
channel->receive_queue,
channel->link,
CHANNEL_UDP,
channel->peer_addr,
channel->port);
if (channel->manager == NULL) goto error;
return channel;
......@@ -99,55 +73,34 @@ proto_agent_async_channel_t * proto_server_async_channel_info(mod_id_t mod_id, c
}
proto_agent_async_channel_t * proto_agent_async_channel_info(mod_id_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr) {
proto_agent_async_channel_t *
proto_agent_async_channel_info(mod_id_t mod_id, const char *dst_ip, uint16_t dst_port)
{
proto_agent_async_channel_t *channel;
channel = (proto_agent_async_channel_t *) malloc(sizeof(proto_agent_channel_t));
channel->port = dst_port;
channel->peer_addr = NULL;
if (channel == NULL)
goto error;
channel->enb_id = mod_id;
/*Create a socket*/
if (strcmp(type, "TCP") == 0)
{
proto_tcp = 1;
channel->link = new_link_client(dst_ip, dst_port);
channel->type = 0;
}
else if (strcmp(type, "UDP") == 0)
{
proto_udp = 1;
channel->link = new_link_udp_client(dst_ip, dst_port);
channel->type = 1;
channel->peer_addr = peer_addr;
}
else if (strcmp(type, "SCTP") == 0)
{
proto_sctp = 1;
channel->link = new_link_sctp_client(dst_ip, dst_port);;
channel->type = 2;
}
channel->port = dst_port;
channel->peer_addr = dst_ip;
channel->enb_id = mod_id;
channel->link = new_link_udp_client(channel->peer_addr, channel->port);
if (channel->link == NULL) goto error;
/*
* create a message queue
*/
channel->send_queue = new_message_queue();
if (channel->send_queue == NULL) goto error;
channel->receive_queue = new_message_queue();
if (channel->receive_queue == NULL) goto error;
/*
* create a link manager
*/
channel->manager = create_link_manager(channel->send_queue, channel->receive_queue, channel->link, channel->type, channel->peer_addr, channel->port);
channel->manager = create_link_manager(channel->send_queue,
channel->receive_queue,
channel->link,
CHANNEL_UDP,
channel->peer_addr,
channel->port);
if (channel->manager == NULL) goto error;
return channel;
......@@ -163,19 +116,12 @@ int proto_agent_async_msg_send(void *data, int size, int priority, void *channel
return message_put(channel->send_queue, data, size, priority);
}
int proto_agent_async_msg_recv(void **data, int *size, int *priority, void *channel_info) {
int proto_agent_async_msg_recv(void **data, int *size, int *priority, void *channel_info)
{
proto_agent_async_channel_t *channel;
channel = (proto_agent_async_channel_t *)channel_info;
if (channel == NULL)
return 0;
else if (channel->type < 0)
return 0;
else if (channel->receive_queue == NULL)
return 0;
else
return message_get(channel->receive_queue, data, size, priority);
}
void proto_agent_async_release(proto_agent_channel_t *channel) {
......
......@@ -39,10 +39,9 @@
#include "proto_agent_net_comm.h"
typedef struct {
typedef struct proto_agent_async_channel_s {
mod_id_t enb_id;
uint16_t type; // 0-> TCP, 1-> UDP, 2->SCTP
char *peer_addr;
const char *peer_addr;
int port;
socket_link_t *link;
message_queue_t *send_queue;
......@@ -50,8 +49,8 @@ typedef struct {
link_manager_t *manager;
} proto_agent_async_channel_t;
proto_agent_async_channel_t * proto_agent_async_channel_info(mod_id_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr);
proto_agent_async_channel_t * proto_server_async_channel_info(mod_id_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr);
proto_agent_async_channel_t * proto_agent_async_channel_info(mod_id_t mod_id, const char *dst_ip, uint16_t dst_port);
proto_agent_async_channel_t * proto_server_async_channel_info(mod_id_t mod_id, const char *ip, uint16_t _port);
int proto_agent_async_msg_send(void *data, int size, int priority, void *channel_info);
......
......@@ -100,7 +100,7 @@ typedef uint8_t mod_id_t; // module or enb id
typedef uint8_t lcid_t;
typedef int32_t err_code_t;
#define CHANNEL_UDP 1
typedef struct {
/* general info */
......@@ -113,12 +113,16 @@ typedef struct {
uint32_t rx_msg[NUM_MAX_ENB];
uint32_t tx_msg[NUM_MAX_ENB];
}proto_agent_info_t;
} proto_agent_info_t;
typedef struct {
mod_id_t enb_id;
proto_agent_info_t agent_info;
/* forward declaration */
struct proto_agent_channel_s;
}proto_agent_instance_t;
typedef struct proto_agent_instance_s {
mod_id_t mod_id;
proto_agent_info_t agent_info;
struct proto_agent_channel_s *channel;
pthread_t recv_thread;
} proto_agent_instance_t;
#endif
......@@ -41,16 +41,18 @@
#include "tree.h"
#define ENB_AGENT_MAX 9
/* forward declaration */
struct proto_agent_async_channel_s;
/*Channel related information used for Tx/Rx of protocol messages*/
typedef struct proto_agent_channel_s {
RB_ENTRY(proto_agent_channel_s) entry;
int channel_id;
void *channel_info;
uint16_t type; // 0-> TCP, 1-> UDP, 2->SCTP
/*Callbacks for channel message Tx and Rx*/
int (*msg_send)(void *data, int size, int priority, void *channel_info);
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info);
void (*release)(struct proto_agent_channel_s *channel);
int channel_id;
struct proto_agent_async_channel_s *channel_info;
/*Callbacks for channel message Tx and Rx*/
int (*msg_send)(void *data, int size, int priority, void *channel_info);
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info);
void (*release)(struct proto_agent_channel_s *channel);
} proto_agent_channel_t;
typedef struct proto_agent_channel_instance_s{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment