Skip to content

Commit

Permalink
Introduce initial_machine_version server config.
Browse files Browse the repository at this point in the history
This new key can be used to specify the initial machine version
a new Ra server should be initialised against.

This allows machines to skip old versions and clean up old
code. This is particularly useful when machine_version_strategy=all
as the initial upgrade after cluster creation is delayed until
all members reply to the info requests.

If the machine_version_strategy=all, starting a server with
an initial machine version that is higher than the locally available
machine version will result in an error: {error, invalid_initial_machine_version}.

When machine_version_strategy=quorum the initial machine version
will be clamped to the locally available machine version.
  • Loading branch information
kjnilsson committed Jan 13, 2025
1 parent 5997bdf commit 8588a54
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 51 deletions.
20 changes: 9 additions & 11 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

-include("ra.hrl").

-export([init/2,
-export([init/3,
apply/4,
tick/3,
snapshot_installed/5,
Expand All @@ -90,7 +90,9 @@
-type user_command() :: term().
%% the command type for a given machine implementation

-type machine_init_args() :: #{name := atom(), atom() => term()}.
-type machine_init_args() :: #{name := atom(),
machine_version => version(),
atom() => term()}.
%% the configuration passed to the init callback

-type machine() :: {machine, module(), AddInitArgs :: #{term() => term()}}.
Expand Down Expand Up @@ -294,15 +296,11 @@
%% @doc initialise a new machine
%% This is only called on startup only if there isn't yet a snapshot to recover
%% from. Once a snapshot has been taken this is never called again.
-spec init(machine(), atom()) -> state().
init({machine, _, Args} = Machine, Name) ->
%% init always dispatches to the first version
%% as this means every state machine in a mixed version cluster will
%% have a common starting point.
%% TODO: it should be possible to pass a lowest supported state machine
%% version flag in the init args so that old machine version can be purged
Mod = which_module(Machine, 0),
Mod:init(Args#{name => Name}).
-spec init(machine(), atom(), version()) -> state().
init({machine, _, Args} = Machine, Name, Version) ->
Mod = which_module(Machine, Version),
Mod:init(Args#{name => Name,
machine_version => Version}).

-spec apply(module(), command_meta_data(), command(), State) ->
{State, reply(), effects()} | {State, reply()}.
Expand Down
23 changes: 13 additions & 10 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
log_init_args := ra_log:ra_log_init_args(),
initial_members := [ra_server_id()],
machine := machine_conf(),
initial_machine_version => ra_machine:version(),
friendly_name => unicode:chardata(),
metrics_key => term(),
% TODO: review - only really used for
Expand Down Expand Up @@ -352,24 +353,26 @@ init(#{id := Id,
VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined),

LatestMacVer = ra_machine:version(Machine),
InitialMachineVersion = min(LatestMacVer,
maps:get(initial_machine_version, Config, 0)),

{_FirstIndex, Cluster0, MacVer, MacState,
{Cluster0, EffectiveMacVer, MacState,
{SnapshotIdx, _} = SnapshotIndexTerm} =
case ra_log:recover_snapshot(Log0) of
undefined ->
InitialMachineState = ra_machine:init(Machine, Name),
{0, make_cluster(Id, InitialNodes),
0, InitialMachineState, {0, 0}};
InitialMachineState = ra_machine:init(Machine, Name,
InitialMachineVersion),
{make_cluster(Id, InitialNodes),
InitialMachineVersion, InitialMachineState, {0, 0}};
{#{index := Idx,
term := Term,
cluster := ClusterNodes,
machine_version := MacVersion}, MacSt} ->
Clu = make_cluster(Id, ClusterNodes),
%% the snapshot is the last index before the first index
%% TODO: should this be Idx + 1?
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
{Clu, MacVersion, MacSt, {Idx, Term}}
end,
MacMod = ra_machine:which_module(Machine, MacVer),
MacMod = ra_machine:which_module(Machine, EffectiveMacVer),

CommitIndex = max(LastApplied, SnapshotIdx),
Cfg = #cfg{id = Id,
Expand All @@ -378,8 +381,8 @@ init(#{id := Id,
metrics_key = MetricKey,
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, MacVer}],
effective_machine_version = MacVer,
machine_versions = [{SnapshotIdx, EffectiveMacVer}],
effective_machine_version = EffectiveMacVer,
effective_machine_module = MacMod,
effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod),
max_pipeline_count = MaxPipelineCount,
Expand All @@ -389,7 +392,7 @@ init(#{id := Id,
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm),
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, MacVer),
put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, EffectiveMacVer),

NonVoter = get_membership(Cluster0, Id, UId,
maps:get(membership, Config, voter)),
Expand Down
31 changes: 27 additions & 4 deletions src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
-include("ra.hrl").

