Commit 2e0ed038 authored by Sakthivel Velumani's avatar Sakthivel Velumani

Using only one threadPool for nr-softmodem

parent d29e09cd
......@@ -870,7 +870,7 @@ void init_gNB_proc(int inst) {
pthread_cond_init(&sync_phy_proc.cond_phy_proc_tx, NULL);
sync_phy_proc.phy_proc_CC_id = 0;
gNB->threadPool_ulsch = (tpool_t*)malloc(sizeof(tpool_t));
gNB->threadPool = (tpool_t*)malloc(sizeof(tpool_t));
gNB->respDecode = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
int numCPU = sysconf(_SC_NPROCESSORS_ONLN);
uint32_t num_threads_pusch;
......@@ -884,7 +884,7 @@ void init_gNB_proc(int inst) {
sprintf(ul_pool+2+s_offset,",-1");
s_offset += 3;
}
initTpool(ul_pool, gNB->threadPool_ulsch, false);
initTpool(ul_pool, gNB->threadPool, false);
initNotifiedFIFO(gNB->respDecode);
}
......@@ -893,8 +893,7 @@ void init_gNB_Tpool(int inst) {
gNB = RC.gNB[inst];
// ULSCH decoding threadpool
gNB->threadPool_ulsch = (tpool_t*)malloc(sizeof(tpool_t));
gNB->respDecode = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
gNB->threadPool = (tpool_t*)malloc(sizeof(tpool_t));
int numCPU = sysconf(_SC_NPROCESSORS_ONLN);
uint32_t num_threads_pusch;
paramdef_t PUSCHThreads[] = NUM_THREADS_DESC;
......@@ -907,26 +906,21 @@ void init_gNB_Tpool(int inst) {
sprintf(ul_pool+2+s_offset,",-1");
s_offset += 3;
}
initTpool(ul_pool, gNB->threadPool_ulsch, false);
initTpool(ul_pool, gNB->threadPool, false);
// ULSCH decoder result FIFO
gNB->respDecode = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(gNB->respDecode);
// L1 RX threadpool
gNB->threadPool_L1 = (tpool_t*)malloc(sizeof(tpool_t));
// L1 RX result FIFO
gNB->resp_L1 = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initTpool("-1", gNB->threadPool_L1, false);
initNotifiedFIFO(gNB->resp_L1);
pushNotifiedFIFO_nothreadSafe(gNB->resp_L1, newNotifiedFIFO_elt(20, 0,NULL,NULL));
// L1 TX threadpool
gNB->threadPool_L1_tx = (tpool_t*)malloc(sizeof(tpool_t));
// L1 TX result FIFO
gNB->resp_L1_tx = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initTpool("-1,-1,-1",gNB->threadPool_L1_tx, false);
initNotifiedFIFO(gNB->resp_L1_tx);
// RU TX threadpool
gNB->threadPool_RU_tx = (tpool_t*)malloc(sizeof(tpool_t));
// RU TX result FIFO
gNB->resp_RU_tx = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initTpool("-1",gNB->threadPool_RU_tx, false);
initNotifiedFIFO(gNB->resp_RU_tx);
}
......
......@@ -1513,20 +1513,15 @@ void tx_func(void *param) {
phy_procedures_gNB_TX(gNB, frame_tx,slot_tx, 1);
// start FH TX processing
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(processingData_RU_t),0,gNB->resp_RU_tx,ru_tx_func);
processingData_RU_t *syncMsg = (processingData_RU_t *)NotifiedFifoData(msg);
notifiedFIFO_elt_t *res;
res = pullTpool(gNB->resp_RU_tx, gNB->threadPool);
processingData_RU_t *syncMsg = (processingData_RU_t *)NotifiedFifoData(res);
syncMsg->frame_tx = frame_tx;
syncMsg->slot_tx = slot_tx;
syncMsg->timestamp_tx = info->timestamp_tx;
syncMsg->ru = gNB->RU_list[0];
res = tryPullTpool(gNB->resp_RU_tx, gNB->threadPool_RU_tx);
if (res == NULL) {
LOG_W(PHY,"RU TX of previous slot is still being processed. Adding current slot to queue (%d.%d)\n", frame_tx, slot_tx);
}
msg->key = slot_tx;
pushTpool(gNB->threadPool_RU_tx, msg);
res->key = slot_tx;
pushTpool(gNB->threadPool, res);
}
void rx_func(void *param) {
......@@ -1634,23 +1629,19 @@ void rx_func(void *param) {
//stop_meas( &softmodem_stats_rxtx_sf );
LOG_D(PHY,"%s() Exit proc[rx:%d%d tx:%d%d]\n", __FUNCTION__, frame_rx, slot_rx, frame_tx, slot_tx);
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(processingData_L1_t),0,gNB->resp_L1_tx,tx_func);
processingData_L1_t *syncMsg = (processingData_L1_t *)NotifiedFifoData(msg);
notifiedFIFO_elt_t *res;
syncMsg->gNB = gNB;
syncMsg->frame_rx = frame_rx;
syncMsg->slot_rx = slot_rx;
syncMsg->frame_tx = frame_tx;
syncMsg->slot_tx = slot_tx;
syncMsg->timestamp_tx = info->timestamp_tx;
if (tx_slot_type == NR_DOWNLINK_SLOT || tx_slot_type == NR_MIXED_SLOT) {
res = tryPullTpool(gNB->resp_L1_tx, gNB->threadPool_L1_tx);
if (res == NULL) {
LOG_W(PHY,"L1 TX of previous slot is still being processed. Adding current slot to queue (%d.%d)\n", frame_tx, slot_tx);
}
msg->key = slot_tx;
pushTpool(gNB->threadPool_L1_tx, msg);
res = pullTpool(gNB->resp_L1_tx, gNB->threadPool);
processingData_L1_t *syncMsg = (processingData_L1_t *)NotifiedFifoData(res);
syncMsg->gNB = gNB;
syncMsg->frame_rx = frame_rx;
syncMsg->slot_rx = slot_rx;
syncMsg->frame_tx = frame_tx;
syncMsg->slot_tx = slot_tx;
syncMsg->timestamp_tx = info->timestamp_tx;
res->key = slot_tx;
pushTpool(gNB->threadPool, res);
}
#if 0
......@@ -1703,9 +1694,7 @@ void *ru_thread( void *param ) {
int ret;
int slot = fp->slots_per_frame-1;
int frame = 1023;
char filename[40], threadname[40];
int print_frame = 8;
int i = 0;
char threadname[40];
int aa;
nfapi_nr_config_request_scf_t *cfg = &ru->config;
......@@ -1766,8 +1755,13 @@ void *ru_thread( void *param ) {
wait_sync("ru_thread");
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(processingData_L1_t),0,gNB->resp_L1,rx_func);
processingData_L1_t *syncMsg = (processingData_L1_t *)NotifiedFifoData(msg);
notifiedFIFO_elt_t *msgL1Tx = newNotifiedFIFO_elt(sizeof(processingData_L1_t),0,gNB->resp_L1_tx,tx_func);
notifiedFIFO_elt_t *msgRUTx = newNotifiedFIFO_elt(sizeof(processingData_L1_t),0,gNB->resp_RU_tx,ru_tx_func);
processingData_L1_t *syncMsg;
notifiedFIFO_elt_t *res;
pushNotifiedFIFO(gNB->resp_L1,msg); // to unblock the process in the beginning
pushNotifiedFIFO(gNB->resp_L1_tx,msgL1Tx); // to unblock the process in the beginning
pushNotifiedFIFO(gNB->resp_RU_tx,msgRUTx); // to unblock the process in the beginning
if(!emulate_rf) {
// Start RF device if any
......@@ -1902,61 +1896,17 @@ void *ru_thread( void *param ) {
}
// At this point, all information for subframe has been received on FH interface
res = pullTpool(gNB->resp_L1, gNB->threadPool);
syncMsg = (processingData_L1_t *)NotifiedFifoData(res);
syncMsg->gNB = gNB;
syncMsg->frame_rx = proc->frame_rx;
syncMsg->slot_rx = proc->tti_rx;
syncMsg->frame_tx = proc->frame_tx;
syncMsg->slot_tx = proc->tti_tx;
syncMsg->timestamp_tx = proc->timestamp_tx;
res->key = proc->tti_rx;
pushTpool(gNB->threadPool, res);
// check if previous L1 slot processing is finished
res = pullTpool(gNB->resp_L1, gNB->threadPool_L1);
if (res == NULL) {
LOG_W(PHY,"L1 RX of previous slot is still being processed. Adding current slot to queue (%d.%d)\n", proc->frame_rx, proc->tti_rx);
}
msg->key = proc->tti_rx;
pushTpool(gNB->threadPool_L1, msg);
if(get_thread_parallel_conf() == PARALLEL_SINGLE_THREAD || ru->num_gNB==0) {
// do TX front-end processing if needed (precoding and/or IDFTs)
if (ru->feptx_prec) ru->feptx_prec(ru,proc->frame_tx,proc->tti_tx);
// do OFDM with/without TX front-end processing if needed
if ((ru->fh_north_asynch_in == NULL) && (ru->feptx_ofdm)) ru->feptx_ofdm(ru,proc->frame_tx,proc->tti_tx);
if(!emulate_rf) {
// do outgoing fronthaul (south) if needed
if ((ru->fh_north_asynch_in == NULL) && (ru->fh_south_out)) ru->fh_south_out(ru,proc->frame_tx,proc->tti_tx,proc->timestamp_tx);
if (ru->fh_north_out) ru->fh_north_out(ru);
} else {
if(proc->frame_tx == print_frame) {
for (i=0; i<ru->nb_tx; i++) {
sprintf(filename,"tx%ddataF_frame%d_sl%d.m", i, print_frame, proc->tti_tx);
LOG_M(filename,"txdataF_frame",&ru->common.txdataF_BF[i][0],fp->samples_per_slot_wCP, 1, 1);
if(proc->tti_tx == 9) {
sprintf(filename,"tx%ddata_frame%d.m", i, print_frame);
LOG_M(filename,"txdata_frame",&ru->common.txdata[i][0],fp->samples_per_frame, 1, 1);
sprintf(filename,"tx%ddata_frame%d.dat", i, print_frame);
FILE *output_fd = fopen(filename,"w");
if (output_fd) {
fwrite(&ru->common.txdata[i][0],
sizeof(int32_t),
fp->samples_per_frame,
output_fd);
fclose(output_fd);
} else {
LOG_E(PHY,"Cannot write to file %s\n",filename);
}
}//if(proc->tti_tx == 9)
}//for (i=0; i<ru->nb_tx; i++)
}//if(proc->frame_tx == print_frame)
}//else emulate_rf
proc->emulate_rf_busy = 0;
}//if(get_thread_parallel_conf() == PARALLEL_SINGLE_THREAD)
}
printf( "Exiting ru_thread \n");
......@@ -1967,6 +1917,10 @@ void *ru_thread( void *param ) {
else LOG_I(PHY,"RU %d rf device stopped\n",ru->idx);
}
delNotifiedFIFO_elt(msg);
delNotifiedFIFO_elt(msgL1Tx);
delNotifiedFIFO_elt(msgRUTx);
ru_thread_status = 0;
return &ru_thread_status;
}
......
......@@ -640,7 +640,7 @@ uint32_t nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
rdata->offset = offset;
rdata->ulsch = ulsch;
rdata->ulsch_id = UE_id;
pushTpool(phy_vars_gNB->threadPool_ulsch,req);
pushTpool(phy_vars_gNB->threadPool,req);
phy_vars_gNB->nbDecode++;
LOG_D(PHY,"Added a block to decode, in pipe: %d\n",phy_vars_gNB->nbDecode);
r_offset += E;
......
......@@ -835,10 +835,7 @@ typedef struct PHY_VARS_gNB_s {
notifiedFIFO_t *resp_L1;
notifiedFIFO_t *resp_L1_tx;
notifiedFIFO_t *resp_RU_tx;
tpool_t *threadPool_ulsch;
tpool_t *threadPool_L1;
tpool_t *threadPool_L1_tx;
tpool_t *threadPool_RU_tx;
tpool_t *threadPool;
int nbDecode;
} PHY_VARS_gNB;
......
......@@ -243,7 +243,7 @@ void nr_postDecode(PHY_VARS_gNB *gNB, notifiedFIFO_elt_t *req) {
} else {
if ( rdata->nbSegments != ulsch_harq->processedSegments ) {
int nb=abortTpool(gNB->threadPool_ulsch, req->key);
int nb=abortTpool(gNB->threadPool, req->key);
nb+=abortNotifiedFIFO(gNB->respDecode, req->key);
gNB->nbDecode-=nb;
LOG_D(PHY,"uplink segment error %d/%d, aborted %d segments\n",rdata->segment_r,rdata->nbSegments, nb);
......@@ -353,7 +353,7 @@ void nr_ulsch_procedures(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx, int ULSCH
G);
while (gNB->nbDecode > 0) {
notifiedFIFO_elt_t *req=pullTpool(gNB->respDecode, gNB->threadPool_ulsch);
notifiedFIFO_elt_t *req=pullTpool(gNB->respDecode, gNB->threadPool);
nr_postDecode(gNB, req);
delNotifiedFIFO_elt(req);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment