forked from tigerbeetle/tigerbeetle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulator.zig
321 lines (271 loc) · 12.7 KB
/
simulator.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
const config = @import("config.zig");
const Client = @import("test/cluster.zig").Client;
const Cluster = @import("test/cluster.zig").Cluster;
const Header = @import("vsr.zig").Header;
const Replica = @import("test/cluster.zig").Replica;
const StateChecker = @import("test/state_checker.zig").StateChecker;
const StateMachine = @import("test/cluster.zig").StateMachine;
const PartitionMode = @import("test/packet_simulator.zig").PartitionMode;
/// Scoped logger used for the simulator's own output.
/// It is named `output` because the identifier `log` is taken in this root file by our custom
/// `log` function.
const output = std.log.scoped(.state_checker);
/// When true (any build mode other than Debug), the custom `log` function drops every message
/// whose scope is not `.state_checker` and prints without the level/scope prefix.
/// Set this to `false` if you want to see how literally everything works.
/// This will run much slower but will trace all logic across the cluster.
const log_state_transitions_only = builtin.mode != .Debug;
/// You can fine tune your log levels even further (debug/info/notice/warn/err/crit/alert/emerg):
pub const log_level: std.log.Level = if (log_state_transitions_only) .info else .debug;
/// The simulated cluster. Assigned in `main` and read by `on_change_replica` and `send_request`;
/// it is `undefined` until `main` has created the cluster.
var cluster: *Cluster = undefined;
/// Simulator entry point.
///
/// Takes an optional decimal seed as the first command-line argument; without one, a random seed
/// is drawn from `std.crypto.random`. Every cluster, network and storage parameter is then drawn
/// from a PRNG seeded with that value, and the cluster is ticked until the state checker has seen
/// `transitions_max` transitions and reports convergence, or until `ticks_max` ticks have elapsed.
///
/// Panics if built with ReleaseFast/ReleaseSmall, if no seed is given outside ReleaseSafe, or if
/// `transitions_max` transitions are not reached within `ticks_max` ticks.
pub fn main() !void {
    // TODO Use std.testing.allocator when all deinit() leaks are fixed.
    const allocator = std.heap.page_allocator;

    var args = std.process.args();

    // Skip argv[0] which is the name of this executable. The slice returned by `args_next` is
    // allocated with `allocator`, so free it instead of merely discarding it:
    if (args_next(&args, allocator)) |arg_zero| allocator.free(arg_zero);

    const seed_random = std.crypto.random.int(u64);
    const seed = seed_from_arg: {
        const arg_two = args_next(&args, allocator) orelse break :seed_from_arg seed_random;
        defer allocator.free(arg_two);
        break :seed_from_arg parse_seed(arg_two);
    };

    if (builtin.mode == .ReleaseFast or builtin.mode == .ReleaseSmall) {
        // We do not support ReleaseFast or ReleaseSmall because they disable assertions.
        @panic("the simulator must be run with -OReleaseSafe");
    }

    // `seed` only equals `seed_random` when no seed argument was supplied (see above).
    if (seed == seed_random) {
        if (builtin.mode != .ReleaseSafe) {
            // If no seed is provided, then Debug is too slow and ReleaseSafe is much faster.
            @panic("no seed provided: the simulator must be run with -OReleaseSafe");
        }
        if (log_level == .debug) {
            output.warn("no seed provided: full debug logs are enabled, this will be slow", .{});
        }
    }

    // Everything below is drawn from this PRNG in program order, so the order of the `random`
    // calls must not change if existing seeds are to keep reproducing the same simulation.
    var prng = std.rand.DefaultPrng.init(seed);
    const random = prng.random();

    const replica_count = 1 + random.uintLessThan(u8, config.replicas_max);
    const client_count = 1 + random.uintLessThan(u8, config.clients_max);
    const node_count = replica_count + client_count;

    const ticks_max = 100_000_000;
    const transitions_max = config.journal_size_max / config.message_size_max;
    const request_probability = 1 + random.uintLessThan(u8, 99);
    const idle_on_probability = random.uintLessThan(u8, 20);
    const idle_off_probability = 10 + random.uintLessThan(u8, 10);

    cluster = try Cluster.create(allocator, random, .{
        .cluster = 0,
        .replica_count = replica_count,
        .client_count = client_count,
        .seed = random.int(u64),
        .network_options = .{
            .packet_simulator_options = .{
                .replica_count = replica_count,
                .client_count = client_count,
                .node_count = node_count,
                .seed = random.int(u64),
                .one_way_delay_mean = 3 + random.uintLessThan(u16, 10),
                .one_way_delay_min = random.uintLessThan(u16, 3),
                .packet_loss_probability = random.uintLessThan(u8, 30),
                .path_maximum_capacity = 2 + random.uintLessThan(u8, 19),
                .path_clog_duration_mean = random.uintLessThan(u16, 500),
                .path_clog_probability = random.uintLessThan(u8, 2),
                .packet_replay_probability = random.uintLessThan(u8, 50),
                .partition_mode = random_partition_mode(random),
                .partition_probability = random.uintLessThan(u8, 3),
                .unpartition_probability = 1 + random.uintLessThan(u8, 10),
                .partition_stability = 100 + random.uintLessThan(u32, 100),
                .unpartition_stability = random.uintLessThan(u32, 20),
            },
        },
        .storage_options = .{
            .seed = random.int(u64),
            .read_latency_min = random.uintLessThan(u16, 3),
            .read_latency_mean = 3 + random.uintLessThan(u16, 10),
            .write_latency_min = random.uintLessThan(u16, 3),
            .write_latency_mean = 3 + random.uintLessThan(u16, 10),
            .read_fault_probability = random.uintLessThan(u8, 10),
            .write_fault_probability = random.uintLessThan(u8, 10),
        },
    });
    defer cluster.destroy();

    cluster.state_checker = try StateChecker.init(allocator, cluster);
    defer cluster.state_checker.deinit();

    // Route every state change, per replica and on the cluster itself, through the checker hook.
    for (cluster.replicas) |*replica| {
        replica.on_change_state = on_change_replica;
    }
    cluster.on_change_state = on_change_replica;

    // Print the seed and every derived parameter so that a failing run can be reproduced.
    output.info(
        \\
        \\ SEED={}
        \\
        \\ replicas={}
        \\ clients={}
        \\ request_probability={}%
        \\ idle_on_probability={}%
        \\ idle_off_probability={}%
        \\ one_way_delay_mean={} ticks
        \\ one_way_delay_min={} ticks
        \\ packet_loss_probability={}%
        \\ path_maximum_capacity={} messages
        \\ path_clog_duration_mean={} ticks
        \\ path_clog_probability={}%
        \\ packet_replay_probability={}%
        \\ partition_mode={}
        \\ partition_probability={}%
        \\ unpartition_probability={}%
        \\ partition_stability={} ticks
        \\ unpartition_stability={} ticks
        \\ read_latency_min={}
        \\ read_latency_mean={}
        \\ write_latency_min={}
        \\ write_latency_mean={}
        \\ read_fault_probability={}%
        \\ write_fault_probability={}%
        \\
    , .{
        seed,
        replica_count,
        client_count,
        request_probability,
        idle_on_probability,
        idle_off_probability,
        cluster.options.network_options.packet_simulator_options.one_way_delay_mean,
        cluster.options.network_options.packet_simulator_options.one_way_delay_min,
        cluster.options.network_options.packet_simulator_options.packet_loss_probability,
        cluster.options.network_options.packet_simulator_options.path_maximum_capacity,
        cluster.options.network_options.packet_simulator_options.path_clog_duration_mean,
        cluster.options.network_options.packet_simulator_options.path_clog_probability,
        cluster.options.network_options.packet_simulator_options.packet_replay_probability,
        cluster.options.network_options.packet_simulator_options.partition_mode,
        cluster.options.network_options.packet_simulator_options.partition_probability,
        cluster.options.network_options.packet_simulator_options.unpartition_probability,
        cluster.options.network_options.packet_simulator_options.partition_stability,
        cluster.options.network_options.packet_simulator_options.unpartition_stability,
        cluster.options.storage_options.read_latency_min,
        cluster.options.storage_options.read_latency_mean,
        cluster.options.storage_options.write_latency_min,
        cluster.options.storage_options.write_latency_mean,
        cluster.options.storage_options.read_fault_probability,
        cluster.options.storage_options.write_fault_probability,
    });

    var requests_sent: u64 = 0;
    var idle = false;

    var tick: u64 = 0;
    while (tick < ticks_max) : (tick += 1) {
        for (cluster.storages) |*storage| storage.tick();

        for (cluster.replicas) |*replica, i| {
            replica.tick();
            cluster.state_checker.check_state(@intCast(u8, i));
        }

        cluster.network.packet_simulator.tick();

        for (cluster.clients) |*client| client.tick();

        // Once every transition has been seen, send nothing further and just keep ticking until
        // the state checker reports convergence.
        if (cluster.state_checker.transitions == transitions_max) {
            if (cluster.state_checker.convergence()) break;
            continue;
        } else {
            assert(cluster.state_checker.transitions < transitions_max);
        }

        // At most `transitions_max` requests are ever sent.
        if (requests_sent < transitions_max) {
            if (idle) {
                // While idle no request is sent; each tick may end the idle period.
                if (chance(random, idle_off_probability)) idle = false;
            } else {
                if (chance(random, request_probability)) {
                    if (send_request(random)) requests_sent += 1;
                }
                if (chance(random, idle_on_probability)) idle = true;
            }
        }
    }

    if (cluster.state_checker.transitions < transitions_max) {
        output.err("you can reproduce this failure with seed={}", .{seed});
        @panic("unable to complete transitions_max before ticks_max");
    }

    // Reached either through the `break` above or by exhausting `ticks_max` with all transitions
    // seen; in the latter case this asserts that the cluster did converge.
    assert(cluster.state_checker.convergence());

    output.info("\n PASSED ({} ticks)", .{tick});
}
/// Draws a uniform value in [0, 100) from `random` and reports whether it falls below `p`,
/// i.e. returns true `p` percent of the time. `p` must not exceed 100.
fn chance(random: std.rand.Random, p: u8) bool {
    assert(p <= 100);
    const roll = random.uintLessThan(u8, 100);
    return p > roll;
}
/// Advances `args` and returns the next command-line argument, or null once the iterator is
/// exhausted. Panics if the argument cannot be extracted.
/// The argument is obtained through `allocator`; `main` frees the seed argument with that same
/// allocator.
fn args_next(args: *std.process.ArgIterator, allocator: std.mem.Allocator) ?[:0]const u8 {
    if (args.next(allocator)) |bytes_or_err| {
        return bytes_or_err catch @panic("Unable to extract next value from args");
    }
    return null;
}
/// State-change hook that `main` installs on every replica and on the cluster.
/// Asserts that the cluster's state machine entry for this replica holds the same state as the
/// replica's own state machine, then has the state checker check that replica.
fn on_change_replica(replica: *Replica) void {
    const index = replica.replica;
    const cluster_state = cluster.state_machines[index].state;
    assert(cluster_state == replica.state_machine.state);
    cluster.state_checker.check_state(index);
}
/// Picks a random client and, provided neither the client's request queue nor the state
/// checker's request queue for that client is full, sends one `.hash` request with a randomized
/// body. Returns true if a request was sent, false if either queue was full.
fn send_request(random: std.rand.Random) bool {
    const index = random.uintLessThan(u8, cluster.options.client_count);
    const sender = &cluster.clients[index];
    const expected_inputs = &cluster.state_checker.client_requests[index];

    // The checker's queue must be at least as long as the client's, otherwise the client's
    // request queue could never be exercised at its full length:
    assert(sender.request_queue.buffer.len <= expected_inputs.buffer.len);
    if (sender.request_queue.full() or expected_inputs.full()) return false;

    const message = sender.get_message();
    defer sender.unref(message);

    // Body size: about 11% empty, about 79% uniformly random below the maximum, 10% maximal.
    const body_size_max = config.message_size_max - @sizeOf(Header);
    const roll = random.uintLessThan(u8, 100);
    const body_size: u32 = if (roll <= 10)
        0
    else if (roll <= 89)
        random.uintLessThan(u32, body_size_max)
    else
        body_size_max;

    // Body contents: all zeroes 10% of the time, random bytes otherwise.
    const body = message.buffer[@sizeOf(Header)..][0..body_size];
    const zero_filled = chance(random, 10);
    if (zero_filled) mem.set(u8, body, 0) else random.bytes(body);

    // Hashing the client ID together with the body keeps inputs from different clients apart,
    // but a single client can still produce the same body, and hence the same input hash, twice.
    const input = StateMachine.hash(sender.id, body);
    expected_inputs.push_assume_capacity(input);

    std.log.scoped(.test_client).debug("client {} sending input={x}", .{ index, input });

    sender.request(0, client_callback, .hash, message, body_size);
    return true;
}
/// Reply callback that `send_request` passes to `client.request`.
/// Treats an error result as unreachable and asserts that `user_data` is 0, the value
/// `send_request` supplies. `operation` and the reply payload are not inspected.
fn client_callback(
    user_data: u128,
    operation: StateMachine.Operation,
    results: Client.Error![]const u8,
) void {
    _ = operation;
    const reply = results catch unreachable;
    _ = reply;
    assert(user_data == 0);
}
/// Picks a uniformly random `PartitionMode` other than `.custom`.
/// One fewer value than the enum has fields is drawn, and any draw at or above `.custom`'s
/// integer value is shifted up by one so that `.custom` is skipped.
fn random_partition_mode(random: std.rand.Random) PartitionMode {
    const info = @typeInfo(PartitionMode).Enum;
    const custom_tag = @enumToInt(PartitionMode.custom);
    const drawn = random.uintAtMost(info.tag_type, info.fields.len - 2);
    const tag = if (drawn >= custom_tag) drawn + 1 else drawn;
    return @intToEnum(PartitionMode, tag);
}
/// Parses `bytes` as a base-10 unsigned 64-bit seed.
/// Panics with a descriptive message if the value overflows a u64 or contains an invalid
/// character.
fn parse_seed(bytes: []const u8) u64 {
    if (std.fmt.parseUnsigned(u64, bytes, 10)) |seed| {
        return seed;
    } else |err| {
        const reason: []const u8 = switch (err) {
            error.Overflow => "seed exceeds a 64-bit unsigned integer",
            error.InvalidCharacter => "seed contains an invalid character",
        };
        @panic(reason);
    }
}
/// Custom log function for this root file.
/// When `log_state_transitions_only` is set, messages from every scope except `.state_checker`
/// are dropped and the remaining ones are printed without a prefix; otherwise every message is
/// printed with a "[level] (scope): " prefix. Output goes to stderr under the stderr mutex, and
/// write errors are silently ignored.
pub fn log(
    comptime level: std.log.Level,
    comptime scope: @TypeOf(.EnumLiteral),
    comptime format: []const u8,
    args: anytype,
) void {
    if (log_state_transitions_only and scope != .state_checker) return;

    const prefix = if (log_state_transitions_only)
        ""
    else
        "[" ++ @tagName(level) ++ "] (" ++ @tagName(scope) ++ "): ";

    // Hold the stderr mutex for the duration of the write.
    const mutex = std.debug.getStderrMutex();
    mutex.lock();
    defer mutex.unlock();

    // Print the message to stderr, silently ignoring any errors
    const writer = std.io.getStdErr().writer();
    nosuspend writer.print(prefix ++ format ++ "\n", args) catch return;
}