From 267aa04ff100654cdc6db1a35b808f2d3aba0122 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 19 Dec 2024 18:45:28 +0100 Subject: [PATCH] Delay machine upgrade until all Ra servers support it [Why] Before this patch, a Ra cluster would switch to a new machine version immediately after a leader with that version was elected. Because a leader can be elected with a quorum number of candidate voting for it, it means the cluster could start using the new machine version as soon as a quorum of members support that version. Unfortunately, other members that do not support it stop applying commands because they run an older version of the machine code. For some consumers of Ra, like Khepri, this means they could cease their operation locally until the member is restarted with the new machine version. We want to delay the machine upgrade to a point where all members know about the new version. This ensures all members can continue to provide their service. [How] The machine version to use is communicated by the leader using the `noop` command. This command is the first one sent just after an election. The machine version passed was the local machine version. With this patch, the `noop` command sent after an election passes the effective machine version, except if the leader is unclustered alone (in which case it passes the latest machine version. Therefore in a cluster, the leader will send a second `noop` command with a newer machine version later, once all members support it. To determine what each follower supports, this patch introduces two commands: * `#info_rpc{}` * `#info_reply{}` Once a leader is elected, in addition to the `noop` command, it sends an `#info_rpc{}` command to all followers. They reply with `#info_reply{}` with the machine version they support. This mechanism is not specific to machine upgrades: this could be extended in the future to communicate more details about each follower. Once the leader received the machine version of every followers, it can determine the highest possible supported machine version. For that, it simply takes the lowest reported machine version (including the leader's machine version). If this version is greater than the effective machine version, the leader sends a new `noop` command with the new machine version to use. The leader sends the `#info_rpc{}` command again and again to some followers at each "tick", if these followers did not report anything yet, or if the reported machine version is lower than its own supported machine version. This takes care of follower that did not receive the initial `#info_rpc{}` and those that were restarted as part of an upgrade. Fixes #490. --- src/ra.hrl | 14 +- src/ra_server.erl | 336 +++++++++++++++++++++++++++++- test/ra_machine_version_SUITE.erl | 146 ++++--------- test/ra_server_SUITE.erl | 10 +- 4 files changed, 387 insertions(+), 119 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index cef26c04..dc3c2b85 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -70,7 +70,8 @@ voter_status => ra_voter_status(), %% indicates that a snapshot is being sent %% to the peer - status := ra_peer_status()}. + status := ra_peer_status(), + machine_version => ra_machine:version()}. -type ra_cluster() :: #{ra_server_id() => ra_peer_state()}. @@ -187,6 +188,17 @@ {query_index :: integer(), term :: ra_term()}). +-record(info_rpc, + {from :: ra_server_id(), + term :: ra_term(), + keys :: [ra_server:ra_server_info_key()]}). + +-record(info_reply, + {from :: ra_server_id(), + term :: ra_term(), + keys :: [ra_server:ra_server_info_key()], + info :: ra_server:ra_server_info() | undefined}). + %% WAL defaults -define(WAL_DEFAULT_MAX_SIZE_BYTES, 256 * 1000 * 1000). -define(WAL_DEFAULT_MAX_BATCH_SIZE, 8192). diff --git a/src/ra_server.erl b/src/ra_server.erl index bffa31cd..86c70e15 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -146,6 +146,8 @@ ra_log:event() | {consistent_query, term(), ra:query_fun()} | #heartbeat_rpc{} | + #info_rpc{} | + #info_reply{} | {ra_server_id, #heartbeat_reply{}} | pipeline_rpcs. @@ -219,6 +221,27 @@ has_changed => boolean() }. +-type ra_server_info_key() :: machine_version | atom(). +%% Key one can get in `ra_server_info()'. +%% +%% This is used in the `#info_rpc{}' to ask for a specific list of info keys. +%% +%% Key meanings: +%% +%% +%% Any atom is supported because a future version of Ra could ask for +%% something this version does not know about. + +-type ra_server_info() :: #{machine_version := ra_machine:version(), + atom() => any()}. +%% Info for a Ra server, got from `#info_reply{}'. +%% +%% In addition, the map may contain keys unknown to this version of Ra but +%% emitted by a future version. + -type mutable_config() :: #{cluster_name => ra_cluster_name(), metrics_key => term(), broadcast_time => non_neg_integer(), % ms @@ -235,6 +258,8 @@ ra_server_state/0, ra_state/0, ra_server_config/0, + ra_server_info_key/0, + ra_server_info/0, mutable_config/0, ra_msg/0, machine_conf/0, @@ -814,6 +839,29 @@ handle_leader(#request_vote_result{}, State) -> handle_leader(#pre_vote_result{}, State) -> %% handle to avoid logging as unhandled {leader, State, []}; +handle_leader(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when CurTerm < Term -> + ?INFO("~ts: leader saw info_rpc from ~w for term ~b, abdicates term: ~b!", + [LogId, Msg#info_rpc.from, Term, CurTerm]), + {follower, update_term(Term, State0#{leader_id => undefined}), + [{next_event, Msg}]}; +handle_leader(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = info_reply_effect(State, InfoRpc), + {leader, State, [InfoReplyEffect]}; +handle_leader(#info_reply{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when CurTerm < Term -> + ?INFO("~ts: leader saw info_reply from ~w for term ~b, abdicates " + "term: ~b!", + [LogId, Msg#info_reply.from, Term, CurTerm]), + {follower, update_term(Term, State0#{leader_id => undefined}), + [{next_event, Msg}]}; +handle_leader(#info_reply{} = InfoReply, State) -> + {State1, Effects} = handle_info_reply(State, InfoReply), + {leader, State1, Effects}; handle_leader({transfer_leadership, Leader}, #{cfg := #cfg{id = Leader, log_id = LogId}} = State) -> ?DEBUG("~ts: transfer leadership requested but already leader", @@ -860,8 +908,7 @@ handle_leader(Msg, State) -> {ra_state(), ra_server_state(), effects()}. handle_candidate(#request_vote_result{term = Term, vote_granted = true}, #{cfg := #cfg{id = Id, - log_id = LogId, - machine = Mac}, + log_id = LogId}, current_term := Term, votes := Votes, cluster := Nodes} = State0) -> @@ -871,11 +918,10 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true}, case required_quorum(Nodes) of NewVotes -> {State1, Effects} = make_all_rpcs(initialise_peers(State0)), - Noop = {noop, #{ts => erlang:system_time(millisecond)}, - ra_machine:version(Mac)}, State = State1#{leader_id => Id}, + PostElectionEffects = post_election_effects(State), {leader, maps:without([votes], State), - [{next_event, cast, {command, Noop}} | Effects]}; + PostElectionEffects ++ Effects}; _ -> {candidate, State0#{votes => NewVotes}, []} end; @@ -966,6 +1012,27 @@ handle_candidate({register_external_log_reader, Pid}, #{log := Log0} = State) -> {candidate, State#{log => Log}, Effs}; handle_candidate(force_member_change, State0) -> {follower, State0#{votes => 0}, [{next_event, force_member_change}]}; +handle_candidate(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when Term >= CurTerm -> + ?INFO("~ts: candidate info_rpc with higher term received ~b -> ~b", + [LogId, CurTerm, Term]), + State = update_term_and_voted_for(Term, undefined, State0), + {follower, State, [{next_event, Msg}]}; +handle_candidate(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = empty_info_reply_effect(State, InfoRpc), + {candidate, State, [InfoReplyEffect]}; +handle_candidate(#info_reply{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}} = State0) + when Term >= CurTerm -> + ?INFO("~ts: candidate info_reply with higher term received ~b -> ~b", + [LogId, CurTerm, Term]), + State = update_term_and_voted_for(Term, undefined, State0), + {follower, State, [{next_event, Msg}]}; +handle_candidate(#info_reply{}, State) -> + {candidate, State, []}; handle_candidate(Msg, State) -> log_unhandled_msg(candidate, Msg, State), {candidate, State, [{reply, {error, {unsupported_call, Msg}}}]}. @@ -1048,6 +1115,19 @@ handle_pre_vote({register_external_log_reader, Pid}, #{log := Log0} = State) -> {pre_vote, State#{log => Log}, Effs}; handle_pre_vote(force_member_change, State0) -> {follower, State0#{votes => 0}, [{next_event, force_member_change}]}; +handle_pre_vote(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm} = State0) + when Term >= CurTerm -> + {follower, State0#{votes => 0}, [{next_event, Msg}]}; +handle_pre_vote(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = empty_info_reply_effect(State, InfoRpc), + {pre_vote, State, [InfoReplyEffect]}; +handle_pre_vote(#info_reply{term = Term} = Msg, + #{current_term := CurTerm} = State0) + when Term >= CurTerm -> + {follower, State0#{votes => 0}, [{next_event, Msg}]}; +handle_pre_vote(#info_reply{}, State) -> + {pre_vote, State, []}; handle_pre_vote(Msg, State) -> log_unhandled_msg(pre_vote, Msg, State), {pre_vote, State, [{reply, {error, {unsupported_call, Msg}}}]}. @@ -1349,6 +1429,21 @@ handle_follower(force_member_change, {ok, _, _, State, Effects} = append_cluster_change(Cluster, undefined, no_reply, State0, []), call_for_election(pre_vote, State, [{reply, ok} | Effects]); +handle_follower(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm} = State) + when Term >= CurTerm -> + State1 = update_term(Term, State), + {follower, State1, [{next_event, Msg}]}; +handle_follower(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = info_reply_effect(State, InfoRpc), + {follower, State, [InfoReplyEffect]}; +handle_follower(#info_reply{term = Term} = Msg, + #{current_term := CurTerm} = State) + when Term >= CurTerm -> + State1 = update_term(Term, State), + {follower, State1, [{next_event, Msg}]}; +handle_follower(#info_reply{}, State) -> + {follower, State, []}; handle_follower(Msg, State) -> log_unhandled_msg(follower, Msg, State), {follower, State, [{reply, {error, {unsupported_call, Msg}}}]}. @@ -1465,6 +1560,39 @@ handle_receive_snapshot(receive_snapshot_timeout, #{log := Log0} = State) -> handle_receive_snapshot({register_external_log_reader, Pid}, #{log := Log0} = State) -> {Log, Effs} = ra_log:register_reader(Pid, Log0), {receive_snapshot, State#{log => Log}, Effs}; +handle_receive_snapshot(#info_rpc{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}, + log := Log0} = State) + when Term > CurTerm -> + ?INFO("~ts: follower receiving snapshot saw info_rpc from ~w for term ~b " + "abdicates term: ~b!", + [LogId, Msg#info_rpc.from, + Term, CurTerm]), + SnapState0 = ra_log:snapshot_state(Log0), + SnapState = ra_snapshot:abort_accept(SnapState0), + Log = ra_log:set_snapshot_state(SnapState, Log0), + {follower, update_term(Term, clear_leader_id(State#{log => Log})), + [{next_event, Msg}]}; +handle_receive_snapshot(#info_rpc{} = InfoRpc, State) -> + InfoReplyEffect = empty_info_reply_effect(State, InfoRpc), + {receive_snapshot, State, [InfoReplyEffect]}; +handle_receive_snapshot(#info_reply{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}, + log := Log0} = State) + when Term > CurTerm -> + ?INFO("~ts: follower receiving snapshot saw info_reply from ~w for term ~b " + "abdicates term: ~b!", + [LogId, Msg#info_reply.from, + Term, CurTerm]), + SnapState0 = ra_log:snapshot_state(Log0), + SnapState = ra_snapshot:abort_accept(SnapState0), + Log = ra_log:set_snapshot_state(SnapState, Log0), + {follower, update_term(Term, clear_leader_id(State#{log => Log})), + [{next_event, Msg}]}; +handle_receive_snapshot(#info_reply{}, State) -> + {receive_snapshot, State, []}; handle_receive_snapshot(Msg, State) -> log_unhandled_msg(receive_snapshot, Msg, State), %% drop all other events?? @@ -1533,9 +1661,10 @@ process_new_leader_queries(#{pending_consistent_queries := Pending, -spec tick(ra_server_state()) -> effects(). tick(#{cfg := #cfg{effective_machine_module = MacMod}, - machine_state := MacState}) -> + machine_state := MacState} = State) -> + InfoRpcEffects = info_rpc_effects(State), Now = erlang:system_time(millisecond), - ra_machine:tick(MacMod, Now, MacState). + InfoRpcEffects ++ ra_machine:tick(MacMod, Now, MacState). -spec log_tick(ra_server_state()) -> ra_server_state(). log_tick(#{cfg := #cfg{}, @@ -2313,6 +2442,19 @@ peers_with_normal_status(State) -> (_, _) -> false end, peers(State)). +voting_peers(State) -> + Peers0 = peers(State), + Peers1 = maps:filter( + fun + (_PeerId, #{voter_status := #{membership := voter}}) -> + true; + (_PeerId, Peer) when not is_map_key(voter_status, Peer) -> + true; + (_PeerId, _Peer) -> + false + end, Peers0), + Peers1. + % peers that could need an update stale_peers(#{commit_index := CommitIndex, cfg := #cfg{id = ThisId}, @@ -3314,6 +3456,186 @@ after_log_append_reply(Cmd, Idx, Term, Effects0) -> Effects0 end. +post_election_effects( + #{cfg := #cfg{effective_machine_version = EffectiveMacVer, + machine = Mac}} = State) -> + Peers = voting_peers(State), + PeerIds = maps:keys(Peers), + case PeerIds of + [] -> + %% This node is alone in the cluster, we can send the `noop' + %% command with the newer machine version right away. + MacVer = ra_machine:version(Mac), + Noop = {noop, #{ts => erlang:system_time(millisecond)}, + MacVer}, + NoopEffect = {next_event, cast, {command, Noop}}, + [NoopEffect]; + _ -> + %% We continue to send the `noop' command immediately after + %% becoming a leader, but compared to Ra 2.15 and older, we don't + %% set the machine version to the latest: we keep the same + %% effective machine version for now. + %% + %% However, we query info keys from all peers, including their + %% supported machine version. The replies will be used to + %% determine the max supported machine version and when it is + %% greater than the effective one, another `noop' command will be + %% sent. + Noop = {noop, #{ts => erlang:system_time(millisecond)}, + EffectiveMacVer}, + NoopEffect = {next_event, cast, {command, Noop}}, + InfoRpcEffects = info_rpc_effects(State), + [NoopEffect | InfoRpcEffects] + end. + +info_rpc_effects(#{cfg := #cfg{id = Id}, cluster := Cluster} = State) -> + InfoRpcEffects = maps:fold( + fun + (PeerId, _, Acc) when PeerId =:= Id -> + Acc; + (PeerId, _, Acc) -> + Acc ++ info_rpc_effects_for_peer(State, PeerId) + end, [], Cluster), + InfoRpcEffects. + +info_rpc_effects_for_peer( + #{cluster := Cluster, current_term := CurTerm} = State, PeerId) -> + %% We determine if we need to ask (for the fist time or again) the info + %% from a peer. + MacVer = machine_version(State), + SendRpc = case Cluster of + #{PeerId := #{machine_version := PeerMacVer}} -> + %% We have the machine version of the peer, but we want + %% to ask again if that version is old, in the hope the + %% peer restarted and was updated. + PeerMacVer < MacVer; + _ -> + %% We don't have any details about the peer, we ask. + true + end, + case SendRpc of + true -> + %% We ask for all info keys currently. If we ask for specific + %% keys, we will have to handle merging of already known info keys + %% and updates. + Id = id(State), + Command = #info_rpc{from = Id, + term = CurTerm, + keys = [machine_version]}, + [{send_rpc, PeerId, Command}]; + false -> + [] + end. + +info_reply_effect( + #{current_term := CurTerm} = State, + #info_rpc{from = FromId, keys = Keys}) -> + Id = id(State), + Info = ra_server_info(State, Keys), + InfoReply = #info_reply{from = Id, + term = CurTerm, + keys = Keys, + info = Info}, + {cast, FromId, InfoReply}. + +empty_info_reply_effect( + #{current_term := CurTerm} = State, + #info_rpc{from = FromId, keys = Keys}) -> + Id = id(State), + InfoReply = #info_reply{from = Id, + term = CurTerm, + keys = Keys, + info = undefined}, + {cast, FromId, InfoReply}. + +ra_server_info(State) -> + MacVer = machine_version(State), + #{machine_version => MacVer}. + +ra_server_info(State, Keys) -> + %% Note that the node that asked may ask for keys we don't know. + Info0 = ra_server_info(State), + Info1 = maps:filter( + fun(Key, _Value) -> + lists:member(Key, Keys) + end, Info0), + Info1. + +handle_info_reply( + #{cluster := Cluster} = State, + #info_reply{from = PeerId, keys = Keys, info = Info}) -> + PeerState0 = maps:get(PeerId, Cluster), + PeerState1 = case Info of + undefined -> + %% The peer is is a Raft state where it can't give + %% info yet. + lists:foldl( + fun(Key, PS) -> + maps:remove(Key, PS) + end, PeerState0, Keys); + _ -> + lists:foldl( + fun(Key, PS) -> + Value = maps:get(Key, Info), + PS#{Key => Value} + end, PeerState0, Keys) + end, + Cluster1 = Cluster#{PeerId => PeerState1}, + State1 = State#{cluster => Cluster1}, + determine_if_machine_upgrade_allowed(State1). + +determine_if_machine_upgrade_allowed( + #{cfg := #cfg{effective_machine_version = EffectiveMacVer, + log_id = LogId}} = State) -> + Effects = case has_enough_peer_info(State) of + false -> + %% We miss the info from some peers, wait for them to + %% determine if we can upgrade the machine version. + []; + true -> + case get_max_supported_machine_version(State) of + MaxSupMacVer + when MaxSupMacVer > EffectiveMacVer -> + ?DEBUG( + "~ts: max supported machine version = ~b, " + "upgrading from ~b", + [LogId, MaxSupMacVer, EffectiveMacVer]), + Noop = {noop, + #{ts => erlang:system_time(millisecond)}, + MaxSupMacVer}, + [{next_event, cast, {command, Noop}}]; + _ -> + [] + end + end, + {State, Effects}. + +has_enough_peer_info(#{cfg := #cfg{id = Id}, cluster := Cluster}) -> + maps:fold( + fun + (PeerId, PeerState, HasEnough) when PeerId =/= Id -> + HasEnough andalso maps:is_key(machine_version, PeerState); + (_PeerId, _PeerState, HasEnough) -> + HasEnough + end, true, Cluster). + +get_max_supported_machine_version( + #{cfg := #cfg{id = Id}, cluster := Cluster} = State) -> + MacVer = machine_version(State), + MaxSupMacVer = maps:fold( + fun + (PeerId, #{machine_version := PeerMacVer}, Max) + when PeerId =/= Id andalso PeerMacVer < Max -> + PeerMacVer; + (_PeerId, _PeerState, Max) -> + %% Either this is this Ra server, or this is a + %% peer using a newer machine version, or a peer + %% for which we don't have the machine version + %% yet. + Max + end, MacVer, Cluster), + MaxSupMacVer. + %%% =================== %%% Internal unit tests %%% =================== diff --git a/test/ra_machine_version_SUITE.erl b/test/ra_machine_version_SUITE.erl index 7ebf8d83..40836fad 100644 --- a/test/ra_machine_version_SUITE.erl +++ b/test/ra_machine_version_SUITE.erl @@ -29,11 +29,10 @@ all() -> all_tests() -> [ server_with_higher_version_needs_quorum_to_be_elected, - server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher, + cluster_waits_for_all_members_to_have_latest_version_to_upgrade, unversioned_machine_never_sees_machine_version_command, unversioned_can_change_to_versioned, server_upgrades_machine_state_on_noop_command, - lower_version_does_not_apply_until_upgraded, server_applies_with_new_module % snapshot_persists_machine_version ]. @@ -135,7 +134,7 @@ server_with_higher_version_needs_quorum_to_be_elected(Config) -> ?assertNotEqual(LastFollower, Leader3), ok. -server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher(Config) -> +cluster_waits_for_all_members_to_have_latest_version_to_upgrade(Config) -> ok = ra_env:configure_logger(logger), LogFile = filename:join([?config(priv_dir, Config), "ra.log"]), LogConfig = #{config => #{type => {file, LogFile}}, level => debug}, @@ -168,30 +167,42 @@ server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher(Con ra:restart_server(?SYS, Leader), ra:restart_server(?SYS, Follower1), timer:sleep(100), - {ok, _, Leader2} = ra:members(Leader, 2000), + {ok, _, _Leader1} = ra:members(Leader, 2000), ra:restart_server(?SYS, Follower2), - %% need to wait until the restarted Follower2 discovers the current - %% effective machine version - await(fun () -> - case ra:member_overview(Follower2) of - {ok, #{effective_machine_version := 2, - machine_version := 1}, _} -> - true; - _ -> - false - end - end, 100), - %% at this point the effective machine version known by all members is 2 - %% but Follower2's local machine version is 1 as it hasn't been "upgraded" - %% yet - %% stop the leader to trigger an election that Follower2 must not win - ra:stop_server(?SYS, Leader2), - ExpectedLeader = case Leader2 of - Follower1 -> Leader; - _ -> Follower1 - end, - %% follower 1 should now be elected - ?assertMatch({ok, _, ExpectedLeader}, ra:members(ExpectedLeader, 60000)), + %% The cluster is still using v1 even though Leader and Follower2 knows + %% about v2. + lists:foreach( + fun(Member) -> + await(fun () -> + case ra:member_overview(Member) of + {ok, #{effective_machine_version := 1, + machine_version := 1}, _} + when Member == Follower2 -> + true; + {ok, #{effective_machine_version := 1, + machine_version := 2}, _} -> + true; + _ -> + false + end + end, 100) + end, Cluster), + %% Restart Follower2 with v2. The cluster should now upgrade to v2. + ra:stop_server(?SYS, Follower2), + meck:expect(Mod, version, fun () -> 2 end), + ra:restart_server(?SYS, Follower2), + lists:foreach( + fun(Member) -> + await(fun () -> + case ra:member_overview(Member) of + {ok, #{effective_machine_version := 2, + machine_version := 2}, _} -> + true; + _ -> + false + end + end, 100) + end, Cluster), ok. @@ -336,89 +347,6 @@ server_applies_with_new_module(Config) -> {ok, state_v1, _} = ra:consistent_query(ServerId, fun ra_lib:id/1), ok. -lower_version_does_not_apply_until_upgraded(Config) -> - ok = logger:set_primary_config(level, all), - Mod = ?config(modname, Config), - meck:new(Mod, [non_strict]), - meck:expect(Mod, init, fun (_) -> init_state end), - meck:expect(Mod, version, fun () -> 1 end), - meck:expect(Mod, which_module, fun (_) -> Mod end), - meck:expect(Mod, apply, fun - (_, {machine_version, _, _}, S) -> - %% retain state for machine versions - {S, ok}; - (_, C, _) -> - %% any other command replaces the state - {C, ok} - end), - Cluster = ?config(cluster, Config), - ClusterName = ?config(cluster_name, Config), - %% 3 node cluster, upgrade the first two to the later version - %% leaving the follower on a lower version - Leader = start_cluster(ClusterName, {module, Mod, #{}}, Cluster), - Followers = lists:delete(Leader, Cluster), - ct:pal("Leader1 ~w Followers ~w", [Leader, Followers]), - meck:expect(Mod, version, fun () -> - Self = self(), - case whereis(element(1, Leader)) of - Self -> 2; - _ -> 1 - end - end), - timer:sleep(200), - ra:stop_server(?SYS, Leader), - {ok, _, Leader2} = ra:members(Followers), - [LastFollower] = lists:delete(Leader2, Followers), - ct:pal("Leader2 ~w LastFollower ~w", [Leader2, LastFollower]), - ra:restart_server(?SYS, Leader), - meck:expect(Mod, version, fun () -> - New = [whereis(element(1, Leader)), - whereis(element(1, Leader2))], - case lists:member(self(), New) of - true -> 2; - _ -> 1 - end - end), - ra:stop_server(?SYS, Leader2), - timer:sleep(500), - {ok, _, Leader3} = ra:members(LastFollower), - ct:pal("Leader3 ~w LastFollower ~w", [Leader3, LastFollower]), - ra:restart_server(?SYS, Leader2), - - case Leader3 of - LastFollower -> - %% if last follower happened to be elected - ct:pal("Leader3 is LastFollower", []), - ra:stop_server(?SYS, Leader3), - %% allow time for a different member to be elected - timer:sleep(1000), - ra:restart_server(?SYS, Leader3); - _ -> ok - end, - - - %% process a command that should be replicated to all servers but only - %% applied to new machine version servers - {ok, ok, _} = ra:process_command(Leader, dummy), - %% a little sleep to make it more likely that replication is complete to - %% all servers and not just a quorum - timer:sleep(100), - - %% the updated servers should have the same state - {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader, fun ra_lib:id/1), - {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader2, fun ra_lib:id/1), - %% the last follower with the lower machine version should not have - %% applied the last command - {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1), - - ra:stop_server(?SYS, LastFollower), - ra:restart_server(?SYS, LastFollower), - - {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1), - - ?assert(Idx > LFIdx), - ok. - snapshot_persists_machine_version(_Config) -> error({todo, ?FUNCTION_NAME}). diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index a17ba982..a4663e31 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -1861,7 +1861,8 @@ candidate_election(_Config) -> {follower, #{current_term := 7}, []} = ra_server:handle_candidate(HighTermResult, State1), - MacVer = 1, + EffectiveMacVer = 0, + MacVer = EffectiveMacVer + 1, meck:expect(ra_machine, version, fun (_) -> MacVer end), % quorum has been achieved - candidate becomes leader @@ -1875,7 +1876,12 @@ candidate_election(_Config) -> N4 := PeerState, N5 := PeerState}}, [ - {next_event, cast, {command, {noop, _, MacVer}}}, + {next_event, cast, {command, {noop, _, EffectiveMacVer}}}, + {send_rpc, N2, {info_rpc, _, _, _}}, + {send_rpc, N3, {info_rpc, _, _, _}}, + {send_rpc, N4, {info_rpc, _, _, _}}, + {send_rpc, N5, {info_rpc, _, _, _}}, + {send_rpc, _, _}, {send_rpc, _, _}, {send_rpc, _, _},