WiP to sync InfluxDB metrics plugin with Advanced edition

This commit is contained in:
Pavel Odintsov 2024-07-13 16:21:52 +03:00
parent 60a8dc7afb
commit 0aba63edd1
2 changed files with 128 additions and 192 deletions

View File

@ -114,6 +114,55 @@ bool push_system_counters_to_influxdb(const std::string& influx_database,
return true;
}
// Push network traffic to InfluxDB
template <typename T, typename C>
requires std::is_same_v<C, subnet_counter_t> bool
push_network_traffic_counters_to_influxdb(abstract_subnet_counters_t<T, C>& network_counters,
const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement,
const std::string& tag_name) {
std::vector<std::pair<T, C>> speed_elements;
// Retrieve copy of all counters
network_counters.get_all_non_zero_average_speed_elements_as_pairs(speed_elements);
// Structure for InfluxDB
std::vector<std::pair<std::string, std::map<std::string, uint64_t>>> networks_vector;
for (const auto& speed_element : speed_elements) {
std::map<std::string, uint64_t> plain_total_counters_map;
// This function can convert both IPv4 and IPv6 subnets to text format
std::string network_as_cidr_string = convert_any_subnet_to_string(speed_element.first);
fill_fixed_counters_for_influxdb(speed_element.second, plain_total_counters_map, true);
networks_vector.push_back(std::make_pair(network_as_cidr_string, plain_total_counters_map));
}
if (networks_vector.size() > 0) {
influxdb_writes_total++;
std::string error_text;
bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user,
influx_password, measurement, tag_name, networks_vector, error_text);
if (!result) {
influxdb_writes_failed++;
logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for " << measurement << " with error " << error_text;
return false;
}
}
return true;
}
// Push total traffic counters to InfluxDB
bool push_total_traffic_counters_to_influxdb(const std::string& influx_database,
@ -125,17 +174,14 @@ bool push_total_traffic_counters_to_influxdb(const std::string& influx_database,
const std::string& measurement_name,
total_counter_element_t total_speed_average_counters_param[4],
bool ipv6) {
std::vector<direction_t> directions = { INCOMING, OUTGOING, INTERNAL, OTHER };
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
std::vector<direction_t> directions = { INCOMING, OUTGOING, INTERNAL, OTHER };
for (auto packet_direction : directions) {
std::map<std::string, uint64_t> plain_total_counters_map;
uint64_t speed_in_pps = total_speed_average_counters_param[packet_direction].total.packets;
uint64_t speed_in_bits_per_second = total_speed_average_counters_param[packet_direction].total.bytes * 8;
// We do not have this counter for IPv6
if (!ipv6) {
// We have flow information only for incoming and outgoing directions
@ -152,8 +198,26 @@ bool push_total_traffic_counters_to_influxdb(const std::string& influx_database,
}
}
plain_total_counters_map["packets"] = speed_in_pps;
plain_total_counters_map["bits"] = speed_in_bits_per_second;
plain_total_counters_map["packets"] = total_speed_average_counters_param[packet_direction].total.packets;
plain_total_counters_map["bits"] = total_speed_average_counters_param[packet_direction].total.bytes * 8;
plain_total_counters_map["udp_packets"] = total_speed_average_counters_param[packet_direction].udp.packets;
plain_total_counters_map["udp_bits"] = total_speed_average_counters_param[packet_direction].udp.bytes * 8;
plain_total_counters_map["tcp_packets"] = total_speed_average_counters_param[packet_direction].tcp.packets;
plain_total_counters_map["tcp_bits"] = total_speed_average_counters_param[packet_direction].tcp.bytes * 8;
plain_total_counters_map["icmp_packets"] = total_speed_average_counters_param[packet_direction].icmp.packets;
plain_total_counters_map["icmp_bits"] = total_speed_average_counters_param[packet_direction].icmp.bytes * 8;
plain_total_counters_map["fragmented_packets"] = total_speed_average_counters_param[packet_direction].fragmented.packets;
plain_total_counters_map["fragmented_bits"] = total_speed_average_counters_param[packet_direction].fragmented.bytes * 8;
plain_total_counters_map["tcp_syn_packets"] = total_speed_average_counters_param[packet_direction].tcp_syn.packets;
plain_total_counters_map["tcp_syn_bits"] = total_speed_average_counters_param[packet_direction].tcp_syn.bytes * 8;
plain_total_counters_map["dropped_packets"] = total_speed_average_counters_param[packet_direction].dropped.packets;
plain_total_counters_map["dropped_bits"] = total_speed_average_counters_param[packet_direction].dropped.bytes * 8;
std::string direction_as_string = get_direction_name(packet_direction);
@ -161,12 +225,14 @@ bool push_total_traffic_counters_to_influxdb(const std::string& influx_database,
std::map<std::string, std::string> tags = { { "direction", direction_as_string } };
std::string error_text;
bool result = write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user,
influx_password, measurement_name, tags, plain_total_counters_map);
influx_password, measurement_name, tags, plain_total_counters_map, error_text);
if (!result) {
influxdb_writes_failed++;
logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for total_traffic";
logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for total_traffic with error: " << error_text;
return false;
}
}
@ -175,45 +241,6 @@ bool push_total_traffic_counters_to_influxdb(const std::string& influx_database,
return true;
}
// Simple helper function to add additional metrics easily
void add_counter_to_influxdb(std::map<std::string, uint64_t>& plain_total_counters_map,
const traffic_counter_element_t& counter,
const std::string& counter_name) {
plain_total_counters_map[counter_name + "_packets_incoming"] = counter.in_packets;
plain_total_counters_map[counter_name + "_bits_incoming"] = counter.in_bytes * 8;
plain_total_counters_map[counter_name + "_packets_outgoing"] = counter.out_packets;
plain_total_counters_map[counter_name + "_bits_outgoing"] = counter.out_bytes * 8;
}
// Fills special structure which we use to export metrics into InfluxDB
void fill_per_protocol_countres_for_influxdb(const subnet_counter_t& current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map) {
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.dropped, "dropped");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.fragmented, "fragmented");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.tcp, "tcp");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.tcp_syn, "tcp_syn");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.udp, "udp");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.icmp, "icmp");
}
// Fills counters for standard fixed counters
void fill_fixed_counters_for_influxdb(const subnet_counter_t& counter,
std::map<std::string, uint64_t>& plain_total_counters_map,
bool populate_flow) {
fill_main_counters_for_influxdb(counter, plain_total_counters_map, populate_flow);
bool influxdb_per_protocol_counters = false;
if (influxdb_per_protocol_counters) {
fill_per_protocol_countres_for_influxdb(counter, plain_total_counters_map);
}
return;
}
// Push host traffic to InfluxDB
template <typename T, typename C>
// Apply limitation on type of keys because we use special string conversion function inside and we must not instantiate it for other unknown types
@ -269,7 +296,7 @@ template <typename T, typename C>
if (!result) {
influxdb_writes_failed++;
logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic with error ";
logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic with error " << error_text;
return false;
}
}
@ -277,57 +304,6 @@ template <typename T, typename C>
return true;
}
// Push network traffic to InfluxDB
template <typename T, typename C>
requires std::is_same_v<C, subnet_counter_t> bool
push_network_traffic_counters_to_influxdb(abstract_subnet_counters_t<T, C>& network_counters,
const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement,
const std::string& tag_name) {
std::vector<std::pair<T, C>> speed_elements;
// Retrieve copy of all counters
network_counters.get_all_non_zero_average_speed_elements_as_pairs(speed_elements);
// Structure for InfluxDB
std::vector<std::pair<std::string, std::map<std::string, uint64_t>>> networks_vector;
for (const auto& speed_element : speed_elements) {
std::map<std::string, uint64_t> plain_total_counters_map;
// This function can convert both IPv4 and IPv6 subnets to text format
std::string network_as_cidr_string = convert_any_subnet_to_string(speed_element.first);
fill_fixed_counters_for_influxdb(speed_element.second, plain_total_counters_map, true);
networks_vector.push_back(std::make_pair(network_as_cidr_string, plain_total_counters_map));
}
if (networks_vector.size() > 0) {
influxdb_writes_total++;
std::string error_text;
bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user,
influx_password, measurement, tag_name, networks_vector, error_text);
if (!result) {
influxdb_writes_failed++;
logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for " << measurement << " with error " << error_text;
return false;
}
}
return true;
}
// This thread pushes data to InfluxDB
void influxdb_push_thread() {
extern abstract_subnet_counters_t<uint32_t, subnet_counter_t> ipv4_host_counters;
@ -404,15 +380,15 @@ void influxdb_push_thread() {
}
// Write batch of data for particular InfluxDB database
bool write_batch_of_data_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool write_batch_of_data_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string measurement,
std::string tag_name,
std::vector<std::pair<std::string, std::map<std::string, uint64_t>>>& hosts_vector,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement,
const std::string& tag_name,
const std::vector<std::pair<std::string, std::map<std::string, uint64_t>>>& hosts_vector,
std::string& error_text) {
// Nothing to write
if (hosts_vector.size() == 0) {
@ -434,19 +410,19 @@ bool write_batch_of_data_to_influxdb(std::string influx_database,
// logger << log4cpp::Priority::INFO << "Raw data to InfluxDB: " << buffer.str();
return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password,
buffer.str());
buffer.str(), error_text);
}
// Set block of data into InfluxDB
bool write_line_of_data_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool write_line_of_data_to_influxdb(const std::string& influx_database,
const std::string& influx_host,
const std::string& influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string measurement,
std::map<std::string, std::string>& tags,
std::map<std::string, uint64_t>& plain_total_counters_map,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement,
const std::map<std::string, std::string>& tags,
const std::map<std::string, uint64_t>& plain_total_counters_map,
std::string& error_text) {
uint64_t unix_timestamp_nanoseconds = get_current_unix_time_in_nanoseconds();
@ -458,6 +434,29 @@ bool write_line_of_data_to_influxdb(std::string influx_database,
return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password, influxdb_line, error_text);
}
// Simple helper function to add additional metrics easily
void add_counter_to_influxdb(std::map<std::string, uint64_t>& plain_total_counters_map,
const traffic_counter_element_t& counter,
const std::string& counter_name) {
plain_total_counters_map[counter_name + "_packets_incoming"] = counter.in_packets;
plain_total_counters_map[counter_name + "_bits_incoming"] = counter.in_bytes * 8;
plain_total_counters_map[counter_name + "_packets_outgoing"] = counter.out_packets;
plain_total_counters_map[counter_name + "_bits_outgoing"] = counter.out_bytes * 8;
}
// Fills special structure which we use to export metrics into InfluxDB
void fill_per_protocol_countres_for_influxdb(const subnet_counter_t& current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map) {
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.dropped, "dropped");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.fragmented, "fragmented");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.tcp, "tcp");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.tcp_syn, "tcp_syn");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.udp, "udp");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.icmp, "icmp");
}
// Fills special structure which we use to export metrics into InfluxDB
void fill_main_counters_for_influxdb(const subnet_counter_t& current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map,
@ -476,10 +475,21 @@ void fill_main_counters_for_influxdb(const subnet_counter_t& current_speed_eleme
}
}
// Fills counters for standard fixed counters
void fill_fixed_counters_for_influxdb(const subnet_counter_t& counter,
std::map<std::string, uint64_t>& plain_total_counters_map,
bool populate_flow) {
fill_main_counters_for_influxdb(counter, plain_total_counters_map, populate_flow);
fill_per_protocol_countres_for_influxdb(counter, plain_total_counters_map);
return;
}
// Prepare string to insert data into InfluxDB
std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds,
std::string measurement,
const std::string& measurement,
const std::map<std::string, std::string>& tags,
const std::map<std::string, uint64_t>& plain_total_counters_map) {
std::stringstream buffer;

View File

@ -5,80 +5,6 @@
#include "../fastnetmon_types.hpp"
bool push_system_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password);
bool push_total_traffic_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string measurement_name,
total_counter_element_t total_speed_average_counters_param[4],
bool ipv6);
void send_grafana_alert(std::string title, std::string text, std::vector<std::string>& tags);
void influxdb_push_thread();
bool push_hosts_ipv6_traffic_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password);
bool push_hosts_ipv4_traffic_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password);
bool push_hostgroup_traffic_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password);
bool write_batch_of_data_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string measurement,
std::string tag_name,
std::vector<std::pair<std::string, std::map<std::string, uint64_t>>>& hosts_vector,
std::string& error_text);
// Set block of data into InfluxDB
bool write_line_of_data_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password,
std::string measurement,
std::map<std::string, std::string>& tags,
std::map<std::string, uint64_t>& plain_total_counters_map);
void fill_per_protocol_countres_for_influxdb(const subnet_counter_t* current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map);
void fill_main_counters_for_influxdb(const subnet_counter_t& current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map,
bool populate_flow);
// Prepare string to insert data into InfluxDB
std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds,
std::string measurement,
const std::map<std::string, std::string>& tags,
const std::map<std::string, uint64_t>& plain_total_counters_map);