thread-pool.h 6.56 KB
Newer Older
laurent's avatar
laurent committed
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
  Author: Laurent THOMAS, Open Cells
  copyleft: OpenAirInterface Software Alliance and it's licence
*/

#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <assertions.h>
#include <LOG/log.h>
laurent's avatar
laurent committed
14
#include <common/utils/system.h>
laurent's avatar
laurent committed
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80

#ifdef DEBUG
  #define THREADINIT   PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
#else
  #define THREADINIT   PTHREAD_MUTEX_INITIALIZER
#endif
#define mutexinit(mutex)   AssertFatal(pthread_mutex_init(&mutex,NULL)==0,"");
#define condinit(signal)   AssertFatal(pthread_cond_init(&signal,NULL)==0,"");
#define mutexlock(mutex)   AssertFatal(pthread_mutex_lock(&mutex)==0,"");
#define mutextrylock(mutex)   pthread_mutex_trylock(&mutex)
#define mutexunlock(mutex) AssertFatal(pthread_mutex_unlock(&mutex)==0,"");
#define condwait(condition, mutex) AssertFatal(pthread_cond_wait(&condition, &mutex)==0,"");
#define condbroadcast(signal) AssertFatal(pthread_cond_broadcast(&signal)==0,"");
#define condsignal(signal)    AssertFatal(pthread_cond_broadcast(&signal)==0,"");

typedef struct notifiedFIFO_elt_s {
  struct notifiedFIFO_elt_s *next;
  uint64_t key; //To filter out elements
  struct notifiedFIFO_s *reponseFifo;
  void (*processingFunc)(void *);
  bool malloced;
  uint64_t creationTime;
  uint64_t startProcessingTime;
  uint64_t endProcessingTime;
  uint64_t returnTime;
  void *msgData;
}  notifiedFIFO_elt_t;

typedef struct notifiedFIFO_s {
  notifiedFIFO_elt_t *outF;
  notifiedFIFO_elt_t *inF;
  pthread_mutex_t lockF;
  pthread_cond_t  notifF;
} notifiedFIFO_t;

// You can use this allocator or use any piece of memory
static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size,
    uint64_t key,
    notifiedFIFO_t *reponseFifo,
    void (*processingFunc)(void *)) {
  notifiedFIFO_elt_t *ret;
  AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)), "");
  ret->next=NULL;
  ret->key=key;
  ret->reponseFifo=reponseFifo;
  ret->processingFunc=processingFunc;
  // We set user data piece aligend 32 bytes to be able to process it with SIMD
  ret->msgData=(void *)ret+(sizeof(notifiedFIFO_elt_t)/32+1)*32;
  ret->malloced=true;
  return ret;
}

static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) {
  return elt->msgData;
}

static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) {
  if (elt->malloced) {
    elt->malloced=false;
    free(elt);
  } else
    printf("delNotifiedFIFO on something not allocated by newNotifiedFIFO\n");

  //LOG_W(UTIL,"delNotifiedFIFO on something not allocated by newNotifiedFIFO\n");
}

81 82 83 84
static inline void initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) {
  nf->inF=NULL;
  nf->outF=NULL;
}
laurent's avatar
laurent committed
85 86 87
static inline void initNotifiedFIFO(notifiedFIFO_t *nf) {
  mutexinit(nf->lockF);
  condinit (nf->notifF);
88
  initNotifiedFIFO_nothreadSafe(nf);
laurent's avatar
laurent committed
89 90 91
  // No delete function: the creator has only to free the memory
}

92
static inline void pushNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
laurent's avatar
laurent committed
93 94 95 96 97
  msg->next=NULL;

  if (nf->outF == NULL)
    nf->outF = msg;

laurent's avatar
fixes  
laurent committed
98
  if (nf->inF != NULL)
laurent's avatar
laurent committed
99 100 101 102 103
    nf->inF->next = msg;

  nf->inF = msg;
}

104
static inline void pushNotifiedFIFO(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
laurent's avatar
laurent committed
105
  mutexlock(nf->lockF);
106 107 108 109
  pushNotifiedFIFO_nothreadSafe(nf,msg);
  condbroadcast(nf->notifF);
  mutexunlock(nf->lockF);
}
laurent's avatar
laurent committed
110

