Commit 0c8ec326 authored by ghaddab's avatar ghaddab

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@6304 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 1f4a7548
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
/** ethernet_lib : API to stream I/Q samples over standard ethernet (RRH component) /** ethernet_lib : API to stream I/Q samples over standard ethernet (RRH component)
* *
* Authors: Raymond Knopp <raymond.knopp@eurecom.fr> * Authors: Raymond Knopp <raymond.knopp@eurecom.fr>, Riadh Ghaddab <riadh.ghaddab@eurecom.fr>
* *
* Changelog: * Changelog:
* 06.10.2014: Initial version * 06.10.2014: Initial version
...@@ -48,19 +48,282 @@ ...@@ -48,19 +48,282 @@
#include <signal.h> #include <signal.h>
#include <execinfo.h> #include <execinfo.h>
#include <pthread.h> #include <pthread.h>
#include <time.h>
#include "common_lib.h" #include "common_lib.h"
#define BUF_LEN 4096+32
#define RRH_eNB_PORT 50000
#define RRH_eNB_DEST_IP "192.168.12.196" //Haswell ip address
//#define RRH_eNB_DEST_IP "192.168.12.148" //Hecatonchire ip address
#define RRH_UE_PORT 22222
//#define RRH_UE_DEST_IP "192.168.12.148" //Hecatonchire ip address
#define RRH_UE_DEST_IP "192.168.12.196" //Haswell ip address
#define FRAME_MAX_SIZE 76800*4
#define DEFAULT_PERIOD_NS 133333
typedef struct { typedef struct {
int port; int eNB_port;
char *dest_ip; char *eNB_dest_ip;
int UE_port;
char *UE_dest_ip;
struct timespec time_req;
} rrh_desc_t; } rrh_desc_t;
int rrh_exit=0; int rrh_exit=0;
int32_t tx_buffer[4][(1+(sizeof(openair0_timestamp)>>2))+76800*4],rx_buffer[4][(1+(sizeof(openair0_timestamp)>>2))+76800*4]; int32_t tx_buffer_eNB[4][(1+(sizeof(openair0_timestamp)>>2))+FRAME_MAX_SIZE],rx_buffer_eNB[4][(1+(sizeof(openair0_timestamp)>>2))+FRAME_MAX_SIZE];
int32_t tx_buffer_UE[4][(1+(sizeof(openair0_timestamp)>>2))+FRAME_MAX_SIZE],rx_buffer_UE[4][(1+(sizeof(openair0_timestamp)>>2))+FRAME_MAX_SIZE];
void *rrh_eNB_thread_status;
void *rrh_UE_thread_status;
int32_t counter_UE_rx[4]={0,0,0,0};
int32_t counter_UE_tx[4]={0,0,0,0};
int32_t counter_eNB_rx[4]={0,0,0,0};
int32_t counter_eNB_tx[4]={0,0,0,0};
int32_t overflow_rx_buffer_UE[4]={0,0,0,0};
int32_t overflow_rx_buffer_eNB[4]={0,0,0,0};
int32_t nsamps_eNB[4]={0,0,0,0};
int32_t nsamps_UE[4]={0,0,0,0};
int32_t UE_tx_started=0,eNB_tx_started=0, UE_rx_started=0,eNB_rx_started=0;
int32_t RT_FLAG=0, NRT_FLAG=1;
openair0_timestamp nrt_eNB_counter[4]={0,0,0,0};
openair0_timestamp nrt_UE_counter[4]={0,0,0,0};
openair0_timestamp timestamp_eNB_tx[4],timestamp_eNB_rx[4]={0,0,0,0};
openair0_timestamp timestamp_UE_tx[4],timestamp_UE_rx[4]={0,0,0,0};
openair0_timestamp hw_counter=0;
pthread_cond_t sync_UE_cond[4];
pthread_mutex_t sync_UE_mutex[4];
pthread_cond_t sync_eNB_cond[4];
pthread_mutex_t sync_eNB_mutex[4];
pthread_mutex_t timer_mutex;
int sync_UE_rx[4]={-1,-1,-1,-1};
int sync_eNB_rx[4]={-1,-1,-1,-1};
void timer_signal_handler(int sig){
if (sig == SIGALRM)
{
pthread_mutex_lock(&timer_mutex);
hw_counter ++;
//printf("[RRH] : hw_counter : %d\n",(int)hw_counter);
pthread_mutex_unlock(&timer_mutex);
}
void *rrh_eNB_thread(void *arg) {
}
void *timer_proc(void *arg) {
timer_t timerid; // timer ID for timer
//struct sigevent event; // event to deliver
struct itimerspec *timer = (struct itimerspec*)arg; // the timer data structure
struct itimerspec *old_value;
//printf("Starting the timer\n");
if (timer_create (CLOCK_REALTIME, NULL, &timerid) == -1) {
fprintf (stderr, "couldn't create a timer\n");
perror (NULL);
exit (EXIT_FAILURE);
}
signal(SIGALRM, timer_signal_handler);
// and start it!
timer_settime (timerid, 0, timer, old_value);
while (!rrh_exit)
{
sleep(1);
}
timer_delete(timerid);
return (0);
}
void *rrh_proc_eNB_thread() {
//rrh_desc_t *rrh_desc = (rrh_desc_t *)arg;
int antenna_index;
openair0_timestamp truncated_timestamp, truncated_timestamp_final, last_hw_counter=0;
struct timespec time_req, time_rem;
time_req.tv_sec = 0;
time_req.tv_nsec = 1000;
while (rrh_exit==0)
{
//wait until some data has been copied
for (antenna_index=0;antenna_index<4;antenna_index++){
if (sync_eNB_rx[antenna_index]==0)
{
if (!eNB_tx_started)
{
eNB_tx_started=1; // set this flag to 1 to indicate that eNB started
if (RT_FLAG==1)
{
last_hw_counter=hw_counter;
}
}
else
{
if (RT_FLAG==1) {
if (hw_counter > last_hw_counter+1)
{
printf("L");
// goto end_copy_eNB;
}
else
{
while (hw_counter != last_hw_counter+1)
nanosleep(&time_req,&time_rem);
}
}
}
truncated_timestamp = timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE);
truncated_timestamp_final = (timestamp_eNB_tx[antenna_index]+nsamps_eNB[antenna_index])%FRAME_MAX_SIZE;
if ((truncated_timestamp + nsamps_eNB[antenna_index]) > FRAME_MAX_SIZE)
{
if ((timestamp_UE_rx[antenna_index]%FRAME_MAX_SIZE < nsamps_eNB[antenna_index]) && (UE_rx_started==1))
{
overflow_rx_buffer_UE[antenna_index]++;
printf("UE Overflow[%d] : %d, timestamp : %d\n",antenna_index,overflow_rx_buffer_UE[antenna_index],(int)truncated_timestamp);
if (NRT_FLAG==1)
{
while ((timestamp_UE_rx[antenna_index]%FRAME_MAX_SIZE) < nsamps_eNB[antenna_index])
nanosleep(&time_req,&time_rem);
}
}
memcpy(&rx_buffer_UE[antenna_index][truncated_timestamp + (sizeof(openair0_timestamp)>>2)],&tx_buffer_eNB[antenna_index][truncated_timestamp],(FRAME_MAX_SIZE<<2)-(truncated_timestamp<<2));
memcpy(&rx_buffer_UE[antenna_index][(sizeof(openair0_timestamp)>>2)],&tx_buffer_eNB[antenna_index][0],(nsamps_eNB[antenna_index]<<2)-(FRAME_MAX_SIZE<<2)+(truncated_timestamp<<2));
}
else
{
if (((truncated_timestamp < (timestamp_UE_rx[antenna_index]%FRAME_MAX_SIZE)) && (truncated_timestamp_final > (timestamp_UE_rx[antenna_index]%FRAME_MAX_SIZE))) && (UE_rx_started==1))
{
overflow_rx_buffer_UE[antenna_index]++;
printf("UE Overflow[%d] : %d, timestamp : %d\n",antenna_index,overflow_rx_buffer_UE[antenna_index],(int)truncated_timestamp);
if (NRT_FLAG==1)
{
while (truncated_timestamp_final > timestamp_UE_rx[antenna_index]%FRAME_MAX_SIZE)
nanosleep(&time_req,&time_rem);
}
}
memcpy(&rx_buffer_UE[antenna_index][truncated_timestamp + (sizeof(openair0_timestamp)>>2)],&tx_buffer_eNB[antenna_index][truncated_timestamp],(nsamps_eNB[antenna_index]<<2));
}
// end_copy_eNB :
last_hw_counter=hw_counter;
pthread_mutex_lock(&sync_eNB_mutex[antenna_index]);
sync_eNB_rx[antenna_index]--;
pthread_mutex_unlock(&sync_eNB_mutex[antenna_index]);
}
}
}
return(0);
}
void *rrh_proc_UE_thread() {
//rrh_desc_t *rrh_desc = (rrh_desc_t *)arg;
int antenna_index;
openair0_timestamp truncated_timestamp, truncated_timestamp_final, last_hw_counter=0;
struct timespec time_req, time_rem;
time_req.tv_sec = 0;
time_req.tv_nsec = 1000;
while (rrh_exit==0)
{
//wait until some data has been copied
for (antenna_index=0;antenna_index<4;antenna_index++){
if (sync_UE_rx[antenna_index]==0)
{
if (!UE_tx_started)
{
UE_tx_started=1; //Set this flag to 1 to indicate that a UE started retrieving data
if (RT_FLAG==1)
{
last_hw_counter=hw_counter;
}
}
else
{
if (RT_FLAG==1) {
if (hw_counter > last_hw_counter+1)
{
printf("L1");
// goto end_copy_UE;
}
else
{
while (hw_counter != last_hw_counter+1)
nanosleep(&time_req,&time_rem);
}
}
}
truncated_timestamp = timestamp_UE_tx[antenna_index]%(FRAME_MAX_SIZE);
truncated_timestamp_final = (timestamp_UE_tx[antenna_index]+nsamps_UE[antenna_index])%FRAME_MAX_SIZE;
if ((truncated_timestamp + nsamps_UE[antenna_index]) > FRAME_MAX_SIZE)
{
if ((timestamp_eNB_rx[antenna_index]%FRAME_MAX_SIZE < nsamps_UE[antenna_index]) && (eNB_rx_started==1))
{
overflow_rx_buffer_eNB[antenna_index]++;
printf("eNB Overflow[%d] : %d, timestamp : %d\n",antenna_index,overflow_rx_buffer_eNB[antenna_index],(int)truncated_timestamp);
if (NRT_FLAG==1)
{
while ((timestamp_eNB_rx[antenna_index]%FRAME_MAX_SIZE) < nsamps_UE[antenna_index])
nanosleep(&time_req,&time_rem);
}
}
memcpy(&rx_buffer_eNB[antenna_index][truncated_timestamp + (sizeof(openair0_timestamp)>>2)],&tx_buffer_UE[antenna_index][truncated_timestamp],(FRAME_MAX_SIZE<<2)-(truncated_timestamp<<2));
memcpy(&rx_buffer_eNB[antenna_index][(sizeof(openair0_timestamp)>>2)],&tx_buffer_UE[antenna_index][0],(nsamps_UE[antenna_index]<<2)-(FRAME_MAX_SIZE<<2)+(truncated_timestamp<<2));
}
else
{
if (((truncated_timestamp < (timestamp_eNB_rx[antenna_index]%FRAME_MAX_SIZE)) && (truncated_timestamp_final > (timestamp_eNB_rx[antenna_index]%FRAME_MAX_SIZE))) && (eNB_rx_started==1))
{
overflow_rx_buffer_eNB[antenna_index]++;
printf("eNB Overflow[%d] : %d, timestamp : %d\n",antenna_index,overflow_rx_buffer_eNB[antenna_index],(int)truncated_timestamp);
if (NRT_FLAG==1)
{
while (truncated_timestamp_final > timestamp_eNB_rx[antenna_index]%FRAME_MAX_SIZE)
nanosleep(&time_req,&time_rem);
}
}
memcpy(&rx_buffer_eNB[antenna_index][truncated_timestamp+ (sizeof(openair0_timestamp)>>2)],&tx_buffer_UE[antenna_index][truncated_timestamp],(nsamps_UE[antenna_index]<<2));
}
//end_copy_UE :
last_hw_counter=hw_counter;
pthread_mutex_lock(&sync_UE_mutex[antenna_index]);
sync_UE_rx[antenna_index]--;
pthread_mutex_unlock(&sync_UE_mutex[antenna_index]);
}
}
}
return(0);
}
void *rrh_UE_thread(void *arg) {
int sockid=-1; int sockid=-1;
struct sockaddr_in serveraddr; struct sockaddr_in serveraddr;
...@@ -68,33 +331,40 @@ void *rrh_eNB_thread(void *arg) { ...@@ -68,33 +331,40 @@ void *rrh_eNB_thread(void *arg) {
socklen_t clientaddrlen; socklen_t clientaddrlen;
rrh_desc_t *rrh_desc = (rrh_desc_t *)arg; rrh_desc_t *rrh_desc = (rrh_desc_t *)arg;
char str[INET_ADDRSTRLEN]; char str[INET_ADDRSTRLEN];
int8_t msg_header[4+sizeof(openair0_timestamp)]; //int8_t msg_header[4+sizeof(openair0_timestamp)];
int8_t buf[BUF_LEN];
int16_t cmd,nsamps,antenna_index; int16_t cmd,nsamps,antenna_index;
ssize_t bytes_received; ssize_t bytes_received;
openair0_timestamp timestamp_tx,timestamp_rx=0; struct timespec time_rem;
ssize_t bytes_sent; ssize_t bytes_sent;
openair0_timestamp temp;
openair0_timestamp last_hw_counter=0;
struct timespec time_req_1us, time_rem_1us;
time_req_1us.tv_sec = 0;
time_req_1us.tv_nsec = 1000;
while (rrh_exit==0) { while (rrh_exit==0) {
sockid=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP); sockid=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP);
if (sockid==-1) { if (sockid==-1) {
perror("Cannot create socket: "); perror("Cannot create UE socket: ");
rrh_exit=1; rrh_exit=1;
} }
bzero((char *)&serveraddr,sizeof(serveraddr)); bzero((char *)&serveraddr,sizeof(serveraddr));
serveraddr.sin_family=AF_INET; serveraddr.sin_family=AF_INET;
serveraddr.sin_port=htons(rrh_desc->port); serveraddr.sin_port=htons(rrh_desc->UE_port);
inet_pton(AF_INET,"0.0.0.0",&serveraddr.sin_addr.s_addr); inet_pton(AF_INET,rrh_desc->UE_dest_ip,&serveraddr.sin_addr.s_addr);
inet_ntop(AF_INET, &(serveraddr.sin_addr), str, INET_ADDRSTRLEN); inet_ntop(AF_INET, &(serveraddr.sin_addr), str, INET_ADDRSTRLEN);
printf("Binding to socket for %s:%d\n",str,ntohs(serveraddr.sin_port)); printf("Binding to UE socket for %s:%d\n",str,ntohs(serveraddr.sin_port));
if (bind(sockid,(struct sockaddr *)&serveraddr,sizeof(serveraddr))<0) { if (bind(sockid,(struct sockaddr *)&serveraddr,sizeof(serveraddr))<0) {
perror("Cannot bind to socket: "); perror("Cannot bind to UE socket: ");
rrh_exit = 1; rrh_exit = 1;
} }
...@@ -104,35 +374,139 @@ void *rrh_eNB_thread(void *arg) { ...@@ -104,35 +374,139 @@ void *rrh_eNB_thread(void *arg) {
// get header info // get header info
bzero((void*)&clientaddr,sizeof(struct sockaddr)); bzero((void*)&clientaddr,sizeof(struct sockaddr));
clientaddrlen = sizeof(struct sockaddr); clientaddrlen = sizeof(struct sockaddr);
printf("Waiting ...\n"); //printf("Waiting for UE ...\n");
bytes_received = recvfrom(sockid,msg_header,4+sizeof(openair0_timestamp),0,&clientaddr,&clientaddrlen); bytes_received = recvfrom(sockid,buf,BUF_LEN,0,&clientaddr,&clientaddrlen);
cmd = msg_header[0]; cmd = buf[0];
antenna_index = cmd>>1; antenna_index = cmd>>1;
timestamp_tx = *(openair0_timestamp*)(msg_header+4); timestamp_UE_tx[antenna_index] = *(openair0_timestamp*)(buf+4);
nsamps = *(int16_t *)(msg_header+2); nsamps = *(int16_t *)(buf+2);
cmd = cmd&1; cmd = cmd&1;
inet_ntop(AF_INET, &(((struct sockaddr_in*)&clientaddr)->sin_addr), str, INET_ADDRSTRLEN); inet_ntop(AF_INET, &(((struct sockaddr_in*)&clientaddr)->sin_addr), str, INET_ADDRSTRLEN);
switch (cmd) { switch (cmd) {
case 0: // RX case 0: // RX
printf("Received RX request for antenna %d, nsamps %d (from %s:%d)\n",antenna_index,nsamps,str, printf("Received UE RX request for antenna %d, nsamps %d (from %s:%d)\n",antenna_index,nsamps,str,
ntohs(((struct sockaddr_in*)&clientaddr)->sin_port)); ntohs(((struct sockaddr_in*)&clientaddr)->sin_port));
// send return
if (!UE_rx_started)
{
UE_rx_started=1; //Set this flag to 1 to indicate that a UE started retrieving data
if (RT_FLAG==1)
{
last_hw_counter=hw_counter;
}
}
else
{
if (RT_FLAG==1) {
if (hw_counter > last_hw_counter+1)
{
printf("L1");
// goto end_copy_UE;
}
else
{
while (hw_counter != last_hw_counter+1)
nanosleep(&time_req_1us,&time_rem_1us);
}
}
}
// send return
temp=*(openair0_timestamp*)&rx_buffer_UE[antenna_index][timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE)];
*(openair0_timestamp*)&rx_buffer_UE[antenna_index][timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE)]=timestamp_UE_rx[antenna_index];
if ((bytes_sent = sendto(sockid, if ((timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE)+nsamps) > FRAME_MAX_SIZE) // Wrap around if nsamps exceeds the buffer limit
&rx_buffer[antenna_index][timestamp_rx%(76800*4)], {
if ((timestamp_eNB_tx[antenna_index]%FRAME_MAX_SIZE < nsamps) && (eNB_tx_started==1))
{
printf("UE underflow timestamp_UE_rx : %d, timestamp_eNB_tx : %d\n",(int)(timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE)),(int)(timestamp_eNB_tx[antenna_index]%FRAME_MAX_SIZE));
if (NRT_FLAG==1)
{
while ((timestamp_eNB_tx[antenna_index]%FRAME_MAX_SIZE) < nsamps)
nanosleep(&time_req_1us,&time_rem_1us);
}
}
if ((bytes_sent = sendto(sockid,
&rx_buffer_UE[antenna_index][timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE)],
((FRAME_MAX_SIZE)<<2) - ((timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE))<<2) + sizeof(openair0_timestamp),
0,
(struct sockaddr*)&clientaddr,
sizeof(struct sockaddr)))<0)
perror("RRH UE : sendto for RX");
if ((bytes_sent = sendto(sockid,
&rx_buffer_UE[antenna_index][0],
(nsamps<<2) - ((FRAME_MAX_SIZE)<<2) + ((timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE))<<2),
0,
(struct sockaddr*)&clientaddr,
sizeof(struct sockaddr)))<0)
perror("RRH UE : sendto for RX");
}
else{
if (((timestamp_UE_rx[antenna_index]%FRAME_MAX_SIZE)< timestamp_eNB_tx[antenna_index]%FRAME_MAX_SIZE) && (((timestamp_UE_rx[antenna_index]+nsamps)%FRAME_MAX_SIZE) > (timestamp_eNB_tx[antenna_index]%FRAME_MAX_SIZE)) && (eNB_tx_started==1) )
{
printf("UE underflow timestamp_UE_rx : %d, timestamp_eNB_tx : %d\n",(int)(timestamp_UE_rx[antenna_index]%FRAME_MAX_SIZE),(int)(timestamp_eNB_tx[antenna_index]%FRAME_MAX_SIZE));
if (NRT_FLAG==1)
{
while (((timestamp_UE_rx[antenna_index]+nsamps)%FRAME_MAX_SIZE) > (timestamp_eNB_tx[antenna_index]%FRAME_MAX_SIZE))
{nanosleep(&time_req_1us,&time_rem_1us); }
}
}
if ((bytes_sent = sendto(sockid,
&rx_buffer_UE[antenna_index][timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE)],
(nsamps<<2)+sizeof(openair0_timestamp), (nsamps<<2)+sizeof(openair0_timestamp),
0, 0,
(struct sockaddr*)&clientaddr, (struct sockaddr*)&clientaddr,
sizeof(struct sockaddr)))<0) sizeof(struct sockaddr)))<0)
perror("RRH: sendto for RX"); perror("RRH UE thread: sendto for RX");
printf("bytes_sent %d(timestamp_rx %d)\n",bytes_sent); }
timestamp_rx+=nsamps; printf("bytes_sent %d(timestamp_UE_rx[%d] %d)\n",(int)bytes_sent,antenna_index,(int)timestamp_UE_rx[antenna_index]);
*(openair0_timestamp*)&rx_buffer_UE[antenna_index][timestamp_UE_rx[antenna_index]%(FRAME_MAX_SIZE)]=temp;
timestamp_UE_rx[antenna_index]+=nsamps;
last_hw_counter=hw_counter;
break; break;
case 1: // TX case 1: // TX
printf("Received TX request for antenna %d, nsamps %d, timestamp %llu\n",antenna_index,nsamps,timestamp_tx); if (NRT_FLAG==1)
bytes_received = recvfrom(sockid,&tx_buffer[antenna_index][timestamp_tx%(76800*4)],(nsamps<<2),0,NULL,NULL); {
printf("Received TX samples for antenna %d, nsamps %d (%d)\n",antenna_index,nsamps,(bytes_received>>2)); nrt_UE_counter[antenna_index]++;
}
printf("Received UE TX request for antenna %d, nsamps %d, timestamp %d bytes_received %d\n",antenna_index,nsamps,(int)timestamp_UE_tx[antenna_index],(int)bytes_received);
if ((timestamp_UE_tx[antenna_index]%(FRAME_MAX_SIZE)+nsamps) > FRAME_MAX_SIZE) // Wrap around if nsamps exceeds the buffer limit
{
memcpy(&tx_buffer_UE[antenna_index][timestamp_UE_tx[antenna_index]%(FRAME_MAX_SIZE)],buf+sizeof(openair0_timestamp)+2*sizeof(int16_t),(FRAME_MAX_SIZE<<2)-((timestamp_UE_tx[antenna_index]%(FRAME_MAX_SIZE))<<2));
memcpy(&tx_buffer_UE[antenna_index][0],buf+sizeof(openair0_timestamp)+2*sizeof(int16_t)+(FRAME_MAX_SIZE*4)-((timestamp_UE_tx[antenna_index]%(FRAME_MAX_SIZE))<<2),(nsamps<<2)-((FRAME_MAX_SIZE-(timestamp_UE_tx[antenna_index]%(FRAME_MAX_SIZE)))<<2));
//printf("Received UE TX samples for antenna %d, nsamps %d (%d)\n",antenna_index,nsamps,(int)(bytes_received>>2));
}
else {
memcpy(&tx_buffer_UE[antenna_index][timestamp_UE_tx[antenna_index]%(FRAME_MAX_SIZE)],buf+sizeof(openair0_timestamp)+2*sizeof(int16_t),(nsamps<<2));
}
//printf("Received UE TX samples for antenna %d, nsamps %d (%d)\n",antenna_index,nsamps,(int)(bytes_received>>2));
if (rrh_desc->time_req.tv_sec !=0 || rrh_desc->time_req.tv_nsec !=0)
{
nanosleep(&rrh_desc->time_req,&time_rem);
}
while (sync_UE_rx[antenna_index]==0)
nanosleep(&time_req_1us,&time_rem_1us);
pthread_mutex_lock(&sync_UE_mutex[antenna_index]);
sync_UE_rx[antenna_index]++;
if (!sync_UE_rx[antenna_index])
{
counter_UE_tx[antenna_index]=(counter_UE_tx[antenna_index]+nsamps)%FRAME_MAX_SIZE;
nsamps_UE[antenna_index]=nsamps;
}
else
{
printf("rrh_eNB_proc thread is busy, will exit\n");
exit(-1);
}
pthread_mutex_unlock(&sync_UE_mutex[antenna_index]);
break; break;
} }
// rrh_exit = 1; // rrh_exit = 1;
...@@ -141,6 +515,211 @@ void *rrh_eNB_thread(void *arg) { ...@@ -141,6 +515,211 @@ void *rrh_eNB_thread(void *arg) {
close(sockid); close(sockid);
rrh_UE_thread_status = 0;
pthread_exit(&rrh_UE_thread_status);
return(0);
}
void *rrh_eNB_thread(void *arg) {
int sockid=-1;
struct sockaddr_in serveraddr;
struct sockaddr clientaddr;
socklen_t clientaddrlen;
rrh_desc_t *rrh_desc = (rrh_desc_t *)arg;
char str[INET_ADDRSTRLEN];
//int8_t msg_header[4+sizeof(openair0_timestamp)];
int8_t buf[BUF_LEN];
int16_t cmd,nsamps,antenna_index;
ssize_t bytes_received;
ssize_t bytes_sent;
openair0_timestamp temp, last_hw_counter=0;
struct timespec time_rem;
struct timespec time_req_1us, time_rem_1us;
time_req_1us.tv_sec = 0;
time_req_1us.tv_nsec = 1000;
while (rrh_exit==0) {
sockid=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP);
if (sockid==-1) {
perror("Cannot create eNB socket: ");
rrh_exit=1;
}
bzero((char *)&serveraddr,sizeof(serveraddr));
serveraddr.sin_family=AF_INET;
serveraddr.sin_port=htons(rrh_desc->eNB_port);
inet_pton(AF_INET,rrh_desc->eNB_dest_ip,&serveraddr.sin_addr.s_addr);
inet_ntop(AF_INET, &(serveraddr.sin_addr), str, INET_ADDRSTRLEN);
printf("Binding to eNB socket for %s:%d\n",str,ntohs(serveraddr.sin_port));
if (bind(sockid,(struct sockaddr *)&serveraddr,sizeof(serveraddr))<0) {
perror("Cannot bind to eNB socket: ");
rrh_exit = 1;
}
// now wait for commands from eNB
while (rrh_exit==0) {
// get header info
bzero((void*)&clientaddr,sizeof(struct sockaddr));
clientaddrlen = sizeof(struct sockaddr);
//printf("Waiting for eNB ...\n");
bytes_received = recvfrom(sockid,buf,BUF_LEN,0,&clientaddr,&clientaddrlen);
cmd = buf[0];
antenna_index = cmd>>1;
timestamp_eNB_tx[antenna_index] = *(openair0_timestamp*)(buf+4);
nsamps = *(int16_t *)(buf+2);
cmd = cmd&1;
inet_ntop(AF_INET, &(((struct sockaddr_in*)&clientaddr)->sin_addr), str, INET_ADDRSTRLEN);
switch (cmd) {
case 0: // RX
printf("Received eNB RX request for antenna %d, nsamps %d (from %s:%d)\n",antenna_index,nsamps,str,
ntohs(((struct sockaddr_in*)&clientaddr)->sin_port));
if (!eNB_rx_started)
{
eNB_rx_started=1; // set this flag to 1 to indicate that eNB started
if (RT_FLAG==1)
{
last_hw_counter=hw_counter;
}
}
else
{
if (RT_FLAG==1) {
if (hw_counter > last_hw_counter+1)
{
printf("L");
// goto end_copy_eNB;
}
else
{
while (hw_counter != last_hw_counter+1)
nanosleep(&time_req_1us,&time_rem_1us);
}
}
}
// send return
temp=*(openair0_timestamp*)&rx_buffer_eNB[antenna_index][timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE)];
*(openair0_timestamp*)&rx_buffer_eNB[antenna_index][timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE)]=timestamp_eNB_rx[antenna_index];
if ((timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE)+nsamps) > FRAME_MAX_SIZE) // Wrap around if nsamps exceeds the buffer limit
{
if ((timestamp_UE_tx[antenna_index]%FRAME_MAX_SIZE < nsamps) && (UE_tx_started==1))
{
printf("eNB underflow\n");
if (NRT_FLAG==1)
{
while ((timestamp_UE_tx[antenna_index]%FRAME_MAX_SIZE) < nsamps)
nanosleep(&time_req_1us,&time_rem_1us);
}
}
if ((bytes_sent = sendto(sockid,
&rx_buffer_eNB[antenna_index][timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE)],
((FRAME_MAX_SIZE)<<2) - ((timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE))<<2) + sizeof(openair0_timestamp),
0,
(struct sockaddr*)&clientaddr,
sizeof(struct sockaddr)))<0)
perror("RRH eNB : sendto for RX");
if ((bytes_sent = sendto(sockid,
&rx_buffer_eNB[antenna_index][0],
(nsamps<<2) - ((FRAME_MAX_SIZE)<<2) + ((timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE))<<2),
0,
(struct sockaddr*)&clientaddr,
sizeof(struct sockaddr)))<0)
perror("RRH eNB : sendto for RX");
}
else{
if (((timestamp_eNB_rx[antenna_index]%FRAME_MAX_SIZE)< timestamp_UE_tx[antenna_index]%FRAME_MAX_SIZE) && (((timestamp_eNB_rx[antenna_index]+nsamps)%FRAME_MAX_SIZE) > (timestamp_UE_tx[antenna_index]%FRAME_MAX_SIZE)) && (UE_tx_started==1))
{
printf("eNB underflow\n");
if (NRT_FLAG==1)
{
while (((timestamp_eNB_rx[antenna_index]+nsamps)%FRAME_MAX_SIZE) > (timestamp_UE_tx[antenna_index]%FRAME_MAX_SIZE))
nanosleep(&time_req_1us,&time_rem_1us);
}
}
if ((bytes_sent = sendto(sockid,
&rx_buffer_eNB[antenna_index][timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE)],
(nsamps<<2)+sizeof(openair0_timestamp),
0,
(struct sockaddr*)&clientaddr,
sizeof(struct sockaddr)))<0)
perror("RRH eNB : sendto for RX");
}
printf("bytes_sent %d(timestamp_eNB_rx[%d] %d)\n",(int)bytes_sent,antenna_index,(int)timestamp_eNB_rx[antenna_index]);
*(openair0_timestamp*)&rx_buffer_eNB[antenna_index][timestamp_eNB_rx[antenna_index]%(FRAME_MAX_SIZE)]=temp;
timestamp_eNB_rx[antenna_index]+=nsamps;
last_hw_counter=hw_counter;
break;
case 1: // TX
if (NRT_FLAG==1)
{
nrt_eNB_counter[antenna_index]++;
}
printf("Received eNB TX request for antenna %d, nsamps %d, timestamp %d bytes_received %d\n",antenna_index,nsamps,(int)timestamp_eNB_tx[antenna_index],(int)bytes_received);
if ((timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE)+nsamps) > FRAME_MAX_SIZE) // Wrap around if nsamps exceeds the buffer limit
{
memcpy(&tx_buffer_eNB[antenna_index][timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE)],buf+sizeof(openair0_timestamp)+2*sizeof(int16_t),((FRAME_MAX_SIZE)<<2)-((timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE))<<2));
//printf("Done first part size : %d\n",(int32_t)(((FRAME_MAX_SIZE)<<2)-((timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE))<<2)));
memcpy(&tx_buffer_eNB[antenna_index][0],buf+sizeof(openair0_timestamp)+2*sizeof(int16_t) + ((FRAME_MAX_SIZE)<<2) -((timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE))<<2),(nsamps<<2)-((FRAME_MAX_SIZE)<<2)+((timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE))<<2));
//printf("Received eNB TX samples for antenna %d, nsamps %d (%d)\n",antenna_index,nsamps,(int)(bytes_received>>2));
}
else {
memcpy(&tx_buffer_eNB[antenna_index][timestamp_eNB_tx[antenna_index]%(FRAME_MAX_SIZE)],buf+sizeof(openair0_timestamp)+2*sizeof(int16_t),nsamps<<2);
}
if (rrh_desc->time_req.tv_sec !=0 || rrh_desc->time_req.tv_nsec !=0)
{
nanosleep(&rrh_desc->time_req,&time_rem);
}
while (sync_eNB_rx[antenna_index]==0)
nanosleep(&time_req_1us,&time_rem_1us);
pthread_mutex_lock(&sync_eNB_mutex[antenna_index]);
sync_eNB_rx[antenna_index]++;
if (!sync_eNB_rx[antenna_index])
{
counter_eNB_tx[antenna_index]=(counter_eNB_tx[antenna_index]+nsamps)%FRAME_MAX_SIZE;
nsamps_eNB[antenna_index]=nsamps;
}
else
{
printf("rrh_eNB_proc thread is busy, will exit\n");
exit(-1);
}
pthread_mutex_unlock(&sync_eNB_mutex[antenna_index]);
break;
} //switch(cmd)
} //while (rrh_exit==0)
} //while (rrh_exit==0)
close(sockid);
rrh_eNB_thread_status = 0;
pthread_exit(&rrh_eNB_thread_status);
return(0); return(0);
} }
...@@ -168,23 +747,99 @@ void signal_handler(int sig) ...@@ -168,23 +747,99 @@ void signal_handler(int sig)
int main(int argc, char **argv) { int main(int argc, char **argv) {
pthread_t main_rrh_eNB_thread; pthread_t main_rrh_eNB_thread, main_rrh_UE_thread, main_rrh_proc_eNB_thread, main_rrh_proc_UE_thread, main_timer_proc_thread;
pthread_attr_t attr; pthread_attr_t attr, attr_proc, attr_timer;
struct sched_param sched_param_rrh; struct sched_param sched_param_rrh, sched_param_rrh_proc, sched_param_timer;
int error_code; int error_code_eNB, error_code_UE, error_code_proc_eNB, error_code_proc_UE, error_code_timer;
int i;
int opt;
int nsecs=0, rt_period=0;
nsecs = 0;
while ((opt = getopt(argc, argv, "d:t:r")) != -1) {
switch (opt) {
case 'd':
nsecs = atoi(optarg);
NRT_FLAG=1;
break;
case 't':
rt_period = atoi(optarg);
RT_FLAG=1;
NRT_FLAG=0;
break;
case 'r':
rt_period = DEFAULT_PERIOD_NS;
RT_FLAG=1;
NRT_FLAG=0;
break;
default: /* '?' */
fprintf(stderr, "Usage: %s [-d nsecs]\n", argv[0]);
exit(-1);
}
}
// if (optind >= argc) {
// fprintf(stderr, "Expected argument after options\n");
// exit(EXIT_FAILURE);
// }
struct itimerspec timer;
// setup the timer (1s delay, 1s reload)
timer.it_value.tv_sec = rt_period/1000000000;
timer.it_value.tv_nsec = rt_period%1000000000;
timer.it_interval.tv_sec = rt_period/1000000000;
timer.it_interval.tv_nsec = rt_period%1000000000;
rrh_desc_t rrh; rrh_desc_t rrh;
rrh.time_req.tv_sec = nsecs/1000000000;
rrh.time_req.tv_nsec = nsecs%1000000000;
rrh.eNB_port = RRH_eNB_PORT;
rrh.eNB_dest_ip = RRH_eNB_DEST_IP;
rrh.UE_port = RRH_UE_PORT;
rrh.UE_dest_ip = RRH_UE_DEST_IP;
// to make a graceful exit when ctrl-c is pressed
signal(SIGSEGV, signal_handler);
signal(SIGINT, signal_handler);
rrh.port = 22222;
rrh.dest_ip = "127.0.0.1";
pthread_attr_init(&attr); pthread_attr_init(&attr);
sched_param_rrh.sched_priority = sched_get_priority_max(SCHED_FIFO); sched_param_rrh.sched_priority = sched_get_priority_max(SCHED_FIFO);
pthread_attr_init(&attr_proc);
sched_param_rrh_proc.sched_priority = sched_get_priority_max(SCHED_FIFO-1);
pthread_attr_init(&attr_timer);
sched_param_timer.sched_priority = sched_get_priority_max(SCHED_FIFO-2);
pthread_attr_setschedparam(&attr,&sched_param_rrh); pthread_attr_setschedparam(&attr,&sched_param_rrh);
pthread_attr_setschedpolicy(&attr,SCHED_FIFO); pthread_attr_setschedpolicy(&attr,SCHED_FIFO);
pthread_attr_setschedparam(&attr_proc,&sched_param_rrh_proc);
pthread_attr_setschedpolicy(&attr_proc,SCHED_FIFO-1);
pthread_attr_setschedparam(&attr_timer,&sched_param_timer);
pthread_attr_setschedpolicy(&attr_timer,SCHED_FIFO-2);
for (i=0;i<4;i++){
pthread_mutex_init(&sync_eNB_mutex[i],NULL);
pthread_cond_init(&sync_eNB_cond[i],NULL);
pthread_mutex_init(&sync_UE_mutex[i],NULL);
pthread_cond_init(&sync_UE_cond[i],NULL);
}
pthread_mutex_init(&timer_mutex,NULL);
error_code_eNB = pthread_create(&main_rrh_eNB_thread, &attr, rrh_eNB_thread, (void *)&rrh);
error_code_UE = pthread_create(&main_rrh_UE_thread, &attr, rrh_UE_thread, (void *)&rrh);
error_code_proc_UE = pthread_create(&main_rrh_proc_UE_thread, &attr_proc, rrh_proc_UE_thread,NULL);
error_code_proc_eNB = pthread_create(&main_rrh_proc_eNB_thread, &attr_proc, rrh_proc_eNB_thread, NULL);
error_code_timer = pthread_create(&main_timer_proc_thread, &attr_timer, timer_proc, (void *)&timer);
error_code = pthread_create(&main_rrh_eNB_thread, &attr, rrh_eNB_thread, (void *)&rrh); if (error_code_eNB) {printf("Error while creating eNB thread\n"); exit(-1);}
if (error_code_UE) {printf("Error while creating UE thread\n"); exit(-1);}
if (error_code_proc_UE) {printf("Error while creating UE proc thread\n"); exit(-1);}
if (error_code_proc_eNB) {printf("Error while creating eNB proc thread\n"); exit(-1);}
if (error_code_timer) {printf("Error while creating timer proc thread\n"); exit(-1);}
printf("TYPE <CTRL-C> TO TERMINATE\n"); printf("TYPE <CTRL-C> TO TERMINATE\n");
...@@ -192,4 +847,6 @@ int main(int argc, char **argv) { ...@@ -192,4 +847,6 @@ int main(int argc, char **argv) {
sleep(1); sleep(1);
//pthread_join(main_rrh_eNB_thread,&rrh_eNB_thread_status);
return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment