Skip to content

Commit

Permalink
feat(gossip): support ContactInfo as equivalent to LegacyContactInfo
Browse files Browse the repository at this point in the history
ContactInfo was being stored in the CrdsTable but it was missing some special treatment that LegacyContactInfo gets:
- insertion of CrdsValue is normally filtered by shred version, but LegacyContactInfo is inserted regardless of shred version
- getAllContactInfos and getContactInfos are used for tasks like identifying which gossip peers to communicate with. these methods were only returning LegacyContactInfo

this change makes it so ContactInfo now gets the same treatment:
- unconditional insertion into CrdsTable
- returned from contact info methods using new EitherContactInfo tagged union
  • Loading branch information
dnut committed Jan 26, 2024
1 parent 85b7c4c commit 386205d
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 41 deletions.
6 changes: 3 additions & 3 deletions src/gossip/active_set.zig
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub const ActiveSet = struct {

pub fn rotate(
self: *Self,
crds_peers: []crds.LegacyContactInfo,
crds_peers: []crds.EitherContactInfo,
) error{OutOfMemory}!void {
// clear the existing
var iter = self.pruned_peers.iterator();
Expand All @@ -67,11 +67,11 @@ pub const ActiveSet = struct {
}
const size = @min(crds_peers.len, NUM_ACTIVE_SET_ENTRIES);
var rng = std.rand.DefaultPrng.init(getWallclockMs());
pull_request.shuffleFirstN(rng.random(), crds.LegacyContactInfo, crds_peers, size);
pull_request.shuffleFirstN(rng.random(), crds.EitherContactInfo, crds_peers, size);

const bloom_num_items = @max(crds_peers.len, MIN_NUM_BLOOM_ITEMS);
for (0..size) |i| {
var entry = try self.pruned_peers.getOrPut(crds_peers[i].id);
var entry = try self.pruned_peers.getOrPut(crds_peers[i].pubkey());
if (entry.found_existing == false) {
// *full* hard restart on blooms -- labs doesnt do this - bug?
var bloom = try Bloom.random(
Expand Down
55 changes: 54 additions & 1 deletion src/gossip/crds.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const Hash = @import("../core/hash.zig").Hash;
const Signature = @import("../core/signature.zig").Signature;
const Transaction = @import("../core/transaction.zig").Transaction;
const Slot = @import("../core/clock.zig").Slot;
const ContactInfo = @import("node.zig").ContactInfo;
const node = @import("node.zig");
const ContactInfo = node.ContactInfo;
const bincode = @import("../bincode/bincode.zig");
const ArrayList = std.ArrayList;
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
Expand Down Expand Up @@ -273,6 +274,44 @@ pub const LegacyContactInfo = struct {
}
};

pub const EitherContactInfo = union(enum) {
new: ContactInfo,
legacy: LegacyContactInfo,

pub fn pubkey(self: *const EitherContactInfo) Pubkey {
switch (self.*) {
.new => |info| {
return info.pubkey;
},
.legacy => |info| {
return info.id;
},
}
}

pub fn gossipAddr(self: *const EitherContactInfo) ?SocketAddr {
switch (self.*) {
.new => |info| {
return info.getSocket(node.SOCKET_TAG_GOSSIP);
},
.legacy => |info| {
return info.gossip;
},
}
}

pub fn shredVersion(self: *const EitherContactInfo) u16 {
switch (self.*) {
.new => |info| {
return info.shred_version;
},
.legacy => |info| {
return info.shred_version;
},
}
}
};

pub fn sanitizeSocket(socket: *const SocketAddr) !void {
if (socket.port() == 0) {
return error.InvalidPort;
Expand Down Expand Up @@ -455,6 +494,20 @@ pub const CrdsData = union(enum(u32)) {
},
}
}

pub fn eitherContactInfo(self: CrdsData) ?EitherContactInfo {
switch (self) {
.LegacyContactInfo => |v| {
return .{ .legacy = v };
},
.ContactInfo => |v| {
return .{ .new = v };
},
else => {
return null;
},
}
}
};

pub const Vote = struct {
Expand Down
59 changes: 43 additions & 16 deletions src/gossip/crds_table.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const CrdsData = crds.CrdsData;
const CrdsVersionedValue = crds.CrdsVersionedValue;
const CrdsValueLabel = crds.CrdsValueLabel;
const LegacyContactInfo = crds.LegacyContactInfo;
const EitherContactInfo = crds.EitherContactInfo;

const ThreadPool = @import("../sync/thread_pool.zig").ThreadPool;
const Task = ThreadPool.Task;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub const CrdsTable = struct {

// special types tracked with their index
contact_infos: AutoArrayHashSet(usize),
legacy_contact_infos: AutoArrayHashSet(usize),
votes: AutoArrayHashMap(usize, usize),
epoch_slots: AutoArrayHashMap(usize, usize),
duplicate_shreds: AutoArrayHashMap(usize, usize),
Expand Down Expand Up @@ -107,6 +109,7 @@ pub const CrdsTable = struct {
return Self{
.store = AutoArrayHashMap(CrdsValueLabel, CrdsVersionedValue).init(allocator),
.contact_infos = AutoArrayHashSet(usize).init(allocator),
.legacy_contact_infos = AutoArrayHashSet(usize).init(allocator),
.shred_versions = AutoHashMap(Pubkey, u16).init(allocator),
.votes = AutoArrayHashMap(usize, usize).init(allocator),
.epoch_slots = AutoArrayHashMap(usize, usize).init(allocator),
Expand All @@ -123,6 +126,7 @@ pub const CrdsTable = struct {
pub fn deinit(self: *Self) void {
self.store.deinit();
self.contact_infos.deinit();
self.legacy_contact_infos.deinit();
self.shred_versions.deinit();
self.votes.deinit();
self.epoch_slots.deinit();
Expand Down Expand Up @@ -161,8 +165,12 @@ pub const CrdsTable = struct {
// entry doesnt exist
if (!result.found_existing) {
switch (value.data) {
.LegacyContactInfo => |*info| {
.ContactInfo => |*info| {
try self.contact_infos.put(entry_index, {});
try self.shred_versions.put(info.pubkey, info.shred_version);
},
.LegacyContactInfo => |*info| {
try self.legacy_contact_infos.put(entry_index, {});
try self.shred_versions.put(info.id, info.shred_version);
},
.Vote => {
Expand Down Expand Up @@ -386,27 +394,37 @@ pub const CrdsTable = struct {
);
}

pub fn getAllContactInfos(self: *const Self) error{OutOfMemory}!std.ArrayList(LegacyContactInfo) {
const n_contact_infos = self.contact_infos.count();
var contact_infos = try std.ArrayList(LegacyContactInfo).initCapacity(self.allocator, n_contact_infos);
var contact_indexs = self.contact_infos.keys();
for (contact_indexs) |index| {
pub fn getAllContactInfos(self: *const Self) error{OutOfMemory}!std.ArrayList(EitherContactInfo) {
const n_contact_infos = self.contact_infos.count() + self.legacy_contact_infos.count();
var contact_infos = try std.ArrayList(EitherContactInfo).initCapacity(self.allocator, n_contact_infos);

var new_contact_indexs = self.contact_infos.keys();
for (new_contact_indexs) |index| {
const entry: CrdsVersionedValue = self.store.values()[index];
contact_infos.appendAssumeCapacity(entry.value.data.LegacyContactInfo);
contact_infos.appendAssumeCapacity(.{ .new = entry.value.data.ContactInfo });
}
var legacy_contact_indexs = self.legacy_contact_infos.keys();
for (legacy_contact_indexs) |index| {
const entry: CrdsVersionedValue = self.store.values()[index];
contact_infos.appendAssumeCapacity(.{ .legacy = entry.value.data.LegacyContactInfo });
}

return contact_infos;
}

pub fn getContactInfos(self: *const Self, buf: []CrdsVersionedValue) []CrdsVersionedValue {
const store_values = self.store.values();
const contact_indexs = self.contact_infos.iterator().keys;
const size = @min(self.contact_infos.count(), buf.len);

for (0..size) |i| {
const index = contact_indexs[i];
const entry = store_values[index];
buf[i] = entry;
var size: usize = 0;
inline for (.{ self.legacy_contact_infos, self.contact_infos }) |infos| {
const contact_indexs = infos.iterator().keys;
const n_from_current = @min(infos.count(), buf.len - size);

for (0..n_from_current) |i| {
const index = contact_indexs[i];
const entry = store_values[index];
buf[i + size] = entry;
}
size += n_from_current;
}
return buf[0..size];
}
Expand Down Expand Up @@ -469,10 +487,14 @@ pub const CrdsTable = struct {
self.shards.remove(entry_index, &hash);

switch (versioned_value.value.data) {
.LegacyContactInfo => {
.ContactInfo => {
var did_remove = self.contact_infos.swapRemove(entry_index);
std.debug.assert(did_remove);
},
.LegacyContactInfo => {
var did_remove = self.legacy_contact_infos.swapRemove(entry_index);
std.debug.assert(did_remove);
},
.Vote => {
var did_remove = self.votes.swapRemove(versioned_value.cursor_on_insertion);
std.debug.assert(did_remove);
Expand Down Expand Up @@ -513,11 +535,16 @@ pub const CrdsTable = struct {

// these also should not fail since there are no allocations - just changing the value
switch (versioned_value.value.data) {
.LegacyContactInfo => {
.ContactInfo => {
var did_remove = self.contact_infos.swapRemove(table_len);
std.debug.assert(did_remove);
self.contact_infos.put(entry_index, {}) catch unreachable;
},
.LegacyContactInfo => {
var did_remove = self.legacy_contact_infos.swapRemove(table_len);
std.debug.assert(did_remove);
self.legacy_contact_infos.put(entry_index, {}) catch unreachable;
},
.Vote => {
self.votes.put(new_index_cursor, entry_index) catch unreachable;
},
Expand Down
32 changes: 19 additions & 13 deletions src/gossip/gossip_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const Pong = @import("ping_pong.zig").Pong;
const bincode = @import("../bincode/bincode.zig");
const crds = @import("../gossip/crds.zig");
const LegacyContactInfo = crds.LegacyContactInfo;
const EitherContactInfo = crds.EitherContactInfo;
const CrdsValue = crds.CrdsValue;

const KeyPair = std.crypto.sign.Ed25519.KeyPair;
Expand Down Expand Up @@ -723,7 +724,7 @@ pub const GossipService = struct {
self: *Self,
) error{ OutOfMemory, SerializationError, ChannelClosed }!void {
const now = getWallclockMs();
var buf: [NUM_ACTIVE_SET_ENTRIES]LegacyContactInfo = undefined;
var buf: [NUM_ACTIVE_SET_ENTRIES]EitherContactInfo = undefined;
var gossip_peers = self.getGossipNodes(&buf, NUM_ACTIVE_SET_ENTRIES, now);

// filter out peers who have responded to pings
Expand All @@ -738,7 +739,7 @@ pub const GossipService = struct {
var valid_gossip_indexs = ping_cache_result.valid_peers;
defer valid_gossip_indexs.deinit();

var valid_gossip_peers: [NUM_ACTIVE_SET_ENTRIES]LegacyContactInfo = undefined;
var valid_gossip_peers: [NUM_ACTIVE_SET_ENTRIES]EitherContactInfo = undefined;
for (0.., valid_gossip_indexs.items) |i, valid_gossip_index| {
valid_gossip_peers[i] = gossip_peers[valid_gossip_index];
}
Expand Down Expand Up @@ -875,7 +876,7 @@ pub const GossipService = struct {
bloom_size: usize,
) !ArrayList(Packet) {
// get nodes from crds table
var buf: [MAX_NUM_PULL_REQUESTS]LegacyContactInfo = undefined;
var buf: [MAX_NUM_PULL_REQUESTS]EitherContactInfo = undefined;
const now = getWallclockMs();
var peers = self.getGossipNodes(
&buf,
Expand All @@ -898,7 +899,8 @@ pub const GossipService = struct {
defer contact_infos.deinit();

for (contact_infos.items) |contact_info| {
if (contact_info.gossip.eql(&entrypoint)) {
const gossip = contact_info.gossipAddr() orelse continue;
if (gossip.eql(&entrypoint)) {
// early exit - we already have the peers in our contact info
break :blk;
}
Expand Down Expand Up @@ -972,7 +974,8 @@ pub const GossipService = struct {
const peer_index = rng.random().intRangeAtMost(usize, 0, num_peers - 1);
const peer_contact_info_index = valid_gossip_peer_indexs.items[peer_index];
const peer_contact_info = peers[peer_contact_info_index];
const peer_addr = peer_contact_info.gossip.toEndpoint();
// invalid peers were filtered out so this should not be null
const peer_addr = peer_contact_info.gossipAddr().?.toEndpoint();

const protocol_msg = Protocol{ .PullRequest = .{ filter_i, my_contact_info_value } };

Expand Down Expand Up @@ -1085,10 +1088,10 @@ pub const GossipService = struct {
defer ping_cache_lock.unlock();
var ping_cache: *PingCache = ping_cache_lock.mut();

var peers = try ArrayList(LegacyContactInfo).initCapacity(self.allocator, pull_requests.items.len);
var peers = try ArrayList(EitherContactInfo).initCapacity(self.allocator, pull_requests.items.len);
defer peers.deinit();
for (pull_requests.items) |req| {
peers.appendAssumeCapacity(req.value.data.LegacyContactInfo);
peers.appendAssumeCapacity(req.value.data.eitherContactInfo().?);
}

const result = try ping_cache.filterValidPeers(self.allocator, self.my_keypair, peers.items);
Expand Down Expand Up @@ -1578,12 +1581,12 @@ pub const GossipService = struct {
pub fn getGossipNodes(
self: *Self,
/// the output slice which will be filled with gossip nodes
nodes: []LegacyContactInfo,
nodes: []EitherContactInfo,
/// the maximum number of nodes to return ( max_size == nodes.len but comptime for init of stack array)
comptime MAX_SIZE: usize,
/// current time (used to filter out nodes that are too old)
now: u64,
) []LegacyContactInfo {
) []EitherContactInfo {
std.debug.assert(MAX_SIZE == nodes.len);

// * 2 bc we might filter out some
Expand All @@ -1606,8 +1609,8 @@ pub const GossipService = struct {

var node_index: usize = 0;
for (contact_infos) |contact_info| {
const peer_info = contact_info.value.data.LegacyContactInfo;
const peer_gossip_addr = peer_info.gossip;
const peer_info = contact_info.value.data.eitherContactInfo().?;
const peer_gossip_addr = peer_info.gossipAddr();

// filter inactive nodes
if (contact_info.timestamp_on_insertion < too_old_ts) {
Expand All @@ -1618,11 +1621,12 @@ pub const GossipService = struct {
continue;
}
// filter matching shred version or my_shred_version == 0
if (self.my_shred_version != 0 and self.my_shred_version != peer_info.shred_version) {
if (self.my_shred_version != 0 and self.my_shred_version != peer_info.shredVersion()) {
continue;
}
// filter on valid gossip address
crds.sanitizeSocket(&peer_gossip_addr) catch continue;

crds.sanitizeSocket(&(peer_gossip_addr orelse continue)) catch continue;

nodes[node_index] = peer_info;
node_index += 1;
Expand All @@ -1648,6 +1652,7 @@ pub const GossipService = struct {
for (crds_values, 0..) |*crds_value, i| {
switch (crds_value.data) {
// always allow contact info + node instance to update shred versions
.ContactInfo => {},
.LegacyContactInfo => {},
.NodeInstance => {},
else => {
Expand All @@ -1665,6 +1670,7 @@ pub const GossipService = struct {
for (crds_values, 0..) |*crds_value, i| {
switch (crds_value.data) {
// always allow contact info + node instance to update shred versions
.ContactInfo => {},
.LegacyContactInfo => {},
.NodeInstance => {},
else => {
Expand Down
23 changes: 15 additions & 8 deletions src/gossip/ping_pong.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const CrdsData = crds.CrdsData;
const Version = crds.Version;
const LegacyVersion2 = crds.LegacyVersion2;
const LegacyContactInfo = crds.LegacyContactInfo;
const EitherContactInfo = crds.EitherContactInfo;
const ContactInfo = @import("node.zig").ContactInfo;

const pull_import = @import("pull_request.zig");
Expand Down Expand Up @@ -216,20 +217,26 @@ pub const PingCache = struct {
self: *Self,
allocator: std.mem.Allocator,
our_keypair: KeyPair,
peers: []LegacyContactInfo,
peers: []EitherContactInfo,
) error{OutOfMemory}!struct { valid_peers: std.ArrayList(usize), pings: std.ArrayList(PingAndSocketAddr) } {
var now = std.time.Instant.now() catch @panic("time not supported by OS!");
var valid_peers = std.ArrayList(usize).init(allocator);
var pings = std.ArrayList(PingAndSocketAddr).init(allocator);

for (peers, 0..) |*peer, i| {
if (!peer.gossip.isUnspecified()) {
var result = self.check(now, PubkeyAndSocketAddr{ peer.id, peer.gossip }, &our_keypair);
if (result.passes_ping_check) {
try valid_peers.append(i);
}
if (result.maybe_ping) |ping| {
try pings.append(.{ .ping = ping, .socket = peer.gossip });
if (peer.gossipAddr()) |gossip_addr| {
if (!gossip_addr.isUnspecified()) {
var result = self.check(
now,
PubkeyAndSocketAddr{ peer.pubkey(), gossip_addr },
&our_keypair,
);
if (result.passes_ping_check) {
try valid_peers.append(i);
}
if (result.maybe_ping) |ping| {
try pings.append(.{ .ping = ping, .socket = gossip_addr });
}
}
}
}
Expand Down

0 comments on commit 386205d

Please sign in to comment.