Commit f060385b authored by Robert Schmidt's avatar Robert Schmidt

Improve UDP support of ASYNC_IF

* peer_addr is now const
* new_link_udp_server() can bind to address or INADDR_ANY
* socket_udp_receive() happens without a loop
* socket_udp_send() happens without a loop
* sending in two packets (first size, then data) is only performed for TCP,
  SCTP, not for UDP anymore (it is unreliable, so we could miss something and
  will receive complete garbage)
parent 5f758074
...@@ -76,7 +76,7 @@ static void *link_manager_receiver_thread(void *_manager) ...@@ -76,7 +76,7 @@ static void *link_manager_receiver_thread(void *_manager)
LOG_D(MAC, "starting link manager receiver thread\n"); LOG_D(MAC, "starting link manager receiver thread\n");
while (manager->run) { while (manager->run) {
if (link_receive_packet(manager->socket_link, &data, &size, manager->type, manager->peer_addr, manager->port)) if (link_receive_packet(manager->socket_link, &data, &size, manager->type))
goto error; goto error;
/* todo: priority */ /* todo: priority */
if (message_put(manager->receive_queue, data, size, 0)) if (message_put(manager->receive_queue, data, size, 0))
......
...@@ -46,7 +46,7 @@ typedef struct { ...@@ -46,7 +46,7 @@ typedef struct {
message_queue_t *receive_queue; message_queue_t *receive_queue;
socket_link_t *socket_link; socket_link_t *socket_link;
uint16_t type; uint16_t type;
char *peer_addr; const char *peer_addr;
int port; int port;
pthread_t sender; pthread_t sender;
pthread_t receiver; pthread_t receiver;
......
...@@ -163,8 +163,8 @@ error: ...@@ -163,8 +163,8 @@ error:
return NULL; return NULL;
} }
socket_link_t *new_link_udp_server(int port){ socket_link_t *new_link_udp_server(const char *bind_addr, int bind_port)
{
socket_link_t *ret = NULL; socket_link_t *ret = NULL;
struct sockaddr_in si_me; struct sockaddr_in si_me;
...@@ -181,19 +181,25 @@ socket_link_t *new_link_udp_server(int port){ ...@@ -181,19 +181,25 @@ socket_link_t *new_link_udp_server(int port){
//create a UDP socket //create a UDP socket
if ((socket_server=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { if ((socket_server=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
goto error; goto error;
} }
// zero out the structure // zero out the structure
memset((char *) &si_me, 0, sizeof(si_me)); memset((char *) &si_me, 0, sizeof(si_me));
si_me.sin_family = AF_INET; si_me.sin_family = AF_INET;
si_me.sin_port = htons(port); si_me.sin_port = htons(bind_port);
si_me.sin_addr.s_addr = INADDR_ANY; if (bind_addr) {
if (!inet_aton(bind_addr, &si_me.sin_addr))
goto error;
} else {
si_me.sin_addr.s_addr = INADDR_ANY;
}
//bind socket to port //bind socket to port
if( bind(socket_server , (struct sockaddr*)&si_me, sizeof(si_me) ) == -1) { if( bind(socket_server , (struct sockaddr*)&si_me, sizeof(si_me) ) == -1) {
goto error; fprintf(stderr, "could not bind to address %s: %s\n", bind_addr, strerror(errno));
goto error;
} }
ret->socket_fd = socket_server; ret->socket_fd = socket_server;
ret->peer_port = 0; ret->peer_port = 0;
...@@ -356,13 +362,11 @@ error: ...@@ -356,13 +362,11 @@ error:
return NULL; return NULL;
} }
static int socket_udp_send(int socket_fd, void *buf, int size, char *peer_addr, int port) static int socket_udp_send(int socket_fd, void *buf, int size, const char *peer_addr, int port)
{ {
struct sockaddr_in si_other; struct sockaddr_in si_other;
int slen = sizeof(si_other); int slen = sizeof(si_other);
char *s = buf; int l;
int l;
int my_socket; int my_socket;
LOG_D(PROTO_AGENT,"UDP send\n"); LOG_D(PROTO_AGENT,"UDP send\n");
...@@ -378,15 +382,10 @@ static int socket_udp_send(int socket_fd, void *buf, int size, char *peer_addr, ...@@ -378,15 +382,10 @@ static int socket_udp_send(int socket_fd, void *buf, int size, char *peer_addr,
exit(1); exit(1);
} }
while (size) { l = sendto(my_socket, buf, size, 0, (struct sockaddr *) &si_other, slen);
l = sendto(my_socket, s, size, 0, (struct sockaddr *) &si_other, slen); if (l == -1) goto error;
if (l == -1) goto error;
if (l == 0) { printf("\n\n\nERROR PROTO_AGENT: %s:%d: this cannot happen, normally...\n", __FILE__, __LINE__); abort(); }
size -= l;
s += l;
}
return 0; return l;
error: error:
LOG_E(MAC,"socket_udp_send: ERROR: %s\n", strerror(errno)); LOG_E(MAC,"socket_udp_send: ERROR: %s\n", strerror(errno));
return -1; return -1;
...@@ -398,20 +397,14 @@ static int socket_udp_receive(int socket_fd, void *buf, int size) ...@@ -398,20 +397,14 @@ static int socket_udp_receive(int socket_fd, void *buf, int size)
struct sockaddr_in client; struct sockaddr_in client;
socklen_t slen; socklen_t slen;
char *s = buf;
int l; int l;
while (size) { l = recvfrom(socket_fd, buf, size, 0, (struct sockaddr *) &client, &slen);
l = recvfrom(socket_fd, s, size, 0, (struct sockaddr *) &client, &slen); //getsockname(socket_fd, (struct sockaddr *)&client, &slen);
getsockname(socket_fd, (struct sockaddr *)&client, &slen); if (l == -1) goto error;
LOG_D(PROTO_AGENT, "Got message from src port: %u\n", ntohs(client.sin_port)); if (l == 0) goto socket_closed;
if (l == -1) goto error;
if (l == 0) goto socket_closed;
size -= l;
s += l;
}
return ntohs(client.sin_port); return l;
error: error:
LOG_E(MAC, "socket_udp_receive: ERROR: %s\n", strerror(errno)); LOG_E(MAC, "socket_udp_receive: ERROR: %s\n", strerror(errno));
...@@ -476,18 +469,18 @@ socket_closed: ...@@ -476,18 +469,18 @@ socket_closed:
/* /*
* return -1 on error and 0 if the sending was fine * return -1 on error and 0 if the sending was fine
*/ */
int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_type, char *peer_addr, int port) int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_type, const char *peer_addr, int peer_port)
{ {
char sizebuf[4]; char sizebuf[4];
int32_t s = size; int32_t s = size;
/* send the size first, maximum is 2^31 bytes */
sizebuf[0] = (s >> 24) & 255;
sizebuf[1] = (s >> 16) & 255;
sizebuf[2] = (s >> 8) & 255;
sizebuf[3] = s & 255;
if ((proto_type == 0) || (proto_type == 2)) if ((proto_type == 0) || (proto_type == 2))
{ {
/* send the size first, maximum is 2^31 bytes */
sizebuf[0] = (s >> 24) & 255;
sizebuf[1] = (s >> 16) & 255;
sizebuf[2] = (s >> 8) & 255;
sizebuf[3] = s & 255;
if (socket_send(link->socket_fd, sizebuf, 4) == -1) if (socket_send(link->socket_fd, sizebuf, 4) == -1)
goto error; goto error;
...@@ -496,29 +489,18 @@ int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_t ...@@ -496,29 +489,18 @@ int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_t
if (socket_send(link->socket_fd, data, size) == -1) if (socket_send(link->socket_fd, data, size) == -1)
goto error; goto error;
link->bytes_sent += size;
link->packets_sent++;
} }
else if (proto_type == 1 ) else if (proto_type == 1 )
{ {
while (link->peer_port == 0) /* UDP is connectionless -> only send the data */
{ if (socket_udp_send(link->socket_fd, data, size, peer_addr, peer_port) == -1)
sleep(0.1);
}
LOG_D(PROTO_AGENT, "peer port is %d", link->peer_port);
if (socket_udp_send(link->socket_fd, sizebuf, 4, peer_addr, link->peer_port) == -1)
goto error;
LOG_I(PROTO_AGENT,"sent 4 bytes over the channel\n");
link->bytes_sent += 4;
if (socket_udp_send(link->socket_fd, data, size, peer_addr, link->peer_port) == -1)
goto error; goto error;
link->bytes_sent += size;
link->packets_sent++;
} }
link->bytes_sent += size;
link->packets_sent++;
return 0; return 0;
error: error:
...@@ -529,50 +511,47 @@ error: ...@@ -529,50 +511,47 @@ error:
* return -1 on error and 0 if the sending was fine * return -1 on error and 0 if the sending was fine
*/ */
int link_receive_packet(socket_link_t *link, void **ret_data, int *ret_size, uint16_t proto_type, char *peer_addr, int port) int link_receive_packet(socket_link_t *link, void **ret_data, int *ret_size, uint16_t proto_type)
{ {
unsigned char sizebuf[4]; unsigned char sizebuf[4];
int32_t size; int32_t size = 0;
void *data = NULL; void *data = NULL;
int peer_port = 0;
/* received the size first, maximum is 2^31 bytes */ /* received the size first, maximum is 2^31 bytes */
if ((proto_type == 0) || (proto_type == 2)) if ((proto_type == 0) || (proto_type == 2))
{ {
if (socket_receive(link->socket_fd, sizebuf, 4) == -1) if (socket_receive(link->socket_fd, sizebuf, 4) == -1)
goto error; goto error;
}
else if (proto_type == 1)
{
/* received the size first, maximum is 2^31 bytes */
peer_port = socket_udp_receive(link->socket_fd, sizebuf, 4);
if ( peer_port == -1)
goto error;
if (peer_port == 0) link->peer_port = peer_port;
}
size = (sizebuf[0] << 24) |
(sizebuf[1] << 16) |
(sizebuf[2] << 8) |
sizebuf[3];
link->bytes_received += 4; size = (sizebuf[0] << 24) |
(sizebuf[1] << 16) |
(sizebuf[2] << 8) |
sizebuf[3];
link->bytes_received += 4;
data = malloc(size);
if (data == NULL) {
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
data = malloc(size);
if (data == NULL) {
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
if ((proto_type == 0) || (proto_type == 2))
{
if (socket_receive(link->socket_fd, data, size) == -1) if (socket_receive(link->socket_fd, data, size) == -1)
goto error; goto error;
} }
else if (proto_type == 1) else if (proto_type == 1)
{ {
if (socket_udp_receive(link->socket_fd, data, size) == -1) /* we get a single packet (no size, UDP could lose it). Therefore, prepare
* for the maximum UDP packet size */
size = 65535;
data = malloc(size);
if (data == NULL) {
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
size = socket_udp_receive(link->socket_fd, data, size);
if (size < 0)
goto error; goto error;
} }
......
...@@ -48,12 +48,13 @@ typedef struct { ...@@ -48,12 +48,13 @@ typedef struct {
socket_link_t *new_link_server(int port); socket_link_t *new_link_server(int port);
socket_link_t *new_link_client(const char *server, int port); socket_link_t *new_link_client(const char *server, int port);
socket_link_t *new_link_udp_server(int port); /* setting bind_addr to NULL binds server to INADDR_ANY */
socket_link_t *new_link_udp_server(const char *bind_addr, int bind_port);
socket_link_t *new_link_udp_client(const char *server, int port); socket_link_t *new_link_udp_client(const char *server, int port);
socket_link_t *new_link_sctp_server(int port); socket_link_t *new_link_sctp_server(int port);
socket_link_t *new_link_sctp_client(const char *server, int port); socket_link_t *new_link_sctp_client(const char *server, int port);
int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_type, char *peer_addr, int port); int link_send_packet(socket_link_t *link, void *data, int size, uint16_t proto_type, const char *peer_addr, int port);
int link_receive_packet(socket_link_t *link, void **data, int *size, uint16_t proto_type, char *peer_addr, int port); int link_receive_packet(socket_link_t *link, void **data, int *size, uint16_t proto_type);
int close_link(socket_link_t *link); int close_link(socket_link_t *link);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment