Commit 1af5a89d authored by Sakthivel Velumani's avatar Sakthivel Velumani

Removed active waiting loop

which could block threads. Instead, a separate thread is used to reorder the finished job messages.
parent 0ac69fb1
@@ -87,7 +87,7 @@
#include <openair1/PHY/NR_TRANSPORT/nr_ulsch.h>
#include <openair1/PHY/NR_TRANSPORT/nr_dlsch.h>
#include <PHY/NR_ESTIMATION/nr_ul_estimation.h>
//#define DEBUG_THREADS 1
#define DEBUG_THREADS 0
//#define USRP_DEBUG 1
// Fix per CC openair rf/if device update
@@ -114,7 +114,6 @@ time_stats_t softmodem_stats_rx_sf; // total rx time
void tx_func(void *param) {
processingData_L1tx_t *info = (processingData_L1tx_t *) param;
PHY_VARS_gNB *gNB = info->gNB;
int frame_tx = info->frame;
int slot_tx = info->slot;
@@ -122,31 +121,6 @@ void tx_func(void *param) {
frame_tx,
slot_tx,
1);
info->slot = -1;
//if ((frame_tx&127) == 0) dump_pdsch_stats(fd,gNB);
// If the later of the 2 L1 tx threads finishes first,
// we wait for the earlier one to finish and start the RU thread
// to avoid realtime issues with USRP
// Start RU TX processing.
notifiedFIFO_elt_t *res;
res = pullTpool(gNB->resp_RU_tx, gNB->threadPool);
processingData_RU_t *syncMsg = (processingData_RU_t *)NotifiedFifoData(res);
LOG_D(PHY,"waiting for previous tx to finish, next slot %d,%d\n",syncMsg->next_slot,slot_tx);
while (syncMsg->next_slot != slot_tx) {
pushNotifiedFIFO(gNB->resp_RU_tx, res);
res = pullTpool(gNB->resp_RU_tx, gNB->threadPool);
syncMsg = (processingData_RU_t *)NotifiedFifoData(res);
}
LOG_D(PHY,"previous tx finished, next slot %d,%d\n",syncMsg->next_slot,slot_tx);
syncMsg->frame_tx = frame_tx;
syncMsg->slot_tx = slot_tx;
syncMsg->next_slot = get_next_downlink_slot(gNB, &gNB->gNB_config, frame_tx, slot_tx);
syncMsg->timestamp_tx = info->timestamp_tx;
syncMsg->ru = gNB->RU_list[0];
res->key = slot_tx;
pushTpool(gNB->threadPool, res);
}
void rx_func(void *param) {
@@ -276,15 +250,12 @@ void rx_func(void *param) {
if (tx_slot_type == NR_DOWNLINK_SLOT || tx_slot_type == NR_MIXED_SLOT) {
notifiedFIFO_elt_t *res;
res = pullTpool(gNB->resp_L1_tx, gNB->threadPool);
processingData_L1tx_t *syncMsg = (processingData_L1tx_t *)NotifiedFifoData(res);
while (syncMsg->slot != slot_tx) {
pushNotifiedFIFO(gNB->resp_L1_tx, res);
res = pullTpool(gNB->resp_L1_tx, gNB->threadPool);
syncMsg = (processingData_L1tx_t *)NotifiedFifoData(res);
}
processingData_L1tx_t *syncMsg;
// It's a FIFO, so it maintains the order in which the MAC fills the messages;
// no need to check for the right slot
res = pullTpool(gNB->L1_tx_filled, gNB->threadPool);
syncMsg = (processingData_L1tx_t *)NotifiedFifoData(res);
syncMsg->gNB = gNB;
AssertFatal(syncMsg->slot == slot_tx, "Thread message slot and logical slot number do not match\n");
syncMsg->timestamp_tx = info->timestamp_tx;
res->key = slot_tx;
pushTpool(gNB->threadPool, res);
@@ -390,6 +361,64 @@ void *nrL1_stats_thread(void *param) {
return(NULL);
}
// This thread reads the finished L1 tx jobs from the threadPool
// and pushes the RU tx jobs in the right order. It only works with
// two parallel L1 tx threads.
void *tx_reorder_thread(void* param) {
PHY_VARS_gNB *gNB = (PHY_VARS_gNB *)param;
notifiedFIFO_elt_t *resL1;
notifiedFIFO_elt_t *resL1Reserve = NULL;
notifiedFIFO_elt_t *resRU;
processingData_L1tx_t *syncMsgL1;
processingData_RU_t *syncMsgRU;
while (!oai_exit) {
// check if there is a message in reserve
if (resL1Reserve) {
syncMsgL1 = (processingData_L1tx_t *)NotifiedFifoData(resL1Reserve);
if (syncMsgL1->slot == gNB->next_tx_slot) {
resRU = pullTpool(gNB->resp_RU_tx, gNB->threadPool);
// blocks until the previous ru_tx_func has finished
syncMsgRU = (processingData_RU_t *)NotifiedFifoData(resRU);
syncMsgRU->frame_tx = syncMsgL1->frame;
syncMsgRU->slot_tx = syncMsgL1->slot;
syncMsgRU->timestamp_tx = syncMsgL1->timestamp_tx;
syncMsgRU->ru = gNB->RU_list[0];
resRU->key = syncMsgL1->slot;
gNB->next_tx_slot = get_next_downlink_slot(gNB, &gNB->gNB_config, syncMsgRU->frame_tx, syncMsgRU->slot_tx);
pushNotifiedFIFO(gNB->L1_tx_free, resL1Reserve);
pushTpool(gNB->threadPool, resRU);
resL1Reserve = NULL;
} else {
#if DEBUG_THREADS
printf("Waiting for reserve L1 Tx message to be sent to RU for slot %d\n", syncMsgL1->slot);
#endif
}
}
// pull message from output FIFO of tx_func
resL1 = pullTpool(gNB->L1_tx_out, gNB->threadPool);
syncMsgL1 = (processingData_L1tx_t *)NotifiedFifoData(resL1);
if (syncMsgL1->slot == gNB->next_tx_slot) {
resRU = pullTpool(gNB->resp_RU_tx, gNB->threadPool);
// blocks until the previous ru_tx_func has finished
syncMsgRU = (processingData_RU_t *)NotifiedFifoData(resRU);
syncMsgRU->frame_tx = syncMsgL1->frame;
syncMsgRU->slot_tx = syncMsgL1->slot;
syncMsgRU->timestamp_tx = syncMsgL1->timestamp_tx;
syncMsgRU->ru = gNB->RU_list[0];
resRU->key = syncMsgL1->slot;
gNB->next_tx_slot = get_next_downlink_slot(gNB, &gNB->gNB_config, syncMsgRU->frame_tx, syncMsgRU->slot_tx);
pushNotifiedFIFO(gNB->L1_tx_free, resL1);
pushTpool(gNB->threadPool, resRU);
} else {
AssertFatal(resL1Reserve == NULL, "Error! There is already a waiting message\n");
resL1Reserve = resL1;
}
}
return(NULL);
}
void init_gNB_Tpool(int inst) {
PHY_VARS_gNB *gNB;
gNB = RC.gNB[inst];
@@ -421,38 +450,39 @@ void init_gNB_Tpool(int inst) {
pushNotifiedFIFO(gNB->resp_L1,msg); // to unblock the process in the beginning
// L1 TX result FIFO
gNB->resp_L1_tx = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(gNB->resp_L1_tx);
gNB->L1_tx_free = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
gNB->L1_tx_filled = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
gNB->L1_tx_out = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(gNB->L1_tx_free);
initNotifiedFIFO(gNB->L1_tx_filled);
initNotifiedFIFO(gNB->L1_tx_out);
// we create 2 threads for L1 tx processing
notifiedFIFO_elt_t *msgL1Tx = newNotifiedFIFO_elt(sizeof(processingData_L1tx_t),0,gNB->resp_L1_tx,tx_func);
notifiedFIFO_elt_t *msgL1Tx = newNotifiedFIFO_elt(sizeof(processingData_L1tx_t),0,gNB->L1_tx_out,tx_func);
processingData_L1tx_t *msgDataTx = (processingData_L1tx_t *)NotifiedFifoData(msgL1Tx);
init_DLSCH_struct(gNB, msgDataTx);
msgDataTx->slot = -1;
memset(msgDataTx->ssb, 0, 64*sizeof(NR_gNB_SSB_t));
reset_meas(&msgDataTx->phy_proc_tx);
gNB->phy_proc_tx_0 = &msgDataTx->phy_proc_tx;
pushNotifiedFIFO(gNB->resp_L1_tx,msgL1Tx); // to unblock the process in the beginning
pushNotifiedFIFO(gNB->L1_tx_free,msgL1Tx); // to unblock the process in the beginning
msgL1Tx = newNotifiedFIFO_elt(sizeof(processingData_L1tx_t),0,gNB->resp_L1_tx,tx_func);
msgL1Tx = newNotifiedFIFO_elt(sizeof(processingData_L1tx_t),0,gNB->L1_tx_out,tx_func);
msgDataTx = (processingData_L1tx_t *)NotifiedFifoData(msgL1Tx);
init_DLSCH_struct(gNB, msgDataTx);
msgDataTx->slot = -1;
memset(msgDataTx->ssb, 0, 64*sizeof(NR_gNB_SSB_t));
reset_meas(&msgDataTx->phy_proc_tx);
gNB->phy_proc_tx_1 = &msgDataTx->phy_proc_tx;
pushNotifiedFIFO(gNB->resp_L1_tx,msgL1Tx); // to unblock the process in the beginning
pushNotifiedFIFO(gNB->L1_tx_free,msgL1Tx); // to unblock the process in the beginning
// RU TX result FIFO
gNB->resp_RU_tx = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(gNB->resp_RU_tx);
notifiedFIFO_elt_t *msgRUTx = newNotifiedFIFO_elt(sizeof(processingData_RU_t),0,gNB->resp_RU_tx,ru_tx_func);
processingData_RU_t *msgData = (processingData_RU_t*)msgRUTx->msgData;
int first_tx_slot = sf_ahead*gNB->frame_parms.slots_per_subframe;
msgData->next_slot = get_next_downlink_slot(gNB, &gNB->gNB_config, 0, first_tx_slot-1);
pushNotifiedFIFO(gNB->resp_RU_tx,msgRUTx); // to unblock the process in the beginning
gNB->next_tx_slot = get_next_downlink_slot(gNB, &gNB->gNB_config, 0, first_tx_slot-1);
threadCreate(&proc->L1_stats_thread,nrL1_stats_thread,(void*)gNB,"L1_stats",-1,OAI_PRIORITY_RT_LOW);
threadCreate(&proc->pthread_tx_reorder, tx_reorder_thread, (void *)gNB, "thread_tx_reorder", -1, OAI_PRIORITY_RT_MAX);
}
@@ -1445,9 +1445,9 @@ void *ru_thread( void *param ) {
res = pullNotifiedFIFO(gNB->resp_L1);
delNotifiedFIFO_elt(res);
res = pullNotifiedFIFO(gNB->resp_L1_tx);
res = pullNotifiedFIFO(gNB->L1_tx_free);
delNotifiedFIFO_elt(res);
res = pullNotifiedFIFO(gNB->resp_L1_tx);
res = pullNotifiedFIFO(gNB->L1_tx_free);
delNotifiedFIFO_elt(res);
res = pullNotifiedFIFO(gNB->resp_RU_tx);
delNotifiedFIFO_elt(res);
@@ -1108,39 +1108,9 @@ notifiedFIFO_elt_t *l1tx_message_extract(PHY_VARS_gNB *gNB, int frame, int slot)
notifiedFIFO_elt_t *res;
notifiedFIFO_elt_t *freeRes = NULL;
// check first message
res = pullTpool(gNB->resp_L1_tx, gNB->threadPool);
processingData_L1tx_t *msgTx = (processingData_L1tx_t *)NotifiedFifoData(res);
if (msgTx->slot == slot) {
return res;
}
if (msgTx->slot == -1) {
freeRes = res;
}
// check second message
pushNotifiedFIFO(gNB->resp_L1_tx,res);
res = pullTpool(gNB->resp_L1_tx, gNB->threadPool);
msgTx = (processingData_L1tx_t *)NotifiedFifoData(res);
if (msgTx->slot == slot) {
return res;
}
if (msgTx->slot == -1) {
freeRes = res;
}
if (freeRes) {
msgTx = (processingData_L1tx_t *)NotifiedFifoData(res);
msgTx->num_pdsch_slot=0;
msgTx->pdcch_pdu.pdcch_pdu_rel15.numDlDci = 0;
msgTx->ul_pdcch_pdu.pdcch_pdu.pdcch_pdu_rel15.numDlDci = 0;
msgTx->slot = slot;
msgTx->frame = frame;
return freeRes;
}
pushNotifiedFIFO(gNB->resp_L1_tx,res);
AssertFatal(1==0, "It means both L1 Tx messages are still waiting to be processed. This happens when L1 Tx processing is too slow. Message slot %d, scheduled slot %d\n",
msgTx->slot, slot);
//TODO: This needs to be reworked for nfapi to work
res = pullTpool(gNB->L1_tx_free, gNB->threadPool);
return res;
}
int pnf_phy_ul_dci_req(gNB_L1_rxtx_proc_t *proc, nfapi_pnf_p7_config_t *pnf_p7, nfapi_nr_ul_dci_request_t *req) {
@@ -1166,7 +1136,7 @@ int pnf_phy_ul_dci_req(gNB_L1_rxtx_proc_t *proc, nfapi_pnf_p7_config_t *pnf_p7,
}
}
pushNotifiedFIFO(gNB->resp_L1_tx,res);
pushNotifiedFIFO(gNB->L1_tx_filled,res);
return 0;
}
@@ -1274,7 +1244,7 @@ int pnf_phy_dl_tti_req(gNB_L1_rxtx_proc_t *proc, nfapi_pnf_p7_config_t *pnf_p7,
else {
NFAPI_TRACE(NFAPI_TRACE_ERROR, "%s() UNKNOWN:%d\n", __FUNCTION__, dl_tti_pdu_list[i].PDUType);
}
pushNotifiedFIFO(gNB->resp_L1_tx,res);
pushNotifiedFIFO(gNB->L1_tx_filled,res);
}
if(req->vendor_extension)
@@ -608,6 +608,8 @@ typedef struct gNB_L1_proc_t_s {
pthread_t L1_stats_thread;
/// pthread structure for printing time meas
pthread_t process_stats_thread;
/// pthread structure for reordering L1 tx thread messages
pthread_t pthread_tx_reorder;
/// flag to indicate first RX acquisition
int first_rx;
/// flag to indicate first TX transmission
@@ -897,11 +899,14 @@ typedef struct PHY_VARS_gNB_s {
*/
notifiedFIFO_t *respDecode;
notifiedFIFO_t *resp_L1;
notifiedFIFO_t *resp_L1_tx;
notifiedFIFO_t *L1_tx_free;
notifiedFIFO_t *L1_tx_filled;
notifiedFIFO_t *L1_tx_out;
notifiedFIFO_t *resp_RU_tx;
tpool_t *threadPool;
int nbDecode;
uint8_t pusch_proc_threads;
int next_tx_slot;
int number_of_nr_dlsch_max;
int number_of_nr_ulsch_max;
void * scopeData;
@@ -148,11 +148,9 @@ void nr_schedule_response(NR_Sched_Rsp_t *Sched_INFO){
AssertFatal(RC.gNB[Mod_id]!=NULL,"RC.gNB[%d] is null\n",Mod_id);
gNB = RC.gNB[Mod_id];
nfapi_nr_config_request_scf_t *cfg = &gNB->gNB_config;
notifiedFIFO_elt_t *res;
res = pullTpool(gNB->resp_L1_tx, gNB->threadPool);
processingData_L1tx_t *msgTx = (processingData_L1tx_t *)NotifiedFifoData(res);
int slot_type = nr_slot_select(cfg,frame,slot);
uint8_t number_dl_pdu = (DL_req==NULL) ? 0 : DL_req->dl_tti_request_body.nPDUs;
uint8_t number_ul_dci_pdu = (UL_dci_req==NULL) ? 0 : UL_dci_req->numPdus;
uint8_t number_ul_tti_pdu = (UL_tti_req==NULL) ? 0 : UL_tti_req->n_pdus;
@@ -166,52 +164,58 @@ void nr_schedule_response(NR_Sched_Rsp_t *Sched_INFO){
TX_req->SFN,TX_req->Slot,TX_req->Number_of_PDUs,
number_ul_dci_pdu,number_ul_tti_pdu);
int pdcch_received=0;
msgTx->num_pdsch_slot=0;
msgTx->pdcch_pdu.pdcch_pdu_rel15.numDlDci = 0;
msgTx->ul_pdcch_pdu.pdcch_pdu.pdcch_pdu_rel15.numDlDci = 0;
msgTx->slot = slot;
msgTx->frame = frame;
for (int i=0;i<number_dl_pdu;i++) {
nfapi_nr_dl_tti_request_pdu_t *dl_tti_pdu = &DL_req->dl_tti_request_body.dl_tti_pdu_list[i];
LOG_D(PHY,"NFAPI: dl_pdu %d : type %d\n",i,dl_tti_pdu->PDUType);
switch (dl_tti_pdu->PDUType) {
case NFAPI_NR_DL_TTI_SSB_PDU_TYPE:
handle_nr_nfapi_ssb_pdu(msgTx,frame,slot,
dl_tti_pdu);
break;
case NFAPI_NR_DL_TTI_PDCCH_PDU_TYPE:
AssertFatal(pdcch_received == 0, "pdcch_received is not 0, we can only handle one PDCCH PDU per slot\n");
msgTx->pdcch_pdu = dl_tti_pdu->pdcch_pdu;
pdcch_received = 1;
break;
case NFAPI_NR_DL_TTI_CSI_RS_PDU_TYPE:
LOG_D(PHY,"frame %d, slot %d, Got NFAPI_NR_DL_TTI_CSI_RS_PDU_TYPE for %d.%d\n",frame,slot,DL_req->SFN,DL_req->Slot);
handle_nfapi_nr_csirs_pdu(msgTx,frame,slot,
&dl_tti_pdu->csi_rs_pdu);
break;
case NFAPI_NR_DL_TTI_PDSCH_PDU_TYPE:
LOG_D(PHY,"frame %d, slot %d, Got NFAPI_NR_DL_TTI_PDSCH_PDU_TYPE for %d.%d\n",frame,slot,DL_req->SFN,DL_req->Slot);
nfapi_nr_dl_tti_pdsch_pdu_rel15_t *pdsch_pdu_rel15 = &dl_tti_pdu->pdsch_pdu.pdsch_pdu_rel15;
uint16_t pduIndex = pdsch_pdu_rel15->pduIndex;
AssertFatal(TX_req->pdu_list[pduIndex].num_TLV == 1, "TX_req->pdu_list[%d].num_TLV %d != 1\n",
pduIndex,TX_req->pdu_list[pduIndex].num_TLV);
uint8_t *sdu = (uint8_t *)TX_req->pdu_list[pduIndex].TLVs[0].value.direct;
AssertFatal(msgTx->num_pdsch_slot < gNB->number_of_nr_dlsch_max,"Number of PDSCH PDUs %d exceeded the limit %d\n",
msgTx->num_pdsch_slot,gNB->number_of_nr_dlsch_max);
handle_nr_nfapi_pdsch_pdu(msgTx,&dl_tti_pdu->pdsch_pdu, sdu);
if (slot_type == NR_DOWNLINK_SLOT || slot_type == NR_MIXED_SLOT) {
notifiedFIFO_elt_t *res;
res = pullTpool(gNB->L1_tx_free, gNB->threadPool);
processingData_L1tx_t *msgTx = (processingData_L1tx_t *)NotifiedFifoData(res);
int pdcch_received=0;
msgTx->num_pdsch_slot=0;
msgTx->pdcch_pdu.pdcch_pdu_rel15.numDlDci = 0;
msgTx->ul_pdcch_pdu.pdcch_pdu.pdcch_pdu_rel15.numDlDci = 0;
msgTx->slot = slot;
msgTx->frame = frame;
for (int i=0;i<number_dl_pdu;i++) {
nfapi_nr_dl_tti_request_pdu_t *dl_tti_pdu = &DL_req->dl_tti_request_body.dl_tti_pdu_list[i];
LOG_D(PHY,"NFAPI: dl_pdu %d : type %d\n",i,dl_tti_pdu->PDUType);
switch (dl_tti_pdu->PDUType) {
case NFAPI_NR_DL_TTI_SSB_PDU_TYPE:
handle_nr_nfapi_ssb_pdu(msgTx,frame,slot,
dl_tti_pdu);
break;
case NFAPI_NR_DL_TTI_PDCCH_PDU_TYPE:
AssertFatal(pdcch_received == 0, "pdcch_received is not 0, we can only handle one PDCCH PDU per slot\n");
msgTx->pdcch_pdu = dl_tti_pdu->pdcch_pdu;
pdcch_received = 1;
break;
case NFAPI_NR_DL_TTI_CSI_RS_PDU_TYPE:
LOG_D(PHY,"frame %d, slot %d, Got NFAPI_NR_DL_TTI_CSI_RS_PDU_TYPE for %d.%d\n",frame,slot,DL_req->SFN,DL_req->Slot);
handle_nfapi_nr_csirs_pdu(msgTx,frame,slot,
&dl_tti_pdu->csi_rs_pdu);
break;
case NFAPI_NR_DL_TTI_PDSCH_PDU_TYPE:
LOG_D(PHY,"frame %d, slot %d, Got NFAPI_NR_DL_TTI_PDSCH_PDU_TYPE for %d.%d\n",frame,slot,DL_req->SFN,DL_req->Slot);
nfapi_nr_dl_tti_pdsch_pdu_rel15_t *pdsch_pdu_rel15 = &dl_tti_pdu->pdsch_pdu.pdsch_pdu_rel15;
uint16_t pduIndex = pdsch_pdu_rel15->pduIndex;
AssertFatal(TX_req->pdu_list[pduIndex].num_TLV == 1, "TX_req->pdu_list[%d].num_TLV %d != 1\n",
pduIndex,TX_req->pdu_list[pduIndex].num_TLV);
uint8_t *sdu = (uint8_t *)TX_req->pdu_list[pduIndex].TLVs[0].value.direct;
AssertFatal(msgTx->num_pdsch_slot < gNB->number_of_nr_dlsch_max,"Number of PDSCH PDUs %d exceeded the limit %d\n",
msgTx->num_pdsch_slot,gNB->number_of_nr_dlsch_max);
handle_nr_nfapi_pdsch_pdu(msgTx,&dl_tti_pdu->pdsch_pdu, sdu);
}
}
}
if (number_ul_dci_pdu > 0)
msgTx->ul_pdcch_pdu = UL_dci_req->ul_dci_pdu_list[number_ul_dci_pdu-1]; // copy the last pdu
if (number_ul_dci_pdu > 0)
msgTx->ul_pdcch_pdu = UL_dci_req->ul_dci_pdu_list[number_ul_dci_pdu-1]; // copy the last pdu
pushNotifiedFIFO(gNB->resp_L1_tx,res);
pushNotifiedFIFO(gNB->L1_tx_filled,res);
}
for (int i = 0; i < number_ul_tti_pdu; i++) {
switch (UL_tti_req->pdus_list[i].pdu_type) {