Skip to content

Commit

Permalink
Use self: Self convention for GossipService
Browse files Browse the repository at this point in the history
  • Loading branch information
InKryption committed Jun 3, 2024
1 parent 570bf3e commit eba9783
Showing 1 changed file with 51 additions and 53 deletions.
104 changes: 51 additions & 53 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub const GossipService = struct {

stats: GossipStats,

const Self = @This();

const Entrypoint = struct { addr: SocketAddr, info: ?ContactInfo = null };

pub fn init(
Expand All @@ -136,7 +138,7 @@ pub const GossipService = struct {
entrypoints: ?[]const SocketAddr,
exit: *AtomicBool,
logger: Logger,
) !GossipService {
) !Self {
var packet_incoming_channel = Channel(PacketBatch).init(allocator, 10000);
errdefer packet_incoming_channel.deinit();

Expand Down Expand Up @@ -224,7 +226,7 @@ pub const GossipService = struct {
lg.unlock();
}

pub fn deinit(self: *GossipService) void {
pub fn deinit(self: *Self) void {
self.my_contact_info.deinit();
self.echo_server.deinit();
self.gossip_socket.close();
Expand Down Expand Up @@ -304,7 +306,7 @@ pub const GossipService = struct {
/// 5) a socket responder (to send outgoing packets)
/// 6) echo server
pub fn runThreads(
service: *GossipService,
self: *Self,
params: RunThreadsParams,
) std.Thread.SpawnError!RunHandles {
const message_allocator = params.message_allocator;
Expand All @@ -324,45 +326,45 @@ pub const GossipService = struct {
}.exitAndJoin;

const receiver_thread = try Thread.spawn(.{}, socket_utils.readSocket, .{
service.allocator,
&service.gossip_socket,
service.packet_incoming_channel,
service.exit,
service.logger,
self.allocator,
&self.gossip_socket,
self.packet_incoming_channel,
self.exit,
self.logger,
});
errdefer exitAndJoin(service.exit, receiver_thread);
errdefer exitAndJoin(self.exit, receiver_thread);

const packet_verifier_thread = try Thread.spawn(.{}, verifyPackets, .{ service, message_allocator });
errdefer exitAndJoin(service.exit, packet_verifier_thread);
const packet_verifier_thread = try Thread.spawn(.{}, verifyPackets, .{ self, message_allocator });
errdefer exitAndJoin(self.exit, packet_verifier_thread);

const message_processor_thread = try Thread.spawn(.{}, processMessages, .{ service, message_allocator });
errdefer exitAndJoin(service.exit, message_processor_thread);
const message_processor_thread = try Thread.spawn(.{}, processMessages, .{ self, message_allocator });
errdefer exitAndJoin(self.exit, message_processor_thread);

const maybe_message_builder_thread: ?std.Thread = if (!spy_node) try Thread.spawn(.{}, buildMessages, .{service}) else null;
const maybe_message_builder_thread: ?std.Thread = if (!spy_node) try Thread.spawn(.{}, buildMessages, .{self}) else null;
errdefer if (maybe_message_builder_thread) |thread| {
exitAndJoin(service.exit, thread);
exitAndJoin(self.exit, thread);
};

const responder_thread = try Thread.spawn(.{}, socket_utils.sendSocket, .{
&service.gossip_socket,
service.packet_outgoing_channel,
service.exit,
service.logger,
&self.gossip_socket,
self.packet_outgoing_channel,
self.exit,
self.logger,
});
errdefer exitAndJoin(service.exit, responder_thread);
errdefer exitAndJoin(self.exit, responder_thread);

const maybe_dumper_thread: ?std.Thread = if (dump) try Thread.spawn(.{}, GossipDumpService.run, .{.{
.allocator = service.allocator,
.logger = service.logger,
.gossip_table_rw = &service.gossip_table_rw,
.exit = service.exit,
.allocator = self.allocator,
.logger = self.logger,
.gossip_table_rw = &self.gossip_table_rw,
.exit = self.exit,
}}) else null;
errdefer if (maybe_dumper_thread) |thread| {
exitAndJoin(service.exit, thread);
exitAndJoin(self.exit, thread);
};

return .{
.exit = service.exit,
.exit = self.exit,

.receiver_thread = receiver_thread,
.packet_verifier_thread = packet_verifier_thread,
Expand All @@ -373,8 +375,8 @@ pub const GossipService = struct {
};
}

pub fn run(service: *GossipService, params: RunThreadsParams) !void {
const run_handles = try service.runThreads(params);
pub fn run(self: *Self, params: RunThreadsParams) !void {
const run_handles = try self.runThreads(params);
defer run_handles.joinAndExit();
}

Expand Down Expand Up @@ -427,7 +429,7 @@ pub const GossipService = struct {
/// and verifing they have valid values, and have valid signatures.
/// Verified GossipMessagemessages are then sent to the verified_channel.
fn verifyPackets(
self: *GossipService,
self: *Self,
/// Must be thread-safe. Can be a specific allocator which will
/// only be contended for by the tasks spawned by in function.
task_allocator: std.mem.Allocator,
Expand Down Expand Up @@ -510,7 +512,7 @@ pub const GossipService = struct {
};

/// main logic for recieving and processing gossip messages.
pub fn processMessages(self: *GossipService, message_allocator: std.mem.Allocator) !void {
pub fn processMessages(self: *Self, message_allocator: std.mem.Allocator) !void {
var timer = std.time.Timer.start() catch unreachable;
var last_table_trim_ts: u64 = 0;
var msg_count: usize = 0;
Expand Down Expand Up @@ -802,9 +804,7 @@ pub const GossipService = struct {
/// main gossip loop for periodically sending new GossipMessagemessages.
/// this includes sending push messages, pull requests, and triming old
/// gossip data (in the gossip_table, active_set, and failed_pull_hashes).
fn buildMessages(
self: *GossipService,
) !void {
fn buildMessages(self: *Self) !void {
var last_push_ts: u64 = 0;
var last_stats_publish_ts: u64 = 0;
var last_pull_req_ts: u64 = 0;
Expand Down Expand Up @@ -890,7 +890,7 @@ pub const GossipService = struct {
}

// collect gossip table metrics and pushes them to stats
pub fn collectGossipTableMetrics(self: *GossipService) !void {
pub fn collectGossipTableMetrics(self: *Self) !void {
var gossip_table_lock = self.gossip_table_rw.read();
defer gossip_table_lock.unlock();

Expand All @@ -902,9 +902,7 @@ pub const GossipService = struct {
self.stats.table_n_pubkeys.add(n_pubkeys);
}

pub fn rotateActiveSet(
self: *GossipService,
) !void {
pub fn rotateActiveSet(self: *Self) !void {
const now = getWallclockMs();
var buf: [NUM_ACTIVE_SET_ENTRIES]ContactInfo = undefined;
const gossip_peers = try self.getGossipNodes(&buf, NUM_ACTIVE_SET_ENTRIES, now);
Expand Down Expand Up @@ -940,7 +938,7 @@ pub const GossipService = struct {

/// logic for building new push messages which are sent to peers from the
/// active set and serialized into packets.
fn buildPushMessages(self: *GossipService, push_cursor: *u64) !ArrayList(ArrayList(Packet)) {
fn buildPushMessages(self: *Self, push_cursor: *u64) !ArrayList(ArrayList(Packet)) {
// TODO: find a better static value?
var buf: [512]gossip.GossipVersionedData = undefined;

Expand Down Expand Up @@ -1053,7 +1051,7 @@ pub const GossipService = struct {
/// builds new pull request messages and serializes it into a list of Packets
/// to be sent to a random set of gossip nodes.
fn buildPullRequests(
self: *GossipService,
self: *Self,
/// the bloomsize of the pull request's filters
bloom_size: usize,
) !ArrayList(Packet) {
Expand Down Expand Up @@ -1236,7 +1234,7 @@ pub const GossipService = struct {
};

fn handleBatchPullRequest(
self: *GossipService,
self: *Self,
pull_requests: ArrayList(PullRequestMessage),
) !void {
// update the callers
Expand Down Expand Up @@ -1344,7 +1342,7 @@ pub const GossipService = struct {
}

pub fn handleBatchPongMessages(
self: *GossipService,
self: *Self,
pong_messages: *const ArrayList(PongMessage),
) void {
const now = std.time.Instant.now() catch @panic("time is not supported on the OS!");
Expand All @@ -1363,7 +1361,7 @@ pub const GossipService = struct {
}

pub fn handleBatchPingMessages(
self: *GossipService,
self: *Self,
ping_messages: *const ArrayList(PingMessage),
) !void {
const n_ping_messages = ping_messages.items.len;
Expand Down Expand Up @@ -1403,7 +1401,7 @@ pub const GossipService = struct {
/// failed inserts (ie, too old or duplicate values) are added to the failed pull hashes so that they can be
/// included in the next pull request (so we dont receive them again).
pub fn handleBatchPullResponses(
self: *GossipService,
self: *Self,
pull_response_messages: *const ArrayList(PullResponseMessage),
) !void {
if (pull_response_messages.items.len == 0) {
Expand Down Expand Up @@ -1482,7 +1480,7 @@ pub const GossipService = struct {
/// is not too old, and that the destination pubkey is the local node,
/// then updates the active set to prune the list of origin Pubkeys.
pub fn handleBatchPruneMessages(
self: *GossipService,
self: *Self,
prune_messages: *const ArrayList(*PruneData),
) void {
var active_set_lock = self.active_set_rw.write();
Expand All @@ -1504,7 +1502,7 @@ pub const GossipService = struct {
/// builds a prune message for a list of origin Pubkeys and serializes the values
/// into packets to send to the prune_destination.
fn buildPruneMessage(
self: *GossipService,
self: *Self,
/// origin Pubkeys which will be pruned
failed_origins: *const std.AutoArrayHashMap(Pubkey, void),
/// the pubkey of the node which we will send the prune message to
Expand Down Expand Up @@ -1558,7 +1556,7 @@ pub const GossipService = struct {
}

pub fn handleBatchPushMessages(
self: *GossipService,
self: *Self,
batch_push_messages: *const ArrayList(PushMessage),
) !void {
if (batch_push_messages.items.len == 0) {
Expand Down Expand Up @@ -1723,7 +1721,7 @@ pub const GossipService = struct {
/// gossip table, triming the max number of pubkeys in the gossip table, and removing
/// old labels from the gossip table.
fn trimMemory(
self: *GossipService,
self: *Self,
/// the current time
now: u64,
) error{OutOfMemory}!void {
Expand Down Expand Up @@ -1758,7 +1756,7 @@ pub const GossipService = struct {
/// Returns true if all entrypoints have been identified
///
/// Acquires the gossip table lock regardless of whether the gossip table is used.
fn populateEntrypointsFromGossipTable(self: *GossipService) !bool {
fn populateEntrypointsFromGossipTable(self: *Self) !bool {
var identified_all = true;

var gossip_table_lock = self.gossip_table_rw.read();
Expand All @@ -1776,7 +1774,7 @@ pub const GossipService = struct {

/// if we have no shred version, attempt to get one from an entrypoint.
/// Returns true if the shred version is set to non-zero
fn assignDefaultShredVersionFromEntrypoint(self: *GossipService) bool {
fn assignDefaultShredVersionFromEntrypoint(self: *Self) bool {
if (self.my_shred_version.load(.monotonic) != 0) return true;
for (self.entrypoints.items) |entrypoint| {
if (entrypoint.info) |info| {
Expand All @@ -1798,7 +1796,7 @@ pub const GossipService = struct {
/// drains values from the push queue and inserts them into the gossip table.
/// when inserting values in the gossip table, any errors are ignored.
fn drainPushQueueToGossipTable(
self: *GossipService,
self: *Self,
/// the current time to insert the values with
now: u64,
) void {
Expand All @@ -1817,7 +1815,7 @@ pub const GossipService = struct {

/// serializes a list of ping messages into Packets and sends them out
pub fn sendPings(
self: *GossipService,
self: *Self,
pings: ArrayList(PingAndSocketAddr),
) error{ OutOfMemory, ChannelClosed, SerializationError }!void {
const n_pings = pings.items.len;
Expand Down Expand Up @@ -1845,7 +1843,7 @@ pub const GossipService = struct {
/// nodes that are 1) too old, 2) have a different shred version, or 3) have
/// an invalid gossip address.
pub fn getGossipNodes(
self: *GossipService,
self: *Self,
/// the output slice which will be filled with gossip nodes
nodes: []ContactInfo,
/// the maximum number of nodes to return ( max_size == nodes.len but comptime for init of stack array)
Expand Down Expand Up @@ -1903,7 +1901,7 @@ pub const GossipService = struct {
}

pub fn filterBasedOnShredVersion(
self: *GossipService,
self: *Self,
gossip_table: *const GossipTable,
gossip_values: []SignedGossipData,
from_pubkey: Pubkey,
Expand Down

0 comments on commit eba9783

Please sign in to comment.