Commit 70252aa7 authored by Niuhaiwen's avatar Niuhaiwen

stateless support

parent 94643d49
...@@ -16,7 +16,20 @@ ...@@ -16,7 +16,20 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include "TransData.hpp" #include "TransData.hpp"
#include <stdlib.h> #include <stdlib.h>
#include <map>
#include <algorithm>
#include "plugin_config.hpp"
#include <cmath>
using namespace config;
std::map<std::string,long> ip2len;
std::map<std::string,bool> all_rp;
long msg_len[4]={0};
long pro_cap[4] = {0};
bool isall[4] = {0};
extern int weight[15];
extern plugin_config plugin_cfg;
using json = nlohmann::json; using json = nlohmann::json;
//using namespace org::openapitools::server::model; //using namespace org::openapitools::server::model;
...@@ -47,10 +60,183 @@ void PluginApi::setupRoutes() { ...@@ -47,10 +60,183 @@ void PluginApi::setupRoutes() {
// Routes::Get(*router, base + "/:realmId/:storageId/records/:recordId", Routes::bind(&PluginApi::get_record_handler, this)); // Routes::Get(*router, base + "/:realmId/:storageId/records/:recordId", Routes::bind(&PluginApi::get_record_handler, this));
Routes::Put(*router, base + "/:assoid/:stream", Routes::bind(&PluginApi::N2_trans_handler, this)); Routes::Put(*router, base + "/:assoid/:stream", Routes::bind(&PluginApi::N2_trans_handler, this));
// Routes::Patch(*router, base + "/:realmId/:storageId/records/:recordId/meta", Routes::bind(&PluginApi::update_meta_handler, this)); // Routes::Patch(*router, base + "/:realmId/:storageId/records/:recordId/meta", Routes::bind(&PluginApi::update_meta_handler, this));
Routes::Post(*router, base+"/:amf-id/algorithm/omf", Routes::bind(&PluginApi::algorithm_omf_handler, this));
// Default handler, called when a route is not found // Default handler, called when a route is not found
//router->addCustomHandler(Routes::bind(&PluginApi::record_crud_api_default_handler, this)); //router->addCustomHandler(Routes::bind(&PluginApi::record_crud_api_default_handler, this));
} }
void PluginApi::algorithm_omf_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
auto amf_id = request.param(":amf-id").as<std::uint32_t>();
Logger::plugin_server().debug("Receiving algorithm-omf info from amf-%d",amf_id);
Logger::plugin_server().debug("%s",request.body().c_str());
TransData transdata;
try {
nlohmann::json::parse(request.body()).get_to(transdata);
Logger::plugin_server().debug("ip address : %s",transdata.ip_addr.c_str());
Logger::plugin_server().debug("current message queue length : %d",transdata.mq_len);
Logger::plugin_server().debug("current capbility : %f",transdata.cap);
if(!transdata.ip_addr.compare("10.103.239.116")){
msg_len[0] = transdata.mq_len;
pro_cap[0] = (long)transdata.cap;
isall[0] = true;
Logger::plugin_server().debug("capability(0) %d",pro_cap[0]);
}
if(!transdata.ip_addr.compare("10.103.239.114")){
msg_len[1] = transdata.mq_len;
pro_cap[1] = (long)transdata.cap;
isall[1] = true;
Logger::plugin_server().debug("capability(1) %d",pro_cap[1]);
}
if(!transdata.ip_addr.compare("10.103.239.112")){
msg_len[2] = transdata.mq_len;
pro_cap[2] = (long)transdata.cap;
isall[2] = true;
Logger::plugin_server().debug("capability(2) %d",pro_cap[2]);
}
if(!transdata.ip_addr.compare("10.103.238.75")){
msg_len[3] = transdata.mq_len;
pro_cap[3] = (long)transdata.cap;
isall[3] = true;
Logger::plugin_server().debug("capability(3) %d",pro_cap[3]);
}
bool update = true;
for(int i=0; i<4; i++){
if(!isall[i]){
Logger::plugin_server().debug("update = false");
update = false;
break;
}
}
long N = 50;
long K[4] = {0};
double sum_c = 0;
double sum = 0;
double tmp_w[4]={0};
if(update){
Logger::plugin_server().debug("new round...");
Logger::plugin_server().debug("capability(0) %d",pro_cap[0]);
Logger::plugin_server().debug("capability(1) %d",pro_cap[1]);
Logger::plugin_server().debug("capability(2) %d",pro_cap[2]);
Logger::plugin_server().debug("capability(3) %d",pro_cap[3]);
for(int i=0; i<4; i++){
isall[i] = false;
Logger::plugin_server().debug("capability(%d) %d",i,pro_cap[i]);
Logger::plugin_server().debug("message_length(%d) %d",i,msg_len[i]);
if((pro_cap[i]-0)<1e-8) {
Logger::plugin_server().debug("update = false capability = 0");
response.send(Pistache::Http::Code::Ok, "n2 receive \n");
return;
}else{
pro_cap[i] = 1000000/(double)pro_cap[i];
Logger::plugin_server().debug("update = true capability = %d",pro_cap[i]);
}
}
Logger::plugin_server().debug("Updating...");
for(int i=0; i<4; i++){
//isall[i] = false;
sum_c += pro_cap[i];
K[i] = msg_len[i] - pro_cap[i]*1;
//K[i] = msg_len[i] - (1/(double)pro_cap[i]*1000000)*1;
if(K[i]<0) K[i] = 0;
Logger::plugin_server().debug("K[%d] = %d",i,K[i]);
}
if(plugin_cfg.is_omf){
for(int j=0; j<4; j++){
sum += (double)(K[j]*pro_cap[0]-K[0]*pro_cap[j]);
//sum += (double)(K[j]*pro_cap[0]-K[0]*pro_cap[j])/1000000;
}
Logger::plugin_server().debug("sum = %f",sum);
tmp_w[0] = pro_cap[0]/sum_c + sum/N*sum_c;
//tmp_w[0] = pro_cap[0]/sum_c + sum/N*sum_c/1000000;
Logger::plugin_server().debug("weight[0] = %f",tmp_w[0]);
for(int j=1; j<4; j++){
tmp_w[j] =(double)(pro_cap[j]*K[0]-pro_cap[0]*K[j])/(pro_cap[0]*N)+((double)pro_cap[j]/pro_cap[0])*tmp_w[0];
}
double dwt = 0;
for(int j=0; j<4; j++){
Logger::plugin_server().debug("weight[%d] %f",j,tmp_w[j]);
dwt +=tmp_w[j];
//weight[j] = std::floor(weight[j]*N);
}
for(int j=0; j<4; j++){
Logger::plugin_server().debug("weight[%d] %f",j,tmp_w[j]/dwt);
weight[j] = std::floor(tmp_w[j]/dwt*N);
}
}
response.send(Pistache::Http::Code::Ok, "Updated \n");
}else{
Logger::plugin_server().debug("not receiving all information");
response.send(Pistache::Http::Code::Ok, "n2 receive \n");
return;
}
# if 0
ip2len[transdata.ip_addr] = transdata.mq_len;
all_rp[transdata.ip_addr] = true;
bool update = true;
if(ip2len.size() == 4){
Logger::plugin_server().debug("received 4 amf information...");
for(std::map<std::string,bool>::iterator it = all_rp.begin(); it!=all_rp.end(); it++){
if(!it->second) update = false;
}
}else{
response.send(Pistache::Http::Code::Ok, "n2 receive \n");
return;
}
if(update){
Logger::plugin_server().debug("Updating...");
int len_72 = 0, len_116 = 0, len_114 = 0, len_112 = 0;
for(std::map<std::string,long>::iterator it = ip2len.begin();it!=ip2len.end();it++){
//if(!it->first.compare("10.103.239.116")) len_116 = it->second;
//if(!it->first.compare("10.103.239.114")) len_114 = it->second;
//if(!it->first.compare("10.103.239.112")) len_112 = it->second;
//if(!it->first.compare("10.103.238.72")) len_72 = it->second;
if(!it->first.compare("10.103.239.116")) msg_len[0] = it->second;
if(!it->first.compare("10.103.239.114")) msg_len[1] = it->second;
if(!it->first.compare("10.103.239.112")) msg_len[2] = it->second;
if(!it->first.compare("10.103.238.72")) msg_len[3] = it->second;
}
for(std::map<std::string,bool>::iterator it = all_rp.begin(); it!=all_rp.end(); it++){
it->second = false;
}
if(len_72 ==0 && len_112 ==0 && len_114 ==0 && len_116 ==0){
Logger::plugin_server().debug("all instances are with no signaling, return");
return;
}
if(len_72 == 0) len_72 = 1;
if(len_116 == 0) len_116 = 1;
if(len_114 == 0) len_114 = 1;
if(len_112 == 0) len_112 = 1;
int x = (50)/(1+(float)len_114/len_116+(float)len_112/len_116+(float)len_72/len_116);
int x_[4];
x_[0] = x; x_[1] = (float)len_114/len_116*x; x_[2] = (float)len_112/len_116*x; x_[3] = (float)len_72/len_116*x;
if(plugin_cfg.is_omf){
weight[0] = (50-x_[0])/3;
weight[1] = (50-x_[1])/3;
weight[2] = (50-x_[2])/3;
weight[3] = (50-x_[3])/3;
}
#endif
//weight[0] = 50*(len_72+len_114+len_112)/(len_116+len_114+len_112+len_72);
//weight[1] = 50*(len_72+len_116+len_112)/(len_116+len_114+len_112+len_72);
//weight[2] = 50*(len_72+len_114+len_116)/(len_116+len_114+len_112+len_72);
//weight[3] = 50-weight[0]-weight[1]-weight[2];
Logger::plugin_server().debug("Updated weight, weight[0]=%d, weight[1]=%d, weight[2]=%d, weight[3]=%d", weight[0],weight[1],weight[2],weight[3]);
//this->n2_trans_to_gnb(assoc_id,stream,transdata,response);
} catch (nlohmann::detail::exception &e) {
//send a 400 error
response.send(Pistache::Http::Code::Bad_Request, e.what());
return;
} catch (Pistache::Http::HttpError &e) {
response.send(static_cast<Pistache::Http::Code>(e.code()), e.what());
return;
} catch (std::exception &e) {
//send a 500 error
response.send(Pistache::Http::Code::Internal_Server_Error, e.what());
return;
}
response.send(Pistache::Http::Code::Ok, "n2 receive \n");
return;
}
void PluginApi::N2_trans_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) { void PluginApi::N2_trans_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response) {
// Getting the path params // Getting the path params
auto assoc_id = request.param(":assoid").as<std::uint32_t>(); auto assoc_id = request.param(":assoid").as<std::uint32_t>();
......
...@@ -52,7 +52,8 @@ private: ...@@ -52,7 +52,8 @@ private:
// void delete_record_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); // void delete_record_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
// void get_meta_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); // void get_meta_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
// void get_record_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); // void get_record_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
void N2_trans_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); void N2_trans_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
void algorithm_omf_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
// void update_meta_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); // void update_meta_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
// void record_crud_api_default_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); // void record_crud_api_default_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
void modify_AmfIP_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response); void modify_AmfIP_handler(const Pistache::Rest::Request &request, Pistache::Http::ResponseWriter response);
......
...@@ -35,7 +35,16 @@ void to_json(nlohmann::json& j, const TransData& o) { ...@@ -35,7 +35,16 @@ void to_json(nlohmann::json& j, const TransData& o) {
} }
void from_json(const nlohmann::json& j, TransData& o) { void from_json(const nlohmann::json& j, TransData& o) {
j.at("n2_data").get_to(o.m_transdata); if(j.find("n2_data") != j.end())
j.at("n2_data").get_to(o.m_transdata);
if(j.find("ip_address") != j.end())
j.at("ip_address").get_to(o.ip_addr);
if(j.find("current_message_queue_len") != j.end())
j.at("current_message_queue_len").get_to(o.mq_len);
if(j.find("capability") != j.end()){
j.at("capability").get_to(o.cap);
std::cout << "capability_dukl"<<std::endl;
}
} }
......
...@@ -40,9 +40,11 @@ class TransData { ...@@ -40,9 +40,11 @@ class TransData {
friend void to_json(nlohmann::json& j, const TransData& o); friend void to_json(nlohmann::json& j, const TransData& o);
friend void from_json(const nlohmann::json& j, TransData& o); friend void from_json(const nlohmann::json& j, TransData& o);
std::string get_amf_data(); std::string get_amf_data();
protected: // protected:
std::string m_transdata; std::string m_transdata;
std::string ip_addr;
long mq_len;
double cap;
}; };
} // namespace model } // namespace model
......
...@@ -4,6 +4,8 @@ ...@@ -4,6 +4,8 @@
#include <curl/curl.h> #include <curl/curl.h>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include "logger.hpp" #include "logger.hpp"
#include <sys/time.h>
#include <fstream>
using namespace std; using namespace std;
using namespace sctp; using namespace sctp;
using namespace config; using namespace config;
...@@ -13,7 +15,12 @@ extern plugin_config plugin_cfg; ...@@ -13,7 +15,12 @@ extern plugin_config plugin_cfg;
extern std::string amf_ip; extern std::string amf_ip;
extern unsigned int amf_port; extern unsigned int amf_port;
extern std::map<std::string,long> ip2len;
ofstream timestamp_plugin("/home/xgcore/plugin_timestamp.txt");
string amf_ip_list[15] = {"10.103.239.116","10.103.239.114","10.103.239.112","10.103.238.75","10.103.239.112","10.103.238.72","10.103.239.53","10.103.239.61","10.103.239.63","10.103.238.105","10.103.238.104","10.103.238.103","10.103.238.98","10.103.238.97","10.103.238.96"};
int amf_ports[15] = {8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282};
int weight[15] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
namespace amf_application{ namespace amf_application{
plugin_app::plugin_app() {} plugin_app::plugin_app() {}
...@@ -34,10 +41,23 @@ static std::size_t callback( ...@@ -34,10 +41,23 @@ static std::size_t callback(
out->append(in, totalBytes); out->append(in, totalBytes);
return totalBytes; return totalBytes;
} }
//string amf_ip_list[15] = {"10.103.239.56","10.103.239.116","10.103.239.114","10.103.239.113","10.103.239.112","10.103.238.72","10.103.239.53","10.103.239.61","10.103.239.63","10.103.238.105","10.103.238.104","10.103.238.103","10.103.238.98","10.103.238.97","10.103.238.96"};
//int amf_ports[15] = {8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282,8282};
//int weight[15] = {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
int index = -1;
int random_index = 0;
int choose = 0;
int num_inst = 4;
void plugin_app::handle_receive( void plugin_app::handle_receive(
bstring payload, sctp_assoc_id_t assoc_id, sctp_stream_id_t stream, bstring payload, sctp_assoc_id_t assoc_id, sctp_stream_id_t stream,
sctp_stream_id_t instreams, sctp_stream_id_t outstreams) sctp_stream_id_t instreams, sctp_stream_id_t outstreams)
{ {
struct timeval tv1; struct timezone tz1; gettimeofday(&tv1,&tz1); long start = tv1.tv_sec*1000000 +tv1.tv_usec;
Logger::plugin_sbi().debug("Begining at time %ld", start);
timestamp_plugin<< "time_stamp "<<start<<endl;
index ++;
Logger::plugin_sbi().debug("index %d",index);
printf("使用http发送到amf\n"); printf("使用http发送到amf\n");
Logger::plugin_sbi().debug("receive n2 from gnb then Send N2 message to AMF"); Logger::plugin_sbi().debug("receive n2 from gnb then Send N2 message to AMF");
std::string ngapmsg; std::string ngapmsg;
...@@ -48,13 +68,57 @@ void plugin_app::handle_receive( ...@@ -48,13 +68,57 @@ void plugin_app::handle_receive(
nlohmann::json json_data = {}; nlohmann::json json_data = {};
json_data["n2_data"] = ngapmsg; json_data["n2_data"] = ngapmsg;
std::string url;
if(plugin_cfg.is_wrr || plugin_cfg.is_omf){
Logger::plugin_sbi().debug("Algorithm WRR or OMF");
Logger::plugin_sbi().debug("current weight weight[0]=%d, weight[0]=%d, weight[0]=%d, weight[0]=%d", weight[0],weight[1],weight[2],weight[3]);
int sum = 0;
int index_num = 0;
for(int i=0; i < num_inst; i++)
index_num += weight[i];
for(int i=0; i <= choose; i++)
sum += weight[i];
if((index + 1) > sum)
choose = (choose + 1) % num_inst;
if((index + 1) > index_num)
index = 0;
Logger::sctp().debug("Choosing %d th AMF instance ", choose+1);
url =
"http://" + std::string(amf_ip_list[choose].c_str()) +
":" + std::to_string(amf_ports[choose]) + "/namf-comm/v1/plugin_n2/" +
to_string (assoc_id ) + "/" + to_string (stream) ;
}
#if 0
srand(time(NULL));
random_index = rand()%4;
Logger::sctp().debug("Choosing %d th AMF instance ", random_index+1);
std::string url =
"http://" + std::string(amf_ip_list[random_index].c_str()) +
":" + std::to_string(amf_ports[random_index]) + "/namf-comm/v1/plugin_n2/" +
to_string (assoc_id ) + "/" + to_string (stream) ;
#endif
if(plugin_cfg.is_rr){
Logger::plugin_sbi().debug("Algorithm RR ");
url =
"http://" + std::string(amf_ip_list[index % 4].c_str()) +
":" + std::to_string(amf_ports[index % 4]) + "/namf-comm/v1/plugin_n2/" +
to_string (assoc_id ) + "/" + to_string (stream) ;
}
#if 0
url =
"http://" + std::string(amf_ip_list[0].c_str()) +
":" + std::to_string(amf_ports[0]) + "/namf-comm/v1/plugin_n2/" +
to_string (assoc_id ) + "/" + to_string (stream) ;
#endif
#if 0
std::string url = std::string url =
"http://" + std::string(amf_ip.c_str()) + "http://" + std::string(amf_ip.c_str()) +
":" + std::to_string(amf_port) + "/namf-comm/v1/plugin_n2/" + ":" + std::to_string(amf_port) + "/namf-comm/v1/plugin_n2/" +
to_string (assoc_id ) + "/" + to_string (stream) ; to_string (assoc_id ) + "/" + to_string (stream) ;
#endif
Logger::plugin_sbi().debug( Logger::plugin_sbi().debug(
"Send N2 message to AMF , AMF URL %s", url.c_str()); "Send N2 message to AMF , AMF URL %s", url.c_str());
......
...@@ -18,7 +18,13 @@ using namespace libconfig; ...@@ -18,7 +18,13 @@ using namespace libconfig;
extern std::string amf_ip; extern std::string amf_ip;
extern unsigned int amf_port; extern unsigned int amf_port;
namespace config { namespace config {
plugin_config::plugin_config() {} plugin_config::plugin_config() {
is_wrr = false;
is_rr = false;
is_omf = false;
rrRate = 0;
}
plugin_config::~plugin_config() {} plugin_config::~plugin_config() {}
int plugin_config::load(const std ::string &config_file) { int plugin_config::load(const std ::string &config_file) {
...@@ -45,18 +51,22 @@ int plugin_config::load(const std ::string &config_file) { ...@@ -45,18 +51,22 @@ int plugin_config::load(const std ::string &config_file) {
return -1; return -1;
} }
const Setting &plugin_cfg = root[PLUGIN_CONFIG_STRING_PLUGIN_CONFIG]; const Setting &plugin_cfg = root[PLUGIN_CONFIG_STRING_PLUGIN_CONFIG];
// try { try {
// plugin_cfg.lookupValue(PLUGIN_CONFIG_STRING_INSTANCE_ID, instance); std::string alg;
// } catch (const SettingNotFoundException &nfex) { plugin_cfg.lookupValue("ALGORITHM", alg);
// Logger::plugin_app().warn("%s : %s, using defaults", nfex.what(), if(!alg.compare("WRR")) is_wrr = true;
// nfex.getPath()); if(!alg.compare("RR")) is_rr = true;
// } if(!alg.compare("OMF")) is_omf = true;
// try { } catch (const SettingNotFoundException &nfex) {
// plugin_cfg.lookupValue(PLUGIN_CONFIG_STRING_PID_DIRECTORY, pid_dir); Logger::plugin_app().warn("%s : %s, using defaults", nfex.what(),
// } catch (const SettingNotFoundException &nfex) { nfex.getPath());
// Logger::plugin_app().warn("%s : %s, using defaults", nfex.what(), }
// nfex.getPath()); try {
// } plugin_cfg.lookupValue("REQUESTRATE", rrRate);
} catch (const SettingNotFoundException &nfex) {
Logger::plugin_app().warn("%s : %s, using defaults", nfex.what(),
nfex.getPath());
}
try { try {
const Setting &new_if_cfg = plugin_cfg[PLUGIN_CONFIG_STRING_INTERFACES]; const Setting &new_if_cfg = plugin_cfg[PLUGIN_CONFIG_STRING_INTERFACES];
...@@ -136,6 +146,12 @@ void plugin_config::display() { ...@@ -136,6 +146,12 @@ void plugin_config::display() {
Logger::config().info(" ip ...................: %s", nrf.addr4.c_str()); Logger::config().info(" ip ...................: %s", nrf.addr4.c_str());
Logger::config().info(" port .................: %d", nrf.port); Logger::config().info(" port .................: %d", nrf.port);
Logger::config().info(""); Logger::config().info("");
if(is_wrr)
Logger::config().info(" algorithm WRR.......: true");
if(is_rr)
Logger::config().info(" algorithm RR........: true");
if(is_omf)
Logger::config().info(" algorithm OMF.......: true");
} }
......
...@@ -41,7 +41,11 @@ public: ...@@ -41,7 +41,11 @@ public:
interface_cfg_t n2; interface_cfg_t n2;
interface_cfg_t sbi; interface_cfg_t sbi;
interface_cfg_t nrf; interface_cfg_t nrf;
bool is_wrr;
bool is_rr;
bool is_omf;
int rrRate;
}; };
} // namespace config } // namespace config
#endif #endif
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment