Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gossip): crds table contents are too small #69

Merged
merged 18 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Level = @import("../trace/level.zig").Level;
const io = std.io;
const Pubkey = @import("../core/pubkey.zig").Pubkey;
const SocketAddr = @import("../net/net.zig").SocketAddr;
const echo = @import("../net/echo.zig");
const GossipService = @import("../gossip/gossip_service.zig").GossipService;
const servePrometheus = @import("../prometheus/http.zig").servePrometheus;
const globalRegistry = @import("../prometheus/registry.zig").globalRegistry;
Expand Down Expand Up @@ -115,12 +116,11 @@ fn gossip(_: []const []const u8) !void {

var gossip_port: u16 = @intCast(gossip_port_option.value.int.?);
var gossip_address = SocketAddr.initIpv4(.{ 0, 0, 0, 0 }, gossip_port);
logger.infof("gossip port: {d}\n", .{gossip_port});
logger.infof("gossip port: {d}", .{gossip_port});

// setup contact info
var my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key, false);
var contact_info = LegacyContactInfo.default(my_pubkey);
contact_info.shred_version = 0;
contact_info.gossip = gossip_address;

var entrypoints = std.ArrayList(SocketAddr).init(gpa_allocator);
Expand All @@ -134,7 +134,35 @@ fn gossip(_: []const []const u8) !void {
try entrypoints.append(value);
}
}
std.debug.print("entrypoints: {any}\n", .{entrypoints.items});

// log entrypoints
var entrypoint_string = try gpa_allocator.alloc(u8, 53 * entrypoints.items.len);
defer gpa_allocator.free(entrypoint_string);
var stream = std.io.fixedBufferStream(entrypoint_string);
var writer = stream.writer();
for (0.., entrypoints.items) |i, entrypoint| {
try entrypoint.toAddress().format("", .{}, writer);
if (i != entrypoints.items.len - 1) try writer.writeAll(", ");
}
logger.infof("entrypoints: {s}", .{entrypoint_string[0..stream.pos]});

// determine our shred version. in the solana-labs client, this approach is only
// used for validation. normally, shred version comes from the snapshot.
contact_info.shred_version = loop: for (entrypoints.items) |entrypoint| {
if (echo.requestIpEcho(gpa_allocator, entrypoint.toAddress(), .{})) |response| {
if (response.shred_version) |shred_version| {
var addr_str = entrypoint.toString();
logger.infof(
"shred version: {} - from entrypoint ip echo: {s}",
.{ shred_version.value, addr_str[0][0..addr_str[1]] },
);
break shred_version.value;
}
} else |_| {}
} else {
logger.warn("could not get a shred version from an entrypoint");
break :loop 0;
};

var exit = std.atomic.Atomic(bool).init(false);
var gossip_service = try GossipService.init(
Expand Down Expand Up @@ -163,7 +191,7 @@ fn gossip(_: []const []const u8) !void {
/// Uses same allocator for both registry and http adapter.
fn spawnMetrics(allocator: std.mem.Allocator, logger: Logger) !std.Thread {
var metrics_port: u16 = @intCast(metrics_port_option.value.int.?);
logger.infof("metrics port: {d}\n", .{metrics_port});
logger.infof("metrics port: {d}", .{metrics_port});
const registry = globalRegistry();
return try std.Thread.spawn(.{}, servePrometheus, .{ allocator, registry, metrics_port });
}
Expand Down
31 changes: 30 additions & 1 deletion src/gossip/crds_table.zig
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const Transaction = @import("../core/transaction.zig").Transaction;
const Pubkey = @import("../core/pubkey.zig").Pubkey;
const KeyPair = std.crypto.sign.Ed25519.KeyPair;
const RwLock = std.Thread.RwLock;
const SocketAddr = @import("../net/net.zig").SocketAddr;

const PACKET_DATA_SIZE = @import("./packet.zig").PACKET_DATA_SIZE;

Expand Down Expand Up @@ -432,7 +433,20 @@ pub const CrdsTable = struct {
return false;
}

// ** triming values in the crdstable **
/// ** triming values in the crdstable **
///
/// This frees the memory for any pointers in the CrdsData.
/// Be sure that this CrdsData is not being used anywhere else when calling this.
///
/// This method is not safe because neither CrdsTable nor CrdsValue
/// provide any guarantees that the CrdsValue being removed is not
/// also being accessed somewhere else in the code after this is called.
/// Since this frees the CrdsValue, any accesses of the CrdsValue
/// after this function is called will result in a segfault.
///
/// TODO: implement a safer approach to avoid dangling pointers, such as:
/// - removal buffer that is populated here and freed later
/// - reference counting for all crds values
pub fn remove(self: *Self, label: CrdsValueLabel) error{ LabelNotFound, OutOfMemory }!void {
const now = crds.getWallclockMs();

Expand Down Expand Up @@ -536,6 +550,7 @@ pub const CrdsTable = struct {
std.debug.assert(did_remove);
new_entry_indexs.put(entry_index, {}) catch unreachable;
}
bincode.free(self.allocator, versioned_value.value.data);
}

pub fn attemptTrim(self: *Self, max_pubkey_capacity: usize) error{OutOfMemory}!void {
Expand Down Expand Up @@ -673,6 +688,20 @@ pub const CrdsTable = struct {

return output;
}

pub fn getContactInfoByGossipAddr(
self: *const Self,
gossip_addr: SocketAddr,
) ?LegacyContactInfo {
var contact_indexs = self.contact_infos.keys();
for (contact_indexs) |index| {
const entry: CrdsVersionedValue = self.store.values()[index];
if (entry.value.data.LegacyContactInfo.gossip.eql(&gossip_addr)) {
return entry.value.data.LegacyContactInfo;
}
}
return null;
}
};

pub const HashTimeQueue = struct {
Expand Down
Loading
Loading