Commit ed7e13c6 authored by Xenofon Foukas's avatar Xenofon Foukas

Integrated abstract network communication in main agent

parent 8fde111b
...@@ -42,6 +42,9 @@ ...@@ -42,6 +42,9 @@
#include "assertions.h" #include "assertions.h"
#include "enb_agent_net_comm.h"
#include "enb_agent_async.h"
//#define TEST_TIMER //#define TEST_TIMER
enb_agent_instance_t enb_agent[NUM_MAX_ENB_AGENT]; enb_agent_instance_t enb_agent[NUM_MAX_ENB_AGENT];
...@@ -98,10 +101,15 @@ void *enb_agent_task(void *args){ ...@@ -98,10 +101,15 @@ void *enb_agent_task(void *args){
msg = enb_agent_process_timeout(msg_p->ittiMsg.timer_has_expired.timer_id, msg_p->ittiMsg.timer_has_expired.arg); msg = enb_agent_process_timeout(msg_p->ittiMsg.timer_has_expired.timer_id, msg_p->ittiMsg.timer_has_expired.arg);
if (msg != NULL){ if (msg != NULL){
data=enb_agent_pack_message(msg,&size); data=enb_agent_pack_message(msg,&size);
if (message_put(d->tx_mq, data, size, priority)){ if (enb_agent_msg_send(d->mod_id, ENB_AGENT_DEFAULT, data, size, priority)) {
err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING; err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING;
goto error; goto error;
} }
/* if (message_put(d->tx_mq, data, size, priority)){ */
/* err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING; */
/* goto error; */
/* } */
LOG_D(ENB_AGENT,"sent message with size %d\n", size); LOG_D(ENB_AGENT,"sent message with size %d\n", size);
} }
break; break;
...@@ -121,42 +129,6 @@ void *enb_agent_task(void *args){ ...@@ -121,42 +129,6 @@ void *enb_agent_task(void *args){
return NULL; return NULL;
} }
/* void *send_thread(void *args) { */
/* #ifdef TEST_TIMER */
/* msg_context_t *d = args; */
/* void *data; */
/* int size; */
/* int priority; */
/* struct timeval t1, t2; */
/* long long t; */
/* struct timespec ts; */
/* unsigned int delay = 250*1000; */
/* while(1) { */
/* gettimeofday(&t1, NULL); */
/* enb_agent_sleep_until(&ts, delay); */
/* gettimeofday(&t2, NULL); */
/* t = ((t2.tv_sec * 1000000) + t2.tv_usec) - ((t1.tv_sec * 1000000) + t1.tv_usec); */
/* LOG_I(ENB_AGENT, "Call to sleep_until(%d) took %lld us\n", delay, t); */
/* sleep(1); */
/* } */
/* #endif */
/* /\* while (1) { */
/* // need logic for the timer, and */
/* usleep(10); */
/* if (message_put(d->tx_mq, data, size, priority)) goto error; */
/* }*\/ */
/* return NULL; */
/* error: */
/* printf("receive_thread: there was an error\n"); */
/* return NULL; */
/* } */
void *receive_thread(void *args) { void *receive_thread(void *args) {
msg_context_t *d = args; msg_context_t *d = args;
...@@ -168,10 +140,15 @@ void *receive_thread(void *args) { ...@@ -168,10 +140,15 @@ void *receive_thread(void *args) {
Protocol__ProgranMessage *msg; Protocol__ProgranMessage *msg;
while (1) { while (1) {
if (message_get(d->rx_mq, &data, &size, &priority)){ if (enb_agent_msg_recv(d->mod_id, ENB_AGENT_DEFAULT, &data, &size, &priority)) {
err_code = PROTOCOL__PROGRAN_ERR__MSG_DEQUEUING; err_code = PROTOCOL__PROGRAN_ERR__MSG_DEQUEUING;
goto error; goto error;
} }
/* if (message_get(d->rx_mq, &data, &size, &priority)){ */
/* err_code = PROTOCOL__PROGRAN_ERR__MSG_DEQUEUING; */
/* goto error; */
/* } */
LOG_D(ENB_AGENT,"received message with size %d\n", size); LOG_D(ENB_AGENT,"received message with size %d\n", size);
...@@ -186,10 +163,15 @@ void *receive_thread(void *args) { ...@@ -186,10 +163,15 @@ void *receive_thread(void *args) {
if (msg != NULL){ if (msg != NULL){
data=enb_agent_pack_message(msg,&size); data=enb_agent_pack_message(msg,&size);
if (message_put(d->tx_mq, data, size, priority)){ if (enb_agent_msg_send(d->mod_id, ENB_AGENT_DEFAULT, data, size, priority)) {
err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING; err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING;
goto error; goto error;
} }
/* if (message_put(d->tx_mq, data, size, priority)){ */
/* err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING; */
/* goto error; */
/* } */
LOG_D(ENB_AGENT,"sent message with size %d\n", size); LOG_D(ENB_AGENT,"sent message with size %d\n", size);
} }
...@@ -229,7 +211,9 @@ pthread_t new_thread(void *(*f)(void *), void *b) { ...@@ -229,7 +211,9 @@ pthread_t new_thread(void *(*f)(void *), void *b) {
} }
int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){ int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){
int channel_id;
// //
set_enb_vars(mod_id, RAN_LTE_OAI); set_enb_vars(mod_id, RAN_LTE_OAI);
enb_agent[mod_id].mod_id = mod_id; enb_agent[mod_id].mod_id = mod_id;
...@@ -255,48 +239,78 @@ int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){ ...@@ -255,48 +239,78 @@ int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){
in_ip, in_ip,
in_port); in_port);
//#define TEST_TIMER 0 /*
#if !defined TEST_TIMER * Initialize the channel container
*/
enb_agent_init_channel_container();
/*Create the async channel info*/
enb_agent_instance_t *channel_info = enb_agent_async_channel_info(mod_id, in_ip, in_port);
/*Create a channel using the async channel info*/
channel_id = enb_agent_create_channel((void *) channel_info,
enb_agent_async_msg_send,
enb_agent_async_msg_recv,
enb_agent_async_release);
/*
* create a socket
*/
enb_agent[mod_id].link = new_link_client(in_ip, in_port);
if (enb_agent[mod_id].link == NULL) goto error;
LOG_I(ENB_AGENT,"starting enb agent client for module id %d on ipv4 %s, port %d\n", if (channel_id <= 0) {
enb_agent[mod_id].mod_id, goto error;
in_ip, }
in_port);
/* enb_agent_channel_t *channel = get_channel(channel_id);
* create a message queue
*/
enb_agent[mod_id].send_queue = new_message_queue(); if (channel == NULL) {
if (enb_agent[mod_id].send_queue == NULL) goto error; goto error;
enb_agent[mod_id].receive_queue = new_message_queue(); }
if (enb_agent[mod_id].receive_queue == NULL) goto error;
/*Register the channel for all underlying agents (use ENB_AGENT_MAX)*/
enb_agent_register_channel(mod_id, channel, ENB_AGENT_MAX);
/*Example of registration for a specific agent(MAC):
*enb_agent_register_channel(mod_id, channel, ENB_AGENT_MAC);
*/
/* /\* */
/* * create a socket */
/* *\/ */
/* enb_agent[mod_id].link = new_link_client(in_ip, in_port); */
/* if (enb_agent[mod_id].link == NULL) goto error; */
/* /* LOG_I(ENB_AGENT,"starting enb agent client for module id %d on ipv4 %s, port %d\n", */
* create a link manager /* enb_agent[mod_id].mod_id, */
*/ /* in_ip, */
/* in_port); */
/* /\* */
/* * create a message queue */
/* *\/ */
/* enb_agent[mod_id].send_queue = new_message_queue(); */
/* if (enb_agent[mod_id].send_queue == NULL) goto error; */
/* enb_agent[mod_id].receive_queue = new_message_queue(); */
/* if (enb_agent[mod_id].receive_queue == NULL) goto error; */
/* /\* */
/* * create a link manager */
/* *\/ */
enb_agent[mod_id].manager = create_link_manager(enb_agent[mod_id].send_queue, enb_agent[mod_id].receive_queue, enb_agent[mod_id].link); /* enb_agent[mod_id].manager = create_link_manager(enb_agent[mod_id].send_queue, enb_agent[mod_id].receive_queue, enb_agent[mod_id].link); */
if (enb_agent[mod_id].manager == NULL) goto error; /* if (enb_agent[mod_id].manager == NULL) goto error; */
memset(&shared_ctxt, 0, sizeof(msg_context_t)); /* memset(&shared_ctxt, 0, sizeof(msg_context_t)); */
shared_ctxt[mod_id].mod_id = mod_id; /* shared_ctxt[mod_id].mod_id = mod_id; */
shared_ctxt[mod_id].tx_mq = enb_agent[mod_id].send_queue; /* shared_ctxt[mod_id].tx_mq = enb_agent[mod_id].send_queue; */
shared_ctxt[mod_id].rx_mq = enb_agent[mod_id].receive_queue; /* shared_ctxt[mod_id].rx_mq = enb_agent[mod_id].receive_queue; */
/* /*
* start the enb agent rx thread * start the enb agent rx thread
*/ */
memset(&shared_ctxt, 0, sizeof(msg_context_t));
shared_ctxt[mod_id].mod_id = mod_id;
new_thread(receive_thread, &shared_ctxt[mod_id]); new_thread(receive_thread, &shared_ctxt[mod_id]);
#endif
/* /*
* initilize a timer * initilize a timer
...@@ -304,8 +318,6 @@ int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){ ...@@ -304,8 +318,6 @@ int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){
enb_agent_init_timer(); enb_agent_init_timer();
/* /*
* start the enb agent task for tx and interaction with the underlying network function * start the enb agent task for tx and interaction with the underlying network function
*/ */
...@@ -313,12 +325,7 @@ int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){ ...@@ -313,12 +325,7 @@ int enb_agent_start(mid_t mod_id, const Enb_properties_array_t* enb_properties){
if (itti_create_task (TASK_ENB_AGENT, enb_agent_task, (void *) &shared_ctxt[mod_id]) < 0) { if (itti_create_task (TASK_ENB_AGENT, enb_agent_task, (void *) &shared_ctxt[mod_id]) < 0) {
LOG_E(ENB_AGENT, "Create task for eNB Agent failed\n"); LOG_E(ENB_AGENT, "Create task for eNB Agent failed\n");
return -1; return -1;
} }
//new_thread(send_thread, &shared_ctxt);
//while (1) pause();
LOG_I(ENB_AGENT,"client ends\n"); LOG_I(ENB_AGENT,"client ends\n");
return 0; return 0;
......
...@@ -133,8 +133,8 @@ typedef struct { ...@@ -133,8 +133,8 @@ typedef struct {
typedef struct { typedef struct {
mid_t mod_id; mid_t mod_id;
message_queue_t *tx_mq; // message_queue_t *tx_mq;
message_queue_t *rx_mq; // message_queue_t *rx_mq;
xid_t tx_xid; xid_t tx_xid;
xid_t rx_xid; xid_t rx_xid;
} msg_context_t; } msg_context_t;
......
...@@ -836,19 +836,23 @@ void enb_agent_send_sr_info(mid_t mod_id, msg_context_t *context) { ...@@ -836,19 +836,23 @@ void enb_agent_send_sr_info(mid_t mod_id, msg_context_t *context) {
err_code_t err_code; err_code_t err_code;
/*TODO: Must use a proper xid*/ /*TODO: Must use a proper xid*/
int xid = 1; err_code = enb_agent_mac_sr_info(mod_id, (void *) &(context->tx_xid), &msg);
err_code = enb_agent_mac_sr_info(mod_id, (void *) &xid, &msg);
if (err_code < 0) { if (err_code < 0) {
goto error; goto error;
} }
if (msg != NULL){ if (msg != NULL){
data=enb_agent_pack_message(msg, &size); data=enb_agent_pack_message(msg, &size);
/*Send sr info using the MAC channel of the eNB*/
if (message_put(context->tx_mq, data, size, priority)){ if (enb_agent_msg_send(mod_id, ENB_AGENT_MAC, data, size, priority)) {
err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING; err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING;
goto error; goto error;
} }
/* if (message_put(context->tx_mq, data, size, priority)){ */
/* err_code = PROTOCOL__PROGRAN_ERR__MSG_ENQUEUING; */
/* goto error; */
/* } */
LOG_D(ENB_AGENT,"sent message with size %d\n", size); LOG_D(ENB_AGENT,"sent message with size %d\n", size);
} }
error: error:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment