From b2e14ef5feb56cb406ea427af526f28671717226 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 15 Jan 2025 16:01:49 +0000 Subject: [PATCH] counts received gossip packets inside the thread-pool (#4445) We already traverse packets inside the thread-pool and know the variant, so might as well count packets there instead of a separate traversal. --- gossip/src/cluster_info.rs | 61 ++++++++++------------------- gossip/src/cluster_info_metrics.rs | 62 ++++++++++++++++++++++++++---- 2 files changed, 74 insertions(+), 49 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index b2c38340891dc7..bdb39ba5099de4 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2272,7 +2272,7 @@ impl ClusterInfo { thread_pool: &ThreadPool, ) -> Result<(), GossipError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - fn count_packets_received(packets: &PacketBatch, counts: &mut [u64; 7]) { + fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) { for packet in packets { let k = packet .data(..4) @@ -2280,10 +2280,10 @@ impl ClusterInfo { .map(u32::from_le_bytes) .filter(|&k| k < 6) .unwrap_or(/*invalid:*/ 6) as usize; - counts[k] += 1; + dropped_packets_counts[k] += 1; } } - let mut counts = [0u64; 7]; + let mut dropped_packets_counts = [0u64; 7]; let mut num_packets = 0; let mut packets = VecDeque::with_capacity(2); for packet_batch in receiver @@ -2291,7 +2291,6 @@ impl ClusterInfo { .map(std::iter::once)? .chain(receiver.try_iter()) { - count_packets_received(&packet_batch, &mut counts); num_packets += packet_batch.len(); packets.push_back(packet_batch); while num_packets > MAX_GOSSIP_TRAFFIC { @@ -2300,59 +2299,39 @@ impl ClusterInfo { break; }; num_packets -= packet_batch.len(); - self.stats - .gossip_packets_dropped_count - .add_relaxed(packet_batch.len() as u64); + count_dropped_packets(&packet_batch, &mut dropped_packets_counts); } } - fn verify_packet(packet: &Packet) -> Option<(SocketAddr, Protocol)> { - let protocol: Protocol = packet.deserialize_slice(..).ok()?; + let num_packets_dropped = self.stats.record_dropped_packets(&dropped_packets_counts); + self.stats + .packets_received_count + .add_relaxed(num_packets as u64 + num_packets_dropped); + fn verify_packet(packet: &Packet, stats: &GossipStats) -> Option<(SocketAddr, Protocol)> { + let protocol: Protocol = + stats.record_received_packet(packet.deserialize_slice::(..))?; protocol.sanitize().ok()?; - protocol - .par_verify() - .then(|| (packet.meta().socket_addr(), protocol)) + protocol.par_verify().then(|| { + stats.packets_received_verified_count.add_relaxed(1); + (packet.meta().socket_addr(), protocol) + }) } let packets: Vec<_> = { let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time); thread_pool.install(|| { if packets.len() == 1 { - packets[0].par_iter().filter_map(verify_packet).collect() + packets[0] + .par_iter() + .filter_map(|packet| verify_packet(packet, &self.stats)) + .collect() } else { packets .par_iter() .flatten() - .filter_map(verify_packet) + .filter_map(|packet| verify_packet(packet, &self.stats)) .collect() } }) }; - self.stats - .packets_received_count - .add_relaxed(counts.iter().sum::()); - self.stats - .packets_received_pull_requests_count - .add_relaxed(counts[0]); - self.stats - .packets_received_pull_responses_count - .add_relaxed(counts[1]); - self.stats - .packets_received_push_messages_count - .add_relaxed(counts[2]); - self.stats - .packets_received_prune_messages_count - .add_relaxed(counts[3]); - self.stats - .packets_received_ping_messages_count - .add_relaxed(counts[4]); - self.stats - .packets_received_pong_messages_count - .add_relaxed(counts[5]); - self.stats - .packets_received_unknown_count - .add_relaxed(counts[6]); - self.stats - .packets_received_verified_count - .add_relaxed(packets.len() as u64); Ok(sender.send(packets)?) } diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 19b5f1c5790b3a..f799761aef1ad8 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -1,5 +1,5 @@ use { - crate::crds_gossip::CrdsGossip, + crate::{crds_gossip::CrdsGossip, protocol::Protocol}, itertools::Itertools, solana_measure::measure::Measure, solana_sdk::{clock::Slot, pubkey::Pubkey}, @@ -125,13 +125,13 @@ pub struct GossipStats { pub(crate) new_push_requests_num: Counter, pub(crate) num_unverifed_gossip_addrs: Counter, pub(crate) packets_received_count: Counter, - pub(crate) packets_received_ping_messages_count: Counter, - pub(crate) packets_received_pong_messages_count: Counter, - pub(crate) packets_received_prune_messages_count: Counter, - pub(crate) packets_received_pull_requests_count: Counter, - pub(crate) packets_received_pull_responses_count: Counter, - pub(crate) packets_received_push_messages_count: Counter, - pub(crate) packets_received_unknown_count: Counter, + packets_received_ping_messages_count: Counter, + packets_received_pong_messages_count: Counter, + packets_received_prune_messages_count: Counter, + packets_received_pull_requests_count: Counter, + packets_received_pull_responses_count: Counter, + packets_received_push_messages_count: Counter, + packets_received_unknown_count: Counter, pub(crate) packets_received_verified_count: Counter, pub(crate) packets_sent_gossip_requests_count: Counter, pub(crate) packets_sent_prune_messages_count: Counter, @@ -177,6 +177,52 @@ pub struct GossipStats { pub(crate) window_request_loopback: Counter, } +impl GossipStats { + #[inline] + pub(crate) fn record_received_packet( + &self, + protocol: Result, + ) -> Option { + let Ok(protocol) = protocol else { + self.packets_received_unknown_count.add_relaxed(1); + return None; + }; + match protocol { + Protocol::PullRequest(..) => &self.packets_received_pull_requests_count, + Protocol::PullResponse(..) => &self.packets_received_pull_responses_count, + Protocol::PushMessage(..) => &self.packets_received_push_messages_count, + Protocol::PruneMessage(..) => &self.packets_received_prune_messages_count, + Protocol::PingMessage(_) => &self.packets_received_ping_messages_count, + Protocol::PongMessage(_) => &self.packets_received_pong_messages_count, + } + .add_relaxed(1); + Some(protocol) + } + + // Updates metrics from count of dropped packets. + pub(crate) fn record_dropped_packets(&self, counts: &[u64; 7]) -> u64 { + let num_packets_dropped = counts.iter().sum::(); + if num_packets_dropped > 0u64 { + self.gossip_packets_dropped_count + .add_relaxed(num_packets_dropped); + self.packets_received_pull_requests_count + .add_relaxed(counts[0]); + self.packets_received_pull_responses_count + .add_relaxed(counts[1]); + self.packets_received_push_messages_count + .add_relaxed(counts[2]); + self.packets_received_prune_messages_count + .add_relaxed(counts[3]); + self.packets_received_ping_messages_count + .add_relaxed(counts[4]); + self.packets_received_pong_messages_count + .add_relaxed(counts[5]); + self.packets_received_unknown_count.add_relaxed(counts[6]); + } + num_packets_dropped + } +} + pub(crate) fn submit_gossip_stats( stats: &GossipStats, gossip: &CrdsGossip,