Commit 21790f9c authored by Bruno Mongazon-Cazavet's avatar Bruno Mongazon-Cazavet Committed by Robert Schmidt

Optimize RFsimulator to improve E2E performance

This improves the RFsimulator code to reach a ~40% E2E throughput
improvement (depending on the machines).

Changes:
- reduce ring buffer size
- code cleanup for readability
- set TCP kernel parameters
parent 79fd37b4
......@@ -572,17 +572,11 @@ struct openair0_device_t {
typedef int(*oai_device_initfunc_t)(openair0_device *device, openair0_config_t *openair0_cfg);
/* type of transport init function, implemented in shared lib */
typedef int(*oai_transport_initfunc_t)(openair0_device *device, openair0_config_t *openair0_cfg, eth_params_t *eth_params);
#define UE_MAGICDL 0xA5A5A5A5A5A5A5A5 // UE DL FDD record
#define UE_MAGICUL 0x5A5A5A5A5A5A5A5A // UE UL FDD record
#define ENB_MAGICDL 0xB5B5B5B5B5B5B5B5 // eNB DL FDD record
#define ENB_MAGICUL 0x5B5B5B5B5B5B5B5B // eNB UL FDD record
#define OPTION_LZ4 0x00000001 // LZ4 compression (option_value is set to compressed size)
typedef struct {
uint64_t magic; // Magic value (see defines above)
uint32_t size; // Number of samples per antenna to follow this header
uint32_t nbAnt; // Total number of antennas following this header
// Samples per antenna follow this header,
......
......@@ -54,13 +54,44 @@
#include "rfsimulator.h"
#define PORT 4043 //default TCP port for this simulator
#define CirSize 6144000 // 100ms is enough
//
// CirSize defines the number of samples inquired for a read cycle
// It is bounded by a slot read capability (which depends on bandwidth and numerology)
// up to multiple slots read to allow I/Q buffering of the I/Q TCP stream
//
// As a rule of thumb:
// -it can't be less than the number of samples for a slot
// -it can range up to multiple slots
//
// The default value is chosen for 10ms buffering which makes 23040*20 = 460800 samples
// The previous value is kept below in comment it was computed for 100ms 1x 20MHz
// #define CirSize 6144000 // 100ms SiSo 20MHz LTE
#define CirSize 460800 // 10ms SiSo 40Mhz 3/4 sampling NR78 FR1
#define sampleToByte(a,b) ((a)*(b)*sizeof(sample_t))
#define byteToSample(a,b) ((a)/(sizeof(sample_t)*(b)))
#define MAX_SIMULATION_CONNECTED_NODES 5
#define GENERATE_CHANNEL 10 //each frame in DL
#define GENERATE_CHANNEL 10 // each frame (or slot?) in DL
// This needs to be re-architected in the future
//
// File Descriptors management in rfsimulator is not optimized
// Relying on FD_SETSIZE (actually 1024) is not appropriated
// Also the use of fd value as returned by Linux as an index for buf[] structure is not appropriated
// especially for client (UE) side since only 1 fd per connection to a gNB is needed. On the server
// side the value should be tuned to the maximum number of connections with UE's which corresponds
// to the maximum number of UEs hosted by a gNB which is unlikely to be in the order of thousands
// since all I/Q's would flow through the same TCP transport.
// Until a convenient management is implemented, the MAX_FD_RFSIMU is used everywhere (instead of
// FD_SETSIE) and reduced to 125. This should allow for around 20 simultaeous UEs.
//
// #define MAX_FD_RFSIMU FD_SETSIZE
#define MAX_FD_RFSIMU 125
#define SYSCTL_MEM_VALUE 134217728 // Kernel network buffer size
#define SEND_BUFF_SIZE SYSCTL_MEM_VALUE // Socket buffer size
// Simulator role
typedef enum { SIMU_ROLE_SERVER = 1, SIMU_ROLE_CLIENT } simuRole;
//
......@@ -126,11 +157,11 @@ typedef struct {
int listen_sock, epollfd;
openair0_timestamp nextRxTstamp;
openair0_timestamp lastWroteTS;
uint64_t typeStamp;
simuRole role;
char *ip;
uint16_t port;
int saveIQfile;
buffer_t buf[FD_SETSIZE];
buffer_t buf[MAX_FD_RFSIMU];
int rx_num_channels;
int tx_num_channels;
double sample_rate;
......@@ -145,10 +176,14 @@ typedef struct {
int wait_timeout;
} rfsimulator_state_t;
static void allocCirBuf(rfsimulator_state_t *bridge, int sock) {
static int allocCirBuf(rfsimulator_state_t *bridge, int sock)
{
buffer_t *ptr=&bridge->buf[sock];
AssertFatal ( (ptr->circularBuf=(sample_t *) malloc(sampleToByte(CirSize,1))) != NULL, "");
ptr->circularBuf = malloc(sampleToByte(CirSize, 1));
if (ptr->circularBuf == NULL) {
LOG_E(HW, "malloc(%lu) failed\n", sampleToByte(CirSize, 1));
return -1;
}
ptr->circularBufEnd=((char *)ptr->circularBuf)+sampleToByte(CirSize,1);
ptr->conn_sock=sock;
ptr->lastReceivedTS=0;
......@@ -156,12 +191,18 @@ static void allocCirBuf(rfsimulator_state_t *bridge, int sock) {
ptr->trashingPacket=false;
ptr->transferPtr=(char *)&ptr->th;
ptr->remainToTransfer=sizeof(samplesBlockHeader_t);
int sendbuff=1000*1000*100;
AssertFatal ( setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)) == 0, "");
int sendbuff = SEND_BUFF_SIZE;
if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sendbuff, sizeof(sendbuff)) != 0) {
LOG_E(HW, "setsockopt(SO_SNDBUF) failed\n");
return -1;
}
struct epoll_event ev= {0};
ev.events = EPOLLIN | EPOLLRDHUP;
ev.data.fd = sock;
AssertFatal(epoll_ctl(bridge->epollfd, EPOLL_CTL_ADD, sock, &ev) != -1, "");
if (epoll_ctl(bridge->epollfd, EPOLL_CTL_ADD, sock, &ev) != 0) {
LOG_E(HW, "epoll_ctl(EPOLL_CTL_ADD) failed\n");
return -1;
}
if ( bridge->channelmod > 0) {
// create channel simulation model for this mode reception
......@@ -183,23 +224,31 @@ static void allocCirBuf(rfsimulator_state_t *bridge, int sock) {
FILE *h=fopen("/dev/random","r");
if ( 1 != fread(&rand,sizeof(rand),1,h) )
LOG_W(HW, "Simulator can't read /dev/random\n");
LOG_W(HW, "Can't read /dev/random\n");
fclose(h);
randominit(rand);
tableNor(rand);
init_done=true;
}
char *modelname = (bridge->typeStamp == ENB_MAGICDL) ? "rfsimu_channel_ue0":"rfsimu_channel_enB0";
ptr->channel_model=find_channel_desc_fromname(modelname); // path_loss in dB
AssertFatal((ptr->channel_model!= NULL),"Channel model %s not found, check config file\n",modelname);
char *modelname = (bridge->role == SIMU_ROLE_SERVER) ? "rfsimu_channel_ue0" : "rfsimu_channel_enB0";
ptr->channel_model = find_channel_desc_fromname(modelname); // path_loss in dB
if (ptr->channel_model != NULL) {
LOG_E(HW, "Channel model %s not found, check config file\n", modelname);
return -1;
}
set_channeldesc_owner(ptr->channel_model, RFSIMU_MODULEID);
random_channel(ptr->channel_model,false);
LOG_I(HW, "Random channel %s in rfsimulator activated\n", modelname);
}
return 0;
}
static void removeCirBuf(rfsimulator_state_t *bridge, int sock) {
AssertFatal( epoll_ctl(bridge->epollfd, EPOLL_CTL_DEL, sock, NULL) != -1, "");
if (epoll_ctl(bridge->epollfd, EPOLL_CTL_DEL, sock, NULL) != 0) {
LOG_E(HW, "epoll_ctl(EPOLL_CTL_DEL) failed\n");
}
close(sock);
free(bridge->buf[sock].circularBuf);
// Fixme: no free_channel_desc_scm(bridge->buf[sock].channel_model) implemented
......@@ -211,10 +260,10 @@ static void removeCirBuf(rfsimulator_state_t *bridge, int sock) {
static void socketError(rfsimulator_state_t *bridge, int sock) {
if (bridge->buf[sock].conn_sock!=-1) {
LOG_W(HW,"Lost socket \n");
LOG_W(HW, "Lost socket\n");
removeCirBuf(bridge, sock);
if (bridge->typeStamp==UE_MAGICDL)
if (bridge->role == SIMU_ROLE_CLIENT)
exit(1);
}
}
......@@ -224,9 +273,13 @@ enum blocking_t {
blocking
};
static void setblocking(int sock, enum blocking_t active) {
static int setblocking(int sock, enum blocking_t active)
{
int opts = fcntl(sock, F_GETFL);
AssertFatal(opts >= 0, "fcntl(): errno %d, %s\n", errno, strerror(errno));
if (opts < 0) {
LOG_E(HW, "fcntl(F_GETFL) failed, errno(%d)\n", errno);
return -1;
}
if (active==blocking)
opts = opts & ~O_NONBLOCK;
......@@ -234,7 +287,11 @@ static void setblocking(int sock, enum blocking_t active) {
opts = opts | O_NONBLOCK;
opts = fcntl(sock, F_SETFL, opts);
AssertFatal(opts >= 0, "fcntl(): errno %d, %s\n", errno, strerror(errno));
if (opts < 0) {
LOG_E(HW, "fcntl(F_SETFL) failed, errno(%d)\n", errno);
return -1;
}
return 0;
}
static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps);
......@@ -242,11 +299,9 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps);
static void fullwrite(int fd, void *_buf, ssize_t count, rfsimulator_state_t *t) {
if (t->saveIQfile != -1) {
if (write(t->saveIQfile, _buf, count) != count )
LOG_E(HW,"write in save iq file failed (%s)\n",strerror(errno));
LOG_E(HW, "write() in save iq file failed (%d)\n", errno);
}
AssertFatal(fd>=0 && _buf && count >0 && t,
"Bug: %d/%p/%zd/%p", fd, _buf, count, t);
char *buf = _buf;
ssize_t l;
......@@ -257,11 +312,9 @@ static void fullwrite(int fd, void *_buf, ssize_t count, rfsimulator_state_t *t)
if (errno==EINTR)
continue;
if(errno==EAGAIN) {
// The opposite side is saturated
// we read incoming sockets meawhile waiting
//flushInput(t, 5);
usleep(500);
if (errno == EAGAIN) {
LOG_E(HW, "write() failed, errno(%d)\n", errno);
usleep(250);
continue;
} else
return;
......@@ -278,7 +331,12 @@ static void rfsimulator_readconfig(rfsimulator_state_t *rfsimulator) {
paramdef_t rfsimu_params[] = RFSIMULATOR_PARAMS_DESC;
int p = config_paramidx_fromname(rfsimu_params,sizeof(rfsimu_params)/sizeof(paramdef_t), RFSIMU_OPTIONS_PARAMNAME) ;
int ret = config_get( rfsimu_params,sizeof(rfsimu_params)/sizeof(paramdef_t),RFSIMU_SECTION);
AssertFatal(ret >= 0, "configuration couldn't be performed");
if (ret < 0) {
LOG_E(HW, "Configuration couldn't be performed\n");
exit(-1);
}
rfsimulator->saveIQfile = -1;
for(int i=0; i<rfsimu_params[p].numelt ; i++) {
......@@ -286,9 +344,11 @@ static void rfsimulator_readconfig(rfsimulator_state_t *rfsimulator) {
rfsimulator->saveIQfile=open(saveF,O_APPEND| O_CREAT|O_TRUNC | O_WRONLY, 0666);
if ( rfsimulator->saveIQfile != -1 )
LOG_I(HW,"rfsimulator: will save written IQ samples in %s\n", saveF);
else
LOG_E(HW, "can't open %s for IQ saving (%s)\n", saveF, strerror(errno));
LOG_I(HW, "Will save written IQ samples in %s\n", saveF);
else {
LOG_E(HW, "open(%s) failed for IQ saving, errno(%d)\n", saveF, errno);
exit(-1);
}
break;
} else if (strcmp(rfsimu_params[p].strlistptr[i],"chanmod") == 0) {
......@@ -296,7 +356,7 @@ static void rfsimulator_readconfig(rfsimulator_state_t *rfsimulator) {
load_channellist(rfsimulator->tx_num_channels, rfsimulator->rx_num_channels, rfsimulator->sample_rate, rfsimulator->tx_bw);
rfsimulator->channelmod=true;
} else {
fprintf(stderr,"Unknown rfsimulator option: %s\n",rfsimu_params[p].strlistptr[i]);
fprintf(stderr, "unknown rfsimulator option: %s\n", rfsimu_params[p].strlistptr[i]);
exit(-1);
}
}
......@@ -311,9 +371,9 @@ static void rfsimulator_readconfig(rfsimulator_state_t *rfsimulator) {
if ( strncasecmp(rfsimulator->ip,"enb",3) == 0 ||
strncasecmp(rfsimulator->ip,"server",3) == 0 )
rfsimulator->typeStamp = ENB_MAGICDL;
rfsimulator->role = SIMU_ROLE_SERVER;
else
rfsimulator->typeStamp = UE_MAGICDL;
rfsimulator->role = SIMU_ROLE_CLIENT;
}
static int rfsimu_setchanmod_cmd(char *buff, int debug, telnet_printfunc_t prnt, void *arg) {
......@@ -321,26 +381,26 @@ static int rfsimu_setchanmod_cmd(char *buff, int debug, telnet_printfunc_t prnt,
char *modeltype=NULL;
rfsimulator_state_t *t = (rfsimulator_state_t *)arg;
if (t->channelmod == false) {
prnt("ERROR channel modelisation disabled...\n");
prnt("%s: ERROR channel modelisation disabled...\n", __func__);
return 0;
}
if (buff == NULL) {
prnt("ERROR wrong rfsimu setchannelmod command...\n");
prnt("%s: ERROR wrong rfsimu setchannelmod command...\n", __func__);
return 0;
}
if (debug)
prnt("rfsimu_setchanmod_cmd buffer \"%s\"\n",buff);
prnt("%s: rfsimu_setchanmod_cmd buffer \"%s\"\n", __func__, buff);
int s = sscanf(buff,"%m[^ ] %ms\n",&modelname, &modeltype);
if (s == 2) {
int channelmod=modelid_fromstrtype(modeltype);
if (channelmod<0)
prnt("ERROR: model type %s unknown\n",modeltype);
prnt("%s: ERROR: model type %s unknown\n", __func__, modeltype);
else {
rfsimulator_state_t *t = (rfsimulator_state_t *)arg;
int found=0;
for (int i=0; i<FD_SETSIZE; i++) {
for (int i = 0; i < MAX_FD_RFSIMU; i++) {
buffer_t *b=&t->buf[i];
if ( b->channel_model==NULL)
continue;
......@@ -366,16 +426,16 @@ static int rfsimu_setchanmod_cmd(char *buff, int debug, telnet_printfunc_t prnt,
channel_desc_t *oldmodel=b->channel_model;
b->channel_model=newmodel;
free_channel_desc_scm(oldmodel);
prnt("New model type %s applied to channel %s connected to sock %d\n",modeltype,modelname,i);
prnt("%s: New model type %s applied to channel %s connected to sock %d\n", __func__, modeltype, modelname, i);
found=1;
break;
}
} /* for */
if (found==0)
prnt("Channel %s not found or not currently used\n",modelname);
prnt("%s: Channel %s not found or not currently used\n", __func__, modelname);
}
} else {
prnt("ERROR: 2 parameters required: model name and model type (%i found)\n",s);
prnt("%s: ERROR: 2 parameters required: model name and model type (%i found)\n", __func__, s);
}
free(modelname);
......@@ -477,7 +537,7 @@ static int rfsimu_setdistance_cmd(char *buff, int debug, telnet_printfunc_t prnt
int distance;
int s = sscanf(buff,"%m[^ ] %d\n", &modelname, &distance);
if (s != 2) {
prnt("require exact two parameters\n");
prnt("%s: require exact two parameters\n", __func__);
return CMDSTATUS_VARNOTFOUND;
}
......@@ -488,14 +548,14 @@ static int rfsimu_setdistance_cmd(char *buff, int debug, telnet_printfunc_t prnt
const int new_offset = (double) distance * sample_rate / c;
const double new_distance = (double) new_offset * c / sample_rate;
prnt("\nnew_offset %d new (exact) distance %.3f m\n", new_offset, new_distance);
prnt("\n%s: new_offset %d new (exact) distance %.3f m\n", __func__, new_offset, new_distance);
/* Set distance in rfsim and channel model, update channel and ringbuffer */
for (int i=0; i<FD_SETSIZE; i++) {
for (int i = 0; i < MAX_FD_RFSIMU; i++) {
buffer_t *b=&t->buf[i];
if (b->conn_sock <= 0 || b->channel_model == NULL || b->channel_model->model_name == NULL || strcmp(b->channel_model->model_name, modelname) != 0) {
if (b->channel_model != NULL && b->channel_model->model_name != NULL)
prnt(" model %s unmodified\n", b->channel_model->model_name);
prnt(" %s: model %s unmodified\n", __func__, b->channel_model->model_name);
continue;
}
......@@ -504,7 +564,7 @@ static int rfsimu_setdistance_cmd(char *buff, int debug, telnet_printfunc_t prnt
cd->channel_offset = new_offset;
const int nbTx = cd->nb_tx;
prnt(" Modifying model %s...\n", modelname);
prnt(" %s: Modifying model %s...\n", __func__, modelname);
rfsimu_offset_change_cirBuf(b->circularBuf, t->nextRxTstamp, CirSize, old_offset, new_offset, nbTx);
}
......@@ -522,7 +582,7 @@ static int rfsimu_getdistance_cmd(char *buff, int debug, telnet_printfunc_t prnt
const double sample_rate = t->sample_rate;
const double c = 299792458; /* 3e8 */
for (int i=0; i<FD_SETSIZE; i++) {
for (int i = 0; i < MAX_FD_RFSIMU; i++) {
buffer_t *b=&t->buf[i];
if (b->conn_sock <= 0 || b->channel_model == NULL || b->channel_model->model_name == NULL)
continue;
......@@ -530,7 +590,7 @@ static int rfsimu_getdistance_cmd(char *buff, int debug, telnet_printfunc_t prnt
channel_desc_t *cd = b->channel_model;
const int offset = cd->channel_offset;
const double distance = (double) offset * c / sample_rate;
prnt("\%s offset %d distance %.3f m\n", cd->model_name, offset, distance);
prnt("%s: \%s offset %d distance %.3f m\n", __func__, cd->model_name, offset, distance);
}
return CMDSTATUS_FOUND;
......@@ -541,126 +601,176 @@ static int rfsimu_vtime_cmd(char *buff, int debug, telnet_printfunc_t prnt, void
rfsimulator_state_t *t = (rfsimulator_state_t *)arg;
const openair0_timestamp ts = t->nextRxTstamp;
const double sample_rate = t->sample_rate;
prnt("vtime measurement: TS %llu sample_rate %.3f\n", ts, sample_rate);
prnt("%s: vtime measurement: TS %llu sample_rate %.3f\n", __func__, ts, sample_rate);
return CMDSTATUS_FOUND;
}
static void customNetForPerf()
{
int res = 0;
char sysctlmem[256];
memset(sysctlmem, 0, 256);
sprintf(sysctlmem, "/sbin/sysctl -n -e -q -w net.core.rmem_default=%d", SYSCTL_MEM_VALUE);
res = system(sysctlmem);
if (res != 0) {
LOG_W(HW, "Cannot set net.core.rmem_default to %d\n", SYSCTL_MEM_VALUE);
}
memset(sysctlmem, 0, 256);
sprintf(sysctlmem, "/sbin/sysctl -n -e -q -w net.core.rmem_max=%d", SYSCTL_MEM_VALUE);
res = system(sysctlmem);
if (res != 0) {
LOG_W(HW, "Cannot set net.core.rmem_max to %d\n", SYSCTL_MEM_VALUE);
}
memset(sysctlmem, 0, 256);
sprintf(sysctlmem, "/sbin/sysctl -n -e -q -w net.core.wmem_default=%d", SYSCTL_MEM_VALUE);
res = system(sysctlmem);
if (res != 0) {
LOG_W(HW, "Cannot set net.core.wmem_default to %d\n", SYSCTL_MEM_VALUE);
}
memset(sysctlmem, 0, 256);
sprintf(sysctlmem, "/sbin/sysctl -n -e -q -w net.core.wmem_max=%d", SYSCTL_MEM_VALUE);
res = system(sysctlmem);
if (res != 0) {
LOG_W(HW, "Cannot set net.core.wmem_max to %d\n", SYSCTL_MEM_VALUE);
}
}
static int startServer(openair0_device *device) {
rfsimulator_state_t *t = (rfsimulator_state_t *) device->priv;
t->typeStamp=ENB_MAGICDL;
AssertFatal((t->listen_sock = socket(AF_INET, SOCK_STREAM, 0)) >= 0, "");
t->role = SIMU_ROLE_SERVER;
t->listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (t->listen_sock < 0) {
LOG_E(HW, "socket(SOCK_STREAM) failed, errno(%d)\n", errno);
return -1;
}
int enable = 1;
AssertFatal(setsockopt(t->listen_sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) == 0, "");
struct sockaddr_in addr = {
.sin_family=
AF_INET,
.sin_port=
htons(t->port),
.sin_addr=
{ .s_addr= INADDR_ANY }
};
if (setsockopt(t->listen_sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) != 0) {
LOG_E(HW, "setsockopt(SO_REUSEADDR) failed, errno(%d)\n", errno);
return -1;
}
struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(t->port), .sin_addr = {.s_addr = INADDR_ANY}};
customNetForPerf();
int rc = bind(t->listen_sock, (struct sockaddr *)&addr, sizeof(addr));
AssertFatal(rc == 0, "bind failed: errno %d, %s", errno, strerror(errno));
AssertFatal(listen(t->listen_sock, 5) == 0, "");
if (rc < 0) {
LOG_E(HW, "bind() failed, errno(%d)\n", errno);
return -1;
}
if (listen(t->listen_sock, 5) != 0) {
LOG_E(HW, "listen() failed, errno(%d)\n", errno);
return -1;
}
struct epoll_event ev= {0};
ev.events = EPOLLIN;
ev.data.fd = t->listen_sock;
AssertFatal(epoll_ctl(t->epollfd, EPOLL_CTL_ADD, t->listen_sock, &ev) != -1, "");
if (epoll_ctl(t->epollfd, EPOLL_CTL_ADD, t->listen_sock, &ev) != 0) {
LOG_E(HW, "epoll_ctl(EPOLL_CTL_ADD) failed, errno(%d)\n", errno);
return -1;
}
return 0;
}
static int startClient(openair0_device *device) {
rfsimulator_state_t *t = device->priv;
t->typeStamp=UE_MAGICDL;
t->role = SIMU_ROLE_CLIENT;
int sock;
AssertFatal((sock = socket(AF_INET, SOCK_STREAM, 0)) >= 0, "");
struct sockaddr_in addr = {
.sin_family=
AF_INET,
.sin_port=
htons(t->port),
.sin_addr=
{ .s_addr= INADDR_ANY }
};
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
LOG_E(HW, "socket(SOCK_STREAM) failed, errno(%d)\n", errno);
return -1;
}
struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = htons(t->port), .sin_addr = {.s_addr = INADDR_ANY}};
addr.sin_addr.s_addr = inet_addr(t->ip);
bool connected=false;
customNetForPerf();
while(!connected) {
LOG_I(HW,"rfsimulator: trying to connect to %s:%d\n", t->ip, t->port);
LOG_I(HW, "Trying to connect to %s:%d\n", t->ip, t->port);
if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
LOG_I(HW,"rfsimulator: connection established\n");
LOG_I(HW, "Connection to %s:%d established\n", t->ip, t->port);
connected=true;
}
perror("rfsimulator");
LOG_I(HW, "connect() to %s:%d failed, errno(%d)\n", t->ip, t->port, errno);
sleep(1);
}
setblocking(sock, notBlocking);
allocCirBuf(t, sock);
return 0;
if (setblocking(sock, notBlocking) == -1) {
return -1;
}
return allocCirBuf(t, sock);
}
static int rfsimulator_write_internal(rfsimulator_state_t *t, openair0_timestamp timestamp, void **samplesVoid, int nsamps, int nbAnt, int flags, bool alreadyLocked) {
if (!alreadyLocked)
pthread_mutex_lock(&Sockmutex);
LOG_D(HW,"sending %d samples at time: %ld, nbAnt %d\n", nsamps, timestamp, nbAnt);
LOG_D(HW, "Sending %d samples at time: %ld, nbAnt %d\n", nsamps, timestamp, nbAnt);
for (int i=0; i<FD_SETSIZE; i++) {
for (int i = 0; i < MAX_FD_RFSIMU; i++) {
buffer_t *b=&t->buf[i];
if (b->conn_sock >= 0 ) {
samplesBlockHeader_t header= {t->typeStamp, nsamps, nbAnt, timestamp};
samplesBlockHeader_t header = {nsamps, nbAnt, timestamp};
fullwrite(b->conn_sock,&header, sizeof(header), t);
sample_t tmpSamples[nsamps][nbAnt];
for(int a=0; a<nbAnt; a++) {
sample_t *in=(sample_t *)samplesVoid[a];
if (nbAnt == 1) {
if (b->conn_sock >= 0) {
fullwrite(b->conn_sock, samplesVoid[0], sampleToByte(nsamps, nbAnt), t);
}
} else {
for (int a = 0; a < nbAnt; a++) {
sample_t *in = (sample_t *)samplesVoid[a];
for(int s=0; s<nsamps; s++)
tmpSamples[s][a]=in[s];
}
for (int s = 0; s < nsamps; s++)
tmpSamples[s][a] = in[s];
}
if (b->conn_sock >= 0 ) {
fullwrite(b->conn_sock, (void *)tmpSamples, sampleToByte(nsamps,nbAnt), t);
if (b->conn_sock >= 0) {
fullwrite(b->conn_sock, (void *)tmpSamples, sampleToByte(nsamps, nbAnt), t);
}
}
}
}
if ( t->lastWroteTS != 0 && fabs((double)t->lastWroteTS-timestamp) > (double)CirSize)
LOG_E(HW,"Discontinuous TX gap too large Tx:%lu, %lu\n", t->lastWroteTS, timestamp);
LOG_E(HW, "Discontinuous TX gap too large Tx:%lu, %lu\n", t->lastWroteTS, timestamp);
if (t->lastWroteTS > timestamp+nsamps)
LOG_E(HW,"Not supported to send Tx out of order (same in USRP) %lu, %lu\n",
t->lastWroteTS, timestamp);
LOG_E(HW, "Not supported to send Tx out of order (same in USRP) %lu, %lu\n", t->lastWroteTS, timestamp);
t->lastWroteTS=timestamp+nsamps;
if (!alreadyLocked)
pthread_mutex_unlock(&Sockmutex);
LOG_D(HW,"sent %d samples at time: %ld->%ld, energy in first antenna: %d\n",
nsamps, timestamp, timestamp+nsamps, signal_energy(samplesVoid[0], nsamps) );
LOG_D(HW,
"Sent %d samples at time: %ld->%ld, energy in first antenna: %d\n",
nsamps,
timestamp,
timestamp + nsamps,
signal_energy(samplesVoid[0], nsamps));
return nsamps;
}
static int rfsimulator_write(openair0_device *device, openair0_timestamp timestamp, void **samplesVoid, int nsamps, int nbAnt, int flags) {
return rfsimulator_write_internal(device->priv, timestamp, samplesVoid, nsamps, nbAnt, flags, false);
return rfsimulator_write_internal(device->priv, timestamp, samplesVoid, nsamps, nbAnt, flags, false); // false = with lock
// return rfsimulator_write_internal(device->priv, timestamp, samplesVoid, nsamps, nbAnt, flags, true);
}
static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initial) {
// Process all incoming events on sockets
// store the data in lists
struct epoll_event events[FD_SETSIZE]= {{0}};
int nfds = epoll_wait(t->epollfd, events, FD_SETSIZE, timeout);
struct epoll_event events[MAX_FD_RFSIMU] = {{0}};
int nfds = epoll_wait(t->epollfd, events, MAX_FD_RFSIMU, timeout);
if ( nfds==-1 ) {
if ( errno==EINTR || errno==EAGAIN ) {
return false;
} else
AssertFatal(false,"error in epoll_wait\n");
} else {
LOG_W(HW, "epoll_wait() failed, errno(%d)\n", errno);
return false;
}
}
for (int nbEv = 0; nbEv < nfds; ++nbEv) {
......@@ -668,19 +778,25 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
if (events[nbEv].events & EPOLLIN && fd == t->listen_sock) {
int conn_sock;
AssertFatal( (conn_sock = accept(t->listen_sock,NULL,NULL)) != -1, "");
setblocking(conn_sock, notBlocking);
allocCirBuf(t, conn_sock);
LOG_I(HW,"A client connected, sending the current time\n");
conn_sock = accept(t->listen_sock, NULL, NULL);
if (conn_sock == -1) {
LOG_E(HW, "accept() failed, errno(%d)\n", errno);
return false;
}
if (setblocking(conn_sock, notBlocking)) {
return false;
}
if (allocCirBuf(t, conn_sock) == -1) {
return false;
}
LOG_I(HW, "A client connects, sending the current time\n");
c16_t v= {0};
void *samplesVoid[t->tx_num_channels];
for ( int i=0; i < t->tx_num_channels; i++)
samplesVoid[i]=(void *)&v;
rfsimulator_write_internal(t, t->lastWroteTS > 1 ? t->lastWroteTS-1 : 0,
samplesVoid, 1,
t->tx_num_channels, 1, false);
rfsimulator_write_internal(t, t->lastWroteTS > 1 ? t->lastWroteTS - 1 : 0, samplesVoid, 1, t->tx_num_channels, 1, true);
} else {
if ( events[nbEv].events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP) ) {
socketError(t,fd);
......@@ -690,7 +806,7 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
buffer_t *b=&t->buf[fd];
if ( b->circularBuf == NULL ) {
LOG_E(HW, "received data on not connected socket %d\n", events[nbEv].data.fd);
LOG_E(HW, "Received data on not connected socket %d\n", events[nbEv].data.fd);
continue;
}
......@@ -707,14 +823,14 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
if ( sz < 0 ) {
if ( errno != EAGAIN ) {
LOG_E(HW,"socket failed %s\n", strerror(errno));
LOG_E(HW, "recv() failed, errno(%d)\n", errno);
//abort();
}
} else if ( sz == 0 )
continue;
LOG_D(HW, "Socket rcv %zd bytes\n", sz);
AssertFatal((b->remainToTransfer-=sz) >= 0, "");
b->remainToTransfer -= sz;
b->transferPtr+=sz;
if (b->transferPtr==b->circularBufEnd )
......@@ -722,20 +838,18 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
// check the header and start block transfer
if ( b->headerMode==true && b->remainToTransfer==0) {
AssertFatal( (t->typeStamp == UE_MAGICDL && b->th.magic==ENB_MAGICDL) ||
(t->typeStamp == ENB_MAGICDL && b->th.magic==UE_MAGICDL), "Socket Error in protocol");
b->headerMode=false;
b->headerMode = false;
if ( t->nextRxTstamp == 0 ) { // First block in UE, resync with the eNB current TS
if (t->nextRxTstamp == 0) { // First block in UE, resync with the gNB current TS
t->nextRxTstamp=b->th.timestamp> nsamps_for_initial ?
b->th.timestamp - nsamps_for_initial :
0;
b->lastReceivedTS=b->th.timestamp> nsamps_for_initial ?
b->th.timestamp :
nsamps_for_initial;
LOG_W(HW,"UE got first timestamp: starting at %lu\n", t->nextRxTstamp);
LOG_D(HW, "UE got first timestamp: starting at %lu\n", t->nextRxTstamp);
b->trashingPacket=true;
} else if ( b->lastReceivedTS < b->th.timestamp) {
} else if (b->lastReceivedTS < b->th.timestamp) {
int nbAnt= b->th.nbAnt;
if ( b->th.timestamp-b->lastReceivedTS < CirSize ) {
......@@ -749,24 +863,21 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
memset(b->circularBuf, 0, sampleToByte(CirSize,1));
}
if (b->lastReceivedTS != 0 && b->th.timestamp-b->lastReceivedTS < 1000)
LOG_W(HW,"UEsock: %d gap of: %ld in reception\n", fd, b->th.timestamp-b->lastReceivedTS );
b->lastReceivedTS=b->th.timestamp;
} else if ( b->lastReceivedTS > b->th.timestamp && b->th.size == 1 ) {
LOG_W(HW,"Received Rx/Tx synchro out of order\n");
} else if (b->lastReceivedTS > b->th.timestamp && b->th.size == 1) {
LOG_W(HW, "Received Rx/Tx synchro out of order\n");
b->trashingPacket=true;
} else if ( b->lastReceivedTS == b->th.timestamp ) {
} else if (b->lastReceivedTS == b->th.timestamp) {
// normal case
} else {
LOG_E(HW, "received data in past: current is %lu, new reception: %lu!\n", b->lastReceivedTS, b->th.timestamp);
LOG_E(HW, "Received data in past: current is %lu, new reception: %lu!\n", b->lastReceivedTS, b->th.timestamp);
b->trashingPacket=true;
}
pthread_mutex_lock(&Sockmutex);
if (t->lastWroteTS != 0 && (fabs((double)t->lastWroteTS-b->lastReceivedTS) > (double)CirSize))
LOG_E(HW,"UEsock: %d Tx/Rx shift too large Tx:%lu, Rx:%lu\n", fd, t->lastWroteTS, b->lastReceivedTS);
LOG_E(HW, "UEsock(%d) Tx/Rx shift too large Tx:%lu, Rx:%lu\n", fd, t->lastWroteTS, b->lastReceivedTS);
pthread_mutex_unlock(&Sockmutex);
b->transferPtr=(char *)&b->circularBuf[(b->lastReceivedTS*b->th.nbAnt)%CirSize];
......@@ -776,15 +887,14 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
if ( b->headerMode==false ) {
if ( ! b->trashingPacket ) {
b->lastReceivedTS=b->th.timestamp+b->th.size-byteToSample(b->remainToTransfer,b->th.nbAnt);
LOG_D(HW,"UEsock: %d Set b->lastReceivedTS %ld\n", fd, b->lastReceivedTS);
LOG_D(HW, "UEsock: %d Set b->lastReceivedTS %ld\n", fd, b->lastReceivedTS);
}
if ( b->remainToTransfer==0) {
LOG_D(HW,"UEsock: %d Completed block reception: %ld\n", fd, b->lastReceivedTS);
LOG_D(HW, "UEsock: %d Completed block reception: %ld\n", fd, b->lastReceivedTS);
b->headerMode=true;
b->transferPtr=(char *)&b->th;
b->remainToTransfer=sizeof(samplesBlockHeader_t);
b->th.magic=-1;
b->remainToTransfer = sizeof(samplesBlockHeader_t);
b->trashingPacket=false;
}
}
......@@ -794,25 +904,23 @@ static bool flushInput(rfsimulator_state_t *t, int timeout, int nsamps_for_initi
return nfds>0;
}
static int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimestamp, void **samplesVoid, int nsamps, int nbAnt) {
if (nbAnt > 4) {
LOG_W(HW, "rfsimulator: only 4 antenna tested\n");
}
static int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimestamp, void **samplesVoid, int nsamps, int nbAnt)
{
rfsimulator_state_t *t = device->priv;
LOG_D(HW, "Enter rfsimulator_read, expect %d samples, will release at TS: %ld, nbAnt %d\n", nsamps, t->nextRxTstamp+nsamps, nbAnt);
// deliver data from received data
// check if a UE is connected
int first_sock;
for (first_sock=0; first_sock<FD_SETSIZE; first_sock++)
for (first_sock = 0; first_sock < MAX_FD_RFSIMU; first_sock++)
if (t->buf[first_sock].circularBuf != NULL )
break;
if ( first_sock == FD_SETSIZE ) {
if (first_sock == MAX_FD_RFSIMU) {
// no connected device (we are eNB, no UE is connected)
if ( t->nextRxTstamp == 0)
LOG_W(HW,"No connected device, generating void samples...\n");
LOG_W(HW, "No connected device, generating void samples...\n");
if (!flushInput(t, t->wait_timeout, nsamps)) {
for (int x=0; x < nbAnt; x++)
......@@ -821,20 +929,20 @@ static int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimest
t->nextRxTstamp+=nsamps;
if ( ((t->nextRxTstamp/nsamps)%100) == 0)
LOG_D(HW,"No UE, Generated void samples for Rx: %ld\n", t->nextRxTstamp);
LOG_D(HW, "No UE, Generating void samples for Rx: %ld\n", t->nextRxTstamp);
*ptimestamp = t->nextRxTstamp-nsamps;
return nsamps;
}
} else {
bool have_to_wait;
do {
have_to_wait=false;
for ( int sock=0; sock<FD_SETSIZE; sock++) {
buffer_t *b=&t->buf[sock];
buffer_t *b = NULL;
for (int sock = 0; sock < MAX_FD_RFSIMU; sock++) {
b = &t->buf[sock];
if ( b->circularBuf )
if ( t->nextRxTstamp+nsamps > b->lastReceivedTS ) {
......@@ -843,12 +951,13 @@ static int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimest
}
}
if (have_to_wait)
/*printf("Waiting on socket, current last ts: %ld, expected at least : %ld\n",
ptr->lastReceivedTS,
t->nextRxTstamp+nsamps);
*/
flushInput(t, 3, nsamps);
if (have_to_wait) {
LOG_D(HW,
"Waiting on socket, current last ts: %ld, expected at least : %ld\n",
b->lastReceivedTS,
t->nextRxTstamp + nsamps);
flushInput(t, 1000, nsamps); // was 3
}
} while (have_to_wait);
}
......@@ -857,7 +966,7 @@ static int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimest
memset(samplesVoid[a],0,sampleToByte(nsamps,1));
// Add all input nodes signal in the output buffer
for (int sock=0; sock<FD_SETSIZE; sock++) {
for (int sock = 0; sock < MAX_FD_RFSIMU; sock++) {
buffer_t *ptr=&t->buf[sock];
if ( ptr->circularBuf ) {
......@@ -881,22 +990,35 @@ static int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimest
CirSize);
}
else { // no channel modeling
double H_awgn_mimo[4][4] ={{1.0, 0.2, 0.1, 0.05}, //rx 0
{0.2, 1.0, 0.2, 0.1}, //rx 1
{0.1, 0.2, 1.0, 0.2}, //rx 2
{0.05, 0.1, 0.2, 1.0}};//rx 3
sample_t *out=(sample_t *)samplesVoid[a];
int nbAnt_tx = ptr->th.nbAnt;//number of Tx antennas
//LOG_I(HW, "nbAnt_tx %d\n",nbAnt_tx);
for (int i=0; i < nsamps; i++) {//loop over nsamps
for (int a_tx=0; a_tx<nbAnt_tx; a_tx++) { //sum up signals from nbAnt_tx antennas
out[i].r += (short)(ptr->circularBuf[((t->nextRxTstamp+i)*nbAnt_tx+a_tx)%CirSize].r*H_awgn_mimo[a][a_tx]);
out[i].i += (short)(ptr->circularBuf[((t->nextRxTstamp+i)*nbAnt_tx+a_tx)%CirSize].i*H_awgn_mimo[a][a_tx]);
} // end for a_tx
} // end for i (number of samps)
int nbAnt_tx = ptr->th.nbAnt; // number of Tx antennas
if (nbAnt_tx == 1) { // optimized for 1 Tx
sample_t *out = (sample_t *)samplesVoid[a];
int firstIndex = t->nextRxTstamp % CirSize;
sample_t *firstSample = (sample_t *)&(ptr->circularBuf[firstIndex]);
if (firstIndex + nsamps > CirSize) {
int tailSz = CirSize - firstIndex;
memcpy(out, firstSample, sampleToByte(tailSz, nbAnt_tx));
memcpy(out + tailSz, &ptr->circularBuf[0], sampleToByte(nsamps - tailSz, nbAnt_tx));
} else {
memcpy(out, firstSample, nsamps * 4);
}
} else {
// SIMD (with simde) optimization might be added here later
double H_awgn_mimo[4][4] = {{1.0, 0.2, 0.1, 0.05}, // rx 0
{0.2, 1.0, 0.2, 0.1}, // rx 1
{0.1, 0.2, 1.0, 0.2}, // rx 2
{0.05, 0.1, 0.2, 1.0}}; // rx 3
sample_t *out = (sample_t *)samplesVoid[a];
LOG_D(HW, "nbAnt_tx %d\n", nbAnt_tx);
for (int i = 0; i < nsamps; i++) { // loop over nsamps
for (int a_tx = 0; a_tx < nbAnt_tx; a_tx++) { // sum up signals from nbAnt_tx antennas
out[i].r += (short)(ptr->circularBuf[((t->nextRxTstamp + i) * nbAnt_tx + a_tx) % CirSize].r * H_awgn_mimo[a][a_tx]);
out[i].i += (short)(ptr->circularBuf[((t->nextRxTstamp + i) * nbAnt_tx + a_tx) % CirSize].i * H_awgn_mimo[a][a_tx]);
} // end for a_tx
} // end for i (number of samps)
} // end of 1 tx antenna optimization
} // end of no channel modeling
} // end for a (number of rx antennas)
}
......@@ -904,9 +1026,11 @@ static int rfsimulator_read(openair0_device *device, openair0_timestamp *ptimest
*ptimestamp = t->nextRxTstamp; // return the time of the first sample
t->nextRxTstamp+=nsamps;
LOG_D(HW,"Rx to upper layer: %d from %ld to %ld, energy in first antenna %d\n",
LOG_D(HW,
"Rx to upper layer: %d from %ld to %ld, energy in first antenna %d\n",
nsamps,
*ptimestamp, t->nextRxTstamp,
*ptimestamp,
t->nextRxTstamp,
signal_energy(samplesVoid[0], nsamps));
return nsamps;
}
......@@ -919,7 +1043,7 @@ static int rfsimulator_reset_stats(openair0_device *device) {
}
static void rfsimulator_end(openair0_device *device) {
rfsimulator_state_t* s = device->priv;
for (int i = 0; i < FD_SETSIZE; i++) {
for (int i = 0; i < MAX_FD_RFSIMU; i++) {
buffer_t *b = &s->buf[i];
if (b->conn_sock >= 0 )
close(b->conn_sock);
......@@ -942,19 +1066,20 @@ __attribute__((__visibility__("default")))
int device_init(openair0_device *device, openair0_config_t *openair0_cfg) {
// to change the log level, use this on command line
// --log_config.hw_log_level debug
rfsimulator_state_t *rfsimulator = (rfsimulator_state_t *)calloc(sizeof(rfsimulator_state_t),1);
rfsimulator_state_t *rfsimulator = calloc(sizeof(rfsimulator_state_t), 1);
// initialize channel simulation
rfsimulator->tx_num_channels=openair0_cfg->tx_num_channels;
rfsimulator->rx_num_channels=openair0_cfg->rx_num_channels;
rfsimulator->sample_rate=openair0_cfg->sample_rate;
rfsimulator->tx_bw=openair0_cfg->tx_bw;
rfsimulator_readconfig(rfsimulator);
LOG_W(HW, "rfsim: sample_rate %f\n", rfsimulator->sample_rate);
LOG_W(HW, "sample_rate %f\n", rfsimulator->sample_rate);
pthread_mutex_init(&Sockmutex, NULL);
LOG_I(HW,"rfsimulator: running as %s\n", rfsimulator-> typeStamp == ENB_MAGICDL ? "server waiting opposite rfsimulators to connect" : "client: will connect to a rfsimulator server side");
device->trx_start_func = rfsimulator->typeStamp == ENB_MAGICDL ?
startServer :
startClient;
LOG_I(HW,
"Running as %s\n",
rfsimulator->role == SIMU_ROLE_SERVER ? "server waiting opposite rfsimulators to connect"
: "client: will connect to a rfsimulator server side");
device->trx_start_func = rfsimulator->role == SIMU_ROLE_SERVER ? startServer : startClient;
device->trx_get_stats_func = rfsimulator_get_stats;
device->trx_reset_stats_func = rfsimulator_reset_stats;
device->trx_end_func = rfsimulator_end;
......@@ -970,11 +1095,10 @@ int device_init(openair0_device *device, openair0_config_t *openair0_cfg) {
device->priv = rfsimulator;
device->trx_write_init = rfsimulator_write_init;
for (int i=0; i<FD_SETSIZE; i++)
for (int i = 0; i < MAX_FD_RFSIMU; i++)
rfsimulator->buf[i].conn_sock=-1;
AssertFatal((rfsimulator->epollfd = epoll_create1(0)) != -1,"");
AssertFatal((rfsimulator->epollfd = epoll_create1(0)) != -1, "epoll_create1() failed, errno(%d)", errno);
// we need to call randominit() for telnet server (use gaussdouble=>uniformrand)
randominit(0);
set_taus_seed(0);
......
......@@ -186,14 +186,10 @@ int main(int argc, char *argv[]) {
serviceSock=client_start(argv[2],atoi(argv[3]));
}
uint64_t typeStamp=ENB_MAGICDL;
bool raw = false;
if ( argc == 5 ) {
raw=true;
if (strcmp(argv[4],"UL") == 0 )
typeStamp=UE_MAGICDL;
}
samplesBlockHeader_t header;
......@@ -213,7 +209,6 @@ int main(int argc, char *argv[]) {
setblocking(serviceSock, blocking);
if ( raw ) {
header.magic=typeStamp;
header.size=blockSize;
header.nbAnt=1;
header.timestamp=timestamp;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment