thread-pool.h 8.78 KB
Newer Older
laurent's avatar
laurent committed
1
/*
laurent's avatar
laurent committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1  (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.openairinterface.org/?page_id=698
*
* Author and copyright: Laurent Thomas, open-cells.com
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
*      contact@openairinterface.org
laurent's avatar
laurent committed
22 23
*/

laurent's avatar
laurent committed
24

laurent's avatar
laurent committed
25 26 27 28 29
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
30
#include <unistd.h>
laurent's avatar
laurent committed
31 32
#include <sys/syscall.h>
#include <assertions.h>
laurent's avatar
laurent committed
33
#include <common/utils/system.h>
34
//#include <stdatomic.h>
laurent's avatar
laurent committed
35

36 37 38 39 40 41
static __inline__ uint64_t rdtsc(void) {
  uint32_t a, d;
  __asm__ volatile ("rdtsc" : "=a" (a), "=d" (d));
  return (((uint64_t)d)<<32) | ((uint64_t)a);
}

laurent's avatar
laurent committed
42 43 44 45 46
#ifdef DEBUG
  #define THREADINIT   PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
#else
  #define THREADINIT   PTHREAD_MUTEX_INITIALIZER
#endif
47 48 49 50 51 52
#define mutexinit(mutex)   {int ret=pthread_mutex_init(&mutex,NULL); \
                            AssertFatal(ret==0,"ret=%d\n",ret);}
#define condinit(signal)   {int ret=pthread_cond_init(&signal,NULL); \
                            AssertFatal(ret==0,"ret=%d\n",ret);}
#define mutexlock(mutex)   {int ret=pthread_mutex_lock(&mutex); \
                            AssertFatal(ret==0,"ret=%d\n",ret);}
laurent's avatar
laurent committed
53
#define mutextrylock(mutex)   pthread_mutex_trylock(&mutex)
54 55 56 57 58 59
#define mutexunlock(mutex) {int ret=pthread_mutex_unlock(&mutex); \
                            AssertFatal(ret==0,"ret=%d\n",ret);}
#define condwait(condition, mutex) {int ret=pthread_cond_wait(&condition, &mutex); \
                                    AssertFatal(ret==0,"ret=%d\n",ret);}
#define condbroadcast(signal) {int ret=pthread_cond_broadcast(&signal); \
                               AssertFatal(ret==0,"ret=%d\n",ret);}
60
#define condsignal(signal)    {int ret=pthread_cond_signal(&signal); \
61
                               AssertFatal(ret==0,"ret=%d\n",ret);}
62
#define tpool_nbthreads(tpool)   (tpool.nbThreads)
laurent's avatar
laurent committed
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
typedef struct notifiedFIFO_elt_s {
  struct notifiedFIFO_elt_s *next;
  uint64_t key; //To filter out elements
  struct notifiedFIFO_s *reponseFifo;
  void (*processingFunc)(void *);
  bool malloced;
  uint64_t creationTime;
  uint64_t startProcessingTime;
  uint64_t endProcessingTime;
  uint64_t returnTime;
  void *msgData;
}  notifiedFIFO_elt_t;

typedef struct notifiedFIFO_s {
  notifiedFIFO_elt_t *outF;
  notifiedFIFO_elt_t *inF;
  pthread_mutex_t lockF;
  pthread_cond_t  notifF;
} notifiedFIFO_t;

// You can use this allocator or use any piece of memory
static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size,
    uint64_t key,
    notifiedFIFO_t *reponseFifo,
    void (*processingFunc)(void *)) {
  notifiedFIFO_elt_t *ret;
  AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)), "");
  ret->next=NULL;
  ret->key=key;
  ret->reponseFifo=reponseFifo;
  ret->processingFunc=processingFunc;
  // We set user data piece aligend 32 bytes to be able to process it with SIMD
95
  ret->msgData=(void *)((uint8_t*)ret+(sizeof(notifiedFIFO_elt_t)/32+1)*32);
laurent's avatar
laurent committed
96 97 98 99 100 101 102 103 104
  ret->malloced=true;
  return ret;
}

static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) {
  return elt->msgData;
}

static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) {
105 106 107
  AssertFatal(elt->malloced, "delNotifiedFIFO on something not allocated by newNotifiedFIFO\n");
  elt->malloced=false;
  free(elt);
laurent's avatar
laurent committed
108 109
}

110 111 112 113
static inline void initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) {
  nf->inF=NULL;
  nf->outF=NULL;
}
laurent's avatar
laurent committed
114 115 116
static inline void initNotifiedFIFO(notifiedFIFO_t *nf) {
  mutexinit(nf->lockF);
  condinit (nf->notifF);
117
  initNotifiedFIFO_nothreadSafe(nf);
laurent's avatar
laurent committed
118 119 120
  // No delete function: the creator has only to free the memory
}

121
static inline void pushNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
laurent's avatar
laurent committed
122 123 124 125 126
  msg->next=NULL;

  if (nf->outF == NULL)
    nf->outF = msg;

laurent's avatar
fixes  
laurent committed
127
  if (nf->inF != NULL)
laurent's avatar
laurent committed
128 129 130 131 132
    nf->inF->next = msg;

  nf->inF = msg;
}

133
static inline void pushNotifiedFIFO(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
laurent's avatar
laurent committed
134
  mutexlock(nf->lockF);
135
  pushNotifiedFIFO_nothreadSafe(nf,msg);
136
  condsignal(nf->notifF);
137 138
  mutexunlock(nf->lockF);
}
laurent's avatar
laurent committed
139

140 141 142
static inline  notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf) {
  if (nf->outF == NULL)
    return NULL;
laurent's avatar
laurent committed
143 144

  notifiedFIFO_elt_t *ret=nf->outF;
145

146
  AssertFatal(nf->outF != nf->outF->next,"Circular list in thread pool: push several times the same buffer is forbidden\n");
147

laurent's avatar
laurent committed
148 149 150 151 152
  nf->outF=nf->outF->next;

  if (nf->outF==NULL)
    nf->inF=NULL;

153 154 155 156 157 158 159 160 161 162
  return ret;
}

static inline  notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
  mutexlock(nf->lockF);
  notifiedFIFO_elt_t *ret;

  while((ret=pullNotifiedFIFO_nothreadSafe(nf)) == NULL)
    condwait(nf->notifF, nf->lockF);

laurent's avatar
laurent committed
163 164 165 166 167 168 169 170 171 172
  mutexunlock(nf->lockF);
  return ret;
}

static inline  notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
  int tmp=mutextrylock(nf->lockF);

  if (tmp != 0 )
    return NULL;

173
  notifiedFIFO_elt_t *ret=pullNotifiedFIFO_nothreadSafe(nf);
laurent's avatar
laurent committed
174 175 176 177 178 179 180
  mutexunlock(nf->lockF);
  return ret;
}

// This function aborts all messages matching the key
// If the queue is used in thread pools, it doesn't cancels already running processing
// because the message has already been picked
181
static inline int abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) {
laurent's avatar
laurent committed
182
  mutexlock(nf->lockF);
183
  int nbDeleted=0;
laurent's avatar
laurent committed
184 185 186 187 188 189 190
  notifiedFIFO_elt_t **start=&nf->outF;

  while(*start!=NULL) {
    if ( (*start)->key == key ) {
      notifiedFIFO_elt_t *request=*start;
      *start=(*start)->next;
      delNotifiedFIFO_elt(request);
191 192
      nbDeleted++;
    } else
laurent's avatar
laurent committed
193 194 195
      start=&(*start)->next;
  }

196 197 198
  if (nf->outF == NULL)
    nf->inF=NULL;

laurent's avatar
laurent committed
199
  mutexunlock(nf->lockF);
200
  return nbDeleted;
laurent's avatar
laurent committed
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
}

struct one_thread {
  pthread_t  threadID;
  int id;
  int coreID;
  char name[256];
  uint64_t runningOnKey;
  bool abortFlag;
  struct thread_pool *pool;
  struct one_thread *next;
};

typedef struct thread_pool {
  int activated;
  bool measurePerf;
  int traceFd;
  int dummyTraceFd;
  uint64_t cpuCyclesMicroSec;
  uint64_t startProcessingUE;
  int nbThreads;
  bool restrictRNTI;
  notifiedFIFO_t incomingFifo;
  struct one_thread *allthreads;
} tpool_t;

static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) {
  if (t->measurePerf) msg->creationTime=rdtsc();

230 231 232 233 234 235 236 237 238 239 240 241 242 243
  if ( t->activated)
    pushNotifiedFIFO(&t->incomingFifo, msg);
  else {
    if (t->measurePerf)
      msg->startProcessingTime=rdtsc();

    msg->processingFunc(NotifiedFifoData(msg));

    if (t->measurePerf)
      msg->endProcessingTime=rdtsc();

    if (msg->reponseFifo)
      pushNotifiedFIFO(msg->reponseFifo, msg);
  }
laurent's avatar
laurent committed
244 245 246 247
}

static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
  notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo);
frtabu's avatar
frtabu committed
248
  AssertFatal(t->traceFd, "Thread pool used while not initialized");
laurent's avatar
laurent committed
249 250 251
  if (t->measurePerf)
    msg->returnTime=rdtsc();

252
  if (t->traceFd > 0)
laurent's avatar
laurent committed
253 254 255 256 257 258 259
    if(write(t->traceFd, msg, sizeof(*msg)));

  return msg;
}

static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
  notifiedFIFO_elt_t *msg= pollNotifiedFIFO(responseFifo);
frtabu's avatar
frtabu committed
260
  AssertFatal(t->traceFd, "Thread pool used while not initialized");
laurent's avatar
laurent committed
261 262 263 264 265 266 267 268 269 270 271 272
  if (msg == NULL)
    return NULL;

  if (t->measurePerf)
    msg->returnTime=rdtsc();

  if (t->traceFd)
    if(write(t->traceFd, msg, sizeof(*msg)));

  return msg;
}

273 274
static inline int abortTpool(tpool_t *t, uint64_t key) {
  int nbRemoved=0;
laurent's avatar
laurent committed
275 276 277 278 279 280 281 282 283
  notifiedFIFO_t *nf=&t->incomingFifo;
  mutexlock(nf->lockF);
  notifiedFIFO_elt_t **start=&nf->outF;

  while(*start!=NULL) {
    if ( (*start)->key == key ) {
      notifiedFIFO_elt_t *request=*start;
      *start=(*start)->next;
      delNotifiedFIFO_elt(request);
284 285
      nbRemoved++;
    } else
laurent's avatar
laurent committed
286 287 288
      start=&(*start)->next;
  }

289 290 291
  if (t->incomingFifo.outF==NULL)
    t->incomingFifo.inF=NULL;

laurent's avatar
laurent committed
292 293 294
  struct one_thread *ptr=t->allthreads;

  while(ptr!=NULL) {
295
    if (ptr->runningOnKey==key) {
laurent's avatar
laurent committed
296
      ptr->abortFlag=true;
297 298
      nbRemoved++;
    }
laurent's avatar
laurent committed
299 300 301 302 303

    ptr=ptr->next;
  }

  mutexunlock(nf->lockF);
304
  return nbRemoved;
laurent's avatar
laurent committed
305
}
306 307
void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name);
#define  initTpool(PARAMPTR,TPOOLPTR, MEASURFLAG) initNamedTpool(PARAMPTR,TPOOLPTR, MEASURFLAG, NULL)
laurent's avatar
laurent committed
308
#endif