Skip to content

Commit

Permalink
Update socket_tag -> SocketTag & lowercase tags
Browse files Browse the repository at this point in the history
  • Loading branch information
InKryption committed Jun 13, 2024
1 parent 6adc450 commit c4e1611
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 82 deletions.
5 changes: 2 additions & 3 deletions src/accountsdb/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const ContactInfo = @import("../gossip/data.zig").ContactInfo;
const GossipTable = @import("../gossip/table.zig").GossipTable;
const SlotAndHash = @import("./snapshots.zig").SlotAndHash;
const Logger = @import("../trace/log.zig").Logger;
const socket_tag = @import("../gossip/data.zig").socket_tag;
const Hash = @import("../core/hash.zig").Hash;

const DOWNLOAD_PROGRESS_UPDATES_NS = 30 * std.time.ns_per_s;
Expand Down Expand Up @@ -96,7 +95,7 @@ pub fn findPeersToDownloadFromAssumeCapacity(
result.invalid_shred_version += 1;
continue;
}
_ = peer_contact_info.getSocket(socket_tag.RPC) orelse {
_ = peer_contact_info.getSocket(.rpc) orelse {
result.no_rpc_count += 1;
continue;
};
Expand Down Expand Up @@ -212,7 +211,7 @@ pub fn downloadSnapshotsFromGossip(
});
defer allocator.free(snapshot_filename);

const rpc_socket = peer.contact_info.getSocket(socket_tag.RPC).?;
const rpc_socket = peer.contact_info.getSocket(.rpc).?;
const rpc_url_bounded = rpc_socket.toStringBounded();
const rpc_url = rpc_url_bounded.constSlice();

Expand Down
10 changes: 5 additions & 5 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const parallelUnpackZstdTarBall = sig.accounts_db.parallelUnpackZstdTarBall;
const requestIpEcho = sig.net.requestIpEcho;
const servePrometheus = sig.prometheus.servePrometheus;

const socket_tag = sig.gossip.socket_tag;
const SocketTag = sig.gossip.SocketTag;

// TODO: use better allocator, unless GPA becomes more performant.

Expand Down Expand Up @@ -411,8 +411,8 @@ fn validator() !void {
ip_echo_data.shred_version, // TODO atomic owned at top level? or owned by gossip is good?
ip_echo_data.ip,
&.{
.{ .tag = .REPAIR, .port = repair_port },
.{ .tag = .TURBINE_RECV, .port = turbine_recv_port },
.{ .tag = .repair, .port = repair_port },
.{ .tag = .turbine_recv, .port = turbine_recv_port },
},
);
defer gossip_service.deinit();
Expand Down Expand Up @@ -515,7 +515,7 @@ fn initGossip(
entrypoints: []const SocketAddr,
shred_version: u16,
gossip_host_ip: IpAddr,
sockets: []const struct { tag: socket_tag, port: u16 },
sockets: []const struct { tag: SocketTag, port: u16 },
) !GossipService {
const gossip_port: u16 = config.current.gossip.port;
logger.infof("gossip host: {any}", .{gossip_host_ip});
Expand All @@ -524,7 +524,7 @@ fn initGossip(
// setup contact info
const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key);
var contact_info = ContactInfo.init(gpa_allocator, my_pubkey, getWallclockMs(), 0);
try contact_info.setSocket(socket_tag.GOSSIP, SocketAddr.init(gossip_host_ip, gossip_port));
try contact_info.setSocket(.gossip, SocketAddr.init(gossip_host_ip, gossip_port));
for (sockets) |s| try contact_info.setSocket(s.tag, SocketAddr.init(gossip_host_ip, s.port));
contact_info.shred_version = shred_version;

Expand Down
3 changes: 1 addition & 2 deletions src/gossip/active_set.zig
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const _gossip_data = @import("../gossip/data.zig");
const SignedGossipData = _gossip_data.SignedGossipData;
const getWallclockMs = _gossip_data.getWallclockMs;
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const LegacyContactInfo = _gossip_data.LegacyContactInfo;

const Pubkey = @import("../core/pubkey.zig").Pubkey;
Expand Down Expand Up @@ -106,7 +105,7 @@ pub const ActiveSet = struct {
while (iter.next()) |entry| {
// lookup peer contact info
const peer_info = table.getContactInfo(entry.key_ptr.*) orelse continue;
const peer_gossip_addr = peer_info.getSocket(socket_tag.GOSSIP) orelse continue;
const peer_gossip_addr = peer_info.getSocket(.gossip) orelse continue;

peer_gossip_addr.sanitize() catch continue;

Expand Down
92 changes: 46 additions & 46 deletions src/gossip/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ pub const GossipData = union(enum(u32)) {
pub fn gossipAddr(self: *const @This()) ?SocketAddr {
return switch (self.*) {
.LegacyContactInfo => |*v| if (v.gossip.isUnspecified()) null else v.gossip,
.ContactInfo => |*v| v.getSocket(socket_tag.GOSSIP),
.ContactInfo => |*v| v.getSocket(.gossip),
else => null,
};
}
Expand Down Expand Up @@ -537,32 +537,32 @@ pub const LegacyContactInfo = struct {
/// call ContactInfo.deinit to free
pub fn toContactInfo(self: *const LegacyContactInfo, allocator: std.mem.Allocator) !ContactInfo {
var ci = ContactInfo.init(allocator, self.id, self.wallclock, self.shred_version);
try ci.setSocket(socket_tag.GOSSIP, self.gossip);
try ci.setSocket(socket_tag.TURBINE_RECV, self.turbine_recv);
try ci.setSocket(socket_tag.TURBINE_RECV_QUIC, self.turbine_recv_quic);
try ci.setSocket(socket_tag.REPAIR, self.repair);
try ci.setSocket(socket_tag.TPU, self.tpu);
try ci.setSocket(socket_tag.TPU_FORWARDS, self.tpu_forwards);
try ci.setSocket(socket_tag.TPU_VOTE, self.tpu_vote);
try ci.setSocket(socket_tag.RPC, self.rpc);
try ci.setSocket(socket_tag.RPC_PUBSUB, self.rpc_pubsub);
try ci.setSocket(socket_tag.SERVE_REPAIR, self.serve_repair);
try ci.setSocket(.gossip, self.gossip);
try ci.setSocket(.turbine_recv, self.turbine_recv);
try ci.setSocket(.turbine_recv_quic, self.turbine_recv_quic);
try ci.setSocket(.repair, self.repair);
try ci.setSocket(.tpu, self.tpu);
try ci.setSocket(.tpu_forwards, self.tpu_forwards);
try ci.setSocket(.tpu_vote, self.tpu_vote);
try ci.setSocket(.rpc, self.rpc);
try ci.setSocket(.rpc_pubsub, self.rpc_pubsub);
try ci.setSocket(.serve_repair, self.serve_repair);
return ci;
}

pub fn fromContactInfo(ci: *const ContactInfo) LegacyContactInfo {
return .{
.id = ci.pubkey,
.gossip = ci.getSocket(socket_tag.GOSSIP) orelse SocketAddr.UNSPECIFIED,
.turbine_recv = ci.getSocket(socket_tag.TURBINE_RECV) orelse SocketAddr.UNSPECIFIED,
.turbine_recv_quic = ci.getSocket(socket_tag.TURBINE_RECV_QUIC) orelse SocketAddr.UNSPECIFIED,
.repair = ci.getSocket(socket_tag.REPAIR) orelse SocketAddr.UNSPECIFIED,
.tpu = ci.getSocket(socket_tag.TPU) orelse SocketAddr.UNSPECIFIED,
.tpu_forwards = ci.getSocket(socket_tag.TPU_FORWARDS) orelse SocketAddr.UNSPECIFIED,
.tpu_vote = ci.getSocket(socket_tag.TPU_VOTE) orelse SocketAddr.UNSPECIFIED,
.rpc = ci.getSocket(socket_tag.RPC) orelse SocketAddr.UNSPECIFIED,
.rpc_pubsub = ci.getSocket(socket_tag.RPC_PUBSUB) orelse SocketAddr.UNSPECIFIED,
.serve_repair = ci.getSocket(socket_tag.SERVE_REPAIR) orelse SocketAddr.UNSPECIFIED,
.gossip = ci.getSocket(.gossip) orelse SocketAddr.UNSPECIFIED,
.turbine_recv = ci.getSocket(.turbine_recv) orelse SocketAddr.UNSPECIFIED,
.turbine_recv_quic = ci.getSocket(.turbine_recv_quic) orelse SocketAddr.UNSPECIFIED,
.repair = ci.getSocket(.repair) orelse SocketAddr.UNSPECIFIED,
.tpu = ci.getSocket(.tpu) orelse SocketAddr.UNSPECIFIED,
.tpu_forwards = ci.getSocket(.tpu_forwards) orelse SocketAddr.UNSPECIFIED,
.tpu_vote = ci.getSocket(.tpu_vote) orelse SocketAddr.UNSPECIFIED,
.rpc = ci.getSocket(.rpc) orelse SocketAddr.UNSPECIFIED,
.rpc_pubsub = ci.getSocket(.rpc_pubsub) orelse SocketAddr.UNSPECIFIED,
.serve_repair = ci.getSocket(.serve_repair) orelse SocketAddr.UNSPECIFIED,
.wallclock = ci.wallclock,
.shred_version = ci.shred_version,
};
Expand Down Expand Up @@ -977,26 +977,26 @@ pub const SnapshotHashes = struct {
}
};

pub const socket_tag = enum(u8) {
GOSSIP = 0,
REPAIR = 1,
RPC = 2,
RPC_PUBSUB = 3,
SERVE_REPAIR = 4,
TPU = 5,
TPU_FORWARDS = 6,
TPU_FORWARDS_QUIC = 7,
TPU_QUIC = 8,
TPU_VOTE = 9,
pub const SocketTag = enum(u8) {
gossip = 0,
repair = 1,
rpc = 2,
rpc_pubsub = 3,
serve_repair = 4,
tpu = 5,
tpu_forwards = 6,
tpu_forwards_quic = 7,
tpu_quic = 8,
tpu_vote = 9,
/// Analogous to [SOCKET_TAG_TVU](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L36)
TURBINE_RECV = 10,
turbine_recv = 10,
/// Analogous to [SOCKET_TAG_TVU_QUIC](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L37)
TURBINE_RECV_QUIC = 11,
turbine_recv_quic = 11,
_,

pub const BincodeSize = u8;
};
pub const SOCKET_CACHE_SIZE: usize = @intFromEnum(socket_tag.TURBINE_RECV_QUIC) + 1;
pub const SOCKET_CACHE_SIZE: usize = @intFromEnum(SocketTag.turbine_recv_quic) + 1;

pub const ContactInfo = struct {
pubkey: Pubkey,
Expand Down Expand Up @@ -1029,7 +1029,7 @@ pub const ContactInfo = struct {

pub fn initSpy(allocator: std.mem.Allocator, id: Pubkey, gossip_socket_addr: SocketAddr, shred_version: u16) !Self {
var contact_info = Self.init(allocator, id, @intCast(std.time.microTimestamp()), shred_version);
try contact_info.setSocket(socket_tag.GOSSIP, gossip_socket_addr);
try contact_info.setSocket(.gossip, gossip_socket_addr);
return contact_info;
}

Expand Down Expand Up @@ -1068,7 +1068,7 @@ pub const ContactInfo = struct {
}

for (0..6) |_| {
sockets.append(.{ .key = .TURBINE_RECV, .index = 20, .offset = 30 }) catch unreachable;
sockets.append(.{ .key = .turbine_recv, .index = 20, .offset = 30 }) catch unreachable;
}

return ContactInfo{
Expand All @@ -1087,15 +1087,15 @@ pub const ContactInfo = struct {
try sanitizeWallclock(self.wallclock);
}

pub fn getSocket(self: *const Self, key: socket_tag) ?SocketAddr {
pub fn getSocket(self: *const Self, key: SocketTag) ?SocketAddr {
const socket = &self.cache[@intFromEnum(key)];
if (socket.eql(&SocketAddr.UNSPECIFIED)) {
return null;
}
return socket.*;
}

pub fn setSocket(self: *Self, key: socket_tag, socket_addr: SocketAddr) !void {
pub fn setSocket(self: *Self, key: SocketTag, socket_addr: SocketAddr) !void {
self.removeSocket(key);

const offset: u16, const index: ?usize = blk: {
Expand All @@ -1122,7 +1122,7 @@ pub const ContactInfo = struct {
self.cache[@intFromEnum(key)] = socket_addr;
}

pub fn removeSocket(self: *Self, key: socket_tag) void {
pub fn removeSocket(self: *Self, key: SocketTag) void {
// find existing socket index
const existing_socket_index = for (self.sockets.items, 0..) |socket, idx| {
if (socket.key == key) break idx;
Expand Down Expand Up @@ -1222,7 +1222,7 @@ const Sockets = struct {

pub const SocketEntry = struct {
/// GossipMessageIdentifier, e.g. turbine_recv, tpu, etc
key: socket_tag,
key: SocketTag,
/// IpAddr index in the accompanying addrs vector.
index: u8,
/// Port offset with respect to the previous entry.
Expand Down Expand Up @@ -1320,12 +1320,12 @@ test "gossip.data: set & get socket on contact info" {

var ci = ContactInfo.init(testing.allocator, Pubkey.random(rng), @as(u64, @intCast(std.time.microTimestamp())), 0);
defer ci.deinit();
try ci.setSocket(socket_tag.RPC, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899));
try ci.setSocket(.rpc, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899));

var set_socket = ci.getSocket(socket_tag.RPC);
var set_socket = ci.getSocket(.rpc);
try testing.expect(set_socket.?.eql(&SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899)));
try testing.expect(ci.addrs.items[0].eql(&IpAddr.newIpv4(127, 0, 0, 1)));
try testing.expect(ci.sockets.items[0].eql(&.{ .key = .RPC, .index = 0, .offset = 8899 }));
try testing.expect(ci.sockets.items[0].eql(&.{ .key = .rpc, .index = 0, .offset = 8899 }));
}

test "gossip.data: contact info bincode serialize matches rust bincode" {
Expand Down Expand Up @@ -1386,8 +1386,8 @@ test "gossip.data: ContactInfo bincode roundtrip maintains data integrity" {
test "gossip.data: SocketEntry serializer works" {
testing.log_level = .debug;

comptime std.debug.assert(@intFromEnum(socket_tag.RPC_PUBSUB) == 3);
const se: SocketEntry = .{ .key = .RPC_PUBSUB, .index = 3, .offset = 30304 };
comptime std.debug.assert(@intFromEnum(SocketTag.rpc_pubsub) == 3);
const se: SocketEntry = .{ .key = .rpc_pubsub, .index = 3, .offset = 30304 };

var buf = std.ArrayList(u8).init(testing.allocator);
defer buf.deinit();
Expand Down
3 changes: 1 addition & 2 deletions src/gossip/fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const _gossip_data = @import("data.zig");
const LegacyContactInfo = _gossip_data.LegacyContactInfo;
const SignedGossipData = _gossip_data.SignedGossipData;
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const AtomicBool = std.atomic.Value(bool);

const SocketAddr = @import("../net/net.zig").SocketAddr;
Expand Down Expand Up @@ -289,7 +288,7 @@ pub fn run() !void {

const fuzz_pubkey = Pubkey.fromPublicKey(&fuzz_keypair.public_key);
var fuzz_contact_info = ContactInfo.init(allocator, fuzz_pubkey, 0, 19);
try fuzz_contact_info.setSocket(socket_tag.GOSSIP, fuzz_address);
try fuzz_contact_info.setSocket(.gossip, fuzz_address);

var fuzz_exit = AtomicBool.init(false);
var gossip_service_fuzzer = try GossipService.init(
Expand Down
2 changes: 1 addition & 1 deletion src/gossip/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ pub const Ping = ping_pong.Ping;
pub const Pong = ping_pong.Pong;

pub const getWallclockMs = data.getWallclockMs;
pub const socket_tag = data.socket_tag;
pub const SocketTag = data.SocketTag;
3 changes: 1 addition & 2 deletions src/gossip/ping_pong.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const Hash = @import("../core/hash.zig").Hash;
const Signature = @import("../core/signature.zig").Signature;
const _gossip_data = @import("data.zig");
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const getWallclockMs = _gossip_data.getWallclockMs;

const DefaultPrng = std.rand.DefaultPrng;
Expand Down Expand Up @@ -222,7 +221,7 @@ pub const PingCache = struct {
var pings = std.ArrayList(PingAndSocketAddr).init(allocator);

for (peers, 0..) |*peer, i| {
if (peer.getSocket(socket_tag.GOSSIP)) |gossip_addr| {
if (peer.getSocket(.gossip)) |gossip_addr| {
const result = self.check(now, PubkeyAndSocketAddr{ .pubkey = peer.pubkey, .socket_addr = gossip_addr }, &our_keypair);
if (result.passes_ping_check) {
try valid_peers.append(i);
Expand Down
Loading

0 comments on commit c4e1611

Please sign in to comment.