diff --git a/src/ra_log.erl b/src/ra_log.erl index 983b5d64..04301f7b 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -1066,6 +1066,7 @@ write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) -> Config = maps:without([parent, counter, has_changed, + initial_machine_version, %% don't write system config to disk as it will %% be updated each time system_config], Config0), diff --git a/src/ra_machine.erl b/src/ra_machine.erl index 63e97fcc..33533c0e 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -66,7 +66,7 @@ -include("ra.hrl"). --export([init/2, +-export([init/3, apply/4, tick/3, snapshot_installed/5, @@ -90,7 +90,9 @@ -type user_command() :: term(). %% the command type for a given machine implementation --type machine_init_args() :: #{name := atom(), atom() => term()}. +-type machine_init_args() :: #{name := atom(), + machine_version => version(), + atom() => term()}. %% the configuration passed to the init callback -type machine() :: {machine, module(), AddInitArgs :: #{term() => term()}}. @@ -294,15 +296,11 @@ %% @doc initialise a new machine %% This is only called on startup only if there isn't yet a snapshot to recover %% from. Once a snapshot has been taken this is never called again. --spec init(machine(), atom()) -> state(). -init({machine, _, Args} = Machine, Name) -> - %% init always dispatches to the first version - %% as this means every state machine in a mixed version cluster will - %% have a common starting point. - %% TODO: it should be possible to pass a lowest supported state machine - %% version flag in the init args so that old machine version can be purged - Mod = which_module(Machine, 0), - Mod:init(Args#{name => Name}). +-spec init(machine(), atom(), version()) -> state(). +init({machine, _, Args} = Machine, Name, Version) -> + Mod = which_module(Machine, Version), + Mod:init(Args#{name => Name, + machine_version => Version}). -spec apply(module(), command_meta_data(), command(), State) -> {State, reply(), effects()} | {State, reply()}. diff --git a/src/ra_server.erl b/src/ra_server.erl index 36fa5720..44312d18 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -215,6 +215,7 @@ log_init_args := ra_log:ra_log_init_args(), initial_members := [ra_server_id()], machine := machine_conf(), + initial_machine_version => ra_machine:version(), friendly_name => unicode:chardata(), metrics_key => term(), % TODO: review - only really used for @@ -352,24 +353,26 @@ init(#{id := Id, VotedFor = ra_log_meta:fetch(MetaName, UId, voted_for, undefined), LatestMacVer = ra_machine:version(Machine), + InitialMachineVersion = min(LatestMacVer, + maps:get(initial_machine_version, Config, 0)), - {_FirstIndex, Cluster0, MacVer, MacState, + {Cluster0, EffectiveMacVer, MacState, {SnapshotIdx, _} = SnapshotIndexTerm} = case ra_log:recover_snapshot(Log0) of undefined -> - InitialMachineState = ra_machine:init(Machine, Name), - {0, make_cluster(Id, InitialNodes), - 0, InitialMachineState, {0, 0}}; + InitialMachineState = ra_machine:init(Machine, Name, + InitialMachineVersion), + {make_cluster(Id, InitialNodes), + InitialMachineVersion, InitialMachineState, {0, 0}}; {#{index := Idx, term := Term, cluster := ClusterNodes, machine_version := MacVersion}, MacSt} -> Clu = make_cluster(Id, ClusterNodes), %% the snapshot is the last index before the first index - %% TODO: should this be Idx + 1? - {Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}} + {Clu, MacVersion, MacSt, {Idx, Term}} end, - MacMod = ra_machine:which_module(Machine, MacVer), + MacMod = ra_machine:which_module(Machine, EffectiveMacVer), CommitIndex = max(LastApplied, SnapshotIdx), Cfg = #cfg{id = Id, @@ -378,8 +381,8 @@ init(#{id := Id, metrics_key = MetricKey, machine = Machine, machine_version = LatestMacVer, - machine_versions = [{SnapshotIdx, MacVer}], - effective_machine_version = MacVer, + machine_versions = [{SnapshotIdx, EffectiveMacVer}], + effective_machine_version = EffectiveMacVer, effective_machine_module = MacMod, effective_handle_aux_fun = ra_machine:which_aux_fun(MacMod), max_pipeline_count = MaxPipelineCount, @@ -389,7 +392,7 @@ init(#{id := Id, put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, CommitIndex), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx), put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm), - put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, MacVer), + put_counter(Cfg, ?C_RA_SVR_METRIC_EFFECTIVE_MACHINE_VERSION, EffectiveMacVer), NonVoter = get_membership(Cluster0, Id, UId, maps:get(membership, Config, voter)), diff --git a/src/ra_server_sup_sup.erl b/src/ra_server_sup_sup.erl index 855af650..4659c63c 100644 --- a/src/ra_server_sup_sup.erl +++ b/src/ra_server_sup_sup.erl @@ -39,7 +39,9 @@ -include("ra.hrl"). -spec start_server(System :: atom(), ra_server:ra_server_config()) -> - supervisor:startchild_ret() | {error, not_new | system_not_started} | {badrpc, term()}. + supervisor:startchild_ret() | + {error, not_new | system_not_started | invalid_initial_machine_version} | + {badrpc, term()}. start_server(System, #{id := NodeId, uid := UId} = Config) when is_atom(System) -> @@ -61,9 +63,14 @@ start_server_rpc(System, UId, Config0) -> %% check that the server isn't already registered case ra_directory:name_of(System, UId) of undefined -> - case ra_system:lookup_name(System, server_sup) of - {ok, Name} -> - start_child(Name, Config); + case validate_config(Config) of + ok -> + case ra_system:lookup_name(System, server_sup) of + {ok, Name} -> + start_child(Name, Config); + Err -> + Err + end; Err -> Err end; @@ -77,6 +84,22 @@ start_server_rpc(System, UId, Config0) -> end end. +validate_config(#{system_config := SysConf} = Config) -> + Strat = maps:get(machine_upgrade_strategy, SysConf, all), + case Config of + #{initial_machine_version := InitMacVer, + machine := {module, Mod, Args}} when Strat == all -> + MacVer = ra_machine:version({machine, Mod, Args}), + if MacVer < InitMacVer -> + {error, invalid_initial_machine_version}; + true -> + ok + end; + _ -> + ok + end. + + restart_server_rpc(System, {RaName, _Node}, AddConfig) when is_atom(System) -> case ra_system:fetch(System) of diff --git a/test/ra_dbg_SUITE.erl b/test/ra_dbg_SUITE.erl index 38da1482..65ffcf9b 100644 --- a/test/ra_dbg_SUITE.erl +++ b/test/ra_dbg_SUITE.erl @@ -77,7 +77,8 @@ execute_state_machine() -> %% creating a new WAL file with ra_fifo [Srv] = Nodes = [{ra_dbg, node()}], ClusterId = ra_dbg, - Config = #{name => ClusterId}, + Config = #{name => ClusterId, + machine_version => 0}, Machine = {module, ra_fifo, Config}, ra:start(), {ok, _, _} = ra:start_cluster(default, ClusterId, Machine, Nodes), diff --git a/test/ra_fifo.erl b/test/ra_fifo.erl index 9a68dd0a..3e4076fe 100644 --- a/test/ra_fifo.erl +++ b/test/ra_fifo.erl @@ -170,6 +170,7 @@ -opaque state() :: #state{}. -type config() :: #{name := atom(), + machine_version := ra_machine:version(), dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), cancel_customer_handler => applied_mfa(), @@ -902,7 +903,8 @@ size_test(NumMsg, NumCust) -> EnqGen = fun(N) -> {N, {enqueue, N}} end, CustGen = fun(N) -> {N, {checkout, {auto, 100}, spawn(fun() -> ok end)}} end, - S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test})), + S0 = run_log(1, NumMsg, EnqGen, init(#{name => size_test, + machine_version => 0})), S = run_log(NumMsg, NumMsg + NumCust, CustGen, S0), S2 = S#state{ra_indexes = ra_fifo_index:map(fun(_, _) -> undefined end, S#state.ra_indexes)}, @@ -918,29 +920,13 @@ perf_test(NumMsg, NumCust) -> {N, {settle, N - NumMsg - NumCust - 1, Pid}} end, S0 = run_log(1, NumMsg, EnqGen, - init(#{name => size_test})), + init(#{name => size_test, + machine_version => 0})), S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0), _ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1), ok end). -% profile(File) -> -% GzFile = atom_to_list(File) ++ ".gz", -% lg:trace([ra_fifo, maps, queue, ra_fifo_index], lg_file_tracer, -% GzFile, #{running => false, mode => profile}), -% NumMsg = 10000, -% NumCust = 500, -% EnqGen = fun(N) -> {N, {enqueue, self(), N, N}} end, -% Pid = spawn(fun() -> ok end), -% CustGen = fun(N) -> {N, {checkout, {auto, NumMsg}, -% {term_to_binary(N), Pid}}} end, -% SetlGen = fun(N) -> {N, {settle, N - NumMsg - NumCust - 1, Pid}} end, -% S0 = run_log(1, NumMsg, EnqGen, element(1, init(#{name => size_test}))), -% S1 = run_log(NumMsg, NumMsg + NumCust, CustGen, S0), -% _ = run_log(NumMsg, NumMsg + NumCust + NumMsg, SetlGen, S1), -% lg:stop(). - - run_log(Num, Num, _Gen, State) -> State; run_log(Num, Max, Gen, State0) -> @@ -995,6 +981,7 @@ dehydrate_state(#state{messages = Messages0, test_init(Name) -> init(#{name => Name, + machine_version => 0, shadow_copy_interval => 0, metrics_handler => {?MODULE, metrics_handler, []}}). @@ -1243,6 +1230,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() -> discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> Cid = {<<"completed_customer_yields_demonitor_effect_test">>, self()}, State00 = init(#{name => test, + machine_version => 0, dead_letter_handler => {somemod, somefun, [somearg]}}), {State0, _, [_, _]} = enq(1, 1, first, State00), @@ -1430,6 +1418,7 @@ duplicate_delivery_test() -> state_enter_test() -> S0 = init(#{name => the_name, + machine_version => 0, become_leader_handler => {m, f, [a]}}), [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), ok. @@ -1505,7 +1494,7 @@ run_log(InitState, Entries) -> aux_test() -> _ = ra_machine_ets:start_link(), Aux0 = init_aux(aux_test), - MacState = init(#{name => aux_test}), + MacState = init(#{name => aux_test, machine_version => 0}), Log = undefined, {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0, Log, MacState), diff --git a/test/ra_machine_version_SUITE.erl b/test/ra_machine_version_SUITE.erl index 665f68bb..3e88e8f8 100644 --- a/test/ra_machine_version_SUITE.erl +++ b/test/ra_machine_version_SUITE.erl @@ -34,7 +34,9 @@ all_tests() -> unversioned_machine_never_sees_machine_version_command, unversioned_can_change_to_versioned, server_upgrades_machine_state_on_noop_command, - server_applies_with_new_module + server_applies_with_new_module, + initial_machine_version, + initial_machine_version_quorum % snapshot_persists_machine_version ]. @@ -65,8 +67,8 @@ end_per_group(_Group, _Config) -> init_per_testcase(TestCase, Config) -> ok = logger:set_primary_config(level, all), ra_server_sup_sup:remove_all(?SYS), - case TestCase of - server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher -> + case lists:member(TestCase, machine_upgrade_quorum_tests()) of + true -> ok = application:set_env(ra, machine_upgrade_strategy, quorum), _ = ra_system:stop_default(), {ok, _} = ra_system:start_default(); @@ -94,8 +96,8 @@ init_per_testcase(TestCase, Config) -> end_per_testcase(TestCase, Config) -> catch ra:delete_cluster(?config(cluster, Config)), meck:unload(), - case TestCase of - server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher -> + case lists:member(TestCase, machine_upgrade_quorum_tests()) of + true -> ok = application:unset_env(ra, machine_upgrade_strategy), _ = ra_system:stop_default(), {ok, _} = ra_system:start_default(); @@ -104,6 +106,10 @@ end_per_testcase(TestCase, Config) -> end, ok. +machine_upgrade_quorum_tests() -> + [server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher, + initial_machine_version_quorum]. + %%%=================================================================== %%% Test cases %%%=================================================================== @@ -427,6 +433,152 @@ server_applies_with_new_module(Config) -> snapshot_persists_machine_version(_Config) -> error({todo, ?FUNCTION_NAME}). +initial_machine_version(Config) -> + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (#{machine_version := MacVer}) -> + ?assertEqual(3, MacVer), + init_state + end), + meck:expect(Mod, version, fun () -> 5 end), + meck:expect(Mod, which_module, fun (_) -> Mod end), + meck:expect(Mod, apply, fun (_, dummy, S) -> + {S, ok}; + (_, {machine_version, 0, 3}, init_state) -> + exit(booo), + {state_v3, ok}; + (_, {machine_version, 3, 5}, init_state) -> + ct:pal("3-5"), + {state_v5, ok} + end), + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(server_id, Config), + Machine = {module, Mod, #{}}, + Configs = [begin + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + #{id => Id, + uid => UId, + cluster_name => ClusterName, + log_init_args => #{uid => UId}, + initial_members => [ServerId], + machine => Machine, + initial_machine_version => 3} + end || Id <- [ServerId]], + % debugger:start(), + % int:i(ra_machine), + % int:i(ra_server_sup_sup), + % int:break(ra_server_sup_sup, 66), + {ok, _, _} = ra:start_cluster(?SYS, Configs, 5000), + await(fun () -> + {ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1), + S == state_v5 + end, 100), + ?assertMatch({ok, #{effective_machine_version := 5}, _}, + ra:member_overview(ServerId)), + {ok, _} = ra:delete_cluster([ServerId]), + await(fun () -> whereis(element(1, ServerId)) == undefined end, 100), + meck:expect(Mod, init, fun (#{machine_version := MacVer}) -> + ?assertEqual(5, MacVer), + init_state + end), + meck:expect(Mod, apply, fun (Meta, meta, _S) -> + {state_v5, Meta}; + (_, {machine_version, 0, 3}, init_state) -> + exit(booo), + {state_v3, ok}; + (_, {machine_version, 5, 5}, init_state) -> + ct:pal("5-5"), + {state_v5, ok} + end), + Configs2 = [begin + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + #{id => Id, + uid => UId, + cluster_name => ClusterName, + log_init_args => #{uid => UId}, + initial_members => [ServerId], + machine => Machine, + initial_machine_version => 9} + end || Id <- [ServerId]], + {error, cluster_not_formed} = ra:start_cluster(?SYS, Configs2, 5000), + ok. + +initial_machine_version_quorum(Config) -> + Mod = ?config(modname, Config), + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (#{machine_version := MacVer}) -> + ?assertEqual(3, MacVer), + init_state + end), + meck:expect(Mod, version, fun () -> 5 end), + meck:expect(Mod, which_module, fun (_) -> Mod end), + meck:expect(Mod, apply, fun (_, dummy, S) -> + {S, ok}; + (_, {machine_version, 0, 3}, init_state) -> + exit(booo), + {state_v3, ok}; + (_, {machine_version, 3, 5}, init_state) -> + ct:pal("3-5"), + {state_v5, ok} + end), + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(server_id, Config), + Machine = {module, Mod, #{}}, + Configs = [begin + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + #{id => Id, + uid => UId, + cluster_name => ClusterName, + log_init_args => #{uid => UId}, + initial_members => [ServerId], + machine => Machine, + initial_machine_version => 3} + end || Id <- [ServerId]], + % debugger:start(), + % int:i(ra_machine), + % int:i(ra_server_sup_sup), + % int:break(ra_server_sup_sup, 66), + {ok, _, _} = ra:start_cluster(?SYS, Configs, 5000), + await(fun () -> + {ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1), + S == state_v5 + end, 100), + ?assertMatch({ok, #{effective_machine_version := 5}, _}, + ra:member_overview(ServerId)), + {ok, _} = ra:delete_cluster([ServerId]), + await(fun () -> whereis(element(1, ServerId)) == undefined end, 100), + meck:expect(Mod, init, fun (#{machine_version := MacVer}) -> + ?assertEqual(5, MacVer), + init_state + end), + meck:expect(Mod, apply, fun (Meta, meta, _S) -> + {state_v5, Meta}; + (_, {machine_version, 0, 3}, init_state) -> + exit(booo), + {state_v3, ok}; + (_, {machine_version, 5, 5}, init_state) -> + ct:pal("5-5"), + {state_v5, ok} + end), + Configs2 = [begin + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + #{id => Id, + uid => UId, + cluster_name => ClusterName, + log_init_args => #{uid => UId}, + initial_members => [ServerId], + machine => Machine, + initial_machine_version => 9} + end || Id <- [ServerId]], + {ok, _, _} = ra:start_cluster(?SYS, Configs2, 5000), + {ok, #{machine_version := 5}, _} = ra:process_command(ServerId, meta), + await(fun () -> + {ok, {_, S}, _} = ra:leader_query(ServerId, fun ra_lib:id/1), + ct:pal("S ~p", [S]), + S == state_v5 + end, 100), + ct:pal("overview ~p", [ra:member_overview(ServerId)]), + ok. %% Utility validate_state_enters(States) ->