/* Author: Laurent THOMAS, Open Cells copyleft: OpenAirInterface Software Alliance and it's licence */ #ifndef THREAD_POOL_H #define THREAD_POOL_H #include <stdbool.h> #include <stdint.h> #include <pthread.h> #include <sys/syscall.h> #include <assertions.h> #include <LOG/log.h> #include <common/utils/system.h> #ifdef DEBUG #define THREADINIT PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP #else #define THREADINIT PTHREAD_MUTEX_INITIALIZER #endif #define mutexinit(mutex) AssertFatal(pthread_mutex_init(&mutex,NULL)==0,""); #define condinit(signal) AssertFatal(pthread_cond_init(&signal,NULL)==0,""); #define mutexlock(mutex) AssertFatal(pthread_mutex_lock(&mutex)==0,""); #define mutextrylock(mutex) pthread_mutex_trylock(&mutex) #define mutexunlock(mutex) AssertFatal(pthread_mutex_unlock(&mutex)==0,""); #define condwait(condition, mutex) AssertFatal(pthread_cond_wait(&condition, &mutex)==0,""); #define condbroadcast(signal) AssertFatal(pthread_cond_broadcast(&signal)==0,""); #define condsignal(signal) AssertFatal(pthread_cond_broadcast(&signal)==0,""); typedef struct notifiedFIFO_elt_s { struct notifiedFIFO_elt_s *next; uint64_t key; //To filter out elements struct notifiedFIFO_s *reponseFifo; void (*processingFunc)(void *); bool malloced; uint64_t creationTime; uint64_t startProcessingTime; uint64_t endProcessingTime; uint64_t returnTime; void *msgData; } notifiedFIFO_elt_t; typedef struct notifiedFIFO_s { notifiedFIFO_elt_t *outF; notifiedFIFO_elt_t *inF; pthread_mutex_t lockF; pthread_cond_t notifF; } notifiedFIFO_t; // You can use this allocator or use any piece of memory static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size, uint64_t key, notifiedFIFO_t *reponseFifo, void (*processingFunc)(void *)) { notifiedFIFO_elt_t *ret; AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)), ""); ret->next=NULL; ret->key=key; ret->reponseFifo=reponseFifo; ret->processingFunc=processingFunc; // We set user data piece aligend 32 bytes to be able to process it with SIMD ret->msgData=(void *)ret+(sizeof(notifiedFIFO_elt_t)/32+1)*32; ret->malloced=true; return ret; } static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) { return elt->msgData; } static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) { if (elt->malloced) { elt->malloced=false; free(elt); } else printf("delNotifiedFIFO on something not allocated by newNotifiedFIFO\n"); //LOG_W(UTIL,"delNotifiedFIFO on something not allocated by newNotifiedFIFO\n"); } static inline void initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) { nf->inF=NULL; nf->outF=NULL; } static inline void initNotifiedFIFO(notifiedFIFO_t *nf) { mutexinit(nf->lockF); condinit (nf->notifF); initNotifiedFIFO_nothreadSafe(nf); // No delete function: the creator has only to free the memory } static inline void pushNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) { msg->next=NULL; if (nf->outF == NULL) nf->outF = msg; if (nf->inF != NULL) nf->inF->next = msg; nf->inF = msg; } static inline void pushNotifiedFIFO(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) { mutexlock(nf->lockF); pushNotifiedFIFO_nothreadSafe(nf,msg); condbroadcast(nf->notifF); mutexunlock(nf->lockF); } static inline notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) { if (nf->outF == NULL) return NULL; notifiedFIFO_elt_t *ret=nf->outF; if (nf->outF==nf->outF->next) LOG_E(TMR,"Circular list in thread pool: push several times the same buffer is forbidden\n"); nf->outF=nf->outF->next; if (nf->outF==NULL) nf->inF=NULL; return ret; } static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) { mutexlock(nf->lockF); notifiedFIFO_elt_t *ret; while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL) condwait(nf->notifF, nf->lockF); mutexunlock(nf->lockF); return ret; } static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) { int tmp=mutextrylock(nf->lockF); if (tmp != 0 ) return NULL; notifiedFIFO_elt_t *ret=pullNotifiedFIFO_nothreadSafe(nf); mutexunlock(nf->lockF); return ret; } // This function aborts all messages matching the key // If the queue is used in thread pools, it doesn't cancels already running processing // because the message has already been picked static inline int abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) { mutexlock(nf->lockF); int nbDeleted=0; notifiedFIFO_elt_t **start=&nf->outF; while(*start!=NULL) { if ( (*start)->key == key ) { notifiedFIFO_elt_t *request=*start; *start=(*start)->next; delNotifiedFIFO_elt(request); nbDeleted++; } else start=&(*start)->next; } if (nf->outF == NULL) nf->inF=NULL; mutexunlock(nf->lockF); return nbDeleted; } struct one_thread { pthread_t threadID; int id; int coreID; char name[256]; uint64_t runningOnKey; bool abortFlag; struct thread_pool *pool; struct one_thread *next; }; typedef struct thread_pool { int activated; bool measurePerf; int traceFd; int dummyTraceFd; uint64_t cpuCyclesMicroSec; uint64_t startProcessingUE; int nbThreads; bool restrictRNTI; notifiedFIFO_t incomingFifo; struct one_thread *allthreads; } tpool_t; static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) { if (t->measurePerf) msg->creationTime=rdtsc(); if ( t->activated) pushNotifiedFIFO(&t->incomingFifo, msg); else { if (t->measurePerf) msg->startProcessingTime=rdtsc(); msg->processingFunc(NotifiedFifoData(msg)); if (t->measurePerf) msg->endProcessingTime=rdtsc(); if (msg->reponseFifo) pushNotifiedFIFO(msg->reponseFifo, msg); } } static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) { notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo); if (t->measurePerf) msg->returnTime=rdtsc(); if (t->traceFd >= 0) if(write(t->traceFd, msg, sizeof(*msg))); return msg; } static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) { notifiedFIFO_elt_t *msg= pollNotifiedFIFO(responseFifo); if (msg == NULL) return NULL; if (t->measurePerf) msg->returnTime=rdtsc(); if (t->traceFd) if(write(t->traceFd, msg, sizeof(*msg))); return msg; } static inline int abortTpool(tpool_t *t, uint64_t key) { int nbRemoved=0; notifiedFIFO_t *nf=&t->incomingFifo; mutexlock(nf->lockF); notifiedFIFO_elt_t **start=&nf->outF; while(*start!=NULL) { if ( (*start)->key == key ) { notifiedFIFO_elt_t *request=*start; *start=(*start)->next; delNotifiedFIFO_elt(request); nbRemoved++; } else start=&(*start)->next; } if (t->incomingFifo.outF==NULL) t->incomingFifo.inF=NULL; struct one_thread *ptr=t->allthreads; while(ptr!=NULL) { if (ptr->runningOnKey==key) { ptr->abortFlag=true; nbRemoved++; } ptr=ptr->next; } mutexunlock(nf->lockF); return nbRemoved; } void initTpool(char *params,tpool_t *pool, bool performanceMeas); #endif