Commit 2b426c0c authored by Raymond Knopp's avatar Raymond Knopp

added threadPool for IF5 TX path

parent c5fc94a3
......@@ -276,6 +276,7 @@ int connect_rau(RU_t *ru) {
void fh_if5_south_out(RU_t *ru, int frame, int slot, uint64_t timestamp) {
if (ru == RC.ru[0]) VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME( VCD_SIGNAL_DUMPER_VARIABLES_TRX_TST, ru->proc.timestamp_tx&0xffffffff );
int offset = ru->nr_frame_parms->get_samples_slot_timestamp(slot,ru->nr_frame_parms,0);
return;
start_meas(&ru->tx_fhaul);
for (int aid=0;aid<ru->nb_tx;aid++)
ru->ifdevice.trx_write_func2(&ru->ifdevice,
......@@ -1276,9 +1277,9 @@ void *ru_thread( void *param ) {
memcpy((void *)&ru->config,(void *)&RC.gNB[0]->gNB_config,sizeof(ru->config));
if(emulate_rf) {
fill_rf_config(ru,ru->rf_config_file);
nr_init_frame_parms(&ru->config, fp);
nr_dump_frame_parms(fp);
fill_rf_config(ru,ru->rf_config_file);
nr_phy_init_RU(ru);
if (setup_RU_buffers(ru)!=0) {
......@@ -1870,6 +1871,7 @@ void set_function_spec_param(RU_t *ru) {
ru->ifdevice.eth_params = &ru->eth_params;
ru->ifdevice.configure_rru = configure_ru;
printf("starting transport : rx_num_antennas %d, tx_num_antennas %d\n",ru->openair0_cfg.rx_num_channels,ru->openair0_cfg.tx_num_channels);
ret = openair0_transport_load(&ru->ifdevice,&ru->openair0_cfg,&ru->eth_params);
printf("openair0_transport_init returns %d for ru_id %u\n", ret, ru->idx);
......@@ -1990,7 +1992,8 @@ void init_NR_RU(char *rf_config_file) {
}
}
}
ru->openair0_cfg.rx_num_channels = ru->nb_rx;
ru->openair0_cfg.tx_num_channels = ru->nb_tx;
//LOG_I(PHY,"Initializing RRU descriptor %d : (%s,%s,%d)\n",ru_id,ru_if_types[ru->if_south],NB_timing[ru->if_timing],ru->function);
set_function_spec_param(ru);
LOG_I(PHY,"Starting ru_thread %d\n",ru_id);
......
......@@ -37,6 +37,7 @@
#include <sys/types.h>
#include <openair1/PHY/TOOLS/tools_defs.h>
#include "record_player.h"
#include <common/utils/threadPool/thread-pool.h>
/* default name of shared library implementing the radio front end */
#define OAI_RF_LIBNAME "oai_device"
......@@ -325,6 +326,7 @@ typedef struct fhstate_s {
openair0_timestamp olddeltaTS[8];
openair0_timestamp oldTS[8];
openair0_timestamp TS_read;
int first_read;
uint32_t *buff[8];
uint32_t buff_size;
int r[8];
......@@ -368,6 +370,12 @@ struct openair0_device_t {
/*!brief pointer to FH state, used in ECPRI split 8*/
fhstate_t fhstate;
/*!brief threadpool for UDP write*/
tpool_t *threadPool;
/*!brief message response for notification fifo*/
notifiedFIFO_t *respudpTX;
/*!brief Used in ECPRI split 8 to indicate numerator of sampling rate ratio*/
int sampling_rate_ratio_n;
......
......@@ -152,11 +152,11 @@ int eth_socket_init_udp(openair0_device *device) {
printf("ETHERNET: Cannot set SO_REUSEPORT option on socket (user %d)",i);
exit(0);
}
#ifdef SO_ATTACH_REUSEPORT_EBPF
#ifdef SO_ATTACH_REUSEPORT_EBPF*/
struct sock_filter code[]={
{ BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU },
/* return A */
{ BPF_RET | BPF_A, 0, 0, 0 },
{ BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU }, // A = #cpu
{ BPF_ALU | BPF_MOD | BPF_K, 0, 0, eth->num_fd}, // A = A % group_size
{ BPF_RET | BPF_A, 0, 0, 0 }, // return A
};
struct sock_fprog bpf = {
.len = sizeof(code)/sizeof(struct sock_filter),
......@@ -166,6 +166,7 @@ int eth_socket_init_udp(openair0_device *device) {
printf("ETHERNET: Cannot set SO_ATTACH_REUSEPORT_EBPF option on socket (user %d)",i);
exit(0);
}
else printf("ETHERNET: set SO_ATTACH_REUSEPORT_EBPF option on socket (user %d)\n",i);
#endif
}
......@@ -205,7 +206,7 @@ int trx_eth_read_udp_IF4p5(openair0_device *device, openair0_timestamp *timestam
while(bytes_received == -1) {
again:
bytes_received = recvfrom(eth->sockfdd[cc%eth->num_fd],
bytes_received = recvfrom(eth->sockfdd[0/*cc%eth->num_fd*/],
buff[0],
packet_size,
0,
......@@ -236,7 +237,6 @@ int trx_eth_read_udp_IF4p5(openair0_device *device, openair0_timestamp *timestam
eth->rx_count++;
}
}
eth->rx_nsamps = nsamps;
return(bytes_received);
}
......@@ -271,7 +271,7 @@ int trx_eth_write_udp_IF4p5(openair0_device *device, openair0_timestamp timestam
eth->tx_nsamps = nblocks;
bytes_sent = sendto(eth->sockfdd[cc%eth->num_fd],
bytes_sent = sendto(eth->sockfdd[0/*cc%eth->num_fd*/],
buff[0],
packet_size,
0,
......@@ -289,7 +289,17 @@ int trx_eth_write_udp_IF4p5(openair0_device *device, openair0_timestamp timestam
return (bytes_sent);
}
int trx_eth_write_udp(openair0_device *device, openair0_timestamp timestamp, void *buff, int aid, int nsamps, int flags) {
void *trx_eth_write_udp_cmd(void *arg) {
udpTXelem_t *udpTXelem=(udpTXelem_t*)arg;
openair0_device *device=udpTXelem->device;
openair0_timestamp timestamp = udpTXelem->timestamp;
void *buff = udpTXelem->buff;
int aid = udpTXelem->buff;
int nsamps = udpTXelem->nsamps;
int bytes_sent=0;
eth_state_t *eth = (eth_state_t*)device->priv;
......@@ -365,7 +375,7 @@ int trx_eth_write_udp(openair0_device *device, openair0_timestamp timestamp, voi
#endif
/* Send packet */
bytes_sent = sendto(eth->sockfdd[aid%eth->num_fd],
bytes_sent = sendto(eth->sockfdd[0/*aid%eth->num_fd*/],
buff2,
UDP_PACKET_SIZE_BYTES(len>>2),
sendto_flag,
......@@ -381,9 +391,22 @@ int trx_eth_write_udp(openair0_device *device, openair0_timestamp timestamp, voi
eth->tx_count++;
}
}
return (bytes_sent-APP_HEADER_SIZE_BYTES)>>2;
}
int trx_eth_write_udp(openair0_device *device, openair0_timestamp timestamp, void *buff, int aid, int nsamps, int flags) {
union udpTXReqUnion id = {.s={(uint64_t)timestamp,aid,nsamps,0}};
notifiedFIFO_elt_t *req=newNotifiedFIFO_elt(sizeof(udpTXelem_t), id.p, device->respudpTX, trx_eth_write_udp_cmd);
udpTXelem_t * udptxelem=(udpTXelem_t *) NotifiedFifoData(req);
udptxelem->device = device;
udptxelem->timestamp = timestamp;
udptxelem->buff = buff;
udptxelem->aid = aid;
udptxelem->nsamps = nsamps;
udptxelem->flags = flags;
pushTpool(device->threadPool,req);
}
extern int oai_exit;
void *udp_read_thread(void *arg) {
......@@ -394,11 +417,11 @@ void *udp_read_thread(void *arg) {
openair0_device *device=u->device;
fhstate_t *fhstate = &device->fhstate;
char buffer[UDP_PACKET_SIZE_BYTES(256)];
int first_read=0;
while (oai_exit == 0) {
LOG_I(PHY,"UDP read thread %d, waiting for start sampling_rate_d %d, sampling_rate_n %d\n",u->thread_id,device->sampling_rate_ratio_n,device->sampling_rate_ratio_d);
while (fhstate->active > 0) {
size_t count = recvfrom(((eth_state_t*)device->priv)->sockfdd[u->thread_id],
size_t count = recvfrom(((eth_state_t*)device->priv)->sockfdd[0],
buffer,sizeof(buffer),0,
(struct sockaddr *)&((eth_state_t*)device->priv)->dest_addrd,
(socklen_t *)&((eth_state_t*)device->priv)->addr_len);
......@@ -409,7 +432,8 @@ void *udp_read_thread(void *arg) {
if ((int)count <= 0) continue;
AssertFatal(aid < 8,"Cannot handle more than 8 antennas, got aid %d\n",aid);
fhstate->r[aid]=1;
if (aid==0 && fhstate->TS[0] == 0) fhstate->TS0 = TS;
if (aid==0 && first_read == 0) fhstate->TS0 = TS;
first_read = 1;
/* store the timestamp value from packet's header */
fhstate->TS[aid] = TS;
int64_t offset = TS - fhstate->TS0;
......@@ -420,6 +444,7 @@ void *udp_read_thread(void *arg) {
memcpy((void*)(device->openair0_cfg->rxbase[aid]+offset),
(void*)&buffer[APP_HEADER_SIZE_BYTES],
count-APP_HEADER_SIZE_BYTES);
LOG_D(PHY,"thread_id %d, aid %d, TS %llu, TS0 %llu, offset %d\n",u->thread_id,aid,(unsigned long long)TS,fhstate->TS0,offset);
}
sleep(1);
}
......@@ -438,14 +463,26 @@ int trx_eth_read_udp(openair0_device *device, openair0_timestamp *timestamp, uin
min_TS = fhstate->TS[0];
for (int i=1;i<device->openair0_cfg->rx_num_channels;i++) min_TS = min(min_TS,fhstate->TS[i]);
// poll/sleep until we accumulated enough samples on each antenna port
while (min_TS < fhstate->TS0+prev_read_TS + nsamps) {
int count=0;
while (fhstate->first_read == 1 && min_TS < (fhstate->TS0+prev_read_TS + nsamps)) {
usleep(100);
min_TS = fhstate->TS[0];
for (int i=1;i<device->openair0_cfg->rx_num_channels;i++) min_TS = min(min_TS,fhstate->TS[i]);
count++;
}
*timestamp = fhstate->TS_read;
fhstate->TS_read = prev_read_TS + nsamps;
if (fhstate->first_read == 0) {
*timestamp = min_TS - fhstate->TS0;
fhstate->TS_read = *timestamp+nsamps;
LOG_D(PHY,"first read : TS_read %llu, TS %llu state (%d,%d,%d,%d,%d,%d,%d,%d)\n",fhstate->TS_read,*timestamp,
fhstate->r[0],fhstate->r[1],fhstate->r[2],fhstate->r[3],fhstate->r[4],fhstate->r[5],fhstate->r[6],fhstate->r[7]);
}
else {
*timestamp = fhstate->TS_read;
fhstate->TS_read = prev_read_TS + nsamps;
LOG_D(PHY,"TS_read %llu, min_TS %llu, prev_read_TS %llu, nsamps %d, fhstate->TS0+prev_TS+nsamps %llu, wait count %d x 100us\n",fhstate->TS_read,min_TS,prev_read_TS,nsamps,fhstate->TS0+prev_read_TS+nsamps,count);
}
fhstate->first_read = 1;
return (nsamps);
}
......
......@@ -64,7 +64,7 @@ int trx_eth_start(openair0_device *device)
AssertFatal(device->thirdparty_init != NULL, "device->thirdparty_init is null\n");
AssertFatal(device->thirdparty_init(device) == 0, "third-party init failed\n");
device->openair0_cfg->samples_per_packet = 256;
eth->num_fd = max(device->openair0_cfg->rx_num_channels,device->openair0_cfg->tx_num_channels);
eth->num_fd = 1; //max(device->openair0_cfg->rx_num_channels,device->openair0_cfg->tx_num_channels);
printf("Creating %d UDP threads ...\n",device->openair0_cfg->rx_num_channels);
udp_read_t *u[device->openair0_cfg->rx_num_channels];
for (int i=0;i<device->openair0_cfg->rx_num_channels;i++) {
......@@ -112,6 +112,20 @@ int trx_eth_start(openair0_device *device)
}
else AssertFatal(1==0,"num_rb_dl %d not ok with ECPRI\n",device->openair0_cfg->num_rb_dl);
}
int threadCnt = device->openair0_cfg->tx_num_channels;
if (threadCnt < 2) LOG_E(PHY,"Number of threads for gNB should be more than 1. Allocated only %d\n",threadCnt);
char pool[80];
sprintf(pool,"-1");
int s_offset = 0;
for (int icpu=1; icpu<threadCnt; icpu++) {
sprintf(pool+2+s_offset,",-1");
s_offset += 3;
}
device->threadPool = (tpool_t*)malloc(sizeof(tpool_t));
initTpool(pool, device->threadPool, cpumeas(CPUMEAS_GETSTATE));
// ULSCH decoder result FIFO
device->respudpTX = (notifiedFIFO_t*) malloc(sizeof(notifiedFIFO_t));
initNotifiedFIFO(device->respudpTX);
}
/* initialize socket */
if (eth->flags == ETH_RAW_MODE) {
......@@ -502,6 +516,7 @@ int transport_init(openair0_device *device,
device->trx_ctlsend_func = trx_eth_ctlsend_udp;
device->trx_ctlrecv_func = trx_eth_ctlrecv_udp;
memset((void*)&device->fhstate,0,sizeof(device->fhstate));
printf("Setting %d RX channels to waiting\n",openair0_cfg->rx_num_channels);
for (int i=openair0_cfg->rx_num_channels;i<8;i++) device->fhstate.r[i] = 1;
} else if (eth->flags == ETH_RAW_IF4p5_MODE) {
device->trx_write_func = trx_eth_write_raw_IF4p5;
......
......@@ -41,6 +41,7 @@
#include <sys/socket.h>
#include <net/if.h>
#include <netinet/ether.h>
#include <common/utils/threadPool/thread-pool.h>
#define MAX_INST 4
#define DEFAULT_IF "lo"
......@@ -209,6 +210,27 @@ typedef struct {
short q;
} iqoai_t ;
typedef struct udpTXelem_s {
openair0_device *device;
openair0_timestamp timestamp;
void *buff;
int aid;
int nsamps;
int flags;
} udpTXelem_t;
struct udpTXReqId {
uint64_t TS;
int aid;
int length;
uint16_t spare;
} __attribute__((packed));
union udpTXReqUnion {
struct udpTXReqId s;
uint64_t p;
};
void dump_packet(char *title, unsigned char* pkt, int bytes, unsigned int tx_rx_flag);
unsigned short calc_csum (unsigned short *buf, int nwords);
void dump_dev(openair0_device *device);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment