WiP to unify InfluxDB metrics plugin with Advanced edition

This commit is contained in:
Pavel Odintsov 2024-07-13 16:04:00 +03:00
parent e2931827a6
commit 60a8dc7afb
3 changed files with 95 additions and 40 deletions

View File

@ -1790,13 +1790,14 @@ bool execute_web_request(std::string address,
}
// Write data to influxdb
bool write_data_to_influxdb(std::string database,
std::string host,
std::string port,
bool write_data_to_influxdb(const std::string& database,
const std::string& host,
const std::string& port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string query) {
const std::string& influx_user,
const std::string& influx_password,
const std::string& query,
std::string& error_text) {
uint32_t response_code = 0;
std::string address = host + ":" + port;
@ -1818,7 +1819,6 @@ bool write_data_to_influxdb(std::string database,
std::string response_body;
std::map<std::string, std::string> headers;
std::string error_text;
bool result = execute_web_request(influxdb_query_string, "post", query, response_code, response_body, headers, error_text);
if (!result) {
@ -1826,6 +1826,7 @@ bool write_data_to_influxdb(std::string database,
}
if (response_code != 204) {
error_text = "Unexpected response code: " + std::to_string(response_code);
return false;
}

View File

@ -108,13 +108,14 @@ bool read_ipv6_host_from_string(std::string ipv6_host_as_string, in6_addr& resul
bool validate_ipv6_or_ipv4_host(const std::string host);
uint64_t get_current_unix_time_in_nanoseconds();
bool write_data_to_influxdb(std::string database,
std::string host,
std::string port,
bool write_data_to_influxdb(const std::string& database,
const std::string& host,
const std::string& port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string query);
const std::string& influx_user,
const std::string& influx_password,
const std::string& query,
std::string& error_text);
std::string join_by_comma_and_equal(const std::map<std::string, std::string>& data);
bool parse_meminfo_into_map(std::map<std::string, uint64_t>& parsed_meminfo);

View File

@ -13,32 +13,73 @@
#include <vector>
extern struct timeval graphite_thread_execution_time;
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern abstract_subnet_counters_t<subnet_cidr_mask_t, subnet_counter_t> ipv4_network_counters;
extern uint64_t influxdb_writes_total;
extern uint64_t influxdb_writes_failed;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t, subnet_counter_t> ipv6_host_counters;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t, subnet_counter_t> ipv6_subnet_counters;
extern std::vector<ban_settings_t> hostgroup_list_total_calculation;
extern std::mutex hostgroup_list_total_calculation_mutex;
extern abstract_subnet_counters_t<int64_t, subnet_counter_t> per_hostgroup_total_counters;
extern log4cpp::Category& logger;
extern total_speed_counters_t total_counters_ipv4;
extern total_speed_counters_t total_counters_ipv6;
extern fastnetmon_configuration_t fastnetmon_global_configuration;
// I do this declaration here to avoid circular dependencies between fastnetmon_logic and this file
bool get_statistics(std::vector<system_counter_t>& system_counters);
// Push system counters to InfluxDB
bool push_system_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool write_batch_of_data_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement,
const std::string& tag_name,
const std::vector<std::pair<std::string, std::map<std::string, uint64_t>>>& hosts_vector,
std::string& error_text);
// Set block of data into InfluxDB
bool write_line_of_data_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement,
const std::map<std::string, std::string>& tags,
const std::map<std::string, uint64_t>& plain_total_counters_map,
std::string& error_text);
// Prepare string to insert data into InfluxDB
std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds,
const std::string& measurement,
const std::map<std::string, std::string>& tags,
const std::map<std::string, uint64_t>& plain_total_counters_map);
void fill_fixed_counters_for_influxdb(const subnet_counter_t& counter,
std::map<std::string, uint64_t>& plain_total_counters_map,
bool populate_flow);
bool push_system_counters_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password) {
const std::string& influx_user,
const std::string& influx_password);
bool push_total_traffic_counters_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement_name,
total_counter_element_t total_speed_average_counters_param[4],
bool ipv6);
// Push system counters to InfluxDB
bool push_system_counters_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
const std::string& influx_user,
const std::string& influx_password) {
std::vector<system_counter_t> system_counters;
bool result = get_statistics(system_counters);
@ -58,12 +99,15 @@ bool push_system_counters_to_influxdb(std::string influx_database,
std::map<std::string, std::string> tags = { { "metric", "metric_value" } };
bool influx_result = write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user,
influx_password, "system_counters", tags, plain_total_counters_map);
std::string error_text;
bool influx_result =
write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user,
influx_password, "system_counters", tags, plain_total_counters_map, error_text);
if (!influx_result) {
influxdb_writes_failed++;
logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for system counters";
logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for system counters with error " << error_text;
return false;
}
@ -72,17 +116,20 @@ bool push_system_counters_to_influxdb(std::string influx_database,
// Push total traffic counters to InfluxDB
bool push_total_traffic_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool push_total_traffic_counters_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string measurement_name,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement_name,
total_counter_element_t total_speed_average_counters_param[4],
bool ipv6) {
std::vector<direction_t> directions = { INCOMING, OUTGOING, INTERNAL, OTHER };
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
for (auto packet_direction : directions) {
std::map<std::string, uint64_t> plain_total_counters_map;
@ -284,6 +331,11 @@ push_network_traffic_counters_to_influxdb(abstract_subnet_counters_t<T, C>& netw
// This thread pushes data to InfluxDB
void influxdb_push_thread() {
extern abstract_subnet_counters_t<uint32_t, subnet_counter_t> ipv4_host_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t, subnet_counter_t> ipv4_network_counters;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t, subnet_counter_t> ipv6_host_counters;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t, subnet_counter_t> ipv6_subnet_counters;
extern total_speed_counters_t total_counters_ipv4;
extern total_speed_counters_t total_counters_ipv6;
// Sleep for a half second for shift against calculation thread
boost::this_thread::sleep(boost::posix_time::milliseconds(500));
@ -394,7 +446,8 @@ bool write_line_of_data_to_influxdb(std::string influx_database,
std::string influx_password,
std::string measurement,
std::map<std::string, std::string>& tags,
std::map<std::string, uint64_t>& plain_total_counters_map) {
std::map<std::string, uint64_t>& plain_total_counters_map,
std::string& error_text) {
uint64_t unix_timestamp_nanoseconds = get_current_unix_time_in_nanoseconds();
auto influxdb_line =
@ -402,7 +455,7 @@ bool write_line_of_data_to_influxdb(std::string influx_database,
// logger << log4cpp::Priority::INFO << "Raw data to InfluxDB: " << buffer.str();
return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password, influxdb_line);
return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password, influxdb_line, error_text);
}
// Fills special structure which we use to export metrics into InfluxDB