Commit 01943bac authored by Lionel Gauthier's avatar Lionel Gauthier

Task

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@5070 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent d971f434
libudp_OBJECTS = \ # include $(OPENAIR_TARGETS)/SIMU/USER/Makerules
udp_primitives_client.o
-include .deps/*.d OUTDIR = .
.PHONY = depdir libudp_OBJECTS = \
udp_eNB_task.o
# pull in dependency info for *existing* .o files
-include $(OUTDIR)/*.d
CFLAGS = \ CFLAGS = \
-I../UTILS \
-I$(OPENAIR2_DIR) \
-I$(OPENAIR2_DIR)/COMMON \
-I$(OPENAIR2_DIR)/GTPV1U \
-I$(OPENAIR2_DIR)/GTPV1U/nw-gtpv1u/include \
-I$(OPENAIR2_DIR)/GTPV1U/nw-gtpv1u/shared \
-I$(OPENAIR2_DIR)/UTIL \
-DUPDATE_RELEASE_9 \
-DENB_MODE \
-DENABLE_USE_MME \
-DUSER_MODE \
-O2 \
-g \
-Wall \ -Wall \
$(UDP_CFLAGS) \
-DENB_MODE \
-Werror=uninitialized \
-Werror=implicit-function-declaration -Werror=implicit-function-declaration
$(libudp_OBJECTS): %.o : %.c $(OUTDIR)/%.o : %.c
@echo "Compiling $<" @echo "Compiling $<"
@if [ ! -d $(dir $@) ]; then mkdir -p $(dir $@); fi;
@$(CC) -c $(CFLAGS) -o $@ $< @$(CC) -c $(CFLAGS) -o $@ $<
@if ! test -d ".deps/" ; then mkdir -p .deps; fi @$(CC) -MM $(CFLAGS) $< > $(basename $@).d
@$(CC) -MM $(CFLAGS) $*.c > .deps/$*.d @mv -f $(basename $@).d $(basename $@).d.tmp
@mv -f .deps/$*.d .deps/$*.d.tmp @sed -e 's|.*:|$@:|' < $(basename $@).d.tmp > $(basename $@).d
@sed -e 's|.*:|$*.o:|' < .deps/$*.d.tmp > .deps/$*.d @sed -e 's/.*://' -e 's/\\$$//' < $(basename $@).d.tmp | fmt -1 | \
@sed -e 's/.*://' -e 's/\\$$//' < .deps/$*.d.tmp | fmt -1 | \ sed -e 's/^ *//' -e 's/$$/:/' >> $(basename $@).d
sed -e 's/^ *//' -e 's/$$/:/' >> .deps/$*.d @rm -f $(basename $@).d.tmp
@rm -f .deps/$*.d.tmp
objsdir:
libudp.a: $(libudp_OBJECTS) @if [ ! -d $(OUTDIR) ]; then mkdir -p $(OUTDIR); fi;
$(OUTDIR)/libudp.a: $(addprefix $(OUTDIR)/,$(libudp_OBJECTS))
@echo Creating UDP archive @echo Creating UDP archive
@$(AR) rcvs $@ $(libudp_OBJECTS) @$(AR) rcs $@ $(addprefix $(OUTDIR)/,$(libudp_OBJECTS))
clean: clean:
rm -f $(libudp_OBJECTS) @$(RM_F_V) $(OUTDIR)/*.o
rm -rf .deps/ @$(RM_F_V) $(OUTDIR)/*.d
rm -f libudp.a @$(RM_F_V) $(OUTDIR)/libudp.a
\ No newline at end of file
cleanall: clean
/*******************************************************************************
Eurecom OpenAirInterface
Copyright(c) 1999 - 2014 Eurecom
This program is free software; you can redistribute it and/or modify it
under the terms and conditions of the GNU General Public License,
version 2, as published by the Free Software Foundation.
This program is distributed in the hope it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
The full GNU General Public License is included in this distribution in
the file called "COPYING".
Contact Information
Openair Admin: openair_admin@eurecom.fr
Openair Tech : openair_tech@eurecom.fr
Forums : http://forums.eurecom.fr/openairinterface
Address : EURECOM, Campus SophiaTech, 450 Route des Chappes
06410 Biot FRANCE
*******************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "queue.h"
#include "intertask_interface.h"
#include "assertions.h"
#include "udp_eNB_task.h"
#include "UTIL/LOG/log.h"
#define IPV4_ADDR "%u.%u.%u.%u"
#define IPV4_ADDR_FORMAT(aDDRESS) \
(uint8_t)((aDDRESS) & 0x000000ff), \
(uint8_t)(((aDDRESS) & 0x0000ff00) >> 8 ), \
(uint8_t)(((aDDRESS) & 0x00ff0000) >> 16), \
(uint8_t)(((aDDRESS) & 0xff000000) >> 24)
static void *udp_receiver_thread(void *arg_p);
struct udp_socket_desc_s {
int sd; /* Socket descriptor to use */
pthread_t listener_thread; /* Thread affected to recv */
char *local_address; /* Local ipv4 address to use */
uint16_t local_port; /* Local port to use */
task_id_t task_id; /* Task who has requested the new endpoint */
STAILQ_ENTRY(udp_socket_desc_s) entries;
};
static STAILQ_HEAD(udp_socket_list_s, udp_socket_desc_s) udp_socket_list;
static pthread_mutex_t udp_socket_list_mutex = PTHREAD_MUTEX_INITIALIZER;
/* @brief Retrieve the descriptor associated with the task_id
*/
static
struct udp_socket_desc_s *udp_get_socket_desc(task_id_t task_id)
{
struct udp_socket_desc_s *udp_sock_p = NULL;
LOG_I(UDP_, "Looking for task %d\n", task_id);
STAILQ_FOREACH(udp_sock_p, &udp_socket_list, entries) {
if (udp_sock_p->task_id == task_id) {
LOG_D(UDP_, "Found matching task desc\n");
break;
}
}
return udp_sock_p;
}
static
int udp_create_socket(int port, char *ip_addr, task_id_t task_id)
{
struct udp_socket_desc_s *thread_arg;
int sd, rc;
struct sockaddr_in sin;
LOG_I(UDP_, "Initializing UDP for local address %s with port %d\n", ip_addr, port);
sd = socket(AF_INET, SOCK_DGRAM, 0);
AssertFatal(sd > 0, "UDP: Failed to create new socket: (%s:%d)\n", strerror(errno), errno);
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
if (ip_addr == NULL) {
sin.sin_addr.s_addr = inet_addr(INADDR_ANY);
} else {
sin.sin_addr.s_addr = inet_addr(ip_addr);
}
if ((rc = bind(sd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in))) < 0) {
close(sd);
AssertFatal(rc >= 0, "UDP: Failed to bind socket: (%s:%d)\n\n", strerror(errno), errno);
}
/* Create a new descriptor for this connection */
thread_arg = calloc(1, sizeof(struct udp_socket_desc_s));
DevAssert(thread_arg != NULL);
thread_arg->sd = sd;
thread_arg->local_address = ip_addr;
thread_arg->local_port = port;
thread_arg->task_id = task_id;
if (pthread_create(&thread_arg->listener_thread, NULL,
&udp_receiver_thread, (void *)thread_arg) < 0) {
LOG_E(UDP_, "Pthred_create failed (%s)\n", strerror(errno));
return -1;
}
LOG_I(UDP_, "Initializing UDP for local address %s with port %d: DONE\n", ip_addr, port);
return sd;
}
static void *udp_receiver_thread(void *arg_p)
{
struct udp_socket_desc_s *udp_sock_p;
uint8_t buffer[2048];
udp_sock_p = (struct udp_socket_desc_s *)arg_p;
LOG_D(UDP_, "Inserting new descriptor for task %d, sd %d\n", udp_sock_p->task_id, udp_sock_p->sd);
pthread_mutex_lock(&udp_socket_list_mutex);
STAILQ_INSERT_TAIL(&udp_socket_list, udp_sock_p, entries);
pthread_mutex_unlock(&udp_socket_list_mutex);
while (1) {
int n;
socklen_t from_len;
struct sockaddr_in addr;
from_len = (socklen_t)sizeof(struct sockaddr_in);
if ((n = recvfrom(udp_sock_p->sd, buffer, sizeof(buffer), 0,
(struct sockaddr *)&addr, &from_len)) < 0) {
LOG_E(UDP_, "Recvfrom failed %s\n", strerror(errno));
break;
} else {
MessageDef *message_p = NULL;
udp_data_ind_t *udp_data_ind_p;
uint8_t *forwarded_buffer = NULL;
forwarded_buffer = calloc(n, sizeof(uint8_t));
memcpy(forwarded_buffer, buffer, n);
message_p = itti_alloc_new_message(TASK_UDP, UDP_DATA_IND);
DevAssert(message_p != NULL);
udp_data_ind_p = &message_p->ittiMsg.udp_data_ind;
udp_data_ind_p->buffer = forwarded_buffer;
udp_data_ind_p->buffer_length = n;
udp_data_ind_p->peer_port = htons(addr.sin_port);
udp_data_ind_p->peer_address = addr.sin_addr.s_addr;
LOG_D(UDP_, "Msg of length %d received from %s:%u\n",
n, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
if (itti_send_msg_to_task(udp_sock_p->task_id, INSTANCE_DEFAULT, message_p) < 0) {
LOG_D(UDP_, "Failed to send message %d to task %d\n",
UDP_DATA_IND, udp_sock_p->task_id);
break;
}
}
}
close(udp_sock_p->sd);
udp_sock_p->sd = -1;
pthread_mutex_lock(&udp_socket_list_mutex);
STAILQ_REMOVE(&udp_socket_list, udp_sock_p, udp_socket_desc_s, entries);
pthread_mutex_unlock(&udp_socket_list_mutex);
return NULL;
}
void *udp_eNB_task(void *args_p)
{
itti_mark_task_ready(TASK_UDP);
while(1) {
MessageDef *received_message_p = NULL;
itti_receive_msg(TASK_UDP, &received_message_p);
DevAssert(received_message_p != NULL);
switch (ITTI_MSG_ID(received_message_p))
{
case UDP_INIT: {
udp_init_t *udp_init_p;
udp_init_p = &received_message_p->ittiMsg.udp_init;
udp_create_socket(udp_init_p->port, udp_init_p->address,
ITTI_MSG_ORIGIN_ID(received_message_p));
} break;
case UDP_DATA_REQ: {
int udp_sd = -1;
ssize_t bytes_written;
struct udp_socket_desc_s *udp_sock_p = NULL;
udp_data_req_t *udp_data_req_p;
struct sockaddr_in peer_addr;
udp_data_req_p = &received_message_p->ittiMsg.udp_data_req;
memset(&peer_addr, 0, sizeof(struct sockaddr_in));
peer_addr.sin_family = AF_INET;
peer_addr.sin_port = htons(udp_data_req_p->peer_port);
peer_addr.sin_addr.s_addr = udp_data_req_p->peer_address;
pthread_mutex_lock(&udp_socket_list_mutex);
udp_sock_p = udp_get_socket_desc(ITTI_MSG_ORIGIN_ID(received_message_p));
if (udp_sock_p == NULL) {
LOG_E(UDP_, "Failed to retrieve the udp socket descriptor "
"associated with task %d\n", ITTI_MSG_ORIGIN_ID(received_message_p));
pthread_mutex_unlock(&udp_socket_list_mutex);
if (udp_data_req_p->buffer) {
free(udp_data_req_p->buffer);
}
goto on_error;
}
udp_sd = udp_sock_p->sd;
pthread_mutex_unlock(&udp_socket_list_mutex);
LOG_D(UDP_, "[%d] Sending message of size %u to "IPV4_ADDR" and port %u\n",
udp_sd, udp_data_req_p->buffer_length,
IPV4_ADDR_FORMAT(udp_data_req_p->peer_address),
udp_data_req_p->peer_port);
bytes_written = sendto(udp_sd, udp_data_req_p->buffer,
udp_data_req_p->buffer_length, 0,
(struct sockaddr *)&peer_addr,
sizeof(struct sockaddr_in));
if (bytes_written != udp_data_req_p->buffer_length) {
LOG_E(UDP_, "There was an error while writing to socket "
"(%d:%s)\n", errno, strerror(errno));
}
} break;
case TERMINATE_MESSAGE: {
itti_exit_task();
} break;
case MESSAGE_TEST: {
} break;
default: {
LOG_D(UDP_, "Unkwnon message ID %d:%s\n",
ITTI_MSG_ID(received_message_p), ITTI_MSG_NAME(received_message_p));
} break;
}
on_error:
free(received_message_p);
received_message_p = NULL;
}
return NULL;
}
int udp_enb_init(const Enb_properties_t *enb_config_p)
{
LOG_D(UDP_, "Initializing UDP task interface\n");
STAILQ_INIT(&udp_socket_list);
LOG_D(UDP_, "Initializing UDP task interface: DONE\n");
return 0;
}
#include <pthread.h> #include <pthread.h>
#include <stdint.h> #include <stdint.h>
#ifndef UDP_PRIMITIVES_CLIENT_H_ #ifndef UDP_ENB_TASK_H_
#define UDP_PRIMITIVES_CLIENT_H_ #define UDP_ENB_TASK_H_
#include "enb_config.h"
#include "intertask_interface_types.h"
/** \brief UDP recv callback prototype. Will be called every time a payload is /** \brief UDP recv callback prototype. Will be called every time a payload is
* received on socket. * received on socket.
...@@ -20,10 +23,11 @@ typedef int (*udp_recv_callback)(uint16_t port, ...@@ -20,10 +23,11 @@ typedef int (*udp_recv_callback)(uint16_t port,
void *arg_p); void *arg_p);
typedef struct { typedef struct {
udp_recv_callback recv_callback; int sd; /* Socket descriptor to use */
pthread_t recv_thread; pthread_t listener_thread; /* Thread affected to recv */
int sd; char *local_address; /* Local ipv4 address to use */
void *arg_p; uint16_t local_port; /* Local port to use */
task_id_t task_id; /* Task who has requested the new endpoint */
} udp_data_t; } udp_data_t;
/** \brief Create new datagram connection-less socket and create new thread /** \brief Create new datagram connection-less socket and create new thread
...@@ -51,4 +55,17 @@ int udp_create_connection(char *ip_addr, uint16_t port, ...@@ -51,4 +55,17 @@ int udp_create_connection(char *ip_addr, uint16_t port,
int udp_send_to(int sd, uint16_t port, uint32_t address, int udp_send_to(int sd, uint16_t port, uint32_t address,
const uint8_t *buffer, uint32_t length); const uint8_t *buffer, uint32_t length);
/** \brief UDP ITTI task on eNB.
* \param args_p
* @returns always NULL
*/
void *udp_eNB_task(void *args_p);
/** \brief init UDP layer.
* \param enb_config_p configuration of eNB
* @returns always 0
*/
int udp_enb_init(const Enb_properties_t *enb_config_p);
#endif /* UDP_PRIMITIVES_CLIENT_H_ */ #endif /* UDP_PRIMITIVES_CLIENT_H_ */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "udp_primitives_client.h"
#include "UTIL/LOG/log.h"
#define IPV4_ADDR "%u.%u.%u.%u"
#define IPV4_ADDR_FORMAT(aDDRESS) \
(uint8_t)((aDDRESS) & 0x000000ff), \
(uint8_t)(((aDDRESS) & 0x0000ff00) >> 8 ), \
(uint8_t)(((aDDRESS) & 0x00ff0000) >> 16), \
(uint8_t)(((aDDRESS) & 0xff000000) >> 24)
static void *udp_recv_thread(void *arg_p);
int udp_create_connection(char *ip_addr, uint16_t port,
udp_data_t *udp_data_p,
udp_recv_callback recv_callback,
void *arg_p)
{
udp_data_t *udp_desc;
int sd;
struct sockaddr_in sin;
LOG_I(UDP, "Initializing UDP for local address %s with port %d\n",
ip_addr, port);
if ((sd = socket(AF_INET, SOCK_DGRAM, 0)) <= 0) {
LOG_E(UDP, "Failed to create new socket: (%s:%d)\n",
strerror(errno), errno);
return -1;
}
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
if (ip_addr == NULL) {
sin.sin_addr.s_addr = inet_addr(INADDR_ANY);
} else {
sin.sin_addr.s_addr = inet_addr(ip_addr);
}
if (bind(sd, (struct sockaddr *)&sin, sizeof(struct sockaddr_in)) < 0) {
LOG_E(UDP, "Failed to bind socket: (%s:%d)\n",
strerror(errno), errno);
close(sd);
return -1;
}
/* Create a new descriptor for this connection */
udp_desc = malloc(sizeof(udp_data_t));
memset(udp_desc, 0, sizeof(udp_data_t));
udp_desc->sd = sd;
udp_desc->recv_callback = recv_callback;
udp_desc->arg_p = arg_p;
memcpy(udp_data_p, udp_desc, sizeof(udp_data_t));
if (pthread_create(&udp_desc->recv_thread, NULL, udp_recv_thread,
(void *)udp_desc) < 0) {
LOG_E(UDP, "Failed to create new thread: (%s:%d)\n",
strerror(errno), errno);
close(sd);
free(udp_desc);
return -1;
}
LOG_I(UDP, "Initializing UDP for local address %s with port %d: DONE\n",
ip_addr, port);
return sd;
}
int udp_send_to(int sd, uint16_t port, uint32_t address, const uint8_t *buffer,
uint32_t length)
{
struct sockaddr_in to;
socklen_t to_length;
if (sd <= 0 || ((buffer == NULL) && (length > 0))) {
LOG_E(UDP, "udp_send_to: bad param\n");
return -1;
}
memset(&to, 0, sizeof(struct sockaddr_in));
to_length = sizeof(to);
to.sin_family = AF_INET;
to.sin_port = htons(port);
to.sin_addr.s_addr = address;
if (sendto(sd, (void *)buffer, (size_t)length, 0, (struct sockaddr *)&to,
to_length) < 0) {
LOG_E(UDP,
"[SD %d] Failed to send data to "IPV4_ADDR" on port %d, buffer size %u\n",
sd, IPV4_ADDR_FORMAT(address), port, length);
return -1;
}
LOG_I(UDP, "[SD %d] Successfully sent to "IPV4_ADDR
" on port %d, buffer size %u, buffer address %x\n",
sd, IPV4_ADDR_FORMAT(address), port, length, buffer);
return 0;
}
static void *udp_recv_thread(void *arg_p)
{
udp_data_t *udp_desc;
udp_desc = (udp_data_t *)arg_p;
uint8_t buffer[4096];
LOG_D(UDP, "Starting receiver thread\n");
while(1) {
int n;
struct sockaddr_in from;
socklen_t from_len;
n = recvfrom(udp_desc->sd, buffer, sizeof(buffer), 0,
(struct sockaddr *)&from, &from_len);
if (n < 0) {
LOG_E(UDP, "Recvfrom failed: (%s:%d)\n",
strerror(errno), errno);
} else if (n == 0) {
LOG_I(UDP, "Peer %s on port %d has performed a shutdown\n",
inet_ntoa(from.sin_addr), ntohs(from.sin_port));
break;
} else {
/* Normal read, notify the upper layer of incoming data */
LOG_D(UDP, "Received UDP message of length %u from %s:%u\n",
n, inet_ntoa(from.sin_addr), ntohs(from.sin_port));
if (udp_desc->recv_callback) {
udp_desc->recv_callback(from.sin_port, from.sin_addr.s_addr, buffer, n,
udp_desc->arg_p);
} else {
LOG_W(UDP, "No recv callback associated to this socket (%d), exiting\n",
udp_desc->sd);
break;
}
}
}
LOG_I(UDP, "Receiver thread exiting\n");
free(udp_desc);
return NULL;
}
...@@ -134,7 +134,7 @@ int udp_create_socket(int port, char *address, task_id_t task_id) ...@@ -134,7 +134,7 @@ int udp_create_socket(int port, char *address, task_id_t task_id)
void *udp_receiver_thread(void *arg_p) void *udp_receiver_thread(void *arg_p)
{ {
uint8_t buffer[1024]; uint8_t buffer[2048];
struct udp_socket_desc_s *udp_sock_p = (struct udp_socket_desc_s *)arg_p; struct udp_socket_desc_s *udp_sock_p = (struct udp_socket_desc_s *)arg_p;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment