Skip to content

Commit

Permalink
Merge pull request #173 from Syndica/ink/safer-socket-tag
Browse files Browse the repository at this point in the history
improvement(bincode,gossip): safer socket tag & bincode fix
  • Loading branch information
0xNineteen authored Jun 13, 2024
2 parents 94f2212 + c4e1611 commit 263e620
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 113 deletions.
5 changes: 2 additions & 3 deletions src/accountsdb/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const ContactInfo = @import("../gossip/data.zig").ContactInfo;
const GossipTable = @import("../gossip/table.zig").GossipTable;
const SlotAndHash = @import("./snapshots.zig").SlotAndHash;
const Logger = @import("../trace/log.zig").Logger;
const socket_tag = @import("../gossip/data.zig").socket_tag;
const Hash = @import("../core/hash.zig").Hash;

const DOWNLOAD_PROGRESS_UPDATES_NS = 30 * std.time.ns_per_s;
Expand Down Expand Up @@ -96,7 +95,7 @@ pub fn findPeersToDownloadFromAssumeCapacity(
result.invalid_shred_version += 1;
continue;
}
_ = peer_contact_info.getSocket(socket_tag.RPC) orelse {
_ = peer_contact_info.getSocket(.rpc) orelse {
result.no_rpc_count += 1;
continue;
};
Expand Down Expand Up @@ -212,7 +211,7 @@ pub fn downloadSnapshotsFromGossip(
});
defer allocator.free(snapshot_filename);

const rpc_socket = peer.contact_info.getSocket(socket_tag.RPC).?;
const rpc_socket = peer.contact_info.getSocket(.rpc).?;
const rpc_url_bounded = rpc_socket.toStringBounded();
const rpc_url = rpc_url_bounded.constSlice();

Expand Down
6 changes: 5 additions & 1 deletion src/bincode/bincode.zig
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,17 @@ pub fn write(writer: anytype, data: anytype, params: bincode.Params) !void {
.Type, .Void, .NoReturn, .Undefined, .Null, .Fn, .Opaque, .Frame, .AnyFrame => return,
.Bool => return writer.writeByte(@intFromBool(data)),
.Enum => |_| {
comptime var SerializedSize = u32;
comptime if (@hasDecl(T, "BincodeSize")) {
SerializedSize = T.BincodeSize;
};
if (getConfig(T)) |type_config| {
if (type_config.serializer) |serialize_fcn| {
return serialize_fcn(writer, data, params);
}
}

return bincode.write(writer, @as(u32, @intFromEnum(data)), params);
return bincode.write(writer, @as(SerializedSize, @intFromEnum(data)), params);
},
.Union => |info| {
try bincode.write(writer, @as(u32, @intFromEnum(data)), params);
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const parallelUnpackZstdTarBall = sig.accounts_db.parallelUnpackZstdTarBall;
const requestIpEcho = sig.net.requestIpEcho;
const servePrometheus = sig.prometheus.servePrometheus;

const socket_tag = sig.gossip.socket_tag;
const SocketTag = sig.gossip.SocketTag;

// TODO: use better allocator, unless GPA becomes more performant.

Expand Down Expand Up @@ -411,8 +411,8 @@ fn validator() !void {
ip_echo_data.shred_version, // TODO atomic owned at top level? or owned by gossip is good?
ip_echo_data.ip,
&.{
.{ .tag = socket_tag.REPAIR, .port = repair_port },
.{ .tag = socket_tag.TURBINE_RECV, .port = turbine_recv_port },
.{ .tag = .repair, .port = repair_port },
.{ .tag = .turbine_recv, .port = turbine_recv_port },
},
);
defer gossip_service.deinit();
Expand Down Expand Up @@ -515,7 +515,7 @@ fn initGossip(
entrypoints: []const SocketAddr,
shred_version: u16,
gossip_host_ip: IpAddr,
sockets: []const struct { tag: u8, port: u16 },
sockets: []const struct { tag: SocketTag, port: u16 },
) !GossipService {
const gossip_port: u16 = config.current.gossip.port;
logger.infof("gossip host: {any}", .{gossip_host_ip});
Expand All @@ -524,7 +524,7 @@ fn initGossip(
// setup contact info
const my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key);
var contact_info = ContactInfo.init(gpa_allocator, my_pubkey, getWallclockMs(), 0);
try contact_info.setSocket(socket_tag.GOSSIP, SocketAddr.init(gossip_host_ip, gossip_port));
try contact_info.setSocket(.gossip, SocketAddr.init(gossip_host_ip, gossip_port));
for (sockets) |s| try contact_info.setSocket(s.tag, SocketAddr.init(gossip_host_ip, s.port));
contact_info.shred_version = shred_version;

Expand Down
3 changes: 1 addition & 2 deletions src/gossip/active_set.zig
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const _gossip_data = @import("../gossip/data.zig");
const SignedGossipData = _gossip_data.SignedGossipData;
const getWallclockMs = _gossip_data.getWallclockMs;
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const LegacyContactInfo = _gossip_data.LegacyContactInfo;

const Pubkey = @import("../core/pubkey.zig").Pubkey;
Expand Down Expand Up @@ -106,7 +105,7 @@ pub const ActiveSet = struct {
while (iter.next()) |entry| {
// lookup peer contact info
const peer_info = table.getContactInfo(entry.key_ptr.*) orelse continue;
const peer_gossip_addr = peer_info.getSocket(socket_tag.GOSSIP) orelse continue;
const peer_gossip_addr = peer_info.getSocket(.gossip) orelse continue;

peer_gossip_addr.sanitize() catch continue;

Expand Down
151 changes: 75 additions & 76 deletions src/gossip/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ pub const GossipData = union(enum(u32)) {
pub fn gossipAddr(self: *const @This()) ?SocketAddr {
return switch (self.*) {
.LegacyContactInfo => |*v| if (v.gossip.isUnspecified()) null else v.gossip,
.ContactInfo => |*v| v.getSocket(socket_tag.GOSSIP),
.ContactInfo => |*v| v.getSocket(.gossip),
else => null,
};
}
Expand Down Expand Up @@ -537,32 +537,32 @@ pub const LegacyContactInfo = struct {
/// call ContactInfo.deinit to free
pub fn toContactInfo(self: *const LegacyContactInfo, allocator: std.mem.Allocator) !ContactInfo {
var ci = ContactInfo.init(allocator, self.id, self.wallclock, self.shred_version);
try ci.setSocket(socket_tag.GOSSIP, self.gossip);
try ci.setSocket(socket_tag.TURBINE_RECV, self.turbine_recv);
try ci.setSocket(socket_tag.TURBINE_RECV_QUIC, self.turbine_recv_quic);
try ci.setSocket(socket_tag.REPAIR, self.repair);
try ci.setSocket(socket_tag.TPU, self.tpu);
try ci.setSocket(socket_tag.TPU_FORWARDS, self.tpu_forwards);
try ci.setSocket(socket_tag.TPU_VOTE, self.tpu_vote);
try ci.setSocket(socket_tag.RPC, self.rpc);
try ci.setSocket(socket_tag.RPC_PUBSUB, self.rpc_pubsub);
try ci.setSocket(socket_tag.SERVE_REPAIR, self.serve_repair);
try ci.setSocket(.gossip, self.gossip);
try ci.setSocket(.turbine_recv, self.turbine_recv);
try ci.setSocket(.turbine_recv_quic, self.turbine_recv_quic);
try ci.setSocket(.repair, self.repair);
try ci.setSocket(.tpu, self.tpu);
try ci.setSocket(.tpu_forwards, self.tpu_forwards);
try ci.setSocket(.tpu_vote, self.tpu_vote);
try ci.setSocket(.rpc, self.rpc);
try ci.setSocket(.rpc_pubsub, self.rpc_pubsub);
try ci.setSocket(.serve_repair, self.serve_repair);
return ci;
}

pub fn fromContactInfo(ci: *const ContactInfo) LegacyContactInfo {
return .{
.id = ci.pubkey,
.gossip = ci.getSocket(socket_tag.GOSSIP) orelse SocketAddr.UNSPECIFIED,
.turbine_recv = ci.getSocket(socket_tag.TURBINE_RECV) orelse SocketAddr.UNSPECIFIED,
.turbine_recv_quic = ci.getSocket(socket_tag.TURBINE_RECV_QUIC) orelse SocketAddr.UNSPECIFIED,
.repair = ci.getSocket(socket_tag.REPAIR) orelse SocketAddr.UNSPECIFIED,
.tpu = ci.getSocket(socket_tag.TPU) orelse SocketAddr.UNSPECIFIED,
.tpu_forwards = ci.getSocket(socket_tag.TPU_FORWARDS) orelse SocketAddr.UNSPECIFIED,
.tpu_vote = ci.getSocket(socket_tag.TPU_VOTE) orelse SocketAddr.UNSPECIFIED,
.rpc = ci.getSocket(socket_tag.RPC) orelse SocketAddr.UNSPECIFIED,
.rpc_pubsub = ci.getSocket(socket_tag.RPC_PUBSUB) orelse SocketAddr.UNSPECIFIED,
.serve_repair = ci.getSocket(socket_tag.SERVE_REPAIR) orelse SocketAddr.UNSPECIFIED,
.gossip = ci.getSocket(.gossip) orelse SocketAddr.UNSPECIFIED,
.turbine_recv = ci.getSocket(.turbine_recv) orelse SocketAddr.UNSPECIFIED,
.turbine_recv_quic = ci.getSocket(.turbine_recv_quic) orelse SocketAddr.UNSPECIFIED,
.repair = ci.getSocket(.repair) orelse SocketAddr.UNSPECIFIED,
.tpu = ci.getSocket(.tpu) orelse SocketAddr.UNSPECIFIED,
.tpu_forwards = ci.getSocket(.tpu_forwards) orelse SocketAddr.UNSPECIFIED,
.tpu_vote = ci.getSocket(.tpu_vote) orelse SocketAddr.UNSPECIFIED,
.rpc = ci.getSocket(.rpc) orelse SocketAddr.UNSPECIFIED,
.rpc_pubsub = ci.getSocket(.rpc_pubsub) orelse SocketAddr.UNSPECIFIED,
.serve_repair = ci.getSocket(.serve_repair) orelse SocketAddr.UNSPECIFIED,
.wallclock = ci.wallclock,
.shred_version = ci.shred_version,
};
Expand Down Expand Up @@ -977,23 +977,26 @@ pub const SnapshotHashes = struct {
}
};

pub const socket_tag = struct {
pub const GOSSIP: u8 = 0;
pub const REPAIR: u8 = 1;
pub const RPC: u8 = 2;
pub const RPC_PUBSUB: u8 = 3;
pub const SERVE_REPAIR: u8 = 4;
pub const TPU: u8 = 5;
pub const TPU_FORWARDS: u8 = 6;
pub const TPU_FORWARDS_QUIC: u8 = 7;
pub const TPU_QUIC: u8 = 8;
pub const TPU_VOTE: u8 = 9;
pub const SocketTag = enum(u8) {
gossip = 0,
repair = 1,
rpc = 2,
rpc_pubsub = 3,
serve_repair = 4,
tpu = 5,
tpu_forwards = 6,
tpu_forwards_quic = 7,
tpu_quic = 8,
tpu_vote = 9,
/// Analogous to [SOCKET_TAG_TVU](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L36)
pub const TURBINE_RECV: u8 = 10;
turbine_recv = 10,
/// Analogous to [SOCKET_TAG_TVU_QUIC](https://github.com/anza-xyz/agave/blob/0d34a1a160129c4293dac248e14231e9e773b4ce/gossip/src/contact_info.rs#L37)
pub const TURBINE_RECV_QUIC: u8 = 11;
turbine_recv_quic = 11,
_,

pub const BincodeSize = u8;
};
pub const SOCKET_CACHE_SIZE: usize = socket_tag.TURBINE_RECV_QUIC + 1;
pub const SOCKET_CACHE_SIZE: usize = @intFromEnum(SocketTag.turbine_recv_quic) + 1;

pub const ContactInfo = struct {
pubkey: Pubkey,
Expand Down Expand Up @@ -1026,7 +1029,7 @@ pub const ContactInfo = struct {

pub fn initSpy(allocator: std.mem.Allocator, id: Pubkey, gossip_socket_addr: SocketAddr, shred_version: u16) !Self {
var contact_info = Self.init(allocator, id, @intCast(std.time.microTimestamp()), shred_version);
try contact_info.setSocket(socket_tag.GOSSIP, gossip_socket_addr);
try contact_info.setSocket(.gossip, gossip_socket_addr);
return contact_info;
}

Expand Down Expand Up @@ -1065,7 +1068,7 @@ pub const ContactInfo = struct {
}

for (0..6) |_| {
sockets.append(.{ .key = 10, .index = 20, .offset = 30 }) catch unreachable;
sockets.append(.{ .key = .turbine_recv, .index = 20, .offset = 30 }) catch unreachable;
}

return ContactInfo{
Expand All @@ -1084,26 +1087,30 @@ pub const ContactInfo = struct {
try sanitizeWallclock(self.wallclock);
}

pub fn getSocket(self: *const Self, key: u8) ?SocketAddr {
if (self.cache[key].eql(&SocketAddr.UNSPECIFIED)) {
pub fn getSocket(self: *const Self, key: SocketTag) ?SocketAddr {
const socket = &self.cache[@intFromEnum(key)];
if (socket.eql(&SocketAddr.UNSPECIFIED)) {
return null;
}
return self.cache[key];
return socket.*;
}

pub fn setSocket(self: *Self, key: u8, socket_addr: SocketAddr) !void {
pub fn setSocket(self: *Self, key: SocketTag, socket_addr: SocketAddr) !void {
self.removeSocket(key);

var offset = socket_addr.port();
var index: ?usize = null;
for (self.sockets.items, 0..) |socket, idx| {
offset = std.math.sub(u16, offset, socket.offset) catch {
index = idx;
break;
};
}
const offset: u16, const index: ?usize = blk: {
var offset = socket_addr.port();
const index = for (self.sockets.items, 0..) |socket, idx| {
offset = std.math.sub(u16, offset, socket.offset) catch break idx;
} else null;
break :blk .{ offset, index };
};

const entry = SocketEntry.init(key, try self.pushAddr(socket_addr.ip()), offset);
const entry: SocketEntry = .{
.key = key,
.index = try self.pushAddr(socket_addr.ip()),
.offset = offset,
};

if (index) |i| {
self.sockets.items[i].offset -= entry.offset;
Expand All @@ -1112,18 +1119,14 @@ pub const ContactInfo = struct {
try self.sockets.append(entry);
}

self.cache[key] = socket_addr;
self.cache[@intFromEnum(key)] = socket_addr;
}

pub fn removeSocket(self: *Self, key: u8) void {
pub fn removeSocket(self: *Self, key: SocketTag) void {
// find existing socket index
var existing_socket_index: ?usize = null;
for (self.sockets.items, 0..) |socket, idx| {
if (socket.key == key) {
existing_socket_index = idx;
break;
}
}
const existing_socket_index = for (self.sockets.items, 0..) |socket, idx| {
if (socket.key == key) break idx;
} else null;
// if found, remove it, it's associated IpAddr, set cache[key] to unspecified
if (existing_socket_index) |index| {
// first we remove this existing socket
Expand All @@ -1134,7 +1137,7 @@ pub const ContactInfo = struct {
next_entry.offset += removed_entry.offset;
}
self.removeAddrIfUnused(removed_entry.index);
self.cache[key] = SocketAddr.unspecified();
self.cache[@intFromEnum(key)] = SocketAddr.unspecified();
}
}

Expand Down Expand Up @@ -1218,21 +1221,16 @@ const Sockets = struct {
};

pub const SocketEntry = struct {
key: u8, // GossipMessageidentifier, e.g. turbine_recv, tpu, etc
index: u8, // IpAddr index in the accompanying addrs vector.
offset: u16, // Port offset with respect to the previous entry.

pub const @"!bincode-config:offset" = var_int_config_u16;
/// GossipMessageIdentifier, e.g. turbine_recv, tpu, etc
key: SocketTag,
/// IpAddr index in the accompanying addrs vector.
index: u8,
/// Port offset with respect to the previous entry.
offset: u16,

const Self = @This();

pub fn init(key: u8, index: u8, offset: u16) Self {
return Self{
.key = key,
.index = index,
.offset = offset,
};
}
pub const @"!bincode-config:offset" = var_int_config_u16;

pub fn eql(self: *const Self, other: *const Self) bool {
return self.key == other.key and
Expand Down Expand Up @@ -1322,12 +1320,12 @@ test "gossip.data: set & get socket on contact info" {

var ci = ContactInfo.init(testing.allocator, Pubkey.random(rng), @as(u64, @intCast(std.time.microTimestamp())), 0);
defer ci.deinit();
try ci.setSocket(socket_tag.RPC, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899));
try ci.setSocket(.rpc, SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899));

var set_socket = ci.getSocket(socket_tag.RPC);
var set_socket = ci.getSocket(.rpc);
try testing.expect(set_socket.?.eql(&SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899)));
try testing.expect(ci.addrs.items[0].eql(&IpAddr.newIpv4(127, 0, 0, 1)));
try testing.expect(ci.sockets.items[0].eql(&SocketEntry.init(socket_tag.RPC, 0, 8899)));
try testing.expect(ci.sockets.items[0].eql(&.{ .key = .rpc, .index = 0, .offset = 8899 }));
}

test "gossip.data: contact info bincode serialize matches rust bincode" {
Expand Down Expand Up @@ -1388,7 +1386,8 @@ test "gossip.data: ContactInfo bincode roundtrip maintains data integrity" {
test "gossip.data: SocketEntry serializer works" {
testing.log_level = .debug;

const se = SocketEntry.init(3, 3, 30304);
comptime std.debug.assert(@intFromEnum(SocketTag.rpc_pubsub) == 3);
const se: SocketEntry = .{ .key = .rpc_pubsub, .index = 3, .offset = 30304 };

var buf = std.ArrayList(u8).init(testing.allocator);
defer buf.deinit();
Expand Down
3 changes: 1 addition & 2 deletions src/gossip/fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const _gossip_data = @import("data.zig");
const LegacyContactInfo = _gossip_data.LegacyContactInfo;
const SignedGossipData = _gossip_data.SignedGossipData;
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const AtomicBool = std.atomic.Value(bool);

const SocketAddr = @import("../net/net.zig").SocketAddr;
Expand Down Expand Up @@ -289,7 +288,7 @@ pub fn run() !void {

const fuzz_pubkey = Pubkey.fromPublicKey(&fuzz_keypair.public_key);
var fuzz_contact_info = ContactInfo.init(allocator, fuzz_pubkey, 0, 19);
try fuzz_contact_info.setSocket(socket_tag.GOSSIP, fuzz_address);
try fuzz_contact_info.setSocket(.gossip, fuzz_address);

var fuzz_exit = AtomicBool.init(false);
var gossip_service_fuzzer = try GossipService.init(
Expand Down
2 changes: 1 addition & 1 deletion src/gossip/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ pub const Ping = ping_pong.Ping;
pub const Pong = ping_pong.Pong;

pub const getWallclockMs = data.getWallclockMs;
pub const socket_tag = data.socket_tag;
pub const SocketTag = data.SocketTag;
3 changes: 1 addition & 2 deletions src/gossip/ping_pong.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const Hash = @import("../core/hash.zig").Hash;
const Signature = @import("../core/signature.zig").Signature;
const _gossip_data = @import("data.zig");
const ContactInfo = _gossip_data.ContactInfo;
const socket_tag = _gossip_data.socket_tag;
const getWallclockMs = _gossip_data.getWallclockMs;

const DefaultPrng = std.rand.DefaultPrng;
Expand Down Expand Up @@ -222,7 +221,7 @@ pub const PingCache = struct {
var pings = std.ArrayList(PingAndSocketAddr).init(allocator);

for (peers, 0..) |*peer, i| {
if (peer.getSocket(socket_tag.GOSSIP)) |gossip_addr| {
if (peer.getSocket(.gossip)) |gossip_addr| {
const result = self.check(now, PubkeyAndSocketAddr{ .pubkey = peer.pubkey, .socket_addr = gossip_addr }, &our_keypair);
if (result.passes_ping_check) {
try valid_peers.append(i);
Expand Down
Loading

0 comments on commit 263e620

Please sign in to comment.