Commit 08ea6180 authored by laurent's avatar laurent

fix important bug in CU, merge thread pool improvement and rf simulator...

fix important bug in CU, merge thread pool improvement and rf simulator improvements made initialy in 5G branch
parent e8091083
......@@ -49,23 +49,6 @@ void *one_thread(void *arg) {
struct one_thread *myThread=(struct one_thread *) arg;
struct thread_pool *tp=myThread->pool;
// configure the thread core assignment
// TBD: reserve the core for us exclusively
if ( myThread->coreID >= 0 && myThread->coreID < get_nprocs_conf()) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(myThread->coreID, &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}
//Configure the thread scheduler policy for Linux
struct sched_param sparam= {0};
sparam.sched_priority = sched_get_priority_max(SCHED_RR);
pthread_setschedparam(pthread_self(), SCHED_RR, &sparam);
// set the thread name for debugging
sprintf(myThread->name,"Tpool_%d",myThread->coreID);
pthread_setname_np(pthread_self(), myThread->name );
// Infinite loop to process requests
do {
notifiedFIFO_elt_t *elt=pullNotifiedFifoRemember(&tp->incomingFifo, myThread);
......@@ -106,10 +89,6 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
} else
pool->traceFd=-1;
//Configure the thread scheduler policy for Linux
struct sched_param sparam= {0};
sparam.sched_priority = sched_get_priority_max(SCHED_RR)-1;
pthread_setschedparam(pthread_self(), SCHED_RR, &sparam);
pool->activated=true;
initNotifiedFIFO(&pool->incomingFifo);
char *saveptr, * curptr;
......@@ -137,7 +116,11 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
pool->allthreads->coreID=atoi(curptr);
pool->allthreads->id=pool->nbThreads;
pool->allthreads->pool=pool;
pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads);
//Configure the thread scheduler policy for Linux
// set the thread name for debugging
sprintf(pool->allthreads->name,"Tpool_%d",pool->allthreads->coreID);
threadCreate(&pool->allthreads->threadID, one_thread, (void *)pool->allthreads,
pool->allthreads->name, pool->allthreads->coreID, OAI_PRIORITY_RT);
pool->nbThreads++;
}
......@@ -152,6 +135,9 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
#ifdef TEST_THREAD_POOL
void exit_function(const char *file, const char *function, const int line, const char *s) {
}
struct testData {
int id;
char txt[30];
......
......@@ -11,6 +11,7 @@
#include <sys/syscall.h>
#include <assertions.h>
#include <LOG/log.h>
#include <common/utils/system.h>
#ifdef DEBUG
#define THREADINIT PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
......@@ -77,41 +78,60 @@ static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) {
//LOG_W(UTIL,"delNotifiedFIFO on something not allocated by newNotifiedFIFO\n");
}
static inline void initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) {
nf->inF=NULL;
nf->outF=NULL;
}
static inline void initNotifiedFIFO(notifiedFIFO_t *nf) {
mutexinit(nf->lockF);
condinit (nf->notifF);
nf->inF=NULL;
nf->outF=NULL;
initNotifiedFIFO_nothreadSafe(nf);
// No delete function: the creator has only to free the memory
}
static inline void pushNotifiedFIFO(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
mutexlock(nf->lockF);
static inline void pushNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
msg->next=NULL;
if (nf->outF == NULL)
nf->outF = msg;
if (nf->inF)
if (nf->inF != NULL)
nf->inF->next = msg;
nf->inF = msg;
condbroadcast(nf->notifF);
mutexunlock(nf->lockF);
}
static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
static inline void pushNotifiedFIFO(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
mutexlock(nf->lockF);
pushNotifiedFIFO_nothreadSafe(nf,msg);
condbroadcast(nf->notifF);
mutexunlock(nf->lockF);
}
while(!nf->outF)
condwait(nf->notifF, nf->lockF);
static inline notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) {
if (nf->outF == NULL)
return NULL;
notifiedFIFO_elt_t *ret=nf->outF;
if (nf->outF==nf->outF->next)
LOG_E(TMR,"Circular list in thread pool: push several times the same buffer is forbidden\n");
nf->outF=nf->outF->next;
if (nf->outF==NULL)
nf->inF=NULL;
return ret;
}
static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
mutexlock(nf->lockF);
notifiedFIFO_elt_t *ret;
while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL)
condwait(nf->notifF, nf->lockF);
mutexunlock(nf->lockF);
return ret;
}
......@@ -122,14 +142,7 @@ static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
if (tmp != 0 )
return NULL;
notifiedFIFO_elt_t *ret=nf->outF;
if (ret!=NULL)
nf->outF=nf->outF->next;
if (nf->outF==NULL)
nf->inF=NULL;
notifiedFIFO_elt_t *ret=pullNotifiedFIFO_nothreadSafe(nf);
mutexunlock(nf->lockF);
return ret;
}
......@@ -151,6 +164,7 @@ static inline int abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) {
} else
start=&(*start)->next;
}
if (nf->outF == NULL)
nf->inF=NULL;
......@@ -184,14 +198,18 @@ typedef struct thread_pool {
static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) {
if (t->measurePerf) msg->creationTime=rdtsc();
if ( t->activated)
pushNotifiedFIFO(&t->incomingFifo, msg);
else {
if (t->measurePerf)
msg->startProcessingTime=rdtsc();
msg->processingFunc(NotifiedFifoData(msg));
if (t->measurePerf)
msg->endProcessingTime=rdtsc();
if (msg->reponseFifo)
pushNotifiedFIFO(msg->reponseFifo, msg);
}
......@@ -202,10 +220,8 @@ static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_
if (t->measurePerf)
msg->returnTime=rdtsc();
if (t->traceFd)
if (t->traceFd >= 0)
if(write(t->traceFd, msg, sizeof(*msg)));
return msg;
}
......@@ -217,10 +233,8 @@ static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpo
if (t->measurePerf)
msg->returnTime=rdtsc();
if (t->traceFd)
if(write(t->traceFd, msg, sizeof(*msg)));
return msg;
}
......@@ -239,6 +253,7 @@ static inline int abortTpool(tpool_t *t, uint64_t key) {
} else
start=&(*start)->next;
}
if (t->incomingFifo.outF==NULL)
t->incomingFifo.inF=NULL;
......
......@@ -1324,6 +1324,9 @@ void *DL_du_fs6(void *arg) {
feptx_prec(ru, &L1_proc);
feptx_ofdm(ru, &L1_proc);
tx_rf(ru, &L1_proc);
if ( IS_SOFTMODEM_RFSIM )
return NULL;
}
return NULL;
......@@ -1443,13 +1446,22 @@ void *cu_fs6(void *arg) {
remoteIP=DU_IP;
AssertFatal(createUDPsock(NULL, CU_PORT, remoteIP, DU_PORT, &sockFS6), "");
L1_rxtx_proc_t L1proc= {0};
if ( strlen(get_softmodem_params()->threadPoolConfig) > 0 )
initTpool(get_softmodem_params()->threadPoolConfig, &L1proc.threadPool, true);
else
initTpool("n", &L1proc.threadPool, true);
initNotifiedFIFO(&L1proc.respEncode);
initNotifiedFIFO(&L1proc.respDecode);
uint64_t timeStamp=0;
initStaticTime(begingWait);
initStaticTime(begingWait2);
initRefTimes(waitDUAndProcessingUL);
initRefTimes(makeSendDL);
initRefTimes(fullLoop);
L1_rxtx_proc_t L1proc= {0};
while(1) {
timeStamp+=ru->frame_parms.samples_per_tti;
......@@ -1496,6 +1508,7 @@ void *du_fs6(void *arg) {
initRefTimes(waitRxAndProcessingUL);
initRefTimes(fullLoop);
pthread_t t;
if ( !IS_SOFTMODEM_RFSIM )
threadCreate(&t, DL_du_fs6, (void *)ru, "MainDuTx", -1, OAI_PRIORITY_RT_MAX);
while(1) {
......@@ -1503,6 +1516,8 @@ void *du_fs6(void *arg) {
updateTimesReset(begingWait, &fullLoop, 1000, true,"DU for full SubFrame (must be less 1ms)");
pickStaticTime(begingWait);
UL_du_fs6(ru, &L1proc);
if ( IS_SOFTMODEM_RFSIM )
DL_du_fs6((void *)ru);
updateTimesReset(begingWait, &waitRxAndProcessingUL, 1000, true,"DU Time in wait Rx + Ul processing");
}
......
......@@ -688,7 +688,7 @@ static void *ru_thread( void *param ) {
setbuf(stdout, NULL);
setbuf(stderr, NULL);
RU_t *ru = (RU_t *)param;
L1_rxtx_proc_t L1proc;
L1_rxtx_proc_t L1proc={0};
L1_rxtx_proc_t *proc=&L1proc;
if ( strlen(get_softmodem_params()->threadPoolConfig) > 0 )
......
......@@ -70,8 +70,8 @@ typedef struct complex16 sample_t; // 2*16 bits complex number
typedef struct buffer_s {
int conn_sock;
openair0_timestamp lastReceivedTS;
openair0_timestamp lastWroteTS;
bool headerMode;
bool trashingPacket;
samplesBlockHeader_t th;
char *transferPtr;
uint64_t remainToTransfer;
......@@ -83,6 +83,7 @@ typedef struct buffer_s {
typedef struct {
int listen_sock, epollfd;
openair0_timestamp nextTimestamp;
openair0_timestamp lastWroteTS;
uint64_t typeStamp;
char *ip;
uint16_t port;
......@@ -102,8 +103,8 @@ void allocCirBuf(rfsimulator_state_t *bridge, int sock) {
ptr->circularBufEnd=((char *)ptr->circularBuf)+sampleToByte(CirSize,1);
ptr->conn_sock=sock;
ptr->lastReceivedTS=0;
ptr->lastWroteTS=0;
ptr->headerMode=true;
ptr->trashingPacket=false;
ptr->transferPtr=(char *)&ptr->th;
ptr->remainToTransfer=sizeof(samplesBlockHeader_t);
int sendbuff=1000*1000*100;
......@@ -219,8 +220,6 @@ void fullwrite(int fd, void *_buf, ssize_t count, rfsimulator_state_t *t) {
}
}
void rfsimulator_readconfig(rfsimulator_state_t *rfsimulator) {
char *saveF=NULL;
char *modelname=NULL;
......@@ -317,19 +316,16 @@ sin_addr:
return 0;
}
static int rfsimulator_write_internal(rfsimulator_state_t *t, openair0_timestamp timestamp, void **samplesVoid, int nsamps, int nbAnt, int flags) {
static int rfsimulator_write_internal(rfsimulator_state_t *t, openair0_timestamp timestamp, void **samplesVoid, int nsamps, int nbAnt, int flags, bool alreadyLocked) {
if (!alreadyLocked)
pthread_mutex_lock(&Sockmutex);
LOG_D(HW,"sending %d samples at time: %ld\n", nsamps, timestamp);
for (int i=0; i<FD_SETSIZE; i++) {
buffer_t *b=&t->buf[i];
if (b->conn_sock >= 0 ) {
pthread_mutex_lock(&Sockmutex);
if ( b->lastWroteTS != 0 && abs((double)b->lastWroteTS-timestamp) > (double)CirSize)
LOG_E(HW,"Discontinuous TX gap too large Tx:%lu, %lu\n", b->lastWroteTS, timestamp);
AssertFatal(b->lastWroteTS <= timestamp+1, " Not supported to send Tx out of order (same in USRP) %lu, %lu\n", b->lastWroteTS, timestamp);
samplesBlockHeader_t header= {t->typeStamp, nsamps, nbAnt, timestamp};
fullwrite(b->conn_sock,&header, sizeof(header), t);
sample_t tmpSamples[nsamps][nbAnt];
......@@ -343,24 +339,29 @@ static int rfsimulator_write_internal(rfsimulator_state_t *t, openair0_timestamp
if (b->conn_sock >= 0 ) {
fullwrite(b->conn_sock, (void *)tmpSamples, sampleToByte(nsamps,nbAnt), t);
b->lastWroteTS=timestamp+nsamps;
}
pthread_mutex_unlock(&Sockmutex);
}
}
if ( t->lastWroteTS != 0 && abs((double)t->lastWroteTS-timestamp) > (double)CirSize)
LOG_E(HW,"Discontinuous TX gap too large Tx:%lu, %lu\n", t->lastWroteTS, timestamp);
AssertFatal(t->lastWroteTS <= timestamp+1, " Not supported to send Tx out of order (same in USRP) %lu, %lu\n",
t->lastWroteTS, timestamp);
t->lastWroteTS=timestamp+nsamps;
if (!alreadyLocked)
pthread_mutex_unlock(&Sockmutex);
LOG_D(HW,"sent %d samples at time: %ld->%ld, energy in first antenna: %d\n",
nsamps, timestamp, timestamp+nsamps, signal_energy(samplesVoid[0], nsamps) );
return nsamps;
}
int rfsimulator_write(openair0_device *device, openair0_timestamp timestamp, void **samplesVoid, int nsamps, int nbAnt, int flags) {
return rfsimulator_write_internal(device->priv, timestamp, samplesVoid, nsamps, nbAnt, flags);
return rfsimulator_write_internal(device->priv, timestamp, samplesVoid, nsamps, nbAnt, flags, false);
}
static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initial) {
// Process all incoming events on sockets
// store the data in lists
......@@ -389,22 +390,9 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
for ( int i=0; i < t->tx_num_channels; i++)
samplesVoid[i]=(void *)&v;
pthread_mutex_lock(&Sockmutex);
openair0_timestamp timestamp=1;
for (int i=0; i<FD_SETSIZE; i++) {
buffer_t *b=&t->buf[i];
if (t->buf[i].circularBuf != NULL && t->buf[i].lastWroteTS > 1) {
timestamp=b->lastWroteTS;
break;
}
}
pthread_mutex_unlock(&Sockmutex);
rfsimulator_write_internal(t, timestamp-1,
rfsimulator_write_internal(t, t->lastWroteTS > 1 ? t->lastWroteTS-1 : 0,
samplesVoid, 1,
t->tx_num_channels, 1);
t->tx_num_channels, 1, false);
} else {
if ( events[nbEv].events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP) ) {
socketError(t,fd);
......@@ -450,7 +438,16 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
(t->typeStamp == ENB_MAGICDL_FDD && b->th.magic==UE_MAGICDL_FDD), "Socket Error in protocol");
b->headerMode=false;
if ( b->lastReceivedTS != b->th.timestamp) {
if ( t->nextTimestamp == 0 ) { // First block in UE, resync with the eNB current TS
t->nextTimestamp=b->th.timestamp> nsamps_for_initial ?
b->th.timestamp - nsamps_for_initial :
0;
b->lastReceivedTS=b->th.timestamp> nsamps_for_initial ?
b->th.timestamp :
nsamps_for_initial;
LOG_W(HW,"UE got first timestamp: starting at %lu\n", t->nextTimestamp);
b->trashingPacket=true;
} else if ( b->lastReceivedTS < b->th.timestamp) {
int nbAnt= b->th.nbAnt;
for (uint64_t index=b->lastReceivedTS; index < b->th.timestamp; index++ ) {
......@@ -459,16 +456,23 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
b->circularBuf[(index*nbAnt+a)%CirSize].i=0;
}
}
if ( abs(b->th.timestamp-b->lastReceivedTS) > 50 )
if (b->lastReceivedTS != 0 && b->th.timestamp-b->lastReceivedTS > 50 )
LOG_W(HW,"UEsock: %d gap of: %ld in reception\n", fd, b->th.timestamp-b->lastReceivedTS );
}
b->lastReceivedTS=b->th.timestamp;
} else if ( b->lastReceivedTS > b->th.timestamp && b->th.size == 1 ) {
LOG_W(HW,"Received Rx/Tx synchro out of order\n");
b->trashingPacket=true;
} else if ( b->lastReceivedTS == b->th.timestamp ) {
// normal case
} else {
abort();
AssertFatal(false, "received data in past: current is %lu, new reception: %lu!\n", b->lastReceivedTS, b->th.timestamp); }
pthread_mutex_lock(&Sockmutex);
if (b->lastWroteTS != 0 && ( abs((double)b->lastWroteTS-b->lastReceivedTS) > (double)CirSize))
LOG_E(HW,"UEsock: %d Tx/Rx shift too large Tx:%lu, Rx:%lu\n", fd, b->lastWroteTS, b->lastReceivedTS);
if (t->lastWroteTS != 0 && ( abs((double)t->lastWroteTS-b->lastReceivedTS) > (double)CirSize))
LOG_E(HW,"UEsock: %d Tx/Rx shift too large Tx:%lu, Rx:%lu\n", fd, t->lastWroteTS, b->lastReceivedTS);
pthread_mutex_unlock(&Sockmutex);
b->transferPtr=(char *)&b->circularBuf[b->lastReceivedTS%CirSize];
......@@ -477,14 +481,8 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
if ( b->headerMode==false ) {
LOG_D(HW,"UEsock: %d Set b->lastReceivedTS %ld\n", fd, b->lastReceivedTS);
if ( ! b->trashingPacket ) {
b->lastReceivedTS=b->th.timestamp+b->th.size-byteToSample(b->remainToTransfer,b->th.nbAnt);
// First block in UE, resync with the eNB current TS
if ( t->nextTimestamp == 0 && b->th.size < b->lastReceivedTS) {
t->nextTimestamp=b->lastReceivedTS > nsamps_for_initial ?
b->lastReceivedTS - nsamps_for_initial :
0;
LOG_W(HW,"UE got first timestamp: starting at %lu\n", t->nextTimestamp);
}
if ( b->remainToTransfer==0) {
......@@ -493,6 +491,7 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
b->transferPtr=(char *)&b->th;
b->remainToTransfer=sizeof(samplesBlockHeader_t);
b->th.magic=-1;
b->trashingPacket=false;
}
}
}
......@@ -528,12 +527,43 @@ int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimestamp, vo
t->nextTimestamp+=nsamps;
if ( ((t->nextTimestamp/nsamps)%100) == 0)
LOG_W(HW,"Generated void samples for Rx: %ld\n", t->nextTimestamp);
LOG_W(HW,"No UE, Generated void samples for Rx: %ld\n", t->nextTimestamp);
*ptimestamp = t->nextTimestamp-nsamps;
return nsamps;
}
} else {
pthread_mutex_lock(&Sockmutex);
if ( t->nextTimestamp > 0 && t->lastWroteTS < t->nextTimestamp) {
pthread_mutex_unlock(&Sockmutex);
usleep(10000);
pthread_mutex_lock(&Sockmutex);
if ( t->lastWroteTS < t->nextTimestamp ) {
// Assuming Tx is not done fully in another thread
// We can never write is the past from the received time
// So, the node perform receive but will never write these symbols
// let's tell this to the opposite node
// We send timestamp for nb samples required
// assuming this should have been done earlier if a Tx would exist
pthread_mutex_unlock(&Sockmutex);
struct complex16 v= {0};
void *samplesVoid[t->tx_num_channels];
for ( int i=0; i < t->tx_num_channels; i++)
samplesVoid[i]=(void *)&v;
LOG_I(HW, "No samples Tx occured, so we send 1 sample to notify it: Tx:%lu, Rx:%lu\n",
t->lastWroteTS, t->nextTimestamp);
rfsimulator_write_internal(t, t->nextTimestamp,
samplesVoid, 1,
t->tx_num_channels, 1, true);
} else {
pthread_mutex_unlock(&Sockmutex);
LOG_W(HW, "trx_write came from another thread\n");
}
} else
pthread_mutex_unlock(&Sockmutex);
bool have_to_wait;
do {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment