Commit 5a06ace1 authored by mir

Avoid TPool deadlock from rx_func and tx_func

parent 76f356a8
......@@ -63,8 +63,10 @@ void do_work(void* arg)
naive_fibonnacci(23 + a->a);
usleep(rand()%1024);
completed_task_ans(a->ans);
printf("Task completed\n");
//int64_t stop = time_now_us();
//char buffer[100] = {0};
......@@ -89,6 +91,7 @@ int main()
int64_t now = time_now_us();
for(int i = 0; i < NUM_JOBS; ++i){
usleep(rand()%1024);
pair_t* pa = &arr[i];
pa->a = 0; //i%10;
pa->time = 0;
......
......@@ -201,6 +201,7 @@ typedef struct {
pthread_mutex_t mtx;
pthread_cond_t cv;
seq_ring_task_t r;
size_t t_id;
// _Atomic int32_t* futex;
//_Atomic bool* waiting;
_Atomic int done;
......@@ -213,9 +214,10 @@ typedef struct{
static
void init_not_q(not_q_t* q /*, _Atomic int32_t* futex , _Atomic bool* waiting */)
void init_not_q(not_q_t* q, size_t t_id /*, _Atomic int32_t* futex , _Atomic bool* waiting */)
{
assert(q != NULL);
assert(t_id != 0 && "Invalid thread id");
q->done = 0;
//q->waiting = waiting;
......@@ -233,6 +235,8 @@ void init_not_q(not_q_t* q /*, _Atomic int32_t* futex , _Atomic bool* waiting */
rc = pthread_cond_init(&q->cv, c_attr);
assert(rc == 0);
q->t_id = t_id;
//q->futex = futex;
}
......@@ -259,16 +263,24 @@ bool try_push_not_q(not_q_t* q, task_t t)
assert(t.func != NULL);
assert(t.args != NULL);
// if(q->t_id == pthread_self() ){
// printf("[MIR]: Cycle detected. Thread from tpool calling itself. Reentrancy forbidden \n");
// return false;
// }
if(pthread_mutex_trylock(&q->mtx ) != 0)
return false;
push_back_seq_ring_task(&q->r, t);
pthread_cond_signal(&q->cv);
const size_t sz = size_seq_ring_task(&q->r);
assert(sz > 0);
int const rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
pthread_cond_signal(&q->cv);
return true;
}
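
Note on the producer path above: it only ever trylocks a queue and signals the condition variable after releasing the mutex. A minimal sketch of that pattern, with queue_t, try_push and the cnt counter as illustrative stand-ins for not_q_t, try_push_not_q and the seq ring (not the project's real API):

  #include <pthread.h>
  #include <stdbool.h>

  typedef struct { pthread_mutex_t mtx; pthread_cond_t cv; int cnt; int done; } queue_t;

  // Non-blocking producer: back off immediately if the queue is busy so the
  // caller can try another worker's queue instead of blocking.
  static bool try_push(queue_t* q)
  {
    if (pthread_mutex_trylock(&q->mtx) != 0)
      return false;               // queue busy, caller tries the next queue
    q->cnt += 1;                  // stands in for push_back_seq_ring_task()
    pthread_mutex_unlock(&q->mtx);
    pthread_cond_signal(&q->cv);  // signal after unlock, as in this commit
    return true;
  }

Signalling after the unlock is valid for condition variables and avoids waking a worker that would immediately block on a mutex the producer still holds.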
......@@ -284,9 +296,11 @@ void push_not_q(not_q_t* q, task_t t)
push_back_seq_ring_task(&q->r, t);
pthread_cond_signal(&q->cv);
assert(size_seq_ring_task(&q->r) > 0);
pthread_mutex_unlock(&q->mtx);
pthread_cond_signal(&q->cv);
}
......@@ -334,34 +348,12 @@ bool pop_not_q(not_q_t* q, ret_try_t* out)
assert(rc == 0);
assert(q->done == 0 || q->done ==1);
while(size_seq_ring_task(&q->r) == 0 && q->done == 0)
while(size_seq_ring_task(&q->r) == 0 && q->done == 0){
pthread_cond_wait(&q->cv , &q->mtx);
/*
// Let's be conservative and not use memory_order_relaxed
// while (atomic_load_explicit(q->waiting, memory_order_seq_cst) == true){ //
// Issue X86 PAUSE or ARM YIELD instruction to reduce contention between
// hyper-threads
// pause_or_yield();
// }
pthread_mutex_lock(&q->mtx);
if(size_seq_ring_task(&q->r) == 0 && q->done == 0){
int rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
int val = atomic_load_explicit(q->futex, memory_order_acquire);
long r = syscall(SYS_futex, q->futex, FUTEX_WAIT_PRIVATE, val, NULL, 0);
assert(r != -1);
goto label;
}
*/
//printf("Waking %ld id %ld \n", time_now_us(), pthread_self());
assert(q->done == 0 || q->done ==1);
if(q->done == 1){
//printf("Done, returning \n");
int rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
return false;
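
The futex experiment is replaced by a plain condition-variable wait loop that also honours the done flag used for shutdown. A sketch of the consumer side, reusing the illustrative queue_t from the sketch above (pop_or_quit stands in for pop_not_q):

  // Wait until there is work or shutdown was requested; return false on shutdown.
  static bool pop_or_quit(queue_t* q)
  {
    pthread_mutex_lock(&q->mtx);
    while (q->cnt == 0 && q->done == 0)
      pthread_cond_wait(&q->cv, &q->mtx);  // atomically releases mtx while sleeping
    if (q->done == 1) {                    // set when the pool is being freed
      pthread_mutex_unlock(&q->mtx);
      return false;                        // worker leaves its loop
    }
    q->cnt -= 1;                           // stands in for popping from the seq ring
    pthread_mutex_unlock(&q->mtx);
    return true;
  }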
......@@ -440,16 +432,19 @@ void* worker_thread(void* arg)
not_q_t* q_arr = (not_q_t*)man->q_arr;
int acc_num_task = 0;
for(;;){
init_not_q(&q_arr[idx], pthread_self() );
// Synchronize all threads
pthread_barrier_wait(&man->barrier);
size_t acc_num_task = 0;
for(;;){
ret_try_t ret = {.success = false};
for(uint32_t i = idx; i < num_it; ++i){
ret = try_pop_not_q(&q_arr[i%len]);
if(ret.success == true){
if(ret.success == true)
break;
}
}
if(ret.success == false){
......@@ -458,16 +453,15 @@ void* worker_thread(void* arg)
if(pop_not_q(&q_arr[idx], &ret) == false)
break;
}
//int64_t now = time_now_us();
//printf("Calling fuinc \n");
//printf("Calling func \n");
ret.t.func(ret.t.args);
//printf("Returning from func \n");
//int64_t stop = time_now_us();
acc_num_task += 1;
//cnt_out++;
//printf("Tasks out %d %ld \n", cnt_out, time_now_us());
acc_num_task +=1;
}
free(args);
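
Each worker now records its pthread_self() in its own queue (enabling the reentrancy check sketched in try_push_not_q), waits on the startup barrier, and then runs a scan-then-block loop: it polls every queue without blocking, starting at its own index, and only blocks on its own queue when nothing was found. A sketch, assuming try_pop/pop_or_quit helpers in the spirit of the earlier sketches and an illustrative worker_args_t; none of these are the project's real names:

  typedef struct { size_t idx, len, num_it; queue_t* q_arr; } worker_args_t;

  static void* worker(void* arg)
  {
    worker_args_t* a = arg;
    for (;;) {
      bool got = false;
      for (size_t i = a->idx; i < a->num_it; ++i)   // work stealing: scan all queues once
        if ((got = try_pop(&a->q_arr[i % a->len])))
          break;
      if (!got && !pop_or_quit(&a->q_arr[a->idx]))  // nothing found: block on own queue
        break;                                      // done flag raised: exit the pool
      // ... execute the popped task here (ret.t.func(ret.t.args) in worker_thread) ...
    }
    return NULL;
  }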
......@@ -479,20 +473,20 @@ void init_task_manager(task_manager_t* man, size_t num_threads)
assert(man != NULL);
assert(num_threads > 0 && num_threads < 33 && "Do you have zero or more than 32 processors??");
printf("[MIR]: number of threads %ld \n", num_threads);
man->q_arr = calloc(num_threads, sizeof(not_q_t));
assert(man->q_arr != NULL && "Memory exhausted");
not_q_t* q_arr = (not_q_t*)man->q_arr;
for(size_t i = 0; i < num_threads; ++i){
init_not_q(&q_arr[i]);
}
man->t_arr = calloc(num_threads, sizeof(pthread_t));
assert(man->t_arr != NULL && "Memory exhausted" );
man->len_thr = num_threads;
man->index = 0;
const pthread_barrierattr_t * barrier_attr = NULL;
int rc = pthread_barrier_init(&man->barrier, barrier_attr, num_threads + 1);
assert(rc == 0);
for(size_t i = 0; i < num_threads; ++i){
task_thread_args_t* args = malloc(sizeof(task_thread_args_t) );
args->idx = i;
......@@ -519,7 +513,11 @@ void init_task_manager(task_manager_t* man, size_t num_threads)
}
}
man->index = 0;
  // Synchronize with the thread pool threads: all of them have started at this point
pthread_barrier_wait(&man->barrier);
rc = pthread_barrier_destroy(&man->barrier);
assert(rc == 0);
//pin_thread_to_core(3);
}
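
The barrier is created for num_threads + 1 participants so that init_task_manager() itself takes part: by the time the barrier releases, every worker has stored its thread id in its queue, and the barrier can be destroyed because it is only needed once at startup. A condensed fragment of that one-shot synchronisation (num_threads and the spawned workers are assumed from the surrounding code):

  pthread_barrier_t barrier;
  int rc = pthread_barrier_init(&barrier, NULL, num_threads + 1); // workers + initialiser
  assert(rc == 0);
  // ... spawn num_threads workers; each calls pthread_barrier_wait(&barrier)
  //     right after registering pthread_self() in its queue ...
  pthread_barrier_wait(&barrier);          // initialiser waits for all workers
  rc = pthread_barrier_destroy(&barrier);  // safe once every participant has passed it
  assert(rc == 0);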
......@@ -545,6 +543,8 @@ void free_task_manager(task_manager_t* man, void (*clean)(task_t*))
free(man->q_arr);
free(man->t_arr);
}
void async_task_manager(task_manager_t* man, task_t t)
......@@ -554,28 +554,29 @@ void async_task_manager(task_manager_t* man, task_t t)
assert(t.func != NULL);
//assert(t.args != NULL);
uint64_t const index = man->index++;
const uint32_t len_thr = man->len_thr;
size_t const index = man->index++;
size_t const len_thr = man->len_thr;
not_q_t* q_arr = (not_q_t*)man->q_arr;
for(uint32_t i = 0; i < len_thr ; ++i){
//assert(pthread_self() != q_arr[index%len_thr].t_id);
for(size_t i = 0; i < len_thr ; ++i){
if(try_push_not_q(&q_arr[(i+index) % len_thr], t)){
man->num_task +=1;
      // Debugging purposes
//cnt_in++;
//printf("Tasks in %d %ld \n", cnt_in, time_now_us());
//printf(" async_task_manager t_id %ld Tasks in %d %ld num_task %ld idx %ld \n", pthread_self(), cnt_in, time_now_us(), man->num_task, (i+index) % len_thr );
return;
}
}
push_not_q(&q_arr[index%len_thr], t);
man->num_task +=1;
  // Debugging purposes
//cnt_in++;
//printf("Tasks in %d %ld \n", cnt_in, time_now_us());
//printf("t_id %ld Tasks in %d %ld num_takss %ld idx %ld \n", pthread_self(), cnt_in, time_now_us(), man->num_task , index % len_thr );
}
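
Task submission round-robins over the per-worker queues with the non-blocking try_push_not_q and only falls back to the blocking push_not_q when every queue happened to be locked at that moment. A sketch of that policy, with try_push/push_blocking as illustrative stand-ins for the two push variants:

  size_t const index = man->index++;              // rotates the starting queue
  for (size_t i = 0; i < len_thr; ++i)
    if (try_push(&q_arr[(i + index) % len_thr]))  // first free queue wins
      return;
  push_blocking(&q_arr[index % len_thr]);         // all queues busy: block on one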
void completed_task_ans(task_ans_t* task)
......@@ -585,7 +586,8 @@ void completed_task_ans(task_ans_t* task)
int const task_not_completed = 0;
assert(atomic_load_explicit(&task->status, memory_order_acquire) == task_not_completed && "Task already finished?");
atomic_store_explicit(&task->status, 1, memory_order_release);
//atomic_store_explicit(&task->status, 1, memory_order_release);
atomic_store_explicit(&task->status, 1, memory_order_seq_cst);
}
......@@ -603,7 +605,8 @@ void join_task_ans(task_ans_t* arr, size_t len)
for(; j != -1 ; i++){
for(; j != -1; --j){
int const task_completed = 1;
if(atomic_load_explicit(&arr[j].status, memory_order_acquire) != task_completed)
//if(atomic_load_explicit(&arr[j].status, memory_order_acquire) != task_completed)
if(atomic_load_explicit(&arr[j].status, memory_order_seq_cst) != task_completed)
break;
}
if(i % 8 == 0){
......
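
Task completion is tracked with a per-task atomic flag: completed_task_ans flips it and join_task_ans spins until every task reports done; this commit tightens the memory ordering from acquire/release to seq_cst while chasing the deadlock. A minimal self-contained sketch of that handshake (completed/join and ans_t are illustrative, mirroring task_ans_t):

  #include <stdatomic.h>
  #include <stddef.h>

  typedef struct { _Atomic int status; } ans_t;   // 0 = pending, 1 = done

  static void completed(ans_t* t)
  {
    atomic_store_explicit(&t->status, 1, memory_order_seq_cst);  // release -> seq_cst in this commit
  }

  static void join(ans_t* arr, size_t len)
  {
    for (size_t i = 0; i < len; ++i)
      while (atomic_load_explicit(&arr[i].status, memory_order_seq_cst) != 1)
        ;  // busy-wait; the real join_task_ans breaks the spin up (the i % 8 branch, truncated here)
  }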
......@@ -66,6 +66,8 @@ typedef struct{
_Atomic(uint64_t) num_task;
pthread_barrier_t barrier;
} task_manager_t;
void init_task_manager(task_manager_t* man, size_t num_threads);
......
......@@ -177,6 +177,12 @@ static void tx_func(void *param)
/* this thread is done with the sched_info, decrease the reference counter */
LOG_D(NR_PHY, "Calling deref_sched_response for id %d (tx_func) in %d.%d\n", info->sched_response_id, frame_tx, slot_tx);
deref_sched_response(info->sched_response_id);
//assert(info->elt->reponseFifo == &gNB->L1_tx_out);
if (info->elt->reponseFifo)
pushNotifiedFIFO(info->elt->reponseFifo, info->elt);
else
delNotifiedFIFO_elt(info->elt);
}
......@@ -288,6 +294,12 @@ void rx_func(void *param)
stop_meas(&softmodem_stats_rxtx_sf);
clock_gettime(CLOCK_MONOTONIC, &info->gNB->rt_L1_profiling.return_L1_RX[rt_prof_idx]);
//assert(info->elt->reponseFifo == &gNB->resp_L1);
if (info->elt->reponseFifo)
pushNotifiedFIFO(info->elt->reponseFifo, info->elt);
else
delNotifiedFIFO_elt(info->elt);
}
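
This is the core of the deadlock fix on the gNB side: tx_func and rx_func no longer go back through the thread pool to hand over their element; whoever created the element decides, via reponseFifo, whether it wants it back on a notified FIFO or whether the task should simply free it. The pattern, as used here and in the UE counterparts further down (elt stands for the info->elt of the surrounding code; pushNotifiedFIFO and delNotifiedFIFO_elt are the existing FIFO helpers):

  if (elt->reponseFifo)
    pushNotifiedFIFO(elt->reponseFifo, elt);  // the producer is waiting on this FIFO
  else
    delNotifiedFIFO_elt(elt);                 // fire-and-forget: free the element here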
static size_t dump_L1_meas_stats(PHY_VARS_gNB *gNB, RU_t *ru, char *output, size_t outputlen) {
......@@ -372,16 +384,16 @@ void init_gNB_Tpool(int inst) {
PHY_VARS_gNB *gNB;
gNB = RC.gNB[inst];
// ULSCH decoding threadpool
#ifdef TASK_MANAGER_DECODING
int const num_threads = parse_num_threads(get_softmodem_params()->threadPoolConfig);
init_task_manager(&gNB->man, num_threads);
init_task_manager(&gNB->man_rx_tx_ru, 2);
#endif
gNB_L1_proc_t *proc = &gNB->proc;
// PUSCH symbols per thread need to be calculated by how many threads we have
gNB->num_pusch_symbols_per_thread = 1;
// ULSCH decoding threadpool
initTpool(get_softmodem_params()->threadPoolConfig, &gNB->threadPool, cpumeas(CPUMEAS_GETSTATE));
// ULSCH decoder result FIFO
initNotifiedFIFO(&gNB->respPuschSymb);
initNotifiedFIFO(&gNB->respDecode);
......@@ -419,6 +431,7 @@ void term_gNB_Tpool(int inst) {
#ifdef TASK_MANAGER_DECODING
void (*clean)(task_t*) = NULL;
free_task_manager(&gNB->man , clean);
free_task_manager(&gNB->man_rx_tx_ru , clean);
#else
abortTpool(&gNB->threadPool);
#endif
......
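
With TASK_MANAGER_DECODING enabled, the gNB now runs two task managers: one sized from threadPoolConfig for ULSCH decoding and a dedicated two-thread manager for the RX/TX/RU path, so those tasks can no longer block behind, or re-enter, the decoding pool; both are torn down in term_gNB_Tpool. A condensed sketch of that pairing, lifted from the two hunks above (no task destructor is needed, hence clean = NULL):

  #ifdef TASK_MANAGER_DECODING
    init_task_manager(&gNB->man, num_threads);  // decoding pool
    init_task_manager(&gNB->man_rx_tx_ru, 2);   // dedicated RX/TX/RU pool
    // ... run ...
    void (*clean)(task_t*) = NULL;
    free_task_manager(&gNB->man, clean);
    free_task_manager(&gNB->man_rx_tx_ru, clean);
  #endif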
......@@ -484,6 +484,11 @@ static void UE_synch(void *arg) {
break;
}
if (syncD->elt->reponseFifo)
pushNotifiedFIFO(syncD->elt->reponseFifo, syncD->elt);
else
delNotifiedFIFO_elt(syncD->elt);
}
static void RU_write(nr_rxtx_thread_data_t *rxtxD) {
......@@ -602,6 +607,11 @@ void processSlotTX(void *arg) {
}
RU_write(rxtxD);
if (rxtxD->elt->reponseFifo)
pushNotifiedFIFO(rxtxD->elt->reponseFifo, rxtxD->elt);
else
delNotifiedFIFO_elt(rxtxD->elt);
}
static void UE_dl_preprocessing(PHY_VARS_NR_UE *UE, const UE_nr_rxtx_proc_t *proc, int *tx_wait_for_dlsch, nr_phy_data_t *phy_data)
......@@ -656,6 +666,8 @@ void UE_dl_processing(void *arg) {
nr_phy_data_t *phy_data = &rxtxD->phy_data;
pdsch_processing(UE, proc, phy_data);
free(rxtxD);
}
void dummyWrite(PHY_VARS_NR_UE *UE,openair0_timestamp timestamp, int writeBlockSize) {
......@@ -797,7 +809,7 @@ void *UE_thread(void *arg)
while (!oai_exit) {
if (syncRunning) {
notifiedFIFO_elt_t *res=tryPullTpool(&nf,&(get_nrUE_params()->Tpool));
notifiedFIFO_elt_t *res = pollNotifiedFIFO(&nf);
if (res) {
syncRunning=false;
......@@ -835,7 +847,10 @@ void *UE_thread(void *arg)
syncData_t *syncMsg = (syncData_t *)NotifiedFifoData(Msg);
syncMsg->UE = UE;
memset(&syncMsg->proc, 0, sizeof(syncMsg->proc));
pushTpool(&(get_nrUE_params()->Tpool), Msg);
syncMsg->elt = Msg;
task_t t = {.func = UE_synch, .args = syncMsg};
async_task_manager(&(get_nrUE_params()->man), t);
trashed_frames = 0;
syncRunning = true;
continue;
......@@ -943,11 +958,13 @@ void *UE_thread(void *arg)
nr_ue_rrc_timer_trigger(UE->Mod_id, curMsg.proc.frame_tx, curMsg.proc.gNB_id);
// RX slot processing. We launch and forget.
notifiedFIFO_elt_t *newRx = newNotifiedFIFO_elt(sizeof(nr_rxtx_thread_data_t), curMsg.proc.nr_slot_rx, NULL, UE_dl_processing);
nr_rxtx_thread_data_t *curMsgRx = (nr_rxtx_thread_data_t *)NotifiedFifoData(newRx);
// Memory ownership is transferred to the function UE_dl_processing
nr_rxtx_thread_data_t *curMsgRx = calloc(1, sizeof(nr_rxtx_thread_data_t));
assert(curMsgRx != NULL && "Memory exhausted");
*curMsgRx = (nr_rxtx_thread_data_t){.proc = curMsg.proc, .UE = UE};
UE_dl_preprocessing(UE, &curMsgRx->proc, tx_wait_for_dlsch, &curMsgRx->phy_data);
pushTpool(&(get_nrUE_params()->Tpool), newRx);
t = (task_t){.func = UE_dl_processing, .args = curMsgRx};
async_task_manager(&(get_nrUE_params()->man), t);
// Start TX slot processing here. It runs in parallel with RX slot processing
// in current code, DURATION_RX_TO_TX constant is the limit to get UL data to encode from a RX slot
......@@ -959,12 +976,12 @@ void *UE_thread(void *arg)
curMsgTx->UE = UE;
curMsgTx->tx_wait_for_dlsch = tx_wait_for_dlsch[curMsgTx->proc.nr_slot_tx];
tx_wait_for_dlsch[curMsgTx->proc.nr_slot_tx] = 0;
pushTpool(&(get_nrUE_params()->Tpool), newTx);
curMsgTx->elt = newElt;
task_t t = {.func = processSlotTX, .args= curMsgTx};
async_task_manager(&(get_nrUE_params()->man), t);
// Wait for TX slot processing to finish
// Should be removed once the remaining bugs and race conditions are fixed
notifiedFIFO_elt_t *res;
res = pullTpool(&txFifo, &(get_nrUE_params()->Tpool));
notifiedFIFO_elt_t* res = pullNotifiedFIFO(&txFifo);
if (res == NULL)
LOG_E(PHY, "Tpool has been aborted\n");
else
......
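
On the UE side the same ideas appear twice: RX slot processing becomes fire-and-forget (the arguments are heap allocated and UE_dl_processing frees them), while TX slot processing still hands its FIFO element back so UE_thread can block until the slot is done. A condensed recap of the two submission styles, reusing the identifiers from the hunks above (not standalone-compilable; it assumes the surrounding UE_thread context):

  // 1) Fire-and-forget: ownership of curMsgRx moves to UE_dl_processing, which frees it.
  nr_rxtx_thread_data_t *curMsgRx = calloc(1, sizeof(*curMsgRx));
  task_t rx = {.func = UE_dl_processing, .args = curMsgRx};
  async_task_manager(&(get_nrUE_params()->man), rx);

  // 2) Submit-and-wait: processSlotTX pushes newElt back through txFifo when it is done.
  curMsgTx->elt = newElt;
  task_t tx = {.func = processSlotTX, .args = curMsgTx};
  async_task_manager(&(get_nrUE_params()->man), tx);
  notifiedFIFO_elt_t *res = pullNotifiedFIFO(&txFifo);  // replaces pullTpool()
  if (res == NULL)
    LOG_E(PHY, "Tpool has been aborted\n");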
......@@ -493,7 +493,6 @@ int main(int argc, char **argv)
int const num_threads = parse_num_threads(get_softmodem_params()->threadPoolConfig);
init_task_manager(&nrUE_params.man, num_threads);
#endif
initTpool(get_softmodem_params()->threadPoolConfig, &(nrUE_params.Tpool), cpumeas(CPUMEAS_GETSTATE));
//randominit (0);
set_taus_seed (0);
......
......@@ -73,7 +73,7 @@ typedef struct {
#ifdef TASK_MANAGER_UE_DECODING
task_manager_t man;
#endif
tpool_t Tpool; // thread pool
// tpool_t Tpool; // thread pool
int UE_scan_carrier;
int UE_fo_compensation;
int timing_advance;
......
......@@ -731,7 +731,7 @@ typedef struct PHY_VARS_gNB_s {
rt_L1_profiling_t rt_L1_profiling;
#if defined(TASK_MANAGER_DECODING) || defined(TASK_MANAGER_CODING) || defined(TASK_MANAGER_DEMODULATION) || defined(TASK_MANAGER_RU) || !defined(TASK_MANAGER_SIM)
task_manager_t man;
tpool_t threadPool;
task_manager_t man_rx_tx_ru;
#else
tpool_t threadPool;
#endif
......@@ -809,6 +809,7 @@ typedef struct processingData_L1 {
int slot_rx;
openair0_timestamp timestamp_tx;
PHY_VARS_gNB *gNB;
notifiedFIFO_elt_t *elt;
} processingData_L1_t;
typedef enum {
......@@ -834,6 +835,7 @@ typedef struct processingData_L1tx {
int num_ul_pdcch;
/* a reference to the sched_response, to release it when not needed anymore */
int sched_response_id;
notifiedFIFO_elt_t *elt;
} processingData_L1tx_t;
typedef struct processingData_L1rx {
......
......@@ -649,6 +649,7 @@ typedef struct nr_rxtx_thread_data_s {
int writeBlockSize;
nr_phy_data_t phy_data;
int tx_wait_for_dlsch;
notifiedFIFO_elt_t * elt;
} nr_rxtx_thread_data_t;
typedef struct LDPCDecode_ue_s {
......
......@@ -379,8 +379,8 @@ int main(int argc, char **argv)
init_task_manager(&gNB->man, num_threads);
init_task_manager(&nrUE_params.man, max(dlsch_threads, 1));
#endif
initNamedTpool(gNBthreads, &gNB->threadPool, true, "gNB-tpool");
initFloatingCoresTpool(dlsch_threads, &nrUE_params.Tpool, false, "UE-tpool");
//initNamedTpool(gNBthreads, &gNB->threadPool, true, "gNB-tpool");
//initFloatingCoresTpool(dlsch_threads, &nrUE_params.Tpool, false, "UE-tpool");
//gNB_config = &gNB->gNB_config;
frame_parms = &gNB->frame_parms; //to be initialized I suppose (maybe not necessary for PBCH)
frame_parms->nb_antennas_tx = n_tx;
......
......@@ -67,6 +67,7 @@
#include "PHY/NR_REFSIG/ul_ref_seq_nr.h"
#include <openair3/ocp-gtpu/gtp_itf.h>
#include "executables/nr-uesoftmodem.h"
#include "common/utils/thread_pool/task_manager.h"
//#define DEBUG_ULSIM
const char *__asan_default_options()
......@@ -553,7 +554,12 @@ int main(int argc, char *argv[])
gNB->ofdm_offset_divisor = UINT_MAX;
gNB->num_pusch_symbols_per_thread = 1;
#ifdef TASK_MANAGER_SIM
init_task_manager(&gNB->man, max(threadCnt, 1));
#else
initFloatingCoresTpool(threadCnt, &gNB->threadPool, false, "gNB-tpool");
#endif
initNotifiedFIFO(&gNB->respDecode);
initNotifiedFIFO(&gNB->respPuschSymb);
......