Commit e63cc03c authored by Robert Schmidt's avatar Robert Schmidt

Implement abortTpool()

This stops all ongoing threads (using pthread_join()), and marks FIFOs
to not block anymore. In this case, returned messages will be NULL.
parent 078d6fa1
......@@ -48,9 +48,14 @@ void displayList(notifiedFIFO_t *nf) {
static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf, struct one_thread *thr) {
mutexlock(nf->lockF);
while(!nf->outF)
while(!nf->outF && !thr->terminate)
condwait(nf->notifF, nf->lockF);
if (thr->terminate) {
mutexunlock(nf->lockF);
return NULL;
}
notifiedFIFO_elt_t *ret=nf->outF;
nf->outF=nf->outF->next;
......@@ -59,7 +64,7 @@ static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf,
// For abort feature
thr->runningOnKey=ret->key;
thr->abortFlag=false;
thr->dropJob = false;
mutexunlock(nf->lockF);
return ret;
}
......@@ -71,6 +76,10 @@ void *one_thread(void *arg) {
// Infinite loop to process requests
do {
notifiedFIFO_elt_t *elt=pullNotifiedFifoRemember(&tp->incomingFifo, myThread);
if (elt == NULL) {
AssertFatal(myThread->terminate, "pullNotifiedFifoRemember() returned NULL although thread not aborted\n");
break;
}
if (tp->measurePerf) elt->startProcessingTime=rdtsc_oai();
......@@ -82,14 +91,15 @@ void *one_thread(void *arg) {
// Check if the job is still alive, else it has been aborted
mutexlock(tp->incomingFifo.lockF);
if (myThread->abortFlag)
if (myThread->dropJob)
delNotifiedFIFO_elt(elt);
else
pushNotifiedFIFO(elt->reponseFifo, elt);
myThread->runningOnKey=-1;
mutexunlock(tp->incomingFifo.lockF);
}
} while (true);
} while (!myThread->terminate);
return NULL;
}
void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name) {
......@@ -137,6 +147,8 @@ void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name
pool->allthreads->coreID=atoi(curptr);
pool->allthreads->id=pool->nbThreads;
pool->allthreads->pool=pool;
pool->allthreads->dropJob = false;
pool->allthreads->terminate = false;
//Configure the thread scheduler policy for Linux
// set the thread name for debugging
sprintf(pool->allthreads->name,"%s%d_%d",tname,pool->nbThreads,pool->allthreads->coreID);
......
......@@ -72,6 +72,7 @@ typedef struct notifiedFIFO_s {
notifiedFIFO_elt_t *inF;
pthread_mutex_t lockF;
pthread_cond_t notifF;
bool abortFIFO; // if set, the FIFO always returns NULL -> abort condition
} notifiedFIFO_t;
// You can use this allocator or use any piece of memory
......@@ -107,6 +108,7 @@ static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) {
static inline void initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) {
nf->inF=NULL;
nf->outF=NULL;
nf->abortFIFO = false;
}
static inline void initNotifiedFIFO(notifiedFIFO_t *nf) {
mutexinit(nf->lockF);
......@@ -152,9 +154,9 @@ static inline notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t
static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
mutexlock(nf->lockF);
notifiedFIFO_elt_t *ret;
notifiedFIFO_elt_t *ret = NULL;
while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL)
while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL && !nf->abortFIFO)
condwait(nf->notifF, nf->lockF);
mutexunlock(nf->lockF);
......@@ -167,6 +169,11 @@ static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
if (tmp != 0 )
return NULL;
if (nf->abortFIFO) {
mutexunlock(nf->lockF);
return NULL;
}
notifiedFIFO_elt_t *ret=pullNotifiedFIFO_nothreadSafe(nf);
mutexunlock(nf->lockF);
return ret;
......@@ -217,13 +224,14 @@ struct one_thread {
int coreID;
char name[256];
uint64_t runningOnKey;
bool abortFlag;
bool dropJob;
bool terminate;
struct thread_pool *pool;
struct one_thread *next;
};
typedef struct thread_pool {
int activated;
bool activated;
bool measurePerf;
int traceFd;
int dummyTraceFd;
......@@ -256,6 +264,8 @@ static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) {
static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo);
if (msg == NULL)
return NULL;
AssertFatal(t->traceFd, "Thread pool used while not initialized");
if (t->measurePerf)
msg->returnTime=rdtsc_oai();
......@@ -284,6 +294,7 @@ static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpo
static inline int abortTpoolJob(tpool_t *t, uint64_t key) {
int nbRemoved=0;
notifiedFIFO_t *nf=&t->incomingFifo;
mutexlock(nf->lockF);
notifiedFIFO_elt_t **start=&nf->outF;
......@@ -300,20 +311,62 @@ static inline int abortTpoolJob(tpool_t *t, uint64_t key) {
if (t->incomingFifo.outF==NULL)
t->incomingFifo.inF=NULL;
struct one_thread *ptr=t->allthreads;
while(ptr!=NULL) {
if (ptr->runningOnKey==key) {
ptr->abortFlag=true;
struct one_thread *thread = t->allthreads;
while (thread != NULL) {
if (thread->runningOnKey == key) {
thread->dropJob = true;
nbRemoved++;
}
ptr=ptr->next;
thread = thread->next;
}
mutexunlock(nf->lockF);
return nbRemoved;
}
static inline int abortTpool(tpool_t *t) {
int nbRemoved=0;
/* disables threading: if a message comes in now, we cannot have a race below
* as each thread will simply execute the message itself */
t->activated = false;
notifiedFIFO_t *nf=&t->incomingFifo;
mutexlock(nf->lockF);
nf->abortFIFO = true;
notifiedFIFO_elt_t **start=&nf->outF;
/* mark threads to abort them */
struct one_thread *thread = t->allthreads;
while (thread != NULL) {
thread->dropJob = true;
thread->terminate = true;
nbRemoved++;
thread = thread->next;
}
/* clear FIFOs */
while(*start!=NULL) {
notifiedFIFO_elt_t **request=start;
*start=(*start)->next;
delNotifiedFIFO_elt(*request);
*request = NULL;
nbRemoved++;
}
if (t->incomingFifo.outF==NULL)
t->incomingFifo.inF=NULL;
condbroadcast(t->incomingFifo.notifF);
mutexunlock(nf->lockF);
/* join threads that are still runing */
thread = t->allthreads;
while (thread != NULL) {
pthread_cancel(thread->threadID);
thread = thread->next;
}
return nbRemoved;
}
void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name);
#define initTpool(PARAMPTR,TPOOLPTR, MEASURFLAG) initNamedTpool(PARAMPTR,TPOOLPTR, MEASURFLAG, NULL)
#endif
......@@ -467,6 +467,11 @@ void init_gNB_Tpool(int inst) {
}
void term_gNB_Tpool(int inst) {
PHY_VARS_gNB *gNB = RC.gNB[inst];
abortTpool(&gNB->threadPool);
}
/*!
* \brief Terminate gNB TX and RX threads.
*/
......@@ -609,6 +614,7 @@ void init_gNB(int single_thread_flag,int wait_for_sync) {
void stop_gNB(int nb_inst) {
for (int inst=0; inst<nb_inst; inst++) {
LOG_I(PHY,"Killing gNB %d processing threads\n",inst);
term_gNB_Tpool(inst);
kill_gNB_proc(inst);
}
}
......@@ -1271,13 +1271,6 @@ void *ru_thread( void *param ) {
else LOG_I(PHY,"RU %d rf device stopped\n",ru->idx);
}
res = pullNotifiedFIFO(&gNB->resp_L1);
delNotifiedFIFO_elt(res);
res = pullNotifiedFIFO(&gNB->L1_tx_free);
delNotifiedFIFO_elt(res);
res = pullNotifiedFIFO(&gNB->L1_tx_free);
delNotifiedFIFO_elt(res);
ru_thread_status = 0;
return &ru_thread_status;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment