Skip to content

Commit

Permalink
Merge pull request #37 from Syndica/ultd/metrics-and-thread-pool
Browse files Browse the repository at this point in the history
initial prometheus metrics
  • Loading branch information
ultd authored Dec 22, 2023
2 parents 949eb15 + 47dce1a commit e8e50dc
Show file tree
Hide file tree
Showing 12 changed files with 1,500 additions and 1 deletion.
14 changes: 13 additions & 1 deletion build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn build(b: *std.Build) void {
const zig_network_module = b.dependency("zig-network", opts).module("network");
const zig_cli_module = b.dependency("zig-cli", opts).module("zig-cli");
const getty_mod = b.dependency("getty", opts).module("getty");
const httpz_mod = b.dependency("httpz", opts).module("httpz");

const lib = b.addStaticLibrary(.{
.name = "sig",
Expand Down Expand Up @@ -53,13 +54,18 @@ pub fn build(b: *std.Build) void {
.name = "getty",
.module = getty_mod,
},
.{
.name = "httpz",
.module = httpz_mod,
},
},
});

lib.addModule("base58-zig", base58_module);
lib.addModule("zig-network", zig_network_module);
lib.addModule("zig-cli", zig_cli_module);
lib.addModule("getty", getty_mod);
lib.addModule("httpz", httpz_mod);

// This declares intent for the library to be installed into the standard
// location when the user invokes the "install" step (the default step when
Expand All @@ -77,6 +83,8 @@ pub fn build(b: *std.Build) void {
tests.addModule("base58-zig", base58_module);
tests.addModule("zig-cli", zig_cli_module);
tests.addModule("getty", getty_mod);
tests.addModule("httpz", httpz_mod);

const run_tests = b.addRunArtifact(tests);
const test_step = b.step("test", "Run library tests");
test_step.dependOn(&lib.step);
Expand All @@ -94,6 +102,7 @@ pub fn build(b: *std.Build) void {
exe.addModule("zig-network", zig_network_module);
exe.addModule("zig-cli", zig_cli_module);
exe.addModule("getty", getty_mod);
exe.addModule("httpz", httpz_mod);

// This declares intent for the executable to be installed into the
// standard location when the user invokes the "install" step (the default
Expand Down Expand Up @@ -137,6 +146,8 @@ pub fn build(b: *std.Build) void {
fuzz_exe.addModule("zig-network", zig_network_module);
fuzz_exe.addModule("zig-cli", zig_cli_module);
fuzz_exe.addModule("getty", getty_mod);
fuzz_exe.addModule("httpz", httpz_mod);

b.installArtifact(fuzz_exe);
const fuzz_cmd = b.addRunArtifact(fuzz_exe);
if (b.args) |args| {
Expand All @@ -158,11 +169,12 @@ pub fn build(b: *std.Build) void {
benchmark_exe.addModule("zig-network", zig_network_module);
benchmark_exe.addModule("zig-cli", zig_cli_module);
benchmark_exe.addModule("getty", getty_mod);
benchmark_exe.addModule("httpz", httpz_mod);

b.installArtifact(benchmark_exe);
const benchmark_cmd = b.addRunArtifact(benchmark_exe);
if (b.args) |args| {
benchmark_cmd.addArgs(args);
}

b.step("benchmark", "benchmark gossip").dependOn(&benchmark_cmd.step);
}
4 changes: 4 additions & 0 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,9 @@
.url = "https://github.com/getty-zig/getty/archive/5b0e750d92ee4ef8e46ad743bb8ced63723acd00.tar.gz",
.hash = "12209398657d260abcd6dae946d8da4cd3057b8c7990608476a9f8011aae570d2ebb",
},
.httpz = .{
.url = "https://github.com/karlseguin/http.zig/archive/7a751549a751d9b45952037abdb127b3225b2ac1.tar.gz",
.hash = "122004f74adf46001fe9129d8cec54bd4a98895ce89f0897790e13b60fa99e527b99",
},
},
}
25 changes: 25 additions & 0 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const io = std.io;
const Pubkey = @import("../core/pubkey.zig").Pubkey;
const SocketAddr = @import("../net/net.zig").SocketAddr;
const GossipService = @import("../gossip/gossip_service.zig").GossipService;
const servePrometheus = @import("../prometheus/http.zig").servePrometheus;
const global_registry = @import("../prometheus/registry.zig").global_registry;
const Registry = @import("../prometheus/registry.zig").Registry;

var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const gpa_allocator = gpa.allocator();
Expand All @@ -31,11 +34,21 @@ var gossip_entrypoints_option = cli.Option{
.value_name = "Entrypoints",
};

var metrics_port_option = cli.Option{
.long_name = "metrics-port",
.help = "port to expose prometheus metrics via http",
.short_alias = 'm',
.value = cli.OptionValue{ .int = 12345 },
.required = false,
.value_name = "port_number",
};

var app = &cli.App{
.name = "sig",
.description = "Sig is a Solana client implementation written in Zig.\nThis is still a WIP, PRs welcome.",
.version = "0.1.1",
.author = "Syndica & Contributors",
.options = &.{&metrics_port_option},
.subcommands = &.{
&cli.Command{
.name = "identity",
Expand Down Expand Up @@ -76,6 +89,8 @@ fn gossip(_: []const []const u8) !void {

// var logger: Logger = .noop;

const metrics_thread = try spawnMetrics(gpa_allocator);

var my_keypair = try getOrInitIdentity(gpa_allocator, logger);

var gossip_port: u16 = @intCast(gossip_port_option.value.int.?);
Expand Down Expand Up @@ -119,6 +134,16 @@ fn gossip(_: []const []const u8) !void {
);

handle.join();
metrics_thread.detach();
}

/// Initializes the global registry. Returns error if registry was already initialized.
/// Spawns a thread to serve the metrics over http on the CLI configured port.
/// Uses same allocator for both registry and http adapter.
fn spawnMetrics(allocator: std.mem.Allocator) !std.Thread {
var metrics_port: u16 = @intCast(metrics_port_option.value.int.?);
const registry = try global_registry.initialize(Registry(.{}).init, .{allocator});
return try std.Thread.spawn(.{}, servePrometheus, .{ allocator, registry, metrics_port });
}

pub fn run() !void {
Expand Down
11 changes: 11 additions & 0 deletions src/lib.zig
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub const sync = struct {
pub usingnamespace @import("sync/mpmc.zig");
pub usingnamespace @import("sync/ref.zig");
pub usingnamespace @import("sync/mux.zig");
pub usingnamespace @import("sync/once_cell.zig");
pub usingnamespace @import("sync/thread_pool.zig");
};

Expand Down Expand Up @@ -75,3 +76,13 @@ pub const net = struct {
pub usingnamespace @import("net/net.zig");
pub usingnamespace @import("net/echo.zig");
};

pub const prometheus = struct {
pub usingnamespace @import("prometheus/counter.zig");
pub usingnamespace @import("prometheus/gauge.zig");
pub usingnamespace @import("prometheus/gauge_fn.zig");
pub usingnamespace @import("prometheus/http.zig");
pub usingnamespace @import("prometheus/histogram.zig");
pub usingnamespace @import("prometheus/metric.zig");
pub usingnamespace @import("prometheus/registry.zig");
};
89 changes: 89 additions & 0 deletions src/prometheus/counter.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const std = @import("std");
const mem = std.mem;
const testing = std.testing;

const Metric = @import("metric.zig").Metric;

pub const Counter = struct {
const Self = @This();

metric: Metric = Metric{ .getResultFn = getResult },
value: std.atomic.Atomic(u64) = std.atomic.Atomic(u64).init(0),

pub fn inc(self: *Self) void {
_ = self.value.fetchAdd(1, .Monotonic);
}

pub fn add(self: *Self, value: anytype) void {
switch (@typeInfo(@TypeOf(value))) {
.Int, .Float, .ComptimeInt, .ComptimeFloat => {},
else => @compileError("can't add a non-number"),
}

_ = self.value.fetchAdd(@intCast(value), .Monotonic);
}

pub fn get(self: *const Self) u64 {
return self.value.load(.Monotonic);
}

pub fn reset(self: *Self) void {
_ = self.value.store(0, .Monotonic);
}

fn getResult(metric: *Metric, _: mem.Allocator) Metric.Error!Metric.Result {
const self = @fieldParentPtr(Self, "metric", metric);
return Metric.Result{ .counter = self.get() };
}
};

test "prometheus.counter: inc/add/dec/set/get" {
var buffer = std.ArrayList(u8).init(testing.allocator);
defer buffer.deinit();

var counter = Counter{};

try testing.expectEqual(@as(u64, 0), counter.get());

counter.inc();
try testing.expectEqual(@as(u64, 1), counter.get());

counter.add(200);
try testing.expectEqual(@as(u64, 201), counter.get());
}

test "prometheus.counter: concurrent" {
var counter = Counter{};

var threads: [4]std.Thread = undefined;
for (&threads) |*thread| {
thread.* = try std.Thread.spawn(
.{},
struct {
fn run(c: *Counter) void {
var i: usize = 0;
while (i < 20) : (i += 1) {
c.inc();
}
}
}.run,
.{&counter},
);
}

for (&threads) |*thread| thread.join();

try testing.expectEqual(@as(u64, 80), counter.get());
}

test "prometheus.counter: write" {
var counter = Counter{ .value = .{ .value = 340 } };

var buffer = std.ArrayList(u8).init(testing.allocator);
defer buffer.deinit();

var metric = &counter.metric;
try metric.write(testing.allocator, buffer.writer(), "mycounter");

try testing.expectEqualStrings("mycounter 340\n", buffer.items);
}
50 changes: 50 additions & 0 deletions src/prometheus/gauge.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
const std = @import("std");

const Metric = @import("metric.zig").Metric;

/// A gauge that stores the value it reports.
/// Read and write operations are atomic and monotonic.
pub fn Gauge(comptime T: type) type {
return struct {
value: std.atomic.Atomic(T) = .{ .value = 0 },
metric: Metric = .{ .getResultFn = getResult },

const Self = @This();

pub fn inc(self: *Self) void {
self.value.fetchAdd(1, .Monotonic);
}

pub fn add(self: *Self, v: T) void {
self.value.fetchAdd(v, .Monotonic);
}

pub fn dec(self: *Self) void {
self.value.fetchSub(1, .Monotonic);
}

pub fn sub(self: *Self, v: T) void {
self.value.fetchAdd(v, .Monotonic);
}

pub fn set(self: *Self, v: T) void {
self.value.store(v, .Monotonic);
}

pub fn get(self: *Self) T {
return self.value.load(.Monotonic);
}

fn getResult(metric: *Metric, allocator: std.mem.Allocator) Metric.Error!Metric.Result {
_ = allocator;

const self = @fieldParentPtr(Self, "metric", metric);

return switch (T) {
f64 => Metric.Result{ .gauge = self.get() },
u64 => Metric.Result{ .gauge_int = self.get() },
else => unreachable, // Gauge Return may only be 'f64' or 'u64'
};
}
};
}
Loading

0 comments on commit e8e50dc

Please sign in to comment.