Commit a8ea2207 authored by Eurecom's avatar Eurecom

conditional variables added

parent 98fdcb61
......@@ -36,7 +36,6 @@ int64_t time_now_us(void)
}
return micros;
}
*/
static
void pin_thread_to_core(int core_num)
......@@ -49,6 +48,7 @@ void pin_thread_to_core(int core_num)
printf("Pining into core %d id %ld \n", core_num, pthread_self());
}
*/
//////////////////////////////
//////////////////////////////
......@@ -190,8 +190,8 @@ typedef struct {
pthread_mutex_t mtx;
pthread_cond_t cv;
seq_ring_task_t r;
_Atomic int32_t* futex;
_Atomic bool* waiting;
// _Atomic int32_t* futex;
// _Atomic bool* waiting;
_Atomic int done;
} not_q_t;
......@@ -202,12 +202,12 @@ typedef struct{
static
void init_not_q(not_q_t* q, _Atomic int32_t* futex, _Atomic bool* waiting)
void init_not_q(not_q_t* q/*, _Atomic int32_t* futex, _Atomic bool* waiting*/)
{
assert(q != NULL);
q->done = 0;
q->waiting = waiting;
//q->waiting = waiting;
init_seq_ring_task(&q->r);
pthread_mutexattr_t attr = {0};
......@@ -222,7 +222,7 @@ void init_not_q(not_q_t* q, _Atomic int32_t* futex, _Atomic bool* waiting)
rc = pthread_cond_init(&q->cv, c_attr);
assert(rc == 0);
q->futex = futex;
//q->futex = futex;
}
static
......@@ -253,6 +253,8 @@ bool try_push_not_q(not_q_t* q, task_t t)
push_back_seq_ring_task(&q->r, t);
pthread_cond_signal(&q->cv);
int const rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
......@@ -271,6 +273,8 @@ void push_not_q(not_q_t* q, task_t t)
push_back_seq_ring_task(&q->r, t);
pthread_cond_signal(&q->cv);
pthread_mutex_unlock(&q->mtx);
}
......@@ -314,26 +318,29 @@ bool pop_not_q(not_q_t* q, ret_try_t* out)
assert(out != NULL);
assert(q->done == 0 || q->done ==1);
label:
//label:
// Let's be conservative and not use memory_order_relaxed
while (atomic_load_explicit(q->waiting, memory_order_seq_cst) == true){ //
// while (atomic_load_explicit(q->waiting, memory_order_seq_cst) == true){ //
// Issue X86 PAUSE or ARM YIELD instruction to reduce contention between
// hyper-threads
pause_or_yield();
}
// pause_or_yield();
// }
pthread_mutex_lock(&q->mtx);
if(size_seq_ring_task(&q->r) == 0 && q->done == 0){
int rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
int val = *q->futex; // atomic_load_explicit(q->futex, memory_order_acquire);
long r = syscall(SYS_futex, q->futex, FUTEX_WAIT_PRIVATE, val, NULL, 0);
assert(r != -1);
goto label;
}
while(size_seq_ring_task(&q->r) == 0 && q->done == 0)
pthread_cond_wait(&q->cv , &q->mtx);
// if(size_seq_ring_task(&q->r) == 0 && q->done == 0){
// int rc = pthread_mutex_unlock(&q->mtx);
// assert(rc == 0);
//
// int val = *q->futex; // atomic_load_explicit(q->futex, memory_order_acquire);
// long r = syscall(SYS_futex, q->futex, FUTEX_WAIT_PRIVATE, val, NULL, 0);
// assert(r != -1);
// goto label;
// }
//
assert(q->done == 0 || q->done ==1);
//printf("Waking %ld id %ld \n", time_now_us(), pthread_self());
......@@ -362,8 +369,11 @@ void done_not_q(not_q_t* q)
assert(rc == 0);
q->done = 1;
long r = syscall(SYS_futex, q->futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
assert(r != -1);
//long r = syscall(SYS_futex, q->futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
//assert(r != -1);
rc = pthread_cond_signal(&q->cv);
assert(rc == 0);
rc = pthread_mutex_unlock(&q->mtx);
assert(rc == 0);
......@@ -399,8 +409,8 @@ void* worker_thread(void* arg)
task_thread_args_t* args = (task_thread_args_t*)arg;
int const idx = args->idx;
int const log_cores = get_nprocs_conf();
assert(log_cores > 0);
//int const log_cores = get_nprocs_conf();
//assert(log_cores > 0);
// Assuming: 2 x Physical cores = Logical cores
//pin_thread_to_core(idx+log_cores/2);
......@@ -450,13 +460,13 @@ void init_task_manager(task_manager_t* man, uint32_t num_threads)
man->q_arr = calloc(num_threads, sizeof(not_q_t));
assert(man->q_arr != NULL && "Memory exhausted");
man->futex = 0;
//man->futex = 0;
man->waiting = false;
//man->waiting = false;
not_q_t* q_arr = (not_q_t*)man->q_arr;
for(uint32_t i = 0; i < num_threads; ++i){
init_not_q(&q_arr[i], &man->futex, &man->waiting);
init_not_q(&q_arr[i] /*, &man->futex, &man->waiting*/);
}
man->t_arr = calloc(num_threads, sizeof(pthread_t));
......@@ -484,26 +494,26 @@ void init_task_manager(task_manager_t* man, uint32_t num_threads)
}
man->index = 0;
pthread_mutexattr_t attr = {0};
#ifdef _DEBUG
int const rc_mtx = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
assert(rc_mtx == 0);
#endif
int rc = pthread_mutex_init(&man->wait_mtx, &attr);
assert(rc == 0 && "Error while creating the mtx");
pthread_condattr_t* c_attr = NULL;
rc = pthread_cond_init(&man->wait_cv, c_attr);
assert(rc == 0);
//
// pthread_mutexattr_t attr = {0};
//#ifdef _DEBUG
// int const rc_mtx = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
// assert(rc_mtx == 0);
//#endif
// int rc = pthread_mutex_init(&man->wait_mtx, &attr);
// assert(rc == 0 && "Error while creating the mtx");
//
// pthread_condattr_t* c_attr = NULL;
// rc = pthread_cond_init(&man->wait_cv, c_attr);
// assert(rc == 0);
//
//pin_thread_to_core(3);
}
void free_task_manager(task_manager_t* man, void (*clean)(task_t*))
{
not_q_t* q_arr = (not_q_t*)man->q_arr;
atomic_store(&man->waiting, false);
//atomic_store(&man->waiting, false);
for(uint32_t i = 0; i < man->len_thr; ++i){
done_not_q(&q_arr[i]);
......@@ -522,11 +532,11 @@ void free_task_manager(task_manager_t* man, void (*clean)(task_t*))
free(man->t_arr);
int rc = pthread_mutex_destroy(&man->wait_mtx);
assert(rc == 0);
rc = pthread_cond_destroy(&man->wait_cv);
assert(rc == 0);
// int rc = pthread_mutex_destroy(&man->wait_mtx);
// assert(rc == 0);
//
// rc = pthread_cond_destroy(&man->wait_cv);
// assert(rc == 0);
}
void async_task_manager(task_manager_t* man, task_t t)
......@@ -550,6 +560,7 @@ void async_task_manager(task_manager_t* man, task_t t)
man->num_task +=1;
}
/*
void trigger_and_spin_task_manager(task_manager_t* man)
{
assert(man != NULL);
......@@ -611,6 +622,8 @@ void wait_all_task_manager(task_manager_t* man)
}
}
*/
/*
// This function does not belong here logically
void wait_spin_all_atomics_one(size_t len, _Atomic int arr[len])
......@@ -663,7 +676,5 @@ void wait_task_status_completed(size_t len, task_status_t* arr)
nanosleep(&ns, NULL);
}
}
}
}
......@@ -59,12 +59,12 @@ typedef struct{
atomic_uint_fast64_t num_task;
pthread_cond_t wait_cv;
pthread_mutex_t wait_mtx;
// pthread_cond_t wait_cv;
// pthread_mutex_t wait_mtx;
_Atomic int32_t futex;
// _Atomic int32_t futex;
_Atomic bool waiting;
// _Atomic bool waiting;
} task_manager_t;
void init_task_manager(task_manager_t* man, uint32_t num_threads);
......@@ -73,27 +73,22 @@ void free_task_manager(task_manager_t* man, void (*clean)(task_t* args) );
void async_task_manager(task_manager_t* man, task_t t);
void trigger_all_task_manager(task_manager_t* man);
//void trigger_and_spin_task_manager(task_manager_t* man);
void stop_spining_task_manager(task_manager_t* man);
//void trigger_and_wait_all_task_manager(task_manager_t* man);
void wait_all_task_manager(task_manager_t* man);
//void trigger_all_task_manager(task_manager_t* man);
//
////void trigger_and_spin_task_manager(task_manager_t* man);
//
//void stop_spining_task_manager(task_manager_t* man);
//
////void trigger_and_wait_all_task_manager(task_manager_t* man);
//
//void wait_all_task_manager(task_manager_t* man);
//
// This function does not belong here.
// It should be in an algorithm file
//void wait_spin_all_atomics_one(size_t len, _Atomic int arr[len]);
void wait_task_status_completed(size_t len, task_status_t* arr);
#endif
......@@ -525,8 +525,8 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
offset += ((harq_process->K >> 3) - (harq_process->F >> 3) - ((harq_process->C > 1) ? 3 : 0));
}
#ifdef TASK_MANAGER
stop_spining_task_manager(&phy_vars_gNB->man);
trigger_all_task_manager(&phy_vars_gNB->man);
//stop_spining_task_manager(&phy_vars_gNB->man);
//trigger_all_task_manager(&phy_vars_gNB->man);
#endif
return harq_process->C;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment