Commit 7794f720 authored by Eurecom

Merge branch 'tp_poc_5' into tp_poc

parents e1304a77 e222a862
......@@ -963,6 +963,7 @@ set(PHY_SRC
# ${OPENAIR1_DIR}/PHY/LTE_ESTIMATION/lte_ul_channel_estimation.c
${OPENAIR1_DIR}/PHY/LTE_ESTIMATION/lte_eNB_measurements.c
${OPENAIR1_DIR}/PHY/INIT/lte_init.c
${OPENAIR_DIR}/common/utils/thread_pool/task_manager.c
)
set(PHY_SRC_RU
......
#ifndef TASK_WORK_STEALING_THREAD_POOL_H
#define TASK_WORK_STEALING_THREAD_POOL_H
#if defined (__i386__) || defined(__x86_64__)
#define LEVEL1_DCACHE_LINESIZE 64
#elif __aarch64__
// This is not true for ARM in the general case.
// On Linux, the size can be obtained at runtime with sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
// in C++, std::hardware_destructive_interference_size serves the same purpose.
#define LEVEL1_DCACHE_LINESIZE 64
#else
static_assert(0!=0, "Unknown CPU architecture");
#endif
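/* Hedged sketch (not part of this header): the runtime query mentioned in the
 * comment above, with a fallback to the compile-time constant. Linux/glibc only. */
#include <unistd.h> // for sysconf() / _SC_LEVEL1_DCACHE_LINESIZE
static inline long l1_dcache_line_size(void)
{
  long sz = sysconf(_SC_LEVEL1_DCACHE_LINESIZE); // may return 0 or -1 if the kernel does not expose it
  return sz > 0 ? sz : LEVEL1_DCACHE_LINESIZE;   // fall back to the constant defined above
}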
typedef struct{
void* args;
// Avoid false sharing: align the first member (and thus the whole struct) to a cache line
_Alignas(LEVEL1_DCACHE_LINESIZE) void* args;
void (*func)(void* args);
} task_t;
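/* Hedged compile-time check (not in the original): the _Alignas on the first member
 * makes every task_t start on its own cache line and pads its size accordingly,
 * so adjacent elements of a task_t array cannot exhibit false sharing. */
_Static_assert(_Alignof(task_t) == LEVEL1_DCACHE_LINESIZE, "task_t starts on a cache-line boundary");
_Static_assert(sizeof(task_t) % LEVEL1_DCACHE_LINESIZE == 0, "adjacent task_t elements never share a cache line");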
......
......@@ -7,6 +7,7 @@
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
......@@ -20,14 +21,6 @@
#include <sys/syscall.h> /* Definition of SYS_* constants */
#include <unistd.h>
#if defined (__i386__) || defined(__x86_64__)
#define pause_or_yield __builtin_ia32_pause
#elif __aarch64__
#define pause_or_yield() asm volatile("yield" ::: "memory")
#else
static_assert(0!=0, "Unknown CPU architecture");
#endif
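/* Hedged illustration (names are not from this commit): a pause/yield hint like the
 * macro above is typically used for a short bounded spin on an atomic flag before
 * falling back to a blocking wait. Assumes <stdatomic.h> and <stdbool.h> are included,
 * as they are in this file. */
static inline bool spin_briefly(_Atomic int* flag)
{
  for (int i = 0; i < 1024; ++i) {
    if (atomic_load_explicit(flag, memory_order_acquire))
      return true;
    pause_or_yield(); // x86 PAUSE / aarch64 YIELD: reduce contention with the sibling hyper-thread
  }
  return false;       // caller should now block (condition variable / futex) instead of burning CPU
}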
/*
static
int64_t time_now_us(void)
......@@ -197,8 +190,8 @@ typedef struct {
pthread_mutex_t mtx;
pthread_cond_t cv;
seq_ring_task_t r;
// _Atomic int32_t* futex;
//_Atomic bool* waiting;
// _Atomic int32_t* futex;
// _Atomic bool* waiting;
_Atomic int done;
} not_q_t;
......@@ -209,7 +202,7 @@ typedef struct{
static
void init_not_q(not_q_t* q /*, _Atomic int32_t* futex , _Atomic bool* waiting */)
void init_not_q(not_q_t* q/*, _Atomic int32_t* futex, _Atomic bool* waiting*/)
{
assert(q != NULL);
......@@ -260,11 +253,11 @@ bool try_push_not_q(not_q_t* q, task_t t)
push_back_seq_ring_task(&q->r, t);
pthread_cond_signal(&q->cv);
int const rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
pthread_cond_signal(&q->cv);
return true;
}
......@@ -326,34 +319,29 @@ bool pop_not_q(not_q_t* q, ret_try_t* out)
assert(out != NULL);
assert(q->done == 0 || q->done ==1);
int rc = pthread_mutex_lock(&q->mtx);
assert(rc == 0);
assert(q->done == 0 || q->done ==1);
while(size_seq_ring_task(&q->r) == 0 && q->done == 0)
pthread_cond_wait(&q->cv , &q->mtx);
/*
//label:
// Let's be conservative and not use memory_order_relaxed
// while (atomic_load_explicit(q->waiting, memory_order_seq_cst) == true){ //
// while (atomic_load_explicit(q->waiting, memory_order_seq_cst) == true){ //
// Issue X86 PAUSE or ARM YIELD instruction to reduce contention between
// hyper-threads
// pause_or_yield();
// }
// pause_or_yield();
// }
pthread_mutex_lock(&q->mtx);
if(size_seq_ring_task(&q->r) == 0 && q->done == 0){
int rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
int val = atomic_load_explicit(q->futex, memory_order_acquire);
long r = syscall(SYS_futex, q->futex, FUTEX_WAIT_PRIVATE, val, NULL, 0);
assert(r != -1);
goto label;
}
*/
while(size_seq_ring_task(&q->r) == 0 && q->done == 0)
pthread_cond_wait(&q->cv , &q->mtx);
// if(size_seq_ring_task(&q->r) == 0 && q->done == 0){
// int rc = pthread_mutex_unlock(&q->mtx);
// assert(rc == 0);
//
// int val = *q->futex; // atomic_load_explicit(q->futex, memory_order_acquire);
// long r = syscall(SYS_futex, q->futex, FUTEX_WAIT_PRIVATE, val, NULL, 0);
// assert(r != -1);
// goto label;
// }
//
//printf("Waking %ld id %ld \n", time_now_us(), pthread_self());
assert(q->done == 0 || q->done ==1);
......@@ -366,7 +354,7 @@ bool pop_not_q(not_q_t* q, ret_try_t* out)
out->t = pop_seq_ring_task(&q->r);
rc = pthread_mutex_unlock(&q->mtx);
int rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
return true;
......@@ -383,12 +371,15 @@ void done_not_q(not_q_t* q)
q->done = 1;
//long r = syscall(SYS_futex, q->futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
//assert(r != -1);
rc = pthread_mutex_unlock(&q->mtx);
rc = pthread_cond_signal(&q->cv);
assert(rc == 0);
rc = pthread_cond_signal(&q->cv);
rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
// rc = pthread_cond_signal(&q->cv);
// assert(rc == 0);
// q->futex++;
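  // Note on the reordering above: with a condition variable it is correct to signal
  // either before or after unlocking, because pop_not_q() re-checks its predicate
  // (queue size and done flag) while holding the mutex. Signalling before the unlock
  // keeps the state change and the wake-up in one critical section, at the cost of the
  // woken thread possibly blocking briefly on the still-held mutex.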
}
......@@ -424,8 +415,8 @@ void* worker_thread(void* arg)
task_thread_args_t* args = (task_thread_args_t*)arg;
int const idx = args->idx;
int const log_cores = get_nprocs_conf();
assert(log_cores > 0);
//int const log_cores = get_nprocs_conf();
//assert(log_cores > 0);
// Assuming: 2 x Physical cores = Logical cores
//pin_thread_to_core(idx+log_cores/2);
......@@ -473,21 +464,17 @@ void* worker_thread(void* arg)
void init_task_manager(task_manager_t* man, uint32_t num_threads)
{
assert(man != NULL);
// assert(num_threads > 0); // && num_threads < 33 && "Do you have zero or more than 32 processors??");
if(num_threads == 0 || num_threads > 8){
printf("[MIR]: number of threads set to 8. %d number of threads asked \n", num_threads);
num_threads = 8;
}
assert(num_threads > 0 && num_threads < 33 && "Do you have zero or more than 32 processors??");
man->q_arr = calloc(num_threads, sizeof(not_q_t));
assert(man->q_arr != NULL && "Memory exhausted");
//atomic_store_explicit(&man->futex, 0, memory_order_seq_cst);
//man->futex = 0;
// man->waiting = false;
//man->waiting = false;
not_q_t* q_arr = (not_q_t*)man->q_arr;
for(uint32_t i = 0; i < num_threads; ++i){
init_not_q(&q_arr[i] /*,&man->futex, &man->waiting */);
init_not_q(&q_arr[i] /*, &man->futex, &man->waiting*/);
}
man->t_arr = calloc(num_threads, sizeof(pthread_t));
......@@ -499,7 +486,6 @@ void init_task_manager(task_manager_t* man, uint32_t num_threads)
args->idx = i;
args->man = man;
/*
pthread_attr_t attr = {0};
int ret=pthread_attr_init(&attr);
assert(ret == 0);
......@@ -510,28 +496,25 @@ void init_task_manager(task_manager_t* man, uint32_t num_threads)
struct sched_param sparam={0};
sparam.sched_priority = 94;
ret=pthread_attr_setschedparam(&attr, &sparam);
*/
int rc = pthread_create(&man->t_arr[i], NULL, worker_thread, args);
int rc = pthread_create(&man->t_arr[i], &attr, worker_thread, args);
assert(rc == 0);
}
man->index = 0;
/*
pthread_mutexattr_t attr = {0};
#ifdef _DEBUG
int const rc_mtx = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
assert(rc_mtx == 0);
#endif
int rc = pthread_mutex_init(&man->wait_mtx, &attr);
assert(rc == 0 && "Error while creating the mtx");
pthread_condattr_t* c_attr = NULL;
rc = pthread_cond_init(&man->wait_cv, c_attr);
assert(rc == 0);
*/
//
// pthread_mutexattr_t attr = {0};
//#ifdef _DEBUG
// int const rc_mtx = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
// assert(rc_mtx == 0);
//#endif
// int rc = pthread_mutex_init(&man->wait_mtx, &attr);
// assert(rc == 0 && "Error while creating the mtx");
//
// pthread_condattr_t* c_attr = NULL;
// rc = pthread_cond_init(&man->wait_cv, c_attr);
// assert(rc == 0);
//
//pin_thread_to_core(3);
}
......@@ -556,13 +539,11 @@ void free_task_manager(task_manager_t* man, void (*clean)(task_t*))
free(man->q_arr);
free(man->t_arr);
/*
int rc = pthread_mutex_destroy(&man->wait_mtx);
assert(rc == 0);
rc = pthread_cond_destroy(&man->wait_cv);
assert(rc == 0);
*/
// int rc = pthread_mutex_destroy(&man->wait_mtx);
// assert(rc == 0);
//
// rc = pthread_cond_destroy(&man->wait_cv);
// assert(rc == 0);
}
void async_task_manager(task_manager_t* man, task_t t)
......@@ -664,7 +645,7 @@ void wait_all_task_manager(task_manager_t* man)
}
*/
/*
// This function does not belong here logically
void wait_task_status_completed(size_t len, task_status_t* arr)
{
......@@ -687,6 +668,31 @@ void wait_task_status_completed(size_t len, task_status_t* arr)
}
}
}
*/
#undef pause_or_yield
// This function does not belong here logically
void wait_task_status_completed(size_t len, task_status_t* arr)
{
assert(len > 0);
assert(arr != NULL);
// We are trusting Fedor here
const struct timespec ns = {0,1};
uint64_t i = 0;
for(int j = len -1; j != -1 ; i++){
for(; j != -1; --j){
int const task_completed = 1;
if(atomic_load_explicit(&arr[j].completed, memory_order_acquire) != task_completed) // memory_order_acquire
break;
}
if(i % 16 == 0){
nanosleep(&ns, NULL);
}
//sched_yield();
// pause_or_yield();
}
}
#undef pause_or_yield
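/* Hedged usage sketch of the API above: work(), work_arg_t and run_batch() are
 * illustrative names, not part of this commit. Assumes <string.h>, <stdatomic.h>
 * and this pool's header are included, as they are in this file. */
typedef struct { task_status_t* done; } work_arg_t;
static void work(void* arg) // matches the void (*func)(void*) signature of task_t
{
  work_arg_t* w = (work_arg_t*)arg;
  /* ... per-task processing would go here ... */
  atomic_store_explicit(&w->done->completed, 1, memory_order_release); // publish completion
}
static void run_batch(task_manager_t* man, size_t n)
{
  work_arg_t args[n];
  task_status_t status[n];
  memset(status, 0, n * sizeof(task_status_t)); // completed flags must start at 0
  for (size_t i = 0; i < n; ++i) {
    args[i].done = &status[i];
    task_t t = {.func = work, .args = &args[i]};
    async_task_manager(man, t);                 // enqueue on one of the per-worker queues
  }
  wait_task_status_completed(n, status);        // poll/sleep until every completed flag is 1
}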
#ifndef TASK_MANAGER_WORKING_STEALING_H
#define TASK_MANAGER_WORKING_STEALING_H
// Comment out the define below to deactivate the work-stealing (ws) thread pool
#define TASK_MANAGER
//#define TASK_MANAGER_DEMODULATION
//#define TASK_MANAGER_CODING
//#define TASK_MANAGER_RU
//#define TASK_MANAGER_UE
//#define TASK_MANAGER_UE_DECODING
//#define TASK_MANAGER_SIM
// LTE
//#define TASK_MANAGER_LTE
#define TASK_MANAGER_DEMODULATION
#define TASK_MANAGER_CODING
#define TASK_MANAGER_RU
#define TASK_MANAGER_UE
#define TASK_MANAGER_UE_DECODING
#define TASK_MANAGER_SIM
#define TASK_MANAGER_LTE
#include "task.h"
#ifndef __cplusplus
#include <stdalign.h>
#include <stdatomic.h>
#else
#include <atomic>
#define _Atomic(X) std::atomic< X >
#define _Alignas(X) alignas(X)
#endif
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#if defined (__i386__) || defined(__x86_64__)
#define LEVEL1_DCACHE_LINESIZE 64
#define pause_or_yield __builtin_ia32_pause
#elif __aarch64__
// This is not true for ARM in the general case.
// On Linux, the size can be obtained at runtime with sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
// in C++, std::hardware_destructive_interference_size serves the same purpose.
#define LEVEL1_DCACHE_LINESIZE 64
#define pause_or_yield() asm volatile("yield" ::: "memory")
#else
static_assert(0!=0, "Unknown CPU architecture");
#endif
......@@ -49,11 +31,10 @@ typedef struct{
_Alignas(LEVEL1_DCACHE_LINESIZE) _Atomic(int) completed;
} task_status_t;
typedef struct{
uint8_t* buf;
size_t len;
task_status_t* task_status;
task_status_t* tasks_remaining;
} thread_info_tm_t;
typedef struct{
......@@ -61,18 +42,18 @@ typedef struct{
pthread_t* t_arr;
size_t len_thr;
_Atomic(uint64_t) index;
atomic_uint_fast64_t index;
void* q_arr;
_Atomic(uint64_t) num_task;
atomic_uint_fast64_t num_task;
// pthread_cond_t wait_cv;
// pthread_mutex_t wait_mtx;
// _Atomic(int32_t) futex;
// _Atomic int32_t futex;
//_Atomic(bool) waiting;
// _Atomic bool waiting;
} task_manager_t;
void init_task_manager(task_manager_t* man, uint32_t num_threads);
......@@ -81,23 +62,22 @@ void free_task_manager(task_manager_t* man, void (*clean)(task_t* args) );
void async_task_manager(task_manager_t* man, task_t t);
// This function triggers the futex if the thread is waiting.
// Note that if the thread was already working, this call is superfluous.
// This method proved a bit faster as it issues a single syscall
// instead of one syscall per async_task_manager() call.
// void trigger_all_task_manager(task_manager_t* man);
//void trigger_and_spin_task_manager(task_manager_t* man);
//void trigger_all_task_manager(task_manager_t* man);
//
////void trigger_and_spin_task_manager(task_manager_t* man);
//
//void stop_spining_task_manager(task_manager_t* man);
//void trigger_and_wait_all_task_manager(task_manager_t* man);
//
////void trigger_and_wait_all_task_manager(task_manager_t* man);
//
//void wait_all_task_manager(task_manager_t* man);
//
// This function does not belong here.
// It should be in an algorithm file
void wait_task_status_completed(size_t len, task_status_t* arr);
//void wait_spin_all_atomics_one(size_t len, _Atomic int arr[len]);
void wait_task_status_completed(size_t len, task_status_t* arr);
#endif
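/* Hedged lifecycle sketch for the manager declared above (it would live in a .c file,
 * not in this header): drop_task() is an illustrative clean callback, and it is
 * assumed that the callback is applied to tasks still pending at shutdown. */
static void drop_task(task_t* t) { (void)t; /* nothing owned by pending tasks in this sketch */ }
static void pool_lifecycle(void)
{
  task_manager_t man = {0};
  init_task_manager(&man, 4);          // spawn 4 worker threads, one queue per worker
  /* ... producers call async_task_manager(&man, t) and synchronize with
         wait_task_status_completed() on their own completion flags ... */
  free_task_manager(&man, drop_task);  // shut the queues down, join the workers, free the arrays
}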
......@@ -33,8 +33,7 @@
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include <sched.h>
#include <sys/sysinfo.h>
#include <ctype.h>
#undef MALLOC //there are two conflicting definitions, so we better make sure we don't use it at all
#include "assertions.h"
......@@ -435,6 +434,37 @@ static void wait_nfapi_init(char *thread_name) {
pthread_mutex_unlock(&nfapi_sync_mutex);
printf( "NFAPI: got sync (%s)\n", thread_name);
}
#ifdef TASK_MANAGER_LTE
static int num_threads(char* params)
{
char *saveptr, * curptr;
char *parms_cpy=strdup(params);
int nbThreads=0;
curptr=strtok_r(parms_cpy,",",&saveptr);
while ( curptr!=NULL ) {
int c=toupper(curptr[0]);
switch (c) {
case 'N':
//pool->activated=false;
break;
default:
int core_id = atoi(curptr);
printf("create a thread for core %d\n", core_id);
nbThreads++;
}
curptr=strtok_r(NULL,",",&saveptr);
}
free(parms_cpy);
return nbThreads;
}
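/* Hedged examples of what the parser above accepts, assuming the existing
 * threadPoolConfig syntax (comma-separated core ids, -1 for no pinning, 'n'/'N'
 * entries counted as no thread) and that <assert.h> is available. */
static void num_threads_examples(void)
{
  assert(num_threads("-1,-1,-1") == 3); // three threads, no specific core requested
  assert(num_threads("0,2,4") == 3);    // one thread per listed core id
  assert(num_threads("n") == 0);        // an 'n' entry adds no thread
}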
#endif
configmodule_interface_t *uniqCfg = NULL;
int main ( int argc, char **argv )
{
......@@ -547,13 +577,12 @@ int main ( int argc, char **argv )
init_eNB(get_softmodem_params()->single_thread_flag,get_softmodem_params()->wait_for_sync);
}
#ifdef TASK_MANAGER_LTE
int const log_cores = get_nprocs_conf();
assert(log_cores > 0);
assert(strlen(get_softmodem_params()->threadPoolConfig) > 0);
int n_threads = num_threads(get_softmodem_params()->threadPoolConfig);
task_manager_t man = {0};
// Assuming: Physical cores = Logical cores / 2
init_task_manager(&man, log_cores/2);
#endif
init_task_manager(&man, n_threads);
#endif
for (int x=0; x < RC.nb_L1_inst; x++)
for (int CC_id=0; CC_id<RC.nb_L1_CC[x]; CC_id++) {
L1_rxtx_proc_t *L1proc= &RC.eNB[x][CC_id]->proc.L1_proc;
......@@ -567,12 +596,14 @@ int main ( int argc, char **argv )
L1proc->threadPool = (tpool_t *)malloc(sizeof(tpool_t));
if ( strlen(get_softmodem_params()->threadPoolConfig) > 0 )
initTpool(get_softmodem_params()->threadPoolConfig, L1proc->threadPool, true);
else
initTpool("n", L1proc->threadPool, true);
}
else{
assert(0!=0 && "Bug");
initTpool("n", L1proc->threadPool, true);
}
L1proctx->threadPool = L1proc->threadPool;
initNotifiedFIFO(L1proc->respDecode);
#endif
initNotifiedFIFO(L1proc->respDecode);
}
printf("wait_eNBs()\n");
......
......@@ -284,25 +284,35 @@ void rx_func(void *param)
} else {
//notifiedFIFO_elt_t * res = pullTpool(&gNB->L1_tx_filled, &gNB->threadPool);
notifiedFIFO_elt_t* res = pullNotifiedFIFO(&gNB->L1_tx_filled);
if (res == NULL)
return; // Tpool has been stopped
if (res == NULL){
#ifdef TASK_MANAGER_RU
if(info->elm != NULL)
pushNotifiedFIFO(info->elm->reponseFifo, info->elm);
#endif
return; // Queue aborted
}
processingData_L1tx_t* syncMsg = (processingData_L1tx_t *)NotifiedFifoData(res);
syncMsg->gNB = gNB;
syncMsg->timestamp_tx = info->timestamp_tx;
res->key = slot_tx;
#ifdef TASK_MANAGER
//#ifdef TASK_MANAGER
assert(res->processingFunc != NULL);
res->processingFunc(NotifiedFifoData(res));
if (res->reponseFifo)
pushNotifiedFIFO(res->reponseFifo, res);
#else
pushTpool(&gNB->threadPool, res);
#endif
//#else
// pushTpool(&gNB->threadPool, res);
//#endif
}
} else if (get_softmodem_params()->continuous_tx) {
notifiedFIFO_elt_t *res = pullNotifiedFIFO(&gNB->L1_tx_free);
if (res == NULL)
return; // Tpool has been stopped
if (res == NULL){
#ifdef TASK_MANAGER_RU
if(info->elm != NULL)
pushNotifiedFIFO(info->elm->reponseFifo, info->elm);
#endif
return; // Queue aborted
}
processingData_L1tx_t *syncMsg = (processingData_L1tx_t *)NotifiedFifoData(res);
syncMsg->gNB = gNB;
syncMsg->timestamp_tx = info->timestamp_tx;
......@@ -311,6 +321,11 @@ void rx_func(void *param)
res->key = slot_tx;
pushNotifiedFIFO(&gNB->L1_tx_out, res);
}
#ifdef TASK_MANAGER_RU
// I can also write bad code
if(info->elm != NULL)
pushNotifiedFIFO(info->elm->reponseFifo, info->elm);
#endif
#if 0
LOG_D(PHY, "rxtx:%lld nfapi:%lld phy:%lld tx:%lld rx:%lld prach:%lld ofdm:%lld ",
......@@ -482,70 +497,48 @@ static
int num_threads(char* params)
{
char *saveptr, * curptr;
char *parms_cpy=strdup(params);
int nbThreads=0;
curptr=strtok_r(parms_cpy,",",&saveptr);
curptr=strtok_r( params,",",&saveptr);
while ( curptr!=NULL ) {
int c=toupper(curptr[0]);
switch (c) {
case 'N':
//pool->activated=false;
break;
default:
int core_id = atoi(curptr);
printf("[MIR]: create a thread for core %d\n", core_id);
int core_id = atoi(curptr);
printf("create a thread for core %d\n", core_id);
nbThreads++;
}
curptr=strtok_r(NULL,",",&saveptr);
}
free(parms_cpy);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
printf("[MIR]: num threads %d\n", nbThreads);
return nbThreads;
}
}
void init_gNB_Tpool(int inst)
{
void init_gNB_Tpool(int inst) {
PHY_VARS_gNB *gNB;
gNB = RC.gNB[inst];
gNB_L1_proc_t *proc = &gNB->proc;
#if defined(TASK_MANAGER) && defined(TASK_MANAGER_CODING) && defined(TASK_MANAGER_DEMODULATION)
//int const log_cores = get_nprocs_conf();
//assert(log_cores > 0);
//printf("[MIR]: log cores %d \n", log_cores);
// Assuming: 2 x Physical cores = Logical cores
// PUSCH symbols per thread need to be calculated by how many threads we have
gNB->num_pusch_symbols_per_thread = 1;
#if defined(TASK_MANAGER) && defined(TASK_MANAGER_CODING) && defined(TASK_MANAGER_DEMODULATION) && defined(TASK_MANAGER_RU)
int n_threads = num_threads(get_softmodem_params()->threadPoolConfig);
init_task_manager(&gNB->man, n_threads);
#elif !defined(TASK_MANAGER) || !defined(TASK_MANAGER_CODING) || !defined(TASK_MANAGER_DEMODULATION)
init_task_manager(&gNB->man, n_threads); //log_cores/2);
#elif !defined(TASK_MANAGER) || !defined(TASK_MANAGER_CODING) || !defined(TASK_MANAGER_DEMODULATION) || !defined(TASK_MANAGER_RU)
int n_threads = num_threads(get_softmodem_params()->threadPoolConfig);
init_task_manager(&gNB->man, n_threads);
init_task_manager(&gNB->man, n_threads ); //log_cores/2);
// ULSCH decoding threadpool
initTpool(get_softmodem_params()->threadPoolConfig, &gNB->threadPool, cpumeas(CPUMEAS_GETSTATE));
#else
// ULSCH decoding threadpool
initTpool(get_softmodem_params()->threadPoolConfig, &gNB->threadPool, cpumeas(CPUMEAS_GETSTATE));
#endif
// PUSCH symbols per thread need to be calculated by how many threads we have
gNB->num_pusch_symbols_per_thread = 1;
// ULSCH decoder result FIFO
// ULSCH decoder result FIFO
initNotifiedFIFO(&gNB->respPuschSymb);
initNotifiedFIFO(&gNB->respDecode);
......
......@@ -1191,9 +1191,6 @@ void *ru_thread( void *param ) {
pthread_mutex_unlock(&RC.ru_mutex);
wait_sync("ru_thread");
processingData_L1_t *syncMsg;
notifiedFIFO_elt_t *res;
if(!emulate_rf) {
// Start RF device if any
if (ru->start_rf) {
......@@ -1341,9 +1338,10 @@ void *ru_thread( void *param ) {
}
} // end if (slot_type == NR_UPLINK_SLOT || slot_type == NR_MIXED_SLOT) {
notifiedFIFO_elt_t *res = NULL;
// At this point, all information for subframe has been received on FH interface
if (!get_softmodem_params()->reorder_thread_disable) {
//res = pullTpool(&gNB->resp_L1, &gNB->threadPool);
res = pullNotifiedFIFO(&gNB->resp_L1);
if (res == NULL)
break; // Tpool has been stopped
......@@ -1351,6 +1349,8 @@ void *ru_thread( void *param ) {
res = newNotifiedFIFO_elt(sizeof(processingData_L1_t), 0, &gNB->resp_L1, NULL);
}
processingData_L1_t *syncMsg = NULL;
syncMsg = (processingData_L1_t *)NotifiedFifoData(res);
syncMsg->gNB = gNB;
syncMsg->frame_rx = proc->frame_rx;
......@@ -1359,12 +1359,13 @@ void *ru_thread( void *param ) {
syncMsg->slot_tx = proc->tti_tx;
syncMsg->timestamp_tx = proc->timestamp_tx;
res->key = proc->tti_rx;
#ifdef TASK_MANAGER // no TASK_MANAGER_RU
#ifdef TASK_MANAGER // No TASK_MANAGER_RU
if (!get_softmodem_params()->reorder_thread_disable) {
assert(res->processingFunc != NULL);
assert(res->processingFunc == rx_func);
assert(res->reponseFifo != NULL);
res->processingFunc(NotifiedFifoData(res));
pushNotifiedFIFO(res->reponseFifo, res);
syncMsg->elm = res;
task_t t = {.func = rx_func, .args = syncMsg};
async_task_manager(&gNB->man, t);
}
#else
if (!get_softmodem_params()->reorder_thread_disable)
......
......@@ -69,10 +69,14 @@ typedef struct {
uint64_t optmask; //mask to store boolean config options
uint32_t ofdm_offset_divisor; // Divisor for sample offset computation for each OFDM symbol
int max_ldpc_iterations; // number of maximum LDPC iterations
#if defined TASK_MANAGER_UE || defined TASK_MANAGER_UE_DECODING
#if defined (TASK_MANAGER_UE) && defined (TASK_MANAGER_UE_DECODING) && defined(TASK_MANAGER_SIM)
task_manager_t man;
#elif !defined (TASK_MANAGER_UE) || !defined(TASK_MANAGER_UE_DECODING) || !defined(TASK_MANAGER_SIM)
task_manager_t man;
#endif
tpool_t Tpool; // thread pool
#else
tpool_t Tpool; // thread pool
#endif
int UE_scan_carrier;
int UE_fo_compensation;
int timing_advance;
......
......@@ -67,7 +67,7 @@ typedef struct {
/// LDPC-code outputs
uint8_t *d[MAX_NUM_NR_DLSCH_SEGMENTS_PER_LAYER*NR_MAX_NB_LAYERS];
#ifdef TASK_MANAGER_CODING
task_status_t* task_status;
task_status_t* task_done;
#endif
} encoder_implemparams_t;
......
......@@ -43,6 +43,7 @@
#include "executables/lte-softmodem.h"
#include <syscall.h>
#include <common/utils/threadPool/thread-pool.h>
#include <stdatomic.h>
//#define DEBUG_DLSCH_CODING
//#define DEBUG_DLSCH_FREE 1
......@@ -282,9 +283,10 @@ static void TPencode(void * arg) {
stop_meas(rdata->rm_stats);
#ifdef TASK_MANAGER_LTE
assert(atomic_load(&rdata->task_status->completed) == 0);
atomic_store_explicit(&rdata->task_status->completed, 1, memory_order_seq_cst);
//assert(atomic_load(rdata->tasks_remaining) == 0);
atomic_store_explicit(&rdata->tasks_remaining->completed,1, memory_order_seq_cst);
#endif
}
int dlsch_encoding(PHY_VARS_eNB *eNB,
......@@ -321,9 +323,8 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
hadlsch->Nl,
num_pdcch_symbols,
frame,subframe,beamforming_mode);
#ifndef TASK_MANAGER_LTE
int nbEncode = 0;
#endif
// if (hadlsch->Ndi == 1) { // this is a new packet
if (hadlsch->round == 0) { // this is a new packet
// Add 24-bit crc (polynomial A) to payload
......@@ -352,24 +353,23 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
#ifdef TASK_MANAGER_LTE
turboEncode_t arr[hadlsch->C];
task_status_t task_status[hadlsch->C];
task_status_t tasks_remaining[hadlsch->C];
memset(arr, 0, sizeof(turboEncode_t)*hadlsch->C); // Let's waste some CPU cycles...
memset(task_status, 0, sizeof(task_status_t)*hadlsch->C);
memset(tasks_remaining, 0, sizeof(task_status_t)*hadlsch->C);
#else
notifiedFIFO_t respEncode;
initNotifiedFIFO(&respEncode);
#endif
for (int r=0, r_offset=0; r<hadlsch->C; r++) {
for (int r=0, r_offset=0; r<hadlsch->C; r++) {
#ifdef TASK_MANAGER_LTE
turboEncode_t* rdata = &arr[r];
rdata->task_status = &task_status[r];
#else
rdata->tasks_remaining = &tasks_remaining[r];
#else
union turboReqUnion id= {.s={dlsch->rnti,frame,subframe,r,0}};
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(turboEncode_t), id.p, &respEncode, TPencode);
turboEncode_t * rdata=(turboEncode_t *) NotifiedFifoData(req);
#endif
#endif
rdata->input=hadlsch->c[r];
rdata->Kr_bytes= ( r<hadlsch->Cminus ? hadlsch->Kminus : hadlsch->Kplus) >>3;
rdata->filler=(r==0) ? hadlsch->F : 0;
......@@ -384,13 +384,12 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
rdata->G=G;
#ifdef TASK_MANAGER_LTE
task_t t = {.func = TPencode, .args = rdata};
task_t t = {.func = TPencode, .args = rdata};
async_task_manager(proc->man, t);
#else
pushTpool(proc->threadPool, req);
nbEncode++;
pushTpool(proc->threadPool, req);
#endif
nbEncode++;
int Qm=hadlsch->Qm;
int C=hadlsch->C;
int Nl=hadlsch->Nl;
......@@ -403,8 +402,9 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
}
#ifdef TASK_MANAGER_LTE
//trigger_all_task_manager(proc->man);
wait_task_status_completed(hadlsch->C, task_status);
if(nbEncode > 0){
wait_task_status_completed(hadlsch->C, tasks_remaining);
}
#else
// Wait all other threads finish to process
while (nbEncode) {
......@@ -415,7 +415,6 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
nbEncode--;
}
#endif
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ENB_DLSCH_ENCODING, VCD_FUNCTION_OUT);
return(0);
}
......@@ -480,23 +479,22 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
&hadlsch->F)<0)
return(-1);
}
int nbEncode = 0;
#ifdef TASK_MANAGER_LTE
turboEncode_t arr[hadlsch->C];
task_status_t task_status[hadlsch->C];
task_status_t tasks_remaining[hadlsch->C];
memset(arr, 0, sizeof(turboEncode_t)*hadlsch->C); // Let's waste some CPU cycles...
memset(task_status, 0, hadlsch->C*sizeof( task_status_t));
memset(tasks_remaining, 0, sizeof(task_status_t)*hadlsch->C);
#else
int nbEncode = 0;
notifiedFIFO_t respEncode;
initNotifiedFIFO(&respEncode);
#endif
for (int r=0, r_offset=0; r<hadlsch->C; r++) {
#ifdef TASK_MANAGER_LTE
turboEncode_t* rdata = &arr[r];
rdata->task_status = &task_status[r];
#else
rdata->tasks_remaining = &tasks_remaining[r];
#else
union turboReqUnion id= {.s={dlsch->rnti,frame,subframe,r,0}};
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(turboEncode_t), id.p, &respEncode, TPencode);
turboEncode_t * rdata=(turboEncode_t *) NotifiedFifoData(req);
......@@ -519,8 +517,9 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
async_task_manager(proc->man, t);
#else
pushTpool(proc->threadPool, req);
nbEncode++;
#endif
nbEncode++;
int Qm=hadlsch->Qm;
int C=hadlsch->C;
int Nl=hadlsch->Nl;
......@@ -531,10 +530,11 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
else
r_offset += Nl*Qm * ((GpmodC==0?0:1) + (Gp/C));
}
#ifdef TASK_MANAGER_LTE
//trigger_all_task_manager(proc->man);
wait_task_status_completed(hadlsch->C, task_status);
if(nbEncode > 0){
wait_task_status_completed(hadlsch->C, tasks_remaining);
}
#else
// Wait all other threads finish to process
while (nbEncode) {
......
......@@ -36,8 +36,6 @@
#include "nfapi_interface.h"
#include "transport_common_proto.h"
#include "common/utils/thread_pool/task_manager.h"
// Functions below implement 36-211 and 36-212
/** @addtogroup _PHY_TRANSPORT_
......@@ -529,9 +527,10 @@ unsigned int ulsch_decoding(PHY_VARS_eNB *phy_vars_eNB,
uint8_t Nbundled,
uint8_t llr8_flag
#ifdef TASK_MANAGER_LTE
// This is a broken idea. But so is the code architecture
,thread_info_tm_t* t_info
#endif
);
#endif
);
void generate_phich_top(PHY_VARS_eNB *phy_vars_eNB,
L1_rxtx_proc_t *proc,
......
......@@ -33,6 +33,7 @@
//#define DEBUG_ULSCH_DECODING
#include <syscall.h>
#include <stdatomic.h>
#include "PHY/defs_eNB.h"
#include "PHY/phy_extern.h"
#include "PHY/CODING/coding_extern.h"
......@@ -41,6 +42,7 @@
#include "RRC/LTE/rrc_extern.h"
#include "PHY_INTERFACE/phy_interface.h"
#include "transport_proto.h"
#include "common/utils/thread_pool/task_manager.h"
extern int oai_exit;
......@@ -209,8 +211,7 @@ uint8_t extract_cqi_crc(uint8_t *cqi,uint8_t CQI_LENGTH) {
return(crc);
}
static void processULSegment(void * arg)
{
static void processULSegment(void * arg) {
turboDecode_t* rdata=(turboDecode_t*) arg;
PHY_VARS_eNB *eNB=rdata->eNB;
LTE_UL_eNB_HARQ_t *ulsch_harq=rdata->ulsch_harq;
......@@ -241,8 +242,8 @@ static void processULSegment(void * arg)
r,
&E)==-1) {
#ifdef TASK_MANAGER_LTE
assert(rdata->task_status != NULL);
atomic_store_explicit(&rdata->task_status->completed, 1, memory_order_seq_cst);
// assert(rdata->task_status != NULL);
atomic_store_explicit(&rdata->tasks_remaining->completed, 1, memory_order_seq_cst);
#endif
LOG_E(PHY,"ulsch_decoding.c: Problem in rate matching\n");
return;
......@@ -283,6 +284,7 @@ static void processULSegment(void * arg)
rdata->Fbits,
&eNB->ulsch_tc_init_stats,
&eNB->ulsch_tc_alpha_stats,
&eNB->ulsch_tc_beta_stats,
&eNB->ulsch_tc_gamma_stats,
&eNB->ulsch_tc_ext_stats,
......@@ -291,9 +293,9 @@ static void processULSegment(void * arg)
&ulsch_harq->abort_decode);
#ifdef TASK_MANAGER_LTE
assert(rdata->task_status != NULL);
atomic_store_explicit(&rdata->task_status->completed, 1, memory_order_seq_cst);
#endif
assert(rdata->tasks_remaining != NULL);
atomic_store_explicit(&rdata->tasks_remaining->completed, 1, memory_order_seq_cst);
#endif
}
/*!
......@@ -305,9 +307,7 @@ static void processULSegment(void * arg)
@returns 0 on success
*/
#ifdef TASK_MANAGER_LTE
#ifdef TASK_MANAGER_LTE
static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_id, int harq_pid, int llr8_flag, thread_info_tm_t* t_info)
#else
static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_id, int harq_pid, int llr8_flag)
......@@ -325,8 +325,6 @@ static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_i
*decoder16 : *decoder8;
ulsch_harq->processedSegments=0;
set_abort(&ulsch_harq->abort_decode, false);
for (int r=0; r<ulsch_harq->C; r++) {
// printf("before subblock deinterleaving c[%d] = %p\n",r,ulsch_harq->c[r]);
// Get Turbo interleaver parameters
......@@ -350,10 +348,10 @@ static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_i
E = ulsch_harq->Qm * ((GpmodC==0?0:1) + (Gp/ulsch_harq->C));
#ifdef TASK_MANAGER_LTE
turboDecode_t* rdata = &((turboDecode_t*)t_info->buf)[t_info->len];
assert(t_info->len < 64);
rdata->task_status = &t_info->task_status[t_info->len];
t_info->len += 1;
turboDecode_t* rdata = &((turboDecode_t*)t_info->buf)[t_info->len];
assert(t_info->len < 64);
rdata->tasks_remaining = &t_info->tasks_remaining[t_info->len];
t_info->len += 1;
#else
union turboReqUnion id= {.s={ulsch->rnti,proc->frame_rx,proc->subframe_rx,0,0}};
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(turboDecode_t),
......@@ -390,10 +388,6 @@ static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_i
r_offset+=E;
offset+=sz;
}
#ifdef TASK_MANAGER_LTE
//trigger_all_task_manager(proc->man);
#endif
return(ret);
}
......@@ -433,7 +427,7 @@ unsigned int ulsch_decoding(PHY_VARS_eNB *eNB,
// This is a broken idea. But so is the code architecture
,thread_info_tm_t* t_info
#endif
)
)
{
int16_t *ulsch_llr = eNB->pusch_vars[UE_id]->llr;
LTE_DL_FRAME_PARMS *frame_parms = &eNB->frame_parms;
......@@ -1120,7 +1114,7 @@ unsigned int ulsch_decoding(PHY_VARS_eNB *eNB,
LOG_D(PHY,"frame %d subframe %d O_ACK:%d o_ACK[]=%d:%d:%d:%d\n",frame,subframe,ulsch_harq->O_ACK,ulsch_harq->o_ACK[0],ulsch_harq->o_ACK[1],ulsch_harq->o_ACK[2],ulsch_harq->o_ACK[3]);
// Do ULSCH Decoding for data portion
#ifdef TASK_MANAGER_LTE
#ifdef TASK_MANAGER_LTE
ret = ulsch_decoding_data(eNB, proc, UE_id, harq_pid, llr8_flag, t_info);
#else
ret = ulsch_decoding_data(eNB, proc, UE_id, harq_pid, llr8_flag);
......
......@@ -42,6 +42,7 @@
#include "common/utils/LOG/vcd_signal_dumper.h"
#include "common/utils/LOG/log.h"
#include "common/utils/nr/nr_common.h"
#include <stdatomic.h>
#include <syscall.h>
#include <openair2/UTIL/OPT/opt.h>
#include "common/utils/thread_pool/task_manager.h"
......@@ -277,8 +278,7 @@ static void ldpc8blocks(void *p)
}
#ifdef TASK_MANAGER_CODING
assert(atomic_load(&impp->task_status->completed) == 0);
atomic_store_explicit(&impp->task_status->completed, 1, memory_order_seq_cst);
atomic_store_explicit(&impp->task_done->completed, 1, memory_order_release);
#endif
}
......@@ -331,10 +331,10 @@ int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
B = A + 24;
// harq->b = a;
AssertFatal((A / 8) + 4 <= max_bytes,
"A %d is too big (A/8+4 = %d > %d)\n",
A,
(A / 8) + 4,
max_bytes);
"A %d is too big (A/8+4 = %d > %d)\n",
A,
(A / 8) + 4,
max_bytes);
memcpy(harq->b, a, (A / 8) + 4); // why is this +4 if the CRC is only 3 bytes?
} else {
// Add 16-bit crc (polynomial A) to payload
......@@ -346,10 +346,10 @@ int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
B = A + 16;
// harq->b = a;
AssertFatal((A / 8) + 3 <= max_bytes,
"A %d is too big (A/8+3 = %d > %d)\n",
A,
(A / 8) + 3,
max_bytes);
"A %d is too big (A/8+3 = %d > %d)\n",
A,
(A / 8) + 3,
max_bytes);
memcpy(harq->b, a, (A / 8) + 3); // using 3 bytes to mimic the case of 24 bit crc
}
......@@ -390,14 +390,14 @@ int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
impp.Qm = rel15->qamModOrder[0];
impp.rv = rel15->rvIndex[0];
int nb_re_dmrs =
(rel15->dmrsConfigType == NFAPI_NR_DMRS_TYPE1) ? (6 * rel15->numDmrsCdmGrpsNoData) : (4 * rel15->numDmrsCdmGrpsNoData);
(rel15->dmrsConfigType == NFAPI_NR_DMRS_TYPE1) ? (6 * rel15->numDmrsCdmGrpsNoData) : (4 * rel15->numDmrsCdmGrpsNoData);
impp.G = nr_get_G(rel15->rbSize,
rel15->NrOfSymbols,
nb_re_dmrs,
get_num_dmrs(rel15->dlDmrsSymbPos),
harq->unav_res,
rel15->qamModOrder[0],
rel15->nrOfLayers);
rel15->NrOfSymbols,
nb_re_dmrs,
get_num_dmrs(rel15->dlDmrsSymbPos),
harq->unav_res,
rel15->qamModOrder[0],
rel15->nrOfLayers);
uint8_t tmp[68 * 384] __attribute__((aligned(32)));
uint8_t *d = tmp;
int r_offset = 0;
......@@ -408,15 +408,15 @@ int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
uint8_t e[impp.E];
bzero(e, impp.E);
nr_rate_matching_ldpc(rel15->maintenance_parms_v3.tbSizeLbrmBytes,
impp.BG,
impp.Zc,
tmp,
e,
impp.n_segments,
impp.F,
impp.K - impp.F - 2 * impp.Zc,
impp.rv,
impp.E);
impp.BG,
impp.Zc,
tmp,
e,
impp.n_segments,
impp.F,
impp.K - impp.F - 2 * impp.Zc,
impp.rv,
impp.E);
nr_interleaving_ldpc(impp.E, impp.Qm, e, impp.output + r_offset);
r_offset += impp.E;
}
......@@ -425,11 +425,11 @@ int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
initNotifiedFIFO(&nf);
int nbJobs = 0;
#ifdef TASK_MANAGER_CODING
size_t const sz = (impp.n_segments/8+((impp.n_segments&7)==0 ? 0 : 1));
encoder_implemparams_t arr[sz];
task_status_t task_status[sz];
memset(task_status, 0, sz * sizeof(task_status_t));
#endif
size_t const sz = (impp.n_segments/8+((impp.n_segments&7)==0 ? 0 : 1));
encoder_implemparams_t arr[sz];
task_status_t task_status[sz];
memset(task_status, 0, sz * sizeof(task_status_t));
#endif
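  // sz above is ceil(n_segments / 8): judging by its name and the grouping below,
  // ldpc8blocks() handles up to eight code segments per task, so one
  // encoder_implemparams_t / task_status_t slot is reserved per group of eight.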
for (int j = 0; j < (impp.n_segments / 8 + ((impp.n_segments & 7) == 0 ? 0 : 1)); j++) {
#ifdef TASK_MANAGER_CODING
assert(nbJobs < sz);
......@@ -441,22 +441,20 @@ int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
*perJobImpp = impp;
perJobImpp->macro_num = j;
#ifdef TASK_MANAGER_CODING
perJobImpp->task_status = &task_status[nbJobs];
task_t t = {.args = perJobImpp, .func = ldpc8blocks};
assert(atomic_load(&perJobImpp->task_status->completed) == 0);
async_task_manager(&gNB->man, t);
#else
perJobImpp->task_done = &task_status[nbJobs];
task_t t = {.args = perJobImpp, .func = ldpc8blocks};
//assert(atomic_load(&perJobImpp->task_status->completed) == 0);
async_task_manager(&gNB->man, t);
#else
pushTpool(&gNB->threadPool, req);
#endif
nbJobs++;
}
#ifdef TASK_MANAGER_CODING
if(nbJobs > 0) {
//trigger_all_task_manager(&gNB->man);
wait_task_status_completed(nbJobs, task_status);
nbJobs = 0;
}
if(nbJobs > 0) {
wait_task_status_completed(nbJobs, task_status);
}
#else
while (nbJobs) {
notifiedFIFO_elt_t *req = pullTpool(&nf, &gNB->threadPool);
......
......@@ -51,6 +51,7 @@
//#define gNB_DEBUG_TRACE
#include "common/utils/thread_pool/task_manager.h"
#include <stdatomic.h>
#include <stdint.h>
#include <time.h>
#include <stdalign.h>
......@@ -190,8 +191,8 @@ static void nr_processULSegment(void *arg)
LOG_E(PHY, "ulsch_decoding.c: Problem in rate_matching\n");
rdata->decodeIterations = max_ldpc_iterations + 1;
#ifdef TASK_MANAGER
assert(rdata->task_status != NULL && atomic_load(&rdata->task_status->completed) == 0);
atomic_store_explicit(&rdata->task_status->completed, 1, memory_order_release); // memory_order order );
//assert(rdata->task_status != NULL && atomic_load(&rdata->task_status->completed) == 0);
atomic_store_explicit(&rdata->tasks_remaining->completed, 1, memory_order_release); // memory_order order );
#endif
return;
}
......@@ -235,8 +236,8 @@ static void nr_processULSegment(void *arg)
memcpy(ulsch_harq->c[r],llrProcBuf, Kr>>3);
#ifdef TASK_MANAGER
assert(rdata->task_status != NULL && atomic_load(&rdata->task_status->completed) == 0);
atomic_store_explicit(&rdata->task_status->completed, 1, memory_order_release); // memory_order order );
assert(rdata->tasks_remaining != NULL);
atomic_store_explicit(&rdata->tasks_remaining->completed, 1, memory_order_release); // memory_order order );
#endif
}
......@@ -345,7 +346,6 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
uint8_t harq_pid,
uint32_t G
#ifdef TASK_MANAGER
// This is a broken idea. But so is the code arquitecture
, thread_info_tm_t* t_info
#endif
)
......@@ -459,8 +459,8 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
int E = nr_get_E(G, harq_process->C, Qm, n_layers, r);
#ifdef TASK_MANAGER
ldpcDecode_t* rdata = &((ldpcDecode_t*)t_info->buf)[t_info->len];
assert(t_info->len < 64);
rdata->task_status = &t_info->task_status[t_info->len];
assert(t_info->len < 16);
rdata->tasks_remaining = &t_info->tasks_remaining[t_info->len];
t_info->len += 1;
#else
union ldpcReqUnion id = {.s = {ulsch->rnti, frame, nr_tti_rx, 0, 0}};
......
......@@ -47,6 +47,7 @@
#include "openair2/LAYER2/NR_MAC_COMMON/nr_mac_common.h"
#include "openair1/PHY/TOOLS/phy_scope_interface.h"
#include "common/utils/thread_pool/task_manager.h"
#include <stdatomic.h>
//#define ENABLE_PHY_PAYLOAD_DEBUG 1
......@@ -238,8 +239,8 @@ static void nr_processDLSegment(void *arg)
LOG_E(PHY,"dlsch_decoding.c: Problem in rate_matching\n");
#ifdef TASK_MANAGER_UE_DECODING
assert(atomic_load(&rdata->task_status->completed) == 0);
atomic_store(&rdata->task_status->completed, 1);
//assert(atomic_load(rdata->task_done) == 0);
atomic_store_explicit(&rdata->task_done->completed, 1, memory_order_release);
#endif
return;
}
......@@ -283,8 +284,7 @@ static void nr_processDLSegment(void *arg)
stop_meas(&rdata->ts_ldpc_decode);
}
#ifdef TASK_MANAGER_UE_DECODING
assert(atomic_load(&rdata->task_status->completed) == 0);
atomic_store(&rdata->task_status->completed, 1);
atomic_store_explicit(&rdata->task_done->completed, 1, memory_order_release);
#endif
}
......@@ -427,8 +427,8 @@ uint32_t nr_dlsch_decoding(PHY_VARS_NR_UE *phy_vars_ue,
#ifdef TASK_MANAGER_UE_DECODING
ldpcDecode_ue_t arr[harq_process->C];
task_status_t task_status[harq_process->C];
memset(task_status, 0, harq_process->C*sizeof(task_status_t));
task_status_t task_done[harq_process->C];
memset(task_done, 0, harq_process->C*sizeof(task_status_t));
#endif
for (r=0; r<harq_process->C; r++) {
......@@ -437,7 +437,7 @@ uint32_t nr_dlsch_decoding(PHY_VARS_NR_UE *phy_vars_ue,
decParams.R = nr_get_R_ldpc_decoder(dlsch->dlsch_config.rv, E, decParams.BG, decParams.Z, &harq_process->llrLen, harq_process->DLround);
#ifdef TASK_MANAGER_UE_DECODING
ldpcDecode_ue_t* rdata = &arr[r];
rdata->task_status = &task_status[r];
rdata->task_done = &task_done[r];
#else
union ldpcReqUnion id = {.s={dlsch->rnti,frame,nr_slot_rx,0,0}};
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(ldpcDecode_ue_t), id.p, &nf, &nr_processDLSegment);
......@@ -479,11 +479,12 @@ uint32_t nr_dlsch_decoding(PHY_VARS_NR_UE *phy_vars_ue,
int num_seg_ok = 0;
int nbDecode = harq_process->C;
#ifdef TASK_MANAGER_UE_DECODING
//trigger_all_task_manager(&get_nrUE_params()->man);
wait_task_status_completed(nbDecode, task_status);
for(size_t i = 0; i < harq_process->C; ++i){
nr_ue_postDecode(phy_vars_ue, &arr[i], nbDecode == 1, b_size, b, &num_seg_ok, proc);
nbDecode--;
if(nbDecode > 0) {
wait_task_status_completed(nbDecode, task_done);
for(size_t i = 0; i < harq_process->C; ++i){
nr_ue_postDecode(phy_vars_ue, &arr[i], nbDecode == 1, b_size, b, &num_seg_ok, proc);
nbDecode--;
}
}
#else
while (nbDecode) {
......
......@@ -179,7 +179,7 @@ typedef struct {
int endSymbol;
int slot;
#ifdef TASK_MANAGER_RU
task_status_t* task_status;
task_status_t* task_done;
#endif
} feprx_cmd_t;
......@@ -190,7 +190,7 @@ typedef struct {
int startSymbol;
int numSymbols;
#ifdef TASK_MANAGER_RU
task_status_t* task_status;
task_status_t* task_done;
#endif
} feptx_cmd_t;
......
......@@ -252,13 +252,11 @@ typedef struct {
pthread_cond_t cond_RUs;
/// mutex for RXn-TXnp4 processing thread
pthread_mutex_t mutex_RUs;
#ifdef TASK_MANAGER_LTE
task_manager_t* man; // non-owning ptr
#else
tpool_t *threadPool;
#endif
int nbDecode;
notifiedFIFO_t *respDecode;
pthread_mutex_t mutex_emulateRF;
......@@ -800,7 +798,7 @@ typedef struct TurboDecode_s {
int maxIterations;
int decodeIterations;
#ifdef TASK_MANAGER_LTE
task_status_t* task_status;
task_status_t* tasks_remaining;
#endif
} turboDecode_t;
......@@ -819,7 +817,7 @@ typedef struct turboEncode_s {
time_stats_t *te_stats;
time_stats_t *i_stats;
#ifdef TASK_MANAGER_LTE
task_status_t* task_status;
task_status_t* tasks_remaining;
#endif
} turboEncode_t;
......
......@@ -738,9 +738,10 @@ typedef struct PHY_VARS_gNB_s {
void *scopeData;
/// structure for analyzing high-level RT measurements
rt_L1_profiling_t rt_L1_profiling;
#if defined(TASK_MANAGER) && defined(TASK_MANAGER_CODING) && defined(TASK_MANAGER_DEMODULATION)
#if defined(TASK_MANAGER) && defined(TASK_MANAGER_CODING) && defined(TASK_MANAGER_DEMODULATION) && defined(TASK_MANAGER_RU) && defined(TASK_MANAGER_SIM)
task_manager_t man;
#elif !defined(TASK_MANAGER) || !defined(TASK_MANAGER_CODING) || !defined(TASK_MANAGER_DEMODULATION)
#elif !defined(TASK_MANAGER) || !defined(TASK_MANAGER_CODING) || !defined(TASK_MANAGER_DEMODULATION) || !defined(TASK_MANAGER_RU) || !defined(TASK_MANAGER_SIM)
task_manager_t man;
tpool_t threadPool;
#else
......@@ -760,9 +761,9 @@ typedef struct puschSymbolProc_s {
int16_t **llr_layers;
int16_t *s;
uint32_t nvar;
#ifdef TASK_MANAGER_DEMODULATION
task_status_t* task_status;
#endif
#ifdef TASK_MANAGER_DEMODULATION
task_status_t* task_finished;
#endif
} puschSymbolProc_t;
struct puschSymbolReqId {
......@@ -798,7 +799,7 @@ typedef struct LDPCDecode_s {
int decodeIterations;
uint32_t tbslbrm;
#ifdef TASK_MANAGER
task_status_t* task_status;
task_status_t* tasks_remaining;
#endif
} ldpcDecode_t;
......@@ -822,6 +823,9 @@ typedef struct processingData_L1 {
int slot_tx;
openair0_timestamp timestamp_tx;
PHY_VARS_gNB *gNB;
#ifdef TASK_MANAGER_RU
notifiedFIFO_elt_t* elm;
#endif
} processingData_L1_t;
typedef enum {
......
......@@ -32,6 +32,12 @@
#ifndef __PHY_DEFS_NR_UE__H__
#define __PHY_DEFS_NR_UE__H__
#ifdef __cplusplus
#include <atomic>
#define _Atomic(X) std::atomic< X >
#endif
#include "defs_nr_common.h"
#include "CODING/nrPolar_tools/nr_polar_pbch_defs.h"
#include "common/utils/thread_pool/task_manager.h"
......@@ -681,7 +687,7 @@ typedef struct nr_rxtx_thread_data_s {
nr_phy_data_t phy_data;
int tx_wait_for_dlsch;
#ifdef TASK_MANAGER_UE_DECODING
_Atomic(int)* task_done;
task_status_t* task_done;
#endif
} nr_rxtx_thread_data_t;
......@@ -710,7 +716,7 @@ typedef struct LDPCDecode_ue_s {
time_stats_t ts_ldpc_decode;
UE_nr_rxtx_proc_t *proc;
#ifdef TASK_MANAGER_UE_DECODING
task_status_t* task_status;
task_status_t* task_done;
#endif
} ldpcDecode_ue_t;
......
......@@ -1233,8 +1233,7 @@ uci_procedures(PHY_VARS_eNB *eNB,
}
#ifdef TASK_MANAGER_LTE
void postDecode(L1_rxtx_proc_t *proc, turboDecode_t * rdata)
{
void postDecode(L1_rxtx_proc_t *proc, turboDecode_t* rdata){
#else
void postDecode(L1_rxtx_proc_t *proc, notifiedFIFO_elt_t *req)
{
......@@ -1335,9 +1334,9 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
uint32_t harq_pid0 = subframe2harq_pid(&eNB->frame_parms,frame,subframe);
#ifdef TASK_MANAGER_LTE
turboDecode_t arr[64] = {0};
task_status_t task_status[64] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t*)arr, .len = 0, .task_status = task_status};
turboDecode_t arr[64] = {0};
task_status_t tasks_remaining[64] = {0};
thread_info_tm_t t_info = { .tasks_remaining = tasks_remaining, .buf = (uint8_t*)arr };
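  // NOTE: the fixed capacity of 64 has to stay in sync with the
  // assert(t_info->len < 64) bound inside ulsch_decoding_data().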
#endif
for (i = 0; i < NUMBER_OF_ULSCH_MAX; i++) {
......@@ -1392,7 +1391,6 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
rx_ulsch(eNB,proc, i);
stop_meas(&eNB->ulsch_demodulation_stats);
start_meas(&eNB->ulsch_decoding_stats);
ulsch_decoding(eNB,
proc,
i,
......@@ -1400,9 +1398,9 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
ulsch_harq->V_UL_DAI,
ulsch_harq->nb_rb > 20 ? 1 : 0
#ifdef TASK_MANAGER_LTE
,&t_info
#endif
);
,&t_info
#endif
);
}
else if ((ulsch) &&
(ulsch->rnti>0) &&
......@@ -1420,14 +1418,12 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
const bool decode = proc->nbDecode;
#ifdef TASK_MANAGER_LTE
if (proc->nbDecode) {
// Not needed, but won't hurt performance
//trigger_all_task_manager(proc->man);
wait_task_status_completed(t_info.len, t_info.task_status);
for(int i = 0; i < t_info.len; ++i){
postDecode(proc, &arr[i]);
}
//printf("Decoding time %ld \n", time_now_ns() - t0);
assert(t_info.len == proc->nbDecode);
if (proc->nbDecode > 0) {
wait_task_status_completed(t_info.len, t_info.tasks_remaining);
for(size_t i = 0; i < t_info.len; ++i){
postDecode(proc, &arr[i]);
}
}
#else
while (proc->nbDecode > 0) {
......@@ -1440,7 +1436,6 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
delNotifiedFIFO_elt(req);
}
#endif
if (decode)
stop_meas(&eNB->ulsch_decoding_stats);
}
......
......@@ -34,6 +34,7 @@
#include "sched_nr.h"
#include "PHY/MODULATION/modulation_common.h"
#include "PHY/MODULATION/nr_modulation.h"
#include <stdatomic.h>
#include "common/utils/LOG/log.h"
#include "common/utils/system.h"
......@@ -327,8 +328,8 @@ void nr_feptx(void *arg) {
nr_feptx0(ru,slot,startSymbol,numSymbols,aa);
#ifdef TASK_MANAGER_RU
//assert(atomic_load(&feptx->task_status->completed) == 0);
atomic_store_explicit(&feptx->task_status->completed, 1, memory_order_release);
//assert(atomic_load(&feptx->task_done->completed) == 0);
atomic_store_explicit(&feptx->task_done->completed, 1, memory_order_release);
#endif
}
......@@ -345,19 +346,18 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot) {
#ifdef TASK_MANAGER_RU
size_t const sz = ru->nb_tx + (ru->half_slot_parallelization>0)*ru->nb_tx;
assert(sz < 32);
feptx_cmd_t arr[32] = {0};
task_status_t task_status[32] = {0};
//memset(&task_status, 0, sz * sizeof(task_status_t));
feptx_cmd_t arr[sz];
task_status_t tasks_done[sz];
memset(&tasks_done, 0, sizeof(task_status_t) * sz);
#endif
for (int aid=0;aid<ru->nb_tx;aid++) {
#ifdef TASK_MANAGER_RU
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->task_status = &task_status[nbfeptx];
feptx_cmd->task_done = &tasks_done[nbfeptx];
#else
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feptx_cmd_t), 2000 + aid,ru->respfeptx,nr_feptx);
feptx_cmd_t *feptx_cmd=(feptx_cmd_t*)NotifiedFifoData(req);
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feptx_cmd_t), 2000 + aid,ru->respfeptx,nr_feptx);
feptx_cmd_t *feptx_cmd=(feptx_cmd_t*)NotifiedFifoData(req);
#endif
feptx_cmd->aid = aid;
feptx_cmd->ru = ru;
......@@ -374,8 +374,8 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot) {
nbfeptx++;
if (ru->half_slot_parallelization>0) {
#ifdef TASK_MANAGER_RU
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->task_status = &task_status[nbfeptx];
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->task_done = &tasks_done[nbfeptx];
#else
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(feptx_cmd_t), 2000 + aid + ru->nb_tx,ru->respfeptx,nr_feptx);
feptx_cmd_t *feptx_cmd=(feptx_cmd_t*)NotifiedFifoData(req);
......@@ -386,8 +386,8 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot) {
feptx_cmd->startSymbol = ru->nr_frame_parms->symbols_per_slot>>1;
feptx_cmd->numSymbols = ru->nr_frame_parms->symbols_per_slot>>1;
#ifdef TASK_MANAGER_RU
task_t t = {.func = nr_feptx, .args = feptx_cmd};
async_task_manager(&ru->man, t);
task_t t = {.func = nr_feptx, .args = feptx_cmd};
async_task_manager(&ru->man, t);
#else
pushTpool(ru->threadPool,req);
#endif
......@@ -396,12 +396,8 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot) {
}
#ifdef TASK_MANAGER_RU
if(nbfeptx > 0) {
//stop_spining_task_manager(&ru->man);
//trigger_all_task_manager(&ru->man);
wait_task_status_completed(nbfeptx, task_status);
nbfeptx = 0;
}
if(nbfeptx > 0)
wait_task_status_completed(nbfeptx, tasks_done);
#else
while (nbfeptx>0) {
notifiedFIFO_elt_t *req=pullTpool(ru->respfeptx, ru->threadPool);
......@@ -441,8 +437,8 @@ void nr_fep(void* arg) {
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPRX+aid, 0);
#ifdef TASK_MANAGER_RU
//assert(atomic_load(&feprx_cmd->task_status->completed) == 0);
atomic_store_explicit(&feprx_cmd->task_status->completed, 1, memory_order_release);
// assert(atomic_load(feprx_cmd->task_done->completed) == 0);
atomic_store_explicit(&feprx_cmd->task_done->completed, 1, memory_order_release);
#endif
}
......@@ -455,16 +451,15 @@ void nr_fep_tp(RU_t *ru, int slot)
#ifdef TASK_MANAGER_RU
size_t const sz = ru->nb_rx + (ru->half_slot_parallelization>0)*ru->nb_rx;
assert(sz < 32);
feprx_cmd_t arr[32] = {0};
task_status_t task_status[32] = {0};
//memset(&task_status, 0, sizeof(task_status_t) * sz);
feprx_cmd_t arr[sz];
task_status_t tasks_done[sz];
memset(&tasks_done, 0, sizeof(task_status_t) * sz);
#endif
for (int aid=0;aid<ru->nb_rx;aid++) {
#ifdef TASK_MANAGER_RU
feprx_cmd_t* feprx_cmd= &arr[nbfeprx];
feprx_cmd->task_status = &task_status[nbfeprx];
feprx_cmd->task_done = &tasks_done[nbfeprx];
#else
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feprx_cmd_t), 1000 + aid,ru->respfeprx,nr_fep);
feprx_cmd_t *feprx_cmd=(feprx_cmd_t*)NotifiedFifoData(req);
......@@ -484,8 +479,8 @@ void nr_fep_tp(RU_t *ru, int slot)
if (ru->half_slot_parallelization>0) {
#ifdef TASK_MANAGER_RU
feprx_cmd_t* feprx_cmd= &arr[nbfeprx];
feprx_cmd->task_status = &task_status[nbfeprx];
feprx_cmd_t * feprx_cmd= &arr[nbfeprx];
feprx_cmd->task_done = &tasks_done[nbfeprx];
#else
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feprx_cmd_t), 1000 + aid + ru->nb_rx,ru->respfeprx,nr_fep);
feprx_cmd_t *feprx_cmd=(feprx_cmd_t*)NotifiedFifoData(req);
......@@ -506,11 +501,8 @@ void nr_fep_tp(RU_t *ru, int slot)
}
}
#ifdef TASK_MANAGER_RU
//stop_spining_task_manager(&ru->man);
if(nbfeprx > 0) {
//trigger_all_task_manager(&ru->man);
wait_task_status_completed(nbfeprx, task_status);
}
if(nbfeprx > 0)
wait_task_status_completed(nbfeprx, tasks_done);
#else
while (nbfeprx>0) {
notifiedFIFO_elt_t *req=pullTpool(ru->respfeprx, ru->threadPool);
......
......@@ -46,7 +46,7 @@
#include <time.h>
#include <stdint.h>
/*
static
int64_t time_now_ns(void)
{
......@@ -57,7 +57,7 @@ int64_t time_now_ns(void)
int64_t nanos = tms.tv_sec * 1000000000 + tms.tv_nsec;
return nanos;
}
*/
//#define DEBUG_RXDATA
//#define SRS_IND_DEBUG
......@@ -783,6 +783,9 @@ bool maybe_trigger_and_spin_threads(PHY_VARS_gNB* gNB, int frame_rx, int slot_rx
*/
#endif
static int cnt = 0;
int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
{
#ifdef TASK_MANAGER
......@@ -882,12 +885,12 @@ int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
}
#ifdef TASK_MANAGER
ldpcDecode_t arr[64] = {0}; // cheap to initialize, even here
task_status_t task_status[64] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t*)arr, .len = 0, .task_status = task_status};
ldpcDecode_t arr[16];
task_status_t tasks_remaining[16] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t*)arr, .len = 0, .tasks_remaining = tasks_remaining};
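  // NOTE: the capacity of 16 here has to match the assert(t_info->len < 16)
  // bound inside nr_ulsch_decoding().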
#endif
//int64_t const t0 = time_now_ns();
int64_t const t0 = time_now_ns();
int totalDecode = 0;
for (int ULSCH_id = 0; ULSCH_id < gNB->max_nb_pusch; ULSCH_id++) {
NR_gNB_ULSCH_t *ulsch = &gNB->ulsch[ULSCH_id];
......@@ -1000,15 +1003,17 @@ int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
}
}
++cnt;
#ifdef TASK_MANAGER
if (totalDecode > 0) {
assert(totalDecode == t_info.len);
wait_task_status_completed(t_info.len, t_info.task_status);
assert(totalDecode == t_info.len);
wait_task_status_completed(t_info.len, t_info.tasks_remaining);
for(int i = 0; i < t_info.len; ++i){
nr_postDecode(gNB, &arr[i]);
}
totalDecode = 0;
//printf("Decoding time %ld \n", time_now_ns() - t0);
if(cnt % 1024 == 0)
printf("Decoding time %ld \n", time_now_ns() - t0);
}
#else
while (totalDecode > 0) {
......@@ -1019,7 +1024,9 @@ int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
delNotifiedFIFO_elt(req);
totalDecode--;
}
//printf("Decoding time %ld \n", time_now_ns() - t0);
if(cnt % 1024 == 0)
printf("Decoding time %ld \n", time_now_ns() - t0);
#endif
stop_meas(&gNB->ulsch_decoding_stats);
......
......@@ -592,7 +592,6 @@ int main(int argc, char **argv) {
#ifdef TASK_MANAGER_LTE
task_manager_t man = {0};
#endif
cpu_freq_GHz = get_cpu_freq_GHz();
printf("Detected cpu_freq %f GHz\n",cpu_freq_GHz);
memset((void *)&sched_resp,0,sizeof(sched_resp));
......@@ -1261,9 +1260,8 @@ int main(int argc, char **argv) {
proc_eNB->respDecode=(notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(proc_eNB->respDecode);
#ifdef TASK_MANAGER_LTE
int const log_cores = get_nprocs_conf();
assert(log_cores > 0);
init_task_manager(&man, log_cores);
int const n_threads = 1;
init_task_manager(&man, n_threads);
proc_eNB->man = &man;
#else
proc_eNB->threadPool = (tpool_t *)malloc(sizeof(tpool_t));
......
......@@ -363,11 +363,6 @@ int main(int argc, char **argv) {
static double maxDoppler = 0.0;
static int srs_flag = 0;
static int N_RB_DL=25,osf=1;
#ifdef TASK_MANAGER_LTE
task_manager_t man = {0};
#endif
//uint8_t cyclic_shift = 0;
static uint8_t beta_ACK=0,beta_RI=0,beta_CQI=2,cqi_size=11;
static uint8_t tdd_config=3,frame_type=FDD;
......@@ -794,15 +789,12 @@ int main(int argc, char **argv) {
proc_rxtx_ue->subframe_rx = (proc_rxtx->subframe_tx+6)%10;
#ifdef TASK_MANAGER_LTE
int const log_cores = get_nprocs_conf();
assert(log_cores > 0);
init_task_manager(&man, log_cores);
proc_rxtx->man = &man;
int const n_threads = 1;
init_task_manager(proc_rxtx->man, n_threads);
#else
proc_rxtx->threadPool = (tpool_t *)malloc(sizeof(tpool_t));
initTpool("n", proc_rxtx->threadPool, true);
#endif
proc_rxtx->respDecode=(notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(proc_rxtx->respDecode);
......
......@@ -19,6 +19,7 @@
* contact@openairinterface.org
*/
#include <ctype.h>
#include <string.h>
#include <math.h>
#include <unistd.h>
......@@ -101,6 +102,35 @@ nrUE_params_t *get_nrUE_params(void) {
}
configmodule_interface_t *uniqCfg = NULL;
static
int num_threads(char* parms_cpy)
{
char *saveptr, * curptr;
int nbThreads=0;
curptr=strtok_r(parms_cpy,",",&saveptr);
while ( curptr!=NULL ) {
int c=toupper(curptr[0]);
switch (c) {
case 'N':
//pool->activated=false;
break;
default:
int const core_id = atoi(curptr);
printf("create a thread for core %d\n", core_id);
nbThreads++;
}
curptr=strtok_r(NULL,",",&saveptr);
}
return nbThreads;
}
int main(int argc, char **argv)
{
char c;
......@@ -375,9 +405,8 @@ int main(int argc, char **argv)
RC.gNB[0] = calloc(1, sizeof(PHY_VARS_gNB));
gNB = RC.gNB[0];
#ifdef TASK_MANAGER_SIM
int const log_cores = get_nprocs_conf();
assert(log_cores > 0);
init_task_manager(&gNB->man, log_cores);
int const n_threads = num_threads(gNBthreads);
init_task_manager(&gNB->man, n_threads);
init_task_manager(&nrUE_params.man, dlsch_threads);
#else
initNamedTpool(gNBthreads, &gNB->threadPool, true, "gNB-tpool");
......
......@@ -19,6 +19,7 @@
* contact@openairinterface.org
*/
#include <ctype.h>
#include <fcntl.h>
#include <math.h>
#include <string.h>
......@@ -261,6 +262,34 @@ void validate_input_pmi(nr_pdsch_AntennaPorts_t pdsch_AntennaPorts, int nrOfLaye
}
static
int num_threads(char* parms_cpy)
{
char *saveptr, * curptr;
int nbThreads=0;
curptr=strtok_r(parms_cpy,",",&saveptr);
while ( curptr!=NULL ) {
int c=toupper(curptr[0]);
switch (c) {
case 'N':
//pool->activated=false;
break;
default:
int const core_id = atoi(curptr);
printf("create a thread for core %d\n", core_id);
nbThreads++;
}
curptr=strtok_r(NULL,",",&saveptr);
}
return nbThreads;
}
int NB_UE_INST = 1;
configmodule_interface_t *uniqCfg = NULL;
int main(int argc, char **argv)
......@@ -916,9 +945,8 @@ int main(int argc, char **argv)
int n_errs = 0;
#ifdef TASK_MANAGER_SIM
int const log_cores = get_nprocs_conf();
assert(log_cores > 0);
init_task_manager(&gNB->man, log_cores/2);
int const n_threads = num_threads(gNBthreads);
init_task_manager(&gNB->man, n_threads );
#else
initNamedTpool(gNBthreads, &gNB->threadPool, true, "gNB-tpool");
#endif
......
......@@ -605,8 +605,8 @@ int main(int argc, char **argv)
#ifdef TASK_MANAGER
ldpcDecode_t arr[64] = {0};
task_status_t task_status[64] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t*)arr, .len = 0, .task_status = task_status };
task_status_t tasks_remaining[64] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t*)arr, .len = 0, .tasks_remaining = tasks_remaining };
int nbDecode = nr_ulsch_decoding(gNB, UE_id, channel_output_fixed, frame_parms, rel15_ul, frame, subframe, harq_pid, G, &t_info);
assert(nbDecode > 0);
#else
......@@ -614,7 +614,7 @@ int main(int argc, char **argv)
#endif
int nb_ok = 0;
#ifdef TASK_MANAGER
wait_task_status_completed(t_info.len, t_info.task_status);
wait_task_status_completed(t_info.len, t_info.tasks_remaining);
if(nbDecode > 0){
for(size_t i = 0; i < nbDecode; ++i){
ret = nr_postDecode_sim(gNB, &arr[i], &nb_ok);
......@@ -679,7 +679,6 @@ int main(int argc, char **argv)
loader_reset();
logTerm();
return (n_errors);
}