Commit e28dbb7f authored by gauthier's avatar gauthier

itti

parent e6e80008
This diff is collapsed.
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
/*! \file itti.hpp
\brief
\author Lionel GAUTHIER
\date 2018
\email: lionel.gauthier@eurecom.fr
*/
#ifndef SRC_OAI_ITTI_ITTI_HPP_INCLUDED_
#define SRC_OAI_ITTI_ITTI_HPP_INCLUDED_
#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#include <iostream>
#include <stdint.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <set>
#include <memory>
#include <chrono>
#include <iomanip>
#include "itti_msg.hpp"
namespace oai::cn::core::itti {
typedef volatile enum task_state_s {
TASK_STATE_NOT_CONFIGURED, TASK_STATE_STARTING, TASK_STATE_READY, TASK_STATE_ENDED, TASK_STATE_MAX,
} task_state_t;
typedef uint32_t timer_id_t;
#define ITTI_INVALID_TIMER_ID (uintptr_t)0
class itti_timer {
public:
itti_timer(const timer_id_t id, const task_id_t task_id, const uint32_t interval_sec, const uint32_t interval_us, uint64_t arg1_user, uint64_t arg2_user) :
id(id), task_id(task_id), arg1_user(arg1_user), arg2_user(arg2_user) {
time_out = std::chrono::system_clock::now() + std::chrono::seconds(interval_sec) + std::chrono::microseconds(interval_us);
}
itti_timer(const timer_id_t id, const task_id_t task_id, const std::chrono::system_clock::time_point time_out, uint64_t arg1_user, uint64_t arg2_user) :
id(id), task_id(task_id), time_out(time_out), arg1_user(arg1_user), arg2_user(arg2_user) {
}
itti_timer(const itti_timer &t) : id(t.id), task_id(t.task_id) , time_out(t.time_out), arg1_user(t.arg1_user), arg2_user(t.arg2_user) {}
//itti_timer(itti_timer&& t) noexcept : id(std::move(t.id)), task_id(std::move(t.task_id)) , time_out(std::move(t.time_out)) {}
bool operator<(const itti_timer& t) const { return time_out < t.time_out; }
~itti_timer() {}
timer_id_t id;
task_id_t task_id;
std::chrono::system_clock::time_point time_out;
uint64_t arg1_user;
uint64_t arg2_user;
};
//------------------------------------------------------------------------------
struct timer_comparator {
bool operator()(const itti_timer &left, const itti_timer &right) const
{
return (left.time_out < right.time_out);
}
};
class itti_task_ctxt {
public:
explicit itti_task_ctxt(const task_id_t task_id) :
task_id(task_id), m_state(), task_state(TASK_STATE_STARTING), msg_queue() , m_queue(), c_queue() {}
~itti_task_ctxt() {}
const task_id_t task_id;
/*
* pthread associated with the thread
*/
//std::thread::id thread_id;
std::thread thread;
/*
* State of the thread
*/
std::mutex m_state;
volatile task_state_t task_state;
std::queue<std::shared_ptr<itti_msg>> msg_queue;
std::mutex m_queue;
std::condition_variable c_queue;
};
class itti_mw {
private:
itti_task_ctxt *itti_task_ctxts[TASK_MAX];
/*
* Current message number. Incremented every call to send_msg_to_task
*/
unsigned long msg_number;
timer_id_t timer_id;
std::mutex m_timer_id;
std::thread timer_thread;
std::atomic<int> created_tasks;
std::atomic<int> ready_tasks;
std::set<itti_timer, timer_comparator> timers;
oai::cn::core::itti::itti_timer current_timer;
std::mutex m_timers;
std::condition_variable c_timers;
std::mutex m_timeout;
std::condition_variable c_timeout;
bool terminate;
static void timer_manager_task(void);
public:
itti_mw();
itti_mw(itti_mw const&) = delete;
void operator=(itti_mw const&) = delete;
~itti_mw();
void start(void);
timer_id_t increment_timer_id ();
unsigned int increment_message_number ();
/** \brief Send a broadcast message to every task
\param message_p Pointer to the message to send
@returns < 0 on failure, 0 otherwise
**/
int send_broadcast_msg(std::shared_ptr<itti_msg> message);
/** \brief Send a message to a task (could be itself)
\param message message to send
@returns -1 on failure, 0 otherwise
**/
int send_msg(std::shared_ptr<itti_msg> message);
/** \brief Retrieves a message in the queue associated to task_id.
* If the queue is empty, the thread is blocked till a new message arrives.
\param task_id Task ID of the receiving task
\param received_msg Pointer to the allocated message
**/
std::shared_ptr<itti_msg> receive_msg (task_id_t task_id);
/** \brief Try to retrieves a message in the queue associated to task_id.
\param task_id Task ID of the receiving task
\param received_msg Pointer to the allocated message
**/
std::shared_ptr<itti_msg> poll_msg(task_id_t task_id);
/** \brief Start thread associated to the task
* \param task_id task to start
* \param start_routine entry point for the task
* \param args_p Optional argument to pass to the start routine
* @returns -1 on failure, 0 otherwise
**/
int create_task (const task_id_t task_id,
void (*start_routine) (void *),
void *args_p);
/** \brief Notify ITTI of a started thread
* \param task_id of started task
* \param start_routine entry point for the task
* \param args_p Optional argument to pass to the start routine
* @returns -1 on failure, 0 otherwise
**/
int notify_task_ready(const task_id_t task_id);
/** \brief Indicates to ITTI if newly created tasks should wait for all tasks to be ready
* \param wait_tasks non 0 to make new created tasks to wait, 0 to let created tasks to run
**/
//void wait_ready(int wait_tasks);
/** \brief Mark the task as in ready state
* \param task_id task to mark as ready
**/
//void mark_task_ready(task_id_t task_id);
/** \brief handle signals and wait for all threads to join when the process complete.
* This function should be called from the main thread after having created all ITTI tasks.
**/
void wait_tasks_end(void);
/** \brief Send a termination message to all tasks.
* \param src_task_id task that is broadcasting the message.
**/
int send_terminate_msg(task_id_t src_task_id);
/** \brief Request a new timer
* \param interval_sec timer interval in seconds
* \param interval_us timer interval in micro seconds
* \param task_id task id of the task requesting the timer
* @returns 0 on failure, timer id otherwise
**/
timer_id_t timer_setup(
uint32_t interval_sec,
uint32_t interval_us,
task_id_t task_id,
uint64_t arg1_user = 0,
uint64_t arg2_user = 0);
/** \brief Remove the timer from list
* \param timer_id unique timer id
* \param task_id task id of the task that requested the timer
* @returns -1 on failure, 0 otherwise
**/
int timer_remove (timer_id_t timer_id);
static void signal_handler( int signum );
};
}
#endif /* SRC_OAI_ITTI_ITTI_HPP_INCLUDED_ */
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
/*! \file itti_msg.cpp
\brief
\author Lionel GAUTHIER
\date 2018
\email: lionel.gauthier@eurecom.fr
*/
#include "itti_msg.hpp"
#include "itti.hpp"
using namespace oai::cn::core::itti;
extern itti_mw *itti_inst;
itti_msg::itti_msg() :
msg_type(ITTI_MSG_TYPE_NONE), origin(TASK_NONE), destination(TASK_NONE) {
msg_num = itti_inst->increment_message_number();
};
itti_msg::itti_msg(const itti_msg_type_t msg_type, task_id_t origin, task_id_t destination) :
msg_type(msg_type), origin(origin), destination(destination) {
msg_num = itti_inst->increment_message_number();
};
itti_msg::itti_msg(const itti_msg& i) :
msg_type(i.msg_type),msg_num(i.msg_num), origin(i.origin), destination(i.destination) {
};
const char* itti_msg::get_msg_name()
{
return "UNINITIALIZED";
}
/*
* Licensed to the OpenAirInterface (OAI) Software Alliance under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The OpenAirInterface Software Alliance licenses this file to You under
* the OAI Public License, Version 1.1 (the "License"); you may not use this file
* except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.openairinterface.org/?page_id=698
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*-------------------------------------------------------------------------------
* For more information about the OpenAirInterface (OAI) Software Alliance:
* contact@openairinterface.org
*/
/*! \file itti_msg.hpp
\brief
\author Lionel GAUTHIER
\date 2018
\email: lionel.gauthier@eurecom.fr
*/
#ifndef SRC_ITTI_ITTI_MSG_H_INCLUDED_
#define SRC_ITTI_ITTI_MSG_H_INCLUDED_
#include <stdint.h>
#include <typeinfo>
#include <iostream>
namespace oai::cn::core::itti {
typedef enum {
TASK_FIRST = 0,
TASK_ITTI_TIMER = TASK_FIRST,
TASK_ASYNC_SHELL_CMD,
TASK_ENB_S1U,
TASK_GTPV1_U,
TASK_GTPV2_C,
TASK_MME_S11,
TASK_PGWC_APP,
TASK_PGWU_APP,
TASK_SPGWU_APP,
TASK_PGWC_S5S8,
TASK_PGWC_SX,
TASK_PGWU_SX,
TASK_PGW_UDP,
TASK_SGWC_APP,
TASK_SGWC_S11,
TASK_SGWC_S5S8,
TASK_SGWC_SXA,
TASK_SGWU_SXA,
TASK_SPGWU_SX,
TASK_SPGWU_S1U,
TASK_SGW_UDP,
TASK_MAX,
TASK_NONE,
TASK_ALL = 255
} task_id_t;
typedef enum message_priorities_e {
MESSAGE_PRIORITY_MAX = 100,
MESSAGE_PRIORITY_MAX_LEAST = 85,
MESSAGE_PRIORITY_MED_PLUS = 70,
MESSAGE_PRIORITY_MED = 55,
MESSAGE_PRIORITY_MED_LEAST = 40,
MESSAGE_PRIORITY_MIN_PLUS = 25,
MESSAGE_PRIORITY_MIN = 10,
} message_priorities_t;
typedef enum {
ITTI_MSG_TYPE_NONE = -1,
ITTI_MSG_TYPE_FIRST = 0,
ASYNC_SHELL_CMD = ITTI_MSG_TYPE_FIRST,
GTPV1U_CREATE_TUNNEL_REQ,
GTPV1U_CREATE_TUNNEL_RESP,
GTPV1U_UPDATE_TUNNEL_REQ,
GTPV1U_UPDATE_TUNNEL_RESP,
GTPV1U_DELETE_TUNNEL_REQ,
GTPV1U_DELETE_TUNNEL_RESP,
GTPV1U_TUNNEL_DATA_IND,
GTPV1U_TUNNEL_DATA_REQ,
GTPV1U_DOWNLINK_DATA_NOTIFICATION,
S11_CREATE_SESSION_REQUEST,
S11_CREATE_SESSION_RESPONSE,
S11_CREATE_BEARER_REQUEST,
S11_CREATE_BEARER_RESPONSE,
S11_MODIFY_BEARER_REQUEST,
S11_MODIFY_BEARER_RESPONSE,
S11_DELETE_BEARER_COMMAND,
S11_DELETE_BEARER_FAILURE_INDICATION,
S11_DELETE_SESSION_REQUEST,
S11_DELETE_SESSION_RESPONSE,
S11_RELEASE_ACCESS_BEARERS_REQUEST,
S11_RELEASE_ACCESS_BEARERS_RESPONSE,
S11_DOWNLINK_DATA_NOTIFICATION,
S11_DOWNLINK_DATA_NOTIFICATION_ACKNOWLEDGE,
S11_DOWNLINK_DATA_NOTIFICATION_FAILURE_INDICATION,
S1U_ECHO_REQUEST,
S1U_ECHO_RESPONSE,
S1U_ERROR_INDICATION,
S1U_SUPPORTED_EXTENSION_HEADERS_NOTIFICATION,
S1U_END_MARKER,
S1U_G_PDU, // UNUSED
S5S8_CREATE_SESSION_REQUEST,
S5S8_CREATE_SESSION_RESPONSE,
S5S8_CREATE_BEARER_REQUEST,
S5S8_CREATE_BEARER_RESPONSE,
S5S8_MODIFY_BEARER_REQUEST,
S5S8_MODIFY_BEARER_RESPONSE,
S5S8_DELETE_BEARER_COMMAND,
S5S8_DELETE_BEARER_FAILURE_INDICATION,
S5S8_DELETE_SESSION_REQUEST,
S5S8_DELETE_SESSION_RESPONSE,
S5S8_RELEASE_ACCESS_BEARERS_REQUEST,
S5S8_RELEASE_ACCESS_BEARERS_RESPONSE,
SXAB_HEARTBEAT_REQUEST,
SXAB_HEARTBEAT_RESPONSE,
SXAB_PFCP_PFD_MANAGEMENT_REQUEST,
SXAB_PFCP_PFD_MANAGEMENT_RESPONSE,
SXAB_ASSOCIATION_SETUP_REQUEST,
SXAB_ASSOCIATION_SETUP_RESPONSE,
SXAB_ASSOCIATION_UPDATE_REQUEST,
SXAB_ASSOCIATION_UPDATE_RESPONSE,
SXAB_ASSOCIATION_RELEASE_REQUEST,
SXAB_ASSOCIATION_RELEASE_RESPONSE,
SXAB_VERSION_NOT_SUPPORTED_RESPONSE,
SXAB_NODE_REPORT_REQUEST,
SXAB_NODE_REPORT_RESPONSE,
SXAB_SESSION_SET_DELETION_REQUEST,
SXAB_SESSION_SET_DELETION_RESPONSE,
SXAB_SESSION_ESTABLISHMENT_REQUEST,
SXAB_SESSION_ESTABLISHMENT_RESPONSE,
SXAB_SESSION_MODIFICATION_REQUEST,
SXAB_SESSION_MODIFICATION_RESPONSE,
SXAB_SESSION_DELETION_REQUEST,
SXAB_SESSION_DELETION_RESPONSE,
SXAB_SESSION_REPORT_REQUEST,
SXAB_SESSION_REPORT_RESPONSE,
UDP_INIT,
UDP_DATA_REQ,
UDP_DATA_IND,
TIME_OUT,
HEALTH_PING,
TERMINATE,
ITTI_MSG_TYPE_MAX
} itti_msg_type_t;
typedef unsigned long message_number_t;
class itti_msg {
public:
itti_msg();
itti_msg(const itti_msg_type_t msg_type, const task_id_t origin, const task_id_t destination);
itti_msg(const itti_msg& i);
virtual ~itti_msg() = default;
static const char* get_msg_name();
message_number_t msg_num;
task_id_t origin;
task_id_t destination;
itti_msg_type_t msg_type;
};
class itti_msg_timeout : public itti_msg {
public:
itti_msg_timeout(const task_id_t origin, const task_id_t destination, uint32_t timer_id, uint64_t arg1_user, uint64_t arg2_user):
itti_msg(TIME_OUT, origin, destination), timer_id(timer_id), arg1_user(arg1_user), arg2_user(arg2_user) {}
itti_msg_timeout(const itti_msg_timeout& i) : itti_msg(i), timer_id(i.timer_id), arg1_user(i.arg1_user), arg2_user(i.arg2_user) {}
static const char* get_msg_name() {return "TIME_OUT";};
uint32_t timer_id;
uint64_t arg1_user;
uint64_t arg2_user;
};
class itti_msg_ping : public itti_msg {
public:
itti_msg_ping(const task_id_t origin, const task_id_t destination, uint32_t seq): itti_msg(HEALTH_PING, origin, destination), seq(seq) {}
itti_msg_ping(const itti_msg_ping& i) : itti_msg(i), seq(i.seq) {}
static const char* get_msg_name() {return "HEALTH_PING";};
uint32_t seq;
};
class itti_msg_terminate : public itti_msg {
public:
itti_msg_terminate(const task_id_t origin, const task_id_t destination):
itti_msg(TERMINATE, origin, destination) {}
itti_msg_terminate(const itti_msg_terminate& i) : itti_msg(i) {}
static const char* get_msg_name() {return "TERMINATE";};
};
}
#endif /* SRC_ITTI_ITTI_MSG_H_INCLUDED_ */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment