Skip to content

Commit

Permalink
counts received gossip packets inside the thread-pool (solana-labs#4445)
Browse files Browse the repository at this point in the history
We already traverse packets inside the thread-pool and know the variant,
so might as well count packets there instead of a separate traversal.
  • Loading branch information
behzadnouri authored Jan 15, 2025
1 parent 2a566a4 commit b2e14ef
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 49 deletions.
61 changes: 20 additions & 41 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2272,26 +2272,25 @@ impl ClusterInfo {
thread_pool: &ThreadPool,
) -> Result<(), GossipError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
fn count_packets_received(packets: &PacketBatch, counts: &mut [u64; 7]) {
fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) {
for packet in packets {
let k = packet
.data(..4)
.and_then(|data| <[u8; 4]>::try_from(data).ok())
.map(u32::from_le_bytes)
.filter(|&k| k < 6)
.unwrap_or(/*invalid:*/ 6) as usize;
counts[k] += 1;
dropped_packets_counts[k] += 1;
}
}
let mut counts = [0u64; 7];
let mut dropped_packets_counts = [0u64; 7];
let mut num_packets = 0;
let mut packets = VecDeque::with_capacity(2);
for packet_batch in receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
.chain(receiver.try_iter())
{
count_packets_received(&packet_batch, &mut counts);
num_packets += packet_batch.len();
packets.push_back(packet_batch);
while num_packets > MAX_GOSSIP_TRAFFIC {
Expand All @@ -2300,59 +2299,39 @@ impl ClusterInfo {
break;
};
num_packets -= packet_batch.len();
self.stats
.gossip_packets_dropped_count
.add_relaxed(packet_batch.len() as u64);
count_dropped_packets(&packet_batch, &mut dropped_packets_counts);
}
}
fn verify_packet(packet: &Packet) -> Option<(SocketAddr, Protocol)> {
let protocol: Protocol = packet.deserialize_slice(..).ok()?;
let num_packets_dropped = self.stats.record_dropped_packets(&dropped_packets_counts);
self.stats
.packets_received_count
.add_relaxed(num_packets as u64 + num_packets_dropped);
fn verify_packet(packet: &Packet, stats: &GossipStats) -> Option<(SocketAddr, Protocol)> {
let protocol: Protocol =
stats.record_received_packet(packet.deserialize_slice::<Protocol, _>(..))?;
protocol.sanitize().ok()?;
protocol
.par_verify()
.then(|| (packet.meta().socket_addr(), protocol))
protocol.par_verify().then(|| {
stats.packets_received_verified_count.add_relaxed(1);
(packet.meta().socket_addr(), protocol)
})
}
let packets: Vec<_> = {
let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
thread_pool.install(|| {
if packets.len() == 1 {
packets[0].par_iter().filter_map(verify_packet).collect()
packets[0]
.par_iter()
.filter_map(|packet| verify_packet(packet, &self.stats))
.collect()
} else {
packets
.par_iter()
.flatten()
.filter_map(verify_packet)
.filter_map(|packet| verify_packet(packet, &self.stats))
.collect()
}
})
};
self.stats
.packets_received_count
.add_relaxed(counts.iter().sum::<u64>());
self.stats
.packets_received_pull_requests_count
.add_relaxed(counts[0]);
self.stats
.packets_received_pull_responses_count
.add_relaxed(counts[1]);
self.stats
.packets_received_push_messages_count
.add_relaxed(counts[2]);
self.stats
.packets_received_prune_messages_count
.add_relaxed(counts[3]);
self.stats
.packets_received_ping_messages_count
.add_relaxed(counts[4]);
self.stats
.packets_received_pong_messages_count
.add_relaxed(counts[5]);
self.stats
.packets_received_unknown_count
.add_relaxed(counts[6]);
self.stats
.packets_received_verified_count
.add_relaxed(packets.len() as u64);
Ok(sender.send(packets)?)
}

Expand Down
62 changes: 54 additions & 8 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::crds_gossip::CrdsGossip,
crate::{crds_gossip::CrdsGossip, protocol::Protocol},
itertools::Itertools,
solana_measure::measure::Measure,
solana_sdk::{clock::Slot, pubkey::Pubkey},
Expand Down Expand Up @@ -125,13 +125,13 @@ pub struct GossipStats {
pub(crate) new_push_requests_num: Counter,
pub(crate) num_unverifed_gossip_addrs: Counter,
pub(crate) packets_received_count: Counter,
pub(crate) packets_received_ping_messages_count: Counter,
pub(crate) packets_received_pong_messages_count: Counter,
pub(crate) packets_received_prune_messages_count: Counter,
pub(crate) packets_received_pull_requests_count: Counter,
pub(crate) packets_received_pull_responses_count: Counter,
pub(crate) packets_received_push_messages_count: Counter,
pub(crate) packets_received_unknown_count: Counter,
packets_received_ping_messages_count: Counter,
packets_received_pong_messages_count: Counter,
packets_received_prune_messages_count: Counter,
packets_received_pull_requests_count: Counter,
packets_received_pull_responses_count: Counter,
packets_received_push_messages_count: Counter,
packets_received_unknown_count: Counter,
pub(crate) packets_received_verified_count: Counter,
pub(crate) packets_sent_gossip_requests_count: Counter,
pub(crate) packets_sent_prune_messages_count: Counter,
Expand Down Expand Up @@ -177,6 +177,52 @@ pub struct GossipStats {
pub(crate) window_request_loopback: Counter,
}

impl GossipStats {
#[inline]
pub(crate) fn record_received_packet<E>(
&self,
protocol: Result<Protocol, E>,
) -> Option<Protocol> {
let Ok(protocol) = protocol else {
self.packets_received_unknown_count.add_relaxed(1);
return None;
};
match protocol {
Protocol::PullRequest(..) => &self.packets_received_pull_requests_count,
Protocol::PullResponse(..) => &self.packets_received_pull_responses_count,
Protocol::PushMessage(..) => &self.packets_received_push_messages_count,
Protocol::PruneMessage(..) => &self.packets_received_prune_messages_count,
Protocol::PingMessage(_) => &self.packets_received_ping_messages_count,
Protocol::PongMessage(_) => &self.packets_received_pong_messages_count,
}
.add_relaxed(1);
Some(protocol)
}

// Updates metrics from count of dropped packets.
pub(crate) fn record_dropped_packets(&self, counts: &[u64; 7]) -> u64 {
let num_packets_dropped = counts.iter().sum::<u64>();
if num_packets_dropped > 0u64 {
self.gossip_packets_dropped_count
.add_relaxed(num_packets_dropped);
self.packets_received_pull_requests_count
.add_relaxed(counts[0]);
self.packets_received_pull_responses_count
.add_relaxed(counts[1]);
self.packets_received_push_messages_count
.add_relaxed(counts[2]);
self.packets_received_prune_messages_count
.add_relaxed(counts[3]);
self.packets_received_ping_messages_count
.add_relaxed(counts[4]);
self.packets_received_pong_messages_count
.add_relaxed(counts[5]);
self.packets_received_unknown_count.add_relaxed(counts[6]);
}
num_packets_dropped
}
}

pub(crate) fn submit_gossip_stats(
stats: &GossipStats,
gossip: &CrdsGossip,
Expand Down

0 comments on commit b2e14ef

Please sign in to comment.