Skip to content

Commit

Permalink
removes ContactInfo::set_{serve_repair,tpu_vote,tvu}_quic api (solana…
Browse files Browse the repository at this point in the history
…-labs#4505)

It is not obvious that

    ContactInfo::set_{serve_repair,tpu_vote,tvu}(...)

are only setting the UDP port. Also having separate api for

    ContactInfo::set_{serve_repair,tpu_vote,tvu}_quic(...)

is redundant and the intended protocol can instead be explicitly passed
as an argument. For example:

    node.set_tvu(Protocol::QUIC, socket)

or

    node.set_tvu(Protocol::UDP, socket)
  • Loading branch information
behzadnouri authored Jan 17, 2025
1 parent 9ec0980 commit e0d672b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 48 deletions.
21 changes: 11 additions & 10 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,7 @@ mod tests {

#[test]
fn window_index_request() {
use Protocol::{QUIC, UDP};
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(bank);
Expand Down Expand Up @@ -2022,15 +2023,15 @@ mod tests {
0u16, // shred_version
);
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu_quic((Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu(QUIC, (Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap();
nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap();
nxt.set_tpu_vote((Ipv4Addr::LOCALHOST, 1240)).unwrap();
nxt.set_tpu_vote(UDP, (Ipv4Addr::LOCALHOST, 1240)).unwrap();
nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
nxt.set_serve_repair(serve_repair_addr).unwrap();
nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237))
nxt.set_serve_repair(UDP, serve_repair_addr).unwrap();
nxt.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 1237))
.unwrap();
cluster_info.insert_info(nxt.clone());
let rv = serve_repair
Expand All @@ -2057,15 +2058,15 @@ mod tests {
0u16, // shred_version
);
nxt.set_gossip((Ipv4Addr::LOCALHOST, 1234)).unwrap();
nxt.set_tvu((Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu_quic((Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 1235)).unwrap();
nxt.set_tvu(QUIC, (Ipv4Addr::LOCALHOST, 1236)).unwrap();
nxt.set_tpu((Ipv4Addr::LOCALHOST, 1238)).unwrap();
nxt.set_tpu_forwards((Ipv4Addr::LOCALHOST, 1239)).unwrap();
nxt.set_tpu_vote((Ipv4Addr::LOCALHOST, 1240)).unwrap();
nxt.set_tpu_vote(UDP, (Ipv4Addr::LOCALHOST, 1240)).unwrap();
nxt.set_rpc((Ipv4Addr::LOCALHOST, 1241)).unwrap();
nxt.set_rpc_pubsub((Ipv4Addr::LOCALHOST, 1242)).unwrap();
nxt.set_serve_repair(serve_repair_addr2).unwrap();
nxt.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 1237))
nxt.set_serve_repair(UDP, serve_repair_addr2).unwrap();
nxt.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 1237))
.unwrap();
cluster_info.insert_info(nxt);
let mut one = false;
Expand Down
59 changes: 42 additions & 17 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2650,31 +2650,46 @@ impl Node {
$name
))
};
($method:ident, $protocol:ident, $addr:expr, $name:literal) => {{
info.$method(contact_info::Protocol::$protocol, $addr)
.expect(&format!(
"Operator must spin up node with valid {} address",
$name
))
}};
}
set_socket!(set_gossip, gossip_addr, "gossip");
set_socket!(set_tvu, tvu.local_addr().unwrap(), "TVU");
set_socket!(set_tvu_quic, tvu_quic.local_addr().unwrap(), "TVU QUIC");
set_socket!(set_tvu, UDP, tvu.local_addr().unwrap(), "TVU");
set_socket!(set_tvu, QUIC, tvu_quic.local_addr().unwrap(), "TVU QUIC");
set_socket!(set_tpu, tpu.local_addr().unwrap(), "TPU");
set_socket!(
set_tpu_forwards,
tpu_forwards.local_addr().unwrap(),
"TPU-forwards"
);
set_socket!(set_tpu_vote, tpu_vote.local_addr().unwrap(), "TPU-vote");
set_socket!(
set_tpu_vote_quic,
set_tpu_vote,
UDP,
tpu_vote.local_addr().unwrap(),
"TPU-vote"
);
set_socket!(
set_tpu_vote,
QUIC,
tpu_vote_quic[0].local_addr().unwrap(),
"TPU-vote QUIC"
);
set_socket!(set_rpc, rpc_addr, "RPC");
set_socket!(set_rpc_pubsub, rpc_pubsub_addr, "RPC-pubsub");
set_socket!(
set_serve_repair,
UDP,
serve_repair.local_addr().unwrap(),
"serve-repair"
);
set_socket!(
set_serve_repair_quic,
set_serve_repair,
QUIC,
serve_repair_quic.local_addr().unwrap(),
"serve-repair QUIC"
);
Expand Down Expand Up @@ -2814,22 +2829,30 @@ impl Node {
$name
))
};
($method:ident, $protocol:ident, $port:ident, $name:literal) => {{
info.$method(contact_info::Protocol::$protocol, (addr, $port))
.expect(&format!(
"Operator must spin up node with valid {} address",
$name
))
}};
}
set_socket!(set_gossip, gossip_port, "gossip");
set_socket!(set_tvu, tvu_port, "TVU");
set_socket!(set_tvu_quic, tvu_quic_port, "TVU QUIC");
set_socket!(set_tvu, UDP, tvu_port, "TVU");
set_socket!(set_tvu, QUIC, tvu_quic_port, "TVU QUIC");
set_socket!(set_tpu, tpu_port, "TPU");
set_socket!(set_tpu_forwards, tpu_forwards_port, "TPU-forwards");
set_socket!(set_tpu_vote, tpu_vote_port, "TPU-vote");
set_socket!(set_tpu_vote, UDP, tpu_vote_port, "TPU-vote");
set_socket!(set_tpu_vote, QUIC, tpu_vote_quic_port, "TPU-vote QUIC");
set_socket!(set_rpc, rpc_port, "RPC");
set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub");
set_socket!(set_serve_repair, serve_repair_port, "serve-repair");
set_socket!(set_serve_repair, UDP, serve_repair_port, "serve-repair");
set_socket!(
set_serve_repair_quic,
set_serve_repair,
QUIC,
serve_repair_quic_port,
"serve-repair QUIC"
);
set_socket!(set_tpu_vote_quic, tpu_vote_quic_port, "TPU-vote QUIC");

trace!("new ContactInfo: {:?}", info);

Expand Down Expand Up @@ -2959,20 +2982,22 @@ impl Node {
0u16, // shred_version
);
let addr = gossip_addr.ip();
use contact_info::Protocol::{QUIC, UDP};
info.set_gossip((addr, gossip_port)).unwrap();
info.set_tvu((addr, tvu_port)).unwrap();
info.set_tvu_quic((addr, tvu_quic_port)).unwrap();
info.set_tvu(UDP, (addr, tvu_port)).unwrap();
info.set_tvu(QUIC, (addr, tvu_quic_port)).unwrap();
info.set_tpu(public_tpu_addr.unwrap_or_else(|| SocketAddr::new(addr, tpu_port)))
.unwrap();
info.set_tpu_forwards(
public_tpu_forwards_addr.unwrap_or_else(|| SocketAddr::new(addr, tpu_forwards_port)),
)
.unwrap();
info.set_tpu_vote((addr, tpu_vote_port)).unwrap();
info.set_serve_repair((addr, serve_repair_port)).unwrap();
info.set_serve_repair_quic((addr, serve_repair_quic_port))
info.set_tpu_vote(UDP, (addr, tpu_vote_port)).unwrap();
info.set_tpu_vote(QUIC, (addr, tpu_vote_quic_port)).unwrap();
info.set_serve_repair(UDP, (addr, serve_repair_port))
.unwrap();
info.set_serve_repair(QUIC, (addr, serve_repair_quic_port))
.unwrap();
info.set_tpu_vote_quic((addr, tpu_vote_quic_port)).unwrap();

trace!("new ContactInfo: {:?}", info);

Expand Down
53 changes: 36 additions & 17 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ macro_rules! set_socket {
self.set_socket($quic, get_quic_socket(&socket)?)
}
};
(@multi $name:ident, $udp:ident, $quic:ident) => {
pub fn $name<T>(&mut self, protocol: Protocol, socket: T) -> Result<(), Error>
where
SocketAddr: From<T>,
{
let socket = SocketAddr::from(socket);
let key = match protocol {
Protocol::UDP => $udp,
Protocol::QUIC => $quic,
};
self.set_socket(key, socket)
}
};
}

macro_rules! remove_socket {
Expand Down Expand Up @@ -259,18 +272,15 @@ impl ContactInfo {
set_socket!(set_gossip, SOCKET_TAG_GOSSIP);
set_socket!(set_rpc, SOCKET_TAG_RPC);
set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR);
set_socket!(set_serve_repair_quic, SOCKET_TAG_SERVE_REPAIR_QUIC);
set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
set_socket!(
set_tpu_forwards,
SOCKET_TAG_TPU_FORWARDS,
SOCKET_TAG_TPU_FORWARDS_QUIC
);
set_socket!(set_tpu_vote, SOCKET_TAG_TPU_VOTE);
set_socket!(set_tpu_vote_quic, SOCKET_TAG_TPU_VOTE_QUIC);
set_socket!(set_tvu, SOCKET_TAG_TVU);
set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC);
set_socket!(@multi set_serve_repair, SOCKET_TAG_SERVE_REPAIR, SOCKET_TAG_SERVE_REPAIR_QUIC);
set_socket!(@multi set_tpu_vote, SOCKET_TAG_TPU_VOTE, SOCKET_TAG_TPU_VOTE_QUIC);
set_socket!(@multi set_tvu, SOCKET_TAG_TVU, SOCKET_TAG_TVU_QUIC);

remove_socket!(
remove_serve_repair,
Expand Down Expand Up @@ -409,25 +419,30 @@ impl ContactInfo {

// Only for tests and simulations.
pub fn new_localhost(pubkey: &Pubkey, wallclock: u64) -> Self {
use Protocol::{QUIC, UDP};
let mut node = Self::new(*pubkey, wallclock, /*shred_version:*/ 0u16);
node.set_gossip((Ipv4Addr::LOCALHOST, 8000)).unwrap();
node.set_tvu((Ipv4Addr::LOCALHOST, 8001)).unwrap();
node.set_tvu_quic((Ipv4Addr::LOCALHOST, 8002)).unwrap();
node.set_tvu(UDP, (Ipv4Addr::LOCALHOST, 8001)).unwrap();
node.set_tvu(QUIC, (Ipv4Addr::LOCALHOST, 8002)).unwrap();
node.set_tpu((Ipv4Addr::LOCALHOST, 8003)).unwrap(); // quic: 8009
node.set_tpu_forwards((Ipv4Addr::LOCALHOST, 8004)).unwrap(); // quic: 8010
node.set_tpu_vote((Ipv4Addr::LOCALHOST, 8005)).unwrap();
node.set_tpu_vote(UDP, (Ipv4Addr::LOCALHOST, 8005)).unwrap();
node.set_tpu_vote(QUIC, (Ipv4Addr::LOCALHOST, 8007))
.unwrap();
node.set_rpc((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PORT))
.unwrap();
node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap();
node.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 8006))
node.set_serve_repair(UDP, (Ipv4Addr::LOCALHOST, 8008))
.unwrap();
node.set_serve_repair(QUIC, (Ipv4Addr::LOCALHOST, 8006))
.unwrap();
node
}

