Commit ff24e3e0 authored by Teodora's avatar Teodora

Enable multiple RC subscriptions

  - create hash table to save riq_req_id (key) and array of ran_param_id(s) (values), per each subscription
  - create RB tree to store list of riq_req_id(s) for each ran_param_id
    => when the async event occurs, it is easier and faster to search per ran_param_id and send the indication message to all xApps (riq_req_id(s)) subscribed to the same ran_param_id
  - it is important to mention that both data structures need to be maintained, especially when unsubscription occurs (free_aperiodic_subscription)
parent d89d2ae2
...@@ -30,12 +30,93 @@ ...@@ -30,12 +30,93 @@
#include <stdio.h> #include <stdio.h>
#include <pthread.h> #include <pthread.h>
#include <unistd.h> #include <unistd.h>
#include "common/utils/hashtable/hashtable.h"
#include "common/utils/assertions.h" #include "common/utils/assertions.h"
#include "common/ran_context.h" #include "common/ran_context.h"
static const int rrc_state_changed_to = 202; #define MAX_NUM_RIQ_REQ_ID 64
static bool subs_rrc_state_change = false;
static uint32_t riq_req_id = 0; typedef enum {
RRC_STATE_CHANGED_TO_E2SM_RC_RAN_PARAM_ID = 202, // 8.2.4 RAN Parameters for Report Service Style 4
END_E2SM_RC_RAN_PARAM_ID
} ran_param_id_e;
typedef struct rb_riq_req_id_s {
RB_ENTRY(rb_riq_req_id_s) entries;
uint32_t riq_req_id;
} rb_riq_req_id_t;
typedef struct {
RB_HEAD(riq_id_2_param_id, rb_riq_req_id_s) arr[END_E2SM_RC_RAN_PARAM_ID];
hash_table_t* riq_id_2_param_id_mapping;
} rc_subs_data_t;
int riq_req_id_compare(struct rb_riq_req_id_s *c1_pP, struct rb_riq_req_id_s *c2_pP)
{
if (c1_pP->riq_req_id > c2_pP->riq_req_id) {
return 1;
}
if (c1_pP->riq_req_id < c2_pP->riq_req_id) {
return -1;
}
return 0;
}
RB_GENERATE(riq_id_2_param_id, rb_riq_req_id_s, entries, riq_req_id_compare);
typedef struct{
size_t len;
ran_param_id_e* ran_param_id;
} arr_ran_param_id_t;
static rc_subs_data_t rc_subs_data = {0};
static pthread_mutex_t rc_mutex = PTHREAD_MUTEX_INITIALIZER;
static void init_rc_subs_data(void)
{
pthread_mutex_lock(&rc_mutex);
// Initialize RB tree
for (size_t i = 0; i < END_E2SM_RC_RAN_PARAM_ID; i++) {
RB_INIT(&rc_subs_data.arr[i]);
}
// Initialize hash table
DevAssert(rc_subs_data.riq_id_2_param_id_mapping == NULL);
rc_subs_data.riq_id_2_param_id_mapping = hashtable_create(MAX_NUM_RIQ_REQ_ID, NULL, free);
assert(rc_subs_data.riq_id_2_param_id_mapping != NULL && "Memory exhausted");
pthread_mutex_unlock(&rc_mutex);
}
static void insert_rc_subs_data(uint32_t riq_req_id, arr_ran_param_id_t* arr_ran_param_id)
{
pthread_mutex_lock(&rc_mutex);
// Insert in hash table
DevAssert(rc_subs_data.riq_id_2_param_id_mapping != NULL);
uint64_t key = riq_req_id;
// Check if the subscription already exists
if (hashtable_is_key_exists(rc_subs_data.riq_id_2_param_id_mapping, key) == HASH_TABLE_OK) {
pthread_mutex_unlock(&rc_mutex);
return;
}
void* value = (void*)arr_ran_param_id;
hashtable_rc_t ret = hashtable_insert(rc_subs_data.riq_id_2_param_id_mapping, key, value);
assert(ret == HASH_TABLE_OK && "Hash table not ok");
// Insert in RB tree
for (size_t i = 0; i < arr_ran_param_id->len; i++) {
struct rb_riq_req_id_s *node = (struct rb_riq_req_id_s *)malloc(sizeof(struct rb_riq_req_id_s));
assert(node != NULL);
node->riq_req_id = riq_req_id;
RB_INSERT(riq_id_2_param_id, &rc_subs_data.arr[arr_ran_param_id->ran_param_id[i]], node);
}
pthread_mutex_unlock(&rc_mutex);
}
static ue_id_e2sm_t fill_ue_id_data(const gNB_RRC_UE_t *rrc_ue_context) static ue_id_e2sm_t fill_ue_id_data(const gNB_RRC_UE_t *rrc_ue_context)
{ {
...@@ -88,7 +169,7 @@ static seq_ran_param_t fill_rrc_state_change_seq_ran(const rc_sm_rrc_state_e rrc ...@@ -88,7 +169,7 @@ static seq_ran_param_t fill_rrc_state_change_seq_ran(const rc_sm_rrc_state_e rrc
{ {
seq_ran_param_t seq_ran_param = {0}; seq_ran_param_t seq_ran_param = {0};
seq_ran_param.ran_param_id = rrc_state_changed_to; seq_ran_param.ran_param_id = RRC_STATE_CHANGED_TO_E2SM_RC_RAN_PARAM_ID;
seq_ran_param.ran_param_val.type = ELEMENT_KEY_FLAG_FALSE_RAN_PARAMETER_VAL_TYPE; seq_ran_param.ran_param_val.type = ELEMENT_KEY_FLAG_FALSE_RAN_PARAMETER_VAL_TYPE;
seq_ran_param.ran_param_val.flag_false = calloc(1, sizeof(ran_parameter_value_t)); seq_ran_param.ran_param_val.flag_false = calloc(1, sizeof(ran_parameter_value_t));
assert(seq_ran_param.ran_param_val.flag_false != NULL && "Memory exhausted"); assert(seq_ran_param.ran_param_val.flag_false != NULL && "Memory exhausted");
...@@ -132,15 +213,22 @@ static rc_ind_data_t* fill_ue_rrc_state_change(const gNB_RRC_UE_t *rrc_ue_contex ...@@ -132,15 +213,22 @@ static rc_ind_data_t* fill_ue_rrc_state_change(const gNB_RRC_UE_t *rrc_ue_contex
return rc_ind; return rc_ind;
} }
void signal_rrc_state_changed_to(const gNB_RRC_UE_t *rrc_ue_context, const rc_sm_rrc_state_e rrc_state) void signal_rrc_state_changed_to(const gNB_RRC_UE_t *rrc_ue_context, const rc_sm_rrc_state_e rrc_state)
{ {
AssertFatal(subs_rrc_state_change == true, "xApp should be subscribed to E2 node before sending the RIC INDICATION Message\n"); pthread_mutex_lock(&rc_mutex);
if (rc_subs_data.riq_id_2_param_id_mapping == NULL)
return;
rc_ind_data_t* rc_ind_data = fill_ue_rrc_state_change(rrc_ue_context, rrc_state); struct rb_riq_req_id_s *currentNode;
RB_FOREACH(currentNode, riq_id_2_param_id, &rc_subs_data.arr[RRC_STATE_CHANGED_TO_E2SM_RC_RAN_PARAM_ID]) {
rc_ind_data_t* rc_ind_data = fill_ue_rrc_state_change(rrc_ue_context, rrc_state);
async_event_agent_api(riq_req_id, rc_ind_data); // Needs review: memory ownership of the type rc_ind_data_t is transferred to the E2 Agent. Bad
printf("Event for RIC Req ID %u generated\n", riq_req_id); async_event_agent_api(currentNode->riq_req_id, rc_ind_data);
printf("Event for RIC Req ID %u generated\n", currentNode->riq_req_id);
}
pthread_mutex_unlock(&rc_mutex);
} }
bool read_rc_sm(void* data) bool read_rc_sm(void* data)
...@@ -158,6 +246,8 @@ void read_rc_setup_sm(void* data) ...@@ -158,6 +246,8 @@ void read_rc_setup_sm(void* data)
// assert(data->type == RAN_CTRL_V1_3_AGENT_IF_E2_SETUP_ANS_V0); // assert(data->type == RAN_CTRL_V1_3_AGENT_IF_E2_SETUP_ANS_V0);
rc_e2_setup_t* rc = (rc_e2_setup_t*)data; rc_e2_setup_t* rc = (rc_e2_setup_t*)data;
rc->ran_func_def = fill_rc_ran_func_def(); rc->ran_func_def = fill_rc_ran_func_def();
init_rc_subs_data();
} }
sm_ag_if_ans_t write_ctrl_rc_sm(void const* data) sm_ag_if_ans_t write_ctrl_rc_sm(void const* data)
...@@ -205,14 +295,34 @@ sm_ag_if_ans_t write_ctrl_rc_sm(void const* data) ...@@ -205,14 +295,34 @@ sm_ag_if_ans_t write_ctrl_rc_sm(void const* data)
return ans; return ans;
} }
static void free_aperiodic_subscription(uint32_t riq_req_id)
{
pthread_mutex_lock(&rc_mutex);
DevAssert(rc_subs_data.riq_id_2_param_id_mapping != NULL);
// Remove riq_req_id with its ran_param_id(s) from hash table
uint64_t key = riq_req_id;
hashtable_rc_t ret = hashtable_remove(rc_subs_data.riq_id_2_param_id_mapping, key);
assert(ret == HASH_TABLE_OK && "Hash table not ok");
// Remove riq_req_id from each tree of ran_param_id(s), if subscribed
rb_riq_req_id_t *node = calloc(1, sizeof(rb_riq_req_id_t));
assert(node != NULL);
node->riq_req_id = riq_req_id;
for (size_t i = 0; i < END_E2SM_RC_RAN_PARAM_ID; i++) {
if (rc_subs_data.arr[i].rbh_root == NULL)
continue;
RB_REMOVE(riq_id_2_param_id, &rc_subs_data.arr[i], node);
}
pthread_mutex_unlock(&rc_mutex);
}
sm_ag_if_ans_t write_subs_rc_sm(void const* src) sm_ag_if_ans_t write_subs_rc_sm(void const* src)
{ {
assert(src != NULL); // && src->type == RAN_CTRL_SUBS_V1_03); assert(src != NULL); // && src->type == RAN_CTRL_SUBS_V1_03);
wr_rc_sub_data_t* wr_rc = (wr_rc_sub_data_t*)src; wr_rc_sub_data_t* wr_rc = (wr_rc_sub_data_t*)src;
riq_req_id = wr_rc->ric_req_id; // store the ric_req_id to generate asynchronous event
assert(wr_rc->rc.ad != NULL && "Cannot be NULL"); assert(wr_rc->rc.ad != NULL && "Cannot be NULL");
// 9.2.1.2 RIC ACTION DEFINITION IE // 9.2.1.2 RIC ACTION DEFINITION IE
...@@ -221,13 +331,15 @@ sm_ag_if_ans_t write_subs_rc_sm(void const* src) ...@@ -221,13 +331,15 @@ sm_ag_if_ans_t write_subs_rc_sm(void const* src)
case FORMAT_1_E2SM_RC_ACT_DEF: { case FORMAT_1_E2SM_RC_ACT_DEF: {
// Parameters to be Reported List // Parameters to be Reported List
// [1-65535] // [1-65535]
for(size_t i = 0; i < wr_rc->rc.ad->frmt_1.sz_param_report_def; i++) { arr_ran_param_id_t* arr_values = calloc(1, sizeof(arr_ran_param_id_t));
if (wr_rc->rc.ad->frmt_1.param_report_def[i].ran_param_id == rrc_state_changed_to) { assert(arr_values != NULL && "Memory exhausted");
subs_rrc_state_change = true; arr_values->len = wr_rc->rc.ad->frmt_1.sz_param_report_def;
} else { arr_values->ran_param_id = calloc(arr_values->len, sizeof(ran_param_id_e));
AssertFatal(wr_rc->rc.ad->frmt_1.param_report_def[i].ran_param_id == rrc_state_changed_to, "RAN Parameter ID %d not yet implemented\n", wr_rc->rc.ad->frmt_1.param_report_def[i].ran_param_id);
} for(size_t i = 0; i < arr_values->len; i++) {
arr_values->ran_param_id[i] = wr_rc->rc.ad->frmt_1.param_report_def[i].ran_param_id;
} }
insert_rc_subs_data(wr_rc->ric_req_id, arr_values);
break; break;
} }
...@@ -235,7 +347,9 @@ sm_ag_if_ans_t write_subs_rc_sm(void const* src) ...@@ -235,7 +347,9 @@ sm_ag_if_ans_t write_subs_rc_sm(void const* src)
AssertFatal(wr_rc->rc.ad->format == FORMAT_1_E2SM_RC_ACT_DEF, "Action Definition Format %d not yet implemented", wr_rc->rc.ad->format); AssertFatal(wr_rc->rc.ad->format == FORMAT_1_E2SM_RC_ACT_DEF, "Action Definition Format %d not yet implemented", wr_rc->rc.ad->format);
} }
sm_ag_if_ans_t ans = {0}; sm_ag_if_ans_t ans = {.type = SUBS_OUTCOME_SM_AG_IF_ANS_V0};
ans.subs_out.type = APERIODIC_SUBSCRIPTION_FLRC;
ans.subs_out.aper.free_aper_subs = free_aperiodic_subscription;
return ans; return ans;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment