Commit f4fd79c4 authored by Sakthivel Velumani's avatar Sakthivel Velumani

First commit

Created 3 threadpools for L1 rx, L1 tx and RU tx processing
parent 3b2d5037
......@@ -870,7 +870,7 @@ void init_gNB_proc(int inst) {
pthread_cond_init(&sync_phy_proc.cond_phy_proc_tx, NULL);
sync_phy_proc.phy_proc_CC_id = 0;
gNB->threadPool = (tpool_t*)malloc(sizeof(tpool_t));
gNB->threadPool_ulsch = (tpool_t*)malloc(sizeof(tpool_t));
gNB->respDecode = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
int numCPU = sysconf(_SC_NPROCESSORS_ONLN);
uint32_t num_threads_pusch;
......@@ -884,10 +884,50 @@ void init_gNB_proc(int inst) {
sprintf(ul_pool+2+s_offset,",-1");
s_offset += 3;
}
initTpool(ul_pool, gNB->threadPool, false);
initTpool(ul_pool, gNB->threadPool_ulsch, false);
initNotifiedFIFO(gNB->respDecode);
}
void init_gNB_Tpool(int inst) {
PHY_VARS_gNB *gNB;
gNB = RC.gNB[inst];
// ULSCH decoding threadpool
gNB->threadPool_ulsch = (tpool_t*)malloc(sizeof(tpool_t));
gNB->respDecode = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
int numCPU = sysconf(_SC_NPROCESSORS_ONLN);
uint32_t num_threads_pusch;
paramdef_t PUSCHThreads[] = NUM_THREADS_DESC;
config_get( PUSCHThreads,sizeof(PUSCHThreads)/sizeof(paramdef_t),NULL);
int threadCnt = min(numCPU, num_threads_pusch);
char ul_pool[80];
sprintf(ul_pool,"-1");
int s_offset = 0;
for (int icpu=1; icpu<threadCnt; icpu++) {
sprintf(ul_pool+2+s_offset,",-1");
s_offset += 3;
}
initTpool(ul_pool, gNB->threadPool_ulsch, false);
initNotifiedFIFO(gNB->respDecode);
// L1 RX threadpool
gNB->threadPool_L1 = (tpool_t*)malloc(sizeof(tpool_t));
gNB->resp_L1 = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initTpool("-1", gNB->threadPool_L1, false);
initNotifiedFIFO(gNB->resp_L1);
// L1 TX threadpool
gNB->threadPool_L1_tx = (tpool_t*)malloc(sizeof(tpool_t));
gNB->resp_L1_tx = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initTpool("-1",gNB->threadPool_L1_tx, false);
initNotifiedFIFO(gNB->resp_L1_tx);
// RU TX threadpool
gNB->threadPool_RU_tx = (tpool_t*)malloc(sizeof(tpool_t));
gNB->resp_RU_tx = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initTpool("-1",gNB->threadPool_RU_tx, false);
initNotifiedFIFO(gNB->resp_RU_tx);
}
/*!
......@@ -1010,15 +1050,10 @@ void init_eNB_afterRU(void) {
* (not tested in other modes).
*/
//init_precoding_weights(RC.gNB[inst]);
init_gNB_proc(inst);
//init_gNB_proc(inst);
init_gNB_Tpool(inst);
}
for (ru_id=0; ru_id<RC.nb_RU; ru_id++) {
AssertFatal(RC.ru[ru_id]!=NULL,"ru_id %d is null\n",ru_id);
RC.ru[ru_id]->nr_wakeup_rxtx = wakeup_rxtx;
// RC.ru[ru_id]->wakeup_prach_eNB = wakeup_prach_gNB;
RC.ru[ru_id]->gNB_top = gNB_top;
}
}
void init_gNB(int single_thread_flag,int wait_for_sync) {
......
......@@ -120,6 +120,20 @@ extern int emulate_rf;
extern int numerology;
extern int usrp_tx_thread;
typedef struct processingData_L1 {
int frame_rx;
int frame_tx;
int slot_rx;
int slot_tx;
PHY_VARS_gNB *gNB;
} processingData_L1_t;
typedef struct processingData_RU {
int frame_tx;
int slot_tx;
RU_t *ru;
} processingData_RU_t;
/*************************************************************/
/* Functions to attach and configure RRU */
......@@ -1298,6 +1312,63 @@ void *ru_stats_thread(void *param) {
return(NULL);
}
void ru_tx_func(void *param) {
processingData_RU_t *info = (processingData_RU_t *) param;
RU_t *ru = info->ru;
NR_DL_FRAME_PARMS *fp = ru->nr_frame_parms;
int frame_tx = info->frame_tx;
int slot_tx = info->slot_tx;
int print_frame = 8;
char filename[40];
// do TX front-end processing if needed (precoding and/or IDFTs)
if (ru->feptx_prec) ru->feptx_prec(ru,frame_tx,slot_tx);
// do OFDM with/without TX front-end processing if needed
if ((ru->fh_north_asynch_in == NULL) && (ru->feptx_ofdm)) ru->feptx_ofdm(ru,frame_tx,slot_tx);
if(!emulate_rf) {
// do outgoing fronthaul (south) if needed
if ((ru->fh_north_asynch_in == NULL) && (ru->fh_south_out)) ru->fh_south_out(ru,frame_tx,slot_tx,ru->proc.timestamp_tx);
if (ru->fh_north_out) ru->fh_north_out(ru);
} else {
if(frame_tx == print_frame) {
for (int i=0; i<ru->nb_tx; i++) {
if(slot_tx == 0) {
sprintf(filename,"gNBdataF_frame%d_sl%d.m", print_frame, slot_tx);
LOG_M(filename,"txdataF_frame",&ru->gNB_list[0]->common_vars.txdataF[i][0],fp->samples_per_frame_wCP, 1, 1);
sprintf(filename,"tx%ddataF_frame%d_sl%d.m", i, print_frame, slot_tx);
LOG_M(filename,"txdataF_frame",&ru->common.txdataF[i][0],fp->samples_per_frame_wCP, 1, 1);
sprintf(filename,"tx%ddataF_BF_frame%d_sl%d.m", i, print_frame, slot_tx);
LOG_M(filename,"txdataF_BF_frame",&ru->common.txdataF_BF[i][0],fp->samples_per_subframe_wCP, 1, 1);
}
if(slot_tx == 9) {
sprintf(filename,"tx%ddata_frame%d.m", i, print_frame);
LOG_M(filename,"txdata_frame",&ru->common.txdata[i][0],fp->samples_per_frame, 1, 1);
sprintf(filename,"tx%ddata_frame%d.dat", i, print_frame);
FILE *output_fd = fopen(filename,"w");
if (output_fd) {
fwrite(&ru->common.txdata[i][0],
sizeof(int32_t),
fp->samples_per_frame,
output_fd);
fclose(output_fd);
} else {
LOG_E(PHY,"Cannot write to file %s\n",filename);
}
}//if(slot_tx == 9)
}//for (i=0; i<ru->nb_tx; i++)
}//if(frame_tx == print_frame)
}//else emulate_rf
}
void *ru_thread_tx( void *param ) {
RU_t *ru = (RU_t *)param;
RU_proc_t *proc = &ru->proc;
......@@ -1430,11 +1501,201 @@ void *ru_thread_tx( void *param ) {
return 0;
}
void tx_func(void *param) {
processingData_L1_t *info = (processingData_L1_t *) param;
PHY_VARS_gNB *gNB = info->gNB;
int frame_tx = info->frame_tx;
int slot_tx = info->slot_tx;
phy_procedures_gNB_TX(gNB, frame_tx,slot_tx, 1);
// start FH TX processing
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(processingData_RU_t),0,gNB->resp_RU_tx,ru_tx_func);
processingData_RU_t *syncMsg = (processingData_RU_t *)NotifiedFifoData(msg);
notifiedFIFO_elt_t *res;
syncMsg->frame_tx = frame_tx;
syncMsg->slot_tx = slot_tx;
syncMsg->ru = gNB->RU_list[0];
res = tryPullTpool(gNB->resp_RU_tx, gNB->threadPool_RU_tx);
if (res == NULL) {
LOG_W(PHY,"RU TX of previous slot is still being processed. Adding current slot to queue (%d.%d)\n", frame_tx, slot_tx);
}
msg->key = slot_tx;
pushTpool(gNB->threadPool_RU_tx, msg);
}
void rx_func(void *param) {
processingData_L1_t *info = (processingData_L1_t *) param;
PHY_VARS_gNB *gNB = info->gNB;
int frame_rx = info->frame_rx;
int slot_rx = info->slot_rx;
int frame_tx = info->frame_tx;
int slot_tx = info->slot_tx;
sl_ahead = sf_ahead*gNB->frame_parms.slots_per_subframe;
nfapi_nr_config_request_scf_t *cfg = &gNB->gNB_config;
//start_meas(&softmodem_stats_rxtx_sf);
T(T_GNB_PHY_DL_TICK, T_INT(gNB->Mod_id), T_INT(frame_tx), T_INT(slot_tx));
/* hack to remove UEs */
extern int rnti_to_remove[10];
extern volatile int rnti_to_remove_count;
extern pthread_mutex_t rnti_to_remove_mutex;
if (pthread_mutex_lock(&rnti_to_remove_mutex)) exit(1);
int up_removed = 0;
int down_removed = 0;
int pucch_removed = 0;
for (int i = 0; i < rnti_to_remove_count; i++) {
LOG_W(PHY, "to remove rnti %d\n", rnti_to_remove[i]);
void clean_gNB_ulsch(NR_gNB_ULSCH_t *ulsch);
void clean_gNB_dlsch(NR_gNB_DLSCH_t *dlsch);
int j;
for (j = 0; j < NUMBER_OF_NR_ULSCH_MAX; j++)
if (gNB->ulsch[j][0]->rnti == rnti_to_remove[i]) {
gNB->ulsch[j][0]->rnti = 0;
gNB->ulsch[j][0]->harq_mask = 0;
//clean_gNB_ulsch(gNB->ulsch[j][0]);
int h;
for (h = 0; h < NR_MAX_ULSCH_HARQ_PROCESSES; h++) {
gNB->ulsch[j][0]->harq_processes[h]->status = SCH_IDLE;
gNB->ulsch[j][0]->harq_processes[h]->round = 0;
gNB->ulsch[j][0]->harq_processes[h]->handled = 0;
}
up_removed++;
}
for (j = 0; j < NUMBER_OF_NR_DLSCH_MAX; j++)
if (gNB->dlsch[j][0]->rnti == rnti_to_remove[i]) {
gNB->dlsch[j][0]->rnti = 0;
gNB->dlsch[j][0]->harq_mask = 0;
//clean_gNB_dlsch(gNB->dlsch[j][0]);
down_removed++;
}
for (j = 0; j < NUMBER_OF_NR_PUCCH_MAX; j++)
if (gNB->pucch[j]->active > 0 &&
gNB->pucch[j]->pucch_pdu.rnti == rnti_to_remove[i]) {
gNB->pucch[j]->active = 0;
gNB->pucch[j]->pucch_pdu.rnti = 0;
pucch_removed++;
}
#if 0
for (j = 0; j < NUMBER_OF_NR_PDCCH_MAX; j++)
gNB->pdcch_pdu[j].frame = -1;
for (j = 0; j < NUMBER_OF_NR_PDCCH_MAX; j++)
gNB->ul_pdcch_pdu[j].frame = -1;
for (j = 0; j < NUMBER_OF_NR_PRACH_MAX; j++)
gNB->prach_vars.list[j].frame = -1;
#endif
}
if (rnti_to_remove_count) LOG_W(PHY, "to remove rnti_to_remove_count=%d, up_removed=%d down_removed=%d pucch_removed=%d\n", rnti_to_remove_count, up_removed, down_removed, pucch_removed);
rnti_to_remove_count = 0;
if (pthread_mutex_unlock(&rnti_to_remove_mutex)) exit(1);
// Call the scheduler
pthread_mutex_lock(&gNB->UL_INFO_mutex);
gNB->UL_INFO.frame = frame_rx;
gNB->UL_INFO.slot = slot_rx;
gNB->UL_INFO.module_id = gNB->Mod_id;
gNB->UL_INFO.CC_id = gNB->CC_id;
gNB->if_inst->NR_UL_indication(&gNB->UL_INFO);
pthread_mutex_unlock(&gNB->UL_INFO_mutex);
// RX processing
int tx_slot_type = nr_slot_select(cfg,frame_tx,slot_tx);
int rx_slot_type = nr_slot_select(cfg,frame_rx,slot_rx);
if (rx_slot_type == NR_UPLINK_SLOT || rx_slot_type == NR_MIXED_SLOT) {
// UE-specific RX processing for subframe n
// TODO: check if this is correct for PARALLEL_RU_L1_TRX_SPLIT
// Do PRACH RU processing
L1_nr_prach_procedures(gNB,frame_rx,slot_rx);
//apply the rx signal rotation here
apply_nr_rotation_ul(&gNB->frame_parms,
gNB->common_vars.rxdataF[0],
slot_rx,
0,
gNB->frame_parms.Ncp==EXTENDED?12:14,
gNB->frame_parms.ofdm_symbol_size);
phy_procedures_gNB_uespec_RX(gNB, frame_rx, slot_rx);
}
if (oai_exit) return(-1);
//stop_meas( &softmodem_stats_rxtx_sf );
LOG_D(PHY,"%s() Exit proc[rx:%d%d tx:%d%d]\n", __FUNCTION__, frame_rx, slot_rx, frame_tx, slot_tx);
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(processingData_L1_t),0,gNB->resp_L1_tx,tx_func);
processingData_L1_t *syncMsg = (processingData_L1_t *)NotifiedFifoData(msg);
notifiedFIFO_elt_t *res;
syncMsg->gNB = gNB;
syncMsg->frame_rx = frame_rx;
syncMsg->slot_rx = slot_rx;
syncMsg->frame_tx = frame_tx;
syncMsg->slot_tx = slot_tx;
if (tx_slot_type == NR_DOWNLINK_SLOT || tx_slot_type == NR_MIXED_SLOT) {
res = tryPullTpool(gNB->resp_L1_tx, gNB->threadPool_L1_tx);
if (res == NULL) {
LOG_W(PHY,"L1 TX of previous slot is still being processed. Adding current slot to queue (%d.%d)\n", frame_tx, slot_tx);
}
msg->key = slot_tx;
pushTpool(gNB->threadPool_L1_tx, msg);
}
#if 0
LOG_D(PHY, "rxtx:%lld nfapi:%lld phy:%lld tx:%lld rx:%lld prach:%lld ofdm:%lld ",
softmodem_stats_rxtx_sf.diff_now, nfapi_meas.diff_now,
TICK_TO_US(gNB->phy_proc),
TICK_TO_US(gNB->phy_proc_tx),
TICK_TO_US(gNB->phy_proc_rx),
TICK_TO_US(gNB->rx_prach),
TICK_TO_US(gNB->ofdm_mod_stats),
softmodem_stats_rxtx_sf.diff_now, nfapi_meas.diff_now);
LOG_D(PHY,
"dlsch[enc:%lld mod:%lld scr:%lld rm:%lld t:%lld i:%lld] rx_dft:%lld ",
TICK_TO_US(gNB->dlsch_encoding_stats),
TICK_TO_US(gNB->dlsch_modulation_stats),
TICK_TO_US(gNB->dlsch_scrambling_stats),
TICK_TO_US(gNB->dlsch_rate_matching_stats),
TICK_TO_US(gNB->dlsch_turbo_encoding_stats),
TICK_TO_US(gNB->dlsch_interleaving_stats),
TICK_TO_US(gNB->rx_dft_stats));
LOG_D(PHY," ulsch[ch:%lld freq:%lld dec:%lld demod:%lld ru:%lld ",
TICK_TO_US(gNB->ulsch_channel_estimation_stats),
TICK_TO_US(gNB->ulsch_freq_offset_estimation_stats),
TICK_TO_US(gNB->ulsch_decoding_stats),
TICK_TO_US(gNB->ulsch_demodulation_stats),
TICK_TO_US(gNB->ulsch_rate_unmatching_stats));
LOG_D(PHY, "td:%lld dei:%lld dem:%lld llr:%lld tci:%lld ",
TICK_TO_US(gNB->ulsch_turbo_decoding_stats),
TICK_TO_US(gNB->ulsch_deinterleaving_stats),
TICK_TO_US(gNB->ulsch_demultiplexing_stats),
TICK_TO_US(gNB->ulsch_llr_stats),
TICK_TO_US(gNB->ulsch_tc_init_stats));
LOG_D(PHY, "tca:%lld tcb:%lld tcg:%lld tce:%lld l1:%lld l2:%lld]\n\n",
TICK_TO_US(gNB->ulsch_tc_alpha_stats),
TICK_TO_US(gNB->ulsch_tc_beta_stats),
TICK_TO_US(gNB->ulsch_tc_gamma_stats),
TICK_TO_US(gNB->ulsch_tc_ext_stats),
TICK_TO_US(gNB->ulsch_tc_intl1_stats),
TICK_TO_US(gNB->ulsch_tc_intl2_stats)
);
#endif
}
void *ru_thread( void *param ) {
static int ru_thread_status;
RU_t *ru = (RU_t *)param;
RU_proc_t *proc = &ru->proc;
NR_DL_FRAME_PARMS *fp = ru->nr_frame_parms;
PHY_VARS_gNB *gNB = RC.gNB[0];
int ret;
int slot = fp->slots_per_frame-1;
int frame = 1023;
......@@ -1500,6 +1761,10 @@ void *ru_thread( void *param ) {
pthread_mutex_unlock(&RC.ru_mutex);
wait_sync("ru_thread");
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(processingData_L1_t),0,gNB->resp_L1,rx_func);
processingData_L1_t *syncMsg = (processingData_L1_t *)NotifiedFifoData(msg);
notifiedFIFO_elt_t *res;
if(!emulate_rf) {
// Start RF device if any
if (ru->start_rf) {
......@@ -1580,7 +1845,6 @@ void *ru_thread( void *param ) {
int slot_type = nr_slot_select(cfg,proc->frame_rx,proc->tti_rx);
if (slot_type == NR_UPLINK_SLOT || slot_type == NR_MIXED_SLOT) {
//if (proc->tti_rx==8) {
if (ru->feprx) {
ru->feprx(ru,proc->tti_rx);
......@@ -1630,9 +1894,19 @@ void *ru_thread( void *param ) {
}
// At this point, all information for subframe has been received on FH interface
// wakeup all gNB processes waiting for this RU
if (ru->num_gNB>0) wakeup_gNB_L1s(ru);
syncMsg->gNB = gNB;
syncMsg->frame_rx = proc->frame_rx;
syncMsg->slot_rx = proc->tti_rx;
syncMsg->frame_tx = proc->frame_tx;
syncMsg->slot_tx = proc->tti_tx;
// check if previous L1 slot processing is finished
res = tryPullTpool(gNB->resp_L1, gNB->threadPool_L1);
if (res == NULL) {
LOG_W(PHY,"L1 RX of previous slot is still being processed. Adding current slot to queue (%d.%d)\n", proc->frame_rx, proc->tti_rx);
}
msg->key = proc->tti_rx;
pushTpool(gNB->threadPool_L1, msg);
if(get_thread_parallel_conf() == PARALLEL_SINGLE_THREAD || ru->num_gNB==0) {
// do TX front-end processing if needed (precoding and/or IDFTs)
......
......@@ -640,7 +640,7 @@ uint32_t nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
rdata->offset = offset;
rdata->ulsch = ulsch;
rdata->ulsch_id = UE_id;
pushTpool(phy_vars_gNB->threadPool,req);
pushTpool(phy_vars_gNB->threadPool_ulsch,req);
phy_vars_gNB->nbDecode++;
LOG_D(PHY,"Added a block to decode, in pipe: %d\n",phy_vars_gNB->nbDecode);
r_offset += E;
......
......@@ -832,7 +832,13 @@ typedef struct PHY_VARS_gNB_s {
time_stats_t ulsch_freq_offset_estimation_stats;
*/
notifiedFIFO_t *respDecode;
tpool_t *threadPool;
notifiedFIFO_t *resp_L1;
notifiedFIFO_t *resp_L1_tx;
notifiedFIFO_t *resp_RU_tx;
tpool_t *threadPool_ulsch;
tpool_t *threadPool_L1;
tpool_t *threadPool_L1_tx;
tpool_t *threadPool_RU_tx;
int nbDecode;
} PHY_VARS_gNB;
......
......@@ -243,7 +243,7 @@ void nr_postDecode(PHY_VARS_gNB *gNB, notifiedFIFO_elt_t *req) {
} else {
if ( rdata->nbSegments != ulsch_harq->processedSegments ) {
int nb=abortTpool(gNB->threadPool, req->key);
int nb=abortTpool(gNB->threadPool_ulsch, req->key);
nb+=abortNotifiedFIFO(gNB->respDecode, req->key);
gNB->nbDecode-=nb;
LOG_D(PHY,"uplink segment error %d/%d, aborted %d segments\n",rdata->segment_r,rdata->nbSegments, nb);
......@@ -353,7 +353,7 @@ void nr_ulsch_procedures(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx, int ULSCH
G);
while (gNB->nbDecode > 0) {
notifiedFIFO_elt_t *req=pullTpool(gNB->respDecode, gNB->threadPool);
notifiedFIFO_elt_t *req=pullTpool(gNB->respDecode, gNB->threadPool_ulsch);
nr_postDecode(gNB, req);
delNotifiedFIFO_elt(req);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment