Commit 8fdf4af6 authored by Raymond Knopp's avatar Raymond Knopp

added multiple thread support for ECPRI/IF5 interface. Uses SO_REUSEPORT for...

added multiple thread support for ECPRI/IF5 interface. Uses SO_REUSEPORT for listening with multiple sockets in different threads
parent 070b663a
......@@ -306,7 +306,7 @@ void fh_if5_south_in(RU_t *ru,
int is_rx=1;
int slot_type = nr_slot_select(cfg,*frame,*tti);
if (slot_type == NR_DOWNLINK_SLOT) is_rx=0;
recv_IF5(ru, &proc->timestamp_rx, *tti, IF5_RRH_GW_UL,is_rx);
ru->ifdevice.trx_read_func2(&ru->ifdevice,&proc->timestamp_rx,NULL,fp->get_samples_per_slot(*tti,fp));
if (proc->first_rx == 1) ru->ts_offset = proc->timestamp_rx;
proc->frame_rx = ((proc->timestamp_rx-ru->ts_offset) / (fp->samples_per_subframe*10))&1023;
proc->tti_rx = fp->get_slot_from_timestamp(proc->timestamp_rx-ru->ts_offset,fp);
......@@ -1029,6 +1029,7 @@ void fill_rf_config(RU_t *ru, char *rf_config_file) {
cfg->num_rb_dl=N_RB;
cfg->tx_num_channels=ru->nb_tx;
cfg->rx_num_channels=ru->nb_rx;
LOG_I(PHY,"Setting RF config for N_RB %d, NB_RX %d, NB_TX %d\n",cfg->num_rb_dl,cfg->rx_num_channels,cfg->tx_num_channels);
for (i=0; i<ru->nb_tx; i++) {
if (ru->if_frequency == 0) {
......@@ -1272,7 +1273,7 @@ void *ru_thread( void *param ) {
// Start IF device if any
if (ru->nr_start_if) {
LOG_I(PHY,"Starting IF interface for RU %d\n",ru->idx);
LOG_I(PHY,"Starting IF interface for RU %d, nb_rx %d\n",ru->idx,ru->nb_rx);
AssertFatal(ru->nr_start_if(ru,NULL) == 0, "Could not start the IF device\n");
if (ru->has_ctrl_prt > 0) {
......@@ -1362,7 +1363,7 @@ void *ru_thread( void *param ) {
}
// synchronization on input FH interface, acquire signals/data and block
LOG_D(PHY,"[RU_thread] read data: frame_rx = %d, tti_rx = %d\n", frame, slot);
LOG_I(PHY,"[RU_thread] read data: frame_rx = %d, tti_rx = %d\n", frame, slot);
if (ru->fh_south_in) ru->fh_south_in(ru,&frame,&slot);
else AssertFatal(1==0, "No fronthaul interface at south port");
......@@ -1495,6 +1496,8 @@ int start_streaming(RU_t *ru) {
}
int nr_start_if(struct RU_t_s *ru, struct PHY_VARS_gNB_s *gNB) {
for (int i=0;i<ru->nb_rx;i++) ru->openair0_cfg.rxbase[i] = ru->common.rxdata[i];
ru->openair0_cfg.rxsize = ru->nr_frame_parms->samples_per_subframe*10;
return(ru->ifdevice.trx_start_func(&ru->ifdevice));
}
......@@ -1845,6 +1848,7 @@ void set_function_spec_param(RU_t *ru) {
ru->ifdevice.host_type = RAU_HOST;
ru->ifdevice.eth_params = &ru->eth_params;
ru->ifdevice.configure_rru = configure_ru;
ret = openair0_transport_load(&ru->ifdevice,&ru->openair0_cfg,&ru->eth_params);
printf("openair0_transport_init returns %d for ru_id %u\n", ret, ru->idx);
......
......@@ -1362,14 +1362,15 @@ void recv_IF5(RU_t *ru, openair0_timestamp *proc_timestamp, int tti, uint16_t pa
for (packet_id=0; packet_id < ru->nb_rx*siglen / spp_eth; packet_id++) {
//VCD_SIGNAL_DUMPER_DUMP_VARIABLE_BY_NAME( VCD_SIGNAL_DUMPER_VARIABLES_SEND_IF5_PKT_ID, packet_id );
//VCD_SIGNAL_DUMPER_DUMP_FUNCTION_BY_NAME( VCD_SIGNAL_DUMPER_FUNCTIONS_TRX_READ_IF0, 1 );
for (i=0;i<ru->nb_rx;i++) _mm_prefetch((char*)rxp[i][packet_id*spp_eth],_MM_HINT_NTA);
clock_gettime( CLOCK_MONOTONIC, &if_time);
timein[packet_id] = if_time.tv_nsec;
ru->ifdevice.trx_read_func2(&ru->ifdevice,
/* ru->ifdevice.trx_read_func2(&ru->ifdevice,
&timestamp[packet_id],
(is_rx>0) ? rxp : NULL,
spp_eth,
packet_id,
&aid);
&aid);*/
clock_gettime( CLOCK_MONOTONIC, &if_time);
timeout[packet_id] = if_time.tv_nsec;
timestamp[packet_id] = (timestamp[packet_id]*factor_n)/factor_d;
......@@ -1380,7 +1381,7 @@ void recv_IF5(RU_t *ru, openair0_timestamp *proc_timestamp, int tti, uint16_t pa
if (firstTS==1) firstTS=0;
else if (oldTS + spp_eth != timestamp[packet_id]) {
LOG_I(PHY,"oldTS %llu, newTS %llu, diff %llu, timediff %ld\n",(long long unsigned int)oldTS,(long long unsigned int)timestamp[packet_id],(long long unsigned int)timestamp[packet_id]-oldTS,-timein[packet_id]+timeout[packet_id]);
for (int i=0;i<=packet_id;i++) LOG_I(PHY,"packet %d aid %d TS %llu, timediff %ld\n",i,aid_list[i],(long long unsigned int)timestamp[i],-timein[i]+timeout[i]);
for (int i=0;i<=packet_id;i++) LOG_I(PHY,"packet %d/%d aid %d TS %llu, timediff %ld\n",i,ru->nb_rx*siglen / spp_eth,aid_list[i],(long long unsigned int)timestamp[i],-timein[i]+timeout[i]);
AssertFatal(1==0,"fronthaul problem\n");
}
......
......@@ -16,16 +16,18 @@
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <net/if.h>
#include <netinet/ether.h>
#include <unistd.h>
#include <errno.h>
#include <linux/sysctl.h>
#include <sys/sysctl.h>
#include <pthread.h>
#include "common_lib.h"
#include "ethernet_lib.h"
#include "common/utils/system.h"
#include "ori.h"
#include "targets/ARCH/COMMON/common_lib.h"
......@@ -221,42 +223,6 @@ int aw2s_oricleanup(openair0_device *device) {
return(0);
}
typedef struct fhstate_s {
int Npackets;
openair0_timestamp olddeltaTS;
openair0_timestamp oldTS;
int fhjumps;
openair0_device *device;
int r0;
int r1;
int r2;
int r3;
} fhstate_t;
openair0_timestamp TS;
int64_t deltaTS,olddeltaTS=-1;
int aid,r0=0;
r1=(openair0_cfg->rx_num_channels > 1) ? 0 : 1;
for (i=0;i<fhstate->Npackets;i++) {
device->trx_read_func2(fhstate->device,
fhstate->TS,
NULL,
256,
0,
&aid);
if (aid == 0) fhstate->r0=1;
if (aid == 1) fhstate->r1=1;
if (aid == 2) fhstate->r2=1;
if (aid == 3) fhstate->r3=1;
if (aid==0) {
deltaTS = TS-oldTS;
fhstate-> oldTS=TS;
if (deltaTS != fhstate->olddeltaTS && fhstate->olddeltaTS > 0) {
fhstate->fhjumps++;
}
if (i>0) fhstate->olddeltaTS = deltaTS;
}
}
int aw2s_startstreaming(openair0_device *device) {
ORI_s * ori = (ORI_s*)device->thirdparty_priv;
......@@ -447,55 +413,8 @@ int aw2s_startstreaming(openair0_device *device) {
}
printf("ORI_ObjectStateModify: %s\n", ORI_Result_Print(RE_result));
}
/*
while (rx0->fst != ORI_FST_Operational ||
(openair0_cfg->rx_num_channels > 1 && rx1->fst != ORI_FST_Operational) ||
(openair0_cfg->rx_num_channels > 2 && rx2->fst != ORI_FST_Operational) ||
(openair0_cfg->rx_num_channels > 3 && rx3->fst != ORI_FST_Operational))
{}
*/
// test RX interface
uint64_t TS;
int64_t deltaTS,olddeltaTS=-1;
int aid,r0=0,r1=(openair0_cfg->rx_num_channels > 1) ? 0 : 1;
int r2=(openair0_cfg->rx_num_channels > 2) ? 0 : 1;
int r3=(openair0_cfg->rx_num_channels > 3) ? 0 : 1;
int i;
fhstate_t fhstate;
fhstate.Npackets=1024000;
fhstate.oldTS=0;
fhstate.fhjumps=0;
fhstate.r0=0;
fhstate.r1=(openair0_cfg->rx_num_channels > 1) ? 0 : 1;
fhstate.r2=(openair0_cfg->rx_num_channels > 2) ? 0 : 1;
fhstate.r3=(openair0_cfg->rx_num_channels > 3) ? 0 : 1;
for (i=0;i<fhstate->Npackets;i++) {
device->trx_read_func2(fhstate->device,
(openair0_timestamp*)&TS,
NULL,
256,
0,
&aid);
if (aid == 0) fhstate->r0=1;
if (aid == 1) fhstate->r1=1;
if (aid == 2) fhstate->r2=1;
if (aid == 3) fhstate->r3=1;
if (aid==0) {
deltaTS = TS-fhstate->oldTS;
fhstate->oldTS=TS;
if (deltaTS != fhstate->olddeltaTS && fhstate->olddeltaTS > 0) {
fhstate->fhjumps++;
}
if (i>0) fhstate->olddeltaTS = deltaTS;
}
}
if (fhstate.r0==1 && fhstate.r1==1 && fhstate.r2==1 && fhstate.r3==1) printf("Streaming started, returning to OAI, fhjumps %d\n",fhjumps);
else {
printf("Didn't get anything from one antenna port after %d packets %d,%d,%d,%d\n",fhstate.Npackets,fhstate.r0,fhstate.r1,fhstate.r2,fhstater3);
return(-1);
}
device->fhstate.active=1;
return(0);
}
......@@ -977,7 +896,7 @@ int aw2s_oriinit(openair0_device *device) {
int transport_init(openair0_device *device, openair0_config_t *openair0_cfg, eth_params_t * eth_params ) {
printf("Initializing AW2S (%p,%p,%p)\n",aw2s_oriinit,aw2s_oricleanup,aw2s_startstreaming);
device->thirdparty_init = aw2s_oriinit;
device->thirdparty_cleanup = aw2s_oricleanup;
device->thirdparty_startstreaming = aw2s_startstreaming;
......
......@@ -151,6 +151,12 @@ typedef enum {
gpsdo=2
} clock_source_t;
/*! \brief Structure used for initializing UDP read threads */
typedef struct {
openair0_device *device;
int thread_id;
pthread_t pthread;
} udp_read_t;
/*! \brief RF frontend parameters set by application */
typedef struct {
......@@ -176,9 +182,11 @@ typedef struct {
int rx_num_channels;
//! number of TX channels (=TX antennas)
int tx_num_channels;
//! \brief RX base addresses for mmapped_dma
//! \brief RX base addresses for mmapped_dma or direct access
int32_t *rxbase[4];
//! \brief TX base addresses for mmapped_dma
//! \brief RX buffer size for direct access
int rxsize;
//! \brief TX base addresses for mmapped_dma or direct access
int32_t *txbase[4];
//! \brief Center frequency in Hz for RX.
//! index: [0..rx_num_channels[
......@@ -309,6 +317,18 @@ typedef struct {
pthread_mutex_t mutex_write;
} openair0_thread_t;
typedef struct fhstate_s {
openair0_timestamp TS[8];
openair0_timestamp TS0;
openair0_timestamp olddeltaTS[8];
openair0_timestamp oldTS[8];
openair0_timestamp TS_read;
uint32_t *buff[8];
uint32_t buff_size;
int r[8];
int active;
} fhstate_t;
/*!\brief structure holds the parameters to configure USRP devices */
struct openair0_device_t {
/*!tx write thread*/
......@@ -343,6 +363,15 @@ struct openair0_device_t {
/*!brief Can be used by driver to hold internal structure*/
void *priv;
/*!brief pointer to FH state, used in ECPRI split 8*/
fhstate_t fhstate;
/*!bried Used in ECPRI split 8 to indicate numerator of sampling rate ratio*/
int sampling_rate_ratio_n;
/*!bried Used in ECPRI split 8 to indicate denominator of sampling rate ratio*/
int sampling_rate_ratio_d;
/* Functions API, which are called by the application*/
/*! \brief Called to start the transceiver. Return 0 if OK, < 0 if error
......@@ -417,7 +446,7 @@ struct openair0_device_t {
* \param antenna_id Index of antenna from which samples were received
* \returns the number of sample read
*/
int (*trx_read_func2)(openair0_device *device, openair0_timestamp *ptimestamp, uint32_t **buff, int nsamps,int packet_idx,int *antenna_id);
int (*trx_read_func2)(openair0_device *device, openair0_timestamp *ptimestamp, uint32_t **buff, int nsamps);
/*! \brief print the device statistics
* \param device the hardware to use
......@@ -466,6 +495,7 @@ struct openair0_device_t {
/*! \brief Pointer to generic RRU private information
*/
void *thirdparty_priv;
/*! \brief Callback for Third-party RRU Initialization routine
......
......@@ -80,10 +80,11 @@ int eth_socket_init_raw(openair0_device *device) {
perror("ETHERNET: Error opening RAW socket (control)");
exit(0);
}
if ((eth->sockfdd = socket(sock_dom, sock_type, sock_proto)) == -1) {
perror("ETHERNET: Error opening RAW socket (user)");
exit(0);
}
for (int i=0;i<eth->num_fd;i++)
if ((eth->sockfdd[i] = socket(sock_dom, sock_type, sock_proto)) == -1) {
perror("ETHERNET: Error opening RAW socket (user)");
exit(0);
}
/* initialize destination address */
bzero((void *)&(eth->local_addrc_ll), sizeof(struct sockaddr_ll));
......@@ -94,8 +95,9 @@ int eth_socket_init_raw(openair0_device *device) {
strcpy(eth->if_index.ifr_name,eth->if_name);
if (ioctl(eth->sockfdc, SIOCGIFINDEX, &(eth->if_index)) < 0)
perror("SIOCGIFINDEX");
if (ioctl(eth->sockfdd, SIOCGIFINDEX, &(eth->if_index)) < 0)
perror("SIOCGIFINDEX");
for (int i=0;i<eth->num_fd;i++)
if (ioctl(eth->sockfdd[i], SIOCGIFINDEX, &(eth->if_index)) < 0)
perror("SIOCGIFINDEX");
eth->local_addrc_ll.sll_family = AF_PACKET;
eth->local_addrc_ll.sll_ifindex = eth->if_index.ifr_ifindex;
......@@ -111,10 +113,11 @@ int eth_socket_init_raw(openair0_device *device) {
eth->local_addrd_ll.sll_pkttype = PACKET_OTHERHOST;
eth->addr_len = sizeof(struct sockaddr_ll);
if (bind(eth->sockfdd,(struct sockaddr *)&eth->local_addrd_ll,eth->addr_len)<0) {
perror("ETHERNET: Cannot bind to socket (user)");
exit(0);
}
for (int i=0;i<eth->num_fd;i++)
if (bind(eth->sockfdd[i],(struct sockaddr *)&eth->local_addrd_ll,eth->addr_len)<0) {
perror("ETHERNET: Cannot bind to socket (user)");
exit(0);
}
/* Construct the Ethernet header */
ether_aton_r(local_mac, (struct ether_addr *)(&(eth->ehd.ether_shost)));
......@@ -165,7 +168,7 @@ int trx_eth_write_raw(openair0_device *device, openair0_timestamp timestamp, voi
/* Send packet */
bytes_sent += send(eth->sockfdd,
bytes_sent += send(eth->sockfdd[cc % eth->num_fd],
buff2,
pktsize,
sendto_flag);
......@@ -214,7 +217,7 @@ int trx_eth_write_raw_IF4p5(openair0_device *device, openair0_timestamp timestam
memcpy(buff[0], (void*)&eth->ehd, MAC_HEADER_SIZE_BYTES);
bytes_sent = send(eth->sockfdd,
bytes_sent = send(eth->sockfdd[cc % eth->num_fd],
buff[0],
packet_size,
0);
......@@ -266,7 +269,7 @@ int trx_eth_read_raw(openair0_device *device, openair0_timestamp *timestamp, voi
while(bytes_received < receive_bytes) {
again:
ret = recv(eth->sockfdd,
ret = recv(eth->sockfdd[cc % eth->num_fd],
buff2,
receive_bytes,
rcvfrom_flag);
......@@ -345,7 +348,7 @@ int trx_eth_read_raw_IF4p5(openair0_device *device, openair0_timestamp *timestam
while (bytes_received < packet_size) {
again:
ret = recv(eth->sockfdd,
ret = recv(eth->sockfdd[cc % eth->num_fd],
buff[0],
packet_size,
MSG_PEEK);
......@@ -397,7 +400,7 @@ int trx_eth_read_raw_IF4p5(openair0_device *device, openair0_timestamp *timestam
}
while(bytes_received < packet_size) {
ret = recv(eth->sockfdd,
ret = recv(eth->sockfdd[cc % eth->num_fd],
buff[0],
packet_size,
0);
......
......@@ -101,10 +101,11 @@ int eth_socket_init_udp(openair0_device *device) {
exit(0);
}
if ((eth->sockfdd = socket(sock_dom, sock_type, sock_proto)) == -1) {
perror("ETHERNET: Error opening socket (user)");
exit(0);
}
for (int i = 0;i<eth->num_fd;i++)
if ((eth->sockfdd[i] = socket(sock_dom, sock_type, sock_proto)) == -1) {
printf("ETHERNET: Error opening socket (user %d)",i);
exit(0);
}
/* initialize addresses */
bzero((void *)&(eth->dest_addrc), sizeof(eth->dest_addrc));
......@@ -141,9 +142,15 @@ int eth_socket_init_udp(openair0_device *device) {
perror("ETHERNET: Cannot set SO_REUSEADDR option on socket (control)");
exit(0);
}
if (setsockopt(eth->sockfdd, SOL_SOCKET, SO_NO_CHECK, &enable, sizeof(int))) {
perror("ETHERNET: Cannot set SO_NO_CHECK option on socket (user)");
exit(0);
for (int i=0;i<eth->num_fd;i++) {
if (setsockopt(eth->sockfdd[i], SOL_SOCKET, SO_NO_CHECK, &enable, sizeof(int))) {
printf("ETHERNET: Cannot set SO_NO_CHECK option on socket (user %d)",i);
exit(0);
}
if (setsockopt(eth->sockfdd[i], SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int))) {
printf("ETHERNET: Cannot set SO_REUSEPORT option on socket (user %d)",i);
exit(0);
}
}
/* want to receive -> so bind */
......@@ -153,12 +160,13 @@ int eth_socket_init_udp(openair0_device *device) {
} else {
printf("[%s] binding to %s:%d (control)\n",str[hostind],str_local,ntohs(eth->local_addrc.sin_port));
}
if (bind(eth->sockfdd,(struct sockaddr *)&eth->local_addrd,eth->addr_len)<0) {
perror("ETHERNET: Cannot bind to socket (user)");
exit(0);
} else {
printf("[%s] binding to %s:%d (user)\n",str[hostind],str_local,ntohs(eth->local_addrd.sin_port));
}
for (int i=0;i<eth->num_fd;i++)
if (bind(eth->sockfdd[i],(struct sockaddr *)&eth->local_addrd,eth->addr_len)<0) {
printf("ETHERNET: Cannot bind to socket (user %d)",i);
exit(0);
} else {
printf("[%s] binding to %s:%d (user %d)\n",str[hostind],str_local,ntohs(eth->local_addrd.sin_port),i);
}
return 0;
}
......@@ -180,7 +188,7 @@ int trx_eth_read_udp_IF4p5(openair0_device *device, openair0_timestamp *timestam
while(bytes_received == -1) {
again:
bytes_received = recvfrom(eth->sockfdd,
bytes_received = recvfrom(eth->sockfdd[cc%eth->num_fd],
buff[0],
packet_size,
0,
......@@ -246,7 +254,7 @@ int trx_eth_write_udp_IF4p5(openair0_device *device, openair0_timestamp timestam
eth->tx_nsamps = nblocks;
bytes_sent = sendto(eth->sockfdd,
bytes_sent = sendto(eth->sockfdd[cc%eth->num_fd],
buff[0],
packet_size,
0,
......@@ -357,7 +365,7 @@ int trx_eth_write_udp(openair0_device *device, openair0_timestamp timestamp, voi
bytes_sent);
#endif
/* Send packet */
bytes_sent = sendto(eth->sockfdd,
bytes_sent = sendto(eth->sockfdd[cc%eth->num_fd],
buff2,
sent_byte,
sendto_flag,
......@@ -388,11 +396,70 @@ int trx_eth_write_udp(openair0_device *device, openair0_timestamp timestamp, voi
return (bytes_sent-APP_HEADER_SIZE_BYTES)>>2;
}
extern int oai_exit;
void *udp_read_thread(void *arg) {
openair0_timestamp TS;
int aid;
udp_read_t *u = (udp_read_t *)arg;
openair0_device *device=u->device;
fhstate_t *fhstate = &device->fhstate;
char buffer[UDP_PACKET_SIZE_BYTES(256)];
while (oai_exit == 0) {
printf("UDP read thread %d, waiting for start\n",u->thread_id);
while (fhstate->active > 0) {
size_t count = recvfrom(((eth_state_t*)device->priv)->sockfdd[u->thread_id],
buffer,sizeof(buffer),0,
(struct sockaddr *)&((eth_state_t*)device->priv)->dest_addrd,
(socklen_t *)&((eth_state_t*)device->priv)->addr_len);
aid = *(uint16_t*)(&buffer[ECPRICOMMON_BYTES]);
TS = *(openair0_timestamp *)(&buffer[ECPRICOMMON_BYTES+ECPRIPCID_BYTES]);
// convert TS to samples, /6 for AW2S @ 30.72 Ms/s, this is converted for other sample rates in OAI application
TS = device->sampling_rate_ratio_n*(TS/device->sampling_rate_ratio_d/6);
printf("udp_thread_id %d count %d, aid %d, TS %llu\n",u->thread_id,count,aid,(unsigned long long)TS);
if (count <= 0) { printf("problem in recvfrom\n"); exit(-1); }
AssertFatal(aid < 8,"Cannot handle more than 8 antennas, got aid %d\n",aid);
fhstate->r[aid]=1;
if (aid==0 && fhstate->TS[0] == 0) fhstate->TS0 = TS;
/* store the timestamp value from packet's header */
fhstate->TS[aid] = TS;
int64_t offset = TS - fhstate->TS0;
if (offset > 0) offset = offset % device->openair0_cfg->rxsize;
else offset = TS % device->openair0_cfg->rxsize + ((((uint64_t)1)<<63)-(fhstate->TS0-1)) % device->openair0_cfg->rxsize;
printf("udp_thread_id %d aid %d, TS %llu, TS0 %llu, offset %d, size %d\n",u->thread_id,aid,(unsigned long long)TS,fhstate->TS0,offset,device->openair0_cfg->rxsize);
// need to do memcpy since there is no guarantee that aid is the same each time, otherwise we could have used
// zero-copy and corrected the header component.
memcpy((void*)(device->openair0_cfg->rxbase[aid]+offset),
(void*)&buffer[APP_HEADER_SIZE_BYTES],
count-APP_HEADER_SIZE_BYTES);
}
sleep(1);
}
}
int trx_eth_read_udp(openair0_device *device, openair0_timestamp *timestamp, uint32_t **buff, int nsamps, int packet_idx,int *cc) {
int trx_eth_read_udp(openair0_device *device, openair0_timestamp *timestamp, uint32_t **buff, int nsamps) {
int bytes_received=0;
eth_state_t *eth = (eth_state_t*)device->priv;
fhstate_t *fhstate = &device->fhstate;
openair0_timestamp prev_read_TS= fhstate->TS_read, min_TS;
// block until FH is ready
while (fhstate->r[0] == 0 || fhstate->r[1] == 0 || fhstate->r[2] == 0 || fhstate->r[3] == 0 ||
fhstate->r[4] == 0 || fhstate->r[5] == 0 || fhstate->r[6] == 0 || fhstate->r[7] == 0) usleep(100);
// get minimum TS over all antennas
min_TS = fhstate->TS[0];
for (int i=1;i<device->openair0_cfg->rx_num_channels;i++) min_TS = min(min_TS,fhstate->TS[i]);
// poll/sleep until we accumulated enough samples on each antenna port
while (min_TS < prev_read_TS + nsamps) {
usleep(100);
min_TS = fhstate->TS[0];
for (int i=1;i<device->openair0_cfg->rx_num_channels;i++) min_TS = min(min_TS,fhstate->TS[i]);
}
/*
// openair0_timestamp prev_timestamp = -1;
int rcvfrom_flag =0;
int block_cnt=0;
......@@ -411,46 +478,20 @@ int trx_eth_read_udp(openair0_device *device, openair0_timestamp *timestamp, uin
block_cnt=0;
AssertFatal(eth->compression == NO_COMPRESS, "IF5 compression not supported for now\n");
while(bytes_received < payload_size) {
again:
// LOG_I(PHY,"temp_rx0 %p, temp_rx[0] %p temp_rx[APP_HEADER_SIZE_BYTES>>2] %p APP_HEADER_SIZE_BYTES %ld\n",temp_rx0,&temp_rx[0],&temp_rx[APP_HEADER_SIZE_BYTES>>2],APP_HEADER_SIZE_BYTES);
bytes_received +=recvfrom(eth->sockfdd,
(void*)temp_rx0,
payload_size,
rcvfrom_flag,
(struct sockaddr *)&eth->dest_addrd,
(socklen_t *)&eth->addr_len);
packet_cnt++;
if (bytes_received ==-1) {
bytes_received =recvfrom(eth->sockfdd[sockid%eth->num_fd],
(void*)temp_rx0,
payload_size,
rcvfrom_flag,
(struct sockaddr *)&eth->dest_addrd,
(socklen_t *)&eth->addr_len);
if (bytes_received ==-1) {
eth->num_rx_errors++;
if (errno == EAGAIN) {
again_cnt++;
usleep(10);
if (again_cnt == 1000) {
perror("ETHERNET READ: ");
exit(-1);
} else {
bytes_received=0;
goto again;
}
} else if (errno == EWOULDBLOCK) {
block_cnt++;
usleep(10);
if (block_cnt == 1000) {
perror("ETHERNET READ: ");
exit(-1);
} else {
printf("BLOCK BLOCK BLOCK BLOCK BLOCK BLOCK BLOCK BLOCK BLOCK BLOCK BLOCK BLOCK \n");
goto again;
}
}
} else {
/* store the timestamp value from packet's header */
} else {
// store the timestamp value from packet's header
*timestamp = *(openair0_timestamp *)(temp_rx0 + ECPRICOMMON_BYTES+ECPRIPCID_BYTES);
// convert TS to samples, /6 for AW2S @ 30.72 Ms/s, this is converted for other sample rates in OAI application
*timestamp = *timestamp/6;
*cc = *(uint16_t*)(temp_rx0 + ECPRICOMMON_BYTES);
}
eth->rx_actual_nsamps=payload_size>>2;
eth->rx_count++;
}
......@@ -458,8 +499,10 @@ int trx_eth_read_udp(openair0_device *device, openair0_timestamp *timestamp, uin
if (buff) memcpy((void*)(buff[*cc]+packet_idx*nsamps),
(void*)(temp_rx+1),
nsamps<<2);
return (payload_size>>2);
*/
*timestamp = fhstate->TS_read;
fhstate->TS_read = prev_read_TS + nsamps;
return (nsamps);
}
......
......@@ -64,8 +64,32 @@ int trx_eth_start(openair0_device *device)
AssertFatal(device->thirdparty_init != NULL, "device->thirdparty_init is null\n");
AssertFatal(device->thirdparty_init(device) == 0, "third-party init failed\n");
device->openair0_cfg->samples_per_packet = 256;
}
/* initialize socket */
eth->num_fd = max(device->openair0_cfg->rx_num_channels,device->openair0_cfg->tx_num_channels);
printf("Creating %d UDP threads ...\n",device->openair0_cfg->rx_num_channels);
udp_read_t *u[device->openair0_cfg->rx_num_channels];
for (int i=0;i<device->openair0_cfg->rx_num_channels;i++) {
u[i] = malloc(sizeof(udp_read_t));
u[i]->thread_id=i;
u[i]->device = device;
threadCreate(&u[i]->pthread,udp_read_thread,u[i],"udp read thread",-1,OAI_PRIORITY_RT_MAX);
}
device->sampling_rate_ratio_n=1;
device->sampling_rate_ratio_d=1;
if (device->openair0_cfg->nr_flag==1) { // This check if RRU knows about NR numerologies
if (device->openair0_cfg->num_rb_dl <= 162 && device->openair0_cfg->num_rb_dl > 106) device->sampling_rate_ratio_n = 2;
else if (device->openair0_cfg->num_rb_dl <= 106 && device->openair0_cfg->num_rb_dl > 51) {device->sampling_rate_ratio_d=3;device->sampling_rate_ratio_n=4;}
else if (device->openair0_cfg->num_rb_dl == 51) {device->sampling_rate_ratio_n=1;device->sampling_rate_ratio_d=1;}
else AssertFatal(1==0,"num_rb_dl %d not ok with ECPRI\n",device->openair0_cfg->num_rb_dl);
}
else {
if (device->openair0_cfg->num_rb_dl == 100) device->sampling_rate_ratio_d = 1;
else if (device->openair0_cfg->num_rb_dl == 75) {device->sampling_rate_ratio_d = 4; device->sampling_rate_ratio_n=3;}
else if (device->openair0_cfg->num_rb_dl == 50) device->sampling_rate_ratio_d = 2;
else if (device->openair0_cfg->num_rb_dl == 25) device->sampling_rate_ratio_d = 4;
else AssertFatal(1==0,"num_rb_dl %d not ok with ECPRI\n",device->openair0_cfg->num_rb_dl);
}
}
/* initialize socket */
if (eth->flags == ETH_RAW_MODE) {
printf("Setting ETHERNET to ETH_RAW_IF5_MODE\n");
if (eth_socket_init_raw(device)!=0) return -1;
......@@ -159,12 +183,13 @@ void trx_eth_end(openair0_device *device)
{
eth_state_t *eth = (eth_state_t*)device->priv;
/* destroys socket only for the processes that call the eth_end fuction-- shutdown() for beaking the pipe */
if ( close(eth->sockfdd) <0 ) {
perror("ETHERNET: Failed to close socket");
exit(0);
} else {
printf("[%s] socket has been successfully closed.\n",(device->host_type == RAU_HOST)? "RAU":"RRU");
}
for (int i=0;i<eth->num_fd;i++)
if ( close(eth->sockfdd[i]) <0 ) {
perror("ETHERNET: Failed to close socket");
exit(0);
} else {
printf("[%s] socket has been successfully closed.\n",(device->host_type == RAU_HOST)? "RAU":"RRU");
}
}
......@@ -224,7 +249,8 @@ int ethernet_tune(openair0_device *device,
/****************** socket level options ************************/
switch(option) {
case SND_BUF_SIZE: /* transmit socket buffer size */
if (setsockopt(eth->sockfdd,
for (int i=0;i<eth->num_fd;i++)
if (setsockopt(eth->sockfdd[i],
SOL_SOCKET,
SO_SNDBUF,
&value,sizeof(value))) {
......@@ -232,10 +258,11 @@ int ethernet_tune(openair0_device *device,
} else {
printf("send buffer size= %d bytes\n",value);
}
break;
break;
case RCV_BUF_SIZE: /* receive socket buffer size */
if (setsockopt(eth->sockfdd,
for (int i=0;i<eth->num_fd;i++) {
if (setsockopt(eth->sockfdd[i],
SOL_SOCKET,
SO_RCVBUF,
&value,sizeof(value))) {
......@@ -243,26 +270,29 @@ int ethernet_tune(openair0_device *device,
} else {
printf("receive bufffer size= %d bytes\n",value);
}
break;
}
break;
case RCV_TIMEOUT:
timeout.tv_sec = value/1000000;
timeout.tv_usec = value%1000000;//less than rt_period?
if (setsockopt(eth->sockfdc,
SOL_SOCKET,
SO_RCVTIMEO,
(char *)&timeout,sizeof(timeout))) {
perror("[ETHERNET] setsockopt()");
SOL_SOCKET,
SO_RCVTIMEO,
(char *)&timeout,sizeof(timeout))) {
perror("[ETHERNET] setsockopt()");
} else {
printf( "receive timeout= %u usec\n",(unsigned int)timeout.tv_usec);
printf( "receive timeout= %u usec\n",(unsigned int)timeout.tv_usec);
}
if (setsockopt(eth->sockfdd,
SOL_SOCKET,
SO_RCVTIMEO,
(char *)&timeout,sizeof(timeout))) {
perror("[ETHERNET] setsockopt()");
} else {
printf( "receive timeout= %u usec\n",(unsigned int)timeout.tv_usec);
for (int i=0;i<eth->num_fd;i++) {
if (setsockopt(eth->sockfdd[i],
SOL_SOCKET,
SO_RCVTIMEO,
(char *)&timeout,sizeof(timeout))) {
perror("[ETHERNET] setsockopt()");
} else {
printf( "receive timeout= %u usec\n",(unsigned int)timeout.tv_usec);
}
}
break;
......@@ -277,13 +307,15 @@ int ethernet_tune(openair0_device *device,
} else {
printf( "send timeout= %d,%d sec\n",(int)timeout.tv_sec,(int)timeout.tv_usec);
}
if (setsockopt(eth->sockfdd,
SOL_SOCKET,
SO_SNDTIMEO,
(char *)&timeout,sizeof(timeout))) {
perror("[ETHERNET] setsockopt()");
} else {
printf( "send timeout= %d,%d sec\n",(int)timeout.tv_sec,(int)timeout.tv_usec);
for (int i=0;i<eth->num_fd;i++) {
if (setsockopt(eth->sockfdd[i],
SOL_SOCKET,
SO_SNDTIMEO,
(char *)&timeout,sizeof(timeout))) {
perror("[ETHERNET] setsockopt()");
} else {
printf( "send timeout= %d,%d sec\n",(int)timeout.tv_sec,(int)timeout.tv_usec);
}
}
break;
......@@ -293,20 +325,24 @@ int ethernet_tune(openair0_device *device,
ifr.ifr_addr.sa_family = AF_INET;
strncpy(ifr.ifr_name,eth->if_name, sizeof(ifr.ifr_name)-1);
ifr.ifr_mtu =value;
if (ioctl(eth->sockfdd,SIOCSIFMTU,(caddr_t)&ifr) < 0 )
perror ("[ETHERNET] Can't set the MTU");
else
printf("[ETHERNET] %s MTU size has changed to %d\n",eth->if_name,ifr.ifr_mtu);
for (int i=0;i<eth->num_fd;i++) {
if (ioctl(eth->sockfdd[i],SIOCSIFMTU,(caddr_t)&ifr) < 0 )
perror ("[ETHERNET] Can't set the MTU");
else
printf("[ETHERNET] %s MTU size has changed to %d\n",eth->if_name,ifr.ifr_mtu);
}
break;
case TX_Q_LEN: /* change TX queue length of eth interface */
ifr.ifr_addr.sa_family = AF_INET;
strncpy(ifr.ifr_name,eth->if_name, sizeof(ifr.ifr_name)-1);
ifr.ifr_qlen =value;
if (ioctl(eth->sockfdd,SIOCSIFTXQLEN,(caddr_t)&ifr) < 0 )
perror ("[ETHERNET] Can't set the txqueuelen");
else
printf("[ETHERNET] %s txqueuelen size has changed to %d\n",eth->if_name,ifr.ifr_qlen);
for (int i=0;i<eth->num_fd;i++) {
if (ioctl(eth->sockfdd[i],SIOCSIFTXQLEN,(caddr_t)&ifr) < 0 )
perror ("[ETHERNET] Can't set the txqueuelen");
else
printf("[ETHERNET] %s txqueuelen size has changed to %d\n",eth->if_name,ifr.ifr_qlen);
}
break;
/******************* device level options *************************/
......@@ -413,7 +449,10 @@ int transport_init(openair0_device *device,
eth->compression = ALAW_COMPRESS;
}
eth->num_fd = max(openair0_cfg->rx_num_channels,openair0_cfg->tx_num_channels);
printf("[ETHERNET]: Initializing openair0_device for %s ...\n", ((device->host_type == RAU_HOST) ? "RAU": "RRU"));
printf("[ETHERNET]: num_fd %d\n",eth->num_fd);
device->Mod_id = 0;//num_devices_eth++;
device->transp_type = ETHERNET_TP;
device->trx_start_func = trx_eth_start;
......@@ -438,6 +477,7 @@ int transport_init(openair0_device *device,
device->trx_read_func2 = trx_eth_read_udp;
device->trx_ctlsend_func = trx_eth_ctlsend_udp;
device->trx_ctlrecv_func = trx_eth_ctlrecv_udp;
memset((void*)&device->fhstate,0,sizeof(device->fhstate));
} else if (eth->flags == ETH_RAW_IF4p5_MODE) {
device->trx_write_func = trx_eth_write_raw_IF4p5;
device->trx_read_func = trx_eth_read_raw_IF4p5;
......@@ -454,6 +494,7 @@ int transport_init(openair0_device *device,
eth->if_name = eth_params->local_if_name;
device->priv = eth;
/* device specific */
// openair0_cfg[0].iq_rxrescale = 15;//rescale iqs
// openair0_cfg[0].iq_txshift = eth_params->iq_txshift;// shift
......
......@@ -59,12 +59,16 @@ typedef struct {
/*!\brief socket file desc (control)*/
int sockfdc;
/*!\brief number of sockets for user-plane*/
int num_fd;
/*!\brief socket file desc (user)*/
int sockfdd;
int sockfdd[8];
/*!\brief interface name */
char *if_name;
/*!\brief buffer size */
unsigned int buffer_size;
/*!\brief Fronthaul state */
fhstate_t *fhstate;
/*!\brief destination address (control) for UDP socket*/
struct sockaddr_in dest_addrc;
/*!\brief local address (control) for UDP socket*/
......@@ -214,7 +218,7 @@ void inline dump_txcounters(openair0_device *device);
*/
void dump_iqs(char * buff, int iq_cnt);
void *udp_read_thread(void *arg);
/*! \fn int ethernet_tune (openair0_device *device, unsigned int option, int value);
* \brief this function allows you to configure certain ethernet parameters in socket or device level
......@@ -238,7 +242,7 @@ int ethernet_tune(openair0_device *device, unsigned int option, int value);
*/
int eth_socket_init_udp(openair0_device *device);
int trx_eth_write_udp(openair0_device *device, openair0_timestamp timestamp, void *buff, int nsamps,int cc, int flags);
int trx_eth_read_udp(openair0_device *device, openair0_timestamp *timestamp, uint32_t **buff, int nsamps, int packet_idx,int *cc);
int trx_eth_read_udp(openair0_device *device, openair0_timestamp *timestamp, uint32_t **buff, int nsamps);
int eth_socket_init_raw(openair0_device *device);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment