# Thread pool

The thread pool is a working server made of a set of worker threads that can be mapped onto CPU cores.

Each worker loops, picking the jobs to do from the same input queue.

When a job is done, the worker sends back a response if a response queue is defined.

A selective abort allows the client to cancel parallel jobs (typical use: a client pushed several jobs, but the response of one job makes the other linked jobs useless).

All the thread pool functions are thread safe. Nevertheless, the working functions (called "processingFunc" hereafter) are implemented by the thread pool client, so the client has to handle the parallel execution of these functions.
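
Because several workers may run the same processingFunc at the same time, any state shared across jobs is the client's responsibility. The minimal sketch below makes a shared counter a C11 atomic and uses plain pthreads as stand-ins for the pool workers. `count_job()` and `run_concurrently()` are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define MAX_WORKERS 16

/* Hypothetical state shared by every job: the pool does not protect it,
 * so the client makes it a C11 atomic. */
static atomic_long total_processed;

/* Used as a processingFunc: msgData points to how many units to count. */
static void count_job(void *msgData) {
  long n = *(long *)msgData;
  for (long i = 0; i < n; i++)
    atomic_fetch_add(&total_processed, 1); /* safe under concurrent calls */
}

/* Stand-in for a pool worker: runs one job on its own data. */
static void *worker(void *msgData) {
  count_job(msgData);
  return NULL;
}

/* Runs nworkers (<= MAX_WORKERS) concurrent jobs of n units each and
 * returns the shared total. */
static long run_concurrently(int nworkers, long n) {
  pthread_t th[MAX_WORKERS];
  long data[MAX_WORKERS];
  atomic_store(&total_processed, 0);
  for (int i = 0; i < nworkers; i++) {
    data[i] = n;
    pthread_create(&th[i], NULL, worker, &data[i]);
  }
  for (int i = 0; i < nworkers; i++)
    pthread_join(th[i], NULL);
  return atomic_load(&total_processed);
}
```

If the counter were a plain `long` incremented with `++`, concurrent increments could be lost and the total could come out below `nworkers * n`.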

## license
    Author:
      Laurent Thomas, Open cells project
      The owner shares this piece of code with the OpenAirInterface Software Alliance as per the OSA license terms

# jobs

A job is a message (notifiedFIFO_elt_t):  

* next:
  internal FIFO chain, do not set it
* key:
  a long int that the client can use to identify a message or a group of messages
* ResponseFifo:
  if the client defines a response FIFO, the message will be posted back after processing
* processingFunc:
  any function (of type void processingFunc(void *)) that the worker will launch
* msgData:
  the data passed to processingFunc. It can be added automatically, or you can set it to a buffer you are managing
* malloced:
  a boolean that lets the thread pool free the message itself in these cases:
  no return FIFO is set, or the job is aborted

The job messages can be created and deleted with newNotifiedFIFO_elt() and delNotifiedFIFO_elt(), or managed by the client.
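
The job fields listed above can be pictured as the C struct below. This is a sketch and not the real notifiedFIFO_elt_t declaration. The type name `job_elt_t`, the opaque `struct job_fifo`, and the helpers `run_job()` and `double_it()` are all hypothetical, and the key is shown as a `long` as described above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct job_fifo; /* hypothetical queue type, left opaque here */

/* Hypothetical mirror of the job message fields described above. */
typedef struct job_elt {
  struct job_elt *next;           /* internal FIFO chain: do not set it */
  long key;                       /* client-chosen id of a job or group */
  struct job_fifo *responseFifo;  /* where to post the job when done, or NULL */
  void (*processingFunc)(void *); /* function the worker launches */
  void *msgData;                  /* argument passed to processingFunc */
  bool malloced;                  /* lets the pool free the message itself */
} job_elt_t;

/* What a worker does with one job: call processingFunc on msgData. */
static void run_job(job_elt_t *job) {
  job->processingFunc(job->msgData);
}

/* Example processingFunc: doubles an int in place (result written back). */
static void double_it(void *data) {
  *(int *)data *= 2;
}
```

Results are written back through msgData, so a job that produces output usually also sets a response FIFO. That way the client knows when the data is ready.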

# Queues of jobs

Queues are of type notifiedFIFO_t:

* notifiedFIFO_t must be initialized by init_notifiedFIFO()
* No delete function is required: the creator only has to free the notifiedFIFO_t data
* push_notifiedFIFO() adds a job to the queue
* pull_notifiedFIFO() is blocking, poll_notifiedFIFO() is non-blocking
* abort_notifiedFIFO() allows the client to delete all waiting jobs that match the key (see key in the jobs definition)

These queues are detailed hereafter.

## Common

newNotifiedFIFO_elt()

creates a message that will later be used in queues/FIFOs

delNotifiedFIFO_elt()

deletes it

NotifiedFifoData()

gives a pointer to the beginning of the free-usage memory in a message (you can put any data there, up to the 'size' parameter you passed to newNotifiedFIFO_elt())
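
A common way to implement such a message constructor is a single malloc() that holds the header followed by `size` bytes of user data. The data pointer is then simply the address right after the header. The sketch below assumes that layout. `elt_t`, `new_elt()`, `elt_data()` and `del_elt()` are hypothetical stand-ins, not the real newNotifiedFIFO_elt(), NotifiedFifoData() and delNotifiedFIFO_elt() implementations.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical message header; user data follows it in the same block. */
typedef struct elt {
  struct elt *next;
  long key;
  bool malloced;
  void *msgData;
} elt_t;

/* Allocates header + size bytes of user data with one malloc(). */
static elt_t *new_elt(size_t size, long key) {
  elt_t *e = malloc(sizeof(elt_t) + size);
  if (e == NULL)
    return NULL;
  e->next = NULL;
  e->key = key;
  e->malloced = true;
  /* User memory starts right after the header. It is aligned like elt_t
   * (pointer alignment); a real implementation may need stricter alignment. */
  e->msgData = e + 1;
  return e;
}

/* Pointer to the free-usage memory of the message. */
static void *elt_data(elt_t *e) { return e->msgData; }

/* Header and data go away together, since they share one allocation. */
static void del_elt(elt_t *e) { free(e); }
```

To use your own memory instead, fill the header yourself and point msgData at a buffer you manage. That is the malloc()/free() pair these calls would otherwise cost.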

These 3 calls are not mandatory: you can also use your own memory to avoid the malloc()/free() that are behind these calls.

## First level: non-thread-safe FIFOs (or queues)

These queues are low level: fast and simple.

initNotifiedFIFO_nothreadSafe()

to create a queue

pushNotifiedFIFO_nothreadSafe()

Adds an element to a queue

pullNotifiedFIFO_nothreadSafe()

Pulls an element from a queue

As these queues are not thread safe, there is NO blocking mechanism, and no pull() versus poll() distinction.

There is no delete for a message queue: you only have to release the memory you passed to initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf).

So if you malloc()ed the memory behind the 'nf' parameter, you have to free it. If it is an automatic (local) or global variable, nothing has to be done.


## thread safe queues

These queues are built on top of the non-thread-safe queues and add thread-to-thread protection.

initNotifiedFIFO()

to create a queue

pushNotifiedFIFO()

Adds an element to a queue

pullNotifiedFIFO()

Pulls an element from a queue. This call blocks until a message arrives.

pollNotifiedFIFO()

Pulls an element from a queue. This call is non-blocking, so it always returns quickly.

In 99.9% of cases, pull() is better than poll(): a blocked pull() uses no CPU while waiting, whereas a poll() loop keeps spinning.

There is no delete() call, on the same principle as for the non-thread-safe queues.
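
The thread-safe layer can be sketched by wrapping the same linked list with a mutex and a condition variable. pull waits on the condition until an element arrives. poll only takes the lock and returns at once. The names (`tsfifo_t`, `ts_push()`, `ts_pull()`, `ts_poll()`, `producer()`) and the convention that poll returns NULL on an empty queue are assumptions of this sketch.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef struct elt {
  struct elt *next;
  long key;
} elt_t;

typedef struct {
  elt_t *head, *tail;
  pthread_mutex_t lock;
  pthread_cond_t not_empty;
} tsfifo_t;

static void ts_init(tsfifo_t *f) {
  f->head = f->tail = NULL;
  pthread_mutex_init(&f->lock, NULL);
  pthread_cond_init(&f->not_empty, NULL);
}

/* Unlocked helper: the caller must hold f->lock. */
static elt_t *take_head(tsfifo_t *f) {
  elt_t *e = f->head;
  if (e) {
    f->head = e->next;
    if (f->head == NULL)
      f->tail = NULL;
  }
  return e;
}

static void ts_push(tsfifo_t *f, elt_t *e) {
  e->next = NULL;
  pthread_mutex_lock(&f->lock);
  if (f->tail)
    f->tail->next = e;
  else
    f->head = e;
  f->tail = e;
  pthread_cond_signal(&f->not_empty); /* wake one blocked puller */
  pthread_mutex_unlock(&f->lock);
}

/* Blocking: sleeps until an element is available. */
static elt_t *ts_pull(tsfifo_t *f) {
  pthread_mutex_lock(&f->lock);
  while (f->head == NULL) /* the loop guards against spurious wakeups */
    pthread_cond_wait(&f->not_empty, &f->lock);
  elt_t *e = take_head(f);
  pthread_mutex_unlock(&f->lock);
  return e;
}

/* Non-blocking: returns NULL immediately if the queue is empty. */
static elt_t *ts_poll(tsfifo_t *f) {
  pthread_mutex_lock(&f->lock);
  elt_t *e = take_head(f);
  pthread_mutex_unlock(&f->lock);
  return e;
}

/* Example producer thread: pushes one element with key 99. */
static void *producer(void *arg) {
  static elt_t msg = {.key = 99};
  ts_push((tsfifo_t *)arg, &msg);
  return NULL;
}
```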

# Thread pools

## initialization
The clients can create one or more thread pools with init_tpool()

The params string describes a list of cores, separated by ",", each running one worker thread.

If the core exists on the CPU, the thread pool initialization sets the affinity between this thread and the related core. Negative values are allowed: the thread is then never pinned to a specific core.

The threads all use the Linux real-time scheduler. Their name is set automatically to "Tpool_<core id>".
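
A worker thread for one entry of the core list could set itself up roughly as below. Each worker pins itself only if its core id names an online CPU, then sets its own name. This sketch uses the GNU extensions pthread_setaffinity_np() and pthread_setname_np(). The `worker_arg_t` type, the `worker_main()` name, and the validity check against sysconf(_SC_NPROCESSORS_ONLN) are assumptions. Real-time scheduling (e.g. SCHED_FIFO via pthread_setschedparam()) is left out because it normally requires extra privileges.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical per-worker startup data. */
typedef struct {
  int core;      /* entry from the params string, may be invalid (e.g. -1) */
  char name[16]; /* filled by the worker with the name it actually got */
} worker_arg_t;

static void *worker_main(void *p) {
  worker_arg_t *w = p;
  long ncores = sysconf(_SC_NPROCESSORS_ONLN);
  if (w->core >= 0 && w->core < ncores) {
    /* Valid core: pin this thread to it. */
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(w->core, &set);
    if (pthread_setaffinity_np(pthread_self(), sizeof(set), &set) != 0)
      fprintf(stderr, "cannot pin worker to core %d\n", w->core);
  } /* otherwise the thread floats and the kernel picks the core */

  char name[32];
  snprintf(name, sizeof(name), "Tpool_%d", w->core);
  name[15] = '\0'; /* Linux thread names are limited to 15 chars + NUL */
  pthread_setname_np(pthread_self(), name);
  pthread_getname_np(pthread_self(), w->name, sizeof(w->name));

  /* A real worker would now loop: pull a job, run it, post the response. */
  return NULL;
}
```

Doing the pinning and naming inside the new thread (through pthread_self()) avoids acting on a thread handle from outside while that thread may already have exited.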

## adding jobs
Clients create their job messages as notifiedFIFO_elt_t, then push them with pushTpool() (which internally calls push_notifiedFIFO()).

If they need a return, they have to create response queues with init_notifiedFIFO() and set this FIFO pointer in the notifiedFIFO_elt_t before pushing the job.

## abort

An abort service, abortTpool(), aborts all jobs that match a key (see the jobs "key"). When abortTpool() returns, it guarantees that no further response from a job matching the key will be posted on the response queues.

Nevertheless, the responses of jobs already completed before abortTpool() returns have already been pushed to the response FIFO queue.
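
The waiting-job part of such an abort can be sketched as an unlink pass over the queue: every element whose key matches is removed before a worker can pick it. This is a single-threaded sketch. `fifo_t`, `fifo_push()` and `abort_key()` are hypothetical. A real implementation would hold the queue mutex during the walk and would also have to deal with jobs that are already running, which this sketch does not attempt.

```c
#include <assert.h>
#include <stddef.h>

typedef struct elt {
  struct elt *next;
  long key;
} elt_t;

typedef struct {
  elt_t *head, *tail;
} fifo_t;

static void fifo_push(fifo_t *f, elt_t *e) {
  e->next = NULL;
  if (f->tail)
    f->tail->next = e;
  else
    f->head = e;
  f->tail = e;
}

/* Unlinks every waiting element whose key matches and returns how many.
 * In a multi-threaded queue the caller must hold the queue lock. */
static int abort_key(fifo_t *f, long key) {
  int removed = 0;
  elt_t **link = &f->head; /* the pointer that refers to the current element */
  f->tail = NULL;          /* recomputed during the walk */
  while (*link) {
    elt_t *e = *link;
    if (e->key == key) {
      *link = e->next; /* skip it; its owner decides whether to free it */
      removed++;
    } else {
      f->tail = e;
      link = &e->next;
    }
  }
  return removed;
}
```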

## API details
Each thread pool (there can be several in the same process) must be initialized.

### initTpool(char *params, tpool_t *pool, bool performanceMeas) is to be called once per pool

The configuration parameter is a string whose elements are separated by ",":

* N: if an N is in the parameter list, no threads are created
    The purpose is to keep the same API in any case: useful on a CPU with a small number of cores,
    or in debugging sessions to simplify the human work
* a number that represents a valid CPU core on the target CPU
    A thread is created and pinned to this core (with set affinity)
* a number that is not a valid CPU core
    A floating thread is created (Linux is responsible for the real-time core allocation)

Example: "-1,-1,-1". As there is no core number -1, the thread pool is made of 3 floating threads.

Be careful with fixed allocation: it is hard to be more clever than the Linux kernel.

pool is the memory you allocated beforehand to store the thread pool's internal state (same concept as for the queues above).

performanceMeas is a flag to enable the performance measurements described in the "Performance measurements" section.

### pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg)

Adds a job to the thread pool.

The msg fields you can set are:

* key:  
    a value of your choice that you will find back in the response.
    It is also the key for abortTpool().
* reponseFifo  
    If you set it, the message will be sent back on this queue when the job is done.
    If you don't set it, no return is performed, and the thread pool frees the message 'msg' when the job is done.
* processingFunc  
    The function the job will run. The function prototype is void <func>(void *memory).
    The data part (the pointer returned by NotifiedFifoData(msg)) is passed to the function.
    It is used to send data to the processing function and also to write back results.
    Of course, writing back results also requires a return queue (the reponseFifo parameter).

### pullTpool() collects job results from a return queue

You collect results from a result queue: you get back the message you gave to pushTpool(), updated by processingFunc().

An example of multiple return queues, in the eNB: I created one single thread pool (because it depends mainly on the CPU hardware), but I use two return queues: one for turbo encoding, one for turbo decoding.

### tryPullTpool()

Is the same, but non-blocking (pollTpool() would have been a better name).

### abortTpool()

Is a facility to cancel work you pushed to a thread pool.

I used it once: when the eNB performs turbo decoding, I push all segments to the thread pool.

But when I get back the decoding results, if one segment can't be decoded, I don't need the results of the other segments of the same UE.

## Performance measurements

A performance measurement is integrated: the pool automatically fills these timestamps:

* creationTime:
  time the request is pushed to the pool
* startProcessingTime:
  time a worker starts to run the job
* endProcessingTime:
  time the worker finished the job
* returnTime:
  time the client reads the result

If you set the environment variable thread-pool-measurements to a valid file name, these measurements will be written to this Linux pipe.
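
Here is a sketch of how such opt-in measurement output could be wired: read the file name with getenv() and write one fixed-size binary record per job with fwrite(). The record layout `meas_rec_t` and the helpers `open_measurements()` and `write_record()` are hypothetical; this is not the format the provided reader tool expects. Note that opening a named pipe (created with mkfifo) for writing blocks until a reader opens the other end.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical record: the four timestamps of one job. */
typedef struct {
  uint64_t creation, start, end, ret;
} meas_rec_t;

/* Returns a stream if the variable is set and the file can be opened,
 * NULL when measurements are disabled or the open fails. */
static FILE *open_measurements(void) {
  const char *path = getenv("thread-pool-measurements");
  if (path == NULL)
    return NULL;
  /* On a named pipe, this open blocks until a reader opens the other end. */
  return fopen(path, "wb");
}

/* Writes one record; returns 0 on success, -1 on error. */
static int write_record(FILE *out, const meas_rec_t *r) {
  return fwrite(r, sizeof(*r), 1, out) == 1 ? 0 : -1;
}
```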

A tool to read the Linux FIFO and display it in ASCII is provided: see the Makefile in the local directory to build this tool and to compile the thread pool unit tests.