Commit 77ddc362 authored by Robert Schmidt

Merge remote-tracking branch 'origin/tpool-nospinlock' into integration_2025_w03 (!3193)

Remove spinlock from threadpool task return

The idea of using an atomic counter aligned to a cache line to count finished jobs
is reused, but instead of polling that counter, the joining thread now blocks on a semaphore.
parents a4bbf9e5 0657ab22
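
For context, the mechanism introduced by this merge works as follows: every finished job decrements a shared atomic counter, and the job that brings it to zero posts a semaphore on which the joining thread blocks, so no spin/sleep loop is needed. Below is a minimal, self-contained sketch of that pattern (compile with -pthread); the demo_ans_t and worker names are illustrative only and are not part of the OAI sources.

#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdio.h>

/* Illustrative stand-in for task_ans_t: an atomic job counter plus a
 * semaphore that the single consumer waits on. */
typedef struct {
  sem_t sem;
  _Atomic(int) counter;
} demo_ans_t;

static demo_ans_t ans;

static void *worker(void *arg)
{
  (void)arg;
  /* ... the real work would happen here ... */
  /* fetch_sub returns the value held *before* the subtraction */
  if (atomic_fetch_sub_explicit(&ans.counter, 1, memory_order_relaxed) == 1)
    sem_post(&ans.sem); /* the last job wakes the waiting consumer */
  return NULL;
}

int main(void)
{
  enum { NUM_JOBS = 4 };
  atomic_store(&ans.counter, NUM_JOBS);
  int rc = sem_init(&ans.sem, 0, 0);
  assert(rc == 0);

  pthread_t th[NUM_JOBS];
  for (int i = 0; i < NUM_JOBS; i++)
    pthread_create(&th[i], NULL, worker, NULL);

  sem_wait(&ans.sem); /* blocks instead of spinning on the counter */
  sem_destroy(&ans.sem);
  printf("all %d jobs completed\n", NUM_JOBS);

  for (int i = 0; i < NUM_JOBS; i++)
    pthread_join(th[i], NULL);
  return 0;
}
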
......@@ -26,37 +26,50 @@
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include "pthread_utils.h"
#include "errno.h"
#include <string.h>
void completed_task_ans(task_ans_t* task)
{
DevAssert(task != NULL);
int status = atomic_load_explicit(&task->status, memory_order_acquire);
AssertFatal(status == 0, "Task not expected to be finished here. Status = %d\n", status);
#define seminit(sem) \
{ \
int ret = sem_init(&sem, 0, 0); \
AssertFatal(ret == 0, "sem_init(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
#define sempost(sem) \
{ \
int ret = sem_post(&sem); \
AssertFatal(ret == 0, "sem_post(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
#define semwait(sem) \
{ \
int ret = sem_wait(&sem); \
AssertFatal(ret == 0, "sem_wait(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
#define semdestroy(sem) \
{ \
int ret = sem_destroy(&sem); \
AssertFatal(ret == 0, "sem_destroy(): ret=%d, errno=%d (%s)\n", ret, errno, strerror(errno)); \
}
atomic_store_explicit(&task->status, 1, memory_order_release);
void init_task_ans(task_ans_t* ans, uint num_jobs)
{
ans->counter = num_jobs;
seminit(ans->sem);
}
void join_task_ans(task_ans_t* arr, size_t len)
void completed_many_task_ans(task_ans_t* ans, uint num_completed_jobs)
{
DevAssert(len < INT_MAX);
DevAssert(arr != NULL);
// Spin lock inspired by:
// The Art of Writing Efficient Programs:
// An advanced programmer's guide to efficient hardware utilization
// and compiler optimizations using C++ examples
const struct timespec ns = {0, 1};
uint64_t i = 0;
int j = len - 1;
for (; j != -1; i++) {
for (; j != -1; --j) {
int const task_completed = 1;
if (atomic_load_explicit(&arr[j].status, memory_order_acquire) != task_completed)
break;
}
if (i % 8 == 0) {
nanosleep(&ns, NULL);
}
DevAssert(ans != NULL);
// Using atomic counter in contention scenario to avoid locking in producers
int num_jobs = atomic_fetch_sub_explicit(&ans->counter, num_completed_jobs, memory_order_relaxed);
if (num_jobs == num_completed_jobs) {
// Using semaphore to enable blocking call in join_task_ans
sempost(ans->sem);
}
}
void join_task_ans(task_ans_t* ans)
{
semwait(ans->sem);
semdestroy(ans->sem);
}
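
For readers unfamiliar with the call sites changed further down, here is a usage sketch of the new API from the caller's side: one task_ans_t is shared by all jobs instead of one array element per job. Plain pthreads stand in for the OAI thread pool so the sketch stays self-contained; run_jobs and do_one_job are illustrative names, and the include path is an assumption.

#include <pthread.h>
#include "task_answer.h" /* header declaring task_ans_t (shown below); path assumed */

static void *do_one_job(void *arg)
{
  task_ans_t *ans = arg;
  /* ... perform the actual work ... */
  completed_task_ans(ans); /* decrements the shared counter; the last job posts the semaphore */
  return NULL;
}

/* Illustrative driver, not taken from the tree. */
static void run_jobs(int num_jobs)
{
  task_ans_t ans;
  init_task_ans(&ans, num_jobs); /* counter = num_jobs, semaphore at 0 */

  pthread_t th[num_jobs];
  for (int i = 0; i < num_jobs; i++)
    pthread_create(&th[i], NULL, do_one_job, &ans);

  join_task_ans(&ans); /* blocks until every job has called completed_task_ans() */

  for (int i = 0; i < num_jobs; i++)
    pthread_join(th[i], NULL);
}

Note that when a caller ends up pushing fewer jobs than it announced to init_task_ans(), the difference must be marked completed via completed_many_task_ans() before joining, as the encoder and pusch_procedures call sites below do; otherwise join_task_ans() would block forever.
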
......@@ -21,7 +21,7 @@
#ifndef TASK_ANSWER_THREAD_POOL_H
#define TASK_ANSWER_THREAD_POOL_H
#include "pthread_utils.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -39,6 +39,7 @@ extern "C" {
#include <stddef.h>
#include <stdint.h>
#include <semaphore.h>
#if defined(__i386__) || defined(__x86_64__)
#define LEVEL1_DCACHE_LINESIZE 64
......@@ -52,9 +53,17 @@ extern "C" {
#error Unknown CPU architecture
#endif
/** @brief
* A multi-producer - single-consumer synchronization mechanism built for efficiency under
* contention.
*
* @param sem semaphore to wait on
* @param counter atomic counter to keep track of the number of tasks completed. Atomic counter
* is used for efficiency under contention.
*/
typedef struct {
// Avoid false sharing
_Alignas(LEVEL1_DCACHE_LINESIZE) _Atomic(int) status;
sem_t sem;
_Alignas(LEVEL1_DCACHE_LINESIZE) _Atomic(int) counter;
} task_ans_t;
typedef struct {
......@@ -64,10 +73,29 @@ typedef struct {
task_ans_t* ans;
} thread_info_tm_t;
/// @brief Initialize a task_ans_t struct
///
/// @param ans task_ans_t struct
/// @param num_jobs number of tasks to wait for
void init_task_ans(task_ans_t* ans, unsigned int num_jobs);
void join_task_ans(task_ans_t* arr, size_t len);
/// @brief Wait for all tasks to complete
/// @param ans task_ans_t struct
void join_task_ans(task_ans_t* ans);
void completed_task_ans(task_ans_t* task);
/// @brief Mark a number of tasks as completed.
///
/// @param ans task_ans_t struct
/// @param num_completed_jobs number of tasks to mark as completed
void completed_many_task_ans(task_ans_t* ans, uint num_completed_jobs);
/// @brief Mark one task as completed.
///
/// @param ans task_ans_t struct
static inline void completed_task_ans(task_ans_t* ans)
{
completed_many_task_ans(ans, 1);
}
#ifdef __cplusplus
}
......
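
The _Alignas(LEVEL1_DCACHE_LINESIZE) on the counter keeps the hot atomic on its own cache line, so contended producer updates do not invalidate the line holding neighbouring data such as the semaphore. A minimal sketch of that property, assuming the 64-byte line size of the x86 branch above; demo_ans_t is an illustrative stand-in, not the real type:

#include <assert.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stddef.h>

#define CACHE_LINE 64 /* assumed line size, matching the x86 case above */

typedef struct {
  sem_t sem;
  _Alignas(CACHE_LINE) _Atomic(int) counter;
} demo_ans_t;

int main(void)
{
  /* The alignment specifier guarantees the counter starts on a cache-line
   * boundary, so producer writes to it stay off the line containing sem. */
  _Static_assert(offsetof(demo_ans_t, counter) % CACHE_LINE == 0,
                 "counter must start on its own cache line");
  return 0;
}
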
......@@ -114,18 +114,18 @@ int main()
int nb_jobs = 4;
for (int i = 0; i < 1000; i++) {
int parall = nb_jobs;
task_ans_t task_ans[parall];
memset(task_ans, 0, sizeof(task_ans));
task_ans_t task_ans;
init_task_ans(&task_ans, parall);
struct testData test_data[parall];
memset(test_data, 0, sizeof(test_data));
for (int j = 0; j < parall; j++) {
task_t task = {.args = &test_data[j], .func = processing};
struct testData *x = (struct testData *)task.args;
x->id = i;
x->task_ans = &task_ans[j];
x->task_ans = &task_ans;
pushTpool(&pool, task);
}
join_task_ans(task_ans, parall);
join_task_ans(&task_ans);
int sleepmax = 0;
for (int j = 0; j < parall; j++) {
if (test_data[j].sleepTime > sleepmax) {
......
......@@ -250,7 +250,7 @@ int nrLDPC_prepare_TB_decoding(nrLDPC_slot_decoding_parameters_t *nrLDPC_slot_de
for (int r = 0; r < nrLDPC_TB_decoding_parameters->C; r++) {
nrLDPC_decoding_parameters_t *rdata = &((nrLDPC_decoding_parameters_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
rdata->ans = &t_info->ans[t_info->len];
rdata->ans = t_info->ans;
t_info->len += 1;
decParams.R = nrLDPC_TB_decoding_parameters->segments[r].R;
......@@ -309,19 +309,16 @@ int32_t nrLDPC_coding_decoder(nrLDPC_slot_decoding_parameters_t *nrLDPC_slot_dec
nbSegments += nrLDPC_TB_decoding_parameters->C;
}
nrLDPC_decoding_parameters_t arr[nbSegments];
task_ans_t ans[nbSegments];
memset(ans, 0, nbSegments * sizeof(task_ans_t));
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbSegments, .ans = ans};
task_ans_t ans;
init_task_ans(&ans, nbSegments);
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbSegments, .ans = &ans};
int nbDecode = 0;
for (int pusch_id = 0; pusch_id < nrLDPC_slot_decoding_parameters->nb_TBs; pusch_id++) {
nbDecode += nrLDPC_prepare_TB_decoding(nrLDPC_slot_decoding_parameters, pusch_id, &t_info);
(void)nrLDPC_prepare_TB_decoding(nrLDPC_slot_decoding_parameters, pusch_id, &t_info);
}
DevAssert(nbDecode == t_info.len);
// Execute thread poool tasks
join_task_ans(t_info.ans, t_info.len);
// Execute thread pool tasks
join_task_ans(t_info.ans);
for (int pusch_id = 0; pusch_id < nrLDPC_slot_decoding_parameters->nb_TBs; pusch_id++) {
nrLDPC_TB_decoding_parameters_t *nrLDPC_TB_decoding_parameters = &nrLDPC_slot_decoding_parameters->TBs[pusch_id];
......
......@@ -189,7 +189,7 @@ static int nrLDPC_prepare_TB_encoding(nrLDPC_slot_encoding_parameters_t *nrLDPC_
for (int j = 0; j < n_seg; j++) {
ldpc8blocks_args_t *perJobImpp = &((ldpc8blocks_args_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
impp.ans = &t_info->ans[t_info->len];
impp.ans = t_info->ans;
t_info->len += 1;
impp.macro_num = j;
......@@ -211,19 +211,19 @@ int nrLDPC_coding_encoder(nrLDPC_slot_encoding_parameters_t *nrLDPC_slot_encodin
nbTasks += n_seg;
}
ldpc8blocks_args_t arr[nbTasks];
task_ans_t ans[nbTasks];
memset(ans, 0, nbTasks * sizeof(task_ans_t));
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbTasks, .ans = ans};
task_ans_t ans;
init_task_ans(&ans, nbTasks);
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = nbTasks, .ans = &ans};
int nbEncode = 0;
for (int dlsch_id = 0; dlsch_id < nrLDPC_slot_encoding_parameters->nb_TBs; dlsch_id++) {
nbEncode += nrLDPC_prepare_TB_encoding(nrLDPC_slot_encoding_parameters, dlsch_id, &t_info);
}
DevAssert(nbEncode == t_info.len);
// Execute thread poool tasks
join_task_ans(ans, nbEncode);
if (nbEncode < nbTasks) {
completed_many_task_ans(&ans, nbTasks - nbEncode);
}
// Execute thread pool tasks
join_task_ans(&ans);
return 0;
}
......@@ -278,7 +278,7 @@ int decoder_xdma(nrLDPC_TB_decoding_parameters_t *TB_params, int frame_rx, int s
DevAssert(num_threads_prepare == t_info.len);
// wait for the prepare jobs to complete
join_task_ans(t_info.ans, t_info.len);
join_task_ans(t_info.ans);
for (uint32_t job = 0; job < num_threads_prepare; job++) {
args_fpga_decode_prepare_t *args = &arr[job];
......
......@@ -348,12 +348,12 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
}
turboEncode_t arr[hadlsch->C];
task_ans_t ans[hadlsch->C];
memset(ans, 0, hadlsch->C * sizeof(task_ans_t));
task_ans_t ans;
init_task_ans(&ans, hadlsch->C);
for (int r = 0, r_offset = 0; r < hadlsch->C; r++) {
turboEncode_t *rdata = &arr[r];
rdata->ans = &ans[r];
rdata->ans = &ans;
rdata->input=hadlsch->c[r];
rdata->Kr_bytes= ( r<hadlsch->Cminus ? hadlsch->Kminus : hadlsch->Kplus) >>3;
......@@ -382,7 +382,7 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
r_offset += Nl*Qm * ((GpmodC==0?0:1) + (Gp/C));
}
join_task_ans(ans, hadlsch->C);
join_task_ans(&ans);
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ENB_DLSCH_ENCODING, VCD_FUNCTION_OUT);
return(0);
......@@ -449,12 +449,12 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
return(-1);
}
turboEncode_t arr[hadlsch->C];
task_ans_t ans[hadlsch->C];
memset(ans, 0, hadlsch->C * sizeof(task_ans_t));
task_ans_t ans;
init_task_ans(&ans, hadlsch->C);
for (int r = 0, r_offset = 0; r < hadlsch->C; r++) {
turboEncode_t *rdata = &arr[r];
rdata->ans = &ans[r];
rdata->ans = &ans;
rdata->input=hadlsch->c[r];
rdata->Kr_bytes= ( r<hadlsch->Cminus ? hadlsch->Kminus : hadlsch->Kplus) >>3;
......@@ -483,7 +483,7 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
r_offset += Nl*Qm * ((GpmodC==0?0:1) + (Gp/C));
}
join_task_ans(ans, hadlsch->C);
join_task_ans(&ans);
return(0);
}
......
......@@ -341,7 +341,7 @@ static int ulsch_decoding_data(PHY_VARS_eNB *eNB,
turboDecode_t *rdata = &((turboDecode_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
rdata->ans = &t_info->ans[t_info->len];
rdata->ans = t_info->ans;
t_info->len += 1;
rdata->eNB=eNB;
......
......@@ -506,8 +506,8 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
int num_jobs = CEILIDIV(gNB->frame_parms.nb_antennas_rx, numAntennas);
puschAntennaProc_t rdatas[num_jobs];
memset(rdatas, 0, sizeof(rdatas));
task_ans_t ans[num_jobs];
memset(ans, 0, sizeof(ans));
task_ans_t ans;
init_task_ans(&ans, num_jobs);
for (int job_id = 0; job_id < num_jobs; job_id++) {
puschAntennaProc_t *rdata = &rdatas[job_id];
task_t task = {.func = nr_pusch_antenna_processing, .args = rdata};
......@@ -531,7 +531,7 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
rdata->pusch_vars = &gNB->pusch_vars[ul_id];
rdata->chest_freq = gNB->chest_freq;
rdata->rxdataF = gNB->common_vars.rxdataF;
rdata->ans = &ans[job_id];
rdata->ans = &ans;
// Call the nr_pusch_antenna_processing function
if (job_id == num_jobs - 1) {
// Run the last job inline
......@@ -539,10 +539,9 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
} else {
pushTpool(&gNB->threadPool, task);
}
LOG_D(PHY, "Added Antenna (count %d/%d) to process, in pipe\n", job_id, num_jobs);
} // Antenna Loop
join_task_ans(ans, num_jobs - 1);
join_task_ans(&ans);
stop_meas(&gNB->pusch_channel_estimation_antenna_processing_stats);
for (int aarx = 0; aarx < gNB->frame_parms.nb_antennas_rx; aarx++) {
......
......@@ -1246,7 +1246,6 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
nfapi_nr_pusch_pdu_t *rel15_ul = &gNB->ulsch[ulsch_id].harq_process->ulsch_pdu;
NR_gNB_PUSCH *pusch_vars = &gNB->pusch_vars[ulsch_id];
int nbSymb = 0;
uint32_t bwp_start_subcarrier = ((rel15_ul->rb_start + rel15_ul->bwp_start) * NR_NB_SC_PER_RB + frame_parms->first_carrier_offset) % frame_parms->ofdm_symbol_size;
LOG_D(PHY,"pusch %d.%d : bwp_start_subcarrier %d, rb_start %d, first_carrier_offset %d\n", frame,slot,bwp_start_subcarrier, rel15_ul->rb_start, frame_parms->first_carrier_offset);
LOG_D(PHY,"pusch %d.%d : ul_dmrs_symb_pos %x\n",frame,slot,rel15_ul->ul_dmrs_symb_pos);
......@@ -1469,9 +1468,9 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
int total_res = 0;
int const loop_iter = CEILIDIV(rel15_ul->nr_of_symbols, numSymbols);
puschSymbolProc_t arr[loop_iter];
task_ans_t arr_ans[loop_iter];
task_ans_t ans;
init_task_ans(&ans, loop_iter);
memset(arr_ans, 0, sizeof(arr_ans));
int sz_arr = 0;
for(uint8_t task_index = 0; task_index < loop_iter; task_index++) {
int symbol = task_index * numSymbols + rel15_ul->start_symbol_index;
......@@ -1486,7 +1485,7 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
total_res += res_per_task;
if (res_per_task > 0) {
puschSymbolProc_t *rdata = &arr[sz_arr];
rdata->ans = &arr_ans[sz_arr];
rdata->ans = &ans;
++sz_arr;
rdata->gNB = gNB;
......@@ -1508,16 +1507,15 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
} else {
task_t t = {.func = &nr_pusch_symbol_processing, .args = rdata};
pushTpool(&gNB->threadPool, t);
nbSymb++;
}
LOG_D(PHY, "%d.%d Added symbol %d (count %d) to process, in pipe\n", frame, slot, symbol, nbSymb);
LOG_D(PHY, "%d.%d Added symbol %d to process, in pipe\n", frame, slot, symbol);
} else {
completed_task_ans(&ans);
}
} // symbol loop
if (nbSymb > 0) {
join_task_ans(arr_ans, sz_arr);
}
join_task_ans(&ans);
stop_meas(&gNB->rx_pusch_symbol_processing_stats);
// Copy the data to the scope. This cannot be performed in one call to gNBscopeCopy because the data is not contiguous in the
......
......@@ -308,8 +308,8 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
fp->N_RB_DL,
numGscn);
task_ans_t ans[numGscn];
memset(ans, 0, sizeof(ans));
task_ans_t ans;
init_task_ans(&ans, numGscn);
nr_ue_ssb_scan_t ssb_info[numGscn];
for (int s = 0; s < numGscn; s++) {
nr_ue_ssb_scan_t *ssbInfo = &ssb_info[s];
......@@ -331,7 +331,7 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
ssbInfo->gscnInfo.gscn,
ssbInfo->gscnInfo.ssbFirstSC,
ssbInfo->gscnInfo.ssRef);
ssbInfo->ans = &ans[s];
ssbInfo->ans = &ans;
task_t t = {.func = nr_scan_ssb, .args = ssbInfo};
pushTpool(&get_nrUE_params()->Tpool, t);
}
......@@ -339,7 +339,7 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
// Collect the scan results
nr_ue_ssb_scan_t res = {0};
if (numGscn > 0) {
join_task_ans(ans, numGscn);
join_task_ans(&ans);
for (int i = 0; i < numGscn; i++) {
nr_ue_ssb_scan_t *ssbInfo = &ssb_info[i];
if (ssbInfo->syncRes.cell_detected) {
......
......@@ -1329,8 +1329,9 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
uint32_t harq_pid0 = subframe2harq_pid(&eNB->frame_parms,frame,subframe);
turboDecode_t arr[64] = {0};
task_ans_t ans[64] = {0};
thread_info_tm_t t_info = {.ans = ans, .cap = 64, .len = 0, .buf = (uint8_t *)arr};
task_ans_t ans;
init_task_ans(&ans, 64);
thread_info_tm_t t_info = {.ans = &ans, .cap = 64, .len = 0, .buf = (uint8_t *)arr};
for (i = 0; i < NUMBER_OF_ULSCH_MAX; i++) {
ulsch = eNB->ulsch[i];
......@@ -1420,7 +1421,10 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
const bool decode = proc->nbDecode;
DevAssert(t_info.len == proc->nbDecode);
if (proc->nbDecode > 0) {
join_task_ans(t_info.ans, t_info.len);
if (t_info.len != t_info.cap) {
completed_many_task_ans(t_info.ans, t_info.cap - t_info.len);
}
join_task_ans(t_info.ans);
for (size_t i = 0; i < t_info.len; ++i) {
postDecode(proc, &arr[i]);
}
......
......@@ -300,14 +300,14 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot)
start_meas(&ru->ofdm_total_stats);
size_t const sz = ru->nb_tx + (ru->half_slot_parallelization > 0) * ru->nb_tx;
AssertFatal(sz < 64, "Please, increase the buffer size");
feptx_cmd_t arr[64] = {0};
task_ans_t ans[64] = {0};
feptx_cmd_t arr[sz];
task_ans_t ans;
init_task_ans(&ans, sz);
int nbfeptx = 0;
for (int aid = 0; aid < ru->nb_tx; aid++) {
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->ans = &ans[nbfeptx];
feptx_cmd->ans = &ans;
feptx_cmd->aid = aid;
feptx_cmd->ru = ru;
......@@ -321,7 +321,7 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot)
nbfeptx++;
if (ru->half_slot_parallelization > 0) {
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->ans = &ans[nbfeptx];
feptx_cmd->ans = &ans;
feptx_cmd->aid = aid;
feptx_cmd->ru = ru;
......@@ -334,8 +334,7 @@ void nr_feptx_tp(RU_t *ru, int frame_tx, int slot)
nbfeptx++;
}
}
join_task_ans(ans, nbfeptx);
join_task_ans(&ans);
stop_meas(&ru->ofdm_total_stats);
if (ru->idx == 0)
......@@ -379,13 +378,13 @@ void nr_fep_tp(RU_t *ru, int slot) {
start_meas(&ru->ofdm_demod_stats);
size_t const sz = ru->nb_rx + (ru->half_slot_parallelization > 0) * ru->nb_rx;
AssertFatal(sz < 64, "Please, increase buffer size");
feprx_cmd_t arr[64] = {0};
task_ans_t ans[64] = {0};
feprx_cmd_t arr[sz];
task_ans_t ans;
init_task_ans(&ans, sz);
for (int aid=0;aid<ru->nb_rx;aid++) {
feprx_cmd_t *feprx_cmd = &arr[nbfeprx];
feprx_cmd->ans = &ans[nbfeprx];
feprx_cmd->ans = &ans;
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
......@@ -399,7 +398,7 @@ void nr_fep_tp(RU_t *ru, int slot) {
nbfeprx++;
if (ru->half_slot_parallelization > 0) {
feprx_cmd_t *feprx_cmd = &arr[nbfeprx];
feprx_cmd->ans = &ans[nbfeprx];
feprx_cmd->ans = &ans;
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
......@@ -413,8 +412,7 @@ void nr_fep_tp(RU_t *ru, int slot) {
nbfeprx++;
}
}
join_task_ans(ans, nbfeprx);
join_task_ans(&ans);
stop_meas(&ru->ofdm_demod_stats);
if (ru->idx == 0) VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME( VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPRX, 0 );
......