Commit 0b10b80f authored by Cedric Roux's avatar Cedric Roux

- Subscribe to errors events in itti

- Fixed timeout for sctp when host is unreachable

git-svn-id: http://svn.eurecom.fr/openair4G/trunk@4418 818b1a75-f10b-46b9-bf7c-635c3b92a50f
parent 12e8d647
...@@ -383,7 +383,7 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd) ...@@ -383,7 +383,7 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd)
itti_desc.tasks[task_id].events, itti_desc.tasks[task_id].events,
itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event)); itti_desc.tasks[task_id].nb_events * sizeof(struct epoll_event));
event.events = EPOLLIN; event.events = EPOLLIN | EPOLLERR;
event.data.fd = fd; event.data.fd = fd;
/* Add the event fd to the list of monitored events */ /* Add the event fd to the list of monitored events */
...@@ -395,6 +395,8 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd) ...@@ -395,6 +395,8 @@ void itti_subscribe_event_fd(task_id_t task_id, int fd)
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
} }
ITTI_DEBUG("Successfully subscribed fd %d for task %s\n", fd, itti_get_task_name(task_id));
} }
void itti_unsubscribe_event_fd(task_id_t task_id, int fd) void itti_unsubscribe_event_fd(task_id_t task_id, int fd)
...@@ -579,7 +581,7 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar ...@@ -579,7 +581,7 @@ int itti_create_task(task_id_t task_id, void *(*start_routine)(void *), void *ar
itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING; itti_desc.threads[thread_id].task_state = TASK_STATE_STARTING;
result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p); result = pthread_create (&itti_desc.threads[thread_id].task_thread, NULL, start_routine, args_p);
DevCheck(result>= 0, task_id, thread_id, result); DevCheck(result >= 0, task_id, thread_id, result);
/* Wait till the thread is completely ready */ /* Wait till the thread is completely ready */
while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY) while (itti_desc.threads[thread_id].task_state != TASK_STATE_READY)
...@@ -663,7 +665,8 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -663,7 +665,8 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
} }
itti_desc.tasks[task_id].task_event_fd = eventfd(0, EFD_SEMAPHORE); itti_desc.tasks[task_id].task_event_fd = eventfd(0, EFD_SEMAPHORE);
if (itti_desc.tasks[task_id].task_event_fd == -1) { if (itti_desc.tasks[task_id].task_event_fd == -1)
{
ITTI_ERROR("eventfd failed: %s\n", strerror(errno)); ITTI_ERROR("eventfd failed: %s\n", strerror(errno));
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
...@@ -673,17 +676,20 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i ...@@ -673,17 +676,20 @@ int itti_init(task_id_t task_max, thread_id_t thread_max, MessagesIds messages_i
itti_desc.tasks[task_id].events = malloc(sizeof(struct epoll_event)); itti_desc.tasks[task_id].events = malloc(sizeof(struct epoll_event));
itti_desc.tasks[task_id].events->events = EPOLLIN; itti_desc.tasks[task_id].events->events = EPOLLIN | EPOLLERR;
itti_desc.tasks[task_id].events->data.fd = itti_desc.tasks[task_id].task_event_fd; itti_desc.tasks[task_id].events->data.fd = itti_desc.tasks[task_id].task_event_fd;
/* Add the event fd to the list of monitored events */ /* Add the event fd to the list of monitored events */
if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD, if (epoll_ctl(itti_desc.tasks[task_id].epoll_fd, EPOLL_CTL_ADD,
itti_desc.tasks[task_id].task_event_fd, itti_desc.tasks[task_id].events) != 0) itti_desc.tasks[task_id].task_event_fd, itti_desc.tasks[task_id].events) != 0)
{ {
ITTI_ERROR("epoll_ctl failed: %s\n", strerror(errno)); ITTI_ERROR("epoll_ctl (EPOLL_CTL_ADD) failed: %s\n", strerror(errno));
/* Always assert on this condition */ /* Always assert on this condition */
DevAssert(0 == 1); DevAssert(0 == 1);
} }
ITTI_DEBUG("Successfully subscribed fd %d for task %s\n",
itti_desc.tasks[task_id].task_event_fd, itti_get_task_name(task_id));
#else #else
STAILQ_INIT (&itti_desc.tasks[task_id].message_queue); STAILQ_INIT (&itti_desc.tasks[task_id].message_queue);
itti_desc.tasks[task_id].message_in_queue = 0; itti_desc.tasks[task_id].message_in_queue = 0;
......
...@@ -14,9 +14,4 @@ ...@@ -14,9 +14,4 @@
#define X2AP_PORT_NUMBER (36422) #define X2AP_PORT_NUMBER (36422)
#define X2AP_SCTP_PPID (27) #define X2AP_SCTP_PPID (27)
#define SCTP_OUT_STREAMS (64)
#define SCTP_IN_STREAMS (64)
#define SCTP_MAX_ATTEMPTS (2)
#define SCTP_RECV_BUFFER_SIZE (1024)
#endif /* S1AP_ENB_DEFAULT_VALUES_H_ */ #endif /* S1AP_ENB_DEFAULT_VALUES_H_ */
...@@ -64,6 +64,7 @@ int sctp_set_init_opt(int sd, uint16_t instreams, uint16_t outstreams, ...@@ -64,6 +64,7 @@ int sctp_set_init_opt(int sd, uint16_t instreams, uint16_t outstreams,
init.sinit_num_ostreams = outstreams; init.sinit_num_ostreams = outstreams;
init.sinit_max_instreams = instreams; init.sinit_max_instreams = instreams;
init.sinit_max_attempts = max_attempts; init.sinit_max_attempts = max_attempts;
init.sinit_max_init_timeo = init_timeout;
if (setsockopt(sd, IPPROTO_SCTP, SCTP_INITMSG, &init, sizeof(struct sctp_initmsg)) < 0) if (setsockopt(sd, IPPROTO_SCTP, SCTP_INITMSG, &init, sizeof(struct sctp_initmsg)) < 0)
{ {
......
...@@ -47,11 +47,6 @@ ...@@ -47,11 +47,6 @@
# define SCTP_ERROR(x, args...) LOG_E(SCTP, x, ##args) # define SCTP_ERROR(x, args...) LOG_E(SCTP, x, ##args)
# define SCTP_WARN(x, args...) LOG_W(SCTP, x, ##args) # define SCTP_WARN(x, args...) LOG_W(SCTP, x, ##args)
# define SCTP_DEBUG(x, args...) LOG_D(SCTP, x, ##args) # define SCTP_DEBUG(x, args...) LOG_D(SCTP, x, ##args)
# define SCTP_OUT_STREAMS (64)
# define SCTP_IN_STREAMS (64)
# define SCTP_MAX_ATTEMPTS (5)
# define SCTP_RECV_BUFFER_SIZE (1024)
#else #else
# define SCTP_ERROR(x, args...) do { fprintf(stderr, "[SCTP][E]"x, ##args); } while(0) # define SCTP_ERROR(x, args...) do { fprintf(stderr, "[SCTP][E]"x, ##args); } while(0)
# define SCTP_DEBUG(x, args...) do { fprintf(stdout, "[SCTP][D]"x, ##args); } while(0) # define SCTP_DEBUG(x, args...) do { fprintf(stdout, "[SCTP][D]"x, ##args); } while(0)
......
#ifndef SCTP_DEFAULT_VALUES_H_
#define SCTP_DEFAULT_VALUES_H_
#define SCTP_OUT_STREAMS (64)
#define SCTP_IN_STREAMS (64)
#define SCTP_MAX_ATTEMPTS (2)
#define SCTP_TIMEOUT (5)
#define SCTP_RECV_BUFFER_SIZE (1024)
#endif /* SCTP_DEFAULT_VALUES_H_ */
...@@ -46,6 +46,7 @@ ...@@ -46,6 +46,7 @@
#include "intertask_interface.h" #include "intertask_interface.h"
#include "sctp_default_values.h"
#include "sctp_common.h" #include "sctp_common.h"
#include "sctp_eNB_itti_messaging.h" #include "sctp_eNB_itti_messaging.h"
...@@ -122,7 +123,6 @@ void sctp_handle_new_association_req( ...@@ -122,7 +123,6 @@ void sctp_handle_new_association_req(
int sd; int sd;
int32_t assoc_id; int32_t assoc_id;
struct sctp_initmsg init;
struct sctp_event_subscribe events; struct sctp_event_subscribe events;
struct sctp_cnx_list_elm_s *sctp_cnx = NULL; struct sctp_cnx_list_elm_s *sctp_cnx = NULL;
...@@ -142,18 +142,12 @@ void sctp_handle_new_association_req( ...@@ -142,18 +142,12 @@ void sctp_handle_new_association_req(
/* Add the socket to list of fd monitored by ITTI */ /* Add the socket to list of fd monitored by ITTI */
itti_subscribe_event_fd(TASK_SCTP, sd); itti_subscribe_event_fd(TASK_SCTP, sd);
/* Request a number of in/out streams */ if (sctp_set_init_opt(sd, SCTP_IN_STREAMS, SCTP_OUT_STREAMS,
init.sinit_num_ostreams = SCTP_OUT_STREAMS; SCTP_MAX_ATTEMPTS, SCTP_TIMEOUT) != 0)
init.sinit_max_instreams = SCTP_IN_STREAMS; {
init.sinit_max_attempts = SCTP_MAX_ATTEMPTS;
SCTP_DEBUG("Requesting (%d %d) (in out) streams\n", init.sinit_num_ostreams,
init.sinit_max_instreams);
if (setsockopt(sd, IPPROTO_SCTP, SCTP_INITMSG,
&init, (socklen_t)sizeof(struct sctp_initmsg)) < 0) {
SCTP_ERROR("Setsockopt IPPROTO_SCTP_INITMSG failed: %s\n", SCTP_ERROR("Setsockopt IPPROTO_SCTP_INITMSG failed: %s\n",
strerror(errno)); strerror(errno));
itti_unsubscribe_event_fd(TASK_SCTP, sd);
close(sd); close(sd);
return; return;
} }
...@@ -229,8 +223,6 @@ void sctp_handle_new_association_req( ...@@ -229,8 +223,6 @@ void sctp_handle_new_association_req(
address_index++; address_index++;
} }
SCTP_DEBUG("Connecting...\n");
/* Connect to remote host and port */ /* Connect to remote host and port */
if (sctp_connectx(sd, (struct sockaddr *)addr, used_address, &assoc_id) < 0) if (sctp_connectx(sd, (struct sockaddr *)addr, used_address, &assoc_id) < 0)
{ {
...@@ -413,6 +405,8 @@ inline void sctp_eNB_read_from_socket(struct sctp_cnx_list_elm_s *sctp_cnx) ...@@ -413,6 +405,8 @@ inline void sctp_eNB_read_from_socket(struct sctp_cnx_list_elm_s *sctp_cnx)
if (errno == ENOTCONN) { if (errno == ENOTCONN) {
itti_unsubscribe_event_fd(TASK_SCTP, sctp_cnx->sd); itti_unsubscribe_event_fd(TASK_SCTP, sctp_cnx->sd);
SCTP_DEBUG("Received not connected for sd %d\n", sctp_cnx->sd);
sctp_itti_send_association_resp( sctp_itti_send_association_resp(
sctp_cnx->task_id, sctp_cnx->instance, -1, sctp_cnx->task_id, sctp_cnx->instance, -1,
sctp_cnx->cnx_id, SCTP_STATE_UNREACHABLE, 0, 0); sctp_cnx->cnx_id, SCTP_STATE_UNREACHABLE, 0, 0);
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
enum sctp_state_e { enum sctp_state_e {
SCTP_STATE_CLOSED, SCTP_STATE_CLOSED,
SCTP_STATE_SHUTDOWN_PENDING, SCTP_STATE_SHUTDOWN,
SCTP_STATE_ESTABLISHED, SCTP_STATE_ESTABLISHED,
SCTP_STATE_UNREACHABLE SCTP_STATE_UNREACHABLE
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment