Commit 9db40696 authored by Nikos Makris's avatar Nikos Makris

Added functionality for UDP/TCP/SCTP transferring of data over the ASYNC channel interface.

Added selection of interfaces through the configuration file with a new block of statements as follows:

    FLEXSPLIT_INTERFACES :
    {
        DU_INTERFACE_NAME_FOR_F1U           = "lo";
        DU_IPV4_ADDRESS_FOR_F1U             = "127.0.0.1/24";
        DU_PORT_FOR_F1U			    = 2210;

        CU_INTERFACE_NAME_FOR_F1U           = "lo";
        CU_IPV4_ADDRESS_FOR_F1U             = "127.0.0.1";	//Address to search the DU
        CU_PORT_FOR_F1U			    = 2210;

        // One of TCP/UDP/SCTP
        F1_U_TRANSPORT_TYPE 		    = "UDP";
    };
parent af6af8a9
...@@ -170,13 +170,14 @@ ...@@ -170,13 +170,14 @@
#define ENB_CONFIG_STRING_ENB_IPV4_ADDR_FOR_S1U "ENB_IPV4_ADDRESS_FOR_S1U" #define ENB_CONFIG_STRING_ENB_IPV4_ADDR_FOR_S1U "ENB_IPV4_ADDRESS_FOR_S1U"
#define ENB_CONFIG_STRING_ENB_PORT_FOR_S1U "ENB_PORT_FOR_S1U" #define ENB_CONFIG_STRING_ENB_PORT_FOR_S1U "ENB_PORT_FOR_S1U"
#define ENB_CONFIG_STRING_FLEXSPLIT_CONFIG "FLEXSPLIT" #define ENB_CONFIG_STRING_FLEXSPLIT_INTERFACES_CONFIG "FLEXSPLIT_INTERFACES"
#define ENB_CONFIG_STRING_PROTO_AGENT_INTERFACE_NAME "PROTO_AGENT_INTERFACE_NAME" #define ENB_CONFIG_STRING_DU_INTERFACE_NAME_FOR_F1U "DU_INTERFACE_NAME_FOR_F1U"
#define ENB_CONFIG_STRING_PROTO_AGENT_IPV4_ADDRESS "PROTO_AGENT_IPV4_ADDRESS" #define ENB_CONFIG_STRING_DU_IPV4_ADDRESS_FOR_F1U "DU_IPV4_ADDRESS_FOR_F1U"
#define ENB_CONFIG_STRING_PROTO_AGENT_PORT "PROTO_AGENT_PORT" #define ENB_CONFIG_STRING_DU_PORT_FOR_F1U "DU_PORT_FOR_F1U"
#define ENB_CONFIG_STRING_PROTO_AGENT_CACHE "PROTO_AGENT_CACHE" #define ENB_CONFIG_STRING_CU_INTERFACE_NAME_FOR_F1U "CU_INTERFACE_NAME_FOR_F1U"
#define ENB_CONFIG_STRING_CU_IPV4_ADDRESS_FOR_F1U "CU_IPV4_ADDRESS_FOR_F1U"
#define ENB_CONFIG_STRING_CU_PORT_FOR_F1U "CU_PORT_FOR_F1U"
#define ENB_CONFIG_STRING_F1_U_TRANSPORT_TYPE "F1_U_TRANSPORT_TYPE"
#define ENB_CONFIG_STRING_RRH_GW_CONFIG "rrh_gw_config" #define ENB_CONFIG_STRING_RRH_GW_CONFIG "rrh_gw_config"
#define ENB_CONFIG_STRING_RRH_GW_LOCAL_IF_NAME "local_if_name" #define ENB_CONFIG_STRING_RRH_GW_LOCAL_IF_NAME "local_if_name"
...@@ -635,11 +636,21 @@ const Enb_properties_array_t *enb_config_init(char* lib_config_file_name_pP) ...@@ -635,11 +636,21 @@ const Enb_properties_array_t *enb_config_init(char* lib_config_file_name_pP)
char *cidr = NULL; char *cidr = NULL;
char *astring = NULL; char *astring = NULL;
char* proto_agent_interface_name = NULL; // char* proto_agent_interface_name = NULL;
char* proto_agent_ipv4_address = NULL; // char* proto_agent_ipv4_address = NULL;
libconfig_int proto_agent_port = 0; // libconfig_int proto_agent_port = 0;
char* proto_agent_cache = NULL; // char* proto_agent_cache = NULL;
char* du_interface_name_for_F1U = NULL;
char* du_ipv4_address_for_F1U = NULL;
libconfig_int du_port_for_F1U = 0;
char* cu_interface_name_for_F1U = NULL;
char* cu_ipv4_address_for_F1U = NULL;
libconfig_int cu_port_for_F1U = 0;
char* transport_type_F1U = NULL;
libconfig_int otg_ue_id = 0; libconfig_int otg_ue_id = 0;
char* otg_app_type = NULL; char* otg_app_type = NULL;
char* otg_bg_traffic = NULL; char* otg_bg_traffic = NULL;
...@@ -2355,27 +2366,65 @@ const Enb_properties_array_t *enb_config_init(char* lib_config_file_name_pP) ...@@ -2355,27 +2366,65 @@ const Enb_properties_array_t *enb_config_init(char* lib_config_file_name_pP)
} }
// PROTO_AGENT configuration // PROTO_AGENT configuration
subsetting = config_setting_get_member (setting_enb, ENB_CONFIG_STRING_FLEXSPLIT_CONFIG); subsetting = config_setting_get_member (setting_enb, ENB_CONFIG_STRING_FLEXSPLIT_INTERFACES_CONFIG);
//LOG_I(PROTO_AGENT, "HERE\n");
if (subsetting != NULL) { if (subsetting != NULL) {
if ( ( if ( (
config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_PROTO_AGENT_INTERFACE_NAME, config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_DU_INTERFACE_NAME_FOR_F1U,
(const char **)&proto_agent_interface_name) (const char **)&du_interface_name_for_F1U)
&& config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_PROTO_AGENT_IPV4_ADDRESS, && config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_DU_IPV4_ADDRESS_FOR_F1U,
(const char **)&proto_agent_ipv4_address) (const char **)&du_ipv4_address_for_F1U)
&& config_setting_lookup_int(subsetting, ENB_CONFIG_STRING_PROTO_AGENT_PORT, && config_setting_lookup_int(subsetting, ENB_CONFIG_STRING_DU_PORT_FOR_F1U,
&proto_agent_port) &du_port_for_F1U)
&& config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_PROTO_AGENT_CACHE, && config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_CU_INTERFACE_NAME_FOR_F1U,
(const char **)&proto_agent_cache) (const char **)&cu_interface_name_for_F1U)
&& config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_CU_IPV4_ADDRESS_FOR_F1U,
(const char **)&cu_ipv4_address_for_F1U)
&& config_setting_lookup_int(subsetting, ENB_CONFIG_STRING_CU_PORT_FOR_F1U,
&cu_port_for_F1U)
&& config_setting_lookup_string( subsetting, ENB_CONFIG_STRING_F1_U_TRANSPORT_TYPE,
(const char **)&transport_type_F1U)
) )
) { ) {
enb_properties.properties[enb_properties_index]->proto_agent_interface_name = strdup(proto_agent_interface_name); enb_properties.properties[enb_properties_index]->flexsplit_interfaces.du_interface = strdup(du_interface_name_for_F1U);
cidr = proto_agent_ipv4_address; cidr = du_ipv4_address_for_F1U;
address = strtok(cidr, "/"); address = strtok(cidr, "/");
enb_properties.properties[enb_properties_index]->proto_agent_ipv4_address = strdup(address); enb_properties.properties[enb_properties_index]->flexsplit_interfaces.du_ipv4_address = strdup(address);
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.du_port = du_port_for_F1U;
enb_properties.properties[enb_properties_index]->proto_agent_port = proto_agent_port; enb_properties.properties[enb_properties_index]->flexsplit_interfaces.cu_interface = strdup(cu_interface_name_for_F1U);
enb_properties.properties[enb_properties_index]->proto_agent_cache = strdup(proto_agent_cache); cidr = cu_ipv4_address_for_F1U;
address = strtok(cidr, "/");
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.cu_ipv4_address = strdup(address);
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.cu_port = cu_port_for_F1U;
if (strcmp(transport_type_F1U, "UDP") == 0)
{
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.udp = 1;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.tcp = 0;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.sctp = 0;
}
else if (strcmp(transport_type_F1U, "TCP") == 0)
{
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.udp = 0;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.tcp = 1;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.sctp = 0;
}
else if (strcmp(transport_type_F1U, "SCTP") == 0)
{
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.udp = 0;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.tcp = 0;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.sctp = 1;
}
else
{
// Will be handled by the proto agent functionality
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.udp = 0;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.tcp = 0;
enb_properties.properties[enb_properties_index]->flexsplit_interfaces.sctp = 0;
}
} }
} }
......
...@@ -78,6 +78,22 @@ typedef struct mme_ip_address_s { ...@@ -78,6 +78,22 @@ typedef struct mme_ip_address_s {
char *ipv6_address; char *ipv6_address;
} mme_ip_address_t; } mme_ip_address_t;
typedef struct flexplit_interfaces {
char *du_interface;
char *du_ipv4_address;
uint16_t du_port;
char *cu_interface;
char *cu_ipv4_address;
uint16_t cu_port;
unsigned tcp:1;
unsigned udp:1;
unsigned sctp:1;
}flexsplit_interfaces_t;
typedef struct rrh_gw_config_s { typedef struct rrh_gw_config_s {
unsigned udp:1; unsigned udp:1;
unsigned raw:1; unsigned raw:1;
...@@ -221,11 +237,7 @@ typedef struct Enb_properties_s { ...@@ -221,11 +237,7 @@ typedef struct Enb_properties_s {
char *enb_interface_name_for_S1_MME; char *enb_interface_name_for_S1_MME;
in_addr_t enb_ipv4_address_for_S1_MME; in_addr_t enb_ipv4_address_for_S1_MME;
flexsplit_interfaces_t flexsplit_interfaces;
char *proto_agent_interface_name;
in_addr_t proto_agent_ipv4_address;
tcp_udp_port_t proto_agent_port;
char *proto_agent_cache;
......
...@@ -374,22 +374,23 @@ boolean_t pdcp_data_req( ...@@ -374,22 +374,23 @@ boolean_t pdcp_data_req(
enb_properties_p = enb_config_get(); enb_properties_p = enb_config_get();
static int agent_started = 1; static int agent_started = 1;
/*
if (agent_started == 1) if (agent_started == 1)
{ {
LOG_I(PROTO_AGENT,"PDCP: starting client side with mod_id %d \n",ctxt_pP->module_id);
agent_started = proto_agent_start(ctxt_pP->module_id, enb_properties_p); agent_started = proto_agent_start(ctxt_pP->module_id, enb_properties_p);
} }*/
if (pdcp_pdu_p!=NULL) if (pdcp_pdu_p!=NULL)
{ {
proto_agent_send_rlc_data_req(ctxt_pP, srb_flagP, MBMS_FLAG_NO, rb_idP, muiP, confirmP, pdcp_pdu_size, pdcp_pdu_p); proto_agent_send_rlc_data_req(ctxt_pP, srb_flagP, MBMS_FLAG_NO, rb_idP, muiP, confirmP, pdcp_pdu_size, pdcp_pdu_p);
free_mem_block(pdcp_pdu_p); free_mem_block(pdcp_pdu_p);
rlc_status = ack_result; rlc_status = ack_result;
} }
else else
{ {
// It should never get here //It should never get here
rlc_status = rlc_data_req(ctxt_pP, srb_flagP, MBMS_FLAG_NO, rb_idP, muiP, confirmP, pdcp_pdu_size, pdcp_pdu_p); rlc_status = rlc_data_req(ctxt_pP, srb_flagP, MBMS_FLAG_NO, rb_idP, muiP, confirmP, pdcp_pdu_size, pdcp_pdu_p);
} }
} }
...@@ -921,6 +922,23 @@ pdcp_run ( ...@@ -921,6 +922,23 @@ pdcp_run (
int result; int result;
protocol_ctxt_t ctxt; protocol_ctxt_t ctxt;
#endif #endif
static int agent_started = 1;
Enb_properties_array_t *enb_properties_p = NULL;
enb_properties_p = enb_config_get();
if (agent_started == 1)
{
LOG_I(PROTO_AGENT,"Starting the PDCP instance\n");
//LOG_I(PROTO_AGENT,"PDCP: starting client side with mod_id %d \n",ctxt_pP->module_id);
agent_started = proto_agent_start(ctxt_pP->module_id, enb_properties_p);
}
if (ctxt_pP->enb_flag) { if (ctxt_pP->enb_flag) {
start_meas(&eNB_pdcp_stats[ctxt_pP->module_id].pdcp_run); start_meas(&eNB_pdcp_stats[ctxt_pP->module_id].pdcp_run);
......
...@@ -57,6 +57,13 @@ Protocol__FlexsplitMessage *proto_agent_timeout_fsp(void* args); ...@@ -57,6 +57,13 @@ Protocol__FlexsplitMessage *proto_agent_timeout_fsp(void* args);
mid_t client_mod, server_mod; mid_t client_mod, server_mod;
proto_agent_instance_t *client_channel, *server_channel; proto_agent_instance_t *client_channel, *server_channel;
uint8_t tcp = 0;
uint8_t udp = 0;
uint8_t sctp = 0;
char *link_type = NULL;
#define ECHO #define ECHO
/* Thread continuously listening for incomming packets */ /* Thread continuously listening for incomming packets */
...@@ -135,8 +142,8 @@ pthread_t new_thread(void *(*f)(void *), void *b) { ...@@ -135,8 +142,8 @@ pthread_t new_thread(void *(*f)(void *), void *b) {
void * proto_server_init(void *args) void * proto_server_init(void *args)
{ {
LOG_D(PROTO_AGENT, "Initializing server thread for listening connections\n"); LOG_I(PROTO_AGENT, "Initializing server thread for listening connections\n");
mid_t mod_id = (mid_t) 1; mid_t mod_id = (mid_t) 0;
Enb_properties_array_t* enb_properties = NULL; Enb_properties_array_t* enb_properties = NULL;
enb_properties = enb_config_get(); enb_properties = enb_config_get();
proto_server_start(mod_id, (const Enb_properties_array_t*) enb_properties); proto_server_start(mod_id, (const Enb_properties_array_t*) enb_properties);
...@@ -149,32 +156,83 @@ void * proto_server_init(void *args) ...@@ -149,32 +156,83 @@ void * proto_server_init(void *args)
int proto_server_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){ int proto_server_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){
int channel_id; int channel_id;
char *peer_address;
// Maybe change the RAN_LTE_OAI to protocol?? (e.g. PDCP) // Maybe change the RAN_LTE_OAI to protocol?? (e.g. PDCP)
set_enb_vars(mod_id, RAN_LTE_OAI); set_enb_vars(mod_id, RAN_LTE_OAI);
proto_server[mod_id].enb_id = mod_id; proto_server[mod_id].enb_id = mod_id;
server_mod = mod_id; server_mod = mod_id;
/*
* check the configuration - Getting all the values from the config file if (enb_properties->properties[mod_id]->flexsplit_interfaces.cu_ipv4_address != NULL)
* TODO: get the configuration optionally from the conf file {
*/ LOG_D(PROTO_AGENT,"CU ADDRESS IS %s\n",enb_properties->properties[mod_id]->flexsplit_interfaces.cu_ipv4_address);
strcpy(local_cache, DEFAULT_PROTO_AGENT_CACHE); }
strcpy(in_ip, DEFAULT_PROTO_AGENT_IPv4_ADDRESS ); if (enb_properties->properties[mod_id]->flexsplit_interfaces.du_ipv4_address != NULL)
in_port = DEFAULT_PROTO_AGENT_PORT ; {
LOG_D(PROTO_AGENT,"DU ADDRESS IS %s\n",enb_properties->properties[mod_id]->flexsplit_interfaces.du_ipv4_address);
peer_address = strdup(enb_properties->properties[mod_id]->flexsplit_interfaces.cu_ipv4_address);
}
if (enb_properties->properties[mod_id]->flexsplit_interfaces.du_ipv4_address != NULL)
strcpy(in_ip, enb_properties->properties[mod_id]->flexsplit_interfaces.du_ipv4_address);
else
{
strcpy(in_ip, DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
LOG_D(PROTO_AGENT,"Cannot read DU address from conf file, setting the default (%s)\n", DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
}
if (enb_properties->properties[mod_id]->flexsplit_interfaces.du_port != 0)
in_port = enb_properties->properties[mod_id]->flexsplit_interfaces.du_port;
else
{
in_port = DEFAULT_PROTO_AGENT_PORT;
LOG_D(PROTO_AGENT,"Cannot read DU port from conf file, setting the default (%d)\n", DEFAULT_PROTO_AGENT_PORT);
}
LOG_I(PROTO_AGENT,"Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d\n", if(enb_properties->properties[mod_id]->flexsplit_interfaces.tcp == 1)
{
tcp = 1;
link_type = strdup("TCP");
LOG_I(PROTO_AGENT,"Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[mod_id].enb_id, proto_server[mod_id].enb_id,
in_ip, in_ip,
in_port); in_port);
}
else if(enb_properties->properties[mod_id]->flexsplit_interfaces.udp == 1)
{
udp = 1;
link_type = strdup("UDP");
LOG_I(PROTO_AGENT,"Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over UDP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
else if(enb_properties->properties[mod_id]->flexsplit_interfaces.sctp == 1)
{
sctp = 1;
link_type = strdup("SCTP");
LOG_I(PROTO_AGENT,"Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over SCTP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
else
{
tcp = 1;
link_type = strdup("TCP");
LOG_I(PROTO_AGENT,"Starting PROTO agent SERVER for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
/* /*
* Initialize the channel container * Initialize the channel container
*/ */
proto_agent_init_channel_container(); proto_agent_init_channel_container();
/*Create the async channel info*/ /*Create the async channel info*/
proto_agent_instance_t *channel_info = proto_server_async_channel_info(mod_id, in_ip, in_port); proto_agent_instance_t *channel_info = proto_server_async_channel_info(mod_id, in_ip, in_port, link_type, peer_address);
server_channel = channel_info; server_channel = channel_info;
...@@ -190,6 +248,12 @@ int proto_server_start(mid_t mod_id, const Enb_properties_array_t* enb_propertie ...@@ -190,6 +248,12 @@ int proto_server_start(mid_t mod_id, const Enb_properties_array_t* enb_propertie
proto_agent_channel_t *channel = get_channel(channel_id); proto_agent_channel_t *channel = get_channel(channel_id);
if (tcp == 1) channel->type = 0;
else if (udp == 1) channel->type = 1;
else if (sctp == 1) channel->type = 2;
else channel->type = 0;
if (channel == NULL) { if (channel == NULL) {
goto error; goto error;
} }
...@@ -207,28 +271,36 @@ int proto_server_start(mid_t mod_id, const Enb_properties_array_t* enb_propertie ...@@ -207,28 +271,36 @@ int proto_server_start(mid_t mod_id, const Enb_properties_array_t* enb_propertie
int size; int size;
#ifdef ECHO #ifdef ECHO
LOG_D(PROTO_AGENT, "Proto agent Server: Calling the echo_request packet constructor\n"); if (udp == 0)
msg_flag = proto_agent_echo_request(mod_id, NULL, &init_msg); {
// If the comm is not UDP, allow the server to send the first packet over the channel
LOG_D(PROTO_AGENT, "Proto agent Server: Calling the echo_request packet constructor\n");
msg_flag = proto_agent_echo_request(mod_id, NULL, &init_msg);
if (msg_flag != 0)
goto error;
int msgsize = 0;
int err_code;
if (init_msg != NULL)
msg = proto_agent_pack_message(init_msg, &msgsize);
if (msg!= NULL)
{
LOG_D(PROTO_AGENT,"Server sending the message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
}
/* After sending the message, wait for any replies;
the server thread blocks until it reads any data
over the channel
*/
}
#else #else
LOG_D(PROTO_AGENT, "Proto agent Server: Calling the hello packet constructor\n"); LOG_D(PROTO_AGENT, "Proto agent Server: Calling the hello packet constructor\n");
msg_flag = proto_agent_hello(mod_id, NULL, &init_msg); msg_flag = proto_agent_hello(mod_id, NULL, &init_msg);
#endif #endif
if (msg_flag != 0)
goto error;
int msgsize = 0;
int err_code;
msg = proto_agent_pack_message(init_msg, &msgsize);
LOG_D(PROTO_AGENT,"Server sending the message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
/* After sending the message, wait for any replies;
the server thread blocks until it reads any data
over the channel
*/
LOG_D(PROTO_AGENT, "Server reading any message over the async channel.\n"); LOG_D(PROTO_AGENT, "Server reading any message over the async channel.\n");
new_thread(proto_server_receive, &proto_server[mod_id]); new_thread(proto_server_receive, &proto_server[mod_id]);
...@@ -244,6 +316,7 @@ error: ...@@ -244,6 +316,7 @@ error:
int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){
int channel_id; int channel_id;
char *peer_address = NULL;
set_enb_vars(mod_id, RAN_LTE_OAI); set_enb_vars(mod_id, RAN_LTE_OAI);
proto_agent[mod_id].enb_id = mod_id; proto_agent[mod_id].enb_id = mod_id;
...@@ -253,14 +326,61 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties ...@@ -253,14 +326,61 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties
* check the configuration - Getting all the values from the config file * check the configuration - Getting all the values from the config file
*/ */
strcpy(local_cache, DEFAULT_PROTO_AGENT_CACHE); if (enb_properties->properties[mod_id]->flexsplit_interfaces.cu_ipv4_address != NULL)
strcpy(in_ip, DEFAULT_PROTO_AGENT_IPv4_ADDRESS ); {
in_port = DEFAULT_PROTO_AGENT_PORT; strcpy(in_ip, enb_properties->properties[mod_id]->flexsplit_interfaces.cu_ipv4_address);
peer_address = strdup(enb_properties->properties[mod_id]->flexsplit_interfaces.du_ipv4_address);
}
else
{
strcpy(in_ip, DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
LOG_D(PROTO_AGENT,"Cannot read DU address from conf file, setting the default (%s)\n", DEFAULT_PROTO_AGENT_IPv4_ADDRESS);
}
if (enb_properties->properties[mod_id]->flexsplit_interfaces.cu_port != 0)
in_port = enb_properties->properties[mod_id]->flexsplit_interfaces.cu_port;
else
{
in_port = DEFAULT_PROTO_AGENT_PORT;
LOG_D(PROTO_AGENT,"Cannot read DU port from conf file, setting the default (%d)\n", DEFAULT_PROTO_AGENT_PORT);
}
LOG_I(PROTO_AGENT,"starting PROTO agent client for module id %d on ipv4 %s, port %d\n", if(enb_properties->properties[mod_id]->flexsplit_interfaces.tcp == 1)
proto_agent[mod_id].enb_id, {
tcp = 1;
link_type = strdup("TCP");
LOG_I(PROTO_AGENT,"Starting PROTO agent client for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
else if(enb_properties->properties[mod_id]->flexsplit_interfaces.udp == 1)
{
udp = 1;
link_type = strdup("UDP");
LOG_I(PROTO_AGENT,"Starting PROTO agent client for module id %d on ipv4 %s, port %d over UDP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
else if(enb_properties->properties[mod_id]->flexsplit_interfaces.sctp == 1)
{
sctp = 1;
link_type = strdup("SCTP");
LOG_I(PROTO_AGENT,"Starting PROTO agent client for module id %d on ipv4 %s, port %d over SCTP\n",
proto_server[mod_id].enb_id,
in_ip, in_ip,
in_port); in_port);
}
else
{
tcp = 1;
link_type = strdup("TCP");
LOG_I(PROTO_AGENT,"Starting PROTO agent client for module id %d on ipv4 %s, port %d over TCP\n",
proto_server[mod_id].enb_id,
in_ip,
in_port);
}
/* /*
* Initialize the channel container * Initialize the channel container
...@@ -268,7 +388,7 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties ...@@ -268,7 +388,7 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties
proto_agent_init_channel_container(); proto_agent_init_channel_container();
/*Create the async channel info*/ /*Create the async channel info*/
proto_agent_instance_t *channel_info = proto_agent_async_channel_info(mod_id, in_ip, in_port); proto_agent_instance_t *channel_info = proto_agent_async_channel_info(mod_id, in_ip, in_port, link_type, peer_address);
client_channel = channel_info; client_channel = channel_info;
/*Create a channel using the async channel info*/ /*Create a channel using the async channel info*/
channel_id = proto_agent_create_channel((void *) channel_info, channel_id = proto_agent_create_channel((void *) channel_info,
...@@ -284,6 +404,11 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties ...@@ -284,6 +404,11 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties
if (channel == NULL) { if (channel == NULL) {
goto error; goto error;
} }
if (tcp == 1) channel->type = 0;
else if (udp == 1) channel->type = 1;
else if (sctp == 1) channel->type = 2;
else channel->type = 0;
/*Register the channel for all underlying agents (use ENB_AGENT_MAX)*/ /*Register the channel for all underlying agents (use ENB_AGENT_MAX)*/
proto_agent_register_channel(mod_id, channel, ENB_AGENT_MAX); proto_agent_register_channel(mod_id, channel, ENB_AGENT_MAX);
...@@ -292,10 +417,39 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties ...@@ -292,10 +417,39 @@ int proto_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties
int size; int size;
int priority; int priority;
err_code_t err_code; err_code_t err_code;
Protocol__FlexsplitMessage *msg; Protocol__FlexsplitMessage *msg = NULL;
Protocol__FlexsplitMessage *ser_msg; Protocol__FlexsplitMessage *init_msg=NULL;
Protocol__FlexsplitMessage *rep_msg=NULL;
Protocol__FlexsplitMessage *ser_msg=NULL;
int msg_flag;
// In the case of UDP comm, start the echo request from the client side; the server thread should be blocked until it reads the SRC port of the 1st packet
if (udp == 1)
{
msg_flag = proto_agent_echo_request(mod_id, NULL, &init_msg);
if (msg_flag != 0)
goto error;
int msgsize = 0;
int err_code;
if (init_msg != NULL)
msg = proto_agent_pack_message(init_msg, &msgsize);
if (msg!= NULL)
{
LOG_D(PROTO_AGENT,"Client sending the ECHO_REQUEST message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) channel_info);
}
}
/* After sending the message, wait for any replies;
the server thread blocks until it reads any data
over the channel
*/
new_thread(proto_client_receive, &proto_agent[mod_id]); new_thread(proto_client_receive, &proto_agent[mod_id]);
return 0; return 0;
...@@ -314,7 +468,7 @@ proto_agent_send_hello(void) ...@@ -314,7 +468,7 @@ proto_agent_send_hello(void)
int msg_flag = 0; int msg_flag = 0;
LOG_D(PROTO_AGENT, "PDCP agent Server: Calling the hello packet constructor\n"); LOG_D(PROTO_AGENT, "PDCP agent: Calling the HELLO packet constructor\n");
msg_flag = proto_agent_hello(proto_agent[client_mod].enb_id, NULL, &init_msg); msg_flag = proto_agent_hello(proto_agent[client_mod].enb_id, NULL, &init_msg);
int msgsize = 0; int msgsize = 0;
...@@ -323,7 +477,7 @@ proto_agent_send_hello(void) ...@@ -323,7 +477,7 @@ proto_agent_send_hello(void)
proto_agent_serialize_message(init_msg, &msg, &msgsize); proto_agent_serialize_message(init_msg, &msg, &msgsize);
} }
LOG_D(PROTO_AGENT,"Server sending the message over the async channel\n"); LOG_D(PROTO_AGENT,"Agent sending the message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) client_channel); proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) client_channel);
} }
...@@ -347,7 +501,7 @@ proto_agent_send_rlc_data_req(const protocol_ctxt_t* const ctxt_pP, const srb_fl ...@@ -347,7 +501,7 @@ proto_agent_send_rlc_data_req(const protocol_ctxt_t* const ctxt_pP, const srb_fl
int ret; int ret;
int err_code; int err_code;
LOG_D(PROTO_AGENT, "PDCP agent Server: Calling the hello packet constructor\n"); LOG_D(PROTO_AGENT, "PDCP agent: Calling the PDCP DATA REQ constructor\n");
data_req_args *args = malloc(sizeof(data_req_args)); data_req_args *args = malloc(sizeof(data_req_args));
...@@ -373,7 +527,7 @@ proto_agent_send_rlc_data_req(const protocol_ctxt_t* const ctxt_pP, const srb_fl ...@@ -373,7 +527,7 @@ proto_agent_send_rlc_data_req(const protocol_ctxt_t* const ctxt_pP, const srb_fl
msg = proto_agent_pack_message(init_msg, &msgsize); msg = proto_agent_pack_message(init_msg, &msgsize);
LOG_D(PROTO_AGENT,"Server sending the pdcp data_req message over the async channel\n"); LOG_D(PROTO_AGENT,"Sending the pdcp data_req message over the async channel\n");
if (msg!=NULL) if (msg!=NULL)
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) client_channel); proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) client_channel);
...@@ -411,7 +565,7 @@ proto_agent_send_pdcp_data_ind(const protocol_ctxt_t* const ctxt_pP, const srb_f ...@@ -411,7 +565,7 @@ proto_agent_send_pdcp_data_ind(const protocol_ctxt_t* const ctxt_pP, const srb_f
int ret; int ret;
int err_code; int err_code;
LOG_D(PROTO_AGENT, "PDCP agent Server: Calling the hello packet constructor\n"); LOG_D(PROTO_AGENT, "PDCP agent: Calling the PDCP_DATA_IND constructor\n");
data_req_args *args = malloc(sizeof(data_req_args)); data_req_args *args = malloc(sizeof(data_req_args));
...@@ -436,7 +590,7 @@ proto_agent_send_pdcp_data_ind(const protocol_ctxt_t* const ctxt_pP, const srb_f ...@@ -436,7 +590,7 @@ proto_agent_send_pdcp_data_ind(const protocol_ctxt_t* const ctxt_pP, const srb_f
if (msg!=NULL) if (msg!=NULL)
{ {
LOG_D(PROTO_AGENT,"Server sending the pdcp data_ind message over the async channel\n"); LOG_D(PROTO_AGENT,"Sending the pdcp data_ind message over the async channel\n");
proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) server_channel); proto_agent_async_msg_send((void *)msg, (int) msgsize, 1, (void *) server_channel);
} }
} }
...@@ -477,7 +631,7 @@ proto_server_receive(void) ...@@ -477,7 +631,7 @@ proto_server_receive(void)
goto error; goto error;
} }
LOG_D(PROTO_AGENT,"Client Received message with size %d and priority %d, calling message handle\n", size, priority); LOG_D(PROTO_AGENT,"Server side Received message with size %d and priority %d, calling message handle\n", size, priority);
msg=proto_agent_handle_message(d->enb_id, data, size); msg=proto_agent_handle_message(d->enb_id, data, size);
...@@ -490,7 +644,7 @@ proto_server_receive(void) ...@@ -490,7 +644,7 @@ proto_server_receive(void)
ser_msg = proto_agent_pack_message(msg, &size); ser_msg = proto_agent_pack_message(msg, &size);
} }
LOG_D(PROTO_AGENT,"Server sending the pdcp data_req message over the async channel\n"); LOG_D(PROTO_AGENT,"Server sending the reply message over the async channel\n");
if (ser_msg != NULL){ if (ser_msg != NULL){
if (proto_agent_async_msg_send((void *)ser_msg, (int) size, 1, (void *) server_channel)){ if (proto_agent_async_msg_send((void *)ser_msg, (int) size, 1, (void *) server_channel)){
err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING; err_code = PROTOCOL__FLEXSPLIT_ERR__MSG_ENQUEUING;
...@@ -544,6 +698,7 @@ proto_client_receive(void) ...@@ -544,6 +698,7 @@ proto_client_receive(void)
{ {
ser_msg = proto_agent_pack_message(msg, &size); ser_msg = proto_agent_pack_message(msg, &size);
} }
LOG_D(PROTO_AGENT,"Server sending the reply message over the async channel\n");
if (ser_msg != NULL){ if (ser_msg != NULL){
if (proto_agent_async_msg_send((void *)ser_msg, (int) size, 1, (void *) client_channel)){ if (proto_agent_async_msg_send((void *)ser_msg, (int) size, 1, (void *) client_channel)){
......
...@@ -37,29 +37,49 @@ ...@@ -37,29 +37,49 @@
#include "proto_agent_async.h" #include "proto_agent_async.h"
#include "proto_agent_defs.h" #include "proto_agent_defs.h"
#include "log.h" #include "log.h"
uint16_t proto_udp = 0;
uint16_t proto_tcp = 0;
uint16_t proto_sctp = 0;
proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port) { proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr) {
proto_agent_async_channel_t *channel; proto_agent_async_channel_t *channel;
channel = (proto_agent_async_channel_t *) malloc(sizeof(proto_agent_channel_t)); channel = (proto_agent_async_channel_t *) malloc(sizeof(proto_agent_channel_t));
channel->port = dst_port;
channel->peer_addr = NULL;
if (channel == NULL) if (channel == NULL)
goto error; goto error;
channel->enb_id = mod_id; channel->enb_id = mod_id;
/*Create a socket*/ /*Create a socket*/
printf("Starting async server\n"); if (strcmp(type, "TCP") == 0)
channel->link = new_link_server(dst_port); {
//channel->link = NULL; proto_tcp = 1;
printf("Started async server\n"); channel->link = new_link_server(dst_port);
if (channel->link == NULL) goto error; channel->type = 0;
}
else if (strcmp(type, "UDP") == 0)
{
proto_udp = 1;
//channel->link = new_udp_link_server(dst_port);
channel->link = new_link_udp_server(dst_port);
channel->type = 1;
channel->peer_addr = peer_addr;
}
else if (strcmp(type, "SCTP") == 0)
{
proto_sctp = 1;
//channel->link = new_sctp_link_server(dst_port);
channel->link = new_link_sctp_server(dst_port);
channel->type = 2;
}
LOG_I(PROTO_AGENT,"starting proto agent server for module id %d on ipv4 %s, port %d\n", if (channel->link == NULL) goto error;
channel->enb_id,
dst_ip,
dst_port);
/* /*
* create a message queue * create a message queue
...@@ -69,11 +89,11 @@ proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char ...@@ -69,11 +89,11 @@ proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char
if (channel->send_queue == NULL) goto error; if (channel->send_queue == NULL) goto error;
channel->receive_queue = new_message_queue(); channel->receive_queue = new_message_queue();
if (channel->receive_queue == NULL) goto error; if (channel->receive_queue == NULL) goto error;
/* /*
* create a link manager * create a link manager
*/ */
channel->manager = create_link_manager(channel->send_queue, channel->receive_queue, channel->link); channel->manager = create_link_manager(channel->send_queue, channel->receive_queue, channel->link, channel->type, channel->peer_addr, channel->port);
if (channel->manager == NULL) goto error; if (channel->manager == NULL) goto error;
return channel; return channel;
...@@ -84,25 +104,41 @@ proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char ...@@ -84,25 +104,41 @@ proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char
} }
proto_agent_async_channel_t * proto_agent_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port) { proto_agent_async_channel_t * proto_agent_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr) {
proto_agent_async_channel_t *channel; proto_agent_async_channel_t *channel;
channel = (proto_agent_async_channel_t *) malloc(sizeof(proto_agent_channel_t)); channel = (proto_agent_async_channel_t *) malloc(sizeof(proto_agent_channel_t));
channel->port = dst_port;
channel->peer_addr = NULL;
if (channel == NULL) if (channel == NULL)
goto error; goto error;
channel->enb_id = mod_id; channel->enb_id = mod_id;
/*Create a socket*/ /*Create a socket*/
channel->link = new_link_client(dst_ip, dst_port); if (strcmp(type, "TCP") == 0)
{
if (channel->link == NULL) goto error; proto_tcp = 1;
channel->link = new_link_client(dst_ip, dst_port);
channel->type = 0;
}
else if (strcmp(type, "UDP") == 0)
{
proto_udp = 1;
channel->link = new_link_udp_client(dst_ip, dst_port);
channel->type = 1;
channel->peer_addr = peer_addr;
}
else if (strcmp(type, "SCTP") == 0)
{
proto_sctp = 1;
channel->link = new_link_sctp_client(dst_ip, dst_port);;
channel->type = 2;
}
LOG_I(PROTO_AGENT,"starting proto agent client for module id %d on ipv4 %s, port %d\n", if (channel->link == NULL) goto error;
channel->enb_id,
dst_ip,
dst_port);
/* /*
* create a message queue * create a message queue
...@@ -116,13 +152,13 @@ proto_agent_async_channel_t * proto_agent_async_channel_info(mid_t mod_id, char ...@@ -116,13 +152,13 @@ proto_agent_async_channel_t * proto_agent_async_channel_info(mid_t mod_id, char
/* /*
* create a link manager * create a link manager
*/ */
channel->manager = create_link_manager(channel->send_queue, channel->receive_queue, channel->link); channel->manager = create_link_manager(channel->send_queue, channel->receive_queue, channel->link, channel->type, channel->peer_addr, channel->port);
if (channel->manager == NULL) goto error; if (channel->manager == NULL) goto error;
return channel; return channel;
error: error:
LOG_I(PROTO_AGENT,"there was an error\n"); LOG_E(PROTO_AGENT,"there was an error\n");
return 1; return 1;
} }
......
...@@ -41,14 +41,17 @@ ...@@ -41,14 +41,17 @@
typedef struct { typedef struct {
mid_t enb_id; mid_t enb_id;
uint16_t type; // 0-> TCP, 1-> UDP, 2->SCTP
char *peer_addr;
int port;
socket_link_t *link; socket_link_t *link;
message_queue_t *send_queue; message_queue_t *send_queue;
message_queue_t *receive_queue; message_queue_t *receive_queue;
link_manager_t *manager; link_manager_t *manager;
} proto_agent_async_channel_t; } proto_agent_async_channel_t;
proto_agent_async_channel_t * proto_agent_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port); proto_agent_async_channel_t * proto_agent_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr);
proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port); proto_agent_async_channel_t * proto_server_async_channel_info(mid_t mod_id, char *dst_ip, uint16_t dst_port, const char* type, const char *peer_addr);
int proto_agent_async_msg_send(void *data, int size, int priority, void *channel_info); int proto_agent_async_msg_send(void *data, int size, int priority, void *channel_info);
......
...@@ -254,6 +254,7 @@ int proto_agent_get_ack_result(mid_t mod_id, const void *params, Protocol__Flexs ...@@ -254,6 +254,7 @@ int proto_agent_get_ack_result(mid_t mod_id, const void *params, Protocol__Flexs
Protocol__FspRlcDataReqAck *data_ack = input->data_req_ack; Protocol__FspRlcDataReqAck *data_ack = input->data_req_ack;
result = data_ack->result; result = data_ack->result;
ack_result = result; ack_result = result;
return 0;
} }
......
...@@ -40,7 +40,7 @@ ...@@ -40,7 +40,7 @@
#include "assertions.h" #include "assertions.h"
proto_agent_message_decoded_callback agent_messages_callback[][3] = { proto_agent_message_decoded_callback agent_messages_callback[][3] = {
{proto_agent_hello, proto_agent_hello, 0}, {proto_agent_hello, 0, 0},
{proto_agent_echo_reply, 0, 0}, {proto_agent_echo_reply, 0, 0},
{0, just_print, 0}, {0, just_print, 0},
{proto_agent_pdcp_data_req_ack, 0, 0}, {proto_agent_pdcp_data_req_ack, 0, 0},
...@@ -100,6 +100,7 @@ Protocol__FlexsplitMessage* proto_agent_handle_message (mid_t mod_id, ...@@ -100,6 +100,7 @@ Protocol__FlexsplitMessage* proto_agent_handle_message (mid_t mod_id,
if ( err_code < 0 ) if ( err_code < 0 )
{ {
LOG_I(PROTO_AGENT, "decoded_message case : %d, direction : %d \n", decoded_message->msg_case-1, decoded_message->msg_dir-1);
goto error; goto error;
} }
else if (err_code == 1) else if (err_code == 1)
......
...@@ -45,6 +45,7 @@ typedef struct proto_agent_channel_s { ...@@ -45,6 +45,7 @@ typedef struct proto_agent_channel_s {
RB_ENTRY(proto_agent_channel_s) entry; RB_ENTRY(proto_agent_channel_s) entry;
int channel_id; int channel_id;
void *channel_info; void *channel_info;
uint16_t type; // 0-> TCP, 1-> UDP, 2->SCTP
/*Callbacks for channel message Tx and Rx*/ /*Callbacks for channel message Tx and Rx*/
int (*msg_send)(void *data, int size, int priority, void *channel_info); int (*msg_send)(void *data, int size, int priority, void *channel_info);
int (*msg_recv)(void **data, int *size, int *priority, void *channel_info); int (*msg_recv)(void **data, int *size, int *priority, void *channel_info);
......
...@@ -57,7 +57,7 @@ static void *link_manager_sender_thread(void *_manager) ...@@ -57,7 +57,7 @@ static void *link_manager_sender_thread(void *_manager)
while (manager->run) { while (manager->run) {
while (message_get(manager->send_queue, &data, &size, &priority) == 0) { while (message_get(manager->send_queue, &data, &size, &priority) == 0) {
link_send_packet(manager->socket_link, data, size); link_send_packet(manager->socket_link, data, size, manager->type, manager->peer_addr, manager->port);
free(data); free(data);
} }
// if (message_get(manager->send_queue, &data, &size, &priority)) // if (message_get(manager->send_queue, &data, &size, &priority))
...@@ -86,7 +86,7 @@ static void *link_manager_receiver_thread(void *_manager) ...@@ -86,7 +86,7 @@ static void *link_manager_receiver_thread(void *_manager)
LOG_D(MAC, "starting link manager receiver thread\n"); LOG_D(MAC, "starting link manager receiver thread\n");
while (manager->run) { while (manager->run) {
if (link_receive_packet(manager->socket_link, &data, &size)) if (link_receive_packet(manager->socket_link, &data, &size, manager->type, manager->peer_addr, manager->port))
goto error; goto error;
/* todo: priority */ /* todo: priority */
if (message_put(manager->receive_queue, data, size, 0)) if (message_put(manager->receive_queue, data, size, 0))
...@@ -105,7 +105,10 @@ error: ...@@ -105,7 +105,10 @@ error:
link_manager_t *create_link_manager( link_manager_t *create_link_manager(
message_queue_t *send_queue, message_queue_t *send_queue,
message_queue_t *receive_queue, message_queue_t *receive_queue,
socket_link_t *link) socket_link_t *link,
uint16_t type,
char *peer_addr,
int port )
{ {
link_manager_t *ret = NULL; link_manager_t *ret = NULL;
pthread_attr_t attr; pthread_attr_t attr;
...@@ -120,6 +123,9 @@ link_manager_t *create_link_manager( ...@@ -120,6 +123,9 @@ link_manager_t *create_link_manager(
ret->send_queue = send_queue; ret->send_queue = send_queue;
ret->receive_queue = receive_queue; ret->receive_queue = receive_queue;
ret->socket_link = link; ret->socket_link = link;
ret->type = type;
ret->peer_addr = peer_addr;
ret->port = port;
ret->run = 1; ret->run = 1;
if (pthread_attr_init(&attr)) if (pthread_attr_init(&attr))
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "message_queue.h" #include "message_queue.h"
//#include "ringbuffer_queue.h" //#include "ringbuffer_queue.h"
#include "socket_link.h" #include "socket_link.h"
#include <stdint.h>
#include <pthread.h> #include <pthread.h>
...@@ -52,6 +53,9 @@ typedef struct { ...@@ -52,6 +53,9 @@ typedef struct {
message_queue_t *send_queue; message_queue_t *send_queue;
message_queue_t *receive_queue; message_queue_t *receive_queue;
socket_link_t *socket_link; socket_link_t *socket_link;
uint16_t type;
char *peer_addr;
int port;
pthread_t sender; pthread_t sender;
pthread_t receiver; pthread_t receiver;
volatile int run; volatile int run;
...@@ -60,7 +64,11 @@ typedef struct { ...@@ -60,7 +64,11 @@ typedef struct {
link_manager_t *create_link_manager( link_manager_t *create_link_manager(
message_queue_t *send_queue, message_queue_t *send_queue,
message_queue_t *receive_queue, message_queue_t *receive_queue,
socket_link_t *link); socket_link_t *link,
uint16_t type,
char *peer_addr,
int port
);
void destroy_link_manager(link_manager_t *); void destroy_link_manager(link_manager_t *);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#ifndef MESSAGE_QUEUE_H #ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H #define MESSAGE_QUEUE_H
#include <stdint.h>
#include <pthread.h> #include <pthread.h>
#ifdef __cplusplus #ifdef __cplusplus
...@@ -48,6 +49,7 @@ typedef struct message_t { ...@@ -48,6 +49,7 @@ typedef struct message_t {
void *data; void *data;
int size; int size;
int priority; int priority;
uint16_t type;
struct message_t *next; struct message_t *next;
} message_t; } message_t;
......
...@@ -48,6 +48,7 @@ ...@@ -48,6 +48,7 @@
#include <netinet/ip.h> #include <netinet/ip.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <netinet/udp.h> #include <netinet/udp.h>
#include <netinet/sctp.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <stdint.h> #include <stdint.h>
...@@ -106,7 +107,6 @@ socket_link_t *new_link_server(int port) ...@@ -106,7 +107,6 @@ socket_link_t *new_link_server(int port)
LOG_E(PROTO_AGENT, "%s:%d: accept: %s\n", __FILE__, __LINE__, strerror(errno)); LOG_E(PROTO_AGENT, "%s:%d: accept: %s\n", __FILE__, __LINE__, strerror(errno));
goto error; goto error;
} }
printf("Accepted new connection from client\n");
close(socket_server); close(socket_server);
LOG_D(PROTO_AGENT, "connection from %s:%d\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); LOG_D(PROTO_AGENT, "connection from %s:%d\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
...@@ -169,6 +169,202 @@ error: ...@@ -169,6 +169,202 @@ error:
return NULL; return NULL;
} }
socket_link_t *new_link_udp_server(int port){
socket_link_t *ret = NULL;
struct sockaddr_in si_me, si_other;
int socket_server, i, slen = sizeof(si_other) , recv_bytes;
char buf[1500];
ret = calloc(1, sizeof(socket_link_t));
if (ret == NULL) {
LOG_D(PROTO_AGENT, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
ret->socket_fd = -1;
LOG_D(PROTO_AGENT, "create a new udp link server socket at port %d\n", port);
//create a UDP socket
if ((socket_server=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
goto error;
}
// zero out the structure
memset((char *) &si_me, 0, sizeof(si_me));
si_me.sin_family = AF_INET;
si_me.sin_port = htons(port);
si_me.sin_addr.s_addr = htonl(INADDR_ANY);
//bind socket to port
if( bind(socket_server , (struct sockaddr*)&si_me, sizeof(si_me) ) == -1) {
goto error;
}
ret->socket_fd = socket_server;
ret->peer_port = 0;
return ret;
error:
close(socket_server);
if (ret != NULL) close(ret->socket_fd);
free(ret);
LOG_E(PROTO_AGENT, "ERROR in new_link_udp_server (see above), returning NULL\n");
return NULL;
}
socket_link_t *new_link_udp_client(char *server, int port){
socket_link_t *ret = NULL;
ret = calloc(1, sizeof(socket_link_t));
if (ret == NULL) {
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
//goto error;
}
ret->socket_fd = -1;
struct sockaddr_in si_other, server_info;
int s, i, slen=sizeof(si_other);
char buf[1500];
char message[1500];
if ( (s=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1){
goto error;
}
memset((char *) &si_other, 0, sizeof(si_other));
si_other.sin_family = AF_INET;
si_other.sin_port = 0; //htons(port);
if (inet_aton(server, &si_other.sin_addr) == 0){
fprintf(stderr, "inet_aton() failed\n");
goto error;
}
connect(s, (struct sockaddr *)&si_other, sizeof(si_other));
getsockname(s, (struct sockaddr *)&si_other, &slen);
ret->socket_fd = s;
ret->peer_port = port; //ntohs(si_other.sin_port);
return ret;
error:
if (ret != NULL) close(ret->socket_fd);
free(ret);
LOG_E(MAC, "ERROR in new_link_udp_client (see above), returning NULL\n");
return NULL;
}
socket_link_t *new_link_sctp_server(int port)
{
socket_link_t *ret = NULL;
ret = calloc(1, sizeof(socket_link_t));
if (ret == NULL) {
LOG_D(PROTO_AGENT, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
ret->socket_fd = -1;
int listenSock, temp;
struct sockaddr_in servaddr;
listenSock = socket (AF_INET, SOCK_STREAM, IPPROTO_SCTP);
if(listenSock == -1)
{
perror("socket()");
exit(1);
}
bzero ((void *) &servaddr, sizeof (servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl (INADDR_ANY);
servaddr.sin_port = htons(port);
temp = bind (listenSock, (struct sockaddr *) &servaddr, sizeof (servaddr));
if(temp == -1 )
{
perror("bind()");
close(listenSock);
exit(1);
}
temp = listen (listenSock, 5);
if(temp == -1 )
{
perror("listen()");
close(listenSock);
exit(1);
}
ret->socket_fd = accept (listenSock, (struct sockaddr *) NULL, (int *) NULL);
return ret;
error:
close(listenSock);
if (ret != NULL) close(ret->socket_fd);
free(ret);
LOG_E(PROTO_AGENT, "ERROR in new_link_sctp_server (see above), returning NULL\n");
return NULL;
}
socket_link_t *new_link_sctp_client(char *server, int port)
{
socket_link_t *ret = NULL;
ret = calloc(1, sizeof(socket_link_t));
if (ret == NULL) {
LOG_D(PROTO_AGENT, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
ret->socket_fd = -1;
int temp;
struct sockaddr_in servaddr;
ret->socket_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_SCTP);
if (ret->socket_fd == -1)
{
perror("socket()");
exit(1);
}
bzero ((void *) &servaddr, sizeof (servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons (port);
if (inet_aton(server, &servaddr.sin_addr) == 0) {
LOG_E(PROTO_AGENT, "invalid IP address '%s', use a.b.c.d notation\n", server);
goto error;
}
temp = connect (ret->socket_fd, (struct sockaddr *) &servaddr, sizeof (servaddr));
if (temp == -1)
{
perror("connect()");
close(ret->socket_fd);
exit(1);
}
return ret;
error:
if (ret != NULL) close(ret->socket_fd);
free(ret);
LOG_E(MAC, "ERROR in new_link_sctp_client (see above), returning NULL\n");
return NULL;
}
/* /*
* return -1 on error and 0 if the sending was fine * return -1 on error and 0 if the sending was fine
*/ */
...@@ -192,6 +388,43 @@ error: ...@@ -192,6 +388,43 @@ error:
return -1; return -1;
} }
static int socket_udp_send(int socket_fd, void *buf, int size, char *peer_addr, int port)
{
struct sockaddr_in si_other;
int slen = sizeof(si_other);
char *s = buf;
int l;
int my_socket;
LOG_D(PROTO_AGENT,"UDP send\n");
my_socket = socket_fd;
memset((char *) &si_other, 0, sizeof(si_other));
si_other.sin_family = AF_INET;
si_other.sin_port = htons(port);
if (inet_aton(peer_addr , &si_other.sin_addr) == 0)
{
fprintf(stderr, "inet_aton() failed\n");
exit(1);
}
while (size) {
l = sendto(my_socket, s, size, 0, (struct sockaddr *) &si_other, slen);
if (l == -1) goto error;
if (l == 0) { LOG_E(PROTO_AGENT, "%s:%d: this cannot happen, normally...\n", __FILE__, __LINE__); abort(); }
size -= l;
s += l;
}
return 0;
error:
LOG_E(PROTO_AGENT, "socket_udp_send: ERROR: %s\n", strerror(errno));
return -1;
}
/* /*
* return -1 on error and 0 if the receiving was fine * return -1 on error and 0 if the receiving was fine
*/ */
...@@ -219,30 +452,85 @@ socket_closed: ...@@ -219,30 +452,85 @@ socket_closed:
return -1; return -1;
} }
/*
* return -1 on error and 0 if the receiving was fine
*/
static int socket_udp_receive(int socket_fd, void *buf, int size)
{
LOG_D(PROTO_AGENT,"UDP RECEIVE\n");
struct sockaddr_in client;
int slen = sizeof(client);
char *s = buf;
int l;
while (size) {
l = recvfrom(socket_fd, s, size, 0, (struct sockaddr *) &client, &slen);
getsockname(s, (struct sockaddr *)&client, &slen);
LOG_D(PROTO_AGENT, "Got message from src port: %u\n", ntohs(client.sin_port));
if (l == -1) goto error;
if (l == 0) goto socket_closed;
size -= l;
s += l;
}
return ntohs(client.sin_port);
error:
LOG_E(MAC, "socket_udp_receive: ERROR: %s\n", strerror(errno));
return -1;
socket_closed:
LOG_E(MAC, "socket_udp_receive: socket closed\n");
return -1;
}
/* /*
* return -1 on error and 0 if the sending was fine * return -1 on error and 0 if the sending was fine
*/ */
int link_send_packet(socket_link_t *link, void *data, int size) int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_type, char *peer_addr, int port)
{ {
char sizebuf[4]; char sizebuf[4];
int32_t s = size; int32_t s = size;
/* send the size first, maximum is 2^31 bytes */ /* send the size first, maximum is 2^31 bytes */
sizebuf[0] = (s >> 24) & 255; sizebuf[0] = (s >> 24) & 255;
sizebuf[1] = (s >> 16) & 255; sizebuf[1] = (s >> 16) & 255;
sizebuf[2] = (s >> 8) & 255; sizebuf[2] = (s >> 8) & 255;
sizebuf[3] = s & 255; sizebuf[3] = s & 255;
if (socket_send(link->socket_fd, sizebuf, 4) == -1) if ((proto_type == 0) || (proto_type == 2))
goto error; {
if (socket_send(link->socket_fd, sizebuf, 4) == -1)
link->bytes_sent += 4; goto error;
link->bytes_sent += 4;
if (socket_send(link->socket_fd, data, size) == -1) if (socket_send(link->socket_fd, data, size) == -1)
goto error; goto error;
link->bytes_sent += size; link->bytes_sent += size;
link->packets_sent++; link->packets_sent++;
}
else if (proto_type == 1 )
{
while (link->peer_port == 0)
{
sleep(0.1);
}
LOG_D(PROTO_AGENT, "peer port is %d", link->peer_port);
if (socket_udp_send(link->socket_fd, sizebuf, 4, peer_addr, link->peer_port) == -1)
goto error;
link->bytes_sent += 4;
if (socket_udp_send(link->socket_fd, data, size, peer_addr, link->peer_port) == -1)
goto error;
link->bytes_sent += size;
link->packets_sent++;
}
return 0; return 0;
...@@ -253,16 +541,28 @@ error: ...@@ -253,16 +541,28 @@ error:
/* /*
* return -1 on error and 0 if the sending was fine * return -1 on error and 0 if the sending was fine
*/ */
int link_receive_packet(socket_link_t *link, void **ret_data, int *ret_size) int link_receive_packet(socket_link_t *link, void **ret_data, int *ret_size, uint16_t proto_type, char *peer_addr, int port)
{ {
unsigned char sizebuf[4]; unsigned char sizebuf[4];
int32_t size; int32_t size;
void *data = NULL; void *data = NULL;
int peer_port = 0;
if ((proto_type == 0) || (proto_type == 2))
{
/* received the size first, maximum is 2^31 bytes */ /* received the size first, maximum is 2^31 bytes */
if (socket_receive(link->socket_fd, sizebuf, 4) == -1) if (socket_receive(link->socket_fd, sizebuf, 4) == -1)
goto error; goto error;
}
else if (proto_type == 1)
{
/* received the size first, maximum is 2^31 bytes */
peer_port = socket_udp_receive(link->socket_fd, sizebuf, 4);
if ( peer_port == -1)
goto error;
if (link->peer_port == 0) link->peer_port = peer_port;
}
size = (sizebuf[0] << 24) | size = (sizebuf[0] << 24) |
(sizebuf[1] << 16) | (sizebuf[1] << 16) |
(sizebuf[2] << 8) | (sizebuf[2] << 8) |
...@@ -277,10 +577,17 @@ int link_receive_packet(socket_link_t *link, void **ret_data, int *ret_size) ...@@ -277,10 +577,17 @@ int link_receive_packet(socket_link_t *link, void **ret_data, int *ret_size)
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__); LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error; goto error;
} }
if (socket_receive(link->socket_fd, data, size) == -1) if ((proto_type == 0) || (proto_type == 2))
goto error; {
if (socket_receive(link->socket_fd, data, size) == -1)
goto error;
}
else if (proto_type == 1)
{
if (socket_udp_receive(link->socket_fd, data, size) == -1)
goto error;
}
link->bytes_received += size; link->bytes_received += size;
link->packets_received++; link->packets_received++;
*ret_data = data; *ret_data = data;
......
...@@ -44,8 +44,10 @@ ...@@ -44,8 +44,10 @@
extern "C" { extern "C" {
#endif #endif
typedef struct { typedef struct {
int socket_fd; int socket_fd;
int peer_port;
uint64_t bytes_sent; uint64_t bytes_sent;
uint64_t packets_sent; uint64_t packets_sent;
uint64_t bytes_received; uint64_t bytes_received;
...@@ -54,8 +56,12 @@ typedef struct { ...@@ -54,8 +56,12 @@ typedef struct {
socket_link_t *new_link_server(int port); socket_link_t *new_link_server(int port);
socket_link_t *new_link_client(char *server, int port); socket_link_t *new_link_client(char *server, int port);
int link_send_packet(socket_link_t *link, void *data, int size); socket_link_t *new_link_udp_server(int port);
int link_receive_packet(socket_link_t *link, void **data, int *size); socket_link_t *new_link_udp_client(char *server, int port);
socket_link_t *new_link_sctp_server(int port);
socket_link_t *new_link_sctp_client(char *server, int port);
int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_type, char *peer_addr, int port);
int link_receive_packet(socket_link_t *link, void **data, int *size, uint16_t proto_type, char *peer_addr, int port);
int close_link(socket_link_t *link); int close_link(socket_link_t *link);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -118,6 +118,20 @@ eNBs = ...@@ -118,6 +118,20 @@ eNBs =
ENB_PORT_FOR_S1U = 2153; # Spec 2152 ENB_PORT_FOR_S1U = 2153; # Spec 2152
}; };
FLEXSPLIT_INTERFACES :
{
DU_INTERFACE_NAME_FOR_F1U = "eth0";
DU_IPV4_ADDRESS_FOR_F1U = "127.0.0.1/24";
DU_PORT_FOR_F1U = 2210;
CU_INTERFACE_NAME_FOR_F1U = "lo";
CU_IPV4_ADDRESS_FOR_F1U = "10.64.45.62"; //Address to search the DU
CU_PORT_FOR_F1U = 2210;
// One of TCP/UDP/SCTP
F1_U_TRANSPORT_TYPE = "UDP";
};
log_config : log_config :
{ {
global_log_level ="trace"; global_log_level ="trace";
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment