Skip to content

Commit

Permalink
Update socket_tag -> SocketTag & lowercase tags
Browse files Browse the repository at this point in the history
  • Loading branch information
InKryption committed Jun 13, 2024
1 parent 6adc450 commit b69c8ad
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 67 deletions.
5 changes: 2 additions & 3 deletions src/accountsdb/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const ContactInfo = @import("../gossip/data.zig").ContactInfo;
const GossipTable = @import("../gossip/table.zig").GossipTable;
const SlotAndHash = @import("./snapshots.zig").SlotAndHash;
const Logger = @import("../trace/log.zig").Logger;
const socket_tag = @import("../gossip/data.zig").socket_tag;
const Hash = @import("../core/hash.zig").Hash;

const DOWNLOAD_PROGRESS_UPDATES_NS = 30 * std.time.ns_per_s;
Expand Down Expand Up @@ -96,7 +95,7 @@ pub fn findPeersToDownloadFromAssumeCapacity(
result.invalid_shred_version += 1;
continue;
}
_ = peer_contact_info.getSocket(socket_tag.RPC) orelse {
_ = peer_contact_info.getSocket(.rpc) orelse {
result.no_rpc_count += 1;
continue;
};
Expand Down Expand Up @@ -212,7 +211,7 @@ pub fn downloadSnapshotsFromGossip(
});
defer allocator.free(snapshot_filename);

const rpc_socket = peer.contact_info.getSocket(socket_tag.RPC).?;
const rpc_socket = peer.contact_info.getSocket(.rpc).?;
const rpc_url_bounded = rpc_socket.toStringBounded();
const rpc_url = rpc_url_bounded.constSlice();

Expand Down
3 changes: 1 addition & 2 deletions src/gossip/active_set.zig
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const _gossip_data = @import("../gossip/data.zig");
const SignedGossipData = _gossip_data.SignedGossipData;
const getWallclockMs = _gossip_data.getWallclockMs;
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const LegacyContactInfo = _gossip_data.LegacyContactInfo;

const Pubkey = @import("../core/pubkey.zig").Pubkey;
Expand Down Expand Up @@ -106,7 +105,7 @@ pub const ActiveSet = struct {
while (iter.next()) |entry| {
// lookup peer contact info
const peer_info = table.getContactInfo(entry.key_ptr.*) orelse continue;
const peer_gossip_addr = peer_info.getSocket(socket_tag.GOSSIP) orelse continue;
const peer_gossip_addr = peer_info.getSocket(.gossip) orelse continue;

peer_gossip_addr.sanitize() catch continue;

Expand Down
86 changes: 43 additions & 43 deletions src/gossip/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ pub const GossipData = union(enum(u32)) {
pub fn gossipAddr(self: *const @This()) ?SocketAddr {
return switch (self.*) {
.LegacyContactInfo => |*v| if (v.gossip.isUnspecified()) null else v.gossip,
.ContactInfo => |*v| v.getSocket(socket_tag.GOSSIP),
.ContactInfo => |*v| v.getSocket(.gossip),
else => null,
};
}
Expand Down Expand Up @@ -537,32 +537,32 @@ pub const LegacyContactInfo = struct {
/// call ContactInfo.deinit to free
pub fn toContactInfo(self: *const LegacyContactInfo, allocator: std.mem.Allocator) !ContactInfo {
var ci = ContactInfo.init(allocator, self.id, self.wallclock, self.shred_version);
try ci.setSocket(socket_tag.GOSSIP, self.gossip);
try ci.setSocket(socket_tag.TURBINE_RECV, self.turbine_recv);
try ci.setSocket(socket_tag.TURBINE_RECV_QUIC, self.turbine_recv_quic);
try ci.setSocket(socket_tag.REPAIR, self.repair);
try ci.setSocket(socket_tag.TPU, self.tpu);
try ci.setSocket(socket_tag.TPU_FORWARDS, self.tpu_forwards);
try ci.setSocket(socket_tag.TPU_VOTE, self.tpu_vote);
try ci.setSocket(socket_tag.RPC, self.rpc);
try ci.setSocket(socket_tag.RPC_PUBSUB, self.rpc_pubsub);
try ci.setSocket(socket_tag.SERVE_REPAIR, self.serve_repair);
try ci.setSocket(.gossip, self.gossip);
try ci.setSocket(.turbine_recv, self.turbine_recv);
try ci.setSocket(.turbine_recv_quic, self.turbine_recv_quic);
try ci.setSocket(.repair, self.repair);
try ci.setSocket(.tpu, self.tpu);
try ci.setSocket(.tpu_forwards, self.tpu_forwards);
try ci.setSocket(.tpu_vote, self.tpu_vote);
try ci.setSocket(.rpc, self.rpc);
try ci.setSocket(.rpc_pubsub, self.rpc_pubsub);
try ci.setSocket(.serve_repair, self.serve_repair);
return ci;
}

pub fn fromContactInfo(ci: *const ContactInfo) LegacyContactInfo {
return .{
.id = ci.pubkey,
.gossip = ci.getSocket(socket_tag.GOSSIP) orelse SocketAddr.UNSPECIFIED,
.turbine_recv = ci.getSocket(socket_tag.TURBINE_RECV) orelse SocketAddr.UNSPECIFIED,
.turbine_recv_quic = ci.getSocket(socket_tag.TURBINE_RECV_QUIC) orelse SocketAddr.UNSPECIFIED,
.repair = ci.getSocket(socket_tag.REPAIR) orelse SocketAddr.UNSPECIFIED,
.tpu = ci.getSocket(socket_tag.TPU) orelse SocketAddr.UNSPECIFIED,
.tpu_forwards = ci.getSocket(socket_tag.TPU_FORWARDS) orelse SocketAddr.UNSPECIFIED,
.tpu_vote = ci.getSocket(socket_tag.TPU_VOTE) orelse SocketAddr.UNSPECIFIED,
.rpc = ci.getSocket(socket_tag.RPC) orelse SocketAddr.UNSPECIFIED,
.rpc_pubsub = ci.getSocket(socket_tag.RPC_PUBSUB) orelse SocketAddr.UNSPECIFIED,
.serve_repair = ci.getSocket(socket_tag.SERVE_REPAIR) orelse SocketAddr.UNSPECIFIED,
.gossip = ci.getSocket(.gossip) orelse SocketAddr.UNSPECIFIED,
.turbine_recv = ci.getSocket(.turbine_recv) orelse SocketAddr.UNSPECIFIED,
.turbine_recv_quic = ci.getSocket(.turbine_recv_quic) orelse SocketAddr.UNSPECIFIED,
.repair = ci.getSocket(.repair) orelse SocketAddr.UNSPECIFIED,
.tpu = ci.getSocket(.tpu) orelse SocketAddr.UNSPECIFIED,
.tpu_forwards = ci.getSocket(.tpu_forwards) orelse SocketAddr.UNSPECIFIED,
.tpu_vote = ci.getSocket(.tpu_vote) orelse SocketAddr.UNSPECIFIED,
.rpc = ci.getSocket(.rpc) orelse SocketAddr.UNSPECIFIED,
.rpc_pubsub = ci.getSocket(.rpc_pubsub) orelse SocketAddr.UNSPECIFIED,
.serve_repair = ci.getSocket(.serve_repair) orelse SocketAddr.UNSPECIFIED,
.wallclock = ci.wallclock,
.shred_version = ci.shred_version,
};
Expand Down Expand Up @@ -977,26 +977,26 @@ pub const SnapshotHashes = struct {
}
};

pub const socket_tag = enum(u8) {
GOSSIP = 0,
REPAIR = 1,
RPC = 2,
RPC_PUBSUB = 3,
SERVE_REPAIR = 4,
TPU = 5,
TPU_FORWARDS = 6,
TPU_FORWARDS_QUIC = 7,
TPU_QUIC = 8,
TPU_VOTE = 9,
pub const SocketTag = enum(u8) {
gossip = 0,
repair = 1,
rpc = 2,
rpc_pubsub = 3,
serve_repair = 4,
tpu = 5,
tpu_forwards = 6,
tpu_forwards_quic = 7,
tpu_quic = 8,
tpu_vote = 9,
/// Analogous to [SOCKET_TAG_TVU](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L36)
TURBINE_RECV = 10,
turbine_recv = 10,
/// Analogous to [SOCKET_TAG_TVU_QUIC](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L37)
TURBINE_RECV_QUIC = 11,
turbine_recv_quic = 11,
_,

pub const BincodeSize = u8;
};
pub const SOCKET_CACHE_SIZE: usize = @intFromEnum(socket_tag.TURBINE_RECV_QUIC) + 1;
pub const SOCKET_CACHE_SIZE: usize = @intFromEnum(SocketTag.turbine_recv_quic) + 1;

pub const ContactInfo = struct {
pubkey: Pubkey,
Expand Down Expand Up @@ -1029,7 +1029,7 @@ pub const ContactInfo = struct {

pub fn initSpy(allocator: std.mem.Allocator, id: Pubkey, gossip_socket_addr: SocketAddr, shred_version: u16) !Self {
var contact_info = Self.init(allocator, id, @intCast(std.time.microTimestamp()), shred_version);
try contact_info.setSocket(socket_tag.GOSSIP, gossip_socket_addr);
try contact_info.setSocket(.gossip, gossip_socket_addr);
return contact_info;
}

Expand Down Expand Up @@ -1087,15 +1087,15 @@ pub const ContactInfo = struct {
try sanitizeWallclock(self.wallclock);
}

pub fn getSocket(self: *const Self, key: socket_tag) ?SocketAddr {
pub fn getSocket(self: *const Self, key: SocketTag) ?SocketAddr {
const socket = &self.cache[@intFromEnum(key)];
if (socket.eql(&SocketAddr.UNSPECIFIED)) {
return null;
}
return socket.*;
}

pub fn setSocket(self: *Self, key: socket_tag, socket_addr: SocketAddr) !void {
pub fn setSocket(self: *Self, key: SocketTag, socket_addr: SocketAddr) !void {
self.removeSocket(key);

const offset: u16, const index: ?usize = blk: {
Expand All @@ -1122,7 +1122,7 @@ pub const ContactInfo = struct {
self.cache[@intFromEnum(key)] = socket_addr;
}

pub fn removeSocket(self: *Self, key: socket_tag) void {
pub fn removeSocket(self: *Self, key: SocketTag) void {
// find existing socket index
const existing_socket_index = for (self.sockets.items, 0..) |socket, idx| {
if (socket.key == key) break idx;
Expand Down Expand Up @@ -1222,7 +1222,7 @@ const Sockets = struct {

pub const SocketEntry = struct {
/// GossipMessageIdentifier, e.g. turbine_recv, tpu, etc
key: socket_tag,
key: SocketTag,
/// IpAddr index in the accompanying addrs vector.
index: u8,
/// Port offset with respect to the previous entry.
Expand Down Expand Up @@ -1320,9 +1320,9 @@ test "gossip.data: set & get socket on contact info" {

var ci = ContactInfo.init(testing.allocator, Pubkey.random(rng), @as(u64, @intCast(std.time.microTimestamp())), 0);
defer ci.deinit();
try ci.setSocket(socket_tag.RPC, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899));
try ci.setSocket(.rpc, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899));

var set_socket = ci.getSocket(socket_tag.RPC);
var set_socket = ci.getSocket(.rpc);
try testing.expect(set_socket.?.eql(&SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899)));
try testing.expect(ci.addrs.items[0].eql(&IpAddr.newIpv4(127, 0, 0, 1)));
try testing.expect(ci.sockets.items[0].eql(&.{ .key = .RPC, .index = 0, .offset = 8899 }));
Expand Down Expand Up @@ -1386,7 +1386,7 @@ test "gossip.data: ContactInfo bincode roundtrip maintains data integrity" {
test "gossip.data: SocketEntry serializer works" {
testing.log_level = .debug;

comptime std.debug.assert(@intFromEnum(socket_tag.RPC_PUBSUB) == 3);
comptime std.debug.assert(@intFromEnum(.rpc_pubsub) == 3);
const se: SocketEntry = .{ .key = .RPC_PUBSUB, .index = 3, .offset = 30304 };

var buf = std.ArrayList(u8).init(testing.allocator);
Expand Down
3 changes: 1 addition & 2 deletions src/gossip/fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const _gossip_data = @import("data.zig");
const LegacyContactInfo = _gossip_data.LegacyContactInfo;
const SignedGossipData = _gossip_data.SignedGossipData;
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const AtomicBool = std.atomic.Value(bool);

const SocketAddr = @import("../net/net.zig").SocketAddr;
Expand Down Expand Up @@ -289,7 +288,7 @@ pub fn run() !void {

const fuzz_pubkey = Pubkey.fromPublicKey(&fuzz_keypair.public_key);
var fuzz_contact_info = ContactInfo.init(allocator, fuzz_pubkey, 0, 19);
try fuzz_contact_info.setSocket(socket_tag.GOSSIP, fuzz_address);
try fuzz_contact_info.setSocket(.gossip, fuzz_address);

var fuzz_exit = AtomicBool.init(false);
var gossip_service_fuzzer = try GossipService.init(
Expand Down
2 changes: 1 addition & 1 deletion src/gossip/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ pub const Ping = ping_pong.Ping;
pub const Pong = ping_pong.Pong;

pub const getWallclockMs = data.getWallclockMs;
pub const socket_tag = data.socket_tag;
pub const SocketTag = data.SocketTag;
3 changes: 1 addition & 2 deletions src/gossip/ping_pong.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const Hash = @import("../core/hash.zig").Hash;
const Signature = @import("../core/signature.zig").Signature;
const _gossip_data = @import("data.zig");
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const getWallclockMs = _gossip_data.getWallclockMs;

const DefaultPrng = std.rand.DefaultPrng;
Expand Down Expand Up @@ -222,7 +221,7 @@ pub const PingCache = struct {
var pings = std.ArrayList(PingAndSocketAddr).init(allocator);

for (peers, 0..) |*peer, i| {
if (peer.getSocket(socket_tag.GOSSIP)) |gossip_addr| {
if (peer.getSocket(.gossip)) |gossip_addr| {
const result = self.check(now, PubkeyAndSocketAddr{ .pubkey = peer.pubkey, .socket_addr = gossip_addr }, &our_keypair);
if (result.passes_ping_check) {
try valid_peers.append(i);
Expand Down
23 changes: 11 additions & 12 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const bincode = @import("../bincode/bincode.zig");
const gossip = @import("../gossip/data.zig");
const LegacyContactInfo = gossip.LegacyContactInfo;
const ContactInfo = @import("data.zig").ContactInfo;
const socket_tag = @import("data.zig").socket_tag;
const SignedGossipData = gossip.SignedGossipData;
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const Pubkey = @import("../core/pubkey.zig").Pubkey;
Expand Down Expand Up @@ -168,7 +167,7 @@ pub const GossipService = struct {
const active_set = ActiveSet.init(allocator);

// bind the socket
const gossip_address = my_contact_info.getSocket(socket_tag.GOSSIP) orelse return error.GossipAddrUnspecified;
const gossip_address = my_contact_info.getSocket(.gossip) orelse return error.GossipAddrUnspecified;
var gossip_socket = UdpSocket.create(.ipv4, .udp) catch return error.SocketCreateFailed;
gossip_socket.bindToPort(gossip_address.port()) catch return error.SocketBindFailed;
gossip_socket.setReadTimeout(socket_utils.SOCKET_TIMEOUT_US) catch return error.SocketSetTimeoutFailed; // 1 second
Expand Down Expand Up @@ -1135,7 +1134,7 @@ pub const GossipService = struct {
const peer_index = rng.random().intRangeAtMost(usize, 0, num_peers - 1);
const peer_contact_info_index = valid_gossip_peer_indexs.items[peer_index];
const peer_contact_info = peers[peer_contact_info_index];
if (peer_contact_info.getSocket(socket_tag.GOSSIP)) |gossip_addr| {
if (peer_contact_info.getSocket(.gossip)) |gossip_addr| {
const message = GossipMessage{ .PullRequest = .{ filter_i, my_contact_info_value } };

var packet = &packet_batch.items[packet_index];
Expand Down Expand Up @@ -1513,7 +1512,7 @@ pub const GossipService = struct {
return error.CantFindContactInfo;
};
};
const from_gossip_addr = from_contact_info.getSocket(socket_tag.GOSSIP) orelse return error.InvalidGossipAddress;
const from_gossip_addr = from_contact_info.getSocket(.gossip) orelse return error.InvalidGossipAddress;
gossip.sanitizeSocket(&from_gossip_addr) catch return error.InvalidGossipAddress;
const from_gossip_endpoint = from_gossip_addr.toEndpoint();

Expand Down Expand Up @@ -1634,7 +1633,7 @@ pub const GossipService = struct {
// unable to find contact info
continue;
};
const from_gossip_addr = from_contact_info.getSocket(socket_tag.GOSSIP) orelse continue;
const from_gossip_addr = from_contact_info.getSocket(.gossip) orelse continue;
from_gossip_addr.sanitize() catch {
// invalid gossip socket
continue;
Expand Down Expand Up @@ -1869,7 +1868,7 @@ pub const GossipService = struct {

var node_index: usize = 0;
for (contact_infos) |contact_info| {
const peer_gossip_addr = contact_info.getSocket(socket_tag.GOSSIP);
const peer_gossip_addr = contact_info.getSocket(.gossip);

// filter self
if (contact_info.pubkey.equals(&self.my_pubkey)) {
Expand Down Expand Up @@ -2431,7 +2430,7 @@ test "gossip.service: tests handle pull request" {
defer pull_requests.deinit();
try pull_requests.append(GossipService.PullRequestMessage{
.filter = filter,
.from_endpoint = (contact_info.getSocket(socket_tag.GOSSIP) orelse unreachable).toEndpoint(),
.from_endpoint = (contact_info.getSocket(.gossip) orelse unreachable).toEndpoint(),
.value = gossip_value,
});

Expand Down Expand Up @@ -2884,7 +2883,7 @@ test "gossip.service: init, exit, and deinit" {
var rng = std.rand.DefaultPrng.init(getWallclockMs());

var contact_info = try LegacyContactInfo.random(rng.random()).toContactInfo(std.testing.allocator);
try contact_info.setSocket(socket_tag.GOSSIP, gossip_address);
try contact_info.setSocket(.gossip, gossip_address);

var exit = AtomicBool.init(false);

Expand Down Expand Up @@ -2967,7 +2966,7 @@ pub const BenchmarkGossipServiceGeneral = struct {

const pubkey = Pubkey.fromPublicKey(&keypair.public_key);
var contact_info = ContactInfo.init(allocator, pubkey, 0, 19);
try contact_info.setSocket(socket_tag.GOSSIP, address);
try contact_info.setSocket(.gossip, address);

// var logger = Logger.init(allocator, .debug);
// defer logger.deinit();
Expand Down Expand Up @@ -3097,7 +3096,7 @@ pub const BenchmarkGossipServicePullRequests = struct {

const pubkey = Pubkey.fromPublicKey(&keypair.public_key);
var contact_info = ContactInfo.init(allocator, pubkey, 0, 19);
try contact_info.setSocket(socket_tag.GOSSIP, address);
try contact_info.setSocket(.gossip, address);

// var logger = Logger.init(allocator, .debug);
// defer logger.deinit();
Expand Down Expand Up @@ -3128,7 +3127,7 @@ pub const BenchmarkGossipServicePullRequests = struct {
const recv_pubkey = Pubkey.fromPublicKey(&recv_keypair.public_key);

var contact_info_recv = ContactInfo.init(allocator, recv_pubkey, 0, 19);
try contact_info_recv.setSocket(socket_tag.GOSSIP, recv_address);
try contact_info_recv.setSocket(.gossip, recv_address);
const signed_contact_info_recv = try SignedGossipData.initSigned(.{
.ContactInfo = contact_info_recv,
}, &recv_keypair);
Expand Down Expand Up @@ -3206,6 +3205,6 @@ pub const BenchmarkGossipServicePullRequests = struct {

fn localhostTestContactInfo(id: Pubkey) !ContactInfo {
var contact_info = try LegacyContactInfo.default(id).toContactInfo(std.testing.allocator);
try contact_info.setSocket(socket_tag.GOSSIP, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 0));
try contact_info.setSocket(.gossip, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 0));
return contact_info;
}
3 changes: 1 addition & 2 deletions src/gossip/table.zig
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const GossipVersionedData = _gossip_data.GossipVersionedData;
const GossipKey = _gossip_data.GossipKey;
const LegacyContactInfo = _gossip_data.LegacyContactInfo;
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const getWallclockMs = _gossip_data.getWallclockMs;
const Vote = _gossip_data.Vote;

Expand Down Expand Up @@ -857,7 +856,7 @@ pub const GossipTable = struct {
for (contact_indexs) |index| {
const entry: GossipVersionedData = self.store.values()[index];
switch (entry.value.data) {
.ContactInfo => |ci| if (ci.getSocket(socket_tag.GOSSIP)) |addr| {
.ContactInfo => |ci| if (ci.getSocket(.gossip)) |addr| {
if (addr.eql(&gossip_addr)) return try ci.clone();
},
.LegacyContactInfo => |lci| if (lci.gossip.eql(&gossip_addr)) {
Expand Down

0 comments on commit b69c8ad

Please sign in to comment.