Complete rework of speed counters for IPv4 prefixes and IPv6 hosts and networks

This commit is contained in:
Pavel Odintsov 2023-02-28 19:24:05 +00:00
parent 075216e8a9
commit d5ce086244
12 changed files with 512 additions and 323 deletions

View File

@ -415,6 +415,10 @@ endif()
# Library with data types for parsing network structures
add_library(network_data_structures STATIC network_data_structures.cpp)
# Speed counters lib
add_library(speed_counters STATIC speed_counters.cpp)
target_link_libraries(speed_counters fast_library)
# Our new parser for parsing traffic up to L4
add_library(simple_packet_parser_ng STATIC simple_packet_parser_ng.cpp)
target_link_libraries(simple_packet_parser_ng network_data_structures)
@ -769,6 +773,8 @@ target_link_libraries(fastnetmon_logic bgp_protocol exabgp_action)
target_link_libraries(fastnetmon_logic protobuf_traffic_format)
target_link_libraries(fastnetmon_logic speed_counters)
# Link to our functions
target_link_libraries(fastnetmon fast_library)
@ -882,7 +888,7 @@ if (BUILD_TESTS)
target_link_libraries(traffic_structures_tests_real_traffic ${Boost_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${LOG4CPP_LIBRARY_PATH} fast_library)
add_executable(speed_counters_performance_test tests/speed_counters_performance_test.cpp)
target_link_libraries(speed_counters_performance_test ${Boost_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} fast_library ${LOG4CPP_LIBRARY_PATH} fastnetmon_logic)
target_link_libraries(speed_counters_performance_test ${Boost_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} fast_library ${LOG4CPP_LIBRARY_PATH} speed_counters)
add_executable(patricia_performance_tests tests/patricia_performance_tests.cpp)
target_link_libraries(patricia_performance_tests patricia fast_library ${LOG4CPP_LIBRARY_PATH})

View File

@ -1,62 +1,98 @@
#pragma once
#include <algorithm>
#include <mutex>
#include <unordered_map>
#include <functional>
#include "speed_counters.hpp"
#include <boost/serialization/unordered_map.hpp>
// I keep these declaration here because of following error:
// error: there are no arguments to increment_outgoing_counters that depend on a template parameter, so a declaration
// of increment_outgoing_counters must be available [-fpermissive]
// increment_outgoing_counters(counter_ptr, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
void increment_incoming_counters(subnet_counter_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes);
void build_speed_counters_from_packet_counters(subnet_counter_t& new_speed_element, subnet_counter_t* vector_itr, double speed_calc_period);
void increment_outgoing_counters(subnet_counter_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes);
void build_average_speed_counters_from_speed_counters(subnet_counter_t* current_average_speed_element,
subnet_counter_t& new_speed_element,
double exp_value,
double exp_power);
// Class for abstract per key counters
template <typename T> class abstract_subnet_counters_t {
template <typename T, typename Counter> class abstract_subnet_counters_t {
public:
std::unordered_map<T, subnet_counter_t> counter_map;
std::unordered_map<T, Counter> counter_map;
std::mutex counter_map_mutex;
std::unordered_map<T, subnet_counter_t> speed_map;
std::unordered_map<T, subnet_counter_t> average_speed_map;
std::unordered_map<T, Counter> average_speed_map;
// By using single map for speed and data we can accomplish imprevement from 3-4 seconds for 14m hosts to 2-3 seconds
template <class Archive> void serialize(Archive& ar, [[maybe_unused]] const unsigned int version) {
ar& BOOST_SERIALIZATION_NVP(counter_map);
ar& BOOST_SERIALIZATION_NVP(speed_map);
ar& BOOST_SERIALIZATION_NVP(average_speed_map);
}
// Increments outgoing counters for specified key
void increment_outgoing_counters_for_key(T key, simple_packet_t& current_packet, uint64_t sampled_number_of_packets, uint64_t sampled_number_of_bytes) {
void increment_outgoing_counters_for_key(const T& key,
const simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
std::lock_guard<std::mutex> lock_guard(counter_map_mutex);
subnet_counter_t* counter_ptr = &counter_map[key];
increment_outgoing_counters(counter_ptr, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
Counter& counters = counter_map[key];
increment_outgoing_counters(counters, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
}
// Increments outgoing counters for specified key using multimatch array with indexes of matched thresholds
template <size_t N>
void increment_outgoing_counters_for_key(const T& key,
const std::array<bool, N>& matched_indexes,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
std::lock_guard<std::mutex> lock_guard(counter_map_mutex);
Counter& counters = counter_map[key];
extern time_t current_inaccurate_time;
// Update last update time
counters.last_update_time = current_inaccurate_time;
for (std::size_t current_index = 0; current_index < counters.flexible_counters.size(); current_index++) {
// Increment only counters which are relevant to specific flexible threshold
if (matched_indexes[current_index]) {
counters.flexible_counters[current_index].out_packets += sampled_number_of_packets;
counters.flexible_counters[current_index].out_bytes += sampled_number_of_bytes;
}
}
}
// Increments incoming counters for specified key
void increment_incoming_counters_for_key(T key, simple_packet_t& current_packet, uint64_t sampled_number_of_packets, uint64_t sampled_number_of_bytes) {
void increment_incoming_counters_for_key(const T& key,
const simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
std::lock_guard<std::mutex> lock_guard(counter_map_mutex);
subnet_counter_t* counter_ptr = &counter_map[key];
increment_incoming_counters(counter_ptr, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
Counter& counters = counter_map[key];
increment_incoming_counters(counters, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
}
// Increments incoming counters for specified key using multi match array with indexes of matched thresholds
template <size_t N>
void increment_incoming_counters_for_key(const T& key,
const std::array<bool, N>& matched_indexes,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
std::lock_guard<std::mutex> lock_guard(counter_map_mutex);
Counter& counters = counter_map[key];
extern time_t current_inaccurate_time;
// Update last update time
counters.last_update_time = current_inaccurate_time;
for (std::size_t current_index = 0; current_index < counters.flexible_counters.size(); current_index++) {
// Increment only counters which are relevant to specific flexible threshold
if (matched_indexes[current_index]) {
counters.flexible_counters[current_index].in_packets += sampled_number_of_packets;
counters.flexible_counters[current_index].in_bytes += sampled_number_of_bytes;
}
}
}
// Retrieves all elements
void get_all_average_speed_elements(std::unordered_map<T, Counter>& copy_of_average_speed_map) {
std::lock_guard<std::mutex> lock_guard(counter_map_mutex);
copy_of_average_speed_map = this->average_speed_map;
}
uint64_t purge_old_data(unsigned int automatic_data_cleanup_threshold) {
std::lock_guard<std::mutex> lock_guard(this->counter_map_mutex);
@ -75,7 +111,6 @@ template <typename T> class abstract_subnet_counters_t {
for (const auto& key : keys_to_remove) {
counter_map.erase(key);
speed_map.erase(key);
average_speed_map.erase(key);
}
@ -84,42 +119,48 @@ template <typename T> class abstract_subnet_counters_t {
}
void recalculate_speed(double speed_calc_period,
double average_calculation_time_for_subnets,
std::function<void(T*, subnet_counter_t*)> speed_check_callback) {
double average_calculation_time,
std::function<void(const T&, const Counter&)> speed_check_callback = nullptr,
std::function<void(const T&, Counter&, double)> new_speed_calc_callback = nullptr) {
// http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance
double exp_power_subnet = -speed_calc_period / average_calculation_time;
double exp_value_subnet = exp(exp_power_subnet);
std::lock_guard<std::mutex> lock_guard(this->counter_map_mutex);
for (auto itr = this->counter_map.begin(); itr != this->counter_map.end(); ++itr) {
T current_key = itr->first;
subnet_counter_t* subnet_traffic = &itr->second;
// Create const reference to key to easily reference to it in code
const T& current_key = itr->first;
subnet_counter_t new_speed_element;
// Create normal reference
Counter& traffic_counters = itr->second;
build_speed_counters_from_packet_counters(new_speed_element, subnet_traffic, speed_calc_period);
// Create element for instant speed
Counter new_speed_element;
/* Moving average recalculation for subnets */
/* http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance
*/
double exp_power_subnet = -speed_calc_period / average_calculation_time_for_subnets;
double exp_value_subnet = exp(exp_power_subnet);
build_speed_counters_from_packet_counters(new_speed_element, traffic_counters, speed_calc_period);
subnet_counter_t* current_average_speed_element = &average_speed_map[current_key];
// We can call callback function to populate more data here
if (new_speed_calc_callback != nullptr) {
new_speed_calc_callback(current_key, new_speed_element, speed_calc_period);
}
build_average_speed_counters_from_speed_counters(current_average_speed_element, new_speed_element,
exp_value_subnet, exp_power_subnet);
// Get reference to average speed element
Counter& current_average_speed_element = average_speed_map[current_key];
// Update speed calculation structure in single step
this->speed_map[current_key] = new_speed_element;
subnet_traffic->zeroify();
build_average_speed_counters_from_speed_counters(current_average_speed_element, new_speed_element, exp_value_subnet);
traffic_counters.zeroify();
// Check thresholds
if (speed_check_callback != nullptr) {
speed_check_callback(&current_key, current_average_speed_element);
speed_check_callback(current_key, current_average_speed_element);
}
}
}
// Returns all non zero average speed elements
void get_all_non_zero_average_speed_elements_as_pairs(std::vector<std::pair<T, subnet_counter_t>>& all_elements) {
void get_all_non_zero_average_speed_elements_as_pairs(std::vector<std::pair<T, Counter>>& all_elements) {
std::lock_guard<std::mutex> lock_guard(this->counter_map_mutex);
for (auto itr = this->average_speed_map.begin(); itr != this->average_speed_map.end(); ++itr) {
@ -131,18 +172,20 @@ template <typename T> class abstract_subnet_counters_t {
}
}
void get_sorted_average_speed(std::vector<std::pair<T, subnet_counter_t>>& vector_for_sort, sort_type_t sorter_type, direction_t sort_direction) {
void get_sorted_average_speed(std::vector<std::pair<T, Counter>>& vector_for_sort,
const attack_detection_threshold_type_t& sorter_type,
const attack_detection_direction_type_t& sort_direction) {
std::lock_guard<std::mutex> lock_guard(this->counter_map_mutex);
vector_for_sort.reserve(average_speed_map.size());
std::copy(average_speed_map.begin(), average_speed_map.end(), std::back_inserter(vector_for_sort));
std::sort(vector_for_sort.begin(), vector_for_sort.end(),
TrafficComparatorClass<std::pair<T, subnet_counter_t>>(sort_direction, sorter_type));
TrafficComparatorClass<std::pair<T, Counter>>(sort_direction, sorter_type));
}
// Retrieves average speed for specified key with all locks
bool get_average_speed_subnet(T key, subnet_counter_t& average_speed_element) {
bool get_average_speed_subnet(const T& key, Counter& average_speed_element) {
std::lock_guard<std::mutex> lock_guard(this->counter_map_mutex);
auto average_speed_itr = this->average_speed_map.find(key);
@ -156,12 +199,13 @@ template <typename T> class abstract_subnet_counters_t {
}
// Please create vector_for_sort this way on callers side: top_four(4);
void get_top_k_average_speed(std::vector<std::pair<T, subnet_counter_t>>& vector_for_sort, sort_type_t sorter_type, direction_t sort_direction) {
void get_top_k_average_speed(std::vector<std::pair<T, Counter>>& vector_for_sort,
const attack_detection_threshold_type_t& sorter_type,
const attack_detection_direction_type_t& sort_direction) {
std::lock_guard<std::mutex> lock_guard(this->counter_map_mutex);
std::partial_sort_copy(average_speed_map.begin(), average_speed_map.end(), vector_for_sort.begin(),
vector_for_sort.end(),
TrafficComparatorClass<std::pair<T, subnet_counter_t>>(sort_direction, sorter_type));
vector_for_sort.end(), TrafficComparatorClass<std::pair<T, Counter>>(sort_direction, sorter_type));
}
};

View File

@ -374,13 +374,13 @@ uint64_t outgoing_total_flows_speed = 0;
map_of_vector_counters_t SubnetVectorMap;
// Network counters for IPv6
abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t> ipv6_subnet_counters;
abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t,subnet_counter_t> ipv6_subnet_counters;
// Host counters for IPv6
abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t> ipv6_host_counters;
abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t,subnet_counter_t> ipv6_host_counters;
// Here we store traffic per subnet
abstract_subnet_counters_t<subnet_cidr_mask_t> ipv4_network_counters;
abstract_subnet_counters_t<subnet_cidr_mask_t,subnet_counter_t> ipv4_network_counters;
// Flow tracking structures
map_of_vector_counters_for_flow_t SubnetVectorMapFlow;

View File

@ -67,7 +67,7 @@ extern uint64_t influxdb_writes_failed;
extern packet_buckets_storage_t<subnet_ipv6_cidr_mask_t> packet_buckets_ipv6_storage;
extern std::string cli_stats_file_path;
extern unsigned int total_number_of_hosts_in_our_networks;
extern abstract_subnet_counters_t<subnet_cidr_mask_t> ipv4_network_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t, subnet_counter_t> ipv4_network_counters;
extern unsigned int recalculate_speed_timeout;
extern map_of_vector_counters_for_flow_t SubnetVectorMapFlow;
extern bool DEBUG_DUMP_ALL_PACKETS;
@ -89,8 +89,8 @@ extern double drawing_thread_execution_time;
extern std::chrono::steady_clock::time_point last_call_of_traffic_recalculation;
extern std::string cli_stats_ipv6_file_path;
extern unsigned int check_for_availible_for_processing_packets_buckets;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t> ipv6_host_counters;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t> ipv6_subnet_counters;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t,subnet_counter_t> ipv6_host_counters;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t,subnet_counter_t> ipv6_subnet_counters;
extern bool process_incoming_traffic;
extern bool process_outgoing_traffic;
extern uint64_t total_unparsed_packets;
@ -187,162 +187,6 @@ unsigned int detect_attack_protocol(subnet_counter_t& speed_element, direction_t
}
}
// We calculate speed from packet counters here
void build_speed_counters_from_packet_counters(subnet_counter_t& new_speed_element, subnet_counter_t* vector_itr, double speed_calc_period) {
// calculate_speed(new_speed_element speed_element, vector_itr* );
new_speed_element.total.in_packets = uint64_t((double)vector_itr->total.in_packets / speed_calc_period);
new_speed_element.total.out_packets = uint64_t((double)vector_itr->total.out_packets / speed_calc_period);
new_speed_element.total.in_bytes = uint64_t((double)vector_itr->total.in_bytes / speed_calc_period);
new_speed_element.total.out_bytes = uint64_t((double)vector_itr->total.out_bytes / speed_calc_period);
// Fragmented
new_speed_element.fragmented.in_packets = uint64_t((double)vector_itr->fragmented.in_packets / speed_calc_period);
new_speed_element.fragmented.out_packets = uint64_t((double)vector_itr->fragmented.out_packets / speed_calc_period);
new_speed_element.fragmented.in_bytes = uint64_t((double)vector_itr->fragmented.in_bytes / speed_calc_period);
new_speed_element.fragmented.out_bytes = uint64_t((double)vector_itr->fragmented.out_bytes / speed_calc_period);
// By protocol counters
// TCP
new_speed_element.tcp.in_packets = uint64_t((double)vector_itr->tcp.in_packets / speed_calc_period);
new_speed_element.tcp.out_packets = uint64_t((double)vector_itr->tcp.out_packets / speed_calc_period);
new_speed_element.tcp.in_bytes = uint64_t((double)vector_itr->tcp.in_bytes / speed_calc_period);
new_speed_element.tcp.out_bytes = uint64_t((double)vector_itr->tcp.out_bytes / speed_calc_period);
// TCP syn
new_speed_element.tcp_syn.in_packets = uint64_t((double)vector_itr->tcp_syn.in_packets / speed_calc_period);
new_speed_element.tcp_syn.out_packets = uint64_t((double)vector_itr->tcp_syn.out_packets / speed_calc_period);
new_speed_element.tcp_syn.in_bytes = uint64_t((double)vector_itr->tcp_syn.in_bytes / speed_calc_period);
new_speed_element.tcp_syn.out_bytes = uint64_t((double)vector_itr->tcp_syn.out_bytes / speed_calc_period);
// UDP
new_speed_element.udp.in_packets = uint64_t((double)vector_itr->udp.in_packets / speed_calc_period);
new_speed_element.udp.out_packets = uint64_t((double)vector_itr->udp.out_packets / speed_calc_period);
new_speed_element.udp.in_bytes = uint64_t((double)vector_itr->udp.in_bytes / speed_calc_period);
new_speed_element.udp.out_bytes = uint64_t((double)vector_itr->udp.out_bytes / speed_calc_period);
// ICMP
new_speed_element.icmp.in_packets = uint64_t((double)vector_itr->icmp.in_packets / speed_calc_period);
new_speed_element.icmp.out_packets = uint64_t((double)vector_itr->icmp.out_packets / speed_calc_period);
new_speed_element.icmp.in_bytes = uint64_t((double)vector_itr->icmp.in_bytes / speed_calc_period);
new_speed_element.icmp.out_bytes = uint64_t((double)vector_itr->icmp.out_bytes / speed_calc_period);
}
void build_average_speed_counters_from_speed_counters(subnet_counter_t* current_average_speed_element,
subnet_counter_t& new_speed_element,
double exp_value,
double exp_power) {
// Global bytes counters
current_average_speed_element->total.in_bytes =
uint64_t(new_speed_element.total.in_bytes + exp_value * ((double)current_average_speed_element->total.in_bytes -
(double)new_speed_element.total.in_bytes));
current_average_speed_element->total.out_bytes =
uint64_t(new_speed_element.total.out_bytes + exp_value * ((double)current_average_speed_element->total.out_bytes -
(double)new_speed_element.total.out_bytes));
// Global packet counters
current_average_speed_element->total.in_packets =
uint64_t(new_speed_element.total.in_packets + exp_value * ((double)current_average_speed_element->total.in_packets -
(double)new_speed_element.total.in_packets));
current_average_speed_element->total.out_packets =
uint64_t(new_speed_element.total.out_packets + exp_value * ((double)current_average_speed_element->total.out_packets -
(double)new_speed_element.total.out_packets));
// Per packet type packet counters for in traffic
current_average_speed_element->fragmented.in_packets = uint64_t(
new_speed_element.fragmented.in_packets + exp_value * ((double)current_average_speed_element->fragmented.in_packets -
(double)new_speed_element.fragmented.in_packets));
current_average_speed_element->tcp.in_packets =
uint64_t(new_speed_element.tcp.in_packets + exp_value * ((double)current_average_speed_element->tcp.in_packets -
(double)new_speed_element.tcp.in_packets));
current_average_speed_element->tcp_syn.in_packets =
uint64_t(new_speed_element.tcp_syn.in_packets + exp_value * ((double)current_average_speed_element->tcp_syn.in_packets -
(double)new_speed_element.tcp_syn.in_packets));
current_average_speed_element->udp.in_packets =
uint64_t(new_speed_element.udp.in_packets + exp_value * ((double)current_average_speed_element->udp.in_packets -
(double)new_speed_element.udp.in_packets));
current_average_speed_element->icmp.in_packets =
uint64_t(new_speed_element.icmp.in_packets + exp_value * ((double)current_average_speed_element->icmp.in_packets -
(double)new_speed_element.icmp.in_packets));
// Per packet type packets counters for out
current_average_speed_element->fragmented.out_packets = uint64_t(
new_speed_element.fragmented.out_packets + exp_value * ((double)current_average_speed_element->fragmented.out_packets -
(double)new_speed_element.fragmented.out_packets));
current_average_speed_element->tcp.out_packets =
uint64_t(new_speed_element.tcp.out_packets + exp_value * ((double)current_average_speed_element->tcp.out_packets -
(double)new_speed_element.tcp.out_packets));
current_average_speed_element->tcp_syn.out_packets =
uint64_t(new_speed_element.tcp_syn.out_packets + exp_value * ((double)current_average_speed_element->tcp_syn.out_packets -
(double)new_speed_element.tcp_syn.out_packets));
current_average_speed_element->udp.out_packets =
uint64_t(new_speed_element.udp.out_packets + exp_value * ((double)current_average_speed_element->udp.out_packets -
(double)new_speed_element.udp.out_packets));
current_average_speed_element->icmp.out_packets =
uint64_t(new_speed_element.icmp.out_packets + exp_value * ((double)current_average_speed_element->icmp.out_packets -
(double)new_speed_element.icmp.out_packets));
// Per packet type bytes counter for out
current_average_speed_element->fragmented.out_bytes =
uint64_t(new_speed_element.fragmented.out_bytes + exp_value * ((double)current_average_speed_element->fragmented.out_bytes -
(double)new_speed_element.fragmented.out_bytes));
current_average_speed_element->tcp.out_bytes =
uint64_t(new_speed_element.tcp.out_bytes + exp_value * ((double)current_average_speed_element->tcp.out_bytes -
(double)new_speed_element.tcp.out_bytes));
current_average_speed_element->tcp_syn.out_bytes =
uint64_t(new_speed_element.tcp_syn.out_bytes + exp_value * ((double)current_average_speed_element->tcp_syn.out_bytes -
(double)new_speed_element.tcp_syn.out_bytes));
current_average_speed_element->udp.out_bytes =
uint64_t(new_speed_element.udp.out_bytes + exp_value * ((double)current_average_speed_element->udp.out_bytes -
(double)new_speed_element.udp.out_bytes));
current_average_speed_element->icmp.out_bytes =
uint64_t(new_speed_element.icmp.out_bytes + exp_value * ((double)current_average_speed_element->icmp.out_bytes -
(double)new_speed_element.icmp.out_bytes));
// Per packet type bytes counter for in
current_average_speed_element->fragmented.in_bytes =
uint64_t(new_speed_element.fragmented.in_bytes + exp_value * ((double)current_average_speed_element->fragmented.in_bytes -
(double)new_speed_element.fragmented.in_bytes));
current_average_speed_element->tcp.in_bytes =
uint64_t(new_speed_element.tcp.in_bytes +
exp_value * ((double)current_average_speed_element->tcp.in_bytes - (double)new_speed_element.tcp.in_bytes));
current_average_speed_element->tcp_syn.in_bytes =
uint64_t(new_speed_element.tcp_syn.in_bytes + exp_value * ((double)current_average_speed_element->tcp_syn.in_bytes -
(double)new_speed_element.tcp_syn.in_bytes));
current_average_speed_element->udp.in_bytes =
uint64_t(new_speed_element.udp.in_bytes +
exp_value * ((double)current_average_speed_element->udp.in_bytes - (double)new_speed_element.udp.in_bytes));
current_average_speed_element->icmp.in_bytes =
uint64_t(new_speed_element.icmp.in_bytes + exp_value * ((double)current_average_speed_element->icmp.in_bytes -
(double)new_speed_element.icmp.in_bytes));
}
std::string print_flow_tracking_for_ip(conntrack_main_struct_t& conntrack_element, std::string client_ip) {
std::stringstream buffer;
@ -400,21 +244,22 @@ std::string print_flow_tracking_for_ip(conntrack_main_struct_t& conntrack_elemen
std::string print_subnet_ipv4_load() {
std::stringstream buffer;
sort_type_t sorter;
attack_detection_threshold_type_t sorter_type;
if (sort_parameter == "packets") {
sorter = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
} else if (sort_parameter == "bytes") {
sorter = BYTES;
sorter_type = attack_detection_threshold_type_t::bytes_per_second;
} else if (sort_parameter == "flows") {
sorter = FLOWS;
sorter_type = attack_detection_threshold_type_t::flows_per_second;
} else {
logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
sorter = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
}
std::vector<std::pair<subnet_cidr_mask_t, subnet_counter_t>> vector_for_sort;
ipv4_network_counters.get_sorted_average_speed(vector_for_sort, sorter, INCOMING);
ipv4_network_counters.get_sorted_average_speed(vector_for_sort, sorter_type, attack_detection_direction_type_t::incoming);
for (auto itr = vector_for_sort.begin(); itr != vector_for_sort.end(); ++itr) {
subnet_counter_t* speed = &itr->second;
@ -658,8 +503,8 @@ bool exceed_mbps_speed(uint64_t in_counter, uint64_t out_counter, unsigned int t
}
// Return true when we should ban this entity
bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
ban_settings_t& current_ban_settings,
bool we_should_ban_this_entity(const subnet_counter_t& average_speed_element,
const ban_settings_t& current_ban_settings,
attack_detection_threshold_type_t& attack_detection_source,
attack_detection_direction_type_t& attack_detection_direction) {
@ -669,7 +514,7 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
// we detect overspeed by packets
if (current_ban_settings.enable_ban_for_pps &&
exceed_pps_speed(average_speed_element->total.in_packets, average_speed_element->total.out_packets,
exceed_pps_speed(average_speed_element.total.in_packets, average_speed_element.total.out_packets,
current_ban_settings.ban_threshold_pps)) {
attack_detection_source = attack_detection_threshold_type_t::packets_per_second;
@ -677,7 +522,7 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
}
if (current_ban_settings.enable_ban_for_bandwidth &&
exceed_mbps_speed(average_speed_element->total.in_bytes, average_speed_element->total.out_bytes,
exceed_mbps_speed(average_speed_element.total.in_bytes, average_speed_element.total.out_bytes,
current_ban_settings.ban_threshold_mbps)) {
attack_detection_source = attack_detection_threshold_type_t::bytes_per_second;
@ -685,7 +530,7 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
}
if (current_ban_settings.enable_ban_for_flows_per_second &&
exceed_flow_speed(average_speed_element->in_flows, average_speed_element->out_flows, current_ban_settings.ban_threshold_flows)) {
exceed_flow_speed(average_speed_element.in_flows, average_speed_element.out_flows, current_ban_settings.ban_threshold_flows)) {
attack_detection_source = attack_detection_threshold_type_t::flows_per_second;
return true;
@ -695,7 +540,7 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
// Per protocol pps thresholds
if (current_ban_settings.enable_ban_for_tcp_pps &&
exceed_pps_speed(average_speed_element->tcp.in_packets, average_speed_element->tcp.out_packets,
exceed_pps_speed(average_speed_element.tcp.in_packets, average_speed_element.tcp.out_packets,
current_ban_settings.ban_threshold_tcp_pps)) {
attack_detection_source = attack_detection_threshold_type_t::tcp_packets_per_second;
@ -703,7 +548,7 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
}
if (current_ban_settings.enable_ban_for_udp_pps &&
exceed_pps_speed(average_speed_element->udp.in_packets, average_speed_element->udp.out_packets,
exceed_pps_speed(average_speed_element.udp.in_packets, average_speed_element.udp.out_packets,
current_ban_settings.ban_threshold_udp_pps)) {
attack_detection_source = attack_detection_threshold_type_t::udp_packets_per_second;
@ -711,7 +556,7 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
}
if (current_ban_settings.enable_ban_for_icmp_pps &&
exceed_pps_speed(average_speed_element->icmp.in_packets, average_speed_element->icmp.out_packets,
exceed_pps_speed(average_speed_element.icmp.in_packets, average_speed_element.icmp.out_packets,
current_ban_settings.ban_threshold_icmp_pps)) {
attack_detection_source = attack_detection_threshold_type_t::icmp_packets_per_second;
return true;
@ -719,7 +564,7 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
// Per protocol bandwidth thresholds
if (current_ban_settings.enable_ban_for_tcp_bandwidth &&
exceed_mbps_speed(average_speed_element->tcp.in_bytes, average_speed_element->tcp.out_bytes,
exceed_mbps_speed(average_speed_element.tcp.in_bytes, average_speed_element.tcp.out_bytes,
current_ban_settings.ban_threshold_tcp_mbps)) {
attack_detection_source = attack_detection_threshold_type_t::tcp_bytes_per_second;
;
@ -727,14 +572,14 @@ bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
}
if (current_ban_settings.enable_ban_for_udp_bandwidth &&
exceed_mbps_speed(average_speed_element->udp.in_bytes, average_speed_element->udp.out_bytes,
exceed_mbps_speed(average_speed_element.udp.in_bytes, average_speed_element.udp.out_bytes,
current_ban_settings.ban_threshold_udp_mbps)) {
attack_detection_source = attack_detection_threshold_type_t::udp_bytes_per_second;
return true;
}
if (current_ban_settings.enable_ban_for_icmp_bandwidth &&
exceed_mbps_speed(average_speed_element->icmp.in_bytes, average_speed_element->icmp.out_bytes,
exceed_mbps_speed(average_speed_element.icmp.in_bytes, average_speed_element.icmp.out_bytes,
current_ban_settings.ban_threshold_icmp_mbps)) {
attack_detection_source = attack_detection_threshold_type_t::icmp_bytes_per_second;
return true;
@ -915,7 +760,7 @@ void cleanup_ban_list() {
continue;
}
subnet_counter_t* average_speed_element = &itr_average_speed->second[shift_in_vector];
subnet_counter_t& average_speed_element = itr_average_speed->second[shift_in_vector];
// We get ban settings from host subnet
std::string host_group_name;
@ -1792,16 +1637,17 @@ void traffic_draw_ipv6_program() {
std::stringstream output_buffer;
// logger<<log4cpp::Priority::INFO<<"Draw table call";
sort_type_t sorter;
attack_detection_threshold_type_t sorter_type;
if (sort_parameter == "packets") {
sorter = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
} else if (sort_parameter == "bytes") {
sorter = BYTES;
sorter_type = attack_detection_threshold_type_t::bytes_per_second;
} else if (sort_parameter == "flows") {
sorter = FLOWS;
sorter_type = attack_detection_threshold_type_t::flows_per_second;
} else {
logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
sorter = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
}
output_buffer << "FastNetMon " << fastnetmon_platform_configuration.fastnetmon_version << " Try Advanced edition: https://fastnetmon.com"
@ -1811,14 +1657,14 @@ void traffic_draw_ipv6_program() {
output_buffer << print_channel_speed_ipv6("Incoming traffic", INCOMING) << std::endl;
if (process_incoming_traffic) {
output_buffer << draw_table_ipv6(INCOMING, true, sorter);
output_buffer << draw_table_ipv6(attack_detection_direction_type_t::incoming, true, sorter_type);
output_buffer << std::endl;
}
output_buffer << print_channel_speed_ipv6("Outgoing traffic", OUTGOING) << std::endl;
if (process_outgoing_traffic) {
output_buffer << draw_table_ipv6(OUTGOING, false, sorter);
output_buffer << draw_table_ipv6(attack_detection_direction_type_t::outgoing, false, sorter_type);
output_buffer << std::endl;
}
@ -1837,22 +1683,21 @@ void traffic_draw_ipv6_program() {
std::string print_subnet_ipv6_load() {
std::stringstream buffer;
sort_type_t sorter_type;
attack_detection_threshold_type_t sorter_type;
if (sort_parameter == "packets") {
sorter_type = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
} else if (sort_parameter == "bytes") {
sorter_type = BYTES;
sorter_type = attack_detection_threshold_type_t::bytes_per_second;
} else if (sort_parameter == "flows") {
sorter_type = FLOWS;
sorter_type = attack_detection_threshold_type_t::flows_per_second;
} else {
logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
sorter_type = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
}
direction_t sort_direction = OUTGOING;
std::vector<pair_of_map_for_ipv6_subnet_counters_elements_t> vector_for_sort;
ipv6_subnet_counters.get_sorted_average_speed(vector_for_sort, sorter_type, sort_direction);
ipv6_subnet_counters.get_sorted_average_speed(vector_for_sort, sorter_type, attack_detection_direction_type_t::outgoing);
for (std::vector<pair_of_map_for_ipv6_subnet_counters_elements_t>::iterator itr = vector_for_sort.begin();
@ -1877,16 +1722,17 @@ void traffic_draw_ipv4_program() {
// logger<<log4cpp::Priority::INFO<<"Draw table call";
std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
sort_type_t sorter;
attack_detection_threshold_type_t sorter_type;
if (sort_parameter == "packets") {
sorter = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
} else if (sort_parameter == "bytes") {
sorter = BYTES;
sorter_type = attack_detection_threshold_type_t::bytes_per_second;
} else if (sort_parameter == "flows") {
sorter = FLOWS;
sorter_type = attack_detection_threshold_type_t::flows_per_second;
} else {
logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
sorter = PACKETS;
sorter_type = attack_detection_threshold_type_t::packets_per_second;
}
output_buffer << "FastNetMon " << fastnetmon_platform_configuration.fastnetmon_version << " Try Advanced edition: https://fastnetmon.com"
@ -1896,14 +1742,14 @@ void traffic_draw_ipv4_program() {
output_buffer << print_channel_speed("Incoming traffic", INCOMING) << std::endl;
if (process_incoming_traffic) {
output_buffer << draw_table_ipv4(INCOMING, true, sorter);
output_buffer << draw_table_ipv4(attack_detection_direction_type_t::incoming, true, sorter_type);
output_buffer << std::endl;
}
output_buffer << print_channel_speed("Outgoing traffic", OUTGOING) << std::endl;
if (process_outgoing_traffic) {
output_buffer << draw_table_ipv4(OUTGOING, false, sorter);
output_buffer << draw_table_ipv4(attack_detection_direction_type_t::outgoing, false, sorter_type);
output_buffer << std::endl;
}
@ -2084,7 +1930,7 @@ bool fill_attack_information(subnet_counter_t average_speed_element,
// Speed recalculation function for IPv6 hosts calls it for each host during speed recalculation
void speed_callback_ipv6(subnet_ipv6_cidr_mask_t* current_subnet, subnet_counter_t* current_average_speed_element) {
void speed_callback_ipv6(const subnet_ipv6_cidr_mask_t& current_subnet, const subnet_counter_t& current_average_speed_element) {
// We should check thresholds only for per host counters for IPv6 and only when any ban actions for IPv6 traffic were enabled
if (!global_ban_settings.enable_ban_ipv6) {
return;
@ -2104,20 +1950,20 @@ void speed_callback_ipv6(subnet_ipv6_cidr_mask_t* current_subnet, subnet_counter
}
// This code works only for /128 subnets
bool in_white_list = ip_belongs_to_patricia_tree_ipv6(whitelist_tree_ipv6, current_subnet->subnet_address);
bool in_white_list = ip_belongs_to_patricia_tree_ipv6(whitelist_tree_ipv6, current_subnet.subnet_address);
if (in_white_list) {
// logger << log4cpp::Priority::INFO << "This IP was whitelisted";
return;
}
bool we_already_have_buckets_for_this_ip = packet_buckets_ipv6_storage.we_have_bucket_for_this_ip(*current_subnet);
bool we_already_have_buckets_for_this_ip = packet_buckets_ipv6_storage.we_have_bucket_for_this_ip(current_subnet);
if (we_already_have_buckets_for_this_ip) {
return;
}
bool this_ip_is_already_banned = ban_list_ipv6_ng.is_blackholed(*current_subnet);
bool this_ip_is_already_banned = ban_list_ipv6_ng.is_blackholed(current_subnet);
if (this_ip_is_already_banned) {
return;
@ -2125,27 +1971,27 @@ void speed_callback_ipv6(subnet_ipv6_cidr_mask_t* current_subnet, subnet_counter
std::string ddos_detection_threshold_as_string = get_human_readable_threshold_type(attack_detection_source);
logger << log4cpp::Priority::INFO << "We have detected IPv6 attack for " << print_ipv6_cidr_subnet(*current_subnet)
logger << log4cpp::Priority::INFO << "We have detected IPv6 attack for " << print_ipv6_cidr_subnet(current_subnet)
<< " with " << ddos_detection_threshold_as_string << " threshold host group: " << host_group_name;
std::string parent_group;
attack_details_t attack_details;
fill_attack_information(*current_average_speed_element, attack_details, host_group_name, parent_group, unban_enabled, global_ban_time);
fill_attack_information(current_average_speed_element, attack_details, host_group_name, parent_group, unban_enabled, global_ban_time);
attack_details.ipv6 = true;
// TODO: Also, we should find IPv6 network for attack here
bool enable_backet_capture =
packet_buckets_ipv6_storage.enable_packet_capture(*current_subnet, attack_details, collection_pattern_t::ONCE);
packet_buckets_ipv6_storage.enable_packet_capture(current_subnet, attack_details, collection_pattern_t::ONCE);
if (!enable_backet_capture) {
logger << log4cpp::Priority::ERROR << "Could not enable packet capture for deep analytics for IPv6 "
<< print_ipv6_cidr_subnet(*current_subnet);
<< print_ipv6_cidr_subnet(current_subnet);
return;
}
logger << log4cpp::Priority::INFO << "Enabled packet capture for IPv6 " << print_ipv6_address(current_subnet->subnet_address);
logger << log4cpp::Priority::INFO << "Enabled packet capture for IPv6 " << print_ipv6_address(current_subnet.subnet_address);
}
@ -2224,7 +2070,7 @@ void recalculate_speed() {
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
// Calculate speed for IP or whole subnet
build_speed_counters_from_packet_counters(new_speed_element, &*vector_itr, speed_calc_period);
build_speed_counters_from_packet_counters(new_speed_element, *vector_itr, speed_calc_period);
conntrack_main_struct_t* flow_counter_ptr = &SubnetVectorMapFlow[itr->first][current_index];
@ -2256,18 +2102,18 @@ void recalculate_speed() {
double exp_power = -speed_calc_period / average_calculation_amount;
double exp_value = exp(exp_power);
subnet_counter_t* current_average_speed_element = &SubnetVectorMapSpeedAverage[itr->first][current_index];
subnet_counter_t& current_average_speed_element = SubnetVectorMapSpeedAverage[itr->first][current_index];
// Calculate average speed from per-second speed
build_average_speed_counters_from_speed_counters(current_average_speed_element, new_speed_element, exp_value, exp_power);
build_average_speed_counters_from_speed_counters(current_average_speed_element, new_speed_element, exp_value);
if (enable_connection_tracking) {
current_average_speed_element->out_flows =
uint64_t(new_speed_element.out_flows + exp_value * ((double)current_average_speed_element->out_flows -
current_average_speed_element.out_flows =
uint64_t(new_speed_element.out_flows + exp_value * ((double)current_average_speed_element.out_flows -
(double)new_speed_element.out_flows));
current_average_speed_element->in_flows =
uint64_t(new_speed_element.in_flows + exp_value * ((double)current_average_speed_element->in_flows -
current_average_speed_element.in_flows =
uint64_t(new_speed_element.in_flows + exp_value * ((double)current_average_speed_element.in_flows -
(double)new_speed_element.in_flows));
}
@ -2290,7 +2136,7 @@ void recalculate_speed() {
}
// TODO: we should pass type of ddos ban source (pps, flowd, bandwidth)!
execute_ip_ban(client_ip, *current_average_speed_element, flow_attack_details, itr->first);
execute_ip_ban(client_ip, current_average_speed_element, flow_attack_details, itr->first);
}
SubnetVectorMapSpeed[itr->first][current_index] = new_speed_element;
@ -2300,7 +2146,7 @@ void recalculate_speed() {
}
// Calculate IPv6 per network traffic
ipv6_subnet_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, speed_callback_subnet_ipv6);
ipv6_subnet_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, nullptr);
// Recalculate traffic for hosts
ipv6_host_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, speed_callback_ipv6);
@ -2394,7 +2240,7 @@ void recalculate_speed() {
}
}
std::string draw_table_ipv6(direction_t sort_direction, bool do_redis_update, sort_type_t sorter_type) {
std::string draw_table_ipv6(attack_detection_direction_type_t sort_direction, bool do_redis_update, attack_detection_threshold_type_t sorter_type) {
std::vector<pair_of_map_for_ipv6_subnet_counters_elements_t> vector_for_sort;
ssize_t size_of_ipv6_counters_map = 0;
std::stringstream output_buffer;
@ -2461,11 +2307,11 @@ std::string draw_table_ipv6(direction_t sort_direction, bool do_redis_update, so
subnet_counter_t* current_speed_element = &ii->second;
// Create polymorphic pps, byte and flow counters
if (sort_direction == INCOMING) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
pps = current_speed_element->total.in_packets;
bps = current_speed_element->total.in_bytes;
flows = current_speed_element->in_flows;
} else if (sort_direction == OUTGOING) {
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
pps = current_speed_element->total.out_packets;
bps = current_speed_element->total.out_bytes;
flows = current_speed_element->out_flows;
@ -2488,7 +2334,7 @@ std::string draw_table_ipv6(direction_t sort_direction, bool do_redis_update, so
return output_buffer.str();
}
std::string draw_table_ipv4(direction_t data_direction, bool do_redis_update, sort_type_t sort_item) {
std::string draw_table_ipv4(const attack_detection_direction_type_t& data_direction, bool do_redis_update, const attack_detection_threshold_type_t& sorter_type) {
std::vector<pair_of_map_elements> vector_for_sort;
std::stringstream output_buffer;
@ -2525,7 +2371,7 @@ std::string draw_table_ipv4(direction_t data_direction, bool do_redis_update, so
// Sort only first X elements in this vector
unsigned int shift_for_sort = max_ips_in_list;
if (data_direction == INCOMING or data_direction == OUTGOING) {
if (data_direction == attack_detection_direction_type_t::incoming or data_direction == attack_detection_direction_type_t::outgoing) {
// Because in another case we will got segmentation fault
unsigned int vector_size = vector_for_sort.size();
@ -2534,7 +2380,7 @@ std::string draw_table_ipv4(direction_t data_direction, bool do_redis_update, so
}
std::partial_sort(vector_for_sort.begin(), vector_for_sort.begin() + shift_for_sort, vector_for_sort.end(),
TrafficComparatorClass<pair_of_map_elements>(data_direction, sort_item));
TrafficComparatorClass<pair_of_map_elements>(data_direction, sorter_type));
} else {
logger << log4cpp::Priority::ERROR << "Unexpected bahaviour on sort function";
return "Internal error";
@ -2560,11 +2406,11 @@ std::string draw_table_ipv4(direction_t data_direction, bool do_redis_update, so
subnet_counter_t* current_speed_element = &ii->second;
// Create polymorphic pps, byte and flow counters
if (data_direction == INCOMING) {
if (data_direction == attack_detection_direction_type_t::incoming) {
pps = current_speed_element->total.in_packets;
bps = current_speed_element->total.in_bytes;
flows = current_speed_element->in_flows;
} else if (data_direction == OUTGOING) {
} else if (data_direction == attack_detection_direction_type_t::outgoing) {
pps = current_speed_element->total.out_packets;
bps = current_speed_element->total.out_bytes;
flows = current_speed_element->out_flows;

View File

@ -11,18 +11,11 @@
#include "fastnetmon.grpc.pb.h"
#include <grpc++/grpc++.h>
void build_speed_counters_from_packet_counters(subnet_counter_t& new_speed_element, subnet_counter_t* vector_itr, double speed_calc_period);
void build_average_speed_counters_from_speed_counters(subnet_counter_t* current_average_speed_element,
subnet_counter_t& new_speed_element,
double exp_value,
double exp_power);
std::string get_amplification_attack_type(amplification_attack_type_t attack_type);
std::string generate_flow_spec_for_amplification_attack(amplification_attack_type_t amplification_attack_type, std::string destination_ip);
bool we_should_ban_this_entity(subnet_counter_t* average_speed_element,
ban_settings_t& current_ban_settings,
bool we_should_ban_this_entity(const subnet_counter_t& average_speed_element,
const ban_settings_t& current_ban_settings,
attack_detection_threshold_type_t& attack_detection_source,
attack_detection_direction_type_t& attack_detection_direction);
@ -90,8 +83,8 @@ std::string print_channel_speed_ipv6(std::string traffic_type, direction_t packe
std::string print_channel_speed(std::string traffic_type, direction_t packet_direction);
void traffic_draw_ipv4_program();
void recalculate_speed();
std::string draw_table_ipv4(direction_t data_direction, bool do_redis_update, sort_type_t sort_item);
std::string draw_table_ipv6(direction_t data_direction, bool do_redis_update, sort_type_t sort_item);
std::string draw_table_ipv4(const attack_detection_direction_type_t& sort_direction, bool do_redis_update, const attack_detection_threshold_type_t& sorter_type);
std::string draw_table_ipv6(attack_detection_direction_type_t sort_direction, bool do_redis_update, attack_detection_threshold_type_t sorter_type);
void print_screen_contents_into_file(std::string screen_data_stats_param, std::string file_path);
void zeroify_all_flow_counters();
void process_packet(simple_packet_t& current_packet);

View File

@ -66,6 +66,9 @@ enum class attack_detection_threshold_type_t {
tcp_syn_packets_per_second,
tcp_syn_bytes_per_second,
ip_fragments_packets_per_second,
ip_fragments_bytes_per_second,
};
// Types of metrics as in Prometheus:
@ -88,44 +91,123 @@ class system_counter_t {
std::string counter_description;
};
/* Class for custom comparison fields by different fields */
template <typename T> class TrafficComparatorClass {
private:
sort_type_t sort_field;
direction_t sort_direction;
attack_detection_threshold_type_t sort_field;
attack_detection_direction_type_t sort_direction;
public:
TrafficComparatorClass(direction_t sort_direction, sort_type_t sort_field) {
TrafficComparatorClass(attack_detection_direction_type_t sort_direction, attack_detection_threshold_type_t sort_field) {
this->sort_field = sort_field;
this->sort_direction = sort_direction;
}
bool operator()(T a, T b) {
if (sort_field == FLOWS) {
if (sort_direction == INCOMING) {
if (sort_field == attack_detection_threshold_type_t::flows_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.in_flows > b.second.in_flows;
} else if (sort_direction == OUTGOING) {
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.out_flows > b.second.out_flows;
} else {
return false;
}
} else if (sort_field == PACKETS) {
if (sort_direction == INCOMING) {
} else if (sort_field == attack_detection_threshold_type_t::packets_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.total.in_packets > b.second.total.in_packets;
} else if (sort_direction == OUTGOING) {
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.total.out_packets > b.second.total.out_packets;
} else {
return false;
}
} else if (sort_field == BYTES) {
if (sort_direction == INCOMING) {
} else if (sort_field == attack_detection_threshold_type_t::bytes_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.total.in_bytes > b.second.total.in_bytes;
} else if (sort_direction == OUTGOING) {
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.total.out_bytes > b.second.total.out_bytes;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::tcp_packets_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.tcp.in_packets > b.second.tcp.in_packets;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.tcp.out_packets > b.second.tcp.out_packets;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::udp_packets_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.udp.in_packets > b.second.udp.in_packets;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.udp.out_packets > b.second.udp.out_packets;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::icmp_packets_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.icmp.in_packets > b.second.icmp.in_packets;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.icmp.out_packets > b.second.icmp.out_packets;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::tcp_bytes_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.tcp.in_bytes > b.second.tcp.in_bytes;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.tcp.out_bytes > b.second.tcp.out_bytes;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::udp_bytes_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.udp.in_bytes > b.second.udp.in_bytes;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.udp.out_bytes > b.second.udp.out_bytes;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::icmp_bytes_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.icmp.in_bytes > b.second.icmp.in_bytes;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.icmp.out_bytes > b.second.icmp.out_bytes;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::tcp_syn_packets_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.tcp_syn.in_packets > b.second.tcp_syn.in_packets;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.tcp_syn.out_packets > b.second.tcp_syn.out_packets;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::tcp_syn_bytes_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.tcp_syn.in_bytes > b.second.tcp_syn.in_bytes;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.tcp_syn.out_bytes > b.second.tcp_syn.out_bytes;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::ip_fragments_packets_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.fragmented.in_packets > b.second.fragmented.in_packets;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.fragmented.out_packets > b.second.fragmented.out_packets;
} else {
return false;
}
} else if (sort_field == attack_detection_threshold_type_t::ip_fragments_bytes_per_second) {
if (sort_direction == attack_detection_direction_type_t::incoming) {
return a.second.fragmented.in_bytes > b.second.fragmented.in_bytes;
} else if (sort_direction == attack_detection_direction_type_t::outgoing) {
return a.second.fragmented.out_bytes > b.second.fragmented.out_bytes;
} else {
return false;
}
} else {
return false;
}

View File

@ -15,7 +15,7 @@ extern map_of_vector_counters_t SubnetVectorMapSpeed;
extern map_of_vector_counters_t SubnetVectorMapSpeedAverage;
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern abstract_subnet_counters_t<subnet_cidr_mask_t> ipv4_network_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t, subnet_counter_t> ipv4_network_counters;
extern total_speed_counters_t total_counters_ipv4;
extern total_speed_counters_t total_counters_ipv6;

View File

@ -15,15 +15,15 @@ extern map_of_vector_counters_t SubnetVectorMapSpeed;
extern map_of_vector_counters_t SubnetVectorMapSpeedAverage;
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern abstract_subnet_counters_t<subnet_cidr_mask_t> ipv4_network_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t,subnet_counter_t> ipv4_network_counters;
extern uint64_t influxdb_writes_total;
extern uint64_t influxdb_writes_failed;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t> ipv6_host_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t> ipv4_host_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t> ipv4_remote_host_counters;
extern abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t,subnet_counter_t> ipv6_host_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t,subnet_counter_t> ipv4_host_counters;
extern abstract_subnet_counters_t<subnet_cidr_mask_t,subnet_counter_t> ipv4_remote_host_counters;
extern std::vector<ban_settings_t> hostgroup_list_total_calculation;
extern std::mutex hostgroup_list_total_calculation_mutex;
extern abstract_subnet_counters_t<int64_t> per_hostgroup_total_counters;
extern abstract_subnet_counters_t<int64_t,subnet_counter_t> per_hostgroup_total_counters;
extern log4cpp::Category& logger;
extern total_speed_counters_t total_counters_ipv4;
extern total_speed_counters_t total_counters_ipv6;

132
src/speed_counters.cpp Normal file
View File

@ -0,0 +1,132 @@
#include "speed_counters.hpp"
#include "fast_library.hpp"
extern time_t current_inaccurate_time;
// This function increments all our accumulators according to data from packet
void increment_incoming_counters(subnet_counter_t& current_element,
const simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
// Update last update time
current_element.last_update_time = current_inaccurate_time;
// Main packet/bytes counter
current_element.total.in_packets += sampled_number_of_packets;
current_element.total.in_bytes += sampled_number_of_bytes;
// Count fragmented IP packets
if (current_packet.ip_fragmented) {
current_element.fragmented.in_packets += sampled_number_of_packets;
current_element.fragmented.in_bytes += sampled_number_of_bytes;
}
// Count dropped packets
if (current_packet.forwarding_status == forwarding_status_t::dropped) {
current_element.dropped.in_packets += sampled_number_of_packets;
current_element.dropped.in_bytes += sampled_number_of_bytes;
}
// Count per protocol packets
if (current_packet.protocol == IPPROTO_TCP) {
current_element.tcp.in_packets += sampled_number_of_packets;
current_element.tcp.in_bytes += sampled_number_of_bytes;
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
current_element.tcp_syn.in_packets += sampled_number_of_packets;
current_element.tcp_syn.in_bytes += sampled_number_of_bytes;
}
} else if (current_packet.protocol == IPPROTO_UDP) {
current_element.udp.in_packets += sampled_number_of_packets;
current_element.udp.in_bytes += sampled_number_of_bytes;
} else if (current_packet.protocol == IPPROTO_ICMP) {
current_element.icmp.in_packets += sampled_number_of_packets;
current_element.icmp.in_bytes += sampled_number_of_bytes;
} else {
// TBD
}
}
// We calculate speed from packet counters here
void build_speed_counters_from_packet_counters(subnet_counter_t& new_speed_element, const subnet_counter_t& data_counters, double speed_calc_period) {
new_speed_element.total.calculate_speed(data_counters.total, speed_calc_period);
new_speed_element.dropped.calculate_speed(data_counters.dropped, speed_calc_period);
new_speed_element.fragmented.calculate_speed(data_counters.fragmented, speed_calc_period);
new_speed_element.tcp_syn.calculate_speed(data_counters.tcp_syn, speed_calc_period);
new_speed_element.tcp.calculate_speed(data_counters.tcp, speed_calc_period);
new_speed_element.udp.calculate_speed(data_counters.udp, speed_calc_period);
new_speed_element.icmp.calculate_speed(data_counters.icmp, speed_calc_period);
}
// Increment fields using data from specified packet
void increment_outgoing_counters(subnet_counter_t& current_element,
const simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
// Update last update time
current_element.last_update_time = current_inaccurate_time;
// Main packet/bytes counter
current_element.total.out_packets += sampled_number_of_packets;
current_element.total.out_bytes += sampled_number_of_bytes;
// Fragmented IP packets
if (current_packet.ip_fragmented) {
current_element.fragmented.out_packets += sampled_number_of_packets;
current_element.fragmented.out_bytes += sampled_number_of_bytes;
}
// Count dropped packets
if (current_packet.forwarding_status == forwarding_status_t::dropped) {
current_element.dropped.out_packets += sampled_number_of_packets;
current_element.dropped.out_bytes += sampled_number_of_bytes;
}
if (current_packet.protocol == IPPROTO_TCP) {
current_element.tcp.out_packets += sampled_number_of_packets;
current_element.tcp.out_bytes += sampled_number_of_bytes;
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
current_element.tcp_syn.out_packets += sampled_number_of_packets;
current_element.tcp_syn.out_bytes += sampled_number_of_bytes;
}
} else if (current_packet.protocol == IPPROTO_UDP) {
current_element.udp.out_packets += sampled_number_of_packets;
current_element.udp.out_bytes += sampled_number_of_bytes;
} else if (current_packet.protocol == IPPROTO_ICMP) {
current_element.icmp.out_packets += sampled_number_of_packets;
current_element.icmp.out_bytes += sampled_number_of_bytes;
// no flow tracking for icmp
} else {
}
}
// We use this code to create smoothed speed of traffic from instant speed (per second)
void build_average_speed_counters_from_speed_counters(subnet_counter_t& current_average_speed_element,
const subnet_counter_t& new_speed_element,
double exp_value) {
current_average_speed_element.total.calulate_exponential_moving_average_speed(new_speed_element.total, exp_value);
current_average_speed_element.dropped.calulate_exponential_moving_average_speed(new_speed_element.dropped, exp_value);
current_average_speed_element.fragmented.calulate_exponential_moving_average_speed(new_speed_element.fragmented, exp_value);
current_average_speed_element.tcp_syn.calulate_exponential_moving_average_speed(new_speed_element.tcp_syn, exp_value);
current_average_speed_element.tcp.calulate_exponential_moving_average_speed(new_speed_element.tcp, exp_value);
current_average_speed_element.udp.calulate_exponential_moving_average_speed(new_speed_element.udp, exp_value);
current_average_speed_element.icmp.calulate_exponential_moving_average_speed(new_speed_element.icmp, exp_value);
// We do calculate flow counters for all cases
current_average_speed_element.out_flows =
uint64_t(new_speed_element.out_flows +
exp_value * ((double)current_average_speed_element.out_flows - (double)new_speed_element.out_flows));
current_average_speed_element.in_flows =
uint64_t(new_speed_element.in_flows +
exp_value * ((double)current_average_speed_element.in_flows - (double)new_speed_element.in_flows));
}

17
src/speed_counters.hpp Normal file
View File

@ -0,0 +1,17 @@
#pragma once
#include "fastnetmon_types.hpp"
void increment_incoming_counters(subnet_counter_t& current_element,
const simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes);
void build_speed_counters_from_packet_counters(subnet_counter_t& new_speed_element, const subnet_counter_t& data_counter, double speed_calc_period);
void increment_outgoing_counters(subnet_counter_t& current_element,
const simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes);
void build_average_speed_counters_from_speed_counters(subnet_counter_t& current_average_speed_element,
const subnet_counter_t& new_speed_element,
double exp_value);

View File

@ -22,18 +22,65 @@ class traffic_counter_element_t {
return in_bytes == 0 && out_bytes == 0 && in_packets == 0 && out_packets == 0;
}
// This function compares value in object with passed value and updates our object value to it if it passed value exceeds value we have in place
void update_if_larger(const traffic_counter_element_t& another_value) {
if (another_value.in_bytes > this->in_bytes) {
this->in_bytes = another_value.in_bytes;
}
if (another_value.out_bytes > this->out_bytes) {
this->out_bytes = another_value.out_bytes;
}
if (another_value.in_packets > this->in_packets) {
this->in_packets = another_value.in_packets;
}
if (another_value.out_packets > this->out_packets) {
this->out_packets = another_value.out_packets;
}
}
template <class Archive> void serialize(Archive& ar, [[maybe_unused]] const unsigned int version) {
ar& BOOST_SERIALIZATION_NVP(in_bytes);
ar& BOOST_SERIALIZATION_NVP(out_bytes);
ar& BOOST_SERIALIZATION_NVP(in_packets);
ar& BOOST_SERIALIZATION_NVP(out_packets);
}
// Calculates speed for all counters from input data counter
void calculate_speed(const traffic_counter_element_t& traffic_counter, double speed_calc_period) {
this->in_packets = uint64_t((double)traffic_counter.in_packets / speed_calc_period);
this->out_packets = uint64_t((double)traffic_counter.out_packets / speed_calc_period);
this->in_bytes = uint64_t((double)traffic_counter.in_bytes / speed_calc_period);
this->out_bytes = uint64_t((double)traffic_counter.out_bytes / speed_calc_period);
}
// Calculates exponential moving average speed from instant speed
void calulate_exponential_moving_average_speed(const traffic_counter_element_t& new_speed_element, double exp_value) {
// Bytes counters
this->in_bytes =
uint64_t(new_speed_element.in_bytes + exp_value * ((double)this->in_bytes - (double)new_speed_element.in_bytes));
this->out_bytes =
uint64_t(new_speed_element.out_bytes + exp_value * ((double)this->out_bytes - (double)new_speed_element.out_bytes));
// Packet counters
this->in_packets = uint64_t(new_speed_element.in_packets +
exp_value * ((double)this->in_packets - (double)new_speed_element.in_packets));
this->out_packets = uint64_t(new_speed_element.out_packets +
exp_value * ((double)this->out_packets - (double)new_speed_element.out_packets));
}
};
// main data structure for storing traffic and speed data for all our IPs
// Main data structure for storing traffic and speed data for all our IPs
class subnet_counter_t {
public:
// We use inaccurate time source for it becasue we do not care about precise time in this case
// We use inaccurate time source for it because we do not care about precise time in this case
time_t last_update_time = 0;
traffic_counter_element_t total;
@ -48,6 +95,28 @@ class subnet_counter_t {
// Total number of dropped traffic
traffic_counter_element_t dropped;
// Updates specific value if any of fields from another_value exceed values in our object
void update_if_larger(const subnet_counter_t& another_value) {
this->total.update_if_larger(another_value.total);
this->tcp.update_if_larger(another_value.tcp);
this->udp.update_if_larger(another_value.udp);
this->icmp.update_if_larger(another_value.icmp);
this->fragmented.update_if_larger(another_value.fragmented);
this->tcp_syn.update_if_larger(another_value.tcp_syn);
this->dropped.update_if_larger(another_value.dropped);
if (in_flows < another_value.in_flows) {
this->in_flows = another_value.in_flows;
}
if (out_flows < another_value.out_flows) {
this->out_flows = another_value.out_flows;
}
}
uint64_t in_flows = 0;
uint64_t out_flows = 0;

View File

@ -20,7 +20,7 @@
log4cpp::Category& logger = log4cpp::Category::getRoot();
abstract_subnet_counters_t<uint32_t> ipv4_host_counters;
abstract_subnet_counters_t<uint32_t, subnet_counter_t> ipv4_host_counters;
time_t current_inaccurate_time = 0;
// Try to copy source map into vector
@ -311,7 +311,7 @@ int main() {
const int sharding_value = 32;
std::array<abstract_subnet_counters_t<uint32_t>, sharding_value> speed_counters;
std::array<abstract_subnet_counters_t<uint32_t, subnet_counter_t>, sharding_value> speed_counters;
for (const auto& itr : ipv4_host_counters.counter_map) {
// NB!!!! TODO: without ntoh conversion it will be 2 all the time and sharding will not work!!!