Commit 55a18514 authored by Jaroslava Fiedlerova's avatar Jaroslava Fiedlerova

Merge remote-tracking branch 'origin/thread-pool-fixed' into integration_2024_w49 (!3025)

New threadpool, fixed in NR UE

This merge request is an attempt to fix the new thread pool implementation
for use in NR UE and gNB (as well as eNB).
parents fb1fe172 793bdd86
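The core of the change, shown as a minimal sketch (job_t, processJob and N are placeholder names, not code from this MR): previously the caller allocated one notifiedFIFO_elt_t per job, pushed it to the pool and pulled results back from a response FIFO; with the new API the caller owns the job data, attaches one task_ans_t per job and waits for all of them with join_task_ans().

// old pattern (removed throughout this MR)
notifiedFIFO_t resp;
initNotifiedFIFO(&resp);
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(job_t), /*key*/ 0, &resp, processJob);
job_t *job = NotifiedFifoData(req);
pushTpool(&pool, req);
notifiedFIFO_elt_t *res = pullTpool(&resp, &pool); // blocks until the worker posts back
delNotifiedFIFO_elt(res);

// new pattern (introduced throughout this MR)
job_t jobs[N];
task_ans_t ans[N];
memset(ans, 0, sizeof(ans));
for (int i = 0; i < N; i++) {
  jobs[i].ans = &ans[i];
  task_t t = {.func = processJob, .args = &jobs[i]};
  pushTpool(&pool, t); // processJob() must end with completed_task_ans(jobs[i].ans)
}
join_task_ans(ans, N); // spin-waits until every job has signalled completion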
......@@ -11,10 +11,10 @@ Ref :
feptx_prec : 13.0
feptx_ofdm : 33.0
feptx_total : 55.0
L1 Tx processing : 220.0
DLSCH encoding : 130.0
L1 Rx processing : 387.0
PUSCH inner-receiver : 150.0
L1 Tx processing : 200.0
DLSCH encoding : 100.0
L1 Rx processing : 330.0
PUSCH inner-receiver : 120.0
Schedule Response : 3.0
DL & UL scheduling timing : 6.0
UL Indication : 3.0
......
......@@ -21,7 +21,7 @@
#ifndef ACTOR_H
#define ACTOR_H
#include "thread-pool.h"
#include "notified_fifo.h"
#define INIT_ACTOR(ptr, name, core_affinity) init_actor((Actor_t *)ptr, name, core_affinity);
......
......@@ -53,7 +53,6 @@
#include <sys/resource.h>
#include "common/utils/load_module_shlib.h"
#include "common/config/config_userapi.h"
#include "common/utils/threadPool/thread-pool.h"
#include "executables/softmodem-common.h"
#include <readline/history.h>
#include "common/oai_version.h"
......
if (ENABLE_TESTS)
add_subdirectory(test)
endif()
add_library(thread-pool thread-pool.c)
add_library(thread-pool thread-pool.c task_ans.c)
target_link_libraries(thread-pool PRIVATE utils)
target_include_directories(thread-pool PUBLIC .)
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#ifndef BOUNDED_NOTIFIED_FIFO_H
#define BOUNDED_NOTIFIED_FIFO_H
#include "assertions.h"
#include <stdint.h>
#include <memory.h>
#include "task.h"
#include <pthread.h>
#include "pthread_utils.h"
// For correct operation, keep DEFAULT_ELM a power of two (2^N), e.g., 2^8 = 256, so that mask() indexing works
#define DEFAULT_ELM 256
typedef struct seq_ring_buf_s {
task_t* array;
size_t cap;
uint32_t head;
uint32_t tail;
_Atomic uint64_t sz;
} seq_ring_task_t;
static size_t size_seq_ring_task(seq_ring_task_t* r)
{
DevAssert(r != NULL);
return r->head - r->tail;
}
static uint32_t mask(uint32_t cap, uint32_t val)
{
return val & (cap - 1);
}
static bool full(seq_ring_task_t* r)
{
return size_seq_ring_task(r) == r->cap - 1;
}
static void enlarge_buffer(seq_ring_task_t* r)
{
DevAssert(r != NULL);
DevAssert(full(r));
const uint32_t factor = 2;
task_t* tmp_buffer = calloc(r->cap * factor, sizeof(task_t));
DevAssert(tmp_buffer != NULL);
const uint32_t head_pos = mask(r->cap, r->head);
const uint32_t tail_pos = mask(r->cap, r->tail);
if (head_pos > tail_pos) {
memcpy(tmp_buffer, r->array + tail_pos, (head_pos - tail_pos) * sizeof(task_t));
} else {
memcpy(tmp_buffer, r->array + tail_pos, (r->cap - tail_pos) * sizeof(task_t));
memcpy(tmp_buffer + (r->cap - tail_pos), r->array, head_pos * sizeof(task_t));
}
r->cap *= factor;
free(r->array);
r->array = tmp_buffer;
r->tail = 0;
r->head = r->cap / 2 - 1;
}
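// Worked example of the resize above (added note, not in the original source): with
// cap = 8 and 7 queued elements (i.e. full()), the occupied slots are copied in order
// to the front of a new 16-slot array; afterwards tail = 0 and head = 16/2 - 1 = 7,
// which equals the old element count, so size_seq_ring_task() is preserved.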
static void init_seq_ring_task(seq_ring_task_t* r)
{
DevAssert(r != NULL);
task_t* tmp_buffer = calloc(DEFAULT_ELM, sizeof(task_t));
DevAssert(tmp_buffer != NULL);
seq_ring_task_t tmp = {.array = tmp_buffer, .head = 0, .tail = 0, .cap = DEFAULT_ELM};
memcpy(r, &tmp, sizeof(seq_ring_task_t));
r->sz = 0;
}
static void free_seq_ring_task(seq_ring_task_t* r)
{
DevAssert(r != NULL);
free(r->array);
}
static void push_back_seq_ring_task(seq_ring_task_t* r, task_t t)
{
DevAssert(r != NULL);
if (full(r))
enlarge_buffer(r);
const uint32_t pos = mask(r->cap, r->head);
r->array[pos] = t;
r->head += 1;
r->sz += 1;
}
static task_t pop_seq_ring_task(seq_ring_task_t* r)
{
DevAssert(r != NULL);
DevAssert(size_seq_ring_task(r) > 0);
const uint32_t pos = mask(r->cap, r->tail);
task_t t = r->array[pos];
r->tail += 1;
r->sz -= 1;
return t;
}
#undef DEFAULT_ELM
typedef struct {
pthread_mutex_t mtx;
pthread_cond_t cv;
seq_ring_task_t r;
size_t idx;
} not_q_t;
typedef struct {
task_t t;
bool success;
} ret_try_t;
static void init_not_q(not_q_t* q, size_t idx)
{
DevAssert(q != NULL);
q->idx = idx;
init_seq_ring_task(&q->r);
mutexinit(q->mtx);
condinit(q->cv);
}
static void free_not_q(not_q_t* q)
{
DevAssert(q != NULL);
free_seq_ring_task(&q->r);
mutexdestroy(q->mtx);
conddestroy(q->cv);
}
static bool try_push_not_q(not_q_t* q, task_t t)
{
DevAssert(q != NULL);
if (mutextrylock(q->mtx) != 0)
return false;
push_back_seq_ring_task(&q->r, t);
const size_t sz = size_seq_ring_task(&q->r);
DevAssert(sz > 0);
mutexunlock(q->mtx);
condsignal(q->cv);
return true;
}
static void push_not_q(not_q_t* q, task_t t)
{
DevAssert(q != NULL);
DevAssert(t.func != NULL);
mutexlock(q->mtx);
push_back_seq_ring_task(&q->r, t);
DevAssert(size_seq_ring_task(&q->r) > 0);
mutexunlock(q->mtx);
condsignal(q->cv);
}
static ret_try_t try_pop_not_q(not_q_t* q)
{
DevAssert(q != NULL);
ret_try_t ret = {.success = false};
int rc = mutextrylock(q->mtx);
DevAssert(rc == 0 || rc == EBUSY);
if (rc == EBUSY)
return ret;
size_t sz = size_seq_ring_task(&q->r);
if (sz == 0) {
mutexunlock(q->mtx);
return ret;
}
DevAssert(sz > 0);
ret.t = pop_seq_ring_task(&q->r);
mutexunlock(q->mtx);
ret.success = true;
return ret;
}
static bool pop_not_q(not_q_t* q, ret_try_t* out)
{
DevAssert(q != NULL);
DevAssert(out != NULL);
mutexlock(q->mtx);
while (size_seq_ring_task(&q->r) == 0) {
condwait(q->cv, q->mtx);
}
out->t = pop_seq_ring_task(&q->r);
mutexunlock(q->mtx);
return true;
}
#endif
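The queue above relies on cap being a power of two: head and tail are free-running uint32_t counters and mask() folds them into valid slots, so the size computation stays correct even across unsigned wrap-around. A tiny illustration (assumes <assert.h>; not part of the MR):

assert(mask(8, 0)  == 0);
assert(mask(8, 7)  == 7);
assert(mask(8, 8)  == 0);  // counter wrapped past the capacity, index wraps to slot 0
assert(mask(8, 13) == 5);  // 13 & (8 - 1)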
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#ifndef NOTIFIED_FIFO_H
#define NOTIFIED_FIFO_H
#include "pthread_utils.h"
#include <stdint.h>
#include <pthread.h>
#include "time_meas.h"
#include <memory.h>
#include <stdalign.h>
#include "assertions.h"
/// @brief Element on the notifiedFifo_t
/// next: internal FIFO chain, do not set it
/// key: a long int that the client can use to identify a job or a group of messages
/// ResponseFifo: if the client defines a response FIFO, the job will be posted back after processing
/// processingFunc: any function (of type void processingFunc(void *)) that a worker will process
/// msgData: the data passed to `processingFunc`. It can be added automatically, or you can set it to a buffer you are managing
/// malloced: set when the element was allocated internally, so it can be freed when there is no response FIFO or on abort
typedef struct notifiedFIFO_elt_s {
struct notifiedFIFO_elt_s *next;
uint64_t key; // To filter out elements
struct notifiedFIFO_s *reponseFifo;
void (*processingFunc)(void *);
bool malloced;
oai_cputime_t creationTime;
oai_cputime_t startProcessingTime;
oai_cputime_t endProcessingTime;
oai_cputime_t returnTime;
// use alignas(32) to align msgData to 32 bytes
// user data behind it will be aligned to 32 bytes as well
// important! this needs to be the last member in the struct
alignas(32) void *msgData;
} notifiedFIFO_elt_t;
typedef struct notifiedFIFO_s {
notifiedFIFO_elt_t *outF;
notifiedFIFO_elt_t *inF;
pthread_mutex_t lockF;
pthread_cond_t notifF;
bool abortFIFO; // if set, the FIFO always returns NULL -> abort condition
} notifiedFIFO_t;
/// @brief Creates a new job.
/// @param size The data part of the job will have this size
/// @param key the job can be identified
/// @param reponseFifo response fifo
/// @param processingFunc function to call
/// @return new notifiedFIFO_elt_t element with extra memory allocated at the end equal to size.
static inline notifiedFIFO_elt_t *newNotifiedFIFO_elt(int size,
uint64_t key,
notifiedFIFO_t *reponseFifo,
void (*processingFunc)(void *))
{
notifiedFIFO_elt_t *ret = (notifiedFIFO_elt_t *)memalign(32, sizeof(notifiedFIFO_elt_t) + size);
AssertFatal(NULL != ret, "out of memory\n");
ret->next = NULL;
ret->key = key;
ret->reponseFifo = reponseFifo;
ret->processingFunc = processingFunc;
// We align the user data to 32 bytes so it can be processed with SIMD
// msgData is aligned to 32 bytes, so everything after it will be as well
ret->msgData = ((uint8_t *)ret) + sizeof(notifiedFIFO_elt_t);
ret->malloced = true;
return ret;
}
/// @brief Get pointer to the data carried by notifiedFIFO_elt_t
/// @param elt
/// @return void pointer to the data allocated with the message
static inline void *NotifiedFifoData(notifiedFIFO_elt_t *elt)
{
return elt->msgData;
}
/// @brief Delete a notifiedFIFO_elt_t if it was dynamically allocated (malloced flag set)
/// @param elt
static inline void delNotifiedFIFO_elt(notifiedFIFO_elt_t *elt)
{
if (elt->malloced) {
elt->malloced = false;
free(elt);
}
}
static inline void initNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf)
{
nf->inF = NULL;
nf->outF = NULL;
nf->abortFIFO = false;
}
static inline void initNotifiedFIFO(notifiedFIFO_t *nf)
{
mutexinit(nf->lockF);
condinit(nf->notifF);
initNotifiedFIFO_nothreadSafe(nf);
// No delete function: the creator has only to free the memory
}
static inline void pushNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg)
{
msg->next = NULL;
if (nf->outF == NULL)
nf->outF = msg;
if (nf->inF != NULL)
nf->inF->next = msg;
nf->inF = msg;
}
static inline void pushNotifiedFIFO(notifiedFIFO_t *nf, notifiedFIFO_elt_t *msg)
{
mutexlock(nf->lockF);
if (!nf->abortFIFO) {
pushNotifiedFIFO_nothreadSafe(nf, msg);
condsignal(nf->notifF);
}
mutexunlock(nf->lockF);
}
static inline notifiedFIFO_elt_t *pullNotifiedFIFO_nothreadSafe(notifiedFIFO_t *nf)
{
if (nf->outF == NULL)
return NULL;
if (nf->abortFIFO)
return NULL;
notifiedFIFO_elt_t *ret = nf->outF;
AssertFatal(nf->outF != nf->outF->next, "Circular list in thread pool: push several times the same buffer is forbidden\n");
nf->outF = nf->outF->next;
if (nf->outF == NULL)
nf->inF = NULL;
return ret;
}
static inline notifiedFIFO_elt_t *pullNotifiedFIFO(notifiedFIFO_t *nf)
{
mutexlock(nf->lockF);
notifiedFIFO_elt_t *ret = NULL;
while ((ret = pullNotifiedFIFO_nothreadSafe(nf)) == NULL && !nf->abortFIFO)
condwait(nf->notifF, nf->lockF);
mutexunlock(nf->lockF);
return ret;
}
static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf)
{
int tmp = mutextrylock(nf->lockF);
if (tmp != 0)
return NULL;
if (nf->abortFIFO) {
mutexunlock(nf->lockF);
return NULL;
}
notifiedFIFO_elt_t *ret = pullNotifiedFIFO_nothreadSafe(nf);
mutexunlock(nf->lockF);
return ret;
}
static inline time_stats_t exec_time_stats_NotifiedFIFO(const notifiedFIFO_elt_t *elt)
{
time_stats_t ts = {0};
if (elt->startProcessingTime == 0 && elt->endProcessingTime == 0)
return ts; /* no measurements done */
ts.in = elt->startProcessingTime;
ts.diff = elt->endProcessingTime - ts.in;
ts.p_time = ts.diff;
ts.diff_square = ts.diff * ts.diff;
ts.max = ts.diff;
ts.trials = 1;
return ts;
}
// This function aborts all messages in the queue and marks the queue as
// "aborted", so that every subsequent pull/poll on it returns NULL
static inline void abortNotifiedFIFO(notifiedFIFO_t *nf)
{
mutexlock(nf->lockF);
nf->abortFIFO = true;
notifiedFIFO_elt_t **elt = &nf->outF;
while (*elt != NULL) {
notifiedFIFO_elt_t *p = *elt;
*elt = (*elt)->next;
delNotifiedFIFO_elt(p);
}
if (nf->outF == NULL)
nf->inF = NULL;
condbroadcast(nf->notifF);
mutexunlock(nf->lockF);
}
#endif
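Minimal usage sketch of the notified FIFO above (myJob_t and its id field are placeholders; not code from this MR). Note that the FIFO itself never calls processingFunc; that only happens when the element travels through the thread pool, so here it is passed as NULL and the FIFO is used as a plain message queue:

notifiedFIFO_t q;
initNotifiedFIFO(&q);
notifiedFIFO_elt_t *msg = newNotifiedFIFO_elt(sizeof(myJob_t), /*key*/ 0, NULL, NULL);
myJob_t *job = NotifiedFifoData(msg); // 32-byte aligned payload
job->id = 42;
pushNotifiedFIFO(&q, msg);                       // never blocks, signals one waiter
notifiedFIFO_elt_t *out = pullNotifiedFIFO(&q);  // blocks until an element arrives or the FIFO is aborted
delNotifiedFIFO_elt(out);                        // frees it because malloced == true
abortNotifiedFIFO(&q);                           // from now on, pull/poll return NULL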
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#ifndef PTHREAD_UTILS_H
#define PTHREAD_UTILS_H
#include <pthread.h>
#define mutexinit(mutex) {int ret=pthread_mutex_init(&mutex,NULL); \
AssertFatal(ret==0,"ret=%d\n",ret);}
#define condinit(signal) {int ret=pthread_cond_init(&signal,NULL); \
AssertFatal(ret==0,"ret=%d\n",ret);}
#define mutexlock(mutex) {int ret=pthread_mutex_lock(&mutex); \
AssertFatal(ret==0,"ret=%d\n",ret);}
#define mutextrylock(mutex) pthread_mutex_trylock(&mutex)
#define mutexunlock(mutex) {int ret=pthread_mutex_unlock(&mutex); \
AssertFatal(ret==0,"ret=%d\n",ret);}
#define condwait(condition, mutex) {int ret=pthread_cond_wait(&condition, &mutex); \
AssertFatal(ret==0,"ret=%d\n",ret);}
#define condbroadcast(signal) {int ret=pthread_cond_broadcast(&signal); \
AssertFatal(ret==0,"ret=%d\n",ret);}
#define condsignal(signal) {int ret=pthread_cond_signal(&signal); \
AssertFatal(ret==0,"ret=%d\n",ret);}
#define mutexdestroy(mutex) { int ret = pthread_mutex_destroy(&mutex);\
AssertFatal(ret==0,"ret=%d\n",ret);}
#define conddestroy(condition) { int ret = pthread_cond_destroy(&condition);\
AssertFatal(ret==0,"ret=%d\n",ret);}
#endif
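A note on the wrappers above (illustrative sketch, not from the diff): each macro takes the pthread object itself rather than a pointer, and wraps the call in AssertFatal so any unexpected return code aborts the process:

pthread_mutex_t m;
pthread_cond_t c;
mutexinit(m);   // expands to pthread_mutex_init(&m, NULL) plus AssertFatal on failure
condinit(c);
mutexlock(m);
// ... touch shared state ...
condsignal(c);
mutexunlock(m);
mutexdestroy(m);
conddestroy(c);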
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#ifndef TASK_WORK_STEALING_THREAD_POOL_H
#define TASK_WORK_STEALING_THREAD_POOL_H
typedef struct {
void* args;
void (*func)(void* args);
} task_t;
#endif
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#include "task_ans.h"
#include "assertions.h"
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
void completed_task_ans(task_ans_t* task)
{
DevAssert(task != NULL);
if (atomic_load_explicit(&task->status, memory_order_acquire) != 0)
AssertFatal(0, "Task already finished?");
atomic_store_explicit(&task->status, 1, memory_order_release);
}
void join_task_ans(task_ans_t* arr, size_t len)
{
DevAssert(len < INT_MAX);
DevAssert(arr != NULL);
// Spin lock inspired by:
// The Art of Writing Efficient Programs:
// An advanced programmer's guide to efficient hardware utilization
// and compiler optimizations using C++ examples
const struct timespec ns = {0, 1};
uint64_t i = 0;
int j = len - 1;
for (; j != -1; i++) {
for (; j != -1; --j) {
int const task_completed = 1;
if (atomic_load_explicit(&arr[j].status, memory_order_acquire) != task_completed)
break;
}
if (i % 8 == 0) {
nanosleep(&ns, NULL);
}
}
}
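Worker-side contract implied by the two functions above (sketch; my_job_t and my_worker are placeholder names): every task function must call completed_task_ans() exactly once on the task_ans_t it was handed, otherwise join_task_ans() never returns, and a second call trips the AssertFatal in completed_task_ans():

typedef struct {
  task_ans_t *ans;
  // ... job-specific fields ...
} my_job_t;

static void my_worker(void *arg)
{
  my_job_t *job = (my_job_t *)arg;
  // ... do the actual work ...
  completed_task_ans(job->ans); // releases one slot that join_task_ans() is waiting on
}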
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
#ifndef TASK_ANSWER_THREAD_POOL_H
#define TASK_ANSWER_THREAD_POOL_H
#ifdef __cplusplus
extern "C" {
#endif
#ifndef __cplusplus
#include <stdalign.h>
#include <stdatomic.h>
#else
#include <atomic>
#define _Atomic(X) std::atomic<X>
#define _Alignas(X) alignas(X)
#endif
#include <stddef.h>
#include <stdint.h>
#if defined(__i386__) || defined(__x86_64__)
#define LEVEL1_DCACHE_LINESIZE 64
#elif defined(__aarch64__)
// This is not always true for ARM.
// On Linux, you can obtain the size at runtime using sysconf(_SC_LEVEL1_DCACHE_LINESIZE),
// from the shell with: getconf LEVEL1_DCACHE_LINESIZE,
// or in C++ via std::hardware_destructive_interference_size
#define LEVEL1_DCACHE_LINESIZE 64
#else
#error Unknown CPU architecture
#endif
typedef struct {
// Avoid false sharing
_Alignas(LEVEL1_DCACHE_LINESIZE) _Atomic(int) status;
} task_ans_t;
typedef struct {
uint8_t* buf;
size_t len;
size_t cap; // capacity
task_ans_t* ans;
} thread_info_tm_t;
void join_task_ans(task_ans_t* arr, size_t len);
void completed_task_ans(task_ans_t* task);
#ifdef __cplusplus
}
#endif
#endif
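As the comment in this header suggests, the compile-time cache-line constant can be cross-checked against the running system on Linux (optional sketch, not part of the MR):

#include <stdio.h>
#include <unistd.h>
static inline void check_cacheline_size(void)
{
  long sz = sysconf(_SC_LEVEL1_DCACHE_LINESIZE); // 0 or -1 if the value is unavailable
  if (sz > 0 && sz != LEVEL1_DCACHE_LINESIZE)
    fprintf(stderr, "cache line size mismatch: runtime %ld vs compile-time %d\n", sz, LEVEL1_DCACHE_LINESIZE);
}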
......@@ -27,8 +27,10 @@
#include <unistd.h>
#include <ctype.h>
#include <sys/sysinfo.h>
#include <threadPool/thread-pool.h>
#include "thread-pool.h"
#include "log.h"
#include "task_ans.h"
#include "notified_fifo.h"
void displayList(notifiedFIFO_t *nf)
{
......@@ -47,6 +49,7 @@ struct testData {
int id;
int sleepTime;
char txt[50];
task_ans_t* task_ans;
};
void processing(void *arg)
......@@ -57,6 +60,7 @@ void processing(void *arg)
in->sleepTime = rand() % 1000;
usleep(in->sleepTime);
// printf("done: %d, %s, in thr %ld\n",in->id, in->txt,pthread_self() );
completed_task_ans(in->task_ans);
}
int main()
......@@ -102,8 +106,6 @@ int main()
tpool_t pool;
char params[] = "1,2,3,4,5";
initTpool(params, &pool, true);
notifiedFIFO_t worker_back;
initNotifiedFIFO(&worker_back);
//sleep(1);
int cumulProcessTime = 0;
......@@ -112,21 +114,22 @@ int main()
int nb_jobs = 4;
for (int i = 0; i < 1000; i++) {
int parall = nb_jobs;
task_ans_t task_ans[parall];
memset(task_ans, 0, sizeof(task_ans));
struct testData test_data[parall];
memset(test_data, 0, sizeof(test_data));
for (int j = 0; j < parall; j++) {
notifiedFIFO_elt_t *work = newNotifiedFIFO_elt(sizeof(struct testData), i, &worker_back, processing);
struct testData *x = (struct testData *)NotifiedFifoData(work);
task_t task = {.args = &test_data[j], .func = processing};
struct testData *x = (struct testData *)task.args;
x->id = i;
pushTpool(&pool, work);
x->task_ans = &task_ans[j];
pushTpool(&pool, task);
}
join_task_ans(task_ans, parall);
int sleepmax = 0;
while (parall) {
tmp = pullTpool(&worker_back, &pool);
if (tmp) {
parall--;
struct testData *dd = NotifiedFifoData(tmp);
if (dd->sleepTime > sleepmax)
sleepmax = dd->sleepTime;
delNotifiedFIFO_elt(tmp);
for (int j = 0; j < parall; j++) {
if (test_data[j].sleepTime > sleepmax) {
sleepmax = test_data[j].sleepTime;
}
}
cumulProcessTime += sleepmax;
......
......@@ -27,7 +27,7 @@
#include "assertions.h"
#include <pthread.h>
#include "common/config/config_userapi.h"
#include <common/utils/threadPool/thread-pool.h>
#include "common/utils/threadPool/notified_fifo.h"
// global var for openair performance profiler
int cpu_meas_enabled = 0;
double cpu_freq_GHz __attribute__ ((aligned(32)));
......
......@@ -28,7 +28,6 @@
#undef MALLOC
#include "assertions.h"
#include "PHY/types.h"
#include <threadPool/thread-pool.h>
/* help strings definition for command line options, used in CMDLINE_XXX_DESC macros and printed when -h option is used */
#define CONFIG_HLP_RFCFGF "Configuration file for front-end (e.g. LMS7002M)\n"
......
......@@ -40,6 +40,7 @@
#include "openair1/PHY/TOOLS/phy_scope_interface.h"
#include "PHY/MODULATION/nr_modulation.h"
#include "instrumentation.h"
#include "common/utils/threadPool/notified_fifo.h"
/*
* NR SLOT PROCESSING SEQUENCE
......@@ -101,8 +102,10 @@
static void *NRUE_phy_stub_standalone_pnf_task(void *arg);
static void start_process_slot_tx(void* arg) {
notifiedFIFO_elt_t *newTx = arg;
pushTpool(&(get_nrUE_params()->Tpool), newTx);
task_t task;
task.args = arg;
task.func = processSlotTX;
pushTpool(&(get_nrUE_params()->Tpool), task);
}
static size_t dump_L1_UE_meas_stats(PHY_VARS_NR_UE *ue, char *output, size_t max_len)
......@@ -487,7 +490,7 @@ static void RU_write(nr_rxtx_thread_data_t *rxtxD, bool sl_tx_action)
void processSlotTX(void *arg)
{
TracyCZone(ctx, true);
nr_rxtx_thread_data_t *rxtxD = (nr_rxtx_thread_data_t *) arg;
nr_rxtx_thread_data_t *rxtxD = arg;
const UE_nr_rxtx_proc_t *proc = &rxtxD->proc;
PHY_VARS_NR_UE *UE = rxtxD->UE;
nr_phy_data_tx_t phy_data = {0};
......@@ -550,6 +553,7 @@ void processSlotTX(void *arg)
int next_slot = (proc->nr_slot_tx + 1) % slots_per_frame;
dynamic_barrier_join(&UE->process_slot_tx_barriers[next_slot]);
RU_write(rxtxD, sl_tx_action);
free(rxtxD);
TracyCZoneEnd(ctx);
}
......@@ -823,7 +827,7 @@ void *UE_thread(void *arg)
while (!oai_exit) {
if (syncRunning) {
notifiedFIFO_elt_t *res=tryPullTpool(&nf,&(get_nrUE_params()->Tpool));
notifiedFIFO_elt_t *res = pollNotifiedFIFO(&nf);
if (res) {
syncRunning = false;
......@@ -1003,7 +1007,7 @@ void *UE_thread(void *arg)
nr_ue_rrc_timer_trigger(UE->Mod_id, curMsg.proc.frame_tx, curMsg.proc.gNB_id);
// RX slot processing. We launch and forget.
notifiedFIFO_elt_t *newRx = newNotifiedFIFO_elt(sizeof(nr_rxtx_thread_data_t), curMsg.proc.nr_slot_rx, NULL, UE_dl_processing);
notifiedFIFO_elt_t *newRx = newNotifiedFIFO_elt(sizeof(nr_rxtx_thread_data_t), curMsg.proc.nr_slot_tx, NULL, UE_dl_processing);
nr_rxtx_thread_data_t *curMsgRx = (nr_rxtx_thread_data_t *)NotifiedFifoData(newRx);
*curMsgRx = (nr_rxtx_thread_data_t){.proc = curMsg.proc, .UE = UE};
int ret = UE_dl_preprocessing(UE, &curMsgRx->proc, tx_wait_for_dlsch, &curMsgRx->phy_data, &stats_printed);
......@@ -1013,8 +1017,7 @@ void *UE_thread(void *arg)
// Start TX slot processing here. It runs in parallel with RX slot processing
// in current code, DURATION_RX_TO_TX constant is the limit to get UL data to encode from a RX slot
notifiedFIFO_elt_t *newTx = newNotifiedFIFO_elt(sizeof(nr_rxtx_thread_data_t), curMsg.proc.nr_slot_tx, NULL, processSlotTX);
nr_rxtx_thread_data_t *curMsgTx = (nr_rxtx_thread_data_t *)NotifiedFifoData(newTx);
nr_rxtx_thread_data_t *curMsgTx = calloc(1, sizeof(*curMsgTx));
curMsgTx->proc = curMsg.proc;
curMsgTx->writeBlockSize = writeBlockSize;
curMsgTx->proc.timestamp_tx = writeTimestamp;
......@@ -1026,7 +1029,7 @@ void *UE_thread(void *arg)
dynamic_barrier_update(&UE->process_slot_tx_barriers[slot],
tx_wait_for_dlsch[slot] + sync_to_previous_thread,
start_process_slot_tx,
newTx);
curMsgTx);
stream_status = STREAM_STATUS_SYNCED;
tx_wait_for_dlsch[slot] = 0;
}
......
......@@ -33,7 +33,6 @@
#include "SCHED_NR_UE/defs.h"
#include "common/ran_context.h"
#include "common/config/config_userapi.h"
//#include "common/utils/threadPool/thread-pool.h"
#include "common/utils/load_module_shlib.h"
//#undef FRAME_LENGTH_COMPLEX_SAMPLES //there are two conflicting definitions, so we better make sure we don't use it at all
#include "common/utils/nr/nr_common.h"
......
......@@ -2,6 +2,7 @@
#define NR_UESOFTMODEM_H
#include <executables/nr-softmodem-common.h>
#include <executables/softmodem-common.h>
#include "common/utils/threadPool/thread-pool.h"
#include "PHY/defs_nr_UE.h"
#define CONFIG_HLP_IF_FREQ "IF frequency for RF, if needed\n"
......@@ -78,7 +79,7 @@ typedef struct {
uint64_t optmask; // mask to store boolean config options
uint32_t ofdm_offset_divisor; // Divisor for sample offset computation for each OFDM symbol
int max_ldpc_iterations; // number of maximum LDPC iterations
tpool_t Tpool; // thread pool
tpool_t Tpool;
int UE_scan_carrier;
int UE_fo_compensation;
uint64_t if_freq;
......
......@@ -64,6 +64,7 @@ typedef struct {
nrLDPC_params_per_cb_t perCB[NR_LDPC_MAX_NUM_CB];
// Redundancy version index
uint8_t rv;
task_ans_t *ans;
} encoder_implemparams_t;
typedef int32_t(LDPC_initfunc_t)(void);
......
......@@ -42,7 +42,6 @@
#include "common/utils/LOG/log.h"
#include "executables/lte-softmodem.h"
#include <syscall.h>
#include <common/utils/threadPool/thread-pool.h>
//#define DEBUG_DLSCH_CODING
//#define DEBUG_DLSCH_FREE 1
......@@ -284,6 +283,9 @@ static void TPencode(void * arg) {
rdata->r,
hadlsch->nb_rb);
stop_meas(rdata->rm_stats);
// Task completed in parallel
completed_task_ans(rdata->ans);
}
int dlsch_encoding(PHY_VARS_eNB *eNB,
......@@ -321,8 +323,6 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
num_pdcch_symbols,
frame,subframe,beamforming_mode);
int nbEncode = 0;
// if (hadlsch->Ndi == 1) { // this is a new packet
if (hadlsch->round == 0) { // this is a new packet
// Add 24-bit crc (polynomial A) to payload
......@@ -349,13 +349,14 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
return(-1);
}
notifiedFIFO_t respEncode;
initNotifiedFIFO(&respEncode);
for (int r=0, r_offset=0; r<hadlsch->C; r++) {
union turboReqUnion id= {.s={dlsch->rnti,frame,subframe,r,0}};
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(turboEncode_t), id.p, &respEncode, TPencode);
turboEncode_t * rdata=(turboEncode_t *) NotifiedFifoData(req);
turboEncode_t arr[hadlsch->C];
task_ans_t ans[hadlsch->C];
memset(ans, 0, hadlsch->C * sizeof(task_ans_t));
for (int r = 0, r_offset = 0; r < hadlsch->C; r++) {
turboEncode_t *rdata = &arr[r];
rdata->ans = &ans[r];
rdata->input=hadlsch->c[r];
rdata->Kr_bytes= ( r<hadlsch->Cminus ? hadlsch->Kminus : hadlsch->Kplus) >>3;
rdata->filler=(r==0) ? hadlsch->F : 0;
......@@ -369,8 +370,8 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
rdata->r_offset=r_offset;
rdata->G=G;
pushTpool(proc->threadPool, req);
nbEncode++;
task_t t = {.func = TPencode, .args = rdata};
pushTpool(proc->threadPool, t);
int Qm=hadlsch->Qm;
int C=hadlsch->C;
......@@ -382,14 +383,9 @@ int dlsch_encoding(PHY_VARS_eNB *eNB,
else
r_offset += Nl*Qm * ((GpmodC==0?0:1) + (Gp/C));
}
// Wait all other threads finish to process
while (nbEncode) {
notifiedFIFO_elt_t *res = pullTpool(&respEncode, proc->threadPool);
if (res == NULL)
break; // Tpool has been stopped
delNotifiedFIFO_elt(res);
nbEncode--;
}
join_task_ans(ans, hadlsch->C);
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_ENB_DLSCH_ENCODING, VCD_FUNCTION_OUT);
return(0);
}
......@@ -454,14 +450,14 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
&hadlsch->F)<0)
return(-1);
}
int nbEncode = 0;
notifiedFIFO_t respEncode;
initNotifiedFIFO(&respEncode);
for (int r=0, r_offset=0; r<hadlsch->C; r++) {
union turboReqUnion id= {.s={dlsch->rnti,frame,subframe,r,0}};
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(turboEncode_t), id.p, &respEncode, TPencode);
turboEncode_t * rdata=(turboEncode_t *) NotifiedFifoData(req);
turboEncode_t arr[hadlsch->C];
task_ans_t ans[hadlsch->C];
memset(ans, 0, hadlsch->C * sizeof(task_ans_t));
for (int r = 0, r_offset = 0; r < hadlsch->C; r++) {
turboEncode_t *rdata = &arr[r];
rdata->ans = &ans[r];
rdata->input=hadlsch->c[r];
rdata->Kr_bytes= ( r<hadlsch->Cminus ? hadlsch->Kminus : hadlsch->Kplus) >>3;
rdata->filler=(r==0) ? hadlsch->F : 0;
......@@ -475,8 +471,8 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
rdata->r_offset=r_offset;
rdata->G=G;
pushTpool(proc->threadPool, req);
nbEncode++;
task_t t = {.func = TPencode, .args = rdata};
pushTpool(proc->threadPool, t);
int Qm=hadlsch->Qm;
int C=hadlsch->C;
......@@ -488,14 +484,9 @@ int dlsch_encoding_fembms_pmch(PHY_VARS_eNB *eNB,
else
r_offset += Nl*Qm * ((GpmodC==0?0:1) + (Gp/C));
}
// Wait all other threads finish to process
while (nbEncode) {
notifiedFIFO_elt_t *res = pullTpool(&respEncode, proc->threadPool);
if (res == NULL)
break; // Tpool has been stopped
delNotifiedFIFO_elt(res);
nbEncode--;
}
join_task_ans(ans, hadlsch->C);
return(0);
}
......
......@@ -36,6 +36,7 @@
#include "nfapi_interface.h"
#include "transport_common_proto.h"
// Functions below implement 36-211 and 36-212
/** @addtogroup _PHY_TRANSPORT_
......@@ -520,12 +521,13 @@ void rx_ulsch(PHY_VARS_eNB *eNB,
@param llr8_flag If 1, indicate that the 8-bit turbo decoder should be used
@returns 0 on success
*/
unsigned int ulsch_decoding(PHY_VARS_eNB *phy_vars_eNB,
L1_rxtx_proc_t *proc,
uint8_t UE_id,
uint8_t control_only_flag,
uint8_t Nbundled,
uint8_t llr8_flag);
unsigned int ulsch_decoding(PHY_VARS_eNB *phy_vars_eNB,
L1_rxtx_proc_t *proc,
uint8_t UE_id,
uint8_t control_only_flag,
uint8_t Nbundled,
uint8_t llr8_flag,
thread_info_tm_t *t_info);
void generate_phich_top(PHY_VARS_eNB *phy_vars_eNB,
L1_rxtx_proc_t *proc,
......
......@@ -239,8 +239,10 @@ void processULSegment(void * arg) {
1,
r,
&E)==-1) {
LOG_E(PHY,"ulsch_decoding.c: Problem in rate matching\n");
return;
// Task completed in parallel
completed_task_ans(rdata->ans);
LOG_E(PHY, "ulsch_decoding.c: Problem in rate matching\n");
return;
}
stop_meas(&eNB->ulsch_rate_unmatching_stats);
int max_Ncb = 3*ulsch_harq->RTC[r]*32 ;
......@@ -284,6 +286,9 @@ void processULSegment(void * arg) {
&eNB->ulsch_tc_intl1_stats,
&eNB->ulsch_tc_intl2_stats,
&ulsch_harq->abort_decode);
// Task completed in parallel
completed_task_ans(rdata->ans);
}
/*!
......@@ -295,7 +300,12 @@ void processULSegment(void * arg) {
@returns 0 on success
*/
static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_id, int harq_pid, int llr8_flag)
static int ulsch_decoding_data(PHY_VARS_eNB *eNB,
L1_rxtx_proc_t *proc,
int UE_id,
int harq_pid,
int llr8_flag,
thread_info_tm_t *t_info)
{
unsigned int r_offset=0;
int offset = 0;
......@@ -330,13 +340,11 @@ static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_i
E = ulsch_harq->Qm * (Gp/ulsch_harq->C);
else
E = ulsch_harq->Qm * ((GpmodC==0?0:1) + (Gp/ulsch_harq->C));
union turboReqUnion id= {.s={ulsch->rnti,proc->frame_rx,proc->subframe_rx,0,0}};
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(turboDecode_t),
id.p,
proc->respDecode,
processULSegment);
turboDecode_t * rdata=(turboDecode_t *) NotifiedFifoData(req);
turboDecode_t *rdata = &((turboDecode_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
rdata->ans = &t_info->ans[t_info->len];
t_info->len += 1;
rdata->eNB=eNB;
rdata->frame=proc->frame_rx;
......@@ -355,7 +363,10 @@ static int ulsch_decoding_data(PHY_VARS_eNB *eNB, L1_rxtx_proc_t *proc, int UE_i
rdata->function=td;
int Fbytes=(r==0) ? rdata->Fbits>>3 : 0;
int sz=Kr_bytes - Fbytes - ((ulsch_harq->C>1)?3:0);
pushTpool(proc->threadPool,req);
task_t t = {.func = &processULSegment, .args = rdata};
pushTpool(proc->threadPool, t);
proc->nbDecode++;
LOG_D(PHY,"Added a block to decode, in pipe: %d\n",proc->nbDecode);
r_offset+=E;
......@@ -390,12 +401,13 @@ static inline unsigned int lte_gold_unscram(unsigned int *x1, unsigned int *x2,
// printf("n=%d : c %x\n",n,x1^x2);
}
unsigned int ulsch_decoding(PHY_VARS_eNB *eNB,
L1_rxtx_proc_t *proc,
uint8_t UE_id,
uint8_t control_only_flag,
uint8_t Nbundled,
uint8_t llr8_flag)
unsigned int ulsch_decoding(PHY_VARS_eNB *eNB,
L1_rxtx_proc_t *proc,
uint8_t UE_id,
uint8_t control_only_flag,
uint8_t Nbundled,
uint8_t llr8_flag,
thread_info_tm_t *t_info)
{
int16_t *ulsch_llr = eNB->pusch_vars[UE_id]->llr;
LTE_DL_FRAME_PARMS *frame_parms = &eNB->frame_parms;
......@@ -1082,7 +1094,7 @@ unsigned int ulsch_decoding(PHY_VARS_eNB *eNB,
LOG_D(PHY,"frame %d subframe %d O_ACK:%d o_ACK[]=%d:%d:%d:%d\n",frame,subframe,ulsch_harq->O_ACK,ulsch_harq->o_ACK[0],ulsch_harq->o_ACK[1],ulsch_harq->o_ACK[2],ulsch_harq->o_ACK[3]);
// Do ULSCH Decoding for data portion
ret = ulsch_decoding_data(eNB, proc, UE_id, harq_pid, llr8_flag);
ret = ulsch_decoding_data(eNB, proc, UE_id, harq_pid, llr8_flag, t_info);
return(ret);
}
......
......@@ -409,6 +409,7 @@ static void nr_pusch_antenna_processing(void *arg)
*(rdata->noise_amp2) = noise_amp2;
*(rdata->nest_count) = nest_count;
}
completed_task_ans(rdata->ans);
}
......@@ -503,14 +504,13 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
start_meas(&gNB->pusch_channel_estimation_antenna_processing_stats);
int numAntennas = gNB->dmrs_num_antennas_per_thread;
int num_jobs = CEILIDIV(gNB->frame_parms.nb_antennas_rx, numAntennas);
puschAntennaProc_t rdatas[num_jobs];
memset(rdatas, 0, sizeof(rdatas));
task_ans_t ans[num_jobs];
memset(ans, 0, sizeof(ans));
for (int job_id = 0; job_id < num_jobs; job_id++) {
union puschAntennaReqUnion id = {.s = {ul_id, 0}};
id.p = 1 + job_id * numAntennas;
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(puschAntennaProc_t),
id.p,
&respPuschAarx,
&nr_pusch_antenna_processing); // create a job for Tpool
puschAntennaProc_t *rdata = (puschAntennaProc_t *)NotifiedFifoData(req); // data for the job
puschAntennaProc_t *rdata = &rdatas[job_id];
task_t task = {.func = nr_pusch_antenna_processing, .args = rdata};
// Local init in the current loop
rdata->Ns = Ns;
......@@ -531,22 +531,18 @@ int nr_pusch_channel_estimation(PHY_VARS_gNB *gNB,
rdata->pusch_vars = &gNB->pusch_vars[ul_id];
rdata->chest_freq = gNB->chest_freq;
rdata->rxdataF = gNB->common_vars.rxdataF;
rdata->ans = &ans[job_id];
// Call the nr_pusch_antenna_processing function
if (job_id == num_jobs - 1) {
// Run the last job inline
nr_pusch_antenna_processing(rdata);
delNotifiedFIFO_elt(req);
} else {
pushTpool(&gNB->threadPool, req);
pushTpool(&gNB->threadPool, task);
}
LOG_D(PHY, "Added Antenna (count %d/%d) to process, in pipe\n", job_id, num_jobs);
} // Antenna Loop
while (num_jobs - 1 > 0) {
notifiedFIFO_elt_t *req = pullTpool(&respPuschAarx, &gNB->threadPool);
num_jobs--;
delNotifiedFIFO_elt(req);
}
join_task_ans(ans, num_jobs - 1);
stop_meas(&gNB->pusch_channel_estimation_antenna_processing_stats);
for (int aarx = 0; aarx < gNB->frame_parms.nb_antennas_rx; aarx++) {
......
......@@ -117,7 +117,7 @@ NR_gNB_DLSCH_t new_gNB_dlsch(NR_DL_FRAME_PARMS *frame_parms, uint16_t N_RB)
return(dlsch);
}
void ldpc8blocks(void *p)
static void ldpc8blocks(void *p)
{
encoder_implemparams_t *impp=(encoder_implemparams_t *) p;
NR_DL_gNB_HARQ_t *harq = (NR_DL_gNB_HARQ_t *)impp->harq;
......@@ -232,6 +232,9 @@ void ldpc8blocks(void *p)
#endif
r_offset += E;
}
// Task running in parallel completed
completed_task_ans(impp->ans);
}
int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
......@@ -369,24 +372,23 @@ int nr_dlsch_encoding(PHY_VARS_gNB *gNB,
}
ldpc_interface_offload.LDPCencoder(harq->c, &impp.output, &impp);
} else {
notifiedFIFO_t nf;
initNotifiedFIFO(&nf);
int nbJobs = 0;
for (int j = 0; j < (impp.n_segments / 8 + ((impp.n_segments & 7) == 0 ? 0 : 1)); j++) {
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(impp), j, &nf, ldpc8blocks);
encoder_implemparams_t *perJobImpp = (encoder_implemparams_t *)NotifiedFifoData(req);
size_t const n_seg = (impp.n_segments / 8 + ((impp.n_segments & 7) == 0 ? 0 : 1));
encoder_implemparams_t arr[n_seg];
task_ans_t ans[n_seg];
memset(ans, 0, n_seg * sizeof(task_ans_t));
for (int j = 0; j < n_seg; j++) {
encoder_implemparams_t *perJobImpp = &arr[j];
*perJobImpp = impp;
perJobImpp->macro_num = j;
pushTpool(&gNB->threadPool, req);
nbJobs++;
}
while (nbJobs) {
notifiedFIFO_elt_t *req = pullTpool(&nf, &gNB->threadPool);
if (req == NULL)
break; // Tpool has been stopped
delNotifiedFIFO_elt(req);
nbJobs--;
perJobImpp->ans = &ans[j];
task_t t = {.func = ldpc8blocks, .args = perJobImpp};
pushTpool(&gNB->threadPool, t);
}
join_task_ans(ans, n_seg);
}
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_gNB_DLSCH_ENCODING, VCD_FUNCTION_OUT);
return 0;
......
......@@ -62,7 +62,8 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
uint32_t frame,
uint8_t nr_tti_rx,
uint8_t harq_pid,
uint32_t G);
uint32_t G,
thread_info_tm_t *t_info);
/*! \brief Perform PUSCH unscrambling. TS 38.211 V15.4.0 subclause 6.3.1.1
@param llr, Pointer to llr bits
......
......@@ -50,6 +50,10 @@
//#define DEBUG_ULSCH_DECODING
//#define gNB_DEBUG_TRACE
#include <stdint.h>
#include <time.h>
#include <stdalign.h>
#define OAI_UL_LDPC_MAX_NUM_LLR 27000//26112 // NR_LDPC_NCOL_BG1*NR_LDPC_ZMAX = 68*384
//#define DEBUG_CRC
#ifdef DEBUG_CRC
......@@ -181,6 +185,9 @@ static void nr_processULSegment(void *arg)
LOG_E(PHY, "ulsch_decoding.c: Problem in rate_matching\n");
rdata->decodeIterations = max_ldpc_iterations + 1;
set_abort(&ulsch_harq->abort_decode, true);
// Task completed
completed_task_ans(rdata->ans);
return;
}
......@@ -220,6 +227,9 @@ static void nr_processULSegment(void *arg)
if (rdata->decodeIterations <= p_decoderParms->numMaxIter)
memcpy(ulsch_harq->c[r],llrProcBuf, Kr>>3);
// Task completed
completed_task_ans(rdata->ans);
}
int decode_offload(PHY_VARS_gNB *phy_vars_gNB,
......@@ -320,7 +330,8 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
uint32_t frame,
uint8_t nr_tti_rx,
uint8_t harq_pid,
uint32_t G)
uint32_t G,
thread_info_tm_t *t_info)
{
if (!ulsch_llr) {
LOG_E(PHY, "ulsch_decoding.c: NULL ulsch_llr pointer\n");
......@@ -430,9 +441,12 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
set_abort(&harq_process->abort_decode, false);
for (int r = 0; r < harq_process->C; r++) {
int E = nr_get_E(G, harq_process->C, Qm, n_layers, r);
union ldpcReqUnion id = {.s = {ulsch->rnti, frame, nr_tti_rx, 0, 0}};
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(ldpcDecode_t), id.p, &phy_vars_gNB->respDecode, &nr_processULSegment);
ldpcDecode_t *rdata = (ldpcDecode_t *)NotifiedFifoData(req);
ldpcDecode_t *rdata = &((ldpcDecode_t *)t_info->buf)[t_info->len];
DevAssert(t_info->len < t_info->cap);
rdata->ans = &t_info->ans[t_info->len];
t_info->len += 1;
decParams.R = nr_get_R_ldpc_decoder(pusch_pdu->pusch_data.rv_index,
E,
decParams.BG,
......@@ -457,7 +471,10 @@ int nr_ulsch_decoding(PHY_VARS_gNB *phy_vars_gNB,
rdata->ulsch = ulsch;
rdata->ulsch_id = ULSCH_id;
rdata->tbslbrm = pusch_pdu->maintenance_parms_v3.tbSizeLbrmBytes;
pushTpool(&phy_vars_gNB->threadPool, req);
task_t t = {.func = &nr_processULSegment, .args = rdata};
pushTpool(&phy_vars_gNB->threadPool, t);
LOG_D(PHY, "Added a block to decode, in pipe: %d\n", r);
r_offset += E;
offset += ((harq_process->K >> 3) - (harq_process->F >> 3) - ((harq_process->C > 1) ? 3 : 0));
......
......@@ -1179,6 +1179,7 @@ typedef struct puschSymbolProc_s {
int16_t *scramblingSequence;
uint32_t nvar;
int beam_nb;
task_ans_t *ans;
} puschSymbolProc_t;
static void nr_pusch_symbol_processing(void *arg)
......@@ -1227,6 +1228,9 @@ static void nr_pusch_symbol_processing(void *arg)
for (int i = 0; i < end; i++)
llr16[i] = llr_ptr[i] * s[i];
}
// Task running in parallel completed
completed_task_ans(rdata->ans);
}
static uint32_t average_u32(const uint32_t *x, uint16_t size)
......@@ -1483,8 +1487,13 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
start_meas(&gNB->rx_pusch_symbol_processing_stats);
int numSymbols = gNB->num_pusch_symbols_per_thread;
int total_res = 0;
int const loop_iter = rel15_ul->nr_of_symbols / numSymbols;
puschSymbolProc_t arr[loop_iter];
task_ans_t arr_ans[loop_iter];
memset(arr_ans, 0, loop_iter * sizeof(task_ans_t));
int sz_arr = 0;
for(uint8_t symbol = rel15_ul->start_symbol_index; symbol < end_symbol; symbol += numSymbols) {
int res_per_task = 0;
for (int s = 0; s < numSymbols; s++) {
......@@ -1496,10 +1505,9 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
}
total_res += res_per_task;
if (res_per_task > 0) {
union puschSymbolReqUnion id = {.s={ulsch_id,frame,slot,0}};
id.p=1+symbol;
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(puschSymbolProc_t), id.p, &gNB->respPuschSymb, &nr_pusch_symbol_processing); // create a job for Tpool
puschSymbolProc_t *rdata = (puschSymbolProc_t*)NotifiedFifoData(req); // data for the job
puschSymbolProc_t *rdata = &arr[sz_arr];
rdata->ans = &arr_ans[sz_arr];
++sz_arr;
rdata->gNB = gNB;
rdata->frame_parms = frame_parms;
......@@ -1517,7 +1525,8 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
if (rel15_ul->pdu_bit_map & PUSCH_PDU_BITMAP_PUSCH_PTRS) {
nr_pusch_symbol_processing(rdata);
} else {
pushTpool(&gNB->threadPool, req);
task_t t = {.func = &nr_pusch_symbol_processing, .args = rdata};
pushTpool(&gNB->threadPool, t);
nbSymb++;
}
......@@ -1525,12 +1534,9 @@ int nr_rx_pusch_tp(PHY_VARS_gNB *gNB,
}
} // symbol loop
while (nbSymb) {
notifiedFIFO_elt_t *req = pullTpool(&gNB->respPuschSymb, &gNB->threadPool);
nbSymb--;
delNotifiedFIFO_elt(req);
if (nbSymb > 0) {
join_task_ans(arr_ans, sz_arr);
}
stop_meas(&gNB->rx_pusch_symbol_processing_stats);
// Copy the data to the scope. This cannot be performed in one call to gNBscopeCopy because the data is not contiguous in the
......
......@@ -70,15 +70,13 @@ void nr_dlsch_unscrambling(int16_t *llr, uint32_t size, uint8_t q, uint32_t Nid,
}
static bool nr_ue_postDecode(PHY_VARS_NR_UE *phy_vars_ue,
notifiedFIFO_elt_t *req,
notifiedFIFO_t *nf_p,
const bool last,
ldpcDecode_ue_t *rdata,
bool last,
int b_size,
uint8_t b[b_size],
int *num_seg_ok,
const UE_nr_rxtx_proc_t *proc)
{
ldpcDecode_ue_t *rdata = (ldpcDecode_ue_t*) NotifiedFifoData(req);
NR_DL_UE_HARQ_t *harq_process = rdata->harq_process;
NR_UE_DLSCH_t *dlsch = (NR_UE_DLSCH_t *) rdata->dlsch;
int r = rdata->segment_r;
......@@ -257,6 +255,7 @@ static void nr_processDLSegment(void *arg)
memcpy(harq_process->c[r], LDPCoutput, Kr >> 3);
stop_meas(&rdata->ts_ldpc_decode);
}
completed_task_ans(rdata->ans);
}
uint32_t nr_dlsch_decoding(PHY_VARS_NR_UE *phy_vars_ue,
......@@ -369,16 +368,17 @@ uint32_t nr_dlsch_decoding(PHY_VARS_NR_UE *phy_vars_ue,
Kr = harq_process->K;
Kr_bytes = Kr>>3;
offset = 0;
notifiedFIFO_t nf;
initNotifiedFIFO(&nf);
ldpcDecode_ue_t arr[harq_process->C];
task_ans_t ans[harq_process->C];
memset(ans, 0, harq_process->C * sizeof(task_ans_t));
set_abort(&harq_process->abort_decode, false);
for (r=0; r<harq_process->C; r++) {
//printf("start rx segment %d\n",r);
uint32_t E = nr_get_E(G, harq_process->C, dlsch->dlsch_config.qamModOrder, dlsch->Nl, r);
decParams.R = nr_get_R_ldpc_decoder(dlsch->dlsch_config.rv, E, decParams.BG, decParams.Z, &harq_process->llrLen, harq_process->DLround);
union ldpcReqUnion id = {.s = {dlsch->rnti, frame, nr_slot_rx, 0, 0}};
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(ldpcDecode_ue_t), id.p, &nf, &nr_processDLSegment);
ldpcDecode_ue_t * rdata=(ldpcDecode_ue_t *) NotifiedFifoData(req);
ldpcDecode_ue_t *rdata = &arr[r];
rdata->ans = &ans[r];
rdata->phy_vars_ue = phy_vars_ue;
rdata->harq_process = harq_process;
......@@ -398,7 +398,8 @@ uint32_t nr_dlsch_decoding(PHY_VARS_NR_UE *phy_vars_ue,
reset_meas(&rdata->ts_deinterleave);
reset_meas(&rdata->ts_rate_unmatch);
reset_meas(&rdata->ts_ldpc_decode);
pushTpool(&get_nrUE_params()->Tpool,req);
task_t t = {.args = rdata, .func = nr_processDLSegment};
pushTpool(&get_nrUE_params()->Tpool, t);
LOG_D(PHY, "Added a block to decode, in pipe: %d\n", r);
r_offset += E;
offset += (Kr_bytes - (harq_process->F>>3) - ((harq_process->C>1)?3:0));
......@@ -406,13 +407,11 @@ uint32_t nr_dlsch_decoding(PHY_VARS_NR_UE *phy_vars_ue,
}
int num_seg_ok = 0;
int nbDecode = harq_process->C;
while (nbDecode) {
notifiedFIFO_elt_t *req=pullTpool(&nf, &get_nrUE_params()->Tpool);
if (req == NULL)
break; // Tpool has been stopped
nr_ue_postDecode(phy_vars_ue, req, &nf, nbDecode == 1, b_size, b, &num_seg_ok, proc);
delNotifiedFIFO_elt(req);
nbDecode--;
if (nbDecode > 0) {
join_task_ans(ans, nbDecode);
for (size_t i = 0; i < nbDecode; ++i) {
nr_ue_postDecode(phy_vars_ue, &arr[i], i == nbDecode - 1, b_size, b, &num_seg_ok, proc);
}
}
LOG_D(PHY,
"%d.%d DLSCH Decoded, harq_pid %d, round %d, result: %d TBS %d (%d) G %d nb_re_dmrs %d length dmrs %d mcs %d Nl %d "
......
......@@ -288,6 +288,8 @@ void nr_scan_ssb(void *arg)
}
}
}
completed_task_ans(ssbInfo->ans);
}
nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
......@@ -299,18 +301,18 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
{
NR_DL_FRAME_PARMS *fp = &ue->frame_parms;
notifiedFIFO_t nf;
initNotifiedFIFO(&nf);
// Perform SSB scanning in parallel. One GSCN per thread.
LOG_I(NR_PHY,
"Starting cell search with center freq: %ld, bandwidth: %d. Scanning for %d number of GSCN.\n",
fp->dl_CarrierFreq,
fp->N_RB_DL,
numGscn);
task_ans_t ans[numGscn];
memset(ans, 0, sizeof(ans));
nr_ue_ssb_scan_t ssb_info[numGscn];
for (int s = 0; s < numGscn; s++) {
notifiedFIFO_elt_t *req = newNotifiedFIFO_elt(sizeof(nr_ue_ssb_scan_t), gscnInfo[s].gscn, &nf, &nr_scan_ssb);
nr_ue_ssb_scan_t *ssbInfo = (nr_ue_ssb_scan_t *)NotifiedFifoData(req);
nr_ue_ssb_scan_t *ssbInfo = &ssb_info[s];
*ssbInfo = (nr_ue_ssb_scan_t){.gscnInfo = gscnInfo[s],
.fp = &ue->frame_parms,
.proc = proc,
......@@ -329,32 +331,34 @@ nr_initial_sync_t nr_initial_sync(UE_nr_rxtx_proc_t *proc,
ssbInfo->gscnInfo.gscn,
ssbInfo->gscnInfo.ssbFirstSC,
ssbInfo->gscnInfo.ssRef);
pushTpool(&get_nrUE_params()->Tpool, req);
ssbInfo->ans = &ans[s];
task_t t = {.func = nr_scan_ssb, .args = ssbInfo};
pushTpool(&get_nrUE_params()->Tpool, t);
}
// Collect the scan results
nr_ue_ssb_scan_t res = {0};
while (numGscn) {
notifiedFIFO_elt_t *req = pullTpool(&nf, &get_nrUE_params()->Tpool);
nr_ue_ssb_scan_t *ssbInfo = (nr_ue_ssb_scan_t *)NotifiedFifoData(req);
if (ssbInfo->syncRes.cell_detected) {
LOG_A(NR_PHY,
"Cell Detected with GSCN: %d, SSB SC offset: %d, SSB Ref: %lf, PSS Corr peak: %d dB, PSS Corr Average: %d\n",
ssbInfo->gscnInfo.gscn,
ssbInfo->gscnInfo.ssbFirstSC,
ssbInfo->gscnInfo.ssRef,
ssbInfo->pssCorrPeakPower,
ssbInfo->pssCorrAvgPower);
if (!res.syncRes.cell_detected) { // take the first cell detected
res = *ssbInfo;
if (numGscn > 0) {
join_task_ans(ans, numGscn);
for (int i = 0; i < numGscn; i++) {
nr_ue_ssb_scan_t *ssbInfo = &ssb_info[i];
if (ssbInfo->syncRes.cell_detected) {
LOG_I(NR_PHY,
"Cell Detected with GSCN: %d, SSB SC offset: %d, SSB Ref: %lf, PSS Corr peak: %d dB, PSS Corr Average: %d\n",
ssbInfo->gscnInfo.gscn,
ssbInfo->gscnInfo.ssbFirstSC,
ssbInfo->gscnInfo.ssRef,
ssbInfo->pssCorrPeakPower,
ssbInfo->pssCorrAvgPower);
if (!res.syncRes.cell_detected) { // take the first cell detected
res = *ssbInfo;
}
}
for (int ant = 0; ant < fp->nb_antennas_rx; ant++) {
free(ssbInfo->rxdata[ant]);
}
free(ssbInfo->rxdata);
}
for (int ant = 0; ant < fp->nb_antennas_rx; ant++) {
free(ssbInfo->rxdata[ant]);
}
free(ssbInfo->rxdata);
delNotifiedFIFO_elt(req);
numGscn--;
}
// Set globals based on detected cell
......
......@@ -39,8 +39,9 @@
#include "time_meas.h"
#include "defs_common.h"
#include "nfapi_nr_interface_scf.h"
#include <common/utils/threadPool/thread-pool.h>
#include <executables/rt_profiling.h>
#include "common/utils/threadPool/task_ans.h"
#include "common/utils/threadPool/thread-pool.h"
#define MAX_BANDS_PER_RRU 4
#define MAX_RRU_CONFIG_SIZE 1024
......@@ -175,7 +176,8 @@ typedef struct {
struct RU_t_s *ru;
int startSymbol;
int endSymbol;
int slot;
int slot;
task_ans_t *ans;
} feprx_cmd_t;
typedef struct {
......@@ -184,6 +186,7 @@ typedef struct {
int slot;
int startSymbol;
int numSymbols;
task_ans_t *ans;
} feptx_cmd_t;
typedef struct {
......@@ -426,7 +429,7 @@ typedef enum {
typedef struct RU_t_s {
/// ThreadPool for RU
/// ThreadPool for RU
tpool_t *threadPool;
/// index of this ru
uint32_t idx;
......
......@@ -66,7 +66,7 @@
#include "PHY/LTE_TRANSPORT/transport_eNB.h"
#include "openair2/PHY_INTERFACE/IF_Module.h"
#include "common/openairinterface5g_limits.h"
#include "common/utils/threadPool/task_ans.h"
#define PBCH_A 24
#define MAX_NUM_RU_PER_eNB 64
......@@ -786,6 +786,7 @@ typedef struct TurboDecode_s {
int offset;
int maxIterations;
int decodeIterations;
task_ans_t *ans;
} turboDecode_t;
#define TURBO_SIMD_SOFTBITS 96+12+3+3*6144
......@@ -802,6 +803,7 @@ typedef struct turboEncode_s {
time_stats_t *rm_stats;
time_stats_t *te_stats;
time_stats_t *i_stats;
task_ans_t *ans;
} turboEncode_t;
......
......@@ -575,7 +575,7 @@ typedef struct PHY_VARS_gNB_s {
struct processingData_L1tx *msgDataTx;
void *scopeData;
/// structure for analyzing high-level RT measurements
rt_L1_profiling_t rt_L1_profiling;
rt_L1_profiling_t rt_L1_profiling;
} PHY_VARS_gNB;
struct puschSymbolReqId {
......@@ -609,6 +609,7 @@ typedef struct puschAntennaProc_s {
NR_gNB_PUSCH *pusch_vars;
NR_DL_FRAME_PARMS *frame_parms;
c16_t ***rxdataF;
task_ans_t* ans;
} puschAntennaProc_t;
struct puschAntennaReqId {
......@@ -641,6 +642,7 @@ typedef struct LDPCDecode_s {
int offset;
int decodeIterations;
uint32_t tbslbrm;
task_ans_t *ans;
} ldpcDecode_t;
struct ldpcReqId {
......@@ -661,6 +663,7 @@ typedef struct processingData_L1 {
int slot_rx;
openair0_timestamp timestamp_tx;
PHY_VARS_gNB *gNB;
notifiedFIFO_elt_t *elt;
} processingData_L1_t;
typedef struct processingData_L1tx {
......
......@@ -576,6 +576,7 @@ typedef struct {
int pssCorrPeakPower;
int pssCorrAvgPower;
int adjust_rxgain;
task_ans_t *ans;
} nr_ue_ssb_scan_t;
typedef struct nr_phy_data_tx_s {
......@@ -632,6 +633,7 @@ typedef struct LDPCDecode_ue_s {
time_stats_t ts_deinterleave;
time_stats_t ts_rate_unmatch;
time_stats_t ts_ldpc_decode;
task_ans_t *ans;
} ldpcDecode_ue_t;
static inline void start_meas_nr_ue_phy(PHY_VARS_NR_UE *ue, int meas_index) {
......
......@@ -1232,10 +1232,8 @@ uci_procedures(PHY_VARS_eNB *eNB,
} // end loop for (int i = 0; i < NUMBER_OF_UCI_MAX; i++) {
}
void postDecode(L1_rxtx_proc_t *proc, notifiedFIFO_elt_t *req)
void postDecode(L1_rxtx_proc_t *proc, turboDecode_t *rdata)
{
turboDecode_t * rdata=(turboDecode_t *) NotifiedFifoData(req);
LTE_eNB_ULSCH_t *ulsch = rdata->eNB->ulsch[rdata->UEid];
LTE_UL_eNB_HARQ_t *ulsch_harq = rdata->ulsch_harq;
PHY_VARS_eNB *eNB=rdata->eNB;
......@@ -1330,6 +1328,10 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
const int frame = proc->frame_rx;
uint32_t harq_pid0 = subframe2harq_pid(&eNB->frame_parms,frame,subframe);
turboDecode_t arr[64] = {0};
task_ans_t ans[64] = {0};
thread_info_tm_t t_info = {.ans = ans, .cap = 64, .len = 0, .buf = (uint8_t *)arr};
for (i = 0; i < NUMBER_OF_ULSCH_MAX; i++) {
ulsch = eNB->ulsch[i];
if (!ulsch) continue;
......@@ -1398,7 +1400,8 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
i,
0, // control_only_flag
ulsch_harq->V_UL_DAI,
ulsch_harq->nb_rb > 20 ? 1 : 0);
ulsch_harq->nb_rb > 20 ? 1 : 0,
&t_info);
}
else if ((ulsch) &&
(ulsch->rnti>0) &&
......@@ -1415,14 +1418,12 @@ void pusch_procedures(PHY_VARS_eNB *eNB,L1_rxtx_proc_t *proc) {
} // for (i=0; i<NUMBER_OF_ULSCH_MAX; i++)
const bool decode = proc->nbDecode;
while (proc->nbDecode > 0) {
notifiedFIFO_elt_t *req=pullTpool(proc->respDecode, proc->threadPool);
if (req == NULL)
break; // Tpool has been stopped
postDecode(proc, req);
const time_stats_t ts = exec_time_stats_NotifiedFIFO(req);
merge_meas(&eNB->ulsch_turbo_decoding_stats, &ts);
delNotifiedFIFO_elt(req);
DevAssert(t_info.len == proc->nbDecode);
if (proc->nbDecode > 0) {
join_task_ans(t_info.ans, t_info.len);
for (size_t i = 0; i < t_info.len; ++i) {
postDecode(proc, &arr[i]);
}
}
if (decode)
stop_meas(&eNB->ulsch_decoding_stats);
......
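Consolidated view of the caller-owned thread_info_tm_t flow used in the eNB/gNB decoding hunks above (sketch assembled from this diff, details elided): the caller provides both the job array and the answer array, the decoding routine appends one entry per pushed segment and bumps t_info.len, and the caller joins on exactly that many answers before post-processing:

turboDecode_t arr[64] = {0};
task_ans_t ans[64] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .cap = 64, .len = 0, .ans = ans};
// ulsch_decoding(..., &t_info) pushes one task per code segment and increments t_info.len
if (t_info.len > 0) {
  join_task_ans(t_info.ans, t_info.len);
  for (size_t i = 0; i < t_info.len; ++i)
    postDecode(proc, &arr[i]);
}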
......@@ -289,47 +289,59 @@ void nr_feptx(void *arg)
////////////FEPTX////////////
nr_feptx0(ru, slot, startSymbol, numSymbols, aa);
// Task completed in parallel
completed_task_ans(feptx->ans);
}
// RU FEP TX using thread-pool
void nr_feptx_tp(RU_t *ru, int frame_tx, int slot)
{
nfapi_nr_config_request_scf_t *cfg = &ru->gNB_list[0]->gNB_config;
int nbfeptx = 0;
if (nr_slot_select(cfg, frame_tx, slot) == NR_UPLINK_SLOT)
return;
if (ru->idx == 0)
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME( VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPTX_OFDM, 1);
start_meas(&ru->ofdm_total_stats);
size_t const sz = ru->nb_tx + (ru->half_slot_parallelization > 0) * ru->nb_tx;
AssertFatal(sz < 64, "Please, increase the buffer size");
feptx_cmd_t arr[64] = {0};
task_ans_t ans[64] = {0};
int nbfeptx = 0;
for (int aid = 0; aid < ru->nb_tx; aid++) {
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feptx_cmd_t), 2000 + aid, ru->respfeptx, nr_feptx);
feptx_cmd_t *feptx_cmd = (feptx_cmd_t*)NotifiedFifoData(req);
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->ans = &ans[nbfeptx];
feptx_cmd->aid = aid;
feptx_cmd->ru = ru;
feptx_cmd->slot = slot;
feptx_cmd->startSymbol = 0;
feptx_cmd->numSymbols = (ru->half_slot_parallelization > 0) ?
ru->nr_frame_parms->symbols_per_slot >> 1 :
ru->nr_frame_parms->symbols_per_slot;
pushTpool(ru->threadPool, req);
feptx_cmd->numSymbols =
(ru->half_slot_parallelization > 0) ? ru->nr_frame_parms->symbols_per_slot >> 1 : ru->nr_frame_parms->symbols_per_slot;
task_t t = {.func = nr_feptx, .args = feptx_cmd};
pushTpool(ru->threadPool, t);
nbfeptx++;
if (ru->half_slot_parallelization > 0) {
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feptx_cmd_t), 2000 + aid + ru->nb_tx, ru->respfeptx, nr_feptx);
feptx_cmd_t *feptx_cmd = (feptx_cmd_t*)NotifiedFifoData(req);
feptx_cmd_t *feptx_cmd = &arr[nbfeptx];
feptx_cmd->ans = &ans[nbfeptx];
feptx_cmd->aid = aid;
feptx_cmd->ru = ru;
feptx_cmd->slot = slot;
feptx_cmd->startSymbol = ru->nr_frame_parms->symbols_per_slot >> 1;
feptx_cmd->numSymbols = ru->nr_frame_parms->symbols_per_slot >> 1;
pushTpool(ru->threadPool, req);
task_t t = {.func = nr_feptx, .args = feptx_cmd};
pushTpool(ru->threadPool, t);
nbfeptx++;
}
}
while (nbfeptx > 0) {
notifiedFIFO_elt_t *req=pullTpool(ru->respfeptx, ru->threadPool);
delNotifiedFIFO_elt(req);
nbfeptx--;
}
join_task_ans(ans, nbfeptx);
stop_meas(&ru->ofdm_total_stats);
if (ru->idx == 0)
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME( VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPTX_OFDM, 0);
......@@ -359,6 +371,9 @@ void nr_fep(void* arg)
tti_rx,
ru->N_TA_offset);
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPRX+aid, 0);
// Task completed in parallel
completed_task_ans(feprx_cmd->ans);
}
// RU RX FEP using thread-pool
......@@ -367,33 +382,45 @@ void nr_fep_tp(RU_t *ru, int slot) {
int nbfeprx=0;
if (ru->idx == 0) VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME( VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPRX, 1 );
start_meas(&ru->ofdm_demod_stats);
size_t const sz = ru->nb_rx + (ru->half_slot_parallelization > 0) * ru->nb_rx;
AssertFatal(sz < 64, "Please, increase buffer size");
feprx_cmd_t arr[64] = {0};
task_ans_t ans[64] = {0};
for (int aid=0;aid<ru->nb_rx;aid++) {
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feprx_cmd_t), 1000 + aid,ru->respfeprx,nr_fep);
feprx_cmd_t *feprx_cmd=(feprx_cmd_t*)NotifiedFifoData(req);
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
feprx_cmd->slot = ru->proc.tti_rx;
feprx_cmd->startSymbol = 0;
feprx_cmd->endSymbol = (ru->half_slot_parallelization > 0)?(ru->nr_frame_parms->symbols_per_slot>>1)-1:(ru->nr_frame_parms->symbols_per_slot-1);
pushTpool(ru->threadPool,req);
nbfeprx++;
if (ru->half_slot_parallelization>0) {
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(feprx_cmd_t), 1000 + aid + ru->nb_rx,ru->respfeprx,nr_fep);
feprx_cmd_t *feprx_cmd=(feprx_cmd_t*)NotifiedFifoData(req);
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
feprx_cmd->slot = ru->proc.tti_rx;
feprx_cmd->startSymbol = ru->nr_frame_parms->symbols_per_slot>>1;
feprx_cmd->endSymbol = ru->nr_frame_parms->symbols_per_slot-1;
pushTpool(ru->threadPool,req);
nbfeprx++;
}
}
while (nbfeprx>0) {
notifiedFIFO_elt_t *req=pullTpool(ru->respfeprx, ru->threadPool);
delNotifiedFIFO_elt(req);
nbfeprx--;
feprx_cmd_t *feprx_cmd = &arr[nbfeprx];
feprx_cmd->ans = &ans[nbfeprx];
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
feprx_cmd->slot = ru->proc.tti_rx;
feprx_cmd->startSymbol = 0;
feprx_cmd->endSymbol = (ru->half_slot_parallelization > 0) ? (ru->nr_frame_parms->symbols_per_slot >> 1) - 1
: (ru->nr_frame_parms->symbols_per_slot - 1);
task_t t = {.func = nr_fep, .args = feprx_cmd};
pushTpool(ru->threadPool, t);
nbfeprx++;
if (ru->half_slot_parallelization > 0) {
feprx_cmd_t *feprx_cmd = &arr[nbfeprx];
feprx_cmd->ans = &ans[nbfeprx];
feprx_cmd->aid = aid;
feprx_cmd->ru = ru;
feprx_cmd->slot = ru->proc.tti_rx;
feprx_cmd->startSymbol = ru->nr_frame_parms->symbols_per_slot >> 1;
feprx_cmd->endSymbol = ru->nr_frame_parms->symbols_per_slot - 1;
task_t t = {.func = nr_fep, .args = feprx_cmd};
pushTpool(ru->threadPool, t);
nbfeprx++;
}
}
join_task_ans(ans, nbfeprx);
stop_meas(&ru->ofdm_demod_stats);
if (ru->idx == 0) VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME( VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_RU_FEPRX, 0 );
}
......
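Note on the new pool API used in the two RU hunks above: instead of allocating notifiedFIFO_elt_t requests and draining a response FIFO, the caller now keeps one argument struct and one task_ans_t per work item in stack arrays, pushes a task_t per item, and blocks in join_task_ans() while each worker signals completion through completed_task_ans(). A minimal sketch of that pattern follows; my_work_t, do_work(), run_parallel() and the fixed array size are illustrative placeholders (the pool type is assumed to be tpool_t from thread-pool.h), not code from this MR.

/* Illustrative sketch only: mirrors the push/complete/join pattern of nr_feptx_tp() and nr_fep_tp(). */
typedef struct {
  task_ans_t *ans; /* completion token, signalled by the worker */
  int id;          /* hypothetical per-task payload */
} my_work_t;

static void do_work(void *arg)
{
  my_work_t *w = (my_work_t *)arg;
  /* ... per-antenna / per-half-slot processing would go here ... */
  completed_task_ans(w->ans); /* worker marks this task as done */
}

static void run_parallel(tpool_t *pool, int n)
{
  my_work_t arr[64] = {0};
  task_ans_t ans[64] = {0};
  AssertFatal(n <= 64, "Please increase the buffer size\n");
  for (int i = 0; i < n; i++) {
    arr[i].ans = &ans[i];
    arr[i].id = i;
    task_t t = {.func = do_work, .args = &arr[i]};
    pushTpool(pool, t);
  }
  join_task_ans(ans, n); /* returns only after every task has called completed_task_ans() */
}

Because the argument and answer arrays live on the caller's stack and join_task_ans() is reached before the function returns, no per-task heap allocation or response-FIFO draining is needed.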
......@@ -39,6 +39,7 @@
#include "nfapi/oai_integration/vendor_ext.h"
#include "assertions.h"
#include <time.h>
#include <stdint.h>
//#define DEBUG_RXDATA
//#define SRS_IND_DEBUG
......@@ -304,9 +305,8 @@ void phy_procedures_gNB_TX(processingData_L1tx_t *msgTx,
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_PHY_PROCEDURES_gNB_TX + gNB->CC_id, 0);
}
static void nr_postDecode(PHY_VARS_gNB *gNB, notifiedFIFO_elt_t *req)
static void nr_postDecode(PHY_VARS_gNB *gNB, ldpcDecode_t *rdata)
{
ldpcDecode_t *rdata = (ldpcDecode_t*) NotifiedFifoData(req);
NR_UL_gNB_HARQ_t *ulsch_harq = rdata->ulsch_harq;
NR_gNB_ULSCH_t *ulsch = rdata->ulsch;
int r = rdata->segment_r;
......@@ -415,7 +415,12 @@ static void nr_postDecode(PHY_VARS_gNB *gNB, notifiedFIFO_elt_t *req)
}
}
static int nr_ulsch_procedures(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx, int ULSCH_id, uint8_t harq_pid)
static int nr_ulsch_procedures(PHY_VARS_gNB *gNB,
int frame_rx,
int slot_rx,
int ULSCH_id,
uint8_t harq_pid,
thread_info_tm_t *t_info)
{
NR_DL_FRAME_PARMS *frame_parms = &gNB->frame_parms;
nfapi_nr_pusch_pdu_t *pusch_pdu = &gNB->ulsch[ULSCH_id].harq_process->ulsch_pdu;
......@@ -466,9 +471,17 @@ static int nr_ulsch_procedures(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx, int
* measurement per processed TB.*/
if (gNB->max_nb_pusch == 1)
start_meas(&gNB->ulsch_decoding_stats);
int nbDecode =
nr_ulsch_decoding(gNB, ULSCH_id, gNB->pusch_vars[ULSCH_id].llr, frame_parms, pusch_pdu, frame_rx, slot_rx, harq_pid, G);
int const nbDecode = nr_ulsch_decoding(gNB,
ULSCH_id,
gNB->pusch_vars[ULSCH_id].llr,
frame_parms,
pusch_pdu,
frame_rx,
slot_rx,
harq_pid,
G,
t_info);
return nbDecode;
}
......@@ -827,6 +840,11 @@ int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
}
}
ldpcDecode_t arr[64];
task_ans_t ans[64] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = 64, .ans = ans};
// int64_t const t0 = time_now_ns();
int totalDecode = 0;
for (int ULSCH_id = 0; ULSCH_id < gNB->max_nb_pusch; ULSCH_id++) {
NR_gNB_ULSCH_t *ulsch = &gNB->ulsch[ULSCH_id];
......@@ -924,21 +942,20 @@ int phy_procedures_gNB_uespec_RX(PHY_VARS_gNB *gNB, int frame_rx, int slot_rx)
// LOG_M("rxdataF_comp.m","rxF_comp",gNB->pusch_vars[0]->rxdataF_comp[0],6900,1,1);
// LOG_M("rxdataF_ext.m","rxF_ext",gNB->pusch_vars[0]->rxdataF_ext[0],6900,1,1);
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_NR_ULSCH_PROCEDURES_RX, 1);
int const tasks_added = nr_ulsch_procedures(gNB, frame_rx, slot_rx, ULSCH_id, ulsch->harq_pid);
int const tasks_added = nr_ulsch_procedures(gNB, frame_rx, slot_rx, ULSCH_id, ulsch->harq_pid, &t_info);
if (tasks_added > 0)
totalDecode += tasks_added;
VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME(VCD_SIGNAL_DUMPER_FUNCTIONS_NR_ULSCH_PROCEDURES_RX, 0);
}
}
while (totalDecode > 0) {
notifiedFIFO_elt_t *req = pullTpool(&gNB->respDecode, &gNB->threadPool);
if (req == NULL)
break; // Tpool has been stopped
nr_postDecode(gNB, req);
delNotifiedFIFO_elt(req);
totalDecode--;
}
DevAssert(totalDecode == t_info.len);
join_task_ans(t_info.ans, t_info.len);
for (int i = 0; i < t_info.len; ++i) {
nr_postDecode(gNB, &arr[i]);
}
/* Do ULSCH decoding time measurement only when number of PUSCH is limited to 1
* (valid for unitary physical simulators). ULSCH processing loop is then executed
* only once, which ensures exactly one start and stop of the ULSCH decoding time
......
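The gNB receive hunk above uses the same join pattern, but collects the decode tasks through a thread_info_tm_t: nr_ulsch_decoding() now fills caller-provided ldpcDecode_t entries via t_info.buf, advances t_info.len, and takes one task_ans_t per segment from t_info.ans, so the caller joins once and then post-processes every entry. A hedged caller-side sketch, where decode_and_join() and push_decode_tasks() are hypothetical names standing in for the calling context and nr_ulsch_decoding():

/* Illustrative sketch only: gather / join / post-process as in phy_procedures_gNB_uespec_RX(). */
static void decode_and_join(PHY_VARS_gNB *gNB)
{
  ldpcDecode_t arr[64];
  task_ans_t ans[64] = {0};
  thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .len = 0, .cap = 64, .ans = ans};

  int pushed = push_decode_tasks(gNB, &t_info); /* hypothetical stand-in for nr_ulsch_decoding() */
  DevAssert(pushed == (int)t_info.len);         /* the callee reports how many tasks it queued */

  join_task_ans(t_info.ans, t_info.len); /* block until every LDPC segment decode has finished */
  for (size_t i = 0; i < t_info.len; ++i)
    nr_postDecode(gNB, &arr[i]);         /* per-segment post-processing after the join */
}

With t_info.len == 0 the join and the loop are presumably no-ops, which is why the old pullTpool()/break-on-NULL handling is dropped.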
......@@ -661,6 +661,7 @@ int main(int argc, char **argv)
free(gNB->gNB_config.tdd_table.max_tdd_periodicity_list[i].max_num_of_symbol_per_slot_list);
free(gNB->gNB_config.tdd_table.max_tdd_periodicity_list);
abortTpool(&gNB->threadPool);
phy_free_nr_gNB(gNB);
free(RC.gNB[0]);
free(RC.gNB);
......
......@@ -45,7 +45,6 @@
#include "openair1/SIMULATION/TOOLS/sim.h"
#include "openair1/SIMULATION/RF/rf.h"
#include "openair1/SIMULATION/NR_PHY/nr_unitary_defs.h"
#include "common/utils/threadPool/thread-pool.h"
#include "openair2/LAYER2/NR_MAC_COMMON/nr_mac_common.h"
#include "executables/nr-uesoftmodem.h"
#include "nfapi/oai_integration/vendor_ext.h"
......@@ -91,9 +90,8 @@ void deref_sched_response(int _)
exit(1);
}
int nr_postDecode_sim(PHY_VARS_gNB *gNB, notifiedFIFO_elt_t *req, int *nb_ok)
int nr_postDecode_sim(PHY_VARS_gNB *gNB, ldpcDecode_t *rdata, int *nb_ok)
{
ldpcDecode_t *rdata = (ldpcDecode_t*) NotifiedFifoData(req);
NR_UL_gNB_HARQ_t *ulsch_harq = rdata->ulsch_harq;
int r = rdata->segment_r;
......@@ -107,8 +105,10 @@ int nr_postDecode_sim(PHY_VARS_gNB *gNB, notifiedFIFO_elt_t *req, int *nb_ok)
}
// if all segments are done
if (rdata->nbSegments == ulsch_harq->processedSegments)
if (rdata->nbSegments == ulsch_harq->processedSegments) {
return *nb_ok == rdata->nbSegments;
}
return 0;
}
......@@ -594,20 +594,23 @@ int main(int argc, char **argv)
exit(-1);
#endif
int nbDecode = nr_ulsch_decoding(gNB, UE_id, channel_output_fixed, frame_parms, rel15_ul, frame, subframe, harq_pid, G);
int nb_ok = 0;
if (nbDecode > 0)
while (nbDecode > 0) {
notifiedFIFO_elt_t *req = pullTpool(&gNB->respDecode, &gNB->threadPool);
ret = nr_postDecode_sim(gNB, req, &nb_ok);
delNotifiedFIFO_elt(req);
nbDecode--;
}
ldpcDecode_t arr[16] = {0};
task_ans_t ans[16] = {0};
thread_info_tm_t t_info = {.buf = (uint8_t *)arr, .cap = 16, .len = 0, .ans = ans};
int nbDecode =
nr_ulsch_decoding(gNB, UE_id, channel_output_fixed, frame_parms, rel15_ul, frame, subframe, harq_pid, G, &t_info);
DevAssert(nbDecode > 0);
int nb_ok = 0;
join_task_ans(t_info.ans, t_info.len);
for (size_t i = 0; i < nbDecode; ++i) {
ret = nr_postDecode_sim(gNB, &arr[i], &nb_ok);
}
nbDecode = 0;
if (ret)
n_errors++;
}
printf("*****************************************\n");
printf("SNR %f, BLER %f (false positive %f)\n", SNR,
(float) n_errors / (float) n_trials,
......@@ -633,6 +636,7 @@ int main(int argc, char **argv)
term_nr_ue_signal(UE, 1);
free(UE);
abortTpool(&gNB->threadPool);
phy_free_nr_gNB(gNB);
free(RC.gNB[0]);
free(RC.gNB);
......
......@@ -41,6 +41,7 @@
#include "common/ran_context.h"
#include "intertask_interface.h"
#include <pthread.h>
#include "common/utils/system.h"
extern RAN_CONTEXT_t RC;
extern int oai_exit;
......
......@@ -37,7 +37,6 @@
//#include "openair1/PHY/LTE_TRANSPORT/transport_eNB.h"
#include "nfapi_interface.h"
#include "common/platform_types.h"
#include <common/utils/threadPool/thread-pool.h>
#include <radio/COMMON/common_lib.h>
#define MAX_NUM_DL_PDU 100
......
......@@ -37,7 +37,7 @@
#include <sys/types.h>
#include <openair1/PHY/TOOLS/tools_defs.h>
#include "record_player.h"
#include <common/utils/threadPool/thread-pool.h>
#include "common/utils/threadPool/notified_fifo.h"
/* default name of shared library implementing the radio front end */
#define OAI_RF_LIBNAME "oai_device"
......
......@@ -50,7 +50,6 @@
#include "common_lib.h"
#include "ethernet_lib.h"
#include "openair1/PHY/sse_intrin.h"
#include "common/utils/threadPool/thread-pool.h"
//#define DEBUG 1
......
......@@ -41,7 +41,6 @@
#include <sys/socket.h>
#include <net/if.h>
#include <netinet/ether.h>
#include <common/utils/threadPool/thread-pool.h>
#define MAX_INST 4
#define DEFAULT_IF "lo"
......
......@@ -49,6 +49,7 @@
#include "common/utils/LOG/log.h"
#include "common_lib.h"
#include "assertions.h"
#include "system.h"
#include "common/utils/LOG/vcd_signal_dumper.h"
......
......@@ -31,7 +31,6 @@
#include "common/utils/LOG/log.h"
#include "common/utils/LOG/vcd_signal_dumper.h"
#include "openair1/PHY/defs_gNB.h"
#include "common/utils/threadPool/thread-pool.h"
#include "oaioran.h"
#include "oran-config.h"
......