-spec start_server(System :: atom(), ra_server:ra_server_config()) ->
supervisor:startchild_ret() | {error, not_new | system_not_started} | {badrpc, term()}.
supervisor:startchild_ret() |
{error, not_new | system_not_started | invalid_initial_machine_version} |
{badrpc, term()}.
start_server(System, #{id := NodeId,
uid := UId} = Config)
when is_atom(System) ->
Expand All @@ -61,9 +63,14 @@ start_server_rpc(System, UId, Config0) ->
%% check that the server isn't already registered
case ra_directory:name_of(System, UId) of
undefined ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
case validate_config(Config) of
ok ->
case ra_system:lookup_name(System, server_sup) of
{ok, Name} ->
start_child(Name, Config);
Err ->
Err
end;
Err ->
Err
end;
Expand All @@ -77,6 +84,22 @@ start_server_rpc(System, UId, Config0) ->
end
end.

validate_config(#{system_config := SysConf} = Config) ->
Strat = maps:get(machine_upgrade_strategy, SysConf, all),
case Config of
#{initial_machine_version := InitMacVer,
machine := {module, Mod, Args}} when Strat == all ->
MacVer = ra_machine:version({machine, Mod, Args}),
if MacVer < InitMacVer ->
{error, invalid_initial_machine_version};
true ->
ok
end;
_ ->
ok
end.


restart_server_rpc(System, {RaName, _Node}, AddConfig)
when is_atom(System) ->
case ra_system:fetch(System) of
Expand Down
3 changes: 2 additions & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ execute_state_machine() ->
%% creating a new WAL file with ra_fifo
[Srv] = Nodes = [{ra_dbg, node()}],
ClusterId = ra_dbg,
Config = #{name => ClusterId},
Config = #{name => ClusterId,
machine_version => 0},
Machine = {module, ra_fifo, Config},
ra:start(),
{ok, _, _} = ra:start_cluster(default, ClusterId, Machine, Nodes),
Expand Down
29 changes: 9 additions & 20 deletions test/ra_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
-opaque state() :: #state{}.

-type config() :: #{name := atom(),
machine_version := ra_machine:version(),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
cancel_customer_handler => applied_mfa(),
Expand Down Expand Up @@ -902,7 +903,8 @@ size_test(NumMsg, NumCust) ->
EnqGen = fun(N) -> {N, {enqueue, N}} end,
CustGen = fun(N) -> {N, {checkout, {auto, 100},
spawn(fun() -> ok end)}} end,
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test})),
S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test,
machine_version => 0})),
S = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
S2 = S#state{ra_indexes = ra_fifo_index:map(fun(_, _) -> undefined end,
S#state.ra_indexes)},
Expand All @@ -918,29 +920,13 @@ perf_test(NumMsg, NumCust) ->
{N, {settle, N - NumMsg - NumCust - 1, Pid}}
end,
S0 = run_log(1, NumMsg, EnqGen,
init(#{name => size_test})),
init(#{name => size_test,
machine_version => 0})),
S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
_ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),
ok
end).

% profile(File) ->
% GzFile = atom_to_list(File) ++ ".gz",
% lg:trace([ra_fifo, maps, queue, ra_fifo_index], lg_file_tracer,
% GzFile, #{running => false, mode => profile}),
% NumMsg = 10000,
% NumCust = 500,
% EnqGen = fun(N) -> {N, {enqueue, self(), N, N}} end,
% Pid = spawn(fun() -> ok end),
% CustGen = fun(N) -> {N, {checkout, {auto, NumMsg},
% {term_to_binary(N), Pid}}} end,
% SetlGen = fun(N) -> {N, {settle, N - NumMsg - NumCust - 1, Pid}} end,
% S0 = run_log(1, NumMsg, EnqGen, element(1, init(#{name => size_test}))),
% S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0),
% _ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1),
% lg:stop().


run_log(Num, Num, _Gen, State) ->
State;
run_log(Num, Max, Gen, State0) ->
Expand Down Expand Up @@ -995,6 +981,7 @@ dehydrate_state(#state{messages = Messages0,

test_init(Name) ->
init(#{name => Name,
machine_version => 0,
shadow_copy_interval => 0,
metrics_handler => {?MODULE, metrics_handler, []}}).

Expand Down Expand Up @@ -1243,6 +1230,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
Cid = {<<"completed_customer_yields_demonitor_effect_test">>, self()},
State00 = init(#{name => test,
machine_version => 0,
dead_letter_handler =>
{somemod, somefun, [somearg]}}),
{State0, _, [_, _]} = enq(1, 1, first, State00),
Expand Down Expand Up @@ -1430,6 +1418,7 @@ duplicate_delivery_test() ->
state_enter_test() ->

S0 = init(#{name => the_name,
machine_version => 0,
become_leader_handler => {m, f, [a]}}),
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
Expand Down Expand Up @@ -1505,7 +1494,7 @@ run_log(InitState, Entries) ->
aux_test() ->
_ = ra_machine_ets:start_link(),
Aux0 = init_aux(aux_test),
MacState = init(#{name => aux_test}),
MacState = init(#{name => aux_test, machine_version => 0}),
Log = undefined,
{no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
Log, MacState),
Expand Down
Loading

0 comments on commit 8588a54

Please sign in to comment.