// Only for tests and simulations.
pub fn new_with_socketaddr(pubkey: &Pubkey, socket: &SocketAddr) -> Self {
use Protocol::{QUIC, UDP};
assert_matches!(sanitize_socket(socket), Ok(()));
let mut node = Self::new(
*pubkey,
Expand All @@ -436,16 +451,17 @@ impl ContactInfo {
);
let (addr, port) = (socket.ip(), socket.port());
node.set_gossip((addr, port + 1)).unwrap();
node.set_tvu((addr, port + 2)).unwrap();
node.set_tvu_quic((addr, port + 3)).unwrap();
node.set_tvu(UDP, (addr, port + 2)).unwrap();
node.set_tvu(QUIC, (addr, port + 3)).unwrap();
node.set_tpu((addr, port)).unwrap(); // quic: port + 6
node.set_tpu_forwards((addr, port + 5)).unwrap(); // quic: port + 11
node.set_tpu_vote((addr, port + 7)).unwrap();
node.set_tpu_vote(UDP, (addr, port + 7)).unwrap();
node.set_tpu_vote(QUIC, (addr, port + 9)).unwrap();
node.set_rpc((addr, DEFAULT_RPC_PORT)).unwrap();
node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((addr, port + 8)).unwrap();
node.set_serve_repair_quic((addr, port + 4)).unwrap();
node.set_serve_repair(UDP, (addr, port + 8)).unwrap();
node.set_serve_repair(QUIC, (addr, port + 4)).unwrap();
node
}

Expand Down Expand Up @@ -1093,7 +1109,10 @@ mod tests {
{
let mut other = node.clone();
while other.set_gossip(new_rand_socket(&mut rng)).is_err() {}
while other.set_serve_repair(new_rand_socket(&mut rng)).is_err() {}
while other
.set_serve_repair(Protocol::UDP, new_rand_socket(&mut rng))
.is_err()
{}
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
Expand Down
6 changes: 3 additions & 3 deletions gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ pub(crate) mod tests {
caller: &CrdsValue,
num_items: usize,
) {
let packet_data_size_range = (PACKET_DATA_SIZE - 5)..=PACKET_DATA_SIZE;
let packet_data_size_range = (PACKET_DATA_SIZE - 7)..=PACKET_DATA_SIZE;
let max_bytes = get_max_bloom_filter_bytes(caller);
let filters = CrdsFilterSet::new(rng, num_items, max_bytes);
let request_bytes = caller.bincode_serialized_size() as u64;
Expand All @@ -1467,7 +1467,7 @@ pub(crate) mod tests {
let keypair = Keypair::new();
let node = {
let mut node =
ContactInfo::new_localhost(&keypair.pubkey(), /*wallclock:*/ rng.gen());
ContactInfo::new_localhost(&keypair.pubkey(), /*wallclock:*/ timestamp());
node.set_shred_version(rng.gen());
node
};
Expand All @@ -1491,7 +1491,7 @@ pub(crate) mod tests {
};
{
let caller = CrdsValue::new(CrdsData::from(&node), &keypair);
assert_eq!(get_max_bloom_filter_bytes(&caller), 1165);
assert_eq!(get_max_bloom_filter_bytes(&caller), 1155);
verify_get_max_bloom_filter_bytes(&mut rng, &caller, num_items);
}
let node = LegacyContactInfo::try_from(&node).unwrap();
Expand Down
4 changes: 3 additions & 1 deletion turbine/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let socket = bind_to_unspecified().unwrap();
let mut contact_info = ContactInfo::new_localhost(&id, timestamp());
let port = socket.local_addr().unwrap().port();
contact_info.set_tvu((Ipv4Addr::LOCALHOST, port)).unwrap();
contact_info
.set_tvu(Protocol::UDP, (Ipv4Addr::LOCALHOST, port))
.unwrap();
info!("local: {:?}", contact_info.tvu(Protocol::UDP).unwrap());
cluster_info.insert_info(contact_info);
socket.set_nonblocking(true).unwrap();
Expand Down

0 comments on commit e0d672b

Please sign in to comment.