111 112 113
static inline  notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) {
  if (nf->outF == NULL)
    return NULL;
laurent's avatar
laurent committed
114 115

  notifiedFIFO_elt_t *ret=nf->outF;
116 117 118 119

  if (nf->outF==nf->outF->next)
    LOG_E(TMR,"Circular list in thread pool: push several times the same buffer is forbidden\n");

laurent's avatar
laurent committed
120 121 122 123 124
  nf->outF=nf->outF->next;

  if (nf->outF==NULL)
    nf->inF=NULL;

125 126 127 128 129 130 131 132 133 134
  return ret;
}

static inline  notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
  mutexlock(nf->lockF);
  notifiedFIFO_elt_t *ret;

  while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL)
    condwait(nf->notifF, nf->lockF);

laurent's avatar
laurent committed
135 136 137 138 139 140 141 142 143 144
  mutexunlock(nf->lockF);
  return ret;
}

static inline  notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
  int tmp=mutextrylock(nf->lockF);

  if (tmp != 0 )
    return NULL;

145
  notifiedFIFO_elt_t *ret=pullNotifiedFIFO_nothreadSafe(nf);
laurent's avatar
laurent committed
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
  mutexunlock(nf->lockF);
  return ret;
}

// This function aborts all messages matching the key
// If the queue is used in thread pools, it doesn't cancels already running processing
// because the message has already been picked
static inline void abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) {
  mutexlock(nf->lockF);
  notifiedFIFO_elt_t **start=&nf->outF;

  while(*start!=NULL) {
    if ( (*start)->key == key ) {
      notifiedFIFO_elt_t *request=*start;
      *start=(*start)->next;
      delNotifiedFIFO_elt(request);
    }

    if (*start != NULL)
      start=&(*start)->next;
  }

  mutexunlock(nf->lockF);
}

struct one_thread {
  pthread_t  threadID;
  int id;
  int coreID;
  char name[256];
  uint64_t runningOnKey;
  bool abortFlag;
  struct thread_pool *pool;
  struct one_thread *next;
};

typedef struct thread_pool {
  int activated;
  bool measurePerf;
  int traceFd;
  int dummyTraceFd;
  uint64_t cpuCyclesMicroSec;
  uint64_t startProcessingUE;
  int nbThreads;
  bool restrictRNTI;
  notifiedFIFO_t incomingFifo;
  struct one_thread *allthreads;
} tpool_t;

static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) {
  if (t->measurePerf) msg->creationTime=rdtsc();

  pushNotifiedFIFO(&t->incomingFifo, msg);
}

static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
  notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo);

  if (t->measurePerf)
    msg->returnTime=rdtsc();

207
  if (t->traceFd >= 0)
laurent's avatar
laurent committed
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
    if(write(t->traceFd, msg, sizeof(*msg)));

  return msg;
}

static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
  notifiedFIFO_elt_t *msg= pollNotifiedFIFO(responseFifo);

  if (msg == NULL)
    return NULL;

  if (t->measurePerf)
    msg->returnTime=rdtsc();

  if (t->traceFd)
    if(write(t->traceFd, msg, sizeof(*msg)));

  return msg;
}

static inline void abortTpool(tpool_t *t, uint64_t key) {
  notifiedFIFO_t *nf=&t->incomingFifo;
  mutexlock(nf->lockF);
  notifiedFIFO_elt_t **start=&nf->outF;

  while(*start!=NULL) {
    if ( (*start)->key == key ) {
      notifiedFIFO_elt_t *request=*start;
      *start=(*start)->next;
      delNotifiedFIFO_elt(request);
    }

    if (*start != NULL)
      start=&(*start)->next;
  }

  struct one_thread *ptr=t->allthreads;

  while(ptr!=NULL) {
    if (ptr->runningOnKey==key)
      ptr->abortFlag=true;

    ptr=ptr->next;
  }

  mutexunlock(nf->lockF);
}
void initTpool(char *params,tpool_t *pool, bool performanceMeas);

#endif