Commit 7fc6c548 authored by Tien Thinh NGUYEN's avatar Tien Thinh NGUYEN

Code refactor for SCTP

parent e83ac3f6
...@@ -40,8 +40,6 @@ extern "C" { ...@@ -40,8 +40,6 @@ extern "C" {
#include "bstrlib.h" #include "bstrlib.h"
} }
#include <iostream>
namespace sctp { namespace sctp {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
...@@ -63,10 +61,10 @@ int sctp_server::create_socket(const char* address, const uint16_t port_num) { ...@@ -63,10 +61,10 @@ int sctp_server::create_socket(const char* address, const uint16_t port_num) {
struct addrinfo* res; struct addrinfo* res;
if (getaddrinfo(address, 0, NULL, &res) < 0) { if (getaddrinfo(address, 0, NULL, &res) < 0) {
Logger::sctp().error( Logger::sctp().error(
"getaddrinfo on %s: %s:%d", address, strerror(errno), errno); "Getaddrinfo on %s: %s:%d", address, strerror(errno), errno);
return RETURNerror; return RETURNerror;
} else { } else {
Logger::sctp().debug("getaddrinfo on %s was OK", address); Logger::sctp().debug("Getaddrinfo on %s was OK", address);
} }
if ((socket_ = socket(res->ai_family, SOCK_STREAM, IPPROTO_SCTP)) < 0) { if ((socket_ = socket(res->ai_family, SOCK_STREAM, IPPROTO_SCTP)) < 0) {
Logger::sctp().error("Socket: %s:%d", strerror(errno), errno); Logger::sctp().error("Socket: %s:%d", strerror(errno), errno);
...@@ -87,9 +85,16 @@ int sctp_server::create_socket(const char* address, const uint16_t port_num) { ...@@ -87,9 +85,16 @@ int sctp_server::create_socket(const char* address, const uint16_t port_num) {
events_.sctp_data_io_event = 1; events_.sctp_data_io_event = 1;
events_.sctp_shutdown_event = 1; events_.sctp_shutdown_event = 1;
events_.sctp_association_event = 1; events_.sctp_association_event = 1;
setsockopt(socket_, IPPROTO_SCTP, SCTP_EVENTS, &events_, 8); // TODO:
listen(socket_, 5); // events_.sctp_send_failure_event = 1;
return 0; // events_.sctp_partial_delivery_event = 1;
// events_.sctp_address_event = 1;
// events_.sctp_peer_error_event = 1;
setsockopt(socket_, IPPROTO_SCTP, SCTP_EVENTS, &events_, sizeof(events_));
listen(socket_, 5); // the queue length for completely established sockets
// waiting to be accepted
return RETURNok;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
...@@ -106,11 +111,13 @@ void* sctp_server::sctp_receiver_thread(void* arg) { ...@@ -106,11 +111,13 @@ void* sctp_server::sctp_receiver_thread(void* arg) {
int clientsock; int clientsock;
fd_set master; fd_set master;
fd_set read_fds; fd_set read_fds;
if (arg == NULL) pthread_exit(NULL); if (arg == NULL) pthread_exit(NULL);
FD_ZERO(&master); FD_ZERO(&master);
FD_ZERO(&read_fds); FD_ZERO(&read_fds);
FD_SET(ptr->getSocket(), &master); FD_SET(ptr->getSocket(), &master);
fdmax = ptr->getSocket(); fdmax = ptr->getSocket();
while (1) { while (1) {
memcpy(&read_fds, &master, sizeof(master)); memcpy(&read_fds, &master, sizeof(master));
if (select(fdmax + 1, &read_fds, NULL, NULL, NULL) == -1) { if (select(fdmax + 1, &read_fds, NULL, NULL, NULL) == -1) {
...@@ -158,19 +165,24 @@ int sctp_server::sctp_read_from_socket(int sd, uint32_t ppid) { ...@@ -158,19 +165,24 @@ int sctp_server::sctp_read_from_socket(int sd, uint32_t ppid) {
struct sctp_sndrcvinfo sinfo = {0}; struct sctp_sndrcvinfo sinfo = {0};
struct sockaddr_in6 addr = {0}; struct sockaddr_in6 addr = {0};
uint8_t buffer[SCTP_RECV_BUFFER_SIZE]; uint8_t buffer[SCTP_RECV_BUFFER_SIZE];
if (sd < 0) return RETURNerror; if (sd < 0) return RETURNerror;
memset((void*) &addr, 0, sizeof(struct sockaddr_in6)); memset((void*) &addr, 0, sizeof(struct sockaddr_in6));
from_len = (socklen_t) sizeof(struct sockaddr_in6); from_len = (socklen_t) sizeof(struct sockaddr_in6);
memset((void*) &sinfo, 0, sizeof(struct sctp_sndrcvinfo)); memset((void*) &sinfo, 0, sizeof(struct sctp_sndrcvinfo));
int n = sctp_recvmsg( int n = sctp_recvmsg(
sd, (void*) buffer, SCTP_RECV_BUFFER_SIZE, (struct sockaddr*) &addr, sd, (void*) buffer, SCTP_RECV_BUFFER_SIZE, (struct sockaddr*) &addr,
&from_len, &sinfo, &flags); &from_len, &sinfo, &flags);
if (n < 0) { if (n < 0) {
Logger::sctp().error("sctp_recvmsg error:: %s:%d", strerror(errno), errno); Logger::sctp().error("sctp_recvmsg error:: %s:%d", strerror(errno), errno);
return SCTP_RC_ERROR; return SCTP_RC_ERROR;
} }
if (flags & MSG_NOTIFICATION) { if (flags & MSG_NOTIFICATION) {
union sctp_notification* snp = (union sctp_notification*) buffer; union sctp_notification* snp = (union sctp_notification*) buffer;
switch (snp->sn_header.sn_type) { switch (snp->sn_header.sn_type) {
case SCTP_SHUTDOWN_EVENT: { case SCTP_SHUTDOWN_EVENT: {
Logger::sctp().debug("SCTP Shutdown Event received"); Logger::sctp().debug("SCTP Shutdown Event received");
...@@ -180,10 +192,11 @@ int sctp_server::sctp_read_from_socket(int sd, uint32_t ppid) { ...@@ -180,10 +192,11 @@ int sctp_server::sctp_read_from_socket(int sd, uint32_t ppid) {
case SCTP_ASSOC_CHANGE: { case SCTP_ASSOC_CHANGE: {
Logger::sctp().debug("SCTP Association Change event received"); Logger::sctp().debug("SCTP Association Change event received");
return handle_assoc_change(sd, ppid, &snp->sn_assoc_change); return handle_assoc_change(sd, ppid, &snp->sn_assoc_change);
break;
} }
default: { default: {
Logger::sctp().error( Logger::sctp().error(
"Un-handled notification type (%d)", snp->sn_header.sn_type); "Unhandled notification type (%d)", snp->sn_header.sn_type);
break; break;
} }
} }
...@@ -194,17 +207,20 @@ int sctp_server::sctp_read_from_socket(int sd, uint32_t ppid) { ...@@ -194,17 +207,20 @@ int sctp_server::sctp_read_from_socket(int sd, uint32_t ppid) {
return SCTP_RC_ERROR; return SCTP_RC_ERROR;
} }
association->messages_recv++; association->messages_recv++;
if (ntohl(sinfo.sinfo_ppid) != association->ppid) { if (ntohl(sinfo.sinfo_ppid) != association->ppid) {
Logger::sctp().error( Logger::sctp().error(
"Received data from peer with unsolicited PPID (%d), expecting (%d)", "Received data from peer with unsolicited PPID (%d), expecting (%d)",
ntohl(sinfo.sinfo_ppid), association->ppid); ntohl(sinfo.sinfo_ppid), association->ppid);
return SCTP_RC_ERROR; return SCTP_RC_ERROR;
} }
Logger::sctp().info( Logger::sctp().info(
"[Assoc_id %d, Socket %d] Received a msg (length %d) from port %d, " "[Assoc_id %d, Socket %d] Received a msg (length %d) from port %d, "
"on stream %d, PPID %d", "on stream %d, PPID %d",
sinfo.sinfo_assoc_id, sd, n, ntohs(addr.sin6_port), sinfo.sinfo_stream, sinfo.sinfo_assoc_id, sd, n, ntohs(addr.sin6_port), sinfo.sinfo_stream,
ntohl(sinfo.sinfo_ppid)); ntohl(sinfo.sinfo_ppid));
bstring payload = blk2bstr(buffer, n); bstring payload = blk2bstr(buffer, n);
// Handle payload // Handle payload
app_->handle_receive( app_->handle_receive(
...@@ -230,6 +246,7 @@ int sctp_server::sctp_handle_reset(const sctp_assoc_id_t assoc_id) { ...@@ -230,6 +246,7 @@ int sctp_server::sctp_handle_reset(const sctp_assoc_id_t assoc_id) {
int sctp_server::handle_assoc_change( int sctp_server::handle_assoc_change(
int sd, uint32_t ppid, struct sctp_assoc_change* sctp_assoc_changed) { int sd, uint32_t ppid, struct sctp_assoc_change* sctp_assoc_changed) {
int rc = SCTP_RC_NORMAL_READ; int rc = SCTP_RC_NORMAL_READ;
switch (sctp_assoc_changed->sac_state) { switch (sctp_assoc_changed->sac_state) {
case SCTP_COMM_UP: { case SCTP_COMM_UP: {
if (add_new_association(sd, ppid, sctp_assoc_changed) == NULL) { if (add_new_association(sd, ppid, sctp_assoc_changed) == NULL) {
...@@ -259,9 +276,10 @@ int sctp_server::handle_assoc_change( ...@@ -259,9 +276,10 @@ int sctp_server::handle_assoc_change(
} }
default: default:
Logger::sctp().error( Logger::sctp().error(
"Unhandled sctp message (%d)", sctp_assoc_changed->sac_state); "Unhandled SCTP message (%d)", sctp_assoc_changed->sac_state);
break; break;
} }
return rc; return rc;
} }
...@@ -279,7 +297,9 @@ sctp_association_t* sctp_server::add_new_association( ...@@ -279,7 +297,9 @@ sctp_association_t* sctp_server::add_new_association(
Logger::sctp().debug( Logger::sctp().debug(
"Add new association with id (%d)", "Add new association with id (%d)",
(sctp_assoc_id_t) sctp_assoc_changed->sac_assoc_id); (sctp_assoc_id_t) sctp_assoc_changed->sac_assoc_id);
sctp_ctx.push_back(new_association); sctp_ctx.push_back(new_association);
sctp_get_localaddresses(sd, NULL, NULL); sctp_get_localaddresses(sd, NULL, NULL);
sctp_get_peeraddresses( sctp_get_peeraddresses(
sd, &new_association->peer_addresses, sd, &new_association->peer_addresses,
...@@ -287,6 +307,7 @@ sctp_association_t* sctp_server::add_new_association( ...@@ -287,6 +307,7 @@ sctp_association_t* sctp_server::add_new_association(
app_->handle_sctp_new_association( app_->handle_sctp_new_association(
new_association->assoc_id, new_association->instreams, new_association->assoc_id, new_association->instreams,
new_association->outstreams); new_association->outstreams);
return new_association; return new_association;
} }
...@@ -294,14 +315,17 @@ sctp_association_t* sctp_server::add_new_association( ...@@ -294,14 +315,17 @@ sctp_association_t* sctp_server::add_new_association(
sctp_association_t* sctp_server::sctp_is_assoc_in_list( sctp_association_t* sctp_server::sctp_is_assoc_in_list(
sctp_assoc_id_t assoc_id) { sctp_assoc_id_t assoc_id) {
sctp_association_t* assoc_desc = NULL; sctp_association_t* assoc_desc = NULL;
if (assoc_id < 0) { if (assoc_id < 0) {
return NULL; return NULL;
} }
for (int i = 0; i < sctp_ctx.size(); i++) { for (int i = 0; i < sctp_ctx.size(); i++) {
if (sctp_ctx[i]->assoc_id == assoc_id) { if (sctp_ctx[i]->assoc_id == assoc_id) {
return sctp_ctx[i]; return sctp_ctx[i];
} }
} }
return assoc_desc; return assoc_desc;
} }
...@@ -310,12 +334,15 @@ int sctp_server::sctp_get_peeraddresses( ...@@ -310,12 +334,15 @@ int sctp_server::sctp_get_peeraddresses(
int sock, struct sockaddr** remote_addr, int* nb_remote_addresses) { int sock, struct sockaddr** remote_addr, int* nb_remote_addresses) {
int nb; int nb;
struct sockaddr* temp_addr_p = NULL; struct sockaddr* temp_addr_p = NULL;
if ((nb = sctp_getpaddrs(sock, -1, &temp_addr_p)) <= 0) { if ((nb = sctp_getpaddrs(sock, -1, &temp_addr_p)) <= 0) {
Logger::sctp().error("Failed to retrieve peer addresses"); Logger::sctp().error("Failed to retrieve peer addresses");
return RETURNerror; return RETURNerror;
} }
Logger::sctp().info("----------------------"); Logger::sctp().info("----------------------");
Logger::sctp().info("Peer addresses:"); Logger::sctp().info("Peer addresses:");
for (int j = 0; j < nb; j++) { for (int j = 0; j < nb; j++) {
if (temp_addr_p[j].sa_family == AF_INET) { if (temp_addr_p[j].sa_family == AF_INET) {
char address[16] = {0}; char address[16] = {0};
...@@ -336,13 +363,16 @@ int sctp_server::sctp_get_peeraddresses( ...@@ -336,13 +363,16 @@ int sctp_server::sctp_get_peeraddresses(
} }
} }
} }
Logger::sctp().info("----------------------"); Logger::sctp().info("----------------------");
if (remote_addr != NULL && nb_remote_addresses != NULL) { if (remote_addr != NULL && nb_remote_addresses != NULL) {
*nb_remote_addresses = nb; *nb_remote_addresses = nb;
*remote_addr = temp_addr_p; *remote_addr = temp_addr_p;
} else { } else {
sctp_freepaddrs((struct sockaddr*) temp_addr_p); sctp_freepaddrs((struct sockaddr*) temp_addr_p);
} }
return RETURNok; return RETURNok;
} }
...@@ -355,6 +385,7 @@ int sctp_server::sctp_get_localaddresses( ...@@ -355,6 +385,7 @@ int sctp_server::sctp_get_localaddresses(
Logger::sctp().error("Failed to retrieve local addresses"); Logger::sctp().error("Failed to retrieve local addresses");
return RETURNerror; return RETURNerror;
} }
if (temp_addr_p) { if (temp_addr_p) {
Logger::sctp().info("----------------------"); Logger::sctp().info("----------------------");
Logger::sctp().info("Local addresses:"); Logger::sctp().info("Local addresses:");
...@@ -381,6 +412,7 @@ int sctp_server::sctp_get_localaddresses( ...@@ -381,6 +412,7 @@ int sctp_server::sctp_get_localaddresses(
" - Unknown address family %d", temp_addr_p[j].sa_family); " - Unknown address family %d", temp_addr_p[j].sa_family);
} }
} }
if (local_addr != NULL && nb_local_addresses != NULL) { if (local_addr != NULL && nb_local_addresses != NULL) {
*nb_local_addresses = nb; *nb_local_addresses = nb;
*local_addr = temp_addr_p; *local_addr = temp_addr_p;
...@@ -388,6 +420,7 @@ int sctp_server::sctp_get_localaddresses( ...@@ -388,6 +420,7 @@ int sctp_server::sctp_get_localaddresses(
sctp_freeladdrs((struct sockaddr*) temp_addr_p); sctp_freeladdrs((struct sockaddr*) temp_addr_p);
} }
} }
return RETURNok; return RETURNok;
} }
......
...@@ -22,19 +22,18 @@ ...@@ -22,19 +22,18 @@
#ifndef _SCTP_SERVER_H_ #ifndef _SCTP_SERVER_H_
#define _SCTP_SERVER_H_ #define _SCTP_SERVER_H_
#include <thread>
#include "common_defs.h" #include "common_defs.h"
#include "endpoint.hpp" #include "endpoint.hpp"
#include <thread>
#include <vector>
extern "C" { extern "C" {
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/sctp.h> #include <netinet/sctp.h>
#include "bstrlib.h" #include "bstrlib.h"
} }
#include <iostream>
#include <vector>
#define SCTP_RECV_BUFFER_SIZE 2048 #define SCTP_RECV_BUFFER_SIZE 2048
#define SCTP_RC_ERROR -1 #define SCTP_RC_ERROR -1
...@@ -47,20 +46,19 @@ typedef uint16_t sctp_stream_id_t; ...@@ -47,20 +46,19 @@ typedef uint16_t sctp_stream_id_t;
typedef uint32_t sctp_assoc_id_t; typedef uint32_t sctp_assoc_id_t;
typedef struct sctp_association_s { typedef struct sctp_association_s {
struct sctp_association_s* next_assoc; ///< Next association in the list struct sctp_association_s* next_assoc; // Next association in the list
struct sctp_association_s* struct sctp_association_s*
previous_assoc; ///< Previous association in the list previous_assoc; // Previous association in the list
int sd; ///< Socket descriptor int sd; // Socket descriptor
uint32_t ppid; ///< Payload protocol Identifier uint32_t ppid; // Payload protocol Identifier
uint16_t uint16_t instreams; // Number of input streams negociated for this connection
instreams; ///< Number of input streams negociated for this connection
uint16_t uint16_t
outstreams; ///< Number of output strams negotiated for this connection outstreams; // Number of output strams negotiated for this connection
sctp_assoc_id_t assoc_id; ///< SCTP association id for the connection sctp_assoc_id_t assoc_id; // SCTP association id for the connection
uint32_t messages_recv; ///< Number of messages received on this connection uint32_t messages_recv; // Number of messages received on this connection
uint32_t messages_sent; ///< Number of messages sent on this connection uint32_t messages_sent; // Number of messages sent on this connection
struct sockaddr* peer_addresses; ///< A list of peer addresses struct sockaddr* peer_addresses; // A list of peer addresses
int nb_peer_addresses; int nb_peer_addresses;
} sctp_association_t; } sctp_association_t;
...@@ -75,6 +73,7 @@ typedef struct sctp_descriptor_s { ...@@ -75,6 +73,7 @@ typedef struct sctp_descriptor_s {
class sctp_application { class sctp_application {
public: public:
virtual ~sctp_application(){};
virtual void handle_receive( virtual void handle_receive(
bstring payload, sctp_assoc_id_t assoc_id, sctp_stream_id_t stream, bstring payload, sctp_assoc_id_t assoc_id, sctp_stream_id_t stream,
sctp_stream_id_t instreams, sctp_stream_id_t outstreams) = 0; sctp_stream_id_t instreams, sctp_stream_id_t outstreams) = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment