The **thread pool** is a working server, made of a set of worker threads that can be mapped on CPU cores. Each thread pool has an **input queue** ("**FIFO**"), from which its workers pick **jobs** (FIFO element) to execute. When a job is done, the worker sends a message to an output queue, if it has been defined.
The **thread pool** is a working server, made of a set of worker threads that can be mapped on CPU cores. Each thread pool has an **input queue** ("**FIFO**"), from which its workers pick **jobs** (FIFO element) to execute. When a job is done, the worker sends a message to an output queue, if it has been defined.
A selective abort allows to cancel parallel jobs. This can be useful, e.g., if a client pushed jobs, but from a response of one job, the other linked jobs become useless.
All the thread pool functions are thread safe. The functions executed by worker threads are provided by the thread pool client, so the client has to handle the concurrency/parallel execution of his functions.
All the thread pool functions are thread safe. The functions executed by worker threads are provided by the thread pool client, so the client has to handle the concurrency/parallel execution of his functions.
## license
## license
...
@@ -56,7 +54,6 @@ Queues can be used to enqueue messages/jobs, of type `notifiedFIFO_t`.
...
@@ -56,7 +54,6 @@ Queues can be used to enqueue messages/jobs, of type `notifiedFIFO_t`.
*`pushNotifiedFIFO()`: Add a job to a queue
*`pushNotifiedFIFO()`: Add a job to a queue
*`pullNotifiedFIFO()`: Pull a job from a queue. This call is blocking until a job arrived.
*`pullNotifiedFIFO()`: Pull a job from a queue. This call is blocking until a job arrived.
*`pollNotifiedFIFO()`: Pull a job from a queue. This call is not blocking, so it returns always very shortly
*`pollNotifiedFIFO()`: Pull a job from a queue. This call is not blocking, so it returns always very shortly
*`abortNotifiedFIFOJob()`: Allows to delete all waiting jobs that match a key (see `key` in jobs definition)
*`abortNotifiedFIFO()`: Aborts a FIFO, such that it will always return `NULL`
*`abortNotifiedFIFO()`: Aborts a FIFO, such that it will always return `NULL`
Note that in 99.9% of cases, `pull()` is better than `poll()`.
Note that in 99.9% of cases, `pull()` is better than `poll()`.
...
@@ -104,12 +101,6 @@ Like `pullNotifiedFIFO()`, but non-blocking: they check if the queue `nf` contai
...
@@ -104,12 +101,6 @@ Like `pullNotifiedFIFO()`, but non-blocking: they check if the queue `nf` contai
Note that unlike for `pullNotifiedFIFO()`, returning `NULL` does not inform whether the queue has been aborted; the caller should manually check the `abortFIFO` flag of `nf` in this case.
Note that unlike for `pullNotifiedFIFO()`, returning `NULL` does not inform whether the queue has been aborted; the caller should manually check the `abortFIFO` flag of `nf` in this case.
Aborts all jobs in FIFO queue `nf` with key `key`. Jobs already under execution will be silently dropped and not put in the FIFO return queue, if any.
Returns the number of aborted jobs.
### `void abortNotifiedFIFO(notifiedFIFO_t *nf)`
### `void abortNotifiedFIFO(notifiedFIFO_t *nf)`
Aborts the entire FIFO queue `nf`: all jobs will be dropped, and the FIFO is marked as aborted, such that a call to `pullNotifiedFIFO()` returns `NULL`.
Aborts the entire FIFO queue `nf`: all jobs will be dropped, and the FIFO is marked as aborted, such that a call to `pullNotifiedFIFO()` returns `NULL`.
...
@@ -135,10 +126,6 @@ If they need a return value (e.g., result of a computation), they have to create
...
@@ -135,10 +126,6 @@ If they need a return value (e.g., result of a computation), they have to create
## Abort
## Abort
A abort service `abortTpoolJob()` allows to abort all jobs that match a key (see a job's `key`). When the abort returns, it garanties that no job (matching the key) response will be posted on response queues.
Nevertheless, jobs already performed before the return of `abortTpoolJob()` are pushed in the response Fifo queue.
`abortTpool()` kills all jobs in the Tpool, and terminates the pool.
`abortTpool()` kills all jobs in the Tpool, and terminates the pool.
## API details
## API details
...
@@ -171,7 +158,7 @@ Adds a job for processing in the thread pool.
...
@@ -171,7 +158,7 @@ Adds a job for processing in the thread pool.
The job data you can set are, inside `msg`:
The job data you can set are, inside `msg`:
*`key`: an arbitrary key to find a job in a response queue, and which can be used to abort jobs using `abortTpoolJob()`.
*`key`: an arbitrary key to find a job in a response queue.
*`reponseFifo`: if non-`NULL`, the message will be sent back on this queue when the job is done. If `NULL`, the thread pool automatically frees the job when it is done.
*`reponseFifo`: if non-`NULL`, the message will be sent back on this queue when the job is done. If `NULL`, the thread pool automatically frees the job when it is done.
*`processingFunc`: the function to execute for this job.
*`processingFunc`: the function to execute for this job.
...
@@ -187,14 +174,6 @@ Multiple return queues might be useful. Consider the following example in the eN
...
@@ -187,14 +174,6 @@ Multiple return queues might be useful. Consider the following example in the eN
The same as `pullTpool()` in a non-blocking fashion (an alternative name would have been `pollTpool()`).
The same as `pullTpool()` in a non-blocking fashion (an alternative name would have been `pollTpool()`).
### `int abortTpoolJob(tpool_t *t, uint64_t key)`
Is a facility to cancel work you pushed to a thread pool: every job with a given `key` will be deleted, and results of jobs with such `key` under execution will be dropped.
It returns the number of aborted jobs, including the ones that are currently being executed.
I used it once: when eNB performs turbo decode, I push all segments in the thread pool. But when I get back the decoding results, if one segment can't be decoded, I don't need the results of the other segments of the same UE.
### `int abortTpool(tpool_t *t)`
### `int abortTpool(tpool_t *t)`
Aborts the complete Tpool: cancel every work in the input queue, marks to drop existing jobs in processing, and terminates all worker threads. It is afterwards still possible to call functions such as `pushTpool()`, but each calling thread will execute the job itself.
Aborts the complete Tpool: cancel every work in the input queue, marks to drop existing jobs in processing, and terminates all worker threads. It is afterwards still possible to call functions such as `pushTpool()`, but each calling thread will execute the job itself.