Commit 0f9fbef2 authored by laurent's avatar laurent

add thread pool feature

parent e492b971
...@@ -10,7 +10,9 @@ ...@@ -10,7 +10,9 @@
#include <fcntl.h> #include <fcntl.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <thread-pool.h> #include <ctype.h>
#include <sys/sysinfo.h>
#include <threadPool/thread-pool.h>
void displayList(notifiedFIFO_t *nf) { void displayList(notifiedFIFO_t *nf) {
int n=0; int n=0;
...@@ -46,12 +48,16 @@ static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf, ...@@ -46,12 +48,16 @@ static inline notifiedFIFO_elt_t *pullNotifiedFifoRemember( notifiedFIFO_t *nf,
void *one_thread(void *arg) { void *one_thread(void *arg) {
struct one_thread *myThread=(struct one_thread *) arg; struct one_thread *myThread=(struct one_thread *) arg;
struct thread_pool *tp=myThread->pool; struct thread_pool *tp=myThread->pool;
// configure the thread core assignment // configure the thread core assignment
// TBD: reserve the core for us exclusively // TBD: reserve the core for us exclusively
if ( myThread->coreID >= 0 && myThread->coreID < get_nprocs_conf()) {
cpu_set_t cpuset; cpu_set_t cpuset;
CPU_ZERO(&cpuset); CPU_ZERO(&cpuset);
CPU_SET(myThread->coreID, &cpuset); CPU_SET(myThread->coreID, &cpuset);
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
}
//Configure the thread scheduler policy for Linux //Configure the thread scheduler policy for Linux
struct sched_param sparam= {0}; struct sched_param sparam= {0};
sparam.sched_priority = sched_get_priority_max(SCHED_RR); sparam.sched_priority = sched_get_priority_max(SCHED_RR);
...@@ -112,22 +118,27 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) { ...@@ -112,22 +118,27 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
curptr=strtok_r(params,",",&saveptr); curptr=strtok_r(params,",",&saveptr);
while ( curptr!=NULL ) { while ( curptr!=NULL ) {
if (curptr[0] == 'u' || curptr[0] == 'U') { int c=toupper(curptr[0]);
switch (c) {
case 'U':
pool->restrictRNTI=true; pool->restrictRNTI=true;
} else if ( curptr[0]>='0' && curptr[0]<='9' ) { break;
struct one_thread *tmp=pool->allthreads;
case 'N':
pool->activated=false;
break;
default:
pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread)); pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread));
pool->allthreads->next=tmp; pool->allthreads->next=pool->allthreads;
printf("create a thread for core %d\n", atoi(curptr)); printf("create a thread for core %d\n", atoi(curptr));
pool->allthreads->coreID=atoi(curptr); pool->allthreads->coreID=atoi(curptr);
pool->allthreads->id=pool->nbThreads; pool->allthreads->id=pool->nbThreads;
pool->allthreads->pool=pool; pool->allthreads->pool=pool;
pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads); pthread_create(&pool->allthreads->threadID, NULL, one_thread, (void *)pool->allthreads);
pool->nbThreads++; pool->nbThreads++;
} else if (curptr[0] == 'n' || curptr[0] == 'N') { }
pool->activated=false;
} else
printf("Error in options for thread pool: %s\n",curptr);
curptr=strtok_r(NULL,",",&saveptr); curptr=strtok_r(NULL,",",&saveptr);
} }
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
#include <pthread.h> #include <pthread.h>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <assertions.h> #include <assertions.h>
#include <log.h> #include <LOG/log.h>
#ifdef DEBUG #ifdef DEBUG
#define THREADINIT PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP #define THREADINIT PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
...@@ -52,8 +52,7 @@ static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size, ...@@ -52,8 +52,7 @@ static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size,
notifiedFIFO_t *reponseFifo, notifiedFIFO_t *reponseFifo,
void (*processingFunc)(void *)) { void (*processingFunc)(void *)) {
notifiedFIFO_elt_t *ret; notifiedFIFO_elt_t *ret;
size_t sz=sizeof(notifiedFIFO_elt_t)+size; AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc(sizeof(notifiedFIFO_elt_t)+size+32)), "");
AssertFatal( NULL != (ret=(notifiedFIFO_elt_t *) malloc((sz/32+1)*32)), "");
ret->next=NULL; ret->next=NULL;
ret->key=key; ret->key=key;
ret->reponseFifo=reponseFifo; ret->reponseFifo=reponseFifo;
...@@ -69,8 +68,6 @@ static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) { ...@@ -69,8 +68,6 @@ static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt) {
} }
static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) { static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt) {
bool tmp=elt->malloced;
if (elt->malloced) { if (elt->malloced) {
elt->malloced=false; elt->malloced=false;
free(elt); free(elt);
...@@ -121,6 +118,7 @@ static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) { ...@@ -121,6 +118,7 @@ static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf) {
static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) { static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
int tmp=mutextrylock(nf->lockF); int tmp=mutextrylock(nf->lockF);
if (tmp != 0 ) if (tmp != 0 )
return NULL; return NULL;
...@@ -194,13 +192,14 @@ static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_ ...@@ -194,13 +192,14 @@ static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_
msg->returnTime=rdtsc(); msg->returnTime=rdtsc();
if (t->traceFd) if (t->traceFd)
(void)write(t->traceFd, msg, sizeof(*msg)); if(write(t->traceFd, msg, sizeof(*msg)));
return msg; return msg;
} }
static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) { static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
notifiedFIFO_elt_t *msg= pullNotifiedFIFO(responseFifo); notifiedFIFO_elt_t *msg= pollNotifiedFIFO(responseFifo);
if (msg == NULL) if (msg == NULL)
return NULL; return NULL;
...@@ -208,7 +207,7 @@ static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpo ...@@ -208,7 +207,7 @@ static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpo
msg->returnTime=rdtsc(); msg->returnTime=rdtsc();
if (t->traceFd) if (t->traceFd)
(void)write(t->traceFd, msg, sizeof(*msg)); if(write(t->traceFd, msg, sizeof(*msg)));
return msg; return msg;
} }
...@@ -240,5 +239,6 @@ static inline void abortTpool(tpool_t *t, uint64_t key) { ...@@ -240,5 +239,6 @@ static inline void abortTpool(tpool_t *t, uint64_t key) {
mutexunlock(nf->lockF); mutexunlock(nf->lockF);
} }
void initTpool(char *params,tpool_t *pool, bool performanceMeas);
#endif #endif
...@@ -11,7 +11,7 @@ All the thread pool functions are thread safe, nevertheless the working function ...@@ -11,7 +11,7 @@ All the thread pool functions are thread safe, nevertheless the working function
## license ## license
Author: Laurent Thomas, Open cells project Author: Laurent Thomas, Open cells project
The owner share the code usage to Openairsoftware alliance as per OSA license terms The owner share this piece code to Openairsoftware alliance as per OSA license terms
# jobs # jobs
...@@ -41,6 +41,8 @@ abort_notifiedFIFO() allows the customer to delete all waiting jobs that match w ...@@ -41,6 +41,8 @@ abort_notifiedFIFO() allows the customer to delete all waiting jobs that match w
The clients can create one or more thread pools with init_tpool() The clients can create one or more thread pools with init_tpool()
the params string structure: describes a list of cores, separated by "," that run a worker thread the params string structure: describes a list of cores, separated by "," that run a worker thread
If the core exists on the CPU, the thread pool initialization sets the affinity between this thread and the related code (use negative values is allowed, so the thread will never be mapped on a specific core).
The threads are all Linux real time scheduler, their name is set automatically is "Tpool_<core id>" The threads are all Linux real time scheduler, their name is set automatically is "Tpool_<core id>"
## adding jobs ## adding jobs
...@@ -66,4 +68,4 @@ A performance measurement is integrated: the pool will automacillay fill timesta ...@@ -66,4 +68,4 @@ A performance measurement is integrated: the pool will automacillay fill timesta
if you set the environement variable: thread-pool-measurements to a valid file name if you set the environement variable: thread-pool-measurements to a valid file name
These measurements will be wrote to this Linux pipe. These measurements will be wrote to this Linux pipe.
A tool to read the linux fifo and display it in ascii will be provided (TBD) A tool to read the linux fifo and display it in ascii is provided: see the local directory Makefile for this tool and to compile the thread pool unitary tests.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment