Commit 7d44d4dd authored by Raphael Defosseux's avatar Raphael Defosseux

Merge remote-tracking branch 'origin/feature-thread-pool' into develop_integration_2019_w13

parents cf2996f4 0f9fbef2
Branches unavailable
2023.w22 2023.w21 2023.w20 2023.w19 2023.w18 2023.w18b 2023.w16 2023.w15 2023.w14 2023.w13 2023.w12 2023.w11 2023.w11b 2023.w10 2023.w10b 2023.w09 2023.w08 2023.w08b 2023.w07 2023.w06 2023.w05 2023.w03 2023.w02 2022.42 2022.41 2022.w51 2022.w50 2022.w49 2022.w48 2022.w47 2022.w46 2022.w45 2022.w43 2022.w42 2022.w42b 2022.w41 2022.w40 2022.w39 2022.w38 2022.w37 2022.w37b 2022.w36 2022.w35 2022.w33 2022.w32 2022.w31 2022.w31b 2022.w30 2022.w29 2022.w26 2022.w25 2022.w24 2022.w24b 2022.w23 2022.w22 2022.w21 2022.w20 2022.w19 2022.w18 2022.w17 2022.w15 2022.w15b 2022.w14a 2022.w13 2022.w13b 2022.w13a 2022.w12 2022.w10 2022.w09 2022.w09b 2022.w08 2022.w08b 2022.w07 2022.w07b 2022.w06 2022.w06a 2022.w05 2022.w05b 2022.w03_hotfix 2022.w03_b 2022.w02 2022.w01 2021.wk46 2021.wk14_a 2021.wk13_d 2021.wk13_c 2021.w51_c 2021.w51_a 2021.w50_a 2021.w49_b 2021.w49_a 2021.w48 2021.w47 2021.w46 2021.w46-powder 2021.w45 2021.w45_b 2021.w44 2021.w43 2021.w42 2021.w37 2021.w36 2021.w35 2021.w34 2021.w33 2021.w32 2021.w31 2021.w30 2021.w29 2021.w28 2021.w27 2021.w26 2021.w25 2021.w24 2021.w23 2021.w22 2021.w20 2021.w19 2021.w18_b 2021.w18_a 2021.w17_b 2021.w16 2021.w15 2021.w14 2021.w13_a 2021.w12 2021.w11 2021.w10 2021.w09 2021.w08 2021.w06 2021.w05 2021.w04 2021.w02 2020.w51_2 2020.w51 2020.w50 2020.w49 2020.w48_2 2020.w48 2020.w47 2020.w46_2 2020.w46 2020.w45_2 2020.w45 2020.w44 2020.w42_2 2020.w42 2020.w41 2020.w39 2020.w38 2020.w37 2020.w36 2020.w34 2020.w33 2020.w31 2020.w30 2020.w29 2020.w28 2020.w26 2020.w25 2020.w24 2020.w23 2020.w22 2020.w19 2020.w17 2020.w16 2020.w15 2020.w11 2020.w09 2020.w06 2020.w05 2020.w04 2020.w03 2019.w51 2019.w44 2019.w41 2019.w36 2019.w30 2019.w28 2019.w27 2019.w25 2019.w23 2019.w21 2019.w17 2019.w15 2019.w13 v1.2.2 v1.2.1 v1.2.0 v1.1.1 v1.1.0 setparam nr-ip-over-lte-v.1.5 nr-ip-over-lte-v.1.4 nr-ip-over-lte-v.1.3 nr-ip-over-lte-v.1.2 nr-ip-over-lte-v.1.1 nr-ip-over-lte-v.1.0 flexran-eol develop-nr-2020w03 develop-nr-2020w02 develop-nr-2019w51 develop-nr-2019w50 develop-nr-2019w48 develop-nr-2019w47 develop-nr-2019w45 develop-nr-2019w43 develop-nr-2019w42 develop-nr-2019w40 benetel_phase_rotation benetel_gnb_rel_2.0 benetel_gnb_rel_1.0 benetel_enb_rel_2.0 benetel_enb_rel_1.0
No related merge requests found
all: measurement_display thread-pool-test
measurement_display: measurement_display.c thread-pool.h
gcc measurement_display.c -I /data/openairinterface5g.nr/common/utils/ -I. /data/openairinterface5g.nr/common/utils/backtrace.c -lpthread -D TEST_THREAD_POOL -I../LOG -I../../utils/T -o measurement_display
thread-pool-test: thread-pool.c thread-pool.h
gcc -g thread-pool.c -I /data/openairinterface5g.nr/common/utils/ -I. /data/openairinterface5g.nr/common/utils/backtrace.c -lpthread -D TEST_THREAD_POOL -I../LOG -I../../utils/T -o thread-pool-test
/*
Author: Laurent THOMAS, Open Cells
copyleft: OpenAirInterface Software Alliance and it's licence
*/
#define __USE_GNU
#define _GNU_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <sched.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "thread-pool.h"
#define SEP "\t"
uint64_t cpuCyclesMicroSec;
int main(int argc, char *argv[]) {
if(argc != 2) {
printf("Need one paramter: the trace Linux pipe (fifo)");
exit(1);
}
mkfifo(argv[1],0666);
int fd=open(argv[1], O_RDONLY);
if ( fd == -1 ) {
perror("open read mode trace file:");
exit(1);
}
uint64_t deb=rdtsc();
usleep(100000);
cpuCyclesMicroSec=(rdtsc()-deb)/100000;
printf("Cycles per µs: %lu\n",cpuCyclesMicroSec);
printf("Key" SEP "delay to process" SEP "processing time" SEP "delay to be read answer\n");
notifiedFIFO_elt_t doneRequest;
while ( 1 ) {
if ( read(fd,&doneRequest, sizeof(doneRequest)) == sizeof(doneRequest)) {
printf("%lu" SEP "%lu" SEP "%lu" SEP "%lu" "\n",
doneRequest.key,
(doneRequest.startProcessingTime-doneRequest.creationTime)/cpuCyclesMicroSec,
(doneRequest.endProcessingTime-doneRequest.startProcessingTime)/cpuCyclesMicroSec,
(doneRequest.returnTime-doneRequest.endProcessingTime)/cpuCyclesMicroSec
);
} else {
printf("no measurements\n");
sleep(1);
}
}
}
/*
Author: Laurent THOMAS, Open Cells
copyleft: OpenAirInterface Software Alliance and it's licence
*/
#define _GNU_SOURCE
#include <sched.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <ctype.h>
#include <sys/sysinfo.h>
#include <threadPool/thread-pool.h>
void displayList(notifiedFIFO_t *nf) {
int n=0;
notifiedFIFO_elt_t *ptr=nf->outF;
while(ptr) {
printf("element: %d, key: %lu\n",++n,ptr->key);
ptr=ptr->next;
}
printf("End of list: %d elements\n",n);
}
static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf, struct one_thread *thr) {
mutexlock(nf->lockF);
while(!nf->outF)
condwait(nf->notifF, nf->lockF);
notifiedFIFO_elt_t *ret=nf->outF;
nf->outF=nf->outF->next;
if (nf->outF==NULL)
nf->inF=NULL;
// For abort feature
thr->runningOnKey=ret->key;
thr->abortFlag=false;
mutexunlock(nf->lockF);
return ret;
}
void *one_thread(void *arg) {
struct one_thread *myThread=(struct one_thread *) arg;
struct thread_pool *tp=myThread->pool;
// configure the thread core assignment
// TBD: reserve the core for us exclusively
if ( myThread->coreID >= 0 && myThread->coreID < get_nprocs_conf()) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(myThread->coreID, &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}
//Configure the thread scheduler policy for Linux
struct sched_param sparam= {0};
sparam.sched_priority = sched_get_priority_max(SCHED_RR);
pthread_setschedparam(pthread_self(), SCHED_RR, &sparam);
// set the thread name for debugging
sprintf(myThread->name,"Tpool_%d",myThread->coreID);
pthread_setname_np(pthread_self(), myThread->name );
// Infinite loop to process requests
do {
notifiedFIFO_elt_t *elt=pullNotifiedFifoRemember(&tp->incomingFifo, myThread);
if (tp->measurePerf) elt->startProcessingTime=rdtsc();
elt->processingFunc(NotifiedFifoData(elt));
if (tp->measurePerf) elt->endProcessingTime=rdtsc();
if (elt->reponseFifo) {
// Check if the job is still alive, else it has been aborted
mutexlock(tp->incomingFifo.lockF);
if (myThread->abortFlag)
delNotifiedFIFO_elt(elt);
else
pushNotifiedFIFO(elt->reponseFifo, elt);
mutexunlock(tp->incomingFifo.lockF);
}
} while (true);
}
void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
memset(pool,0,sizeof(*pool));
char *measr=getenv("threadPoolMeasurements");
pool->measurePerf=performanceMeas;
// force measurement if the output is defined
pool->measurePerf=measr!=NULL;
if (measr) {
mkfifo(measr,0666);
AssertFatal(-1 != (pool->dummyTraceFd=
open(measr, O_RDONLY| O_NONBLOCK)),"");
AssertFatal(-1 != (pool->traceFd=
open(measr, O_WRONLY|O_APPEND|O_NOATIME|O_NONBLOCK)),"");
} else
pool->traceFd=-1;
//Configure the thread scheduler policy for Linux
struct sched_param sparam= {0};
sparam.sched_priority = sched_get_priority_max(SCHED_RR)-1;
pthread_setschedparam(pthread_self(), SCHED_RR, &sparam);
pool->activated=true;
initNotifiedFIFO(&pool->incomingFifo);
char *saveptr, * curptr;
pool->nbThreads=0;
pool->restrictRNTI=false;
curptr=strtok_r(params,",",&saveptr);
while ( curptr!=NULL ) {
int c=toupper(curptr[0]);
switch (c) {
case 'U':
pool->restrictRNTI=true;
break;
case 'N':
pool->activated=false;
break;
default:
pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread));
pool->allthreads->next=pool->allthreads;
printf("create a thread for core %d\n", atoi(curptr));
pool->allthreads->coreID=atoi(curptr);
pool->allthreads->id=pool->nbThreads;
pool->allthreads->pool=pool;
pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads);
pool->nbThreads++;
}
curptr=strtok_r(NULL,",",&saveptr);
}
if (pool->activated && pool->nbThreads==0) {
printf("No servers created in the thread pool, exit\n");
exit(1);
}
}
#ifdef TEST_THREAD_POOL
struct testData {
int id;
char txt[30];
};
void processing(void *arg) {
struct testData *in=(struct testData *)arg;
printf("doing: %d, %s, in thr %ld\n",in->id, in->txt,pthread_self() );
sprintf(in->txt,"Done by %ld, job %d", pthread_self(), in->id);
usleep(rand()%100);
printf("done: %d, %s, in thr %ld\n",in->id, in->txt,pthread_self() );
}
int main() {
notifiedFIFO_t myFifo;
initNotifiedFIFO(&myFifo);
pushNotifiedFIFO(&myFifo,newNotifiedFIFO_elt(sizeof(struct testData), 1234,NULL,NULL));
for(int i=10; i>1; i--) {
pushNotifiedFIFO(&myFifo,newNotifiedFIFO_elt(sizeof(struct testData), 1000+i,NULL,NULL));
}
displayList(&myFifo);
notifiedFIFO_elt_t *tmp=pullNotifiedFIFO(&myFifo);
printf("pulled: %lu\n", tmp->key);
displayList(&myFifo);
tmp=pullNotifiedFIFO(&myFifo);
printf("pulled: %lu\n", tmp->key);
displayList(&myFifo);
abortNotifiedFIFO(&myFifo,1005);
printf("aborted 1005\n");
displayList(&myFifo);
pushNotifiedFIFO(&myFifo,newNotifiedFIFO_elt(sizeof(struct testData), 12345678, NULL, NULL));
displayList(&myFifo);
abortNotifiedFIFO(&myFifo,12345678);
printf("aborted 12345678\n");
displayList(&myFifo);
do {
tmp=pollNotifiedFIFO(&myFifo);
if (tmp) {
printf("pulled: %lu\n", tmp->key);
displayList(&myFifo);
} else
printf("Empty list \n");
} while(tmp);
tpool_t pool;
char params[]="1,2,3,u";
initTpool(params,&pool, true);
notifiedFIFO_t worker_back;
initNotifiedFIFO(&worker_back);
for (int i=0; i <1000 ; i++) {
notifiedFIFO_elt_t *work=newNotifiedFIFO_elt(sizeof(struct testData), i, &worker_back, processing);
struct testData *x=(struct testData *)NotifiedFifoData(work);
x->id=i;
pushTpool(&pool, work);
}
do {
tmp=pullTpool(&worker_back,&pool);
if (tmp) {
struct testData *dd=NotifiedFifoData(tmp);
printf("Result: %s\n",dd->txt);
delNotifiedFIFO_elt(tmp);
} else
printf("Empty list \n");
abortTpool(&pool,510);
} while(tmp);
return 0;
}
#endif
/*
Author: Laurent THOMAS, Open Cells
copyleft: OpenAirInterface Software Alliance and it's licence
*/
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/syscall.h>
#include <assertions.h>
#include <LOG/log.h>
#ifdef DEBUG
#define THREADINIT PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
#else
#define THREADINIT PTHREAD_MUTEX_INITIALIZER
#endif
#define mutexinit(mutex) AssertFatal(pthread_mutex_init(&mutex,NULL)==0,"");
#define condinit(signal) AssertFatal(pthread_cond_init(&signal,NULL)==0,"");
#define mutexlock(mutex) AssertFatal(pthread_mutex_lock(&mutex)==0,"");
#define mutextrylock(mutex) pthread_mutex_trylock(&mutex)
#define mutexunlock(mutex) AssertFatal(pthread_mutex_unlock(&mutex)==0,"");
#define condwait(condition, mutex) AssertFatal(pthread_cond_wait(&condition, &mutex)==0,"");
#define condbroadcast(signal) AssertFatal(pthread_cond_broadcast(&signal)==0,"");
#define condsignal(signal) AssertFatal(pthread_cond_broadcast(&signal)==0,"");
typedef struct notifiedFIFO_elt_s {
struct notifiedFIFO_elt_s *next;
uint64_t key; //To filter out elements
struct notifiedFIFO_s *reponseFifo;
void (*processingFunc)(void *);
bool malloced;
uint64_t creationTime;
uint64_t startProcessingTime;
uint64_t endProcessingTime;
uint64_t returnTime;
void *msgData;
} notifiedFIFO_elt_t;
typedef struct notifiedFIFO_s {
notifiedFIFO_elt_t *outF;
notifiedFIFO_elt_t *inF;
pthread_mutex_t lockF;
pthread_cond_t notifF;
} notifiedFIFO_t;
// You can use this allocator or use any piece of memory
static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size,
uint64_t key,
notifiedFIFO_t *reponseFifo,
void (*processingFunc)(void *)) {
notifiedFIFO_elt_t *ret;
AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)), "");
ret->next=NULL;
ret->key=key;
ret->reponseFifo=reponseFifo;
ret->processingFunc=processingFunc;
// We set user data piece aligend 32 bytes to be able to process it with SIMD
ret->msgData=(void *)ret+(sizeof(notifiedFIFO_elt_t)/32+1)*32;
ret->malloced=true;
return ret;
}
static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) {
return elt->msgData;
}
static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) {
if (elt->malloced) {
elt->malloced=false;
free(elt);
} else
printf("delNotifiedFIFO on something not allocated by newNotifiedFIFO\n");
//LOG_W(UTIL,"delNotifiedFIFO on something not allocated by newNotifiedFIFO\n");
}
static inline void initNotifiedFIFO(notifiedFIFO_t *nf) {
mutexinit(nf->lockF);
condinit (nf->notifF);
nf->inF=NULL;
nf->outF=NULL;
// No delete function: the creator has only to free the memory
}
static inline void pushNotifiedFIFO(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg) {
mutexlock(nf->lockF);
msg->next=NULL;
if (nf->outF == NULL)
nf->outF = msg;
if (nf->inF)
nf->inF->next = msg;
nf->inF = msg;
condbroadcast(nf->notifF);
mutexunlock(nf->lockF);
}
static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
mutexlock(nf->lockF);
while(!nf->outF)
condwait(nf->notifF, nf->lockF);
notifiedFIFO_elt_t *ret=nf->outF;
nf->outF=nf->outF->next;
if (nf->outF==NULL)
nf->inF=NULL;
mutexunlock(nf->lockF);
return ret;
}
static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
int tmp=mutextrylock(nf->lockF);
if (tmp != 0 )
return NULL;
notifiedFIFO_elt_t *ret=nf->outF;
if (ret!=NULL)
nf->outF=nf->outF->next;
if (nf->outF==NULL)
nf->inF=NULL;
mutexunlock(nf->lockF);
return ret;
}
// This function aborts all messages matching the key
// If the queue is used in thread pools, it doesn't cancels already running processing
// because the message has already been picked
static inline void abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) {
mutexlock(nf->lockF);
notifiedFIFO_elt_t **start=&nf->outF;
while(*start!=NULL) {
if ( (*start)->key == key ) {
notifiedFIFO_elt_t *request=*start;
*start=(*start)->next;
delNotifiedFIFO_elt(request);
}
if (*start != NULL)
start=&(*start)->next;
}
mutexunlock(nf->lockF);
}
struct one_thread {
pthread_t threadID;
int id;
int coreID;
char name[256];
uint64_t runningOnKey;
bool abortFlag;
struct thread_pool *pool;
struct one_thread *next;
};
typedef struct thread_pool {
int activated;
bool measurePerf;
int traceFd;
int dummyTraceFd;
uint64_t cpuCyclesMicroSec;
uint64_t startProcessingUE;
int nbThreads;
bool restrictRNTI;
notifiedFIFO_t incomingFifo;
struct one_thread *allthreads;
} tpool_t;
static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) {
if (t->measurePerf) msg->creationTime=rdtsc();
pushNotifiedFIFO(&t->incomingFifo, msg);
}
static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo);
if (t->measurePerf)
msg->returnTime=rdtsc();
if (t->traceFd)
if(write(t->traceFd, msg, sizeof(*msg)));
return msg;
}
static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
notifiedFIFO_elt_t *msg= pollNotifiedFIFO(responseFifo);
if (msg == NULL)
return NULL;
if (t->measurePerf)
msg->returnTime=rdtsc();
if (t->traceFd)
if(write(t->traceFd, msg, sizeof(*msg)));
return msg;
}
static inline void abortTpool(tpool_t *t, uint64_t key) {
notifiedFIFO_t *nf=&t->incomingFifo;
mutexlock(nf->lockF);
notifiedFIFO_elt_t **start=&nf->outF;
while(*start!=NULL) {
if ( (*start)->key == key ) {
notifiedFIFO_elt_t *request=*start;
*start=(*start)->next;
delNotifiedFIFO_elt(request);
}
if (*start != NULL)
start=&(*start)->next;
}
struct one_thread *ptr=t->allthreads;
while(ptr!=NULL) {
if (ptr->runningOnKey==key)
ptr->abortFlag=true;
ptr=ptr->next;
}
mutexunlock(nf->lockF);
}
void initTpool(char *params,tpool_t *pool, bool performanceMeas);
#endif
# Thread pool
The thread pool is a working server, made of a set of worker threads that can be mapped on CPU cores.
Each worker loop on pick from the same input queue jobs to do.
When a job is done, the worker sends a return if a return is defined.
A selective abort allows to cancel parallel jobs (usage: a client pushed jobs, but from a response of one job, the other linked jobs becomes useless).
All the thread pool functions are thread safe, nevertheless the working functions are implemented by the thread pool client, so the client has to tackle the parallel execution of his functions called "processingFunc" hereafter.
## license
Author: Laurent Thomas, Open cells project
The owner share this piece code to Openairsoftware alliance as per OSA license terms
# jobs
A job is a message (notifiedFIFO_elt_t):
next: internal FIFO chain, do not set it
key: a long int that the client can use to identify a message or a group of messages
responseFifo: if the client defines a response FIFO, the message will be posted back after processing
processingFunc: any funtion (type void processingFunc(void *)) that the worker will launch
msgData: the data passed to processingFunc. It can be added automatically, or you can set it to a buffer you are managing
malloced: a boolean that enable internal free in these cases: no return Fifo or Abort feature
The job messages can be created with newNotifiedFIFO_elt() and delNotifiedFIFO_elt() or managed by the client.
# Queues of jobs
Queues are type of: notifiedFIFO_t that must be initialized by init_notifiedFIFO()
No delete function is required, the creator has only to free the data of type notifiedFIFO_t
push_notifiedFIFO() add a job in the queue
pull_notifiedFIFO() is blocking, poll_notifiedFIFO() is non blocking
abort_notifiedFIFO() allows the customer to delete all waiting jobs that match with the key (see key in jobs definition)
# Thread pools
## initialization
The clients can create one or more thread pools with init_tpool()
the params string structure: describes a list of cores, separated by "," that run a worker thread
If the core exists on the CPU, the thread pool initialization sets the affinity between this thread and the related code (use negative values is allowed, so the thread will never be mapped on a specific core).
The threads are all Linux real time scheduler, their name is set automatically is "Tpool_<core id>"
## adding jobs
The client create their jobs messages as a notifiedFIFO_elt_t, then they push it with pushTpool() (that internally calls push_notifiedFIFO())
If they need a return, they have to create response queues with init_notifiedFIFO() and set this FIFO pointer in the notifiedFIFO_elt_t before pushing the job.
## abort
A abort service abortTpool() allows to abort all jobs that match a key (see jobs "key"). When the abort returns, it garanties no job (matching the key) response will be posted on response queues.
Nevertheless, jobs already performed before the return of abortTpool() are pushed in the response Fifo queue.
## Performance measurements
A performance measurement is integrated: the pool will automacillay fill timestamps:
* creationTime: time the request is push to the pool;
* startProcessingTime: time a worker start to run on the job
* endProcessingTime: time the worker finished the job
* returnTime: time the client reads the result
if you set the environement variable: thread-pool-measurements to a valid file name
These measurements will be wrote to this Linux pipe.
A tool to read the linux fifo and display it in ascii is provided: see the local directory Makefile for this tool and to compile the thread pool unitary tests.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment