diff --git a/src/ra.hrl b/src/ra.hrl
index cef26c04..dc3c2b85 100644
--- a/src/ra.hrl
+++ b/src/ra.hrl
@@ -70,7 +70,8 @@
voter_status => ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
- status := ra_peer_status()}.
+ status := ra_peer_status(),
+ machine_version => ra_machine:version()}.
-type ra_cluster() :: #{ra_server_id() => ra_peer_state()}.
@@ -187,6 +188,17 @@
{query_index :: integer(),
term :: ra_term()}).
+-record(info_rpc,
+ {from :: ra_server_id(),
+ term :: ra_term(),
+ keys :: [ra_server:ra_server_info_key()]}).
+
+-record(info_reply,
+ {from :: ra_server_id(),
+ term :: ra_term(),
+ keys :: [ra_server:ra_server_info_key()],
+ info :: ra_server:ra_server_info() | undefined}).
+
%% WAL defaults
-define(WAL_DEFAULT_MAX_SIZE_BYTES, 256 * 1000 * 1000).
-define(WAL_DEFAULT_MAX_BATCH_SIZE, 8192).
diff --git a/src/ra_server.erl b/src/ra_server.erl
index bffa31cd..86c70e15 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -146,6 +146,8 @@
ra_log:event() |
{consistent_query, term(), ra:query_fun()} |
#heartbeat_rpc{} |
+ #info_rpc{} |
+ #info_reply{} |
{ra_server_id, #heartbeat_reply{}} |
pipeline_rpcs.
@@ -219,6 +221,27 @@
has_changed => boolean()
}.
+-type ra_server_info_key() :: machine_version | atom().
+%% Key one can get in `ra_server_info()'.
+%%
+%% This is used in the `#info_rpc{}' to ask for a specific list of info keys.
+%%
+%% Key meanings:
+%%
+%% - `machine_version': Highest machine version supported by this Ra
+%% server.
+%%
+%%
+%% Any atom is supported because a future version of Ra could ask for
+%% something this version does not know about.
+
+-type ra_server_info() :: #{machine_version := ra_machine:version(),
+ atom() => any()}.
+%% Info for a Ra server, got from `#info_reply{}'.
+%%
+%% In addition, the map may contain keys unknown to this version of Ra but
+%% emitted by a future version.
+
-type mutable_config() :: #{cluster_name => ra_cluster_name(),
metrics_key => term(),
broadcast_time => non_neg_integer(), % ms
@@ -235,6 +258,8 @@
ra_server_state/0,
ra_state/0,
ra_server_config/0,
+ ra_server_info_key/0,
+ ra_server_info/0,
mutable_config/0,
ra_msg/0,
machine_conf/0,
@@ -814,6 +839,29 @@ handle_leader(#request_vote_result{}, State) ->
handle_leader(#pre_vote_result{}, State) ->
%% handle to avoid logging as unhandled
{leader, State, []};
+handle_leader(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when CurTerm < Term ->
+ ?INFO("~ts: leader saw info_rpc from ~w for term ~b, abdicates term: ~b!",
+ [LogId, Msg#info_rpc.from, Term, CurTerm]),
+ {follower, update_term(Term, State0#{leader_id => undefined}),
+ [{next_event, Msg}]};
+handle_leader(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = info_reply_effect(State, InfoRpc),
+ {leader, State, [InfoReplyEffect]};
+handle_leader(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when CurTerm < Term ->
+ ?INFO("~ts: leader saw info_reply from ~w for term ~b, abdicates "
+ "term: ~b!",
+ [LogId, Msg#info_reply.from, Term, CurTerm]),
+ {follower, update_term(Term, State0#{leader_id => undefined}),
+ [{next_event, Msg}]};
+handle_leader(#info_reply{} = InfoReply, State) ->
+ {State1, Effects} = handle_info_reply(State, InfoReply),
+ {leader, State1, Effects};
handle_leader({transfer_leadership, Leader},
#{cfg := #cfg{id = Leader, log_id = LogId}} = State) ->
?DEBUG("~ts: transfer leadership requested but already leader",
@@ -860,8 +908,7 @@ handle_leader(Msg, State) ->
{ra_state(), ra_server_state(), effects()}.
handle_candidate(#request_vote_result{term = Term, vote_granted = true},
#{cfg := #cfg{id = Id,
- log_id = LogId,
- machine = Mac},
+ log_id = LogId},
current_term := Term,
votes := Votes,
cluster := Nodes} = State0) ->
@@ -871,11 +918,10 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
case required_quorum(Nodes) of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
- Noop = {noop, #{ts => erlang:system_time(millisecond)},
- ra_machine:version(Mac)},
State = State1#{leader_id => Id},
+ PostElectionEffects = post_election_effects(State),
{leader, maps:without([votes], State),
- [{next_event, cast, {command, Noop}} | Effects]};
+ PostElectionEffects ++ Effects};
_ ->
{candidate, State0#{votes => NewVotes}, []}
end;
@@ -966,6 +1012,27 @@ handle_candidate({register_external_log_reader, Pid}, #{log := Log0} = State) ->
{candidate, State#{log => Log}, Effs};
handle_candidate(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
+handle_candidate(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when Term >= CurTerm ->
+ ?INFO("~ts: candidate info_rpc with higher term received ~b -> ~b",
+ [LogId, CurTerm, Term]),
+ State = update_term_and_voted_for(Term, undefined, State0),
+ {follower, State, [{next_event, Msg}]};
+handle_candidate(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
+ {candidate, State, [InfoReplyEffect]};
+handle_candidate(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId}} = State0)
+ when Term >= CurTerm ->
+ ?INFO("~ts: candidate info_reply with higher term received ~b -> ~b",
+ [LogId, CurTerm, Term]),
+ State = update_term_and_voted_for(Term, undefined, State0),
+ {follower, State, [{next_event, Msg}]};
+handle_candidate(#info_reply{}, State) ->
+ {candidate, State, []};
handle_candidate(Msg, State) ->
log_unhandled_msg(candidate, Msg, State),
{candidate, State, [{reply, {error, {unsupported_call, Msg}}}]}.
@@ -1048,6 +1115,19 @@ handle_pre_vote({register_external_log_reader, Pid}, #{log := Log0} = State) ->
{pre_vote, State#{log => Log}, Effs};
handle_pre_vote(force_member_change, State0) ->
{follower, State0#{votes => 0}, [{next_event, force_member_change}]};
+handle_pre_vote(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm} = State0)
+ when Term >= CurTerm ->
+ {follower, State0#{votes => 0}, [{next_event, Msg}]};
+handle_pre_vote(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
+ {pre_vote, State, [InfoReplyEffect]};
+handle_pre_vote(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm} = State0)
+ when Term >= CurTerm ->
+ {follower, State0#{votes => 0}, [{next_event, Msg}]};
+handle_pre_vote(#info_reply{}, State) ->
+ {pre_vote, State, []};
handle_pre_vote(Msg, State) ->
log_unhandled_msg(pre_vote, Msg, State),
{pre_vote, State, [{reply, {error, {unsupported_call, Msg}}}]}.
@@ -1349,6 +1429,21 @@ handle_follower(force_member_change,
{ok, _, _, State, Effects} =
append_cluster_change(Cluster, undefined, no_reply, State0, []),
call_for_election(pre_vote, State, [{reply, ok} | Effects]);
+handle_follower(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm} = State)
+ when Term >= CurTerm ->
+ State1 = update_term(Term, State),
+ {follower, State1, [{next_event, Msg}]};
+handle_follower(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = info_reply_effect(State, InfoRpc),
+ {follower, State, [InfoReplyEffect]};
+handle_follower(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm} = State)
+ when Term >= CurTerm ->
+ State1 = update_term(Term, State),
+ {follower, State1, [{next_event, Msg}]};
+handle_follower(#info_reply{}, State) ->
+ {follower, State, []};
handle_follower(Msg, State) ->
log_unhandled_msg(follower, Msg, State),
{follower, State, [{reply, {error, {unsupported_call, Msg}}}]}.
@@ -1465,6 +1560,39 @@ handle_receive_snapshot(receive_snapshot_timeout, #{log := Log0} = State) ->
handle_receive_snapshot({register_external_log_reader, Pid}, #{log := Log0} = State) ->
{Log, Effs} = ra_log:register_reader(Pid, Log0),
{receive_snapshot, State#{log => Log}, Effs};
+handle_receive_snapshot(#info_rpc{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId},
+ log := Log0} = State)
+ when Term > CurTerm ->
+ ?INFO("~ts: follower receiving snapshot saw info_rpc from ~w for term ~b "
+ "abdicates term: ~b!",
+ [LogId, Msg#info_rpc.from,
+ Term, CurTerm]),
+ SnapState0 = ra_log:snapshot_state(Log0),
+ SnapState = ra_snapshot:abort_accept(SnapState0),
+ Log = ra_log:set_snapshot_state(SnapState, Log0),
+ {follower, update_term(Term, clear_leader_id(State#{log => Log})),
+ [{next_event, Msg}]};
+handle_receive_snapshot(#info_rpc{} = InfoRpc, State) ->
+ InfoReplyEffect = empty_info_reply_effect(State, InfoRpc),
+ {receive_snapshot, State, [InfoReplyEffect]};
+handle_receive_snapshot(#info_reply{term = Term} = Msg,
+ #{current_term := CurTerm,
+ cfg := #cfg{log_id = LogId},
+ log := Log0} = State)
+ when Term > CurTerm ->
+ ?INFO("~ts: follower receiving snapshot saw info_reply from ~w for term ~b "
+ "abdicates term: ~b!",
+ [LogId, Msg#info_reply.from,
+ Term, CurTerm]),
+ SnapState0 = ra_log:snapshot_state(Log0),
+ SnapState = ra_snapshot:abort_accept(SnapState0),
+ Log = ra_log:set_snapshot_state(SnapState, Log0),
+ {follower, update_term(Term, clear_leader_id(State#{log => Log})),
+ [{next_event, Msg}]};
+handle_receive_snapshot(#info_reply{}, State) ->
+ {receive_snapshot, State, []};
handle_receive_snapshot(Msg, State) ->
log_unhandled_msg(receive_snapshot, Msg, State),
%% drop all other events??
@@ -1533,9 +1661,10 @@ process_new_leader_queries(#{pending_consistent_queries := Pending,
-spec tick(ra_server_state()) -> effects().
tick(#{cfg := #cfg{effective_machine_module = MacMod},
- machine_state := MacState}) ->
+ machine_state := MacState} = State) ->
+ InfoRpcEffects = info_rpc_effects(State),
Now = erlang:system_time(millisecond),
- ra_machine:tick(MacMod, Now, MacState).
+ InfoRpcEffects ++ ra_machine:tick(MacMod, Now, MacState).
-spec log_tick(ra_server_state()) -> ra_server_state().
log_tick(#{cfg := #cfg{},
@@ -2313,6 +2442,19 @@ peers_with_normal_status(State) ->
(_, _) -> false
end, peers(State)).
+voting_peers(State) ->
+ Peers0 = peers(State),
+ Peers1 = maps:filter(
+ fun
+ (_PeerId, #{voter_status := #{membership := voter}}) ->
+ true;
+ (_PeerId, Peer) when not is_map_key(voter_status, Peer) ->
+ true;
+ (_PeerId, _Peer) ->
+ false
+ end, Peers0),
+ Peers1.
+
% peers that could need an update
stale_peers(#{commit_index := CommitIndex,
cfg := #cfg{id = ThisId},
@@ -3314,6 +3456,186 @@ after_log_append_reply(Cmd, Idx, Term, Effects0) ->
Effects0
end.
+post_election_effects(
+ #{cfg := #cfg{effective_machine_version = EffectiveMacVer,
+ machine = Mac}} = State) ->
+ Peers = voting_peers(State),
+ PeerIds = maps:keys(Peers),
+ case PeerIds of
+ [] ->
+ %% This node is alone in the cluster, we can send the `noop'
+ %% command with the newer machine version right away.
+ MacVer = ra_machine:version(Mac),
+ Noop = {noop, #{ts => erlang:system_time(millisecond)},
+ MacVer},
+ NoopEffect = {next_event, cast, {command, Noop}},
+ [NoopEffect];
+ _ ->
+ %% We continue to send the `noop' command immediately after
+ %% becoming a leader, but compared to Ra 2.15 and older, we don't
+ %% set the machine version to the latest: we keep the same
+ %% effective machine version for now.
+ %%
+ %% However, we query info keys from all peers, including their
+ %% supported machine version. The replies will be used to
+ %% determine the max supported machine version and when it is
+ %% greater than the effective one, another `noop' command will be
+ %% sent.
+ Noop = {noop, #{ts => erlang:system_time(millisecond)},
+ EffectiveMacVer},
+ NoopEffect = {next_event, cast, {command, Noop}},
+ InfoRpcEffects = info_rpc_effects(State),
+ [NoopEffect | InfoRpcEffects]
+ end.
+
+info_rpc_effects(#{cfg := #cfg{id = Id}, cluster := Cluster} = State) ->
+ InfoRpcEffects = maps:fold(
+ fun
+ (PeerId, _, Acc) when PeerId =:= Id ->
+ Acc;
+ (PeerId, _, Acc) ->
+ Acc ++ info_rpc_effects_for_peer(State, PeerId)
+ end, [], Cluster),
+ InfoRpcEffects.
+
+info_rpc_effects_for_peer(
+ #{cluster := Cluster, current_term := CurTerm} = State, PeerId) ->
+ %% We determine if we need to ask (for the fist time or again) the info
+ %% from a peer.
+ MacVer = machine_version(State),
+ SendRpc = case Cluster of
+ #{PeerId := #{machine_version := PeerMacVer}} ->
+ %% We have the machine version of the peer, but we want
+ %% to ask again if that version is old, in the hope the
+ %% peer restarted and was updated.
+ PeerMacVer < MacVer;
+ _ ->
+ %% We don't have any details about the peer, we ask.
+ true
+ end,
+ case SendRpc of
+ true ->
+ %% We ask for all info keys currently. If we ask for specific
+ %% keys, we will have to handle merging of already known info keys
+ %% and updates.
+ Id = id(State),
+ Command = #info_rpc{from = Id,
+ term = CurTerm,
+ keys = [machine_version]},
+ [{send_rpc, PeerId, Command}];
+ false ->
+ []
+ end.
+
+info_reply_effect(
+ #{current_term := CurTerm} = State,
+ #info_rpc{from = FromId, keys = Keys}) ->
+ Id = id(State),
+ Info = ra_server_info(State, Keys),
+ InfoReply = #info_reply{from = Id,
+ term = CurTerm,
+ keys = Keys,
+ info = Info},
+ {cast, FromId, InfoReply}.
+
+empty_info_reply_effect(
+ #{current_term := CurTerm} = State,
+ #info_rpc{from = FromId, keys = Keys}) ->
+ Id = id(State),
+ InfoReply = #info_reply{from = Id,
+ term = CurTerm,
+ keys = Keys,
+ info = undefined},
+ {cast, FromId, InfoReply}.
+
+ra_server_info(State) ->
+ MacVer = machine_version(State),
+ #{machine_version => MacVer}.
+
+ra_server_info(State, Keys) ->
+ %% Note that the node that asked may ask for keys we don't know.
+ Info0 = ra_server_info(State),
+ Info1 = maps:filter(
+ fun(Key, _Value) ->
+ lists:member(Key, Keys)
+ end, Info0),
+ Info1.
+
+handle_info_reply(
+ #{cluster := Cluster} = State,
+ #info_reply{from = PeerId, keys = Keys, info = Info}) ->
+ PeerState0 = maps:get(PeerId, Cluster),
+ PeerState1 = case Info of
+ undefined ->
+ %% The peer is is a Raft state where it can't give
+ %% info yet.
+ lists:foldl(
+ fun(Key, PS) ->
+ maps:remove(Key, PS)
+ end, PeerState0, Keys);
+ _ ->
+ lists:foldl(
+ fun(Key, PS) ->
+ Value = maps:get(Key, Info),
+ PS#{Key => Value}
+ end, PeerState0, Keys)
+ end,
+ Cluster1 = Cluster#{PeerId => PeerState1},
+ State1 = State#{cluster => Cluster1},
+ determine_if_machine_upgrade_allowed(State1).
+
+determine_if_machine_upgrade_allowed(
+ #{cfg := #cfg{effective_machine_version = EffectiveMacVer,
+ log_id = LogId}} = State) ->
+ Effects = case has_enough_peer_info(State) of
+ false ->
+ %% We miss the info from some peers, wait for them to
+ %% determine if we can upgrade the machine version.
+ [];
+ true ->
+ case get_max_supported_machine_version(State) of
+ MaxSupMacVer
+ when MaxSupMacVer > EffectiveMacVer ->
+ ?DEBUG(
+ "~ts: max supported machine version = ~b, "
+ "upgrading from ~b",
+ [LogId, MaxSupMacVer, EffectiveMacVer]),
+ Noop = {noop,
+ #{ts => erlang:system_time(millisecond)},
+ MaxSupMacVer},
+ [{next_event, cast, {command, Noop}}];
+ _ ->
+ []
+ end
+ end,
+ {State, Effects}.
+
+has_enough_peer_info(#{cfg := #cfg{id = Id}, cluster := Cluster}) ->
+ maps:fold(
+ fun
+ (PeerId, PeerState, HasEnough) when PeerId =/= Id ->
+ HasEnough andalso maps:is_key(machine_version, PeerState);
+ (_PeerId, _PeerState, HasEnough) ->
+ HasEnough
+ end, true, Cluster).
+
+get_max_supported_machine_version(
+ #{cfg := #cfg{id = Id}, cluster := Cluster} = State) ->
+ MacVer = machine_version(State),
+ MaxSupMacVer = maps:fold(
+ fun
+ (PeerId, #{machine_version := PeerMacVer}, Max)
+ when PeerId =/= Id andalso PeerMacVer < Max ->
+ PeerMacVer;
+ (_PeerId, _PeerState, Max) ->
+ %% Either this is this Ra server, or this is a
+ %% peer using a newer machine version, or a peer
+ %% for which we don't have the machine version
+ %% yet.
+ Max
+ end, MacVer, Cluster),
+ MaxSupMacVer.
+
%%% ===================
%%% Internal unit tests
%%% ===================
diff --git a/test/ra_machine_version_SUITE.erl b/test/ra_machine_version_SUITE.erl
index 7ebf8d83..40836fad 100644
--- a/test/ra_machine_version_SUITE.erl
+++ b/test/ra_machine_version_SUITE.erl
@@ -29,11 +29,10 @@ all() ->
all_tests() ->
[
server_with_higher_version_needs_quorum_to_be_elected,
- server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher,
+ cluster_waits_for_all_members_to_have_latest_version_to_upgrade,
unversioned_machine_never_sees_machine_version_command,
unversioned_can_change_to_versioned,
server_upgrades_machine_state_on_noop_command,
- lower_version_does_not_apply_until_upgraded,
server_applies_with_new_module
% snapshot_persists_machine_version
].
@@ -135,7 +134,7 @@ server_with_higher_version_needs_quorum_to_be_elected(Config) ->
?assertNotEqual(LastFollower, Leader3),
ok.
-server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher(Config) ->
+cluster_waits_for_all_members_to_have_latest_version_to_upgrade(Config) ->
ok = ra_env:configure_logger(logger),
LogFile = filename:join([?config(priv_dir, Config), "ra.log"]),
LogConfig = #{config => #{type => {file, LogFile}}, level => debug},
@@ -168,30 +167,42 @@ server_with_lower_version_can_vote_for_higher_if_effective_version_is_higher(Con
ra:restart_server(?SYS, Leader),
ra:restart_server(?SYS, Follower1),
timer:sleep(100),
- {ok, _, Leader2} = ra:members(Leader, 2000),
+ {ok, _, _Leader1} = ra:members(Leader, 2000),
ra:restart_server(?SYS, Follower2),
- %% need to wait until the restarted Follower2 discovers the current
- %% effective machine version
- await(fun () ->
- case ra:member_overview(Follower2) of
- {ok, #{effective_machine_version := 2,
- machine_version := 1}, _} ->
- true;
- _ ->
- false
- end
- end, 100),
- %% at this point the effective machine version known by all members is 2
- %% but Follower2's local machine version is 1 as it hasn't been "upgraded"
- %% yet
- %% stop the leader to trigger an election that Follower2 must not win
- ra:stop_server(?SYS, Leader2),
- ExpectedLeader = case Leader2 of
- Follower1 -> Leader;
- _ -> Follower1
- end,
- %% follower 1 should now be elected
- ?assertMatch({ok, _, ExpectedLeader}, ra:members(ExpectedLeader, 60000)),
+ %% The cluster is still using v1 even though Leader and Follower2 knows
+ %% about v2.
+ lists:foreach(
+ fun(Member) ->
+ await(fun () ->
+ case ra:member_overview(Member) of
+ {ok, #{effective_machine_version := 1,
+ machine_version := 1}, _}
+ when Member == Follower2 ->
+ true;
+ {ok, #{effective_machine_version := 1,
+ machine_version := 2}, _} ->
+ true;
+ _ ->
+ false
+ end
+ end, 100)
+ end, Cluster),
+ %% Restart Follower2 with v2. The cluster should now upgrade to v2.
+ ra:stop_server(?SYS, Follower2),
+ meck:expect(Mod, version, fun () -> 2 end),
+ ra:restart_server(?SYS, Follower2),
+ lists:foreach(
+ fun(Member) ->
+ await(fun () ->
+ case ra:member_overview(Member) of
+ {ok, #{effective_machine_version := 2,
+ machine_version := 2}, _} ->
+ true;
+ _ ->
+ false
+ end
+ end, 100)
+ end, Cluster),
ok.
@@ -336,89 +347,6 @@ server_applies_with_new_module(Config) ->
{ok, state_v1, _} = ra:consistent_query(ServerId, fun ra_lib:id/1),
ok.
-lower_version_does_not_apply_until_upgraded(Config) ->
- ok = logger:set_primary_config(level, all),
- Mod = ?config(modname, Config),
- meck:new(Mod, [non_strict]),
- meck:expect(Mod, init, fun (_) -> init_state end),
- meck:expect(Mod, version, fun () -> 1 end),
- meck:expect(Mod, which_module, fun (_) -> Mod end),
- meck:expect(Mod, apply, fun
- (_, {machine_version, _, _}, S) ->
- %% retain state for machine versions
- {S, ok};
- (_, C, _) ->
- %% any other command replaces the state
- {C, ok}
- end),
- Cluster = ?config(cluster, Config),
- ClusterName = ?config(cluster_name, Config),
- %% 3 node cluster, upgrade the first two to the later version
- %% leaving the follower on a lower version
- Leader = start_cluster(ClusterName, {module, Mod, #{}}, Cluster),
- Followers = lists:delete(Leader, Cluster),
- ct:pal("Leader1 ~w Followers ~w", [Leader, Followers]),
- meck:expect(Mod, version, fun () ->
- Self = self(),
- case whereis(element(1, Leader)) of
- Self -> 2;
- _ -> 1
- end
- end),
- timer:sleep(200),
- ra:stop_server(?SYS, Leader),
- {ok, _, Leader2} = ra:members(Followers),
- [LastFollower] = lists:delete(Leader2, Followers),
- ct:pal("Leader2 ~w LastFollower ~w", [Leader2, LastFollower]),
- ra:restart_server(?SYS, Leader),
- meck:expect(Mod, version, fun () ->
- New = [whereis(element(1, Leader)),
- whereis(element(1, Leader2))],
- case lists:member(self(), New) of
- true -> 2;
- _ -> 1
- end
- end),
- ra:stop_server(?SYS, Leader2),
- timer:sleep(500),
- {ok, _, Leader3} = ra:members(LastFollower),
- ct:pal("Leader3 ~w LastFollower ~w", [Leader3, LastFollower]),
- ra:restart_server(?SYS, Leader2),
-
- case Leader3 of
- LastFollower ->
- %% if last follower happened to be elected
- ct:pal("Leader3 is LastFollower", []),
- ra:stop_server(?SYS, Leader3),
- %% allow time for a different member to be elected
- timer:sleep(1000),
- ra:restart_server(?SYS, Leader3);
- _ -> ok
- end,
-
-
- %% process a command that should be replicated to all servers but only
- %% applied to new machine version servers
- {ok, ok, _} = ra:process_command(Leader, dummy),
- %% a little sleep to make it more likely that replication is complete to
- %% all servers and not just a quorum
- timer:sleep(100),
-
- %% the updated servers should have the same state
- {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader, fun ra_lib:id/1),
- {ok, {{Idx, _}, dummy}, _} = ra:local_query(Leader2, fun ra_lib:id/1),
- %% the last follower with the lower machine version should not have
- %% applied the last command
- {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1),
-
- ra:stop_server(?SYS, LastFollower),
- ra:restart_server(?SYS, LastFollower),
-
- {ok, {{LFIdx, _}, init_state}, _} = ra:local_query(LastFollower, fun ra_lib:id/1),
-
- ?assert(Idx > LFIdx),
- ok.
-
snapshot_persists_machine_version(_Config) ->
error({todo, ?FUNCTION_NAME}).
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index a17ba982..a4663e31 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -1861,7 +1861,8 @@ candidate_election(_Config) ->
{follower, #{current_term := 7}, []}
= ra_server:handle_candidate(HighTermResult, State1),
- MacVer = 1,
+ EffectiveMacVer = 0,
+ MacVer = EffectiveMacVer + 1,
meck:expect(ra_machine, version, fun (_) -> MacVer end),
% quorum has been achieved - candidate becomes leader
@@ -1875,7 +1876,12 @@ candidate_election(_Config) ->
N4 := PeerState,
N5 := PeerState}},
[
- {next_event, cast, {command, {noop, _, MacVer}}},
+ {next_event, cast, {command, {noop, _, EffectiveMacVer}}},
+ {send_rpc, N2, {info_rpc, _, _, _}},
+ {send_rpc, N3, {info_rpc, _, _, _}},
+ {send_rpc, N4, {info_rpc, _, _, _}},
+ {send_rpc, N5, {info_rpc, _, _, _}},
+
{send_rpc, _, _},
{send_rpc, _, _},
{send_rpc, _, _},