Commit 14dcfd4d authored by Tien-Thinh Nguyen's avatar Tien-Thinh Nguyen

First version for curl multi interface to do multiple transfers in parallel

parent a64c3a77
...@@ -14,7 +14,7 @@ NRF_CONF[@PID_DIRECTORY@]='/var/run' ...@@ -14,7 +14,7 @@ NRF_CONF[@PID_DIRECTORY@]='/var/run'
NRF_CONF[@NRF_INTERFACE_NAME_FOR_SBI@]='wlo1' NRF_CONF[@NRF_INTERFACE_NAME_FOR_SBI@]='wlo1'
NRF_CONF[@NRF_INTERFACE_PORT_FOR_SBI@]='80' NRF_CONF[@NRF_INTERFACE_PORT_FOR_SBI@]='8080'
NRF_CONF[@NRF_INTERFACE_HTTP2_PORT_FOR_SBI@]='9090' NRF_CONF[@NRF_INTERFACE_HTTP2_PORT_FOR_SBI@]='9090'
NRF_CONF[@NRF_API_VERSION@]='v1' NRF_CONF[@NRF_API_VERSION@]='v1'
......
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
#define HEART_BEAT_TIMER 10 #define HEART_BEAT_TIMER 10
#define _unused(x) ((void)(x))
typedef enum nf_type_s { typedef enum nf_type_s {
NF_TYPE_NRF = 0, NF_TYPE_NRF = 0,
NF_TYPE_AMF = 1, NF_TYPE_AMF = 1,
...@@ -77,4 +79,6 @@ typedef uint32_t evsub_id_t; ...@@ -77,4 +79,6 @@ typedef uint32_t evsub_id_t;
#define NNRF_NFM_BASE "/nnrf-nfm/" #define NNRF_NFM_BASE "/nnrf-nfm/"
#define NNRF_NFM_NF_INSTANCE "/nf-instances/" #define NNRF_NFM_NF_INSTANCE "/nf-instances/"
#define MAX_WAIT_MSECS 1000 //1 second
#endif #endif
...@@ -67,7 +67,7 @@ nrf_app::nrf_app(const std::string &config_file, nrf_event &ev) ...@@ -67,7 +67,7 @@ nrf_app::nrf_app(const std::string &config_file, nrf_event &ev)
Logger::nrf_app().startup("Starting..."); Logger::nrf_app().startup("Starting...");
try { try {
nrf_client_inst = new nrf_client(); nrf_client_inst = new nrf_client(ev);
nrf_jwt_inst = new nrf_jwt(); nrf_jwt_inst = new nrf_jwt();
} catch (std::exception &e) { } catch (std::exception &e) {
Logger::nrf_app().error("Cannot create NRF_APP: %s", e.what()); Logger::nrf_app().error("Cannot create NRF_APP: %s", e.what());
...@@ -312,10 +312,11 @@ void nrf_app::handle_get_nf_instances( ...@@ -312,10 +312,11 @@ void nrf_app::handle_get_nf_instances(
std::string instance_uri; std::string instance_uri;
std::vector<struct in_addr> profile_addresses = {}; std::vector<struct in_addr> profile_addresses = {};
profile.get()->get_nf_ipv4_addresses(profile_addresses); profile.get()->get_nf_ipv4_addresses(profile_addresses);
//TODO: use the first IP addr // TODO: use the first IP addr
if (profile_addresses.size() > 0) { if (profile_addresses.size() > 0) {
instance_uri = std::string(inet_ntoa(*((struct in_addr *)&profile_addresses[0]))); instance_uri =
uris.push_back(instance_uri); std::string(inet_ntoa(*((struct in_addr *)&profile_addresses[0])));
uris.push_back(instance_uri);
} }
profile.get()->display(); profile.get()->display();
} }
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "3gpp_29.500.h" #include "3gpp_29.500.h"
#include "logger.hpp" #include "logger.hpp"
#include "nrf.h"
#include "nrf_config.hpp" #include "nrf_config.hpp"
using namespace Pistache::Http; using namespace Pistache::Http;
...@@ -56,15 +57,38 @@ static std::size_t callback(const char *in, std::size_t size, std::size_t num, ...@@ -56,15 +57,38 @@ static std::size_t callback(const char *in, std::size_t size, std::size_t num,
return totalBytes; return totalBytes;
} }
//------------------------------------------------------------------------------
nrf_client::nrf_client(nrf_event &ev) : m_event_sub(ev) {
curl_global_init(CURL_GLOBAL_DEFAULT);
curl_multi = curl_multi_init();
handles = {};
subscribe_task_curl();
}
//------------------------------------------------------------------------------
nrf_client::~nrf_client() {
Logger::nrf_app().debug("Delete NRF Client instance...");
// Remove handle, free memory
for (auto h : handles) {
curl_multi_remove_handle(curl_multi, h);
curl_easy_cleanup(h);
}
curl_multi_cleanup(curl_multi);
curl_global_cleanup();
if (task_connection.connected()) task_connection.disconnect();
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
CURL *nrf_client::curl_create_handle(const std::string &uri, CURL *nrf_client::curl_create_handle(const std::string &uri,
std::string *httpData) { const std::string &data,
std::string &response_data) {
// create handle for a curl request // create handle for a curl request
struct curl_slist *headers = NULL; struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json"); headers = curl_slist_append(headers, "Accept: application/json");
headers = curl_slist_append(headers, "Content-Type: application/json"); headers = curl_slist_append(headers, "Content-Type: application/json");
headers = curl_slist_append(headers, "charsets: utf-8"); headers = curl_slist_append(headers, "charsets: utf-8");
CURL *curl = curl_easy_init(); CURL *curl = curl_easy_init();
if (curl) { if (curl) {
...@@ -76,12 +100,143 @@ CURL *nrf_client::curl_create_handle(const std::string &uri, ...@@ -76,12 +100,143 @@ CURL *nrf_client::curl_create_handle(const std::string &uri,
curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100L); curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100L);
// Hook up data handling function. // Hook up data handling function.
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &callback); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, httpData); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_data);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, data.length());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, data.c_str());
} }
return curl; return curl;
} }
//------------------------------------------------------------------------------
void nrf_client::send_curl_multi(const std::string &uri,
const std::string &data,
std::string &response_data) {
// create a new handle and add to the multi handle
// the curl will actually be sent in perform_curl_multi
CURL *tmp = curl_create_handle(uri, data, response_data);
curl_multi_add_handle(curl_multi, tmp);
handles.push_back(tmp);
}
//------------------------------------------------------------------------------
void nrf_client::perform_curl_multi(uint64_t ms) {
_unused(ms);
int still_running = 0;
CURLMcode c = curl_multi_perform(curl_multi, &still_running);
if (c != CURLM_OK) {
wait_curl_end();
}
curl_release_handles();
}
//------------------------------------------------------------------------------
void nrf_client::wait_curl_end() {
// block until activity is detected on at least one of the handles or
// MAX_WAIT_MSECS has passed.
int still_running = 0, numfds = 0;
do {
CURLMcode code = curl_multi_perform(curl_multi, &still_running);
if (code == CURLM_OK) {
code = curl_multi_wait(curl_multi, NULL, 0, MAX_WAIT_MSECS, &numfds);
if (code != CURLM_OK) break;
} else {
break;
}
} while (still_running);
curl_release_handles();
}
//------------------------------------------------------------------------------
void nrf_client::curl_release_handles() {
CURLMsg *curl_msg = nullptr;
CURL *curl = nullptr;
CURLcode code = {};
int http_status_code = 0;
int msgs_left;
while ((curl_msg = curl_multi_info_read(curl_multi, &msgs_left))) {
Logger::nrf_app().debug("Process message for multiple curl");
if (curl_msg->msg == CURLMSG_DONE) {
curl = curl_msg->easy_handle;
code = curl_msg->data.result;
// int res = curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &curl_url);
if (code != CURLE_OK) {
Logger::nrf_app().debug("CURL error code %d!", curl_msg->data.result);
continue;
}
// Get HTTP status code
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_status_code);
Logger::nrf_app().debug("HTTP status code %d!", http_status_code);
// TODO: remove handle from the multi session and end this handle now, or
// later
curl_multi_remove_handle(curl_multi, curl);
curl_easy_cleanup(curl);
std::vector<CURL *>::iterator it;
it = find(handles.begin(), handles.end(), curl);
if (it != handles.end()) {
handles.erase(it);
Logger::nrf_app().debug("Erase curl handle");
}
} else {
Logger::nrf_app().debug("Error after curl_multi_info_read(), CURLMsg %s",
curl_msg->msg);
}
}
}
//------------------------------------------------------------------------------
void nrf_client::notify_subscribed_event(
const std::shared_ptr<nrf_profile> &profile, const uint8_t &event_type,
const std::vector<std::string> &uris) {
Logger::nrf_app().debug(
"Send notification for the subscribed event to the subscriptions");
std::map<std::string, std::string> responses = {};
// Fill the json part
nlohmann::json json_data = {};
json_data["event"] = notification_event_type_e2str[event_type];
std::vector<struct in_addr> instance_addrs = {};
profile.get()->get_nf_ipv4_addresses(instance_addrs);
// TODO: use the first IPv4 addr for now
std::string instance_uri =
std::string(inet_ntoa(*((struct in_addr *)&(instance_addrs[0]))));
Logger::nrf_app().debug("NF instance URI: %s", instance_uri.c_str());
json_data["nfInstanceUri"] = instance_uri;
// NF profile
if ((event_type == NOTIFICATION_TYPE_NF_REGISTERED) or
(event_type == NOTIFICATION_TYPE_NF_PROFILE_CHANGED)) {
nlohmann::json json_profile = {};
switch (profile.get()->get_nf_type()) {
case NF_TYPE_AMF: {
std::static_pointer_cast<amf_profile>(profile).get()->to_json(
json_profile);
} break;
case NF_TYPE_SMF: {
std::static_pointer_cast<smf_profile>(profile).get()->to_json(
json_profile);
} break;
default: { profile.get()->to_json(json_profile); }
}
json_data["nfProfile"] = json_profile;
}
std::string body = json_data.dump();
for (auto uri : uris) {
responses[uri] = "";
curl_create_handle(uri, body, responses[uri]);
send_curl_multi(uri, body, responses[uri]);
}
}
/*
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void nrf_client::notify_subscribed_event( void nrf_client::notify_subscribed_event(
const std::shared_ptr<nrf_profile> &profile, const uint8_t &event_type, const std::shared_ptr<nrf_profile> &profile, const uint8_t &event_type,
...@@ -219,6 +374,7 @@ void nrf_client::notify_subscribed_event( ...@@ -219,6 +374,7 @@ void nrf_client::notify_subscribed_event(
curl_global_cleanup(); curl_global_cleanup();
curl_slist_free_all(headers); curl_slist_free_all(headers);
} }
*/
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void nrf_client::notify_subscribed_event( void nrf_client::notify_subscribed_event(
...@@ -290,3 +446,8 @@ void nrf_client::notify_subscribed_event( ...@@ -290,3 +446,8 @@ void nrf_client::notify_subscribed_event(
curl_slist_free_all(headers); curl_slist_free_all(headers);
curl_global_cleanup(); curl_global_cleanup();
} }
void nrf_client::subscribe_task_curl() {
task_connection = m_event_sub.subscribe_task_tick(
boost::bind(&nrf_client::perform_curl_multi, this, _1), 1, 0);
}
...@@ -41,8 +41,15 @@ namespace app { ...@@ -41,8 +41,15 @@ namespace app {
class nrf_client { class nrf_client {
private: private:
CURLM *curl_multi;
std::vector<CURL *> handles;
nrf_event &m_event_sub;
bs2::connection
task_connection; // connection for performing curl_multi every 1ms
public: public:
nrf_client(){}; nrf_client(nrf_event &ev);
virtual ~nrf_client();
nrf_client(nrf_client const &) = delete; nrf_client(nrf_client const &) = delete;
void operator=(nrf_client const &) = delete; void operator=(nrf_client const &) = delete;
...@@ -55,6 +62,17 @@ class nrf_client { ...@@ -55,6 +62,17 @@ class nrf_client {
void notify_subscribed_event(const std::shared_ptr<nrf_profile> &profile, void notify_subscribed_event(const std::shared_ptr<nrf_profile> &profile,
const std::string &uri); const std::string &uri);
/*
* Send Notification for the associated event to the subscriber
* @param [const std::shared_ptr<nrf_profile> &] profile: NF profile
* @param [const std::string &] uri: URI of the subscribed NF
* @return void
*/
/* void notify_subscribed_event_with_curl_multi(
const std::shared_ptr<nrf_profile> &profile, const uint8_t &event_type,
const std::vector<std::string> &uris);
*/
/* /*
* Send Notification for the associated event to the subscribers * Send Notification for the associated event to the subscribers
* @param [const std::shared_ptr<nrf_profile> &] profile: NF profile * @param [const std::shared_ptr<nrf_profile> &] profile: NF profile
...@@ -69,10 +87,50 @@ class nrf_client { ...@@ -69,10 +87,50 @@ class nrf_client {
/* /*
* Create Curl handle for multi curl * Create Curl handle for multi curl
* @param [const std::string &] uri: URI of the subscribed NF * @param [const std::string &] uri: URI of the subscribed NF
* @param [std::string *] data: data * @param [std::string &] data: data to be sent
* @param [std::string &] response_data: response data
* @return pointer to the created curl * @return pointer to the created curl
*/ */
CURL *curl_create_handle(const std::string &uri, std::string *httpData); CURL *curl_create_handle(const std::string &uri, const std::string &data,
std::string &response_data);
/*
* Prepare to send a request using curl multi
* @param [const std::string &] uri: URI of the subscribed NF
* @param [std::string &] data: data to be sent
* @param [std::string &] response_data: response data
* @return void
*/
void send_curl_multi(const std::string &uri, const std::string &data,
std::string &response_data);
/*
* Perform curl multi to actually process the available data
* @param [uint64_t ms] ms: current time
* @return void
*/
void perform_curl_multi(uint64_t ms);
/*
* Finish all the curl transfers
* @param void
* @return void
*/
void wait_curl_end();
/*
* Release all the handles
* @param void
* @return void
*/
void curl_release_handles();
/*
* Subscribe to the task curl event
* @param [uint64_t] ms: current time
* @return void
*/
void subscribe_task_curl();
}; };
} // namespace app } // namespace app
} // namespace nrf } // namespace nrf
......
...@@ -82,7 +82,7 @@ class nrf_config { ...@@ -82,7 +82,7 @@ class nrf_config {
sbi_http2_port = 8080; sbi_http2_port = 8080;
sbi_api_version = "v1"; sbi_api_version = "v1";
}; };
~nrf_config(); virtual ~nrf_config();
void lock() { m_rw_lock.lock(); }; void lock() { m_rw_lock.lock(); };
void unlock() { m_rw_lock.unlock(); }; void unlock() { m_rw_lock.unlock(); };
int load(const std::string &config_file); int load(const std::string &config_file);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment