Commit 0657ab22 authored by Bartosz Podrygajlo

Remove spinlock from threadpool task return

The idea of counting finished jobs with an atomic counter aligned to a cache line is kept, but instead of polling the counter, the consumer now blocks on a semaphore posted by the last completing job.
parent f7d3b728
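As context for the diff below, here is a minimal, self-contained sketch of the fan-out/fan-in pattern the new API implements. It mirrors init_task_ans()/completed_task_ans()/join_task_ans(), but it does not use the OAI headers or thread pool; the demo_* names and the plain pthread workers are illustrative only, and the memory-ordering arguments of the real code are left at the defaults for brevity.

// Minimal sketch of the counter + semaphore mechanism (illustrative names).
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct {
  sem_t sem;            // consumer blocks here until the last job posts
  _Atomic int counter;  // number of jobs still outstanding
} demo_ans_t;

static void demo_init(demo_ans_t *ans, int num_jobs)
{
  atomic_store(&ans->counter, num_jobs);
  sem_init(&ans->sem, 0, 0);
}

static void demo_completed(demo_ans_t *ans, int num_done)
{
  // fetch_sub returns the value before the subtraction, so the producer
  // that brings the counter to zero is the one that posts the semaphore.
  int before = atomic_fetch_sub(&ans->counter, num_done);
  if (before == num_done)
    sem_post(&ans->sem);
}

static void demo_join(demo_ans_t *ans)
{
  sem_wait(&ans->sem);  // blocks instead of spin-polling the counter
  sem_destroy(&ans->sem);
}

static void *worker(void *arg)
{
  demo_completed((demo_ans_t *)arg, 1);  // report one finished job
  return NULL;
}

int main(void)
{
  enum { NUM_JOBS = 4 };
  demo_ans_t ans;
  demo_init(&ans, NUM_JOBS);

  pthread_t th[NUM_JOBS];
  for (int i = 0; i < NUM_JOBS; i++)
    pthread_create(&th[i], NULL, worker, &ans);

  demo_join(&ans);  // returns once all NUM_JOBS workers have reported
  for (int i = 0; i < NUM_JOBS; i++)
    pthread_join(th[i], NULL);

  printf("all %d jobs completed\n", NUM_JOBS);
  return 0;
}

Note how this matches the diff: producers only touch the cache-line-aligned atomic counter (no lock contention), and the single consumer pays the cost of one semaphore wait rather than a nanosleep-based spin loop.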
......@@ -26,37 +26,50 @@
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include "pthread_utils.h"
#include "errno.h"
#include <string.h>
void completed_task_ans(task_ans_t* task)
{
DevAssert(task != NULL);
int status = atomic_load_explicit(&task->status, memory_order_acquire);
AssertFatal(status == 0, "Task not expected to be finished here. Status = %d\n", status);
#define seminit(sem) \
{ \
int ret = sem_init(&sem, 0, 0); \
AssertFatal(ret == 0, "sem_init(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
#define sempost(sem) \
{ \
int ret = sem_post(&sem); \
AssertFatal(ret == 0, "sem_post(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
#define semwait(sem) \
{ \
int ret = sem_wait(&sem); \
AssertFatal(ret == 0, "sem_wait(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
#define semdestroy(sem) \
{ \
int ret = sem_destroy(&sem); \
AssertFatal(ret == 0, "sem_destroy(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
atomic_store_explicit(&task->status, 1, memory_order_release);
void init_task_ans(task_ans_t* ans, uint num_jobs)
{
ans->counter = num_jobs;
seminit(ans->sem);
}
void join_task_ans(task_ans_t* arr, size_t len)
void completed_many_task_ans(task_ans_t* ans, uint num_completed_jobs)
{
DevAssert(len < INT_MAX);
DevAssert(arr != NULL);
// Spin lock inspired by:
// The Art of Writing Efficient Programs:
// An advanced programmer's guide to efficient hardware utilization
// and compiler optimizations using C++ examples
const struct timespec ns = {0, 1};
uint64_t i = 0;
int j = len - 1;
for (; j != -1; i++) {
for (; j != -1; --j) {
int const task_completed = 1;
if (atomic_load_explicit(&arr[j].status, memory_order_acquire) != task_completed)
break;
}
if (i % 8 == 0) {
nanosleep(&ns, NULL);
}
DevAssert(ans != NULL);
// Using atomic counter in contention scenario to avoid locking in producers
int num_jobs = atomic_fetch_sub_explicit(&ans->counter, num_completed_jobs, memory_order_relaxed);
if (num_jobs == num_completed_jobs) {
// Using semaphore to enable blocking call in join_task_ans
sempost(ans->sem);
}
}
void join_task_ans(task_ans_t* ans)
{
semwait(ans->sem);
semdestroy(ans->sem);
}
......@@ -21,7 +21,7 @@
#ifndef TASK_ANSWER_THREAD_POOL_H
#define TASK_ANSWER_THREAD_POOL_H
#include "pthread_utils.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -37,6 +37,7 @@ extern "C" {
#include <stddef.h>
#include <stdint.h>
#include <semaphore.h>
#if defined(__i386__) || defined(__x86_64__)
#define LEVEL1_DCACHE_LINESIZE 64
......@@ -50,9 +51,17 @@ extern "C" {
#error Unknown CPU architecture
#endif
/** @brief
* A multi-producer - single-consumer synchronization mechanism built for efficiency under
* contention.
*
* @param sem semaphore to wait on
* @param counter atomic counter to keep track of the number of tasks completed. Atomic counter
* is used for efficiency under contention.
*/
typedef struct {
// Avoid false sharing
_Alignas(LEVEL1_DCACHE_LINESIZE) _Atomic(int) status;
sem_t sem;
_Alignas(LEVEL1_DCACHE_LINESIZE) _Atomic(int) counter;
} task_ans_t;
typedef struct {
......@@ -62,10 +71,29 @@ typedef struct {
task_ans_t* ans;
} thread_info_tm_t;
/// @brief Initialize a task_ans_t struct
///
/// @param ans task_ans_t struct
/// @param num_jobs number of tasks to wait for
void init_task_ans(task_ans_t* ans, unsigned int num_jobs);
void join_task_ans(task_ans_t* arr, size_t len);
/// @brief Wait for all tasks to complete
/// @param ans task_ans_t struct
void join_task_ans(task_ans_t* arr);
void completed_task_ans(task_ans_t* task);
/// @brief Mark a number of tasks as completed.
///
/// @param ans task_ans_t struct
/// @param num_completed_jobs number of tasks to mark as completed
void completed_many_task_ans(task_ans_t* ans, uint num_completed_jobs);
/// @brief Mark one task as completed.
///
/// @param ans task_ans_t struct
static inline void completed_task_ans(task_ans_t* ans)
{
completed_many_task_ans(ans, 1);
}
#ifdef __cplusplus
}
......
......@@ -114,18 +114,18 @@ int main()
int nb_jobs = 4;
for (int i = 0; i < 1000; i++) {
int parall = nb_jobs;
task_ans_t task_ans[parall];
memset(task_ans, 0, sizeof(task_ans));
task_ans_t task_ans;
init_task_ans(&task_ans, parall);
struct testData test_data[parall];
memset(test_data, 0, sizeof(test_data));
for (int j = 0; j < parall; j++) {
task_t task = {.args = &test_data[j], .func = processing};
struct testData *x = (struct testData *)task.args;
x->id = i;
x->task_ans = &task_ans[j];
x->task_ans = &task_ans;
pushTpool(&pool, task);
}
join_task_ans(task_ans, parall);
join_task_ans(&task_ans);
int sleepmax = 0;
for (int j = 0; j < parall; j++) {
if (test_data[j].sleepTime > sleepmax) {
......
......@@ -250,7 +250,7 @@ int nrLDPC_prepare_TB_decoding(nrLDPC_slot_decoding_parameters_t *nrLDPC_slot_de
for (int r = 0; r < nrLDPC_TB_decoding_parameters->C; r++) {
nrLDPC_decoding_parameters_t *rdata = &((nrLDPC_decoding_parameters_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
rdata->ans = &t_info->ans[t_info->len];
rdata->ans = t_info->ans;
t_info->len += 1;
decParams.R = nrLDPC_TB_decoding_parameters->segments[r].R;
......@@ -309,19 +309,16 @@ int32_t nrLDPC_coding_decoder(nrLDPC_slot_decoding_parameters_t *nrLDPC_slot_dec
nbSegments += nrLDPC_TB_decoding_parameters->C;
}
nrLDPC_decoding_parameters_t arr[nbSegments];
task_ans_t ans[nbSegments];
memset(ans, 0, nbSegments * sizeof(task_ans_t));
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbSegments, .ans = ans};
task_ans_t ans;
init_task_ans(&ans, nbSegments);
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbSegments, .ans = &ans};
int nbDecode = 0;
for (int pusch_id = 0; pusch_id < nrLDPC_slot_decoding_parameters->nb_TBs; pusch_id++) {
nbDecode += nrLDPC_prepare_TB_decoding(nrLDPC_slot_decoding_parameters, pusch_id, &t_info);
(void)nrLDPC_prepare_TB_decoding(nrLDPC_slot_decoding_parameters, pusch_id, &t_info);
}
DevAssert(nbDecode == t_info.len);
// Execute thread poool tasks
join_task_ans(t_info.ans, t_info.len);
// Execute thread pool tasks
join_task_ans(t_info.ans);
for (int pusch_id = 0; pusch_id < nrLDPC_slot_decoding_parameters->nb_TBs; pusch_id++) {
nrLDPC_TB_decoding_parameters_t *nrLDPC_TB_decoding_parameters = &nrLDPC_slot_decoding_parameters->TBs[pusch_id];
......
......@@ -189,7 +189,7 @@ static int nrLDPC_prepare_TB_encoding(nrLDPC_slot_encoding_parameters_t *nrLDPC_
for (int j = 0; j < n_seg; j++) {
ldpc8blocks_args_t *perJobImpp = &((ldpc8blocks_args_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
impp.ans = &t_info->ans[t_info->len];
impp.ans = t_info->ans;
t_info->len += 1;
impp.macro_num = j;
......@@ -211,19 +211,19 @@ int nrLDPC_coding_encoder(nrLDPC_slot_encoding_parameters_t *nrLDPC_slot_encodin
nbTasks += n_seg;
}
ldpc8blocks_args_t arr[nbTasks];
task_ans_t ans[nbTasks];
memset(ans, 0, nbTasks * sizeof(task_ans_t));
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbTasks, .ans = ans};
task_ans_t ans;
init_task_ans(&ans, nbTasks);
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbTasks, .ans = &ans};
int nbEncode = 0;
for (int dlsch_id = 0; dlsch_id < nrLDPC_slot_encoding_parameters->nb_TBs; dlsch_id++) {
nbEncode += nrLDPC_prepare_TB_encoding(nrLDPC_slot_encoding_parameters, dlsch_id, &t_info);
}
DevAssert(nbEncode == t_info.len);
// Execute thread poool tasks
join_task_ans(ans, nbEncode);
if (nbEncode < nbTasks) {
completed_many_task_ans(&ans, nbTasks - nbEncode);
}
// Execute thread pool tasks
join_task_ans(&ans);
return 0;
}
......@@ -278,7 +278,7 @@ int decoder_xdma(nrLDPC_TB_decoding_parameters_t *TB_params, int frame_rx, int s
DevAssert(num_threads_prepare == t_info.len);
// wait for the prepare jobs to complete
join_task_ans(t_info.ans, t_info.len);
join_task_ans(t_info.ans);
for (uint32_t job = 0; job < num_threads_prepare; job++) {
args_fpga_decode_prepare_t *args = &arr[job];
......
......@@ -348,12 +348,12 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
}
turboEncode_t arr[hadlsch->C];
task_ans_t ans[hadlsch->C];
memset(ans, 0, hadlsch->C * sizeof(task_ans_t));
task_ans_t ans;
init_task_ans(&ans, hadlsch->C);
for (int r = 0, r_offset = 0; r < hadlsch->C; r++) {
turboEncode_t *rdata = &arr[r];
rdata->ans = &ans[r];
rdata->ans = &ans;
rdata->input=hadlsch->c[r];
rdata->Kr_bytes= ( r<hadlsch->Cminus ? hadlsch->Kminus : hadlsch->Kplus) >>3;
......@@ -382,7 +382,7 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
r_offset += Nl*Qm * ((GpmodC==0?0:1) + (Gp/C));
}
join_task_ans(ans, hadlsch->C);
join_task_ans(&ans);
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ENB_DLSCH_ENCODING, VCD_FUNCTION_OUT);
return(0);
......@@ -449,12 +449,12 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
return(-1);
}
turboEncode_t arr[hadlsch->C];
task_ans_t ans[hadlsch->C];
memset(ans, 0, hadlsch->C * sizeof(task_ans_t));
task_ans_t ans;
init_task_ans(&ans, hadlsch->C);
for (int r = 0, r_offset = 0; r < hadlsch->C; r++) {
turboEncode_t *rdata = &arr[r];
rdata->ans = &ans[r];
rdata->ans = &ans;
rdata->input=hadlsch->c[r];
rdata->Kr_bytes= ( r<hadlsch->Cminus ? hadlsch->Kminus : hadlsch->Kplus) >>3;
......@@ -483,7 +483,7 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
r_offset += Nl*Qm * ((GpmodC==0?0:1) + (Gp/C));
}
join_task_ans(ans, hadlsch->C);
join_task_ans(&ans);
return(0);
}
......
......@@ -341,7 +341,7 @@ static int ulsch_decoding_data(PHY_VARS_eNB *eNB,
turboDecode_t *rdata = &((turboDecode_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
rdata->ans = &t_info->ans[t_info->len];
rdata->ans = t_info->ans;
t_info->len += 1;
rdata->eNB=eNB;
......
......@@ -506,8 +506,8 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
int num_jobs = CEILIDIV(gNB->frame_parms.nb_antennas_rx, numAntennas);
puschAntennaProc_t rdatas[num_jobs];
memset(rdatas, 0, sizeof(rdatas));
task_ans_t ans[num_jobs];
memset(ans, 0, sizeof(ans));
task_ans_t ans;
init_task_ans(&ans, num_jobs);
for (int job_id = 0; job_id < num_jobs; job_id++) {
puschAntennaProc_t *rdata = &rdatas[job_id];
task_t task = {.func = nr_pusch_antenna_processing, .args = rdata};
......@@ -531,7 +531,7 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
rdata->pusch_vars = &gNB->pusch_vars[ul_id];
rdata->chest_freq = gNB->chest_freq;
rdata->rxdataF = gNB->common_vars.rxdataF;
rdata->ans = &ans[job_id];
rdata->ans = &ans;
// Call the nr_pusch_antenna_processing function
if (job_id == num_jobs - 1) {
// Run the last job inline
......@@ -539,10 +539,9 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
} else {
pushTpool(&gNB->threadPool, task);
}
LOG_D(PHY, "Added Antenna (count %d/%d) to process, in pipe\n", job_id, num_jobs);
} // Antenna Loop
join_task_ans(ans, num_jobs - 1);
join_task_ans(&ans);
stop_meas(&gNB->pusch_channel_estimation_antenna_processing_stats);
for (int aarx = 0; aarx < gNB->frame_parms.nb_antennas_rx; aarx++) {
......
......@@ -1260,7 +1260,6 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
nfapi_nr_pusch_pdu_t *rel15_ul = &gNB->ulsch[ulsch_id].harq_process->ulsch_pdu;
NR_gNB_PUSCH *pusch_vars = &gNB->pusch_vars[ulsch_id];
int nbSymb = 0;
uint32_t bwp_start_subcarrier = ((rel15_ul->rb_start + rel15_ul->bwp_start) * NR_NB_SC_PER_RB + frame_parms->first_carrier_offset) % frame_parms->ofdm_symbol_size;
LOG_D(PHY,"pusch %d.%d : bwp_start_subcarrier %d, rb_start %d, first_carrier_offset %d\n", frame,slot,bwp_start_subcarrier, rel15_ul->rb_start, frame_parms->first_carrier_offset);
LOG_D(PHY,"pusch %d.%d : ul_dmrs_symb_pos %x\n",frame,slot,rel15_ul->ul_dmrs_symb_pos);
......@@ -1483,9 +1482,9 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
int total_res = 0;
int const loop_iter = CEILIDIV(rel15_ul->nr_of_symbols, numSymbols);
puschSymbolProc_t arr[loop_iter];
task_ans_t arr_ans[loop_iter];
task_ans_t ans;
init_task_ans(&ans, loop_iter);
memset(arr_ans, 0, sizeof(arr_ans));
int sz_arr = 0;
for(uint8_t task_index = 0; task_index < loop_iter; task_index++) {
int symbol = task_index * numSymbols + rel15_ul->start_symbol_index;
......@@ -1500,7 +1499,7 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
total_res += res_per_task;
if (res_per_task > 0) {
puschSymbolProc_t *rdata = &arr[sz_arr];
rdata->ans = &arr_ans[sz_arr];
rdata->ans = &ans;
++sz_arr;
rdata->gNB = gNB;
......@@ -1522,16 +1521,15 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
} else {
task_t t = {.func = &nr_pusch_symbol_processing, .args = rdata};
pushTpool(&gNB->threadPool, t);
nbSymb++;
}
LOG_D(PHY, "%d.%d Added symbol %d (count %d) to process, in pipe\n", frame, slot, symbol, nbSymb);
LOG_D(PHY, "%d.%d Added symbol %d to process, in pipe\n", frame, slot, symbol);
} else {
completed_task_ans(&ans);
}
} // symbol loop
if (nbSymb > 0) {
join_task_ans(arr_ans, sz_arr);
}
join_task_ans(&ans);
stop_meas(&gNB->rx_pusch_symbol_processing_stats);
// Copy the data to the scope. This cannot be performed in one call to gNBscopeCopy because the data is not contiguous in the
......
......@@ -308,8 +308,8 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
fp->N_RB_DL,
numGscn);
task_ans_t ans[numGscn];
memset(ans, 0, sizeof(ans));
task_ans_t ans;
init_task_ans(&ans, numGscn);
nr_ue_ssb_scan_t ssb_info[numGscn];
for (int s = 0; s < numGscn; s++) {
nr_ue_ssb_scan_t *ssbInfo = &ssb_info[s];
......@@ -331,7 +331,7 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
ssbInfo->gscnInfo.gscn,
ssbInfo->gscnInfo.ssbFirstSC,
ssbInfo->gscnInfo.ssRef);
ssbInfo->ans = &ans[s];
ssbInfo->ans = &ans;
task_t t = {.func = nr_scan_ssb, .args = ssbInfo};
pushTpool(&get_nrUE_params()->Tpool, t);
}
......@@ -339,7 +339,7 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
// Collect the scan results
nr_ue_ssb_scan_t res = {0};
if (numGscn > 0) {
join_task_ans(ans, numGscn);
join_task_ans(&ans);
for (int i = 0; i < numGscn; i++) {
nr_ue_ssb_scan_t *ssbInfo = &ssb_info[i];
if (ssbInfo->syncRes.cell_detected) {
......
......@@ -1329,8 +1329,9 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
uint32_t harq_pid0 = subframe2harq_pid(&eNB->frame_parms,frame,subframe);
turboDecode_t arr[64] = {0};
task_ans_t ans[64] = {0};
thread_info_tm_t t_info = {.ans = ans, .cap = 64, .len = 0, .buf = (uint8_t *)arr};
task_ans_t ans;
init_task_ans(&ans, 64);
thread_info_tm_t t_info = {.ans = &ans, .cap = 64, .len = 0, .buf = (uint8_t *)arr};
for (i = 0; i < NUMBER_OF_ULSCH_MAX; i++) {
ulsch = eNB->ulsch[i];
......@@ -1420,7 +1421,10 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
const bool decode = proc->nbDecode;
DevAssert(t_info.len == proc->nbDecode);
if (proc->nbDecode > 0) {
join_task_ans(t_info.ans, t_info.len);
if (t_info.len != t_info.cap) {
completed_many_task_ans(t_info.ans, t_info.cap - t_info.len);
}
join_task_ans(t_info.ans);
for (size_t i = 0; i < t_info.len; ++i) {
postDecode(proc, &arr[i]);
}
......
......@@ -300,14 +300,14 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot)
start_meas(&ru->ofdm_total_stats);
size_t const sz = ru->nb_tx + (ru->half_slot_parallelization > 0) * ru->nb_tx;
AssertFatal(sz < 64, "Please, increase the buffer size");
feptx_cmd_t arr[64] = {0};
task_ans_t ans[64] = {0};
feptx_cmd_t arr[sz];
task_ans_t ans;
init_task_ans(&ans, sz);
int nbfeptx = 0;
for (int aid = 0; aid < ru->nb_tx; aid++) {
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->ans = &ans[nbfeptx];
feptx_cmd->ans = &ans;
feptx_cmd->aid = aid;
feptx_cmd->ru = ru;
......@@ -321,7 +321,7 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot)
nbfeptx++;
if (ru->half_slot_parallelization > 0) {
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->ans = &ans[nbfeptx];
feptx_cmd->ans = &ans;
feptx_cmd->aid = aid;
feptx_cmd->ru = ru;
......@@ -334,8 +334,7 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot)
nbfeptx++;
}
}
join_task_ans(ans, nbfeptx);
join_task_ans(&ans);
stop_meas(&ru->ofdm_total_stats);
if (ru->idx == 0)
......@@ -379,13 +378,13 @@ void nr_fep_tp(RU_t *ru, int slot) {
start_meas(&ru->ofdm_demod_stats);
size_t const sz = ru->nb_rx + (ru->half_slot_parallelization > 0) * ru->nb_rx;
AssertFatal(sz < 64, "Please, increase buffer size");
feprx_cmd_t arr[64] = {0};
task_ans_t ans[64] = {0};
feprx_cmd_t arr[sz];
task_ans_t ans;
init_task_ans(&ans, sz);
for (int aid=0;aid<ru->nb_rx;aid++) {
feprx_cmd_t *feprx_cmd = &arr[nbfeprx];
feprx_cmd->ans = &ans[nbfeprx];
feprx_cmd->ans = &ans;
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
......@@ -399,7 +398,7 @@ void nr_fep_tp(RU_t *ru, int slot) {
nbfeprx++;
if (ru->half_slot_parallelization > 0) {
feprx_cmd_t *feprx_cmd = &arr[nbfeprx];
feprx_cmd->ans = &ans[nbfeprx];
feprx_cmd->ans = &ans;
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
......@@ -413,8 +412,7 @@ void nr_fep_tp(RU_t *ru, int slot) {
nbfeprx++;
}
}
join_task_ans(ans, nbfeprx);
join_task_ans(&ans);
stop_meas(&ru->ofdm_demod_stats);
if (ru->idx == 0) VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME( VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPRX, 0 );
......