Commit e508be85 authored by Navid Nikaein's avatar Navid Nikaein

add async interface and flexsplit messages

parent 560170a0
#!/bin/bash
function main()
{
mkdir -p $1
#echo generate protobuf messages inside $1 $2
c_out=$1
shift
proto_path=$1
shift
protoc-c --c_out=$c_out --proto_path=$proto_path $*
#protoc --cpp_out=$c_out --proto_path=$proto_path $*
}
main "$@"
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file link_manager.c
* \brief this is the implementation of a link manager
* \author Cedric Roux
* \date November 2015
* \version 1.0
* \email: cedric.roux@eurecom.fr
* @ingroup _mac
*/
#include "link_manager.h"
#include "log.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
/* that thread reads messages in the queue and sends them to the link */
static void *link_manager_sender_thread(void *_manager)
{
link_manager_t *manager = _manager;
void *data;
int size;
int priority;
LOG_D(MAC, "starting link manager sender thread\n");
while (manager->run) {
while (message_get(manager->send_queue, &data, &size, &priority) == 0) {
link_send_packet(manager->socket_link, data, size);
free(data);
}
// if (message_get(manager->send_queue, &data, &size, &priority))
// goto error;
//if (link_send_packet(manager->socket_link, data, size))
// goto error;
//free(data);
}
LOG_D(MAC, "link manager sender thread quits\n");
return NULL;
error:
LOG_E(MAC, "%s: error\n", __FUNCTION__);
return NULL;
}
/* that thread receives messages from the link and puts them in the queue */
static void *link_manager_receiver_thread(void *_manager)
{
link_manager_t *manager = _manager;
void *data;
int size;
LOG_D(MAC, "starting link manager receiver thread\n");
while (manager->run) {
if (link_receive_packet(manager->socket_link, &data, &size))
goto error;
/* todo: priority */
if (message_put(manager->receive_queue, data, size, 0))
goto error;
}
LOG_D(MAC, "link manager receiver thread quits\n");
return NULL;
error:
LOG_E(MAC, "%s: error\n", __FUNCTION__);
return NULL;
}
link_manager_t *create_link_manager(
message_queue_t *send_queue,
message_queue_t *receive_queue,
socket_link_t *link)
{
link_manager_t *ret = NULL;
pthread_attr_t attr;
pthread_t t;
LOG_D(MAC, "create new link manager\n");
ret = calloc(1, sizeof(link_manager_t));
if (ret == NULL)
goto error;
ret->send_queue = send_queue;
ret->receive_queue = receive_queue;
ret->socket_link = link;
ret->run = 1;
if (pthread_attr_init(&attr))
goto error;
// Make the async interface threads real-time
//#ifndef LOWLATENCY
struct sched_param sched_param_recv_thread;
struct sched_param sched_param_send_thread;
sched_param_recv_thread.sched_priority = sched_get_priority_max(SCHED_RR) - 1;
pthread_attr_setschedparam(&attr, &sched_param_recv_thread);
pthread_attr_setschedpolicy(&attr, SCHED_RR);
//#endif
if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))
goto error;
if (pthread_create(&t, &attr, link_manager_sender_thread, ret))
goto error;
ret->sender = t;
if (pthread_create(&t, &attr, link_manager_receiver_thread, ret))
/* we should destroy the other thread here */
goto error;
ret->receiver = t;
if (pthread_attr_destroy(&attr))
/* to be clean we should destroy the threads at this point,
* even if in practice we never reach it */
goto error;
return ret;
error:
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
free(ret);
return NULL;
}
void destroy_link_manager(link_manager_t *manager)
{
LOG_D(MAC, "destroying link manager\n");
manager->run = 0;
/* todo: force threads to stop (using a dummy message?) */
}
#ifdef SERVER_TEST
#include <string.h>
int main(void)
{
socket_link_t *link;
message_queue_t *send_queue;
message_queue_t *receive_queue;
link_manager_t *manager;
void *data;
int size;
int priority;
printf("starting server\n");
link = new_link_server(2210);
if (link == NULL) goto error;
send_queue = new_message_queue();
if (send_queue == NULL) goto error;
receive_queue = new_message_queue();
if (receive_queue == NULL) goto error;
manager = create_link_manager(send_queue, receive_queue, link);
if (manager == NULL) goto error;
data = strdup("hello"); if (data == NULL) goto error;
if (message_put(send_queue, data, 6, 100)) goto error;
if (message_get(receive_queue, &data, &size, &priority)) goto error;
printf("received message:\n");
printf(" data: %s\n", (char *)data);
printf(" size: %d\n", size);
printf(" priority: %d\n", priority);
printf("server ends\n");
return 0;
error:
printf("there was an error\n");
return 1;
}
#endif
#ifdef CLIENT_TEST
#include <string.h>
#include <unistd.h>
int main(void)
{
socket_link_t *link;
message_queue_t *send_queue;
message_queue_t *receive_queue;
link_manager_t *manager;
void *data;
int size;
int priority;
printf("starting client\n");
link = new_link_client("127.0.0.1", 2210);
if (link == NULL) goto error;
send_queue = new_message_queue();
if (send_queue == NULL) goto error;
receive_queue = new_message_queue();
if (receive_queue == NULL) goto error;
manager = create_link_manager(send_queue, receive_queue, link);
if (manager == NULL) goto error;
if (message_get(receive_queue, &data, &size, &priority)) goto error;
printf("received message:\n");
printf(" data: %s\n", (char *)data);
printf(" size: %d\n", size);
printf(" priority: %d\n", priority);
data = strdup("world"); if (data == NULL) goto error;
if (message_put(send_queue, data, 6, 200)) goto error;
/* let's wait for the message to be sent (unreliable sleep, but does it for the test) */
sleep(1);
printf("client ends\n");
return 0;
error:
printf("there was an error\n");
return 1;
}
#endif
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file link_manager.h
* \brief this is the implementation of a link manager
* \author Cedric Roux
* \date November 2015
* \version 1.0
* \email: cedric.roux@eurecom.fr
* @ingroup _mac
*/
#ifndef LINK_MANAGER_H
#define LINK_MANAGER_H
//#include "message_queue.h"
#include "ringbuffer_queue.h"
#include "socket_link.h"
#include <pthread.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
message_queue_t *send_queue;
message_queue_t *receive_queue;
socket_link_t *socket_link;
pthread_t sender;
pthread_t receiver;
volatile int run;
} link_manager_t;
link_manager_t *create_link_manager(
message_queue_t *send_queue,
message_queue_t *receive_queue,
socket_link_t *link);
void destroy_link_manager(link_manager_t *);
#ifdef __cplusplus
}
#endif
#endif /* LINK_MANAGER_H */
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file message_queue.c
* \brief this is the implementation of a message queue
* \author Cedric Roux
* \date November 2015
* \version 1.0
* \email: cedric.roux@eurecom.fr
* @ingroup _mac
*/
#include "message_queue.h"
#include "log.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
message_queue_t *new_message_queue(void)
{
message_queue_t *ret = NULL;
ret = calloc(1, sizeof(message_queue_t));
if (ret == NULL)
goto error;
ret->mutex = calloc(1, sizeof(pthread_mutex_t));
if (ret->mutex == NULL)
goto error;
if (pthread_mutex_init(ret->mutex, NULL))
goto error;
ret->cond = calloc(1, sizeof(pthread_cond_t));
if (ret->cond == NULL)
goto error;
if (pthread_cond_init(ret->cond, NULL))
goto error;
return ret;
error:
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
if (ret != NULL) {
free(ret->mutex);
free(ret->cond);
memset(ret, 0, sizeof(message_queue_t));
free(ret);
}
return NULL;
}
int message_put(message_queue_t *queue, void *data, int size, int priority)
{
message_t *m = NULL;
m = calloc(1, sizeof(message_t));
if (m == NULL)
goto error;
m->data = data;
m->size = size;
m->priority = priority;
m->next = NULL;
if (pthread_mutex_lock(queue->mutex))
goto error;
if (queue->count == 0)
queue->head = m;
else
queue->tail->next = m;
queue->tail = m;
queue->count++;
if (pthread_cond_signal(queue->cond)) {
LOG_E(MAC, "%s:%d:%s: fatal error\n", __FILE__, __LINE__, __FUNCTION__);
pthread_mutex_unlock(queue->mutex);
exit(1);
}
if (pthread_mutex_unlock(queue->mutex)) {
LOG_E(MAC, "%s:%d:%s: fatal error\n", __FILE__, __LINE__, __FUNCTION__);
exit(1);
}
return 0;
error:
free(m);
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
return -1;
}
int message_get(message_queue_t *queue, void **data, int *size, int *priority)
{
message_t *m;
if (pthread_mutex_lock(queue->mutex))
goto error;
while (queue->count == 0) {
if (pthread_cond_wait(queue->cond, queue->mutex)) {
pthread_mutex_unlock(queue->mutex);
goto error;
}
}
m = queue->head;
queue->head = queue->head->next;
if (queue->head == NULL)
queue->tail = NULL;
queue->count--;
if (pthread_mutex_unlock(queue->mutex))
goto error;
*data = m->data;
*size = m->size;
*priority = m->priority;
free(m);
return 0;
error:
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
return -1;
}
/* when calling this function, the queue must not be used anymore (we don't lock it) */
/* we suppose that the data pointer in messages was allocated by malloc/calloc/realloc */
void destroy_message_queue(message_queue_t *queue)
{
while (queue->head) {
message_t *m = queue->head;
queue->head = queue->head->next;
free(m->data);
memset(m, 0, sizeof(message_t));
free(m);
}
free(queue->mutex);
free(queue->cond);
memset(queue, 0, sizeof(message_queue_t));
free(queue);
}
#ifdef TEST
/* some very basic tests */
int main(void)
{
void *data;
int size;
int priority;
message_queue_t *q;
char *s;
q = new_message_queue();
if (q == NULL) goto error;
if (message_put(q, "hello", 6, 0)) goto error;
if (message_put(q, "world", 6, 1)) goto error;
if (message_get(q, &data, &size, &priority)) goto error;
printf("message:\n data: '%s'\n size: %d\n priority: %d\n",
(char *)data, size, priority);
if (message_get(q, &data, &size, &priority)) goto error;
printf("message:\n data: '%s'\n size: %d\n priority: %d\n",
(char *)data, size, priority);
/* let's put a message before destroying the queue */
s = malloc(10); if (s == NULL) goto error;
sprintf(s, "hello");
if (message_put(q, s, 6, 0)) goto error;
destroy_message_queue(q);
return 0;
error:
printf("error\n");
return 1;
}
#endif
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file message_queue.h
* \brief this is the implementation of a message queue
* \author Cedric Roux
* \date November 2015
* \version 1.0
* \email: cedric.roux@eurecom.fr
* @ingroup _mac
*/
#ifndef MESSAGE_QUEUE_H
#define MESSAGE_QUEUE_H
#include <pthread.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct message_t {
void *data;
int size;
int priority;
struct message_t *next;
} message_t;
typedef struct {
message_t *head;
message_t *tail;
volatile int count;
pthread_mutex_t *mutex;
pthread_cond_t *cond;
} message_queue_t;
message_queue_t *new_message_queue(void);
int message_put(message_queue_t *queue, void *data, int size, int priority);
int message_get(message_queue_t *queue, void **data, int *size, int *priority);
void destroy_message_queue(message_queue_t *queue);
#ifdef __cplusplus
}
#endif
#endif /* MESSAGE_QUEUE_H */
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file ringbuffer_queue.c
* \brief Lock-free ringbuffer used for async message passing of agent
* \author Xenofon Foukas
* \date March 2016
* \version 1.0
* \email: x.foukas@sms.ed.ac.uk
* @ingroup _mac
*/
#include "ringbuffer_queue.h"
#include "log.h"
message_queue_t * new_message_queue(int size) {
message_queue_t *ret = NULL;
ret = calloc(1, sizeof(message_queue_t));
if (ret == NULL)
goto error;
lfds700_misc_library_init_valid_on_current_logical_core();
lfds700_misc_prng_init(&(ret->ps));
ret->ringbuffer_array = malloc(sizeof(struct lfds700_ringbuffer_element) * size);
lfds700_ringbuffer_init_valid_on_current_logical_core(&(ret->ringbuffer_state),
ret->ringbuffer_array,
size,
&(ret->ps),
NULL);
return ret;
error:
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
if (ret != NULL) {
free(ret->ringbuffer_array);
memset(ret, 0, sizeof(message_queue_t));
free(ret);
}
return NULL;
}
int message_put(message_queue_t *queue, void *data, int size, int priority) {
struct lfds700_misc_prng_state ls;
enum lfds700_misc_flag overwrite_occurred_flag;
message_t *overwritten_msg;
message_t *m = NULL;
LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
lfds700_misc_prng_init(&ls);
m = calloc(1, sizeof(message_t));
if (m == NULL)
goto error;
m->data = data;
m->size = size;
m->priority = priority;
lfds700_ringbuffer_write(&(queue->ringbuffer_state),
NULL,
(void *) m,
&overwrite_occurred_flag,
NULL,
(void **) &overwritten_msg,
&ls);
if (overwrite_occurred_flag == LFDS700_MISC_FLAG_RAISED) {
free(overwritten_msg->data);
free(overwritten_msg);
}
return 0;
error:
free(m);
LOG_E(MAC, "%s: an error occured\n", __FUNCTION__);
return -1;
}
int message_get(message_queue_t *queue, void **data, int *size, int *priority) {
message_t *m;
struct lfds700_misc_prng_state ls;
LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
lfds700_misc_prng_init(&ls);
if (lfds700_ringbuffer_read(&(queue->ringbuffer_state), NULL, (void **) &m, &ls) == 0) {
return -1;
}
*data = m->data;
*size = m->size;
*priority = m->priority;
free(m);
return 0;
}
message_queue_t destroy_message_queue(message_queue_t *queue) {
struct lfds700_misc_prng_state ls;
message_t *m;
LFDS700_MISC_MAKE_VALID_ON_CURRENT_LOGICAL_CORE_INITS_COMPLETED_BEFORE_NOW_ON_ANY_OTHER_LOGICAL_CORE;
lfds700_misc_prng_init(&ls);
while (lfds700_ringbuffer_read(&(queue->ringbuffer_state), NULL, (void **) &m, &ls) != 0) {
free(m->data);
memset(m, 0, sizeof(message_t));
free(m);
}
free(queue->ringbuffer_array);
memset(queue, 0, sizeof(message_queue_t));
free(queue);
}
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file ringbuffer_queue.h
* \brief Lock-free ringbuffer used for async message passing of agent
* \author Xenofon Foukas
* \date March 2016
* \version 1.0
* \email: x.foukas@sms.ed.ac.uk
* @ingroup _mac
*/
#ifndef RINGBUFFER_QUEUE_H
#define RINGBUFFER_QUEUE_H
#include "liblfds700.h"
typedef struct message_s {
void *data;
int size;
int priority;
} message_t;
typedef struct {
struct lfds700_misc_prng_state ps;
struct lfds700_ringbuffer_element *ringbuffer_array;
struct lfds700_ringbuffer_state ringbuffer_state;
} message_queue_t;
message_queue_t * new_message_queue(int size);
int message_put(message_queue_t *queue, void *data, int size, int priority);
int message_get(message_queue_t *queue, void **data, int *size, int *priority);
message_queue_t destroy_message_queue(message_queue_t *queue);
#endif /* RINGBUFFER_QUEUE_H */
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file socket_link.c
* \brief this is the implementation of a TCP socket ASYNC IF
* \author Cedric Roux
* \date November 2015
* \version 1.0
* \email: cedric.roux@eurecom.fr
* @ingroup _mac
*/
#include "socket_link.h"
#include "log.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <stdint.h>
socket_link_t *new_link_server(int port)
{
socket_link_t *ret = NULL;
int reuse;
struct sockaddr_in addr;
socklen_t addrlen;
int socket_server = -1;
int no_delay;
ret = calloc(1, sizeof(socket_link_t));
if (ret == NULL) {
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
ret->socket_fd = -1;
LOG_D(MAC, "create a new link server socket at port %d\n", port);
socket_server = socket(AF_INET, SOCK_STREAM, 0);
if (socket_server == -1) {
LOG_E(MAC, "%s:%d: socket: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
reuse = 1;
if (setsockopt(socket_server, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
LOG_E(MAC, "%s:%d: setsockopt: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
no_delay = 1;
if (setsockopt(socket_server, IPPROTO_TCP, TCP_NODELAY, &no_delay, sizeof(no_delay)) == -1) {
LOG_E(MAC, "%s:%d: setsockopt: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(socket_server, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
LOG_E(MAC, "%s:%d: bind: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
if (listen(socket_server, 5)) {
LOG_E(MAC, "%s:%d: listen: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
addrlen = sizeof(addr);
ret->socket_fd = accept(socket_server, (struct sockaddr *)&addr, &addrlen);
if (ret->socket_fd == -1) {
LOG_E(MAC, "%s:%d: accept: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
close(socket_server);
LOG_D(MAC, "connection from %s:%d\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
return ret;
error:
close(socket_server);
if (ret != NULL) close(ret->socket_fd);
free(ret);
LOG_E(MAC, "ERROR in new_link_server (see above), returning NULL\n");
return NULL;
}
socket_link_t *new_link_client(char *server, int port)
{
socket_link_t *ret = NULL;
struct sockaddr_in addr;
int no_delay;
ret = calloc(1, sizeof(socket_link_t));
if (ret == NULL) {
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
ret->socket_fd = -1;
LOG_D(MAC, "create a new link client socket connecting to %s:%d\n", server, port);
ret->socket_fd = socket(AF_INET, SOCK_STREAM, 0);
if (ret->socket_fd == -1) {
LOG_E(MAC, "%s:%d: socket: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
no_delay = 1;
if (setsockopt(ret->socket_fd, SOL_TCP, TCP_NODELAY, &no_delay, sizeof(no_delay)) == -1) {
LOG_E(MAC, "%s:%d: setsockopt: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
if (inet_aton(server, &addr.sin_addr) == 0) {
LOG_E(MAC, "invalid IP address '%s', use a.b.c.d notation\n", server);
goto error;
}
if (connect(ret->socket_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
LOG_E(MAC, "%s:%d: connect: %s\n", __FILE__, __LINE__, strerror(errno));
goto error;
}
LOG_D(MAC, "connection to %s:%d established\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
return ret;
error:
if (ret != NULL) close(ret->socket_fd);
free(ret);
LOG_E(MAC, "ERROR in new_link_client (see above), returning NULL\n");
return NULL;
}
/*
* return -1 on error and 0 if the sending was fine
*/
static int socket_send(int socket_fd, void *buf, int size)
{
char *s = buf;
int l;
while (size) {
l = send(socket_fd, s, size, MSG_NOSIGNAL);
if (l == -1) goto error;
if (l == 0) { LOG_E(MAC, "%s:%d: this cannot happen, normally...\n", __FILE__, __LINE__); abort(); }
size -= l;
s += l;
}
return 0;
error:
LOG_E(MAC, "socket_send: ERROR: %s\n", strerror(errno));
return -1;
}
/*
* return -1 on error and 0 if the receiving was fine
*/
static int socket_receive(int socket_fd, void *buf, int size)
{
char *s = buf;
int l;
while (size) {
l = read(socket_fd, s, size);
if (l == -1) goto error;
if (l == 0) goto socket_closed;
size -= l;
s += l;
}
return 0;
error:
LOG_E(MAC, "socket_receive: ERROR: %s\n", strerror(errno));
return -1;
socket_closed:
LOG_E(MAC, "socket_receive: socket closed\n");
return -1;
}
/*
* return -1 on error and 0 if the sending was fine
*/
int link_send_packet(socket_link_t *link, void *data, int size)
{
char sizebuf[4];
int32_t s = size;
/* send the size first, maximum is 2^31 bytes */
sizebuf[0] = (s >> 24) & 255;
sizebuf[1] = (s >> 16) & 255;
sizebuf[2] = (s >> 8) & 255;
sizebuf[3] = s & 255;
if (socket_send(link->socket_fd, sizebuf, 4) == -1)
goto error;
link->bytes_sent += 4;
if (socket_send(link->socket_fd, data, size) == -1)
goto error;
link->bytes_sent += size;
link->packets_sent++;
return 0;
error:
return -1;
}
/*
* return -1 on error and 0 if the sending was fine
*/
int link_receive_packet(socket_link_t *link, void **ret_data, int *ret_size)
{
unsigned char sizebuf[4];
int32_t size;
void *data = NULL;
/* received the size first, maximum is 2^31 bytes */
if (socket_receive(link->socket_fd, sizebuf, 4) == -1)
goto error;
size = (sizebuf[0] << 24) |
(sizebuf[1] << 16) |
(sizebuf[2] << 8) |
sizebuf[3];
link->bytes_received += 4;
data = malloc(size);
if (data == NULL) {
LOG_E(MAC, "%s:%d: out of memory\n", __FILE__, __LINE__);
goto error;
}
if (socket_receive(link->socket_fd, data, size) == -1)
goto error;
link->bytes_received += size;
link->packets_received++;
*ret_data = data;
*ret_size = size;
return 0;
error:
free(data);
*ret_data = NULL;
*ret_size = 0;
return -1;
}
/*
* return -1 on error, 0 if all is fine
*/
int close_link(socket_link_t *link)
{
close(link->socket_fd);
memset(link, 0, sizeof(socket_link_t));
free(link);
return 0;
}
#ifdef SERVER_TEST
#include <inttypes.h>
int main(void)
{
void *data;
int size;
socket_link_t *l = new_link_server(2210);
if (l == NULL) { printf("no link created\n"); return 1; }
printf("link is up\n");
printf("server starts sleeping...\n");
/* this sleep is here to test for broken pipe. You can run "nc localhost 2210"
* and interrupt it quickly so that the server gets a 'broken' pipe on the
* following link_send_packet.
*/
sleep(1);
printf("... done\n");
if (link_send_packet(l, "hello\n", 6+1) ||
link_send_packet(l, "world\n", 6+1)) return 1;
if (link_receive_packet(l, &data, &size)) return 1; printf("%s", (char *)data); free(data);
if (link_receive_packet(l, &data, &size)) return 1; printf("%s", (char *)data); free(data);
printf("stats:\n");
printf(" sent packets %"PRIu64"\n", l->packets_sent);
printf(" sent bytes %"PRIu64"\n", l->bytes_sent);
printf(" received packets %"PRIu64"\n", l->packets_received);
printf(" received bytes %"PRIu64"\n", l->bytes_received);
if (close_link(l)) return 1;
printf("link is down\n");
return 0;
}
#endif
#ifdef CLIENT_TEST
#include <inttypes.h>
int main(void)
{
void *data;
int size;
socket_link_t *l = new_link_client("127.0.0.1", 2210);
if (l == NULL) { printf("no link created\n"); return 1; }
printf("link is up\n");
if (link_receive_packet(l, &data, &size)) return 1; printf("%s", (char *)data); free(data);
if (link_receive_packet(l, &data, &size)) return 1; printf("%s", (char *)data); free(data);
if (link_send_packet(l, "bye\n", 4+1) ||
link_send_packet(l, "server\n", 7+1)) return 1;
printf("stats:\n");
printf(" sent packets %"PRIu64"\n", l->packets_sent);
printf(" sent bytes %"PRIu64"\n", l->bytes_sent);
printf(" received packets %"PRIu64"\n", l->packets_received);
printf(" received bytes %"PRIu64"\n", l->bytes_received);
if (close_link(l)) return 1;
printf("link is down\n");
return 0;
}
#endif
/*******************************************************************************
OpenAirInterface
Copyright(c) 1999 - 2015 Eurecom
OpenAirInterface is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OpenAirInterface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OpenAirInterface.The full GNU General Public License is
included in this distribution in the file called "COPYING". If not,
see <http://www.gnu.org/licenses/>.
Contact Information
OpenAirInterface Admin: openair_admin@eurecom.fr
OpenAirInterface Tech : openair_tech@eurecom.fr
OpenAirInterface Dev : openair4g-devel@eurecom.fr
Address : Eurecom, Campus SophiaTech, 450 Route des Chappes, CS 50193 - 06904 Biot Sophia Antipolis cedex, FRANCE
*******************************************************************************/
/*! \file socket_link.h
* \brief this is the implementation of a TCP socket ASYNC IF
* \author Cedric Roux
* \date November 2015
* \version 1.0
* \email: cedric.roux@eurecom.fr
* @ingroup _mac
*/
#ifndef SOCKET_LINK_H
#define SOCKET_LINK_H
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int socket_fd;
uint64_t bytes_sent;
uint64_t packets_sent;
uint64_t bytes_received;
uint64_t packets_received;
} socket_link_t;
socket_link_t *new_link_server(int port);
socket_link_t *new_link_client(char *server, int port);
int link_send_packet(socket_link_t *link, void *data, int size);
int link_receive_packet(socket_link_t *link, void **data, int *size);
int close_link(socket_link_t *link);
#ifdef __cplusplus
}
#endif
#endif /* SOCKET_LINK_H */
package protocol;
import "header.proto";
message flexsplit_message {
optional flexsplit_direction msg_dir = 100;
oneof msg {
fsp_hello hello_msg = 1;
}
}
enum flexsplit_direction {
//option allow_alias = true;
NOT_SET = 0;
INITIATING_MESSAGE = 1;
SUCCESSFUL_OUTCOME=2;
UNSUCCESSFUL_OUTCOME=3;
}
enum flexsplit_err {
option allow_alias = true;
// message errors
NO_ERR = 0;
MSG_DEQUEUING = -1;
MSG_ENQUEUING = -2;
MSG_DECODING = -3;
MSG_ENCODING = -4;
MSG_BUILD = -5;
MSG_NOT_SUPPORTED = -6;
MSG_NOT_HANDLED = -7;
MSG_NOT_VALIDATED = -8;
MSG_OUT_DATED = -9;
// other erros
UNEXPECTED = -100;
}
//
// Maintenance and discovery messages
//
message fsp_hello {
optional fsp_header header = 1;
}
\ No newline at end of file
package protocol;
message fsp_header {
optional uint32 version = 1;
optional uint32 type = 2;
optional uint32 xid = 4;
}
enum fsp_type {
// Discovery and maintenance messages
FSPT_HELLO = 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment