Commit ee99ff9b authored by mir's avatar mir

_Generic added. Compile time Tpools possible

parent 315f90e2
......@@ -649,8 +649,11 @@ add_library(UTIL
${OPENAIR_DIR}/common/utils/LOG/vcd_signal_dumper.c
${OPENAIR2_DIR}/UTIL/MATH/oml.c
${OPENAIR2_DIR}/UTIL/OPT/probe.c
${OPENAIR_DIR}/common/utils/threadPool/thread-pool.c
${OPENAIR_DIR}/common/utils/thread_pool/task_manager.c
${OPENAIR_DIR}/common/utils/task_manager/task.c
${OPENAIR_DIR}/common/utils/task_manager/task_ans.c
${OPENAIR_DIR}/common/utils/task_manager/thread_pool/task_manager.c
${OPENAIR_DIR}/common/utils/task_manager/threadPool/thread-pool.c
${OPENAIR_DIR}/common/utils/task_manager/C-Thread-Pool/thpool.c
${OPENAIR_DIR}/common/utils/utils.c
${OPENAIR_DIR}/common/utils/system.c
${OPENAIR_DIR}/common/utils/time_meas.c
......@@ -1939,7 +1942,7 @@ add_executable(nfapi_test
)
add_executable(measurement_display
${OPENAIR_DIR}/common/utils/threadPool/measurement_display.c)
${OPENAIR_DIR}/common/utils/task_manager/threadPool/measurement_display.c)
target_link_libraries (measurement_display minimal_lib)
add_executable(test5Gnas
......
/* ********************************
* Author: Johan Hanssen Seferidis
* License: MIT
* Description: Library providing a threading pool where you can add
* work. For usage, check the thpool.h file or README.md
*
*//** @file thpool.h *//*
*
********************************/
#if defined(__APPLE__)
#include <AvailabilityMacros.h>
#else
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500
#endif
#endif
#include <assert.h>
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#if defined(__linux__)
#include <sys/prctl.h>
#endif
#if defined(__FreeBSD__) || defined(__OpenBSD__)
#include <pthread_np.h>
#endif
#include "thpool.h"
#ifdef THPOOL_DEBUG
#define THPOOL_DEBUG 1
#else
#define THPOOL_DEBUG 0
#endif
#if !defined(DISABLE_PRINT) || defined(THPOOL_DEBUG)
#define err(str) fprintf(stderr, str)
#else
#define err(str)
#endif
#ifndef THPOOL_THREAD_NAME
#define THPOOL_THREAD_NAME thpool
#endif
#define STRINGIFY(x) #x
#define TOSTRING(x) STRINGIFY(x)
static volatile int threads_keepalive;
static volatile int threads_on_hold;
///// Wrapper Functions
void init_c_tp(threadpool* tp, int* core_id, size_t num_core_id)
{
assert(tp != NULL);
assert(num_core_id > 0);
*tp = thpool_init(num_core_id);
}
void free_c_tp(threadpool* tp, void (*clean)(task_t*) )
{
thpool_destroy(*tp);
}
void async_tp_task_manager(threadpool* tp, task_t t )
{
assert(tp != NULL);
int rc = thpool_add_work(*tp, t.func , t.args);
assert(rc == 0);
}
/* ========================== STRUCTURES ============================ */
/* Binary semaphore */
typedef struct bsem {
pthread_mutex_t mutex;
pthread_cond_t cond;
int v;
} bsem;
/* Job */
typedef struct job{
struct job* prev; /* pointer to previous job */
void (*function)(void* arg); /* function pointer */
void* arg; /* function's argument */
} job;
/* Job queue */
typedef struct jobqueue{
pthread_mutex_t rwmutex; /* used for queue r/w access */
job *front; /* pointer to front of queue */
job *rear; /* pointer to rear of queue */
bsem *has_jobs; /* flag as binary semaphore */
int len; /* number of jobs in queue */
} jobqueue;
/* Thread */
typedef struct thread{
int id; /* friendly id */
pthread_t pthread; /* pointer to actual thread */
struct thpool_* thpool_p; /* access to thpool */
} thread;
/* Threadpool */
typedef struct thpool_{
thread** threads; /* pointer to threads */
volatile int num_threads_alive; /* threads currently alive */
volatile int num_threads_working; /* threads currently working */
pthread_mutex_t thcount_lock; /* used for thread count etc */
pthread_cond_t threads_all_idle; /* signal to thpool_wait */
jobqueue jobqueue; /* job queue */
} thpool_;
/* ========================== PROTOTYPES ============================ */
static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
static void* thread_do(struct thread* thread_p);
static void thread_hold(int sig_id);
static void thread_destroy(struct thread* thread_p);
static int jobqueue_init(jobqueue* jobqueue_p);
static void jobqueue_clear(jobqueue* jobqueue_p);
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
static struct job* jobqueue_pull(jobqueue* jobqueue_p);
static void jobqueue_destroy(jobqueue* jobqueue_p);
static void bsem_init(struct bsem *bsem_p, int value);
static void bsem_reset(struct bsem *bsem_p);
static void bsem_post(struct bsem *bsem_p);
static void bsem_post_all(struct bsem *bsem_p);
static void bsem_wait(struct bsem *bsem_p);
/* ========================== THREADPOOL ============================ */
/* Initialise thread pool */
struct thpool_* thpool_init(int num_threads){
threads_on_hold = 0;
threads_keepalive = 1;
if (num_threads < 0){
num_threads = 0;
}
/* Make new thread pool */
thpool_* thpool_p;
thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
if (thpool_p == NULL){
err("thpool_init(): Could not allocate memory for thread pool\n");
return NULL;
}
thpool_p->num_threads_alive = 0;
thpool_p->num_threads_working = 0;
/* Initialise the job queue */
if (jobqueue_init(&thpool_p->jobqueue) == -1){
err("thpool_init(): Could not allocate memory for job queue\n");
free(thpool_p);
return NULL;
}
/* Make threads in pool */
thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread *));
if (thpool_p->threads == NULL){
err("thpool_init(): Could not allocate memory for threads\n");
jobqueue_destroy(&thpool_p->jobqueue);
free(thpool_p);
return NULL;
}
pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
pthread_cond_init(&thpool_p->threads_all_idle, NULL);
/* Thread init */
int n;
for (n=0; n<num_threads; n++){
thread_init(thpool_p, &thpool_p->threads[n], n);
#if THPOOL_DEBUG
printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
#endif
}
/* Wait for threads to initialize */
while (thpool_p->num_threads_alive != num_threads) {}
return thpool_p;
}
/* Add work to the thread pool */
int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p){
job* newjob;
newjob=(struct job*)malloc(sizeof(struct job));
if (newjob==NULL){
err("thpool_add_work(): Could not allocate memory for new job\n");
return -1;
}
/* add function and argument */
newjob->function=function_p;
newjob->arg=arg_p;
/* add job to queue */
jobqueue_push(&thpool_p->jobqueue, newjob);
return 0;
}
/* Wait until all jobs have finished */
void thpool_wait(thpool_* thpool_p){
pthread_mutex_lock(&thpool_p->thcount_lock);
while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
}
pthread_mutex_unlock(&thpool_p->thcount_lock);
}
/* Destroy the threadpool */
void thpool_destroy(thpool_* thpool_p){
/* No need to destroy if it's NULL */
if (thpool_p == NULL) return ;
volatile int threads_total = thpool_p->num_threads_alive;
/* End each thread 's infinite loop */
threads_keepalive = 0;
/* Give one second to kill idle threads */
double TIMEOUT = 1.0;
time_t start, end;
double tpassed = 0.0;
time (&start);
while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue.has_jobs);
time (&end);
tpassed = difftime(end,start);
}
/* Poll remaining threads */
while (thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue.has_jobs);
sleep(1);
}
/* Job queue cleanup */
jobqueue_destroy(&thpool_p->jobqueue);
/* Deallocs */
int n;
for (n=0; n < threads_total; n++){
thread_destroy(thpool_p->threads[n]);
}
free(thpool_p->threads);
free(thpool_p);
}
/* Pause all threads in threadpool */
void thpool_pause(thpool_* thpool_p) {
int n;
for (n=0; n < thpool_p->num_threads_alive; n++){
pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
}
}
/* Resume all threads in threadpool */
void thpool_resume(thpool_* thpool_p) {
// resuming a single threadpool hasn't been
// implemented yet, meanwhile this suppresses
// the warnings
(void)thpool_p;
threads_on_hold = 0;
}
int thpool_num_threads_working(thpool_* thpool_p){
return thpool_p->num_threads_working;
}
/* ============================ THREAD ============================== */
/* Initialize a thread in the thread pool
*
* @param thread address to the pointer of the thread to be created
* @param id id to be given to the thread
* @return 0 on success, -1 otherwise.
*/
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
*thread_p = (struct thread*)malloc(sizeof(struct thread));
if (*thread_p == NULL){
err("thread_init(): Could not allocate memory for thread\n");
return -1;
}
(*thread_p)->thpool_p = thpool_p;
(*thread_p)->id = id;
pthread_create(&(*thread_p)->pthread, NULL, (void * (*)(void *)) thread_do, (*thread_p));
pthread_detach((*thread_p)->pthread);
return 0;
}
/* Sets the calling thread on hold */
static void thread_hold(int sig_id) {
(void)sig_id;
threads_on_hold = 1;
while (threads_on_hold){
sleep(1);
}
}
/* What each thread is doing
*
* In principle this is an endless loop. The only time this loop gets interrupted is once
* thpool_destroy() is invoked or the program exits.
*
* @param thread thread that will run this function
* @return nothing
*/
static void* thread_do(struct thread* thread_p){
/* Set thread name for profiling and debugging */
char thread_name[16] = {0};
snprintf(thread_name, 16, TOSTRING(THPOOL_THREAD_NAME) "-%d", thread_p->id);
#if defined(__linux__)
/* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
prctl(PR_SET_NAME, thread_name);
#elif defined(__APPLE__) && defined(__MACH__)
pthread_setname_np(thread_name);
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(thread_p->pthread, thread_name);
#else
err("thread_do(): pthread_setname_np is not supported on this system");
#endif
/* Assure all threads have been created before starting serving */
thpool_* thpool_p = thread_p->thpool_p;
/* Register signal handler */
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_ONSTACK;
act.sa_handler = thread_hold;
if (sigaction(SIGUSR1, &act, NULL) == -1) {
err("thread_do(): cannot handle SIGUSR1");
}
/* Mark thread as alive (initialized) */
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_alive += 1;
pthread_mutex_unlock(&thpool_p->thcount_lock);
while(threads_keepalive){
bsem_wait(thpool_p->jobqueue.has_jobs);
if (threads_keepalive){
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working++;
pthread_mutex_unlock(&thpool_p->thcount_lock);
/* Read job from queue and execute it */
void (*func_buff)(void*);
void* arg_buff;
job* job_p = jobqueue_pull(&thpool_p->jobqueue);
if (job_p) {
func_buff = job_p->function;
arg_buff = job_p->arg;
func_buff(arg_buff);
free(job_p);
}
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working--;
if (!thpool_p->num_threads_working) {
pthread_cond_signal(&thpool_p->threads_all_idle);
}
pthread_mutex_unlock(&thpool_p->thcount_lock);
}
}
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_alive --;
pthread_mutex_unlock(&thpool_p->thcount_lock);
return NULL;
}
/* Frees a thread */
static void thread_destroy (thread* thread_p){
free(thread_p);
}
/* ============================ JOB QUEUE =========================== */
/* Initialize queue */
static int jobqueue_init(jobqueue* jobqueue_p){
jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
if (jobqueue_p->has_jobs == NULL){
return -1;
}
pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
bsem_init(jobqueue_p->has_jobs, 0);
return 0;
}
/* Clear the queue */
static void jobqueue_clear(jobqueue* jobqueue_p){
while(jobqueue_p->len){
free(jobqueue_pull(jobqueue_p));
}
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
bsem_reset(jobqueue_p->has_jobs);
jobqueue_p->len = 0;
}
/* Add (allocated) job to queue
*/
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
pthread_mutex_lock(&jobqueue_p->rwmutex);
newjob->prev = NULL;
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
jobqueue_p->front = newjob;
jobqueue_p->rear = newjob;
break;
default: /* if jobs in queue */
jobqueue_p->rear->prev = newjob;
jobqueue_p->rear = newjob;
}
jobqueue_p->len++;
bsem_post(jobqueue_p->has_jobs);
pthread_mutex_unlock(&jobqueue_p->rwmutex);
}
/* Get first job from queue(removes it from queue)
* Notice: Caller MUST hold a mutex
*/
static struct job* jobqueue_pull(jobqueue* jobqueue_p){
pthread_mutex_lock(&jobqueue_p->rwmutex);
job* job_p = jobqueue_p->front;
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
break;
case 1: /* if one job in queue */
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
jobqueue_p->len = 0;
break;
default: /* if >1 jobs in queue */
jobqueue_p->front = job_p->prev;
jobqueue_p->len--;
/* more than one job in queue -> post it */
bsem_post(jobqueue_p->has_jobs);
}
pthread_mutex_unlock(&jobqueue_p->rwmutex);
return job_p;
}
/* Free all queue resources back to the system */
static void jobqueue_destroy(jobqueue* jobqueue_p){
jobqueue_clear(jobqueue_p);
free(jobqueue_p->has_jobs);
}
/* ======================== SYNCHRONISATION ========================= */
/* Init semaphore to 1 or 0 */
static void bsem_init(bsem *bsem_p, int value) {
if (value < 0 || value > 1) {
err("bsem_init(): Binary semaphore can take only values 1 or 0");
exit(1);
}
pthread_mutex_init(&(bsem_p->mutex), NULL);
pthread_cond_init(&(bsem_p->cond), NULL);
bsem_p->v = value;
}
/* Reset semaphore to 0 */
static void bsem_reset(bsem *bsem_p) {
pthread_mutex_destroy(&(bsem_p->mutex));
pthread_cond_destroy(&(bsem_p->cond));
bsem_init(bsem_p, 0);
}
/* Post to at least one thread */
static void bsem_post(bsem *bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
bsem_p->v = 1;
pthread_cond_signal(&bsem_p->cond);
pthread_mutex_unlock(&bsem_p->mutex);
}
/* Post to all threads */
static void bsem_post_all(bsem *bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
bsem_p->v = 1;
pthread_cond_broadcast(&bsem_p->cond);
pthread_mutex_unlock(&bsem_p->mutex);
}
/* Wait on semaphore until semaphore has value 0 */
static void bsem_wait(bsem* bsem_p) {
pthread_mutex_lock(&bsem_p->mutex);
while (bsem_p->v != 1) {
pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
}
bsem_p->v = 0;
pthread_mutex_unlock(&bsem_p->mutex);
}
/**********************************
* @author Johan Hanssen Seferidis
* License: MIT
*
**********************************/
#ifndef _THPOOL_
#define _THPOOL_
#ifdef __cplusplus
extern "C" {
#endif
#include <stddef.h>
#include "../task.h"
/* =================================== API ======================================= */
typedef struct thpool_* threadpool;
/// Wrapper functions
void init_c_tp(threadpool * tp, int* core_id, size_t num_core_id);
void free_c_tp(threadpool* tp, void (*clean)(task_t*) );
void async_tp_task_manager(threadpool* tp, task_t t );
/**
* @brief Initialize threadpool
*
* Initializes a threadpool. This function will not return until all
* threads have initialized successfully.
*
* @example
*
* ..
* threadpool thpool; //First we declare a threadpool
* thpool = thpool_init(4); //then we initialize it to 4 threads
* ..
*
* @param num_threads number of threads to be created in the threadpool
* @return threadpool created threadpool on success,
* NULL on error
*/
threadpool thpool_init(int num_threads);
/**
* @brief Add work to the job queue
*
* Takes an action and its argument and adds it to the threadpool's job queue.
* If you want to add to work a function with more than one arguments then
* a way to implement this is by passing a pointer to a structure.
*
* NOTICE: You have to cast both the function and argument to not get warnings.
*
* @example
*
* void print_num(int num){
* printf("%d\n", num);
* }
*
* int main() {
* ..
* int a = 10;
* thpool_add_work(thpool, (void*)print_num, (void*)a);
* ..
* }
*
* @param threadpool threadpool to which the work will be added
* @param function_p pointer to function to add as work
* @param arg_p pointer to an argument
* @return 0 on success, -1 otherwise.
*/
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
/**
* @brief Wait for all queued jobs to finish
*
* Will wait for all jobs - both queued and currently running to finish.
* Once the queue is empty and all work has completed, the calling thread
* (probably the main program) will continue.
*
* Smart polling is used in wait. The polling is initially 0 - meaning that
* there is virtually no polling at all. If after 1 seconds the threads
* haven't finished, the polling interval starts growing exponentially
* until it reaches max_secs seconds. Then it jumps down to a maximum polling
* interval assuming that heavy processing is being used in the threadpool.
*
* @example
*
* ..
* threadpool thpool = thpool_init(4);
* ..
* // Add a bunch of work
* ..
* thpool_wait(thpool);
* puts("All added work has finished");
* ..
*
* @param threadpool the threadpool to wait for
* @return nothing
*/
void thpool_wait(threadpool);
/**
* @brief Pauses all threads immediately
*
* The threads will be paused no matter if they are idle or working.
* The threads return to their previous states once thpool_resume
* is called.
*
* While the thread is being paused, new work can be added.
*
* @example
*
* threadpool thpool = thpool_init(4);
* thpool_pause(thpool);
* ..
* // Add a bunch of work
* ..
* thpool_resume(thpool); // Let the threads start their magic
*
* @param threadpool the threadpool where the threads should be paused
* @return nothing
*/
void thpool_pause(threadpool);
/**
* @brief Unpauses all threads if they are paused
*
* @example
* ..
* thpool_pause(thpool);
* sleep(10); // Delay execution 10 seconds
* thpool_resume(thpool);
* ..
*
* @param threadpool the threadpool where the threads should be unpaused
* @return nothing
*/
void thpool_resume(threadpool);
/**
* @brief Destroy the threadpool
*
* This will wait for the currently active threads to finish and then 'kill'
* the whole threadpool to free up memory.
*
* @example
* int main() {
* threadpool thpool1 = thpool_init(2);
* threadpool thpool2 = thpool_init(2);
* ..
* thpool_destroy(thpool1);
* ..
* return 0;
* }
*
* @param threadpool the threadpool to destroy
* @return nothing
*/
void thpool_destroy(threadpool);
/**
* @brief Show currently working threads
*
* Working threads are the threads that are performing work (not idle).
*
* @example
* int main() {
* threadpool thpool1 = thpool_init(2);
* threadpool thpool2 = thpool_init(2);
* ..
* printf("Working threads: %d\n", thpool_num_threads_working(thpool1));
* ..
* return 0;
* }
*
* @param threadpool the threadpool of interest
* @return integer number of threads working
*/
int thpool_num_threads_working(threadpool);
#ifdef __cplusplus
}
#endif
#endif
#include "task.h"
#include <assert.h>
#include <ctype.h>
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/sysinfo.h>
#include <time.h>
// Compatibility with previous TPool
void parse_num_threads(char const* params, span_core_id_t* out)
{
assert(params != NULL);
int const logical_cores = get_nprocs_conf();
assert(logical_cores > 0);
char *saveptr = NULL;
char* params_cpy = strdup(params);
char* curptr = strtok_r(params_cpy, ",", &saveptr);
while (curptr != NULL) {
int const c = toupper(curptr[0]);
switch (c) {
case 'N': {
// pool->activated=false;
free(params_cpy);
out->core_id[out->sz++] = -1;
return;
break;
}
default: {
assert(out->sz != out->cap && "Capacity limit passed!");
int const core_id = atoi(curptr);
assert((core_id == -1 || core_id < logical_cores) && "Invalid core ID passed");
out->core_id[out->sz++] = core_id;
}
}
curptr = strtok_r(NULL, ",", &saveptr);
}
free(params_cpy);
}
#ifndef TASK_WORK_STEALING_THREAD_POOL_H
#define TASK_WORK_STEALING_THREAD_POOL_H
//#ifdef __cplusplus
//extern "C" {
//#endif
typedef struct{
void* args;
void (*func)(void* args);
} task_t;
// Compatibility with previous TPool
typedef struct {
int* core_id;
int sz;
int const cap;
} span_core_id_t ;
void parse_num_threads(char const* params, span_core_id_t* out);
//#ifdef __cplusplus
//}
//#endif
#endif
#include "task_ans.h"
#include <assert.h>
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
void completed_task_ans(task_ans_t* task)
{
assert(task != NULL);
int const task_not_completed = 0;
assert(atomic_load_explicit(&task->status, memory_order_acquire) == task_not_completed && "Task already finished?");
atomic_store_explicit(&task->status, 1, memory_order_release);
}
void join_task_ans(task_ans_t* arr, size_t len)
{
assert(len < INT_MAX);
assert(arr != NULL);
// We are believing Fedor
const struct timespec ns = {0,1};
uint64_t i = 0;
int j = len -1;
for(; j != -1 ; i++){
for(; j != -1; --j){
int const task_completed = 1;
if(atomic_load_explicit(&arr[j].status, memory_order_acquire) != task_completed)
break;
//if(atomic_load_explicit(&arr[j].status, memory_order_seq_cst) != task_completed)
}
if(i % 8 == 0){
nanosleep(&ns, NULL);
}
//sched_yield();
// pause_or_yield();
}
}
#ifndef TASK_MANAGER_WORKING_STEALING_H
#define TASK_MANAGER_WORKING_STEALING_H
#ifndef TASK_ANSWER_THREAD_POOL_H
#define TASK_ANSWER_THREAD_POOL_H
#include "task.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifndef __cplusplus
#include <stdalign.h>
......@@ -12,8 +14,7 @@
#define _Alignas(X) alignas(X)
#endif
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#if defined (__i386__) || defined(__x86_64__)
......@@ -37,37 +38,8 @@ void join_task_ans(task_ans_t* arr, size_t len);
void completed_task_ans(task_ans_t* task);
typedef struct{
uint8_t* buf;
size_t len;
task_ans_t* ans;
} thread_info_tm_t;
typedef struct{
pthread_t* t_arr;
size_t len_thr;
_Atomic(uint64_t) index;
void* q_arr;
_Atomic(uint64_t) num_task;
pthread_barrier_t barrier;
} task_manager_t;
void init_task_manager(task_manager_t* man, size_t num_threads);
void free_task_manager(task_manager_t* man, void (*clean)(task_t* args) );
void async_task_manager(task_manager_t* man, task_t t);
// Compatibility with previous TPool
int parse_num_threads(char const* params);
#ifdef __cplusplus
}
#endif
#endif
#ifndef TASK_MANAGER_GENERIC_H
#define TASK_MANAGER_GENERIC_H
#include "task.h"
#include "thread_pool/task_manager.h"
#include "threadPool/thread-pool.h"
#include "C-Thread-Pool/thpool.h"
// Uncomment one task_manager_t to compile the
// desired thread pool
// Work stealing thread pool
#define task_manager_t ws_task_manager_t
// Previous single queue OAI thread pool
//#define task_manager_t tpool_t
// Most rated C thread pool in github
//#define task_manager_t threadpool
#define init_task_manager(T, P, N) _Generic ((T), \
ws_task_manager_t*: init_ws_task_manager, \
tpool_t*: init_sq_task_manager,\
threadpool* : init_c_tp, \
default: init_ws_task_manager) (T, P, N)
#define free_task_manager(T, F) _Generic ((T), \
ws_task_manager_t*: free_ws_task_manager, \
tpool_t*: free_sq_task_manager,\
threadpool* : free_c_tp, \
default: free_ws_task_manager) (T, F)
#define async_task_manager(T, TASK) _Generic ((T), \
ws_task_manager_t*: async_ws_task_manager, \
tpool_t*: async_sq_task_manager,\
threadpool* : async_tp_task_manager, \
default: async_ws_task_manager) (T, TASK)
#endif
......@@ -23,6 +23,7 @@
#define _GNU_SOURCE
#include <assert.h>
#include <sched.h>
#include <sys/types.h>
#include <sys/stat.h>
......@@ -31,7 +32,7 @@
#include <unistd.h>
#include <ctype.h>
#include <sys/sysinfo.h>
#include <threadPool/thread-pool.h>
#include <task_manager/threadPool/thread-pool.h>
void displayList(notifiedFIFO_t *nf) {
int n=0;
......@@ -83,7 +84,10 @@ void *one_thread(void *arg) {
if (tp->measurePerf) elt->startProcessingTime=rdtsc_oai();
elt->processingFunc(NotifiedFifoData(elt));
//elt->processingFunc(NotifiedFifoData(elt));
elt->processingFunc(elt->processingArg);
if (tp->measurePerf) elt->endProcessingTime=rdtsc_oai();
......@@ -178,6 +182,50 @@ void initFloatingCoresTpool(int nbThreads,tpool_t *pool, bool performanceMeas, c
initNamedTpool(threads, pool, performanceMeas, name);
}
void init_sq_task_manager(tpool_t *pool, int* lst, size_t num_threads)
{
assert(pool != NULL);
assert(lst != NULL);
assert(num_threads > 0);
char str[1024] = {0};
int it = 0;
for(int i = 0; i < num_threads - 1; ++i){
it += sprintf(&str[it], "%d,", lst[i]);
assert(it < 1024);
}
it += sprintf(&str[it], "%d", lst[num_threads - 1]);
assert(it < 1024);
bool performanceMeas = false;
char name[] = "single_queue_thread_pool";
initNamedTpool(str, pool, performanceMeas, name);
}
void async_sq_task_manager(tpool_t* pool, task_t t)
{
assert(pool != NULL);
int size = sizeof(void*);
uint64_t const key = 1021;
notifiedFIFO_t* responseFifo = NULL;
void (*processingFunc)(void *) = t.func;
notifiedFIFO_elt_t *elm = newNotifiedFIFO_elt(size, key, responseFifo, processingFunc);
elm->processingArg = t.args;
pushTpool(pool, elm);
}
void free_sq_task_manager(tpool_t* pool, void (*clean)(task_t*))
{
assert(pool != NULL);
abortTpool(pool);
}
#ifdef TEST_THREAD_POOL
int oai_exit = 0;
......@@ -294,4 +342,5 @@ int main() {
*/
return 0;
}
#endif
......@@ -24,6 +24,9 @@
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include "../task.h"
#include <stdbool.h>
#include <stdint.h>
#include <malloc.h>
......@@ -31,7 +34,7 @@
#include <pthread.h>
#include <unistd.h>
#include <sys/syscall.h>
#include "assertions.h"
#include "common/utils/assertions.h"
#include "common/utils/time_meas.h"
#include "common/utils/system.h"
......@@ -61,6 +64,7 @@ typedef struct notifiedFIFO_elt_s {
uint64_t key; //To filter out elements
struct notifiedFIFO_s *reponseFifo;
void (*processingFunc)(void *);
void* processingArg;
bool malloced;
oai_cputime_t creationTime;
oai_cputime_t startProcessingTime;
......@@ -341,6 +345,12 @@ static inline int abortTpool(tpool_t *t) {
return nbRemoved;
}
void init_sq_task_manager(tpool_t *pool, int* lst, size_t num_threads);
void async_sq_task_manager(tpool_t* pool, task_t t);
void free_sq_task_manager(tpool_t* pool, void (*clean)(task_t*));
void initNamedTpool(char *params,tpool_t *pool, bool performanceMeas, char *name);
void initFloatingCoresTpool(int nbThreads,tpool_t *pool, bool performanceMeas, char *name);
#define initTpool(PARAMPTR,TPOOLPTR, MEASURFLAG) initNamedTpool(PARAMPTR,TPOOLPTR, MEASURFLAG, NULL)
......
......@@ -31,7 +31,6 @@
static_assert(0!=0, "Unknown CPU architecture");
#endif
/*
static
int64_t time_now_us(void)
......@@ -48,6 +47,7 @@ int64_t time_now_us(void)
}
return micros;
}
*/
static
void pin_thread_to_core(int core_num)
......@@ -59,7 +59,6 @@ void pin_thread_to_core(int core_num)
assert(ret != -1);
printf("Pining into core %d id %ld \n", core_num, pthread_self());
}
*/
//////////////////////////////
//////////////////////////////
......@@ -202,6 +201,7 @@ typedef struct {
pthread_cond_t cv;
seq_ring_task_t r;
size_t t_id;
size_t idx; // for debugginf
// _Atomic int32_t* futex;
//_Atomic bool* waiting;
_Atomic int done;
......@@ -214,11 +214,13 @@ typedef struct{
static
void init_not_q(not_q_t* q, size_t t_id /*, _Atomic int32_t* futex , _Atomic bool* waiting */)
void init_not_q(not_q_t* q, size_t idx, size_t t_id /*, _Atomic int32_t* futex , _Atomic bool* waiting */)
{
assert(q != NULL);
assert(t_id != 0 && "Invalid thread id");
q->idx = idx;
q->done = 0;
//q->waiting = waiting;
init_seq_ring_task(&q->r);
......@@ -352,6 +354,8 @@ bool pop_not_q(not_q_t* q, ret_try_t* out)
pthread_cond_wait(&q->cv , &q->mtx);
}
//printf("Waking idx %ld %ld \n", q->idx, time_now_us());
assert(q->done == 0 || q->done ==1);
if(q->done == 1){
int rc = pthread_mutex_unlock(&q->mtx);
......@@ -400,8 +404,9 @@ void done_not_q(not_q_t* q)
//static int marker_fd;
typedef struct{
task_manager_t* man;
ws_task_manager_t* man;
int idx;
int core_id;
} task_thread_args_t;
......@@ -420,19 +425,20 @@ void* worker_thread(void* arg)
task_thread_args_t* args = (task_thread_args_t*)arg;
int const idx = args->idx;
//int const log_cores = get_nprocs_conf();
//assert(log_cores > 0);
// Assuming: 2 x Physical cores = Logical cores
//pin_thread_to_core(idx+log_cores/2);
task_manager_t* man = args->man;
ws_task_manager_t* man = args->man;
uint32_t const len = man->len_thr;
uint32_t const num_it = 2*(man->len_thr + idx);
not_q_t* q_arr = (not_q_t*)man->q_arr;
init_not_q(&q_arr[idx], pthread_self() );
init_not_q(&q_arr[idx], idx, pthread_self() );
int const logical_cores = get_nprocs_conf();
assert(logical_cores > 0);
assert(args->core_id > -2 && args->core_id < logical_cores);
if(args->core_id != -1)
pin_thread_to_core(args->core_id);
// Synchronize all threads
pthread_barrier_wait(&man->barrier);
......@@ -468,7 +474,7 @@ void* worker_thread(void* arg)
return NULL;
}
void init_task_manager(task_manager_t* man, size_t num_threads)
void init_ws_task_manager(ws_task_manager_t* man, int* core_id, size_t num_threads)
{
assert(man != NULL);
assert(num_threads > 0 && num_threads < 33 && "Do you have zero or more than 32 processors??");
......@@ -482,27 +488,28 @@ void init_task_manager(task_manager_t* man, size_t num_threads)
man->index = 0;
const pthread_barrierattr_t * barrier_attr = NULL;
int rc = pthread_barrier_init(&man->barrier, barrier_attr, num_threads + 1);
assert(rc == 0);
for(size_t i = 0; i < num_threads; ++i){
task_thread_args_t* args = malloc(sizeof(task_thread_args_t) );
assert(args != NULL && "Memory exhausted");
args->idx = i;
args->man = man;
args->core_id = core_id[i];
pthread_attr_t attr = {0};
int ret=pthread_attr_init(&attr);
int ret = pthread_attr_init(&attr);
assert(ret == 0);
ret=pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
ret = pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);
assert(ret == 0);
ret=pthread_attr_setschedpolicy(&attr, SCHED_RR);
ret = pthread_attr_setschedpolicy(&attr, SCHED_RR);
assert(ret == 0);
struct sched_param sparam={0};
sparam.sched_priority = 94;
ret=pthread_attr_setschedparam(&attr, &sparam);
struct sched_param sparam = {0};
sparam.sched_priority = 99;
ret = pthread_attr_setschedparam(&attr, &sparam);
int rc = pthread_create(&man->t_arr[i], &attr, worker_thread, args);
if(rc != 0){
......@@ -522,7 +529,7 @@ void init_task_manager(task_manager_t* man, size_t num_threads)
//pin_thread_to_core(3);
}
void free_task_manager(task_manager_t* man, void (*clean)(task_t*))
void free_ws_task_manager(ws_task_manager_t* man, void (*clean)(task_t*))
{
not_q_t* q_arr = (not_q_t*)man->q_arr;
//atomic_store(&man->waiting, false);
......@@ -547,7 +554,7 @@ void free_task_manager(task_manager_t* man, void (*clean)(task_t*))
}
void async_task_manager(task_manager_t* man, task_t t)
void async_ws_task_manager(ws_task_manager_t* man, task_t t)
{
assert(man != NULL);
assert(man->len_thr > 0);
......@@ -563,6 +570,9 @@ void async_task_manager(task_manager_t* man, task_t t)
for(size_t i = 0; i < len_thr ; ++i){
if(try_push_not_q(&q_arr[(i+index) % len_thr], t)){
man->num_task +=1;
//printf("Pushing idx %ld %ld \n",(i+index) % len_thr, time_now_us());
// Debbugging purposes
//cnt_in++;
//printf(" async_task_manager t_id %ld Tasks in %d %ld num_task %ld idx %ld \n", pthread_self(), cnt_in, time_now_us(), man->num_task, (i+index) % len_thr );
......@@ -572,6 +582,8 @@ void async_task_manager(task_manager_t* man, task_t t)
push_not_q(&q_arr[index%len_thr], t);
//printf("Pushing idx %ld %ld \n", index % len_thr, time_now_us());
man->num_task +=1;
// Debbugging purposes
......@@ -579,77 +591,5 @@ void async_task_manager(task_manager_t* man, task_t t)
//printf("t_id %ld Tasks in %d %ld num_takss %ld idx %ld \n", pthread_self(), cnt_in, time_now_us(), man->num_task , index % len_thr );
}
void completed_task_ans(task_ans_t* task)
{
assert(task != NULL);
int const task_not_completed = 0;
assert(atomic_load_explicit(&task->status, memory_order_acquire) == task_not_completed && "Task already finished?");
//atomic_store_explicit(&task->status, 1, memory_order_release);
atomic_store_explicit(&task->status, 1, memory_order_seq_cst);
}
// This function does not belong here logically
//
void join_task_ans(task_ans_t* arr, size_t len)
{
assert(len < INT_MAX);
assert(arr != NULL);
// We are believing Fedor
const struct timespec ns = {0,1};
uint64_t i = 0;
int j = len -1;
for(; j != -1 ; i++){
for(; j != -1; --j){
int const task_completed = 1;
//if(atomic_load_explicit(&arr[j].status, memory_order_acquire) != task_completed)
if(atomic_load_explicit(&arr[j].status, memory_order_seq_cst) != task_completed)
break;
}
if(i % 8 == 0){
nanosleep(&ns, NULL);
}
//sched_yield();
// pause_or_yield();
}
}
// Compatibility with previous TPool
int parse_num_threads(char const* params)
{
assert(params != NULL);
char *saveptr = NULL;
char* params_cpy = strdup(params);
char* curptr = strtok_r(params_cpy, ",", &saveptr);
int nbThreads = 0;
while (curptr != NULL) {
int const c = toupper(curptr[0]);
switch (c) {
case 'N': {
// pool->activated=false;
free(params_cpy);
return 1;
break;
}
default: {
int const core_id = atoi(curptr);
printf("[MIR]: Ask to create a thread for core %d ignoring request\n", core_id);
nbThreads++;
}
}
curptr = strtok_r(NULL, ",", &saveptr);
}
free(params_cpy);
return nbThreads;
}
#undef pause_or_yield
#ifndef TASK_MANAGER_WORKING_STEALING_H
#define TASK_MANAGER_WORKING_STEALING_H
#include "../task.h"
#include "../task_ans.h"
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
typedef struct{
uint8_t* buf;
size_t len;
task_ans_t* ans;
} thread_info_tm_t;
typedef struct{
pthread_t* t_arr;
size_t len_thr;
_Atomic(uint64_t) index;
void* q_arr;
_Atomic(uint64_t) num_task;
pthread_barrier_t barrier;
} ws_task_manager_t;
void init_ws_task_manager(ws_task_manager_t* man, int* core_id, size_t num_threads);
void free_ws_task_manager(ws_task_manager_t* man, void (*clean)(task_t* args) );
void async_ws_task_manager(ws_task_manager_t* man, task_t t);
#endif
......@@ -53,7 +53,7 @@
#include <sys/resource.h>
#include "common/utils/load_module_shlib.h"
#include "common/config/config_userapi.h"
#include "common/utils/threadPool/thread-pool.h"
#include "common/utils/task_manager/threadPool/thread-pool.h"
#include "executables/softmodem-common.h"
#include <readline/history.h>
......
......@@ -27,7 +27,7 @@
#include "assertions.h"
#include <pthread.h>
#include "common/config/config_userapi.h"
#include <common/utils/threadPool/thread-pool.h>
#include <common/utils/task_manager/threadPool/thread-pool.h>
// global var for openair performance profiler
int opp_enabled = 0;
double cpu_freq_GHz __attribute__ ((aligned(32)));
......
......@@ -73,7 +73,7 @@ unsigned short config_frames[4] = {2,9,11,13};
#include "common/utils/LOG/vcd_signal_dumper.h"
#include "UTIL/OPT/opt.h"
#include "enb_config.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include "create_tasks.h"
......@@ -552,15 +552,18 @@ int main ( int argc, char **argv )
L1_rxtx_proc_t *L1proctx= &RC.eNB[x][CC_id]->proc.L1_proc_tx;
L1proc->respDecode=(notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
if ( strlen(get_softmodem_params()->threadPoolConfig) > 0 ){
L1proc->man = calloc(1, sizeof(task_manager_t));
L1proc->man = calloc(1, sizeof(ws_task_manager_t));
assert(L1proc->man != NULL && "Memory exhausted");
int const num_threads = parse_num_threads(get_softmodem_params()->threadPoolConfig);
init_task_manager(L1proc->man, num_threads);
int core_id[128] = {0};
span_core_id_t out = {.cap = 128, .core_id = core_id};
parse_num_threads(get_softmodem_params()->threadPoolConfig, &out);
init_task_manager(L1proc->man, out.core_id, out.sz);
}else {
L1proc->man = calloc(1, sizeof(task_manager_t));
L1proc->man = calloc(1, sizeof(ws_task_manager_t));
assert(L1proc->man != NULL && "Memory exhausted");
int const num_threads = parse_num_threads("n");
init_task_manager(L1proc->man, num_threads);
int lst_core_id = -1;
init_task_manager(L1proc->man, &lst_core_id, 1);
}
initNotifiedFIFO(L1proc->respDecode);
L1proctx->man = L1proc->man;
......
......@@ -90,7 +90,7 @@
#include <openair1/PHY/NR_TRANSPORT/nr_dlsch.h>
#include <PHY/NR_ESTIMATION/nr_ul_estimation.h>
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
//#define USRP_DEBUG 1
// Fix per CC openair rf/if device update
......@@ -385,10 +385,13 @@ void init_gNB_Tpool(int inst) {
gNB = RC.gNB[inst];
// ULSCH decoding threadpool
int const num_threads = parse_num_threads(get_softmodem_params()->threadPoolConfig);
init_task_manager(&gNB->man, num_threads);
int core_id[128] = {0};
span_core_id_t out = {.cap = 128, .core_id = core_id, .sz = 0};
parse_num_threads(get_softmodem_params()->threadPoolConfig, &out);
init_task_manager(&gNB->man, out.core_id, out.sz);
// 2nd tpool needed to avoid current cycle and deadlock
init_task_manager(&gNB->man_rx_tx_ru, 2);
int lst_cores[] = {-1, -1};
init_task_manager(&gNB->man_rx_tx_ru, lst_cores, 2);
gNB_L1_proc_t *proc = &gNB->proc;
// PUSCH symbols per thread need to be calculated by how many threads we have
......@@ -427,9 +430,6 @@ void init_gNB_Tpool(int inst) {
void term_gNB_Tpool(int inst) {
PHY_VARS_gNB *gNB = RC.gNB[inst];
void (*clean)(task_t*) = NULL;
free_task_manager(&gNB->man , clean);
free_task_manager(&gNB->man_rx_tx_ru , clean);
abortNotifiedFIFO(&gNB->respDecode);
abortNotifiedFIFO(&gNB->resp_L1);
abortNotifiedFIFO(&gNB->L1_tx_free);
......@@ -437,6 +437,10 @@ void term_gNB_Tpool(int inst) {
abortNotifiedFIFO(&gNB->L1_tx_out);
abortNotifiedFIFO(&gNB->L1_rx_out);
void (*clean)(task_t*) = NULL;
free_task_manager(&gNB->man , clean);
free_task_manager(&gNB->man_rx_tx_ru , clean);
gNB_L1_proc_t *proc = &gNB->proc;
if (!get_softmodem_params()->emulate_l1)
pthread_join(proc->L1_stats_thread, NULL);
......
......@@ -54,7 +54,7 @@
#include <executables/softmodem-common.h>
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#ifdef SMBV
#include "PHY/TOOLS/smbv.h"
......@@ -1909,8 +1909,11 @@ void init_NR_RU(configmodule_interface_t *cfg, char *rf_config_file)
s_offset+=sprintf(pool+s_offset,",%d",ru->tpcores[icpu]);
}
LOG_I(PHY,"RU thread-pool core string %s\n",pool);
int const num_threads = parse_num_threads(pool);
init_task_manager(&ru->man, num_threads);
int core_id[128] = {0};
span_core_id_t out = {.cap = 128, .core_id = core_id};
parse_num_threads(pool, &out);
init_task_manager(&ru->man, out.core_id, out.sz);
// FEP RX result FIFO
ru->respfeprx = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
......
......@@ -28,7 +28,7 @@
#undef MALLOC
#include "assertions.h"
#include "PHY/types.h"
#include <threadPool/thread-pool.h>
#include <task_manager/threadPool/thread-pool.h>
#include "s1ap_eNB.h"
#include "SIMULATION/ETH_TRANSPORT/proto.h"
......
......@@ -36,6 +36,7 @@
#include "LAYER2/nr_pdcp/nr_pdcp_oai_api.h"
#include "LAYER2/nr_rlc/nr_rlc_oai_api.h"
#include "RRC/NR/MESSAGES/asn1_msg.h"
#include "common/utils/task_manager/task_manager_gen.h"
/*
* NR SLOT PROCESSING SEQUENCE
......
......@@ -32,8 +32,7 @@
#include "SCHED_NR_UE/defs.h"
#include "common/ran_context.h"
#include "common/config/config_userapi.h"
//#include "common/utils/threadPool/thread-pool.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include "common/utils/load_module_shlib.h"
//#undef FRAME_LENGTH_COMPLEX_SAMPLES //there are two conflicting definitions, so we better make sure we don't use it at all
#include "common/utils/nr/nr_common.h"
......@@ -489,8 +488,10 @@ int main(int argc, char **argv)
T_Config_Init();
#endif
int const num_threads = parse_num_threads(get_softmodem_params()->threadPoolConfig);
init_task_manager(&nrUE_params.man, num_threads);
int core_id[128] = {0};
span_core_id_t out = {.cap = 128, .core_id = core_id };
parse_num_threads(get_softmodem_params()->threadPoolConfig, &out);
init_task_manager(&nrUE_params.man, out.core_id, out.sz);
//randominit (0);
set_taus_seed (0);
......
......@@ -4,7 +4,7 @@
#include <executables/softmodem-common.h>
#include "PHY/defs_nr_UE.h"
#include "SIMULATION/ETH_TRANSPORT/proto.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/thread_pool/task_manager.h"
#define CONFIG_HLP_IF_FREQ "IF frequency for RF, if needed\n"
......@@ -70,7 +70,7 @@ typedef struct {
uint64_t optmask; //mask to store boolean config options
uint32_t ofdm_offset_divisor; // Divisor for sample offset computation for each OFDM symbol
int max_ldpc_iterations; // number of maximum LDPC iterations
task_manager_t man;
ws_task_manager_t man;
int UE_scan_carrier;
int UE_fo_compensation;
int timing_advance;
......
......@@ -24,7 +24,7 @@
#define __NRLDPC_DEFS__H__
#include <openair1/PHY/defs_nr_common.h>
#include "openair1/PHY/CODING/nrLDPC_decoder/nrLDPC_types.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/thread_pool/task_manager.h"
/**
\brief LDPC encoder
......
......@@ -42,8 +42,7 @@
#include "common/utils/LOG/log.h"
#include "executables/lte-softmodem.h"
#include <syscall.h>
#include <common/utils/threadPool/thread-pool.h>
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
//#define DEBUG_DLSCH_CODING
//#define DEBUG_DLSCH_FREE 1
......
......@@ -36,7 +36,7 @@
#include "nfapi_interface.h"
#include "transport_common_proto.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/thread_pool/task_manager.h"
// Functions below implement 36-211 and 36-212
......
......@@ -41,7 +41,7 @@
#include "RRC/LTE/rrc_extern.h"
#include "PHY_INTERFACE/phy_interface.h"
#include "transport_proto.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
extern int oai_exit;
......
......@@ -44,7 +44,7 @@
#include "common/utils/nr/nr_common.h"
#include <syscall.h>
#include <openair2/UTIL/OPT/opt.h>
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
//#define DEBUG_DLSCH_CODING
//#define DEBUG_DLSCH_FREE 1
......
......@@ -31,8 +31,8 @@
*/
#include "PHY/defs_gNB.h"
#include "common/utils/threadPool/thread-pool.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/threadPool/thread-pool.h"
#include "common/utils/task_manager/thread_pool/task_manager.h"
void free_gNB_ulsch(NR_gNB_ULSCH_t *ulsch, uint16_t N_RB_UL);
......
......@@ -50,7 +50,7 @@
//#define DEBUG_ULSCH_DECODING
//#define gNB_DEBUG_TRACE
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include <stdint.h>
#include <time.h>
#include <stdalign.h>
......@@ -449,7 +449,7 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
int E = nr_get_E(G, harq_process->C, Qm, n_layers, r);
ldpcDecode_t* rdata = &((ldpcDecode_t*)t_info->buf)[t_info->len];
assert(t_info->len < 16);
assert(t_info->len < 64);
rdata->ans = &t_info->ans[t_info->len];
t_info->len += 1;
......
......@@ -10,6 +10,7 @@
#include "common/utils/nr/nr_common.h"
#include <openair1/PHY/TOOLS/phy_scope_interface.h>
#include "PHY/sse_intrin.h"
#include "common/utils/task_manager/task_manager_gen.h"
#define INVALID_VALUE 255
......@@ -1653,7 +1654,6 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
start_meas(&gNB->rx_pusch_symbol_processing_stats);
int numSymbols = gNB->num_pusch_symbols_per_thread;
int const loop_iter = rel15_ul->nr_of_symbols/numSymbols;
puschSymbolProc_t arr[loop_iter];
task_ans_t arr_ans[loop_iter];
......
......@@ -45,7 +45,7 @@
#include "PHY/CODING/nrLDPC_extern.h"
#include "common/utils/nr/nr_common.h"
#include "openair1/PHY/TOOLS/phy_scope_interface.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
//#define ENABLE_PHY_PAYLOAD_DEBUG 1
......
......@@ -39,10 +39,8 @@
#include "time_meas.h"
#include "defs_common.h"
#include "nfapi_nr_interface_scf.h"
#include <common/utils/threadPool/thread-pool.h>
#include <executables/rt_profiling.h>
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#define MAX_BANDS_PER_RRU 4
#define MAX_RRU_CONFIG_SIZE 1024
......
......@@ -66,7 +66,7 @@
#include "PHY/LTE_TRANSPORT/transport_eNB.h"
#include "openair2/PHY_INTERFACE/IF_Module.h"
#include "common/openairinterface5g_limits.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#define PBCH_A 24
#define MAX_NUM_RU_PER_eNB 64
......
......@@ -43,7 +43,7 @@
#include "PHY/CODING/nrLDPC_decoder/nrLDPC_types.h"
#include "executables/rt_profiling.h"
#include "nfapi_nr_interface_scf.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/thread_pool/task_manager.h"
#define MAX_NUM_RU_PER_gNB 8
#define MAX_PUCCH0_NID 8
......
......@@ -48,7 +48,7 @@
#include "common_lib.h"
#include "fapi_nr_ue_interface.h"
#include "assertions.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/thread_pool/task_manager.h"
#ifdef MEX
#define msg mexPrintf
......
......@@ -41,7 +41,7 @@
#include <common/utils/system.h>
#include "common/utils/LOG/vcd_signal_dumper.h"
#include <nfapi/oai_integration/nfapi_pnf.h>
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include "assertions.h"
......
......@@ -38,7 +38,7 @@
#include "common/utils/LOG/log.h"
#include "common/utils/system.h"
#include "common/utils/LOG/vcd_signal_dumper.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include "T.h"
......
......@@ -45,19 +45,6 @@
#include <time.h>
#include <stdint.h>
/*
static
int64_t time_now_ns(void)
{
struct timespec tms;
if (clock_gettime(CLOCK_MONOTONIC_RAW, &tms)) {
return -1;
}
int64_t nanos = tms.tv_sec * 1000000000 + tms.tv_nsec;
return nanos;
}
*/
//#define DEBUG_RXDATA
//#define SRS_IND_DEBUG
......@@ -254,6 +241,7 @@ void phy_procedures_gNB_TX(processingData_L1tx_t *msgTx,
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_gNB_TX + gNB->CC_id, 0);
}
static void nr_postDecode(PHY_VARS_gNB *gNB, ldpcDecode_t *rdata)
{
NR_UL_gNB_HARQ_t *ulsch_harq = rdata->ulsch_harq;
......@@ -326,7 +314,8 @@ static void nr_postDecode(PHY_VARS_gNB *gNB, ldpcDecode_t *rdata)
// dumpsig=1;
}
ulsch->last_iteration_cnt = rdata->decodeIterations;
/*
/*
if (ulsch_harq->ulsch_pdu.mcs_index == 0 && dumpsig==1) {
int off = ((ulsch_harq->ulsch_pdu.rb_size&1) == 1)? 4:0;
......@@ -363,11 +352,14 @@ static void nr_postDecode(PHY_VARS_gNB *gNB, ldpcDecode_t *rdata)
}
*/
ulsch->last_iteration_cnt = rdata->decodeIterations;
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_gNB_ULSCH_DECODING,0);
}
}
static int nr_ulsch_procedures(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx, int ULSCH_id, uint8_t harq_pid, thread_info_tm_t* t_info)
{
NR_DL_FRAME_PARMS *frame_parms = &gNB->frame_parms;
......@@ -852,8 +844,8 @@ int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
}
}
ldpcDecode_t arr[16];
task_ans_t ans[16] = {0};
ldpcDecode_t arr[64];
task_ans_t ans[64] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t*)arr, .len = 0, .ans = ans};
//int64_t const t0 = time_now_ns();
......@@ -954,7 +946,6 @@ int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
// LOG_M("rxdataF_comp.m","rxF_comp",gNB->pusch_vars[0]->rxdataF_comp[0],6900,1,1);
// LOG_M("rxdataF_ext.m","rxF_ext",gNB->pusch_vars[0]->rxdataF_ext[0],6900,1,1);
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_NR_ULSCH_PROCEDURES_RX, 1);
int const tasks_added = nr_ulsch_procedures(gNB, frame_rx, slot_rx, ULSCH_id, ulsch->harq_pid, &t_info);
if (tasks_added > 0)
totalDecode += tasks_added;
......
......@@ -56,7 +56,7 @@
#include "dummy_functions.c"
#include "executables/thread-common.h"
#include "common/ran_context.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
void feptx_ofdm(RU_t *ru, int frame, int subframe);
void feptx_prec(RU_t *ru, int frame, int subframe);
......@@ -1259,8 +1259,8 @@ int main(int argc, char **argv) {
proc_eNB->respDecode=(notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(proc_eNB->respDecode);
int const n_threads = parse_num_threads("n");
init_task_manager(&man, n_threads);
int lst_core_id = -1;
init_task_manager(&man, &lst_core_id, 1);
proc_eNB->man = &man;
proc_eNB->frame_tx=0;
......
......@@ -55,7 +55,7 @@
#include "PHY/LTE_ESTIMATION/lte_estimation.h"
#include "openair1/PHY/LTE_TRANSPORT/dlsch_tbs_full.h"
#include "PHY/phy_extern.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
const char *__asan_default_options()
{
......@@ -789,10 +789,10 @@ int main(int argc, char **argv) {
proc_rxtx_ue->subframe_tx = proc_rxtx->subframe_rx;
proc_rxtx_ue->subframe_rx = (proc_rxtx->subframe_tx+6)%10;
int const n_threads = parse_num_threads("n");
proc_rxtx->man = calloc(1, sizeof(task_manager_t));
int lst_core_id = -1;
proc_rxtx->man = calloc(1, sizeof(ws_task_manager_t));
assert(proc_rxtx->man != NULL && "Memory exhausted");
init_task_manager(proc_rxtx->man, n_threads);
init_task_manager(proc_rxtx->man, &lst_core_id, 1);
proc_rxtx->respDecode=(notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(proc_rxtx->respDecode);
......
......@@ -49,8 +49,7 @@
#include "openair2/LAYER2/NR_MAC_COMMON/nr_mac_common.h"
#include "executables/nr-uesoftmodem.h"
#include "nfapi/oai_integration/vendor_ext.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
//#define DEBUG_NR_DLSCHSIM
......@@ -375,11 +374,16 @@ int main(int argc, char **argv)
RC.gNB[0] = calloc(1, sizeof(PHY_VARS_gNB));
gNB = RC.gNB[0];
int const num_threads = parse_num_threads(gNBthreads);
init_task_manager(&gNB->man, num_threads);
init_task_manager(&nrUE_params.man, max(dlsch_threads, 1));
int core_id[128] = {0};
span_core_id_t out = {.cap = 128, .core_id = core_id};
parse_num_threads(gNBthreads, &out);
init_task_manager(&gNB->man, out.core_id, out.sz);
int lst_core_id[8] = {-1,-1,-1,-1,-1,-1,-1,-1 };
assert(dlsch_threads < 8);
init_task_manager(&nrUE_params.man, lst_core_id, max(dlsch_threads, 1));
frame_parms = &gNB->frame_parms; //to be initialized I suppose (maybe not necessary for PBCH)
frame_parms = &gNB->frame_parms; //to be initialized I suppose (maybe not necessary for PBCH)
frame_parms->nb_antennas_tx = n_tx;
frame_parms->nb_antennas_rx = n_rx;
frame_parms->N_RB_DL = N_RB_DL;
......
......@@ -30,7 +30,7 @@
#include "common/utils/nr/nr_common.h"
#include "common/utils/var_array.h"
#include "common/utils/LOG/log.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include "LAYER2/NR_MAC_gNB/nr_mac_gNB.h"
#include "LAYER2/NR_MAC_UE/mac_defs.h"
......@@ -74,7 +74,7 @@
#include <executables/softmodem-common.h>
#include <openair3/ocp-gtpu/gtp_itf.h>
#include <executables/nr-uesoftmodem.h>
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
const char *__asan_default_options()
{
......@@ -858,7 +858,9 @@ int main(int argc, char **argv)
unsigned char *test_input_bit;
unsigned int errors_bit = 0;
init_task_manager(&nrUE_params.man, max(1, dlsch_threads));
int lst_core_id[8] = {-1,-1,-1,-1,-1,-1,-1,-1};
assert(dlsch_threads < 9);
init_task_manager(&nrUE_params.man, lst_core_id, max(1, dlsch_threads));
test_input_bit = (unsigned char *) malloc16(sizeof(unsigned char) * 16 * 68 * 384);
estimated_output_bit = (unsigned char *) malloc16(sizeof(unsigned char) * 16 * 68 * 384);
......@@ -887,8 +889,10 @@ int main(int argc, char **argv)
//NR_COMMON_channels_t *cc = RC.nrmac[0]->common_channels;
int n_errs = 0;
int const num_threads = parse_num_threads(gNBthreads);
init_task_manager(&gNB->man, num_threads);
int core_id[128] = {0};
span_core_id_t out = {.cap = 128, .core_id = core_id};
parse_num_threads(gNBthreads, &out);
init_task_manager(&gNB->man, out.core_id, out.sz);
initNotifiedFIFO(&gNB->L1_tx_free);
initNotifiedFIFO(&gNB->L1_tx_filled);
......
......@@ -45,12 +45,11 @@
#include "openair1/SIMULATION/TOOLS/sim.h"
#include "openair1/SIMULATION/RF/rf.h"
#include "openair1/SIMULATION/NR_PHY/nr_unitary_defs.h"
#include "common/utils/threadPool/thread-pool.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include "openair2/LAYER2/NR_MAC_COMMON/nr_mac_common.h"
#include "executables/nr-uesoftmodem.h"
#include "nfapi/oai_integration/vendor_ext.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
//#define DEBUG_NR_ULSCHSIM
......@@ -411,8 +410,8 @@ int main(int argc, char **argv)
gNB = RC.gNB[0];
//gNB_config = &gNB->gNB_config;
int const num_threads = parse_num_threads("n");
init_task_manager(&gNB->man, num_threads);
int lst_core_id = -1;
init_task_manager(&gNB->man, &lst_core_id, 1);
initNotifiedFIFO(&gNB->respDecode);
frame_parms = &gNB->frame_parms; //to be initialized I suppose (maybe not necessary for PBCH)
......
......@@ -56,7 +56,7 @@
#include "openair2/RRC/NR/nr_rrc_config.h"
#include "openair2/LAYER2/NR_MAC_UE/mac_proto.h"
#include "openair2/LAYER2/NR_MAC_gNB/mac_proto.h"
#include "common/utils/threadPool/thread-pool.h"
#include "common/utils/task_manager/task_manager_gen.h"
#include "PHY/NR_REFSIG/ptrs_nr.h"
#define inMicroS(a) (((double)(a))/(get_cpu_freq_GHz()*1000.0))
#include "SIMULATION/LTE_PHY/common_sim.h"
......@@ -67,7 +67,7 @@
#include "PHY/NR_REFSIG/ul_ref_seq_nr.h"
#include <openair3/ocp-gtpu/gtp_itf.h>
#include "executables/nr-uesoftmodem.h"
#include "common/utils/thread_pool/task_manager.h"
#include "common/utils/task_manager/task_manager_gen.h"
//#define DEBUG_ULSIM
const char *__asan_default_options()
......@@ -554,7 +554,9 @@ int main(int argc, char *argv[])
gNB->ofdm_offset_divisor = UINT_MAX;
gNB->num_pusch_symbols_per_thread = 1;
init_task_manager(&gNB->man, max(threadCnt, 1));
int lst_core_id[32] = {-1,-1,-1,-1,-1,-1,-1,-1, -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, -1,-1,-1,-1,-1,-1,-1,-1};
assert(threadCnt < 33);
init_task_manager(&gNB->man, lst_core_id, max(threadCnt, 1));
initNotifiedFIFO(&gNB->respDecode);
......
......@@ -37,7 +37,7 @@
//#include "openair1/PHY/LTE_TRANSPORT/transport_eNB.h"
#include "nfapi_interface.h"
#include "common/platform_types.h"
#include <common/utils/threadPool/thread-pool.h>
#include <common/utils/task_manager/threadPool/thread-pool.h>
#include <radio/COMMON/common_lib.h>
#define MAX_NUM_DL_PDU 100
......
......@@ -37,7 +37,7 @@
#include <sys/types.h>
#include <openair1/PHY/TOOLS/tools_defs.h>
#include "record_player.h"
#include <common/utils/threadPool/thread-pool.h>
#include <common/utils/task_manager/threadPool/thread-pool.h>
/* default name of shared library implementing the radio front end */
#define OAI_RF_LIBNAME "oai_device"
......
......@@ -50,7 +50,7 @@
#include "common_lib.h"
#include "ethernet_lib.h"
#include "openair1/PHY/sse_intrin.h"
#include "common/utils/threadPool/thread-pool.h"
#include "common/utils/task_manager/threadPool/thread-pool.h"
//#define DEBUG 1
......
......@@ -41,7 +41,7 @@
#include <sys/socket.h>
#include <net/if.h>
#include <netinet/ether.h>
#include <common/utils/threadPool/thread-pool.h>
#include <common/utils/task_manager/threadPool/thread-pool.h>
#define MAX_INST 4
#define DEFAULT_IF "lo"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment