Unified AF_PACKET module with Advanced edition

This commit is contained in:
Pavel Odintsov 2024-07-17 21:03:57 +03:00
parent 75ccf4aee2
commit 89bd4e0dda
2 changed files with 42 additions and 22 deletions

View File

@ -204,9 +204,10 @@ void walk_block(struct block_desc* pbd) {
packet.sample_ratio = mirror_af_packet_custom_sampling_rate;
}
auto result = parse_raw_packet_to_simple_packet_full_ng((u_char*)data_pointer, ppd->tp_snaplen, ppd->tp_snaplen,
packet, fastnetmon_global_configuration.af_packet_extract_tunnel_traffic,
fastnetmon_global_configuration.af_packet_read_packet_length_from_ip_header);
auto result =
parse_raw_packet_to_simple_packet_full_ng((u_char*)data_pointer, ppd->tp_snaplen, ppd->tp_snaplen, packet,
fastnetmon_global_configuration.af_packet_extract_tunnel_traffic,
fastnetmon_global_configuration.af_packet_read_packet_length_from_ip_header);
if (result != network_data_stuctures::parser_code_t::success) {
// This counter resets for speed calculation every second
@ -241,7 +242,14 @@ bool setup_socket(std::string interface_name, bool enable_fanout, int fanout_gro
return false;
}
// We whould use V3 bcause it could read/pool in per block basis instead per packet
if (true) {
// Add socket to global structure, we will use it to get statistics for each of them
std::lock_guard<std::mutex> lock_guard(active_af_packet_sockets_mutex);
active_af_packet_sockets.push_back(packet_socket);
}
// We should use V3 because it could read/pool in per block basis instead per
// packet
int version = TPACKET_V3;
int setsockopt_packet_version = setsockopt(packet_socket, SOL_PACKET, PACKET_VERSION, &version, sizeof(version));
@ -275,8 +283,8 @@ bool setup_socket(std::string interface_name, bool enable_fanout, int fanout_gro
return false;
}
// We will
// follow http://yusufonlinux.blogspot.ru/2010/11/data-link-access-and-zero-copy.html
// We will follow
// http://yusufonlinux.blogspot.ru/2010/11/data-link-access-and-zero-copy.html
// And this:
// https://www.kernel.org/doc/Documentation/networking/packet_mmap.txt
struct tpacket_req3 req;
@ -297,22 +305,27 @@ bool setup_socket(std::string interface_name, bool enable_fanout, int fanout_gro
return false;
}
// We use per thread structures
uint8_t* mapped_buffer = NULL;
struct iovec* rd = NULL;
size_t buffer_size = req.tp_block_size * req.tp_block_nr;
mapped_buffer = (uint8_t*)mmap(NULL, req.tp_block_size * req.tp_block_nr, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_LOCKED, packet_socket, 0);
logger << log4cpp::Priority::DEBUG << "Allocating " << buffer_size << " byte buffer for AF_PACKET interface: " << interface_name;
uint8_t* mapped_buffer =
(uint8_t*)mmap(NULL, buffer_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED, packet_socket, 0);
if (mapped_buffer == MAP_FAILED) {
logger << log4cpp::Priority::ERROR << "MMAP failed errno: " << errno << " error: " << strerror(errno);
logger << log4cpp::Priority::ERROR << "mmap failed errno: " << errno << " error: " << strerror(errno);
return false;
}
// Allocate iov structure for each block
rd = (struct iovec*)malloc(req.tp_block_nr * sizeof(struct iovec));
struct iovec* rd = (struct iovec*)malloc(req.tp_block_nr * sizeof(struct iovec));
// Initilise iov structures
if (rd == NULL) {
logger << log4cpp::Priority::ERROR << "Cannot allocate memory for iovecs for " << interface_name;
return false;
}
// Initialise iov structures
for (unsigned int i = 0; i < req.tp_block_nr; ++i) {
rd[i].iov_base = mapped_buffer + (i * req.tp_block_size);
rd[i].iov_len = req.tp_block_size;
@ -328,7 +341,7 @@ bool setup_socket(std::string interface_name, bool enable_fanout, int fanout_gro
int bind_result = bind(packet_socket, (struct sockaddr*)&bind_address, sizeof(bind_address));
if (bind_result == -1) {
logger << log4cpp::Priority::ERROR << "Can't bind to AF_PACKET socket";
logger << log4cpp::Priority::ERROR << "Can't bind to AF_PACKET socket for " << interface_name;
return false;
}
@ -338,12 +351,21 @@ bool setup_socket(std::string interface_name, bool enable_fanout, int fanout_gro
int setsockopt_fanout = setsockopt(packet_socket, SOL_PACKET, PACKET_FANOUT, &fanout_arg, sizeof(fanout_arg));
if (setsockopt_fanout < 0) {
logger << log4cpp::Priority::ERROR << "Can't configure fanout error number: " << errno
<< " error: " << strerror(errno);
logger << log4cpp::Priority::ERROR << "Can't configure fanout for interface " << interface_name
<< " error number: " << errno << " error: " << strerror(errno);
return false;
}
}
// Start traffic collection loop
read_packets_from_socket(packet_socket, rd);
return true;
}
// Reads traffic from iovec using poll
void read_packets_from_socket(int packet_socket, struct iovec* rd) {
unsigned int current_block_num = 0;
struct pollfd pfd;
@ -369,16 +391,13 @@ bool setup_socket(std::string interface_name, bool enable_fanout, int fanout_gro
current_block_num = (current_block_num + 1) % blocknum;
}
return true;
return;
}
void start_af_packet_capture(std::string interface_name, bool enable_fanout, int fanout_group_id) {
setup_socket(interface_name, enable_fanout, fanout_group_id);
}
// Could get some speed up on NUMA servers
bool afpacket_execute_strict_cpu_affinity = false;
void start_afpacket_collection(process_packet_pointer func_ptr) {
logger << log4cpp::Priority::INFO << "AF_PACKET plugin started";
afpacket_process_func_ptr = func_ptr;
@ -444,7 +463,7 @@ void start_af_packet_capture_for_interface(std::string capture_interface, int fa
boost::thread::attributes thread_attrs;
if (afpacket_execute_strict_cpu_affinity) {
if (fastnetmon_global_configuration.afpacket_execute_strict_cpu_affinity) {
cpu_set_t current_cpu_set;
int cpu_to_bind = cpu % num_cpus;

View File

@ -29,6 +29,7 @@ class fastnetmon_configuration_t {
std::string mirror_af_packet_fanout_mode{ "cpu" };
bool af_packet_read_packet_length_from_ip_header{ false };
bool af_packet_extract_tunnel_traffic{ false };
bool afpacket_execute_strict_cpu_affinity{ false };
// Clickhouse metrics
bool clickhouse_metrics{ false };