Skip to content

Commit

Permalink
incorp comments
Browse files Browse the repository at this point in the history
  • Loading branch information
0xNineteen committed Dec 13, 2023
1 parent bdf7eba commit 78ca244
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 27 deletions.
10 changes: 5 additions & 5 deletions src/gossip/fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const socket_utils = @import("./socket_utils.zig");
const _gossip_service = @import("./gossip_service.zig");
const GossipService = _gossip_service.GossipService;
const ChunkType = _gossip_service.ChunkType;
const crds_values_to_packets = _gossip_service.crdsValuesToPackets;
const crdsValuesToPackets = _gossip_service.crdsValuesToPackets;
const MAX_PUSH_MESSAGE_PAYLOAD_SIZE = _gossip_service.MAX_PUSH_MESSAGE_PAYLOAD_SIZE;

const Logger = @import("../trace/log.zig").Logger;
Expand Down Expand Up @@ -119,14 +119,14 @@ pub fn randomPushMessage(rng: std.rand.Random, keypair: *const KeyPair, to_addr:
}

const allocator = std.heap.page_allocator;
const packets = try crds_values_to_packets(
const packets = try crdsValuesToPackets(
allocator,
&Pubkey.fromPublicKey(&keypair.public_key, false),
&crds_values,
&to_addr,
ChunkType.PushMessage,
);
return packets.?;
return packets;
}

pub fn randomPullResponse(rng: std.rand.Random, keypair: *const KeyPair, to_addr: EndPoint) !std.ArrayList(Packet) {
Expand All @@ -139,14 +139,14 @@ pub fn randomPullResponse(rng: std.rand.Random, keypair: *const KeyPair, to_addr
}

const allocator = std.heap.page_allocator;
const packets = try crds_values_to_packets(
const packets = try crdsValuesToPackets(
allocator,
&Pubkey.fromPublicKey(&keypair.public_key, false),
&crds_values,
&to_addr,
ChunkType.PullResponse,
);
return packets.?;
return packets;
}

pub fn randomPullRequest(allocator: std.mem.Allocator, rng: std.rand.Random, keypair: *const KeyPair, to_addr: EndPoint) !Packet {
Expand Down
45 changes: 23 additions & 22 deletions src/gossip/gossip_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub const GossipService = struct {
}

var thread_pool = try allocator.create(ThreadPool);
var n_threads = @min(@as(u32, @truncate(std.Thread.getCpuCount() catch 0)), 8);
var n_threads = @min(@as(u32, @truncate(std.Thread.getCpuCount() catch 1)), 8);
thread_pool.* = ThreadPool.init(.{
.max_threads = n_threads,
.stack_size = 2 * 1024 * 1024,
Expand Down Expand Up @@ -736,8 +736,8 @@ pub const GossipService = struct {
defer valid_gossip_indexs.deinit();

var valid_gossip_peers: [NUM_ACTIVE_SET_ENTRIES]LegacyContactInfo = undefined;
for (valid_gossip_indexs.items) |i| {
valid_gossip_peers[i] = gossip_peers[i];
for (0.., valid_gossip_indexs.items) |i, valid_gossip_index| {
valid_gossip_peers[i] = gossip_peers[valid_gossip_index];
}

// send pings to peers
Expand All @@ -754,7 +754,7 @@ pub const GossipService = struct {

/// logic for building new push messages which are sent to peers from the
/// active set and serialized into packets.
fn buildPushMessages(self: *Self, push_cursor: *u64) !?ArrayList(ArrayList(Packet)) {
fn buildPushMessages(self: *Self, push_cursor: *u64) !ArrayList(ArrayList(Packet)) {
// TODO: find a better static value?
var buf: [512]crds.CrdsVersionedValue = undefined;

Expand All @@ -766,8 +766,11 @@ pub const GossipService = struct {
break :blk crds_table.getEntriesWithCursor(&buf, push_cursor);
};

var packet_batch = ArrayList(ArrayList(Packet)).init(self.allocator);
errdefer packet_batch.deinit();

if (crds_entries.len == 0) {
return null;
return packet_batch;
}

const now = getWallclockMs();
Expand All @@ -791,7 +794,7 @@ pub const GossipService = struct {
var active_set: *const ActiveSet = active_set_lock.get();
defer active_set_lock.unlock();

if (active_set.len() == 0) return null;
if (active_set.len() == 0) return packet_batch;

for (crds_entries) |entry| {
const value = entry.value;
Expand Down Expand Up @@ -841,24 +844,21 @@ pub const GossipService = struct {
const num_values_not_considered = crds_entries.len - num_values_considered;
push_cursor.* -= num_values_not_considered;

var packet_batch = ArrayList(ArrayList(Packet)).init(self.allocator);
errdefer packet_batch.deinit();

var push_iter = push_messages.iterator();
while (push_iter.next()) |push_entry| {
const crds_values: *const ArrayList(CrdsValue) = push_entry.value_ptr;
const to_endpoint: *const EndPoint = push_entry.key_ptr;

// send the values as a pull response
var maybe_endpoint_packets = try crdsValuesToPackets(
var packets = try crdsValuesToPackets(
self.allocator,
&self.my_pubkey,
crds_values.items,
to_endpoint,
ChunkType.PushMessage,
);
if (maybe_endpoint_packets) |endpoint_packets| {
try packet_batch.append(endpoint_packets);
if (packets.items.len > 0) {
try packet_batch.append(packets);
}
}
return packet_batch;
Expand Down Expand Up @@ -1041,19 +1041,19 @@ pub const GossipService = struct {
std.atomic.Ordering.Release,
);

const maybe_packets = crdsValuesToPackets(
const packets = crdsValuesToPackets(
self.allocator,
self.my_pubkey,
response_crds_values.items,
self.from_endpoint,
ChunkType.PullResponse,
) catch {
return;
};
) catch return;

if (maybe_packets) |*packets| {
if (packets.items.len > 0) {
defer packets.deinit();
self.output.appendSlice(packets.items) catch unreachable;
self.output.appendSlice(packets.items) catch {
std.debug.panic("thread task: failed to append packets", .{});
};
}
}
};
Expand Down Expand Up @@ -1686,8 +1686,9 @@ pub fn crdsValuesToPackets(
crds_values: []CrdsValue,
to_endpoint: *const EndPoint,
chunk_type: ChunkType,
) error{ OutOfMemory, SerializationError }!?ArrayList(Packet) {
if (crds_values.len == 0) return null;
) error{ OutOfMemory, SerializationError }!ArrayList(Packet) {
if (crds_values.len == 0)
return ArrayList(Packet).init(allocator);

const indexs = try chunkValuesIntoPacketIndexs(
allocator,
Expand Down Expand Up @@ -2230,15 +2231,15 @@ test "gossip.gossip_service: test build_push_messages" {
clg.unlock();

var cursor: u64 = 0;
var msgs = (try gossip_service.buildPushMessages(&cursor)).?;
var msgs = try gossip_service.buildPushMessages(&cursor);
try std.testing.expectEqual(cursor, 11);
try std.testing.expect(msgs.items.len > 0);
for (msgs.items) |*msg| msg.deinit();
msgs.deinit();

var msgs2 = try gossip_service.buildPushMessages(&cursor);
try std.testing.expectEqual(cursor, 11);
try std.testing.expect(msgs2 == null);
try std.testing.expect(msgs2.items.len == 0);
}

test "gossip.gossip_service: test packet verification" {
Expand Down

0 comments on commit 78ca244

Please sign in to comment.