Commit f3c8ea84 authored by Lionel Gauthier's avatar Lionel Gauthier

sync ENB source code (debug session on PDCP GTP UDP)

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@5965 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 11a003db
...@@ -858,7 +858,8 @@ void *gtpv1u_eNB_task(void *args) ...@@ -858,7 +858,8 @@ void *gtpv1u_eNB_task(void *args)
udp_data_ind_p->buffer_length, udp_data_ind_p->buffer_length,
udp_data_ind_p->peer_port, udp_data_ind_p->peer_port,
udp_data_ind_p->peer_address); udp_data_ind_p->peer_address);
free(udp_data_ind_p->buffer); itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_ind_p->buffer);
udp_data_ind_p->buffer = NULL;
} }
break; break;
...@@ -914,7 +915,8 @@ void *gtpv1u_eNB_task(void *args) ...@@ -914,7 +915,8 @@ void *gtpv1u_eNB_task(void *args)
} }
} }
/* Buffer is no longer needed, free it */ /* Buffer is no longer needed, free it */
free(data_req_p->buffer); itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), data_req_p->buffer);
data_req_p->buffer = NULL;
} }
break; break;
......
...@@ -387,7 +387,8 @@ static void *gtpv1u_thread(void *args) ...@@ -387,7 +387,8 @@ static void *gtpv1u_thread(void *args)
udp_data_ind_p->buffer_length, udp_data_ind_p->buffer_length,
udp_data_ind_p->peer_port, udp_data_ind_p->peer_port,
udp_data_ind_p->peer_address); udp_data_ind_p->peer_address);
free(udp_data_ind_p->buffer); itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), udp_data_ind_p->buffer);
udp_data_ind_p->buffer = NULL;
} }
break; break;
...@@ -453,7 +454,8 @@ static void *gtpv1u_thread(void *args) ...@@ -453,7 +454,8 @@ static void *gtpv1u_thread(void *args)
} }
} }
/* Buffer is no longer needed, free it */ /* Buffer is no longer needed, free it */
free(data_req_p->buffer); itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), data_req_p->buffer);
data_req_p->buffer = NULL;
} }
break; break;
case TERMINATE_MESSAGE: { case TERMINATE_MESSAGE: {
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "sgi.h" #include "sgi.h"
#include "intertask_interface.h" #include "intertask_interface.h"
#include "assertions.h"
#include <netinet/ip6.h> #include <netinet/ip6.h>
#include <netinet/ip.h> #include <netinet/ip.h>
...@@ -187,7 +188,7 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int ...@@ -187,7 +188,7 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int
SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__);
return; return;
} }
message_payload_p = malloc(packet_sizeP); message_payload_p = itti_malloc(TASK_FW_IP, TASK_GTPV1_U, packet_sizeP);
if (message_payload_p == NULL) { if (message_payload_p == NULL) {
SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__);
return; return;
...@@ -201,7 +202,7 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int ...@@ -201,7 +202,7 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int
gtpv1u_tunnel_data_req_p->local_S1u_teid = addr_mapping_p->sgw_S1U_teid; gtpv1u_tunnel_data_req_p->local_S1u_teid = addr_mapping_p->sgw_S1U_teid;
gtpv1u_tunnel_data_req_p->length = packet_sizeP; gtpv1u_tunnel_data_req_p->length = packet_sizeP;
gtpv1u_tunnel_data_req_p->buffer = message_payload_p; gtpv1u_tunnel_data_req_p->buffer = message_payload_p;
SGI_IF_DEBUG("%s send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n", __FUNCTION__, gtpv1u_tunnel_data_req_p->S1u_enb_teid, gtpv1u_tunnel_data_req_p->local_S1u_teid, packet_sizeP); SGI_IF_DEBUG("%s ETHER send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n", __FUNCTION__, gtpv1u_tunnel_data_req_p->S1u_enb_teid, gtpv1u_tunnel_data_req_p->local_S1u_teid, packet_sizeP);
itti_send_msg_to_task(TASK_GTPV1_U, INSTANCE_DEFAULT, message_p); itti_send_msg_to_task(TASK_GTPV1_U, INSTANCE_DEFAULT, message_p);
...@@ -319,7 +320,8 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int ...@@ -319,7 +320,8 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int
SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__);
return; return;
} }
message_payload_p = malloc(packet_sizeP - sizeof(sgi_data_pP->eh)); AssertFatal((packet_sizeP - sizeof(sgi_data_pP->eh)) > 20, "BAD IP PACKET SIZE");
message_payload_p = itti_malloc(TASK_FW_IP, TASK_GTPV1_U, packet_sizeP - sizeof(sgi_data_pP->eh));
if (message_payload_p == NULL) { if (message_payload_p == NULL) {
SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__); SGI_IF_ERROR("%s OUT OF MEMORY DROP EGRESS PACKET\n", __FUNCTION__);
return; return;
...@@ -331,9 +333,13 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int ...@@ -331,9 +333,13 @@ void sgi_process_raw_packet(sgi_data_t *sgi_data_pP, unsigned char* data_pP, int
//#warning forced S1u_enb_teid to 1 for testing, waiting for MODIFY_BEARER REQUEST //#warning forced S1u_enb_teid to 1 for testing, waiting for MODIFY_BEARER REQUEST
// gtpv1u_tunnel_data_req_p->S1u_enb_teid = 1; // gtpv1u_tunnel_data_req_p->S1u_enb_teid = 1;
gtpv1u_tunnel_data_req_p->local_S1u_teid = addr_mapping_p->sgw_S1U_teid; gtpv1u_tunnel_data_req_p->local_S1u_teid = addr_mapping_p->sgw_S1U_teid;
gtpv1u_tunnel_data_req_p->length = packet_sizeP; gtpv1u_tunnel_data_req_p->length = packet_sizeP - sizeof(sgi_data_pP->eh);
gtpv1u_tunnel_data_req_p->buffer = message_payload_p; gtpv1u_tunnel_data_req_p->buffer = message_payload_p;
SGI_IF_DEBUG("%s send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n", __FUNCTION__, gtpv1u_tunnel_data_req_p->S1u_enb_teid, gtpv1u_tunnel_data_req_p->local_S1u_teid, packet_sizeP); SGI_IF_DEBUG("%s send GTPV1U_TUNNEL_DATA_REQ to GTPV1U S1u_enb_teid %u local_S1u_teid %u size %u\n",
__FUNCTION__,
gtpv1u_tunnel_data_req_p->S1u_enb_teid,
gtpv1u_tunnel_data_req_p->local_S1u_teid,
gtpv1u_tunnel_data_req_p->length);
itti_send_msg_to_task(TASK_GTPV1_U, INSTANCE_DEFAULT, message_p); itti_send_msg_to_task(TASK_GTPV1_U, INSTANCE_DEFAULT, message_p);
......
...@@ -835,6 +835,7 @@ void* sgi_sock_raw_fw_2_gtpv1u_thread(void* args_p) ...@@ -835,6 +835,7 @@ void* sgi_sock_raw_fw_2_gtpv1u_thread(void* args_p)
while (1) { while (1) {
num_bytes = recvfrom(sgi_data_p->sd[socket_index], &sgi_data_p->recv_buffer[0][socket_index], SGI_BUFFER_RECV_LEN, 0, NULL, NULL); num_bytes = recvfrom(sgi_data_p->sd[socket_index], &sgi_data_p->recv_buffer[0][socket_index], SGI_BUFFER_RECV_LEN, 0, NULL, NULL);
if (num_bytes > 0) { if (num_bytes > 0) {
SGI_IF_DEBUG("recvfrom bearer id %d %d bytes\n", socket_index + SGI_MIN_EPS_BEARER_ID, num_bytes);
sgi_process_raw_packet(sgi_data_p, &sgi_data_p->recv_buffer[0][socket_index], num_bytes); sgi_process_raw_packet(sgi_data_p, &sgi_data_p->recv_buffer[0][socket_index], num_bytes);
} else { } else {
SGI_IF_DEBUG("recvfrom bearer id %d %d (%s:%d)\n", socket_index + SGI_MIN_EPS_BEARER_ID, num_bytes, strerror(errno), errno); SGI_IF_DEBUG("recvfrom bearer id %d %d (%s:%d)\n", socket_index + SGI_MIN_EPS_BEARER_ID, num_bytes, strerror(errno), errno);
......
...@@ -247,7 +247,8 @@ void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP) ...@@ -247,7 +247,8 @@ void udp_eNB_receiver(struct udp_socket_desc_s *udp_sock_pP)
LOG_W(UDP_, "Recvfrom returned 0\n"); LOG_W(UDP_, "Recvfrom returned 0\n");
return; return;
} else{ } else{
forwarded_buffer = calloc(n, sizeof(uint8_t)); forwarded_buffer = itti_malloc(TASK_UDP, udp_sock_pP->task_id, n*sizeof(uint8_t));
DevAssert(forwarded_buffer != NULL);
memcpy(forwarded_buffer, l_buffer, n); memcpy(forwarded_buffer, l_buffer, n);
message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND); message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND);
DevAssert(message_p != NULL); DevAssert(message_p != NULL);
......
...@@ -42,6 +42,8 @@ ...@@ -42,6 +42,8 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <fcntl.h>
#include <pthread.h> #include <pthread.h>
...@@ -54,9 +56,10 @@ ...@@ -54,9 +56,10 @@
#define UDP_DEBUG(x, args...) do { fprintf(stdout, "[UDP] [D]"x, ##args); } while(0) #define UDP_DEBUG(x, args...) do { fprintf(stdout, "[UDP] [D]"x, ##args); } while(0)
#define UDP_ERROR(x, args...) do { fprintf(stderr, "[UDP] [E]"x, ##args); } while(0) #define UDP_ERROR(x, args...) do { fprintf(stderr, "[UDP] [E]"x, ##args); } while(0)
void *udp_receiver_thread(void *args_p); //void *udp_receiver_thread(void *args_p);
struct udp_socket_desc_s { struct udp_socket_desc_s {
uint8_t buffer[4096];
int sd; /* Socket descriptor to use */ int sd; /* Socket descriptor to use */
pthread_t listener_thread; /* Thread affected to recv */ pthread_t listener_thread; /* Thread affected to recv */
...@@ -72,10 +75,13 @@ struct udp_socket_desc_s { ...@@ -72,10 +75,13 @@ struct udp_socket_desc_s {
static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list; static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list;
static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER;
static void udp_server_receive_and_process(struct udp_socket_desc_s *udp_sock_pP);
/* @brief Retrieve the descriptor associated with the task_id /* @brief Retrieve the descriptor associated with the task_id
*/ */
static static
struct udp_socket_desc_s *udp_get_socket_desc(task_id_t task_id) struct udp_socket_desc_s *udp_server_get_socket_desc(task_id_t task_id)
{ {
struct udp_socket_desc_s *udp_sock_p = NULL; struct udp_socket_desc_s *udp_sock_p = NULL;
...@@ -89,14 +95,29 @@ struct udp_socket_desc_s *udp_get_socket_desc(task_id_t task_id) ...@@ -89,14 +95,29 @@ struct udp_socket_desc_s *udp_get_socket_desc(task_id_t task_id)
} }
return udp_sock_p; return udp_sock_p;
} }
static
struct udp_socket_desc_s *udp_server_get_socket_desc_by_sd(int sdP)
{
struct udp_socket_desc_s *udp_sock_p = NULL;
UDP_DEBUG("Looking for sd %d\n", sdP);
STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
if (udp_sock_p->sd == sdP) {
UDP_DEBUG("Found matching task desc\n");
break;
}
}
return udp_sock_p;
}
static static
int udp_create_socket(int port, char *address, task_id_t task_id) int udp_server_create_socket(int port, char *address, task_id_t task_id)
{ {
struct sockaddr_in addr; struct sockaddr_in addr;
int sd; int sd;
struct udp_socket_desc_s *thread_arg = NULL; struct udp_socket_desc_s *socket_desc_p = NULL;
UDP_DEBUG("Creating new listen socket on address "IPV4_ADDR" and port %u\n", UDP_DEBUG("Creating new listen socket on address "IPV4_ADDR" and port %u\n",
IPV4_ADDR_FORMAT(inet_addr(address)), port); IPV4_ADDR_FORMAT(inet_addr(address)), port);
...@@ -120,54 +141,84 @@ int udp_create_socket(int port, char *address, task_id_t task_id) ...@@ -120,54 +141,84 @@ int udp_create_socket(int port, char *address, task_id_t task_id)
return -1; return -1;
} }
thread_arg = calloc(1, sizeof(struct udp_socket_desc_s)); /* Add the socket to list of fd monitored by ITTI */
/* Mark the socket as non-blocking */
if (fcntl(sd, F_SETFL, O_NONBLOCK) < 0) {
UDP_ERROR("fcntl F_SETFL O_NONBLOCK failed: %s\n",
strerror(errno));
close(sd);
return -1;
}
DevAssert(thread_arg != NULL); socket_desc_p = calloc(1, sizeof(struct udp_socket_desc_s));
DevAssert(socket_desc_p != NULL);
socket_desc_p->sd = sd;
socket_desc_p->local_address = address;
socket_desc_p->local_port = port;
socket_desc_p->task_id = task_id;
UDP_DEBUG("Inserting new descriptor for task %d, sd %d\n",
socket_desc_p->task_id, socket_desc_p->sd);
pthread_mutex_lock(&udp_socket_list_mutex);
STAILQ_INSERT_TAIL(&udp_socket_list, socket_desc_p, entries);
pthread_mutex_unlock(&udp_socket_list_mutex);
thread_arg->sd = sd; itti_subscribe_event_fd(TASK_UDP, sd);
thread_arg->local_address = address;
thread_arg->local_port = port;
thread_arg->task_id = task_id;
if (pthread_create(&thread_arg->listener_thread, NULL,
&udp_receiver_thread, (void *)thread_arg) < 0) {
UDP_ERROR("Pthred_create failed (%s)\n", strerror(errno));
return -1;
}
return sd; return sd;
} }
void *udp_receiver_thread(void *arg_p) static void udp_server_flush_sockets(struct epoll_event *events, int nb_events)
{ {
uint8_t buffer[2048]; int event;
struct udp_socket_desc_s *udp_sock_p = NULL;
UDP_DEBUG("Received %d events\n", nb_events);
struct udp_socket_desc_s *udp_sock_p = (struct udp_socket_desc_s *)arg_p; for (event = 0; event < nb_events; event++) {
if (events[event].events != 0) {
/* If the event has not been yet been processed (not an itti message) */
pthread_mutex_lock(&udp_socket_list_mutex);
udp_sock_p = udp_server_get_socket_desc_by_sd(events[event].data.fd);
if (udp_sock_p != NULL) {
udp_server_receive_and_process(udp_sock_p);
} else {
UDP_ERROR("Failed to retrieve the udp socket descriptor %d",
events[event].data.fd);
}
pthread_mutex_unlock(&udp_socket_list_mutex);
}
}
}
static void udp_server_receive_and_process(struct udp_socket_desc_s *udp_sock_pP)
{
UDP_DEBUG("Inserting new descriptor for task %d, sd %d\n", UDP_DEBUG("Inserting new descriptor for task %d, sd %d\n",
udp_sock_p->task_id, udp_sock_p->sd); udp_sock_pP->task_id, udp_sock_pP->sd);
pthread_mutex_lock(&udp_socket_list_mutex);
STAILQ_INSERT_TAIL(&udp_socket_list, udp_sock_p, entries);
pthread_mutex_unlock(&udp_socket_list_mutex);
while (1) { {
int n; int bytes_received = 0;
socklen_t from_len; socklen_t from_len;
struct sockaddr_in addr; struct sockaddr_in addr;
from_len = (socklen_t)sizeof(struct sockaddr_in); from_len = (socklen_t)sizeof(struct sockaddr_in);
if ((n = recvfrom(udp_sock_p->sd, buffer, sizeof(buffer), 0, if ((bytes_received = recvfrom(udp_sock_pP->sd, udp_sock_pP->buffer, sizeof(udp_sock_pP->buffer), 0,
(struct sockaddr *)&addr, &from_len)) < 0) { (struct sockaddr *)&addr, &from_len)) <= 0) {
UDP_ERROR("Recvfrom failed %s\n", strerror(errno)); UDP_ERROR("Recvfrom failed %s\n", strerror(errno));
break; //break;
} else { } else {
MessageDef *message_p = NULL; MessageDef *message_p = NULL;
udp_data_ind_t *udp_data_ind_p; udp_data_ind_t *udp_data_ind_p;
uint8_t *forwarded_buffer = NULL; uint8_t *forwarded_buffer = NULL;
forwarded_buffer = calloc(n, sizeof(uint8_t)); AssertFatal(sizeof(udp_sock_pP->buffer) >= bytes_received, "UDP BUFFER OVERFLOW");
forwarded_buffer = itti_malloc(TASK_UDP, udp_sock_pP->task_id, bytes_received);
memcpy(forwarded_buffer, buffer, n); DevAssert(forwarded_buffer != NULL);
memcpy(forwarded_buffer, udp_sock_pP->buffer, bytes_received);
message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND); message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND);
...@@ -176,110 +227,122 @@ void *udp_receiver_thread(void *arg_p) ...@@ -176,110 +227,122 @@ void *udp_receiver_thread(void *arg_p)
udp_data_ind_p = &message_p->ittiMsg.udp_data_ind; udp_data_ind_p = &message_p->ittiMsg.udp_data_ind;
udp_data_ind_p->buffer = forwarded_buffer; udp_data_ind_p->buffer = forwarded_buffer;
udp_data_ind_p->buffer_length = n; udp_data_ind_p->buffer_length = bytes_received;
udp_data_ind_p->peer_port = htons(addr.sin_port); udp_data_ind_p->peer_port = htons(addr.sin_port);
udp_data_ind_p->peer_address = addr.sin_addr.s_addr; udp_data_ind_p->peer_address = addr.sin_addr.s_addr;
UDP_DEBUG("Msg of length %d received from %s:%u\n", UDP_DEBUG("Msg of length %d received from %s:%u\n",
n, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); bytes_received, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
if (itti_send_msg_to_task(udp_sock_p->task_id, INSTANCE_DEFAULT, message_p) < 0) { if (itti_send_msg_to_task(udp_sock_pP->task_id, INSTANCE_DEFAULT, message_p) < 0) {
UDP_DEBUG("Failed to send message %d to task %d\n", UDP_DEBUG("Failed to send message %d to task %d\n",
UDP_DATA_IND, udp_sock_p->task_id); UDP_DATA_IND, udp_sock_pP->task_id);
break; //break;
} }
} }
} }
close(udp_sock_p->sd); //close(udp_sock_pP->sd);
udp_sock_p->sd = -1; //udp_sock_pP->sd = -1;
pthread_mutex_lock(&udp_socket_list_mutex); //pthread_mutex_lock(&udp_socket_list_mutex);
STAILQ_REMOVE(&udp_socket_list, udp_sock_p, udp_socket_desc_s, entries); //STAILQ_REMOVE(&udp_socket_list, udp_sock_pP, udp_socket_desc_s, entries);
pthread_mutex_unlock(&udp_socket_list_mutex); //pthread_mutex_unlock(&udp_socket_list_mutex);
return NULL; //return NULL;
} }
static void *udp_intertask_interface(void *args_p) static void *udp_intertask_interface(void *args_p)
{ {
int rc = 0;
int nb_events = 0;
struct epoll_event *events = NULL;
itti_mark_task_ready(TASK_UDP); itti_mark_task_ready(TASK_UDP);
while(1) { while(1) {
MessageDef *received_message_p = NULL; MessageDef *received_message_p = NULL;
itti_receive_msg(TASK_UDP, &received_message_p); itti_receive_msg(TASK_UDP, &received_message_p);
DevAssert(received_message_p != NULL);
switch (ITTI_MSG_ID(received_message_p))
{
case UDP_INIT: {
udp_init_t *udp_init_p;
udp_init_p = &received_message_p->ittiMsg.udp_init;
udp_create_socket(
udp_init_p->port,
udp_init_p->address,
ITTI_MSG_ORIGIN_ID(received_message_p));
} break;
case UDP_DATA_REQ: { if (received_message_p != NULL) {
int udp_sd = -1; switch (ITTI_MSG_ID(received_message_p))
ssize_t bytes_written; {
case UDP_INIT: {
struct udp_socket_desc_s *udp_sock_p = NULL; udp_init_t *udp_init_p;
udp_data_req_t *udp_data_req_p; udp_init_p = &received_message_p->ittiMsg.udp_init;
struct sockaddr_in peer_addr; rc = udp_server_create_socket(
udp_init_p->port,
udp_data_req_p = &received_message_p->ittiMsg.udp_data_req; udp_init_p->address,
ITTI_MSG_ORIGIN_ID(received_message_p));
memset(&peer_addr, 0, sizeof(struct sockaddr_in)); } break;
peer_addr.sin_family = AF_INET; case UDP_DATA_REQ: {
peer_addr.sin_port = htons(udp_data_req_p->peer_port); int udp_sd = -1;
peer_addr.sin_addr.s_addr = udp_data_req_p->peer_address; ssize_t bytes_written;
pthread_mutex_lock(&udp_socket_list_mutex); struct udp_socket_desc_s *udp_sock_p = NULL;
udp_sock_p = udp_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p)); udp_data_req_t *udp_data_req_p;
struct sockaddr_in peer_addr;
if (udp_sock_p == NULL) {
UDP_ERROR("Failed to retrieve the udp socket descriptor " udp_data_req_p = &received_message_p->ittiMsg.udp_data_req;
"associated with task %d\n", ITTI_MSG_ORIGIN_ID(received_message_p));
pthread_mutex_unlock(&udp_socket_list_mutex); memset(&peer_addr, 0, sizeof(struct sockaddr_in));
if (udp_data_req_p->buffer) {
free(udp_data_req_p->buffer); peer_addr.sin_family = AF_INET;
peer_addr.sin_port = htons(udp_data_req_p->peer_port);
peer_addr.sin_addr.s_addr = udp_data_req_p->peer_address;
pthread_mutex_lock(&udp_socket_list_mutex);
udp_sock_p = udp_server_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p));
if (udp_sock_p == NULL) {
UDP_ERROR("Failed to retrieve the udp socket descriptor "
"associated with task %d\n", ITTI_MSG_ORIGIN_ID(received_message_p));
pthread_mutex_unlock(&udp_socket_list_mutex);
if (udp_data_req_p->buffer) {
free(udp_data_req_p->buffer);
}
goto on_error;
} }
goto on_error; udp_sd = udp_sock_p->sd;
} pthread_mutex_unlock(&udp_socket_list_mutex);
udp_sd = udp_sock_p->sd;
pthread_mutex_unlock(&udp_socket_list_mutex);
UDP_DEBUG("[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n", UDP_DEBUG("[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n",
udp_sd, udp_data_req_p->buffer_length, udp_sd, udp_data_req_p->buffer_length,
IPV4_ADDR_FORMAT(udp_data_req_p->peer_address), IPV4_ADDR_FORMAT(udp_data_req_p->peer_address),
udp_data_req_p->peer_port); udp_data_req_p->peer_port);
bytes_written = sendto(udp_sd, udp_data_req_p->buffer, bytes_written = sendto(udp_sd, udp_data_req_p->buffer,
udp_data_req_p->buffer_length, 0, udp_data_req_p->buffer_length, 0,
(struct sockaddr *)&peer_addr, (struct sockaddr *)&peer_addr,
sizeof(struct sockaddr_in)); sizeof(struct sockaddr_in));
if (bytes_written != udp_data_req_p->buffer_length) { if (bytes_written != udp_data_req_p->buffer_length) {
UDP_ERROR("There was an error while writing to socket " UDP_ERROR("There was an error while writing to socket "
"(%d:%s)\n", errno, strerror(errno)); "(%d:%s)\n", errno, strerror(errno));
} }
} break; } break;
case TERMINATE_MESSAGE: { case TERMINATE_MESSAGE: {
itti_exit_task(); itti_exit_task();
} break; } break;
case MESSAGE_TEST: { case MESSAGE_TEST: {
} break; } break;
default: { default: {
UDP_DEBUG("Unkwnon message ID %d:%s\n", UDP_DEBUG("Unkwnon message ID %d:%s\n",
ITTI_MSG_ID(received_message_p), ITTI_MSG_NAME(received_message_p)); ITTI_MSG_ID(received_message_p), ITTI_MSG_NAME(received_message_p));
} break; } break;
} }
on_error: on_error:
itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p); rc = itti_free(ITTI_MSG_ORIGIN_ID(received_message_p), received_message_p);
received_message_p = NULL; AssertFatal(rc == EXIT_SUCCESS, "Failed to free memory (%d)!\n", rc);
received_message_p = NULL;
}
nb_events = itti_get_events(TASK_UDP, &events);
if ((nb_events > 0) && (events != NULL)) {
/* Now handle notifications for other sockets */
udp_server_flush_sockets(events, nb_events);
}
} }
return NULL; return NULL;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment