Unification of sFlow module with Advanced edition: WiP

This commit is contained in:
Pavel Odintsov 2024-07-16 19:37:45 +03:00
parent 82236ba29b
commit 4a9fc72729
2 changed files with 52 additions and 44 deletions

View File

@ -91,7 +91,7 @@ class simple_packet_t {
// Timestamp of packet as reported by Netflow or IPFIX agent on device, it may be very inaccurate as nobody cares about time on equipment
struct timeval ts = { 0, 0 };
void* payload_pointer = nullptr;
const void* payload_pointer = nullptr;
// Part of packet we captured from wire. It may not be full length of packet
int32_t captured_payload_length = 0;

View File

@ -210,7 +210,9 @@ void start_sflow_collector(const std::string& sflow_host, unsigned int sflow_por
addrinfo* servinfo = NULL;
int getaddrinfo_result = getaddrinfo(sflow_host.c_str(), std::to_string(sflow_port).c_str(), &hints, &servinfo);
std::string port_as_string = std::to_string(sflow_port);
int getaddrinfo_result = getaddrinfo(sflow_host.c_str(), port_as_string.c_str(), &hints, &servinfo);
if (getaddrinfo_result != 0) {
logger << log4cpp::Priority::ERROR << "sFlow getaddrinfo function failed with code: " << getaddrinfo_result
@ -228,16 +230,13 @@ void start_sflow_collector(const std::string& sflow_host, unsigned int sflow_por
int bind_result = bind(sockfd, servinfo->ai_addr, servinfo->ai_addrlen);
if (bind_result != 0) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "cannot bind on " << sflow_port << ":"
<< sflow_host << " with errno: " << errno << " error: " << strerror(errno);
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "cannot bind on " << sflow_port << ":" << sflow_host
<< " with errno: " << errno << " error: " << strerror(errno);
return;
}
freeaddrinfo(servinfo);
struct sockaddr_in6 peer;
memset(&peer, 0, sizeof(peer));
/* We should specify timeout there for correct toolkit shutdown */
/* Because otherwise recvfrom will stay in blocked mode forever */
struct timeval tv;
@ -278,6 +277,9 @@ void start_sflow_collector(const std::string& sflow_host, unsigned int sflow_por
if (client_addr.sin_family == AF_INET) {
client_ipv4_address = client_addr.sin_addr.s_addr;
// Use most efficient function to avoid bottlenecks here
std::string ip_address_as_string;
// convert_ip_as_uint_to_string_fast(client_ipv4_address, ip_address_as_string);
// logger << log4cpp::Priority::ERROR << "client ip: " << convert_ip_as_uint_to_string(client_ipv4_address);
} else if (client_addr.sin_family == AF_INET6) {
// We do not support them now
@ -357,9 +359,9 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
packet.agent_ip_address = client_ipv4_address;
for (auto record : vector_tuple) {
int32_t record_type = std::get<0>(record);
uint8_t* payload_ptr = std::get<1>(record);
int32_t record_length = std::get<2>(record);
int32_t record_type = std::get<0>(record);
const uint8_t* payload_ptr = std::get<1>(record);
int32_t record_length = std::get<2>(record);
// std::cout << "flow record " << " record_type: " << record_type
// << " record_length: " << record_length << std::endl;
@ -374,7 +376,7 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
sflow_raw_protocol_header.network_to_host_byte_order();
// logger << log4cpp::Priority::DEBUG << "Raw protocol header: " << sflow_raw_protocol_header.print();
uint8_t* header_payload_pointer = payload_ptr + sizeof(sflow_raw_protocol_header_t);
const uint8_t* header_payload_pointer = payload_ptr + sizeof(sflow_raw_protocol_header_t);
if (sflow_raw_protocol_header.header_protocol == SFLOW_HEADER_PROTOCOL_ETHERNET) {
@ -435,6 +437,12 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
packet.input_interface = sflow_sample_header_unified_accessor.input_port_index;
packet.output_interface = sflow_sample_header_unified_accessor.output_port_index;
// In sFlow we have special way to learn that packet was discarded from type of output interface
if (sflow_sample_header_unified_accessor.output_port_type == SFLOW_PORT_FORMAT_PACKET_DISCARDED) {
packet.forwarding_status = forwarding_status_t::dropped;
sflow_packets_discarded++;
}
// std::cout << print_simple_packet(packet) << std::endl;
} else if (record_type == SFLOW_RECORD_TYPE_EXTENDED_ROUTER_DATA) {
sflow_extended_router_data_records++;
@ -462,7 +470,7 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
}
// We're ready to parse it
sflow_extended_gateway_information_t* gateway_details = (sflow_extended_gateway_information_t*)payload_ptr;
const sflow_extended_gateway_information_t* gateway_details = (sflow_extended_gateway_information_t*)payload_ptr;
packet.src_asn = fast_ntoh(gateway_details->router_asn);
packet.dst_asn = fast_ntoh(gateway_details->source_asn);
@ -483,9 +491,9 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
// Awesome description about v5 format from AMX-IX folks:
// Header structure from AMS-IX folks:
// http://www.sflow.org/developers/diagrams/sFlowV5Datagram.pdf
void parse_sflow_v5_packet(uint8_t* payload_ptr, unsigned int payload_length, uint32_t client_ipv4_address) {
void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_length, uint32_t client_ipv4_address) {
sflow_packet_header_unified_accessor sflow_header_accessor;
uint8_t* total_packet_end = payload_ptr + payload_length;
const uint8_t* total_packet_end = payload_ptr + payload_length;
// Increase total number of packets
sflow_total_packets++;
@ -508,7 +516,7 @@ void parse_sflow_v5_packet(uint8_t* payload_ptr, unsigned int payload_length, ui
std::vector<sample_tuple_t> samples_vector;
samples_vector.reserve(sflow_header_accessor.get_datagram_samples_count());
uint8_t* samples_block_start = payload_ptr + sflow_header_accessor.get_original_payload_length();
const uint8_t* samples_block_start = payload_ptr + sflow_header_accessor.get_original_payload_length();
bool discovered_padding = false;
@ -536,39 +544,39 @@ void parse_sflow_v5_packet(uint8_t* payload_ptr, unsigned int payload_length, ui
uint8_t* data_pointer = std::get<2>(sample);
size_t data_length = std::get<3>(sample);
if (enterprise == 0) {
sflow_sample_type_t sample_format = sflow_sample_type_from_integer(integer_format);
if (enterprise != 0) {
// We do not support vendor specific additions
continue;
}
if (sample_format == sflow_sample_type_t::BROKEN_TYPE) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type number: " << integer_format;
sflow_sample_type_t sample_format = sflow_sample_type_from_integer(integer_format);
return;
}
if (sample_format == sflow_sample_type_t::BROKEN_TYPE) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type number: " << integer_format;
continue;
}
// Move this code to separate function!!!
if (sample_format == sflow_sample_type_t::FLOW_SAMPLE) {
// std::cout << "We got flow sample" << std::endl;
process_sflow_flow_sample(data_pointer, data_length, false, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::COUNTER_SAMPLE) {
// std::cout << "We got counter sample" << std::endl;
// TODO: add support for sflow counetrs
// process_sflow_counter_sample(data_pointer, data_length, false, sflow_header_accessor);
sflow_counter_sample++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_FLOW_SAMPLE) {
// std::cout << "We got expanded flow sample" << std::endl;
process_sflow_flow_sample(data_pointer, data_length, true, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_COUNTER_SAMPLE) {
// TODO:add support for sflow counetrs
// std::cout << "We got expanded counter sample" << std::endl;
////process_sflow_counter_sample(data_pointer, data_length, true, sflow_header_accessor);
sflow_counter_sample++;
} else {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type: " << integer_format;
}
// Move this code to separate function!!!
if (sample_format == sflow_sample_type_t::FLOW_SAMPLE) {
// std::cout << "We got flow sample" << std::endl;
process_sflow_flow_sample(data_pointer, data_length, false, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::COUNTER_SAMPLE) {
// std::cout << "We got counter sample" << std::endl;
// TODO: add support for sflow counetrs
// process_sflow_counter_sample(data_pointer, data_length, false, sflow_header_accessor);
sflow_counter_sample++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_FLOW_SAMPLE) {
// std::cout << "We got expanded flow sample" << std::endl;
process_sflow_flow_sample(data_pointer, data_length, true, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_COUNTER_SAMPLE) {
// TODO:add support for sflow counetrs
// std::cout << "We got expanded counter sample" << std::endl;
////process_sflow_counter_sample(data_pointer, data_length, true, sflow_header_accessor);
sflow_counter_sample++;
} else {
// do nothing because we haven't support for custom sFLOW data formats
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type: " << integer_format;
}
}
}