Skip to content

Commit

Permalink
Merge pull request #39 from Syndica/19/gossip_optim_v2
Browse files Browse the repository at this point in the history
gossip: optimize logic
  • Loading branch information
ultd authored Dec 15, 2023
2 parents 4f4af09 + 1c3c02a commit 949eb15
Show file tree
Hide file tree
Showing 21 changed files with 3,042 additions and 928 deletions.
5 changes: 4 additions & 1 deletion build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ pub fn build(b: *std.Build) void {
fuzz_exe.addModule("getty", getty_mod);
b.installArtifact(fuzz_exe);
const fuzz_cmd = b.addRunArtifact(fuzz_exe);
b.step("fuzz_gossip", "fuzz gossip").dependOn(&fuzz_cmd.step);
if (b.args) |args| {
fuzz_cmd.addArgs(args);
}
b.step("fuzz", "fuzz gossip").dependOn(&fuzz_cmd.step);

// benchmarking
const benchmark_exe = b.addExecutable(.{
Expand Down
9 changes: 6 additions & 3 deletions src/benchmarks.zig
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn main() !void {

// TODO: very manual for now (bc we only have 2 benchmarks)
// if we have more benchmarks we can make this more efficient
const max_time_per_bench = 500; // !!
const max_time_per_bench = 2 * std.time.ms_per_s; // !!

if (std.mem.startsWith(u8, "socket_utils", filter)) {
try benchmark(
Expand All @@ -41,7 +41,7 @@ pub fn main() !void {

if (std.mem.startsWith(u8, "gossip", filter)) {
try benchmark(
@import("gossip/gossip_service.zig").BenchmarkMessageProcessing,
@import("gossip/gossip_service.zig").BenchmarkGossipServiceGeneral,
max_time_per_bench,
TimeUnits.milliseconds,
);
Expand Down Expand Up @@ -155,7 +155,10 @@ pub fn benchmark(
try stderr.context.flush();

var timer = try time.Timer.start();
inline for (functions) |def| {
inline for (functions, 0..) |def, fcni| {
if (fcni > 0)
std.debug.print("---\n", .{});

inline for (args, 0..) |arg, index| {
var runtimes: [max_iterations]u64 = undefined;
var min: u64 = math.maxInt(u64);
Expand Down
20 changes: 11 additions & 9 deletions src/bincode/bincode.zig
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ pub fn Deserializer(comptime Reader: type) type {
}

pub fn deserializeEnum(self: *Self, ally: ?std.mem.Allocator, visitor: anytype) Error!@TypeOf(visitor).Value {
const T = u32; // enum size
const T = @TypeOf(visitor).Value;
comptime var SerializedSize = u32;
comptime if (@hasDecl(T, "BincodeSize")) {
SerializedSize = T.BincodeSize;
};
const tag = switch (self.params.endian) {
.Little => self.reader.readIntLittle(T),
.Big => self.reader.readIntBig(T),
.Little => self.reader.readIntLittle(SerializedSize),
.Big => self.reader.readIntBig(SerializedSize),
} catch {
return Error.IO;
};
Expand Down Expand Up @@ -215,9 +219,8 @@ pub fn Deserializer(comptime Reader: type) type {
},
.Struct => |*info| {
inline for (info.fields) |field| {
if (get_field_config(T, field)) |config| {
if (getFieldConfig(T, field)) |config| {
if (config.free) |free_fcn| {
// std.debug.print("found free fcn...\n", .{});
var field_value = @field(value, field.name);
switch (@typeInfo(field.type)) {
.Pointer => |*field_info| {
Expand Down Expand Up @@ -278,7 +281,7 @@ pub fn Deserializer(comptime Reader: type) type {

inline for (info.fields) |field| {
if (!field.is_comptime) {
if (get_field_config(T, field)) |config| {
if (getFieldConfig(T, field)) |config| {
if (shouldUseDefaultValue(field, config)) |default_val| {
@field(data, field.name) = @as(*const field.type, @ptrCast(@alignCast(default_val))).*;
continue;
Expand Down Expand Up @@ -557,7 +560,7 @@ pub fn Serializer(

inline for (info.fields) |field| {
if (!field.is_comptime) {
if (get_field_config(T, field)) |config| {
if (getFieldConfig(T, field)) |config| {
if (config.skip) {
continue;
}
Expand Down Expand Up @@ -655,7 +658,7 @@ pub fn FieldConfig(comptime T: type) type {
};
}

pub fn get_field_config(comptime struct_type: type, comptime field: std.builtin.Type.StructField) ?FieldConfig(field.type) {
pub fn getFieldConfig(comptime struct_type: type, comptime field: std.builtin.Type.StructField) ?FieldConfig(field.type) {
const bincode_field = "!bincode-config:" ++ field.name;
if (@hasDecl(struct_type, bincode_field)) {
const config = @field(struct_type, bincode_field);
Expand Down Expand Up @@ -726,7 +729,6 @@ pub fn readFromSlice(alloc: ?std.mem.Allocator, comptime T: type, slice: []const
var d = deserializer(reader, params);
const dd = d.deserializer();
const v = try getty.deserialize(alloc, T, dd);
errdefer getty.de.free(alloc, @TypeOf(dd), v); // !

return v;
}
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ fn gossip(_: []const []const u8) !void {
defer logger.deinit();
logger.spawn();

// var logger: Logger = .noop;

var my_keypair = try getOrInitIdentity(gpa_allocator, logger);

var gossip_port: u16 = @intCast(gossip_port_option.value.int.?);
Expand All @@ -83,7 +85,7 @@ fn gossip(_: []const []const u8) !void {
// setup contact info
var my_pubkey = Pubkey.fromPublicKey(&my_keypair.public_key, false);
var contact_info = LegacyContactInfo.default(my_pubkey);
contact_info.shred_version = 0; // TODO: double check
contact_info.shred_version = 0;
contact_info.gossip = gossip_address;

var entrypoints = std.ArrayList(SocketAddr).init(gpa_allocator);
Expand Down Expand Up @@ -113,7 +115,7 @@ fn gossip(_: []const []const u8) !void {
var handle = try std.Thread.spawn(
.{},
GossipService.run,
.{&gossip_service},
.{ &gossip_service, true },
);

handle.join();
Expand Down
83 changes: 53 additions & 30 deletions src/common/lru.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@ const Allocator = std.mem.Allocator;
const TailQueue = std.TailQueue;
const testing = std.testing;
const assert = std.debug.assert;
const Mutex = std.Thread.Mutex;

pub const Kind = enum {
locking,
non_locking,
};

/// A thread-safe LRU Cache
///
// TODO: allow for passing custom hash context to use in std.ArrayHashMap for performance.
pub fn LruCache(comptime K: type, comptime V: type) type {
pub fn LruCache(comptime kind: Kind, comptime K: type, comptime V: type) type {
return struct {
mux: if (kind == .locking) Mutex else void,
allocator: Allocator,
hashmap: if (K == []const u8) std.StringArrayHashMap(*Node) else std.AutoArrayHashMap(K, *Node),
dbl_link_list: TailQueue(LruEntry),
max_items: usize,
len: usize = 0,
mux: std.Thread.Mutex,

const Self = @This();

Expand Down Expand Up @@ -53,7 +57,7 @@ pub fn LruCache(comptime K: type, comptime V: type) type {
.hashmap = hashmap,
.dbl_link_list = TailQueue(LruEntry){},
.max_items = max_items,
.mux = std.Thread.Mutex{},
.mux = if (kind == .locking) Mutex{} else undefined,
};

// pre allocate enough capacity for max items since we will use
Expand All @@ -64,12 +68,10 @@ pub fn LruCache(comptime K: type, comptime V: type) type {
}

pub fn deinit(self: *Self) void {
self.mux.lock();
defer self.mux.unlock();

while (self.dbl_link_list.pop()) |node| {
self.deinitNode(node);
}
std.debug.assert(self.len == 0); // no leaks
self.hashmap.deinit();
}

Expand Down Expand Up @@ -117,8 +119,10 @@ pub fn LruCache(comptime K: type, comptime V: type) type {
/// Inserts key/value if key doesn't exist, updates only value if it does.
/// In any case, it will affect cache ordering.
pub fn insert(self: *Self, key: K, value: V) error{OutOfMemory}!void {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

_ = self.internal_insert(key, value);
return;
Expand All @@ -127,16 +131,20 @@ pub fn LruCache(comptime K: type, comptime V: type) type {
/// Whether or not contains key.
/// NOTE: doesn't affect cache ordering.
pub fn contains(self: *Self, key: K) bool {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

return self.hashmap.contains(key);
}

/// Most recently used entry
pub fn mru(self: *Self) ?LruEntry {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

if (self.dbl_link_list.last) |node| {
return node.data;
Expand All @@ -146,8 +154,10 @@ pub fn LruCache(comptime K: type, comptime V: type) type {

/// Least recently used entry
pub fn lru(self: *Self) ?LruEntry {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

if (self.dbl_link_list.first) |node| {
return node.data;
Expand All @@ -163,8 +173,10 @@ pub fn LruCache(comptime K: type, comptime V: type) type {

/// Gets value associated with key if exists
pub fn get(self: *Self, key: K) ?V {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

if (self.hashmap.get(key)) |node| {
self.dbl_link_list.remove(node);
Expand All @@ -175,18 +187,25 @@ pub fn LruCache(comptime K: type, comptime V: type) type {
}

pub fn pop(self: *Self, k: K) ?V {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

if (self.hashmap.fetchSwapRemove(k)) |kv| {
self.dbl_link_list.remove(kv.value);
return kv.value.data.value;
var node = kv.value;
self.dbl_link_list.remove(node);
defer self.deinitNode(node);
return node.data.value;
}
return null;
}

pub fn peek(self: *Self, key: K) ?V {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

if (self.hashmap.get(key)) |node| {
return node.data.value;
Expand All @@ -198,8 +217,10 @@ pub fn LruCache(comptime K: type, comptime V: type) type {
/// Puts a key-value pair into cache. If the key already exists in the cache, then it updates
/// the key's value and returns the old value. Otherwise, `null` is returned.
pub fn put(self: *Self, key: K, value: V) ?V {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

if (self.hashmap.getEntry(key)) |existing_entry| {
var existing_node: *Node = existing_entry.value_ptr.*;
Expand All @@ -215,8 +236,10 @@ pub fn LruCache(comptime K: type, comptime V: type) type {

/// Removes key from cache. Returns true if found, false if not.
pub fn remove(self: *Self, key: K) bool {
self.mux.lock();
defer self.mux.unlock();
if (kind == .locking) {
self.mux.lock();
defer self.mux.unlock();
}

if (self.hashmap.fetchSwapRemove(key)) |kv| {
var node = kv.value;
Expand All @@ -230,7 +253,7 @@ pub fn LruCache(comptime K: type, comptime V: type) type {
}

test "common.lru: LruCache state is correct" {
var cache = try LruCache(u64, []const u8).init(testing.allocator, 4);
var cache = try LruCache(.locking, u64, []const u8).init(testing.allocator, 4);
defer cache.deinit();

try cache.insert(1, "one");
Expand Down Expand Up @@ -262,7 +285,7 @@ test "common.lru: LruCache state is correct" {
}

test "common.lru: put works as expected" {
var cache = try LruCache([]const u8, usize).init(testing.allocator, 4);
var cache = try LruCache(.non_locking, []const u8, usize).init(testing.allocator, 4);
defer cache.deinit();

try cache.insert("a", 1);
Expand Down
Loading

0 comments on commit 949eb15

Please sign in to comment.