Commit a3379d1d authored by Florian Kaltenberger's avatar Florian Kaltenberger

Merge remote-tracking branch 'origin/fix-nr-rfsim-gNB-notx-slots2to19' into...

Merge remote-tracking branch 'origin/fix-nr-rfsim-gNB-notx-slots2to19' into integration-develop-nr-2019w46
parents 6082a33d ea7c4d95
...@@ -185,7 +185,7 @@ set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-rpath -Wl,${CMAKE_CU ...@@ -185,7 +185,7 @@ set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,-rpath -Wl,${CMAKE_CU
# set a flag for changes in the source code # set a flag for changes in the source code
# these changes are related to hardcoded path to include .h files # these changes are related to hardcoded path to include .h files
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS} -g -DMALLOC_CHECK_=3") set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS} -g -DMALLOC_CHECK_=3")
set(CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS} -g -DMALLOC_CHECK_=3 -O2") set(CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS} -g3 -DMALLOC_CHECK_=3 -O2")
set(GIT_BRANCH "UNKNOWN") set(GIT_BRANCH "UNKNOWN")
......
all: measurement_display thread-pool-test all:
measurement_display thread-pool-test
measurement_display: measurement_display.c thread-pool.h measurement_display:
gcc measurement_display.c -I ${OPENAIR_DIR}/ -I ${OPENAIR_DIR}/common/utils/ -I. ${OPENAIR_DIR}/common/utils/backtrace.c -lpthread -D TEST_THREAD_POOL -I../LOG -I../../utils/T -o measurement_display measurement_display.c thread-pool.h
gcc measurement_display.c -I $ {OPENAIR_DIR}/ -I $ {OPENAIR_DIR}/common/utils/ -I. $ {OPENAIR_DIR}/common/utils/backtrace.c -lpthread -D TEST_THREAD_POOL -I../LOG -I../../utils/T -o
measurement_display
thread-pool-test: thread-pool.c thread-pool.h thread-pool-test:
gcc -g thread-pool.c -I ${OPENAIR_DIR}/ -I ${OPENAIR_DIR}/common/utils/ -I. ${OPENAIR_DIR}/common/utils/backtrace.c -I ${OPENAIR_DIR}/openair2/COMMON ${OPENAIR_DIR}/common/utils/LOG/log.c ${OPENAIR_DIR}/common/config/config_userapi.c ${OPENAIR_DIR}/common/config/config_load_configmodule.c ${OPENAIR_DIR}/common/config/config_cmdline.c -lpthread -ldl -D TEST_THREAD_POOL -I../LOG -I../../utils/T -o thread-pool-test thread-pool.c thread-pool.h
gcc -g thread-pool.c -I $ {OPENAIR_DIR}/ -I $ {OPENAIR_DIR}/common/utils/ -I. $ {OPENAIR_DIR}/common/utils/backtrace.c -I $ {OPENAIR_DIR}/openair2/COMMON $ {OPENAIR_DIR}/common/utils/LOG/log.c $ {OPENAIR_DIR}/common/config/config_userapi.c
$ {OPENAIR_DIR}/common/config/config_load_configmodule.c $ {OPENAIR_DIR}/common/config/config_cmdline.c -lpthread -ldl -D TEST_THREAD_POOL -I../LOG -I../../utils/T -o thread-pool-test
...@@ -67,7 +67,7 @@ void *one_thread(void *arg) { ...@@ -67,7 +67,7 @@ void *one_thread(void *arg) {
delNotifiedFIFO_elt(elt); delNotifiedFIFO_elt(elt);
else else
pushNotifiedFIFO(elt->reponseFifo, elt); pushNotifiedFIFO(elt->reponseFifo, elt);
myThread->runningOnKey=-1;
mutexunlock(tp->incomingFifo.lockF); mutexunlock(tp->incomingFifo.lockF);
} }
} while (true); } while (true);
...@@ -95,7 +95,7 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) { ...@@ -95,7 +95,7 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
pool->nbThreads=0; pool->nbThreads=0;
pool->restrictRNTI=false; pool->restrictRNTI=false;
curptr=strtok_r(params,",",&saveptr); curptr=strtok_r(params,",",&saveptr);
struct one_thread * ptr;
while ( curptr!=NULL ) { while ( curptr!=NULL ) {
int c=toupper(curptr[0]); int c=toupper(curptr[0]);
...@@ -109,8 +109,9 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) { ...@@ -109,8 +109,9 @@ void initTpool(char *params,tpool_t *pool, bool performanceMeas) {
break; break;
default: default:
ptr=pool->allthreads;
pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread)); pool->allthreads=(struct one_thread *)malloc(sizeof(struct one_thread));
pool->allthreads->next=pool->allthreads; pool->allthreads->next=ptr;
printf("create a thread for core %d\n", atoi(curptr)); printf("create a thread for core %d\n", atoi(curptr));
pool->allthreads->coreID=atoi(curptr); pool->allthreads->coreID=atoi(curptr);
pool->allthreads->id=pool->nbThreads; pool->allthreads->id=pool->nbThreads;
......
...@@ -150,8 +150,9 @@ static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) { ...@@ -150,8 +150,9 @@ static inline notifiedFIFO_elt_t *pollNotifiedFIFO(notifiedFIFO_t *nf) {
// This function aborts all messages matching the key // This function aborts all messages matching the key
// If the queue is used in thread pools, it doesn't cancels already running processing // If the queue is used in thread pools, it doesn't cancels already running processing
// because the message has already been picked // because the message has already been picked
static inline void abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) { static inline int abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) {
mutexlock(nf->lockF); mutexlock(nf->lockF);
int nbDeleted=0;
notifiedFIFO_elt_t **start=&nf->outF; notifiedFIFO_elt_t **start=&nf->outF;
while(*start!=NULL) { while(*start!=NULL) {
...@@ -159,13 +160,16 @@ static inline void abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) { ...@@ -159,13 +160,16 @@ static inline void abortNotifiedFIFO(notifiedFIFO_t *nf, uint64_t key) {
notifiedFIFO_elt_t *request=*start; notifiedFIFO_elt_t *request=*start;
*start=(*start)->next; *start=(*start)->next;
delNotifiedFIFO_elt(request); delNotifiedFIFO_elt(request);
} nbDeleted++;
} else
if (*start != NULL)
start=&(*start)->next; start=&(*start)->next;
} }
if (nf->outF == NULL)
nf->inF=NULL;
mutexunlock(nf->lockF); mutexunlock(nf->lockF);
return nbDeleted;
} }
struct one_thread { struct one_thread {
...@@ -195,7 +199,20 @@ typedef struct thread_pool { ...@@ -195,7 +199,20 @@ typedef struct thread_pool {
static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) { static inline void pushTpool(tpool_t *t, notifiedFIFO_elt_t *msg) {
if (t->measurePerf) msg->creationTime=rdtsc(); if (t->measurePerf) msg->creationTime=rdtsc();
if ( t->activated)
pushNotifiedFIFO(&t->incomingFifo, msg); pushNotifiedFIFO(&t->incomingFifo, msg);
else {
if (t->measurePerf)
msg->startProcessingTime=rdtsc();
msg->processingFunc(NotifiedFifoData(msg));
if (t->measurePerf)
msg->endProcessingTime=rdtsc();
if (msg->reponseFifo)
pushNotifiedFIFO(msg->reponseFifo, msg);
}
} }
static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) { static inline notifiedFIFO_elt_t *pullTpool(notifiedFIFO_t *responseFifo, tpool_t *t) {
...@@ -225,7 +242,8 @@ static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpo ...@@ -225,7 +242,8 @@ static inline notifiedFIFO_elt_t *tryPullTpool(notifiedFIFO_t *responseFifo, tpo
return msg; return msg;
} }
static inline void abortTpool(tpool_t *t, uint64_t key) { static inline int abortTpool(tpool_t *t, uint64_t key) {
int nbRemoved=0;
notifiedFIFO_t *nf=&t->incomingFifo; notifiedFIFO_t *nf=&t->incomingFifo;
mutexlock(nf->lockF); mutexlock(nf->lockF);
notifiedFIFO_elt_t **start=&nf->outF; notifiedFIFO_elt_t **start=&nf->outF;
...@@ -235,22 +253,27 @@ static inline void abortTpool(tpool_t *t, uint64_t key) { ...@@ -235,22 +253,27 @@ static inline void abortTpool(tpool_t *t, uint64_t key) {
notifiedFIFO_elt_t *request=*start; notifiedFIFO_elt_t *request=*start;
*start=(*start)->next; *start=(*start)->next;
delNotifiedFIFO_elt(request); delNotifiedFIFO_elt(request);
} nbRemoved++;
} else
if (*start != NULL)
start=&(*start)->next; start=&(*start)->next;
} }
if (t->incomingFifo.outF==NULL)
t->incomingFifo.inF=NULL;
struct one_thread *ptr=t->allthreads; struct one_thread *ptr=t->allthreads;
while(ptr!=NULL) { while(ptr!=NULL) {
if (ptr->runningOnKey==key) if (ptr->runningOnKey==key) {
ptr->abortFlag=true; ptr->abortFlag=true;
nbRemoved++;
}
ptr=ptr->next; ptr=ptr->next;
} }
mutexunlock(nf->lockF); mutexunlock(nf->lockF);
return nbRemoved;
} }
void initTpool(char *params,tpool_t *pool, bool performanceMeas); void initTpool(char *params,tpool_t *pool, bool performanceMeas);
......
...@@ -3,69 +3,90 @@ ...@@ -3,69 +3,90 @@
The thread pool is a working server, made of a set of worker threads that can be mapped on CPU cores. The thread pool is a working server, made of a set of worker threads that can be mapped on CPU cores.
Each worker loop on pick from the same input queue jobs to do. Each worker loop on pick from the same input queue jobs to do.
When a job is done, the worker sends a return if a return is defined. When a job is done, the worker sends a return if a return is defined.
A selective abort allows to cancel parallel jobs (usage: a client pushed jobs, but from a response of one job, the other linked jobs becomes useless). A selective abort allows to cancel parallel jobs (usage: a client pushed jobs, but from a response of one job, the other linked jobs becomes useless).
All the thread pool functions are thread safe, nevertheless the working functions are implemented by the thread pool client, so the client has to tackle the parallel execution of his functions called "processingFunc" hereafter. All the thread pool functions are thread safe, nevertheless the working functions are implemented by the thread pool client,
so the client has to tackle the parallel execution of his functions called "processingFunc" hereafter.
## license ## license
Author: Laurent Thomas, Open cells project Author:
The owner share this piece code to Openairsoftware alliance as per OSA license terms Laurent Thomas, Open cells project
The owner share this piece code to Openairsoftware alliance as per OSA license terms
# jobs # jobs
A job is a message (notifiedFIFO_elt_t): A job is a message (notifiedFIFO_elt_t):
next: internal FIFO chain, do not set it next:
key: a long int that the client can use to identify a message or a group of messages internal FIFO chain, do not set it
responseFifo: if the client defines a response FIFO, the message will be posted back after processing key:
processingFunc: any funtion (type void processingFunc(void *)) that the worker will launch a long int that the client can use to identify a message or a group of messages
msgData: the data passed to processingFunc. It can be added automatically, or you can set it to a buffer you are managing responseFifo:
malloced: a boolean that enable internal free in these cases: no return Fifo or Abort feature if the client defines a response FIFO, the message will be posted back after processing
processingFunc:
The job messages can be created with newNotifiedFIFO_elt() and delNotifiedFIFO_elt() or managed by the client. any funtion (type void processingFunc(void *)) that the worker will launch
msgData:
the data passed to processingFunc. It can be added automatically, or you can set it to a buffer you are managing
malloced:
a boolean that enable internal free in these cases:
no return Fifo or Abort feature
The job messages can be created with newNotifiedFIFO_elt() and delNotifiedFIFO_elt() or managed by the client.
# Queues of jobs # Queues of jobs
Queues are type of: notifiedFIFO_t that must be initialized by init_notifiedFIFO() Queues are type of:
No delete function is required, the creator has only to free the data of type notifiedFIFO_t notifiedFIFO_t that must be initialized by init_notifiedFIFO()
No delete function is required, the creator has only to free the data of type notifiedFIFO_t
push_notifiedFIFO() add a job in the queue push_notifiedFIFO() add a job in the queue
pull_notifiedFIFO() is blocking, poll_notifiedFIFO() is non blocking pull_notifiedFIFO() is blocking, poll_notifiedFIFO() is non blocking
abort_notifiedFIFO() allows the customer to delete all waiting jobs that match with the key (see key in jobs definition) abort_notifiedFIFO() allows the customer to delete all waiting jobs that match with the key (see key in jobs definition)
# Thread pools # Thread pools
## initialization ## initialization
The clients can create one or more thread pools with init_tpool() The clients can create one or more thread pools with init_tpool()
the params string structure: describes a list of cores, separated by "," that run a worker thread the params string structure:
describes a list of cores, separated by "," that run a worker thread
If the core exists on the CPU, the thread pool initialization sets the affinity between this thread and the related code (use negative values is allowed, so the thread will never be mapped on a specific core). If the core exists on the CPU, the thread pool initialization sets the affinity between this thread and
the related code (use negative values is allowed, so the thread will never be mapped on a specific core).
The threads are all Linux real time scheduler, their name is set automatically is "Tpool_<core id>" The threads are all Linux real time scheduler, their name is set automatically is "Tpool_<core id>"
## adding jobs ## adding jobs
The client create their jobs messages as a notifiedFIFO_elt_t, then they push it with pushTpool() (that internally calls push_notifiedFIFO()) The client create their jobs messages as a notifiedFIFO_elt_t, then they push it with pushTpool() (that internally calls push_notifiedFIFO())
If they need a return, they have to create response queues with init_notifiedFIFO() and set this FIFO pointer in the notifiedFIFO_elt_t before pushing the job. If they need a return, they have to create response queues with init_notifiedFIFO() and set this FIFO pointer in the notifiedFIFO_elt_t before pushing the job.
## abort ## abort
A abort service abortTpool() allows to abort all jobs that match a key (see jobs "key"). When the abort returns, it garanties no job (matching the key) response will be posted on response queues. A abort service abortTpool() allows to abort all jobs that match a key (see jobs "key"). When the abort returns, it garanties no job (matching the key) response will be posted on response queues.
Nevertheless, jobs already performed before the return of abortTpool() are pushed in the response Fifo queue. Nevertheless, jobs already performed before the return of abortTpool() are pushed in the response Fifo queue.
## Performance measurements ## Performance measurements
A performance measurement is integrated: the pool will automacillay fill timestamps: A performance measurement is integrated:
the pool will automacillay fill timestamps:
* creationTime:
time the request is push to the pool;
* creationTime: time the request is push to the pool; * startProcessingTime:
* startProcessingTime: time a worker start to run on the job time a worker start to run on the job
* endProcessingTime: time the worker finished the job * endProcessingTime:
* returnTime: time the client reads the result time the worker finished the job
* returnTime:
time the client reads the result
if you set the environement variable: thread-pool-measurements to a valid file name if you set the environement variable:
thread-pool-measurements to a valid file name
These measurements will be wrote to this Linux pipe. These measurements will be wrote to this Linux pipe.
A tool to read the linux fifo and display it in ascii is provided: see the local directory Makefile for this tool and to compile the thread pool unitary tests. A tool to read the linux fifo and display it in ascii is provided:
see the local directory Makefile for this tool and to compile the thread pool unitary tests.
...@@ -81,6 +81,7 @@ ...@@ -81,6 +81,7 @@
#include "common/utils/LOG/vcd_signal_dumper.h" #include "common/utils/LOG/vcd_signal_dumper.h"
#include "enb_config.h" #include "enb_config.h"
#include <executables/nr-softmodem.h>
#ifdef SMBV #ifdef SMBV
#include "PHY/TOOLS/smbv.h" #include "PHY/TOOLS/smbv.h"
...@@ -730,13 +731,15 @@ void tx_rf(RU_t *ru,int frame,int slot, uint64_t timestamp) { ...@@ -730,13 +731,15 @@ void tx_rf(RU_t *ru,int frame,int slot, uint64_t timestamp) {
//nr_subframe_t SF_type = nr_slot_select(cfg,slot%fp->slots_per_frame); //nr_subframe_t SF_type = nr_slot_select(cfg,slot%fp->slots_per_frame);
if ((slot == 0) || if ((slot == 0) ||
(slot == 1)) { (slot == 1) || IS_SOFTMODEM_RFSIM ) {
int siglen=fp->samples_per_slot; int siglen=fp->samples_per_slot;
int flags; int flags;
if (slot==0) if (slot==0)
flags = 2; flags = 2;
else if (slot==1) else if (slot==1)
flags=3; flags=3;
else
flags=4;
/* /*
if (SF_type == SF_S) { if (SF_type == SF_S) {
......
...@@ -49,10 +49,12 @@ ...@@ -49,10 +49,12 @@
#define SOFTMODEM_NOS1_BIT (1<<0) #define SOFTMODEM_NOS1_BIT (1<<0)
#define SOFTMODEM_NOKRNMOD_BIT (1<<1) #define SOFTMODEM_NOKRNMOD_BIT (1<<1)
#define SOFTMODEM_RFSIM_BIT (1<<10)
#define IS_SOFTMODEM_NOS1 ( get_softmodem_optmask() & SOFTMODEM_NOS1_BIT) #define IS_SOFTMODEM_NOS1 ( get_softmodem_optmask() & SOFTMODEM_NOS1_BIT)
#define IS_SOFTMODEM_NOKRNMOD ( get_softmodem_optmask() & SOFTMODEM_NOKRNMOD_BIT) #define IS_SOFTMODEM_NOKRNMOD ( get_softmodem_optmask() & SOFTMODEM_NOKRNMOD_BIT)
#define IS_SOFTMODEM_RFSIM ( get_softmodem_optmask() & SOFTMODEM_RFSIM_BIT)
extern uint64_t get_softmodem_optmask(void); extern uint64_t get_softmodem_optmask(void);
extern void get_common_options(void); extern void get_common_options(void);
......
...@@ -538,6 +538,9 @@ void trashFrame(PHY_VARS_NR_UE *UE, openair0_timestamp *timestamp) { ...@@ -538,6 +538,9 @@ void trashFrame(PHY_VARS_NR_UE *UE, openair0_timestamp *timestamp) {
dummy_rx, dummy_rx,
UE->frame_parms.samples_per_subframe, UE->frame_parms.samples_per_subframe,
UE->frame_parms.nb_antennas_rx); UE->frame_parms.nb_antennas_rx);
if (IS_SOFTMODEM_RFSIM ) {
usleep(1000); // slow down, as would do actuall rf to let cpu for the synchro thread
}
} }
for (int i=0; i<UE->frame_parms.nb_antennas_tx; i++) for (int i=0; i<UE->frame_parms.nb_antennas_tx; i++)
......
...@@ -126,7 +126,7 @@ int load_lib(openair0_device *device, ...@@ -126,7 +126,7 @@ int load_lib(openair0_device *device,
libname=OAI_RFSIM_LIBNAME; libname=OAI_RFSIM_LIBNAME;
shlib_fdesc[0].fname="device_init"; shlib_fdesc[0].fname="device_init";
} else if (flag == RAU_LOCAL_RADIO_HEAD) { } else if (flag == RAU_LOCAL_RADIO_HEAD) {
if (getenv("RFSIMULATOR") != NULL) if (IS_SOFTMODEM_RFSIM)
libname="rfsimulator"; libname="rfsimulator";
else else
libname=OAI_RF_LIBNAME; libname=OAI_RF_LIBNAME;
......
This diff is collapsed.
...@@ -262,7 +262,7 @@ RUs = ( ...@@ -262,7 +262,7 @@ RUs = (
THREAD_STRUCT = ( THREAD_STRUCT = (
{ {
#three config for level of parallelism "PARALLEL_SINGLE_THREAD", "PARALLEL_RU_L1_SPLIT", or "PARALLEL_RU_L1_TRX_SPLIT" #three config for level of parallelism "PARALLEL_SINGLE_THREAD", "PARALLEL_RU_L1_SPLIT", or "PARALLEL_RU_L1_TRX_SPLIT"
parallel_config = "PARALLEL_RU_L1_TRX_SPLIT"; parallel_config = "PARALLEL_SINGLE_THREAD";
#two option for worker "WORKER_DISABLE" or "WORKER_ENABLE" #two option for worker "WORKER_DISABLE" or "WORKER_ENABLE"
worker_config = "WORKER_DISABLE"; worker_config = "WORKER_DISABLE";
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment