Commit 8dcb8752 authored by Cedric Roux's avatar Cedric Roux

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4018 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 67257cea
...@@ -30,6 +30,10 @@ static unsigned int byte_tx_count; ...@@ -30,6 +30,10 @@ static unsigned int byte_tx_count;
unsigned int Master_list_rx; unsigned int Master_list_rx;
static uint64_t seq_num_tx = 0; static uint64_t seq_num_tx = 0;
#if defined(ENABLE_PGM_TRANSPORT)
extern unsigned int pgm_would_block;
#endif
mapping transport_names[] = { mapping transport_names[] = {
{"WAIT PM TRANSPORT INFO", EMU_TRANSPORT_INFO_WAIT_PM}, {"WAIT PM TRANSPORT INFO", EMU_TRANSPORT_INFO_WAIT_PM},
{"WAIT SM TRANSPORT INFO", EMU_TRANSPORT_INFO_WAIT_SM}, {"WAIT SM TRANSPORT INFO", EMU_TRANSPORT_INFO_WAIT_SM},
...@@ -37,6 +41,9 @@ mapping transport_names[] = { ...@@ -37,6 +41,9 @@ mapping transport_names[] = {
{"ENB_TRANSPORT INFO", EMU_TRANSPORT_INFO_ENB}, {"ENB_TRANSPORT INFO", EMU_TRANSPORT_INFO_ENB},
{"UE TRANSPORT INFO", EMU_TRANSPORT_INFO_UE}, {"UE TRANSPORT INFO", EMU_TRANSPORT_INFO_UE},
{"RELEASE TRANSPORT INFO", EMU_TRANSPORT_INFO_RELEASE}, {"RELEASE TRANSPORT INFO", EMU_TRANSPORT_INFO_RELEASE},
#if defined(ENABLE_PGM_TRANSPORT)
{"NACK TRANSPORT INFO", EMU_TRANSPORT_NACK},
#endif
{NULL, -1} {NULL, -1}
}; };
...@@ -77,6 +84,8 @@ int emu_transport_handle_sync(bypass_msg_header_t *messg) ...@@ -77,6 +84,8 @@ int emu_transport_handle_sync(bypass_msg_header_t *messg)
{ {
int m_id; int m_id;
DevAssert(messg != NULL);
// determite the total number of remote enb & ue // determite the total number of remote enb & ue
oai_emulation.info.nb_enb_remote += messg->nb_enb; oai_emulation.info.nb_enb_remote += messg->nb_enb;
oai_emulation.info.nb_ue_remote += messg->nb_ue; oai_emulation.info.nb_ue_remote += messg->nb_ue;
...@@ -123,6 +132,7 @@ int emu_transport_handle_sync(bypass_msg_header_t *messg) ...@@ -123,6 +132,7 @@ int emu_transport_handle_sync(bypass_msg_header_t *messg)
int emu_transport_handle_wait_sm(bypass_msg_header_t *messg) int emu_transport_handle_wait_sm(bypass_msg_header_t *messg)
{ {
DevAssert(messg != NULL);
Master_list_rx = ((Master_list_rx) | (1 << messg->master_id)); Master_list_rx = ((Master_list_rx) | (1 << messg->master_id));
return 0; return 0;
...@@ -130,6 +140,7 @@ int emu_transport_handle_wait_sm(bypass_msg_header_t *messg) ...@@ -130,6 +140,7 @@ int emu_transport_handle_wait_sm(bypass_msg_header_t *messg)
int emu_transport_handle_wait_pm(bypass_msg_header_t *messg) int emu_transport_handle_wait_pm(bypass_msg_header_t *messg)
{ {
DevAssert(messg != NULL);
if (messg->master_id == 0) { if (messg->master_id == 0) {
Master_list_rx = ((Master_list_rx) | (1 << messg->master_id)); Master_list_rx = ((Master_list_rx) | (1 << messg->master_id));
} }
...@@ -145,6 +156,10 @@ int emu_transport_handle_enb_info(bypass_msg_header_t *messg, ...@@ -145,6 +156,10 @@ int emu_transport_handle_enb_info(bypass_msg_header_t *messg,
eNB_transport_info_t *eNB_info; eNB_transport_info_t *eNB_info;
int total_header = 0, total_tbs = 0; int total_header = 0, total_tbs = 0;
int n_dci, n_enb; int n_dci, n_enb;
DevAssert(bytes_read >= 0);
DevAssert(messg != NULL);
#ifdef DEBUG_EMU #ifdef DEBUG_EMU
LOG_D(EMU," RX ENB_TRANSPORT INFO from master %d \n",messg->master_id); LOG_D(EMU," RX ENB_TRANSPORT INFO from master %d \n",messg->master_id);
#endif #endif
...@@ -169,7 +184,7 @@ int emu_transport_handle_enb_info(bypass_msg_header_t *messg, ...@@ -169,7 +184,7 @@ int emu_transport_handle_enb_info(bypass_msg_header_t *messg,
total_header+total_tbs, total_header,total_tbs); total_header+total_tbs, total_header,total_tbs);
} }
memcpy (&eNB_transport_info[n_enb],eNB_info, total_header + total_tbs); memcpy(&eNB_transport_info[n_enb], eNB_info, total_header + total_tbs);
/* Go to the next eNB info */ /* Go to the next eNB info */
eNB_info += (total_header + total_tbs); eNB_info += (total_header + total_tbs);
...@@ -202,6 +217,9 @@ int emu_transport_handle_ue_info(bypass_msg_header_t *messg, ...@@ -202,6 +217,9 @@ int emu_transport_handle_ue_info(bypass_msg_header_t *messg,
int n_ue, n_enb; int n_ue, n_enb;
int total_tbs = 0, total_header = 0; int total_tbs = 0, total_header = 0;
DevAssert(bytes_read >= 0);
DevAssert(messg != NULL);
#ifdef DEBUG_EMU #ifdef DEBUG_EMU
LOG_D(EMU," RX UE TRANSPORT INFO from master %d\n",messg->master_id); LOG_D(EMU," RX UE TRANSPORT INFO from master %d\n",messg->master_id);
#endif #endif
...@@ -226,7 +244,7 @@ int emu_transport_handle_ue_info(bypass_msg_header_t *messg, ...@@ -226,7 +244,7 @@ int emu_transport_handle_ue_info(bypass_msg_header_t *messg,
n_ue, total_header+total_tbs,total_header,total_tbs); n_ue, total_header+total_tbs,total_header,total_tbs);
} }
memcpy (&UE_transport_info[n_ue], UE_info, total_header + total_tbs); memcpy(&UE_transport_info[n_ue], UE_info, total_header + total_tbs);
/* Go to the next UE info */ /* Go to the next UE info */
UE_info += (total_header + total_tbs); UE_info += (total_header + total_tbs);
...@@ -263,7 +281,8 @@ int bypass_rx_data(unsigned int frame, unsigned int last_slot, ...@@ -263,7 +281,8 @@ int bypass_rx_data(unsigned int frame, unsigned int last_slot,
#if defined(ENABLE_NEW_MULTICAST) #if defined(ENABLE_NEW_MULTICAST)
# if defined(ENABLE_PGM_TRANSPORT) # if defined(ENABLE_PGM_TRANSPORT)
num_bytesP = pgm_recv_msg(oai_emulation.info.multicast_group, num_bytesP = pgm_recv_msg(oai_emulation.info.multicast_group,
(uint8_t *)&rx_bufferP[0], sizeof(rx_bufferP)); (uint8_t *)&rx_bufferP[0], sizeof(rx_bufferP),
frame, next_slot);
DevCheck(num_bytesP > 0, num_bytesP, 0, 0); DevCheck(num_bytesP > 0, num_bytesP, 0, 0);
# else # else
...@@ -298,6 +317,9 @@ int bypass_rx_data(unsigned int frame, unsigned int last_slot, ...@@ -298,6 +317,9 @@ int bypass_rx_data(unsigned int frame, unsigned int last_slot,
num_bytesP, map_int_to_str(transport_names, messg->Message_type), num_bytesP, map_int_to_str(transport_names, messg->Message_type),
messg->master_id, messg->master_id,
messg->seq_num); messg->seq_num);
#if defined(ENABLE_PGM_TRANSPORT)
if (messg->Message_type != EMU_TRANSPORT_NACK)
#endif
DevCheck4((messg->frame == frame) && (messg->subframe == (next_slot>>1)), DevCheck4((messg->frame == frame) && (messg->subframe == (next_slot>>1)),
messg->frame, frame, messg->subframe, next_slot>>1); messg->frame, frame, messg->subframe, next_slot>>1);
#else #else
...@@ -331,6 +353,18 @@ int bypass_rx_data(unsigned int frame, unsigned int last_slot, ...@@ -331,6 +353,18 @@ int bypass_rx_data(unsigned int frame, unsigned int last_slot,
Master_list_rx = oai_emulation.info.master_list; Master_list_rx = oai_emulation.info.master_list;
LOG_E(EMU, "RX EMU_TRANSPORT_INFO_RELEASE\n"); LOG_E(EMU, "RX EMU_TRANSPORT_INFO_RELEASE\n");
break; break;
#if defined(ENABLE_PGM_TRANSPORT)
case EMU_TRANSPORT_NACK:
if (messg->failing_master_id == oai_emulation.info.master_id) {
/* We simply re-send the last message */
pgm_link_send_msg(oai_emulation.info.multicast_group,
(uint8_t *)bypass_tx_buffer, byte_tx_count);
} else {
/* Sleep awhile till other peers have recovered data */
usleep(500);
}
break;
#endif
default: default:
LOG_E(EMU, "[MAC][BYPASS] ERROR RX UNKNOWN MESSAGE\n"); LOG_E(EMU, "[MAC][BYPASS] ERROR RX UNKNOWN MESSAGE\n");
//mac_xface->macphy_exit(""); //mac_xface->macphy_exit("");
...@@ -452,6 +486,13 @@ int multicast_link_write_sock (int groupP, char *dataP, unsigned int sizeP) ...@@ -452,6 +486,13 @@ int multicast_link_write_sock (int groupP, char *dataP, unsigned int sizeP)
} }
#endif #endif
#if defined(ENABLE_PGM_TRANSPORT)
void bypass_tx_nack(unsigned int frame, unsigned int next_slot)
{
bypass_tx_data(NACK_TRANSPORT, frame, next_slot);
}
#endif
/***************************************************************************/ /***************************************************************************/
void bypass_tx_data(emu_transport_info_t Type, unsigned int frame, unsigned int next_slot) void bypass_tx_data(emu_transport_info_t Type, unsigned int frame, unsigned int next_slot)
{ {
...@@ -474,12 +515,29 @@ void bypass_tx_data(emu_transport_info_t Type, unsigned int frame, unsigned int ...@@ -474,12 +515,29 @@ void bypass_tx_data(emu_transport_info_t Type, unsigned int frame, unsigned int
messg->frame = frame; messg->frame = frame;
messg->subframe = next_slot>>1; messg->subframe = next_slot>>1;
messg->seq_num = seq_num_tx; messg->seq_num = seq_num_tx;
messg->failing_master_id = 0;
seq_num_tx++; seq_num_tx++;
byte_tx_count = sizeof (bypass_msg_header_t) + sizeof ( byte_tx_count = sizeof (bypass_msg_header_t) + sizeof (
bypass_proto2multicast_header_t); bypass_proto2multicast_header_t);
#if defined(ENABLE_PGM_TRANSPORT)
if (Type == NACK_TRANSPORT) {
int i;
messg->Message_type = EMU_TRANSPORT_NACK;
for (i = 0; i < oai_emulation.info.nb_master; i++) {
/* Skip our id */
if (i == oai_emulation.info.master_id)
continue;
if ((Master_list_rx & (1 << i)) == 0) {
messg->failing_master_id = i;
break;
}
}
LOG_T(EMU,"[TX_DATA] NACK TRANSPORT\n");
} else
#endif
if (Type == WAIT_PM_TRANSPORT) { if (Type == WAIT_PM_TRANSPORT) {
messg->Message_type = EMU_TRANSPORT_INFO_WAIT_PM; messg->Message_type = EMU_TRANSPORT_INFO_WAIT_PM;
LOG_T(EMU,"[TX_DATA] WAIT SYNC PM TRANSPORT\n"); LOG_T(EMU,"[TX_DATA] WAIT SYNC PM TRANSPORT\n");
......
...@@ -20,6 +20,9 @@ typedef enum { ...@@ -20,6 +20,9 @@ typedef enum {
EMU_TRANSPORT_INFO_ENB, EMU_TRANSPORT_INFO_ENB,
EMU_TRANSPORT_INFO_UE, EMU_TRANSPORT_INFO_UE,
EMU_TRANSPORT_INFO_RELEASE EMU_TRANSPORT_INFO_RELEASE
#if defined(ENABLE_PGM_TRANSPORT)
,EMU_TRANSPORT_NACK
#endif
} emu_transport_info_t; } emu_transport_info_t;
#define WAIT_PM_TRANSPORT 1 #define WAIT_PM_TRANSPORT 1
...@@ -28,6 +31,9 @@ typedef enum { ...@@ -28,6 +31,9 @@ typedef enum {
#define ENB_TRANSPORT 4 #define ENB_TRANSPORT 4
#define UE_TRANSPORT 5 #define UE_TRANSPORT 5
#define RELEASE_TRANSPORT 6 #define RELEASE_TRANSPORT 6
#if defined(ENABLE_PGM_TRANSPORT)
# define NACK_TRANSPORT 7
#endif
#define WAIT_SYNC_TRANSPORT 1 #define WAIT_SYNC_TRANSPORT 1
#define SYNCED_TRANSPORT 2 #define SYNCED_TRANSPORT 2
...@@ -83,7 +89,7 @@ typedef struct { ...@@ -83,7 +89,7 @@ typedef struct {
u8 ue_id[MAX_NUM_DCI]; u8 ue_id[MAX_NUM_DCI];
u16 tbs[MAX_NUM_DCI*2]; // times 2 for dual-stream MIMO formats u16 tbs[MAX_NUM_DCI*2]; // times 2 for dual-stream MIMO formats
u8 transport_blocks[MAX_TRANSPORT_BLOCKS_BUFFER_SIZE]; u8 transport_blocks[MAX_TRANSPORT_BLOCKS_BUFFER_SIZE];
} __attribute__ ((__packed__)) eNB_transport_info_t ; } __attribute__((__packed__)) eNB_transport_info_t ;
typedef struct { typedef struct {
UE_cntl cntl; UE_cntl cntl;
...@@ -93,7 +99,7 @@ typedef struct { ...@@ -93,7 +99,7 @@ typedef struct {
u8 harq_pid[NUMBER_OF_CONNECTED_eNB_MAX]; u8 harq_pid[NUMBER_OF_CONNECTED_eNB_MAX];
u16 tbs[NUMBER_OF_CONNECTED_eNB_MAX]; u16 tbs[NUMBER_OF_CONNECTED_eNB_MAX];
u8 transport_blocks[MAX_TRANSPORT_BLOCKS_BUFFER_SIZE];//*NUMBER_OF_CONNECTED_eNB_MAX]; u8 transport_blocks[MAX_TRANSPORT_BLOCKS_BUFFER_SIZE];//*NUMBER_OF_CONNECTED_eNB_MAX];
} __attribute__ ((__packed__)) UE_transport_info_t ; } __attribute__((__packed__)) UE_transport_info_t ;
/*! \brief */ /*! \brief */
typedef struct bypass_msg_header { typedef struct bypass_msg_header {
...@@ -106,7 +112,8 @@ typedef struct bypass_msg_header { ...@@ -106,7 +112,8 @@ typedef struct bypass_msg_header {
unsigned int frame; unsigned int frame;
unsigned int subframe; unsigned int subframe;
uint64_t seq_num; uint64_t seq_num;
}__attribute__ ((__packed__)) bypass_msg_header_t; unsigned int failing_master_id;
} __attribute__((__packed__)) bypass_msg_header_t;
typedef struct bypass_proto2multicast_header_t { typedef struct bypass_proto2multicast_header_t {
unsigned int size; unsigned int size;
......
...@@ -26,6 +26,10 @@ extern unsigned char NB_INST; ...@@ -26,6 +26,10 @@ extern unsigned char NB_INST;
//#define DEBUG_CONTROL 1 //#define DEBUG_CONTROL 1
//#define DEBUG_EMU 1 //#define DEBUG_EMU 1
#if defined(ENABLE_PGM_TRANSPORT)
extern unsigned int pgm_would_block;
#endif
void emu_transport_sync(void) void emu_transport_sync(void)
{ {
LOG_D(EMU, "Entering EMU transport SYNC is primary master %d\n", LOG_D(EMU, "Entering EMU transport SYNC is primary master %d\n",
...@@ -84,6 +88,7 @@ retry2: ...@@ -84,6 +88,7 @@ retry2:
LOG_D(EMU,"TX secondary master SYNC_TRANSPORT state \n"); LOG_D(EMU,"TX secondary master SYNC_TRANSPORT state \n");
} }
#endif #endif
LOG_D(EMU, "Leaving EMU transport SYNC is primary master %d\n", LOG_D(EMU, "Leaving EMU transport SYNC is primary master %d\n",
oai_emulation.info.is_primary_master); oai_emulation.info.is_primary_master);
} }
...@@ -118,6 +123,9 @@ void emu_transport(unsigned int frame, unsigned int last_slot, ...@@ -118,6 +123,9 @@ void emu_transport(unsigned int frame, unsigned int last_slot,
emu_transport_UL(frame, last_slot, next_slot); emu_transport_UL(frame, last_slot, next_slot);
} }
} }
#if defined(ENABLE_PGM_TRANSPORT)
pgm_would_block = 0;
#endif
vcd_signal_dumper_dump_function_by_name( vcd_signal_dumper_dump_function_by_name(
VCD_SIGNAL_DUMPER_FUNCTIONS_EMU_TRANSPORT, VCD_FUNCTION_OUT); VCD_SIGNAL_DUMPER_FUNCTIONS_EMU_TRANSPORT, VCD_FUNCTION_OUT);
} }
...@@ -236,6 +244,7 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot) ...@@ -236,6 +244,7 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot)
LTE_eNB_DLSCH_t *dlsch_eNB; LTE_eNB_DLSCH_t *dlsch_eNB;
unsigned short ue_id; unsigned short ue_id;
u8 nb_total_dci; u8 nb_total_dci;
int i;
#ifdef DEBUG_EMU #ifdef DEBUG_EMU
LOG_D(EMU, " pbch fill phy eNB %d vars for slot %d fault %d\n", LOG_D(EMU, " pbch fill phy eNB %d vars for slot %d fault %d\n",
...@@ -271,7 +280,7 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot) ...@@ -271,7 +280,7 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot)
memcpy(PHY_vars_eNB_g[enb_id]->dci_alloc[(next_slot>>1)&1], memcpy(PHY_vars_eNB_g[enb_id]->dci_alloc[(next_slot>>1)&1],
&eNB_transport_info[enb_id].dci_alloc, &eNB_transport_info[enb_id].dci_alloc,
(nb_total_dci)* sizeof(DCI_ALLOC_t)); (nb_total_dci) * sizeof(DCI_ALLOC_t));
n_dci_dl=0; n_dci_dl=0;
// fill dlsch_eNB structure from DCI // fill dlsch_eNB structure from DCI
...@@ -297,7 +306,6 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot) ...@@ -297,7 +306,6 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot)
#endif #endif
break; break;
case 1: //RA: case 1: //RA:
memcpy(PHY_vars_eNB_g[enb_id]->dlsch_eNB_ra->harq_processes[0]->b, memcpy(PHY_vars_eNB_g[enb_id]->dlsch_eNB_ra->harq_processes[0]->b,
&eNB_transport_info[enb_id].transport_blocks[payload_offset], &eNB_transport_info[enb_id].transport_blocks[payload_offset],
eNB_transport_info[enb_id].tbs[n_dci_dl]); eNB_transport_info[enb_id].tbs[n_dci_dl]);
...@@ -317,7 +325,6 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot) ...@@ -317,7 +325,6 @@ void fill_phy_enb_vars(unsigned int enb_id, unsigned int next_slot)
" enb_id %d ue id is %d rnti is %x dci index %d, harq_pid %d tbs %d \n", " enb_id %d ue id is %d rnti is %x dci index %d, harq_pid %d tbs %d \n",
enb_id, ue_id, eNB_transport_info[enb_id].dci_alloc[n_dci_dl].rnti, enb_id, ue_id, eNB_transport_info[enb_id].dci_alloc[n_dci_dl].rnti,
n_dci_dl, harq_pid, eNB_transport_info[enb_id].tbs[n_dci_dl]); n_dci_dl, harq_pid, eNB_transport_info[enb_id].tbs[n_dci_dl]);
int i;
for (i=0; i<eNB_transport_info[enb_id].tbs[n_dci_dl]; i++) { for (i=0; i<eNB_transport_info[enb_id].tbs[n_dci_dl]; i++) {
LOG_T(EMU, "%x.", LOG_T(EMU, "%x.",
(unsigned char) eNB_transport_info[enb_id].transport_blocks[payload_offset+i]); (unsigned char) eNB_transport_info[enb_id].transport_blocks[payload_offset+i]);
...@@ -363,9 +370,8 @@ void fill_phy_ue_vars(unsigned int ue_id, unsigned int last_slot) ...@@ -363,9 +370,8 @@ void fill_phy_ue_vars(unsigned int ue_id, unsigned int last_slot)
// u8 ue_transport_info_index[NUMBER_OF_eNB_MAX]; // u8 ue_transport_info_index[NUMBER_OF_eNB_MAX];
u8 subframe = (last_slot+1)>>1; u8 subframe = (last_slot+1)>>1;
memcpy (&ue_cntl_delay[ue_id][(last_slot+1)%2], memcpy(&ue_cntl_delay[ue_id][(last_slot+1)%2], &UE_transport_info[ue_id].cntl,
&UE_transport_info[ue_id].cntl, sizeof(UE_cntl));
sizeof(UE_cntl));
#ifdef DEBUG_EMU #ifdef DEBUG_EMU
LOG_D(EMU, LOG_D(EMU,
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
#include "UTIL/LOG/log.h" #include "UTIL/LOG/log.h"
// #define ENABLE_PGM_DEBUG
typedef struct { typedef struct {
pgm_sock_t *sock; pgm_sock_t *sock;
uint16_t port; uint16_t port;
...@@ -30,6 +32,8 @@ pgm_multicast_group_t pgm_multicast_group[MULTICAST_LINK_NUM_GROUPS]; ...@@ -30,6 +32,8 @@ pgm_multicast_group_t pgm_multicast_group[MULTICAST_LINK_NUM_GROUPS];
static static
int pgm_create_socket(int index, const char *if_addr); int pgm_create_socket(int index, const char *if_addr);
unsigned int pgm_would_block = 1;
#if defined(ENABLE_PGM_DEBUG) #if defined(ENABLE_PGM_DEBUG)
static void static void
log_handler ( log_handler (
...@@ -66,38 +70,66 @@ int pgm_oai_init(char *if_name) ...@@ -66,38 +70,66 @@ int pgm_oai_init(char *if_name)
return pgm_create_socket(oai_emulation.info.multicast_group, if_name); return pgm_create_socket(oai_emulation.info.multicast_group, if_name);
} }
int pgm_recv_msg(int group, uint8_t *buffer, uint32_t length) int pgm_recv_msg(int group, uint8_t *buffer, uint32_t length,
unsigned int frame, unsigned int next_slot)
{ {
size_t num_bytes = 0; size_t num_bytes = 0;
int status = 0; int status = 0;
pgm_error_t* pgm_err = NULL; pgm_error_t* pgm_err = NULL;
struct pgm_sockaddr_t from; struct pgm_sockaddr_t from;
socklen_t fromlen = sizeof(from); socklen_t fromlen = sizeof(from);
uint32_t timeout = 0;
int flags = 0;
if (pgm_would_block == 0) {
flags = MSG_DONTWAIT;
}
DevCheck((group <= MULTICAST_LINK_NUM_GROUPS) && (group >= 0), DevCheck((group <= MULTICAST_LINK_NUM_GROUPS) && (group >= 0),
group, MULTICAST_LINK_NUM_GROUPS, 0); group, MULTICAST_LINK_NUM_GROUPS, 0);
LOG_I(EMU, "[PGM] Entering recv function for group %d\n", group); LOG_I(EMU, "[PGM] Entering recv function for group %d\n", group);
status = pgm_recvfrom(pgm_multicast_group[group].sock, do {
buffer, status = pgm_recvfrom(pgm_multicast_group[group].sock,
length, buffer,
0, length,
&num_bytes, flags,
&from, &num_bytes,
&fromlen, &from,
&pgm_err); &fromlen,
&pgm_err);
if (PGM_IO_STATUS_NORMAL == status) {
LOG_D(EMU, "[PGM] Received %d bytes for group %d\n", num_bytes, group); if (PGM_IO_STATUS_NORMAL == status) {
return num_bytes; LOG_D(EMU, "[PGM] Received %d bytes for group %d\n", num_bytes, group);
} else { return num_bytes;
if (pgm_err) { } else if (PGM_IO_STATUS_TIMER_PENDING == status) {
LOG_E(EMU, "[PGM] recvform failed: %s", pgm_err->message); if (pgm_would_block == 0) {
pgm_error_free (pgm_err); /* We sleep for 50 usec */
pgm_err = NULL; usleep(50);
timeout ++;
if (timeout == (1000000 / 50))
{
LOG_W(EMU, "[PGM] A packet has been lost -> ask for retransmit\n");
/* If we do not receive a packet after 10000usec
* -> send a NACK */
bypass_tx_nack(frame, next_slot);
timeout = 0;
}
}
} else if (PGM_IO_STATUS_RESET == status) {
LOG_W(EMU, "[PGM] Got session reset\n");
} else {
LOG_D(EMU, "[PGM] Got status %d\n", status);
if (pgm_err) {
LOG_E(EMU, "[PGM] recvform failed: %s", pgm_err->message);
pgm_error_free (pgm_err);
pgm_err = NULL;
}
} }
} } while(status != PGM_IO_STATUS_NORMAL);
return -1; return -1;
} }
...@@ -106,7 +138,9 @@ int pgm_link_send_msg(int group, uint8_t *data, uint32_t len) ...@@ -106,7 +138,9 @@ int pgm_link_send_msg(int group, uint8_t *data, uint32_t len)
int status; int status;
size_t bytes_written = 0; size_t bytes_written = 0;
status = pgm_send(pgm_multicast_group[group].sock, data, len, &bytes_written); do {
status = pgm_send(pgm_multicast_group[group].sock, data, len, &bytes_written);
} while(status == PGM_IO_STATUS_WOULD_BLOCK);
if (status != PGM_IO_STATUS_NORMAL) { if (status != PGM_IO_STATUS_NORMAL) {
return -1; return -1;
...@@ -125,13 +159,16 @@ int pgm_create_socket(int index, const char *if_addr) ...@@ -125,13 +159,16 @@ int pgm_create_socket(int index, const char *if_addr)
int sqns = 100; int sqns = 100;
int port; int port;
struct pgm_sockaddr_t addr; struct pgm_sockaddr_t addr;
int blocking = 0; int blocking = 1;
int multicast_loop = 0; int multicast_loop = 0;
int multicast_hops = 0; int multicast_hops = 0;
int dscp, i; int dscp, i;
port = udp_encap_port; port = udp_encap_port;
/* Use PGM */
udp_encap_port = 0;
LOG_D(EMU, "[PGM] Preparing socket for group %d and address %s\n", LOG_D(EMU, "[PGM] Preparing socket for group %d and address %s\n",
index, if_addr); index, if_addr);
...@@ -179,7 +216,7 @@ int pgm_create_socket(int index, const char *if_addr) ...@@ -179,7 +216,7 @@ int pgm_create_socket(int index, const char *if_addr)
passive = 0, passive = 0,
peer_expiry = pgm_secs (300), peer_expiry = pgm_secs (300),
spmr_expiry = pgm_msecs (250), spmr_expiry = pgm_msecs (250),
nak_bo_ivl = pgm_msecs (50), nak_bo_ivl = pgm_msecs (10),
nak_rpt_ivl = pgm_secs (2), nak_rpt_ivl = pgm_secs (2),
nak_rdata_ivl = pgm_secs (2), nak_rdata_ivl = pgm_secs (2),
nak_data_retries = 50, nak_data_retries = 50,
...@@ -287,6 +324,9 @@ int pgm_create_socket(int index, const char *if_addr) ...@@ -287,6 +324,9 @@ int pgm_create_socket(int index, const char *if_addr)
sizeof(struct group_req)); sizeof(struct group_req));
} }
pgm_freeaddrinfo(res);
res = NULL;
/* set IP parameters */ /* set IP parameters */
multicast_hops = 64; multicast_hops = 64;
dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */ dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */
...@@ -314,11 +354,11 @@ err_abort: ...@@ -314,11 +354,11 @@ err_abort:
pgm_multicast_group[index].sock = NULL; pgm_multicast_group[index].sock = NULL;
} }
if (NULL != res) { if (NULL != res) {
pgm_freeaddrinfo (res); pgm_freeaddrinfo(res);
res = NULL; res = NULL;
} }
if (NULL != pgm_err) { if (NULL != pgm_err) {
pgm_error_free (pgm_err); pgm_error_free(pgm_err);
pgm_err = NULL; pgm_err = NULL;
} }
......
...@@ -6,7 +6,8 @@ ...@@ -6,7 +6,8 @@
int pgm_oai_init(char *if_name); int pgm_oai_init(char *if_name);
int pgm_recv_msg(int group, uint8_t *buffer, uint32_t length); int pgm_recv_msg(int group, uint8_t *buffer, uint32_t length,
unsigned int frame, unsigned int next_slot);
int pgm_link_send_msg(int group, uint8_t *data, uint32_t len); int pgm_link_send_msg(int group, uint8_t *data, uint32_t len);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment