Skip to content

Commit

Permalink
Effect bug fixes and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Nov 21, 2024
1 parent 979a25b commit 5053c42
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 102 deletions.
13 changes: 9 additions & 4 deletions src/ra_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,8 @@
-type version() :: non_neg_integer().

-type effect() ::
%% These effects are only executed on the leader
{send_msg, To :: locator(), Msg :: term()} |
%% @TODO: with local deliveries is it theoretically possible for a follower
%% to apply entries but not know who the current leader is?
%% If so, `To' must also include undefined
{send_msg, To :: locator(), Msg :: term(), Options :: send_msg_opts()} |
{mod_call, module(), Function :: atom(), [term()]} |
%% appends a user command to the raft log
{append, term()} |
Expand All @@ -134,11 +131,19 @@
{demonitor, node, node()} |
{timer, term(), non_neg_integer() | infinity} |
{log, [ra_index()], fun(([user_command()]) -> effects())} |

%% these are either conditional on the local configuration or
%% will always be evaluated when seen by members in any raft state
{send_msg, To :: locator(), Msg :: term(), Options :: send_msg_opts()} |
{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} |
{log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}} |
{release_cursor, ra_index(), state()} |
{release_cursor, ra_index()} |
{checkpoint, ra_index(), state()} |
{aux, term()} |
%% like append/3 but a special backwards compatible function
%% that tries to execute in any raft state
{try_append, term(), ra_server:command_reply_mode()} |
garbage_collection.

%% Effects are data structures that can be returned by {@link apply/3} to ask
Expand Down
112 changes: 59 additions & 53 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@
-type effect() ::
ra_machine:effect() |
ra_log:effect() |
%% this is used for replies for immediate requests
{reply, ra_reply_body()} |
{reply, term(), ra_reply_body()} |
%% this is used by the leader only
{reply, from(), ra_reply_body()} |
{reply, from(), ra_reply_body(),
Replier :: leader | local | {member, ra_server_id()}} |
{cast, ra_server_id(), term()} |
{send_vote_requests, [{ra_server_id(),
#request_vote_rpc{} | #pre_vote_rpc{}}]} |
Expand Down Expand Up @@ -1802,10 +1806,10 @@ evaluate_commit_index_follower(#{commit_index := CommitIndex,
{delete_and_terminate, State1, Effects} ->
Reply = append_entries_reply(Term, true, State1),
{delete_and_terminate, State1,
[cast_reply(Id, LeaderId, Reply) |
filter_follower_effects(Effects)]};
{#{last_applied := LastApplied} = State, Effects1} ->
Effects = filter_follower_effects(Effects1),
[cast_reply(Id, LeaderId, Reply) | Effects]};
% filter_follower_effects(Effects)]};
{#{last_applied := LastApplied} = State, Effects} ->
% Effects = filter_follower_effects(Effects1),
case LastApplied > LastApplied0 of
true ->
%% entries were applied, append eval_aux effect
Expand All @@ -1819,49 +1823,51 @@ evaluate_commit_index_follower(State, Effects) ->
%% when no leader is known
{follower, State, Effects}.

%% Filters a (possibly nested) list of effects, keeping only the effect
%% shapes matched by the clauses below and dropping everything else.
%% The fold runs from the right and prepends each kept effect, so the
%% relative order of the kept effects is the same as in the input.
%% NOTE(review): judging by the name this is the allow-list of effects a
%% follower may act on - confirm against the callers.
filter_follower_effects(Effects) ->
lists:foldr(fun ({release_cursor, _, _} = C, Acc) ->
[C | Acc];
({release_cursor, _} = C, Acc) ->
[C | Acc];
({checkpoint, _, _} = C, Acc) ->
[C | Acc];
({record_leader_msg, _} = C, Acc) ->
[C | Acc];
({aux, _} = C, Acc) ->
[C | Acc];
(garbage_collection = C, Acc) ->
[C | Acc];
({delete_snapshot, _} = C, Acc) ->
[C | Acc];
({send_msg, _, _, _Opts} = C, Acc) ->
%% send_msg effects _may_ have the local option
%% and will be evaluated properly during
%% effect processing
%% (only the 4-tuple form is kept here; a 3-tuple
%% send_msg falls through to the catch-all and is dropped)
[C | Acc];
({log, _, _, _Opts} = C, Acc) ->
[C | Acc];
%% this clause must stay ahead of the generic 4-tuple reply
%% clause below, otherwise leader replies would be kept
({reply, _, _, leader}, Acc) ->
Acc;
({reply, _, _, _} = C, Acc) ->
%% If the reply-from is not `leader', the follower
%% might be the replier.
[C | Acc];
({monitor, _ProcOrNode, Comp, _} = C, Acc)
when Comp =/= machine ->
%% only machine monitors should not be emitted
%% by followers
[C | Acc];
(L, Acc) when is_list(L) ->
%% nested case - recurse
%% a nested list that filters down to [] is dropped
%% entirely; otherwise it is kept as a nested list
case filter_follower_effects(L) of
[] -> Acc;
Filtered ->
[Filtered | Acc]
end;
%% catch-all: any effect not explicitly matched above is dropped
(_, Acc) ->
Acc
end, [], Effects).
% filter_follower_effects(Effects) ->
% lists:foldr(fun ({release_cursor, _, _} = C, Acc) ->
% [C | Acc];
% ({release_cursor, _} = C, Acc) ->
% [C | Acc];
% ({checkpoint, _, _} = C, Acc) ->
% [C | Acc];
% ({record_leader_msg, _} = C, Acc) ->
% [C | Acc];
% ({aux, _} = C, Acc) ->
% [C | Acc];
% (garbage_collection = C, Acc) ->
% [C | Acc];
% ({delete_snapshot, _} = C, Acc) ->
% [C | Acc];
% ({send_msg, _, _, _Opts} = C, Acc) ->
% %% send_msg effects _may_ have the local option
% %% and will be evaluated properly during
% %% effect processing
% [C | Acc];
% ({log, _, _, _Opts} = C, Acc) ->
% [C | Acc];
% ({log_ext, _, _, _Opts} = C, Acc) ->
% [C | Acc];
% ({reply, _, _, leader}, Acc) ->
% Acc;
% ({reply, _, _, _} = C, Acc) ->
% %% If the reply-from is not `leader', the follower
% %% might be the replier.
% [C | Acc];
% ({monitor, _ProcOrNode, Comp, _} = C, Acc)
% when Comp =/= machine ->
% %% only machine monitors should not be emitted
% %% by followers
% [C | Acc];
% (L, Acc) when is_list(L) ->
% %% nested case - recurse
% case filter_follower_effects(L) of
% [] -> Acc;
% Filtered ->
% [Filtered | Acc]
% end;
% (_, Acc) ->
% Acc
% end, [], Effects).


make_pipelined_rpc_effects(#{cfg := #cfg{id = Id,
Expand Down Expand Up @@ -2603,11 +2609,11 @@ make_notify_effects(Nots, Prior) when map_size(Nots) > 0 ->
make_notify_effects(_Nots, Prior) ->
Prior.

append_app_effects([], Effs) ->
append_machine_effects([], Effs) ->
Effs;
append_app_effects([AppEff], Effs) ->
append_machine_effects([AppEff], Effs) ->
[AppEff | Effs];
append_app_effects(AppEffs, Effs) ->
append_machine_effects(AppEffs, Effs) ->
[AppEffs | Effs].

cluster_scan_fun({Idx, Term, {'$ra_cluster_change', _Meta, NewCluster, _}},
Expand Down Expand Up @@ -2638,9 +2644,9 @@ apply_with({Idx, Term, {'$usr', CmdMeta, Cmd, ReplyMode}},
Meta = augment_command_meta(Idx, Term, MacVer, ReplyMode, CmdMeta),
Ts = maps:get(ts, CmdMeta, LastTs),
case ra_machine:apply(Module, Meta, Cmd, MacSt) of
{NextMacSt, Reply, AppEffs} ->
{NextMacSt, Reply, MacEffs} ->
{Effects, Notifys} = add_reply(CmdMeta, Reply, ReplyMode,
append_app_effects(AppEffs, Effects0),
append_machine_effects(MacEffs, Effects0),
Notifys0),
{Module, Idx, State, NextMacSt,
Effects, Notifys, Ts};
Expand Down
57 changes: 40 additions & 17 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1318,7 +1318,7 @@ handle_effects(RaftState, Effects0, EvtType, State0, Actions0) ->
end, {State0, Actions0}, Effects0),
{State, lists:reverse(Actions)}.

handle_effect(_, {send_rpc, To, Rpc}, _,
handle_effect(leader, {send_rpc, To, Rpc}, _,
#state{conf = Conf} = State0, Actions) ->
% fully qualified use only so that we can mock it for testing
% TODO: review / refactor to remove the mod call here
Expand Down Expand Up @@ -1347,7 +1347,7 @@ handle_effect(_, {next_event, Evt}, EvtType, State, Actions) ->
{State, [{next_event, EvtType, Evt} | Actions]};
handle_effect(_, {next_event, _, _} = Next, _, State, Actions) ->
{State, [Next | Actions]};
handle_effect(_, {send_msg, To, Msg}, _, State, Actions) ->
handle_effect(leader, {send_msg, To, Msg}, _, State, Actions) ->
%% default is to send without any wrapping
%% TODO: handle send failure? how?
_ = send(To, Msg, State#state.conf),
Expand Down Expand Up @@ -1380,10 +1380,16 @@ handle_effect(RaftState, {LogOrLogExt, Idxs, Fun, {local, Node}}, EvtType,
false ->
{State, Actions}
end;
handle_effect(_RaftState, {append, Cmd}, _EvtType, State, Actions) ->
handle_effect(leader, {append, Cmd}, _EvtType, State, Actions) ->
Evt = {command, normal, {'$usr', Cmd, noreply}},
{State, [{next_event, cast, Evt} | Actions]};
handle_effect(_RaftState, {append, Cmd, ReplyMode}, _EvtType, State, Actions) ->
handle_effect(leader, {append, Cmd, ReplyMode}, _EvtType, State, Actions) ->
Evt = {command, normal, {'$usr', Cmd, ReplyMode}},
{State, [{next_event, cast, Evt} | Actions]};
handle_effect(_RaftState, {try_append, Cmd, ReplyMode}, _EvtType, State, Actions) ->
%% this is a special mode to retain backwards compatibility with
%% certain prior uses of `{append, ...}' from when it was (accidentally)
%% not limited to the leader
Evt = {command, normal, {'$usr', Cmd, ReplyMode}},
{State, [{next_event, cast, Evt} | Actions]};
handle_effect(RaftState, {log, Idxs, Fun}, EvtType,
Expand Down Expand Up @@ -1420,19 +1426,21 @@ handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) ->
handle_effects(RaftState, Effects, EventType,
State0#state{server_state = ServerState}),
{State, Actions0 ++ Actions};
handle_effect(_, {notify, Nots}, _, #state{} = State0, Actions) ->
handle_effect(leader, {notify, Nots}, _, #state{} = State0, Actions) ->
%% should only be done by leader
State = send_applied_notifications(State0, Nots),
{State, Actions};
handle_effect(_, {cast, To, Msg}, _, State, Actions) ->
handle_effect(_AnyState, {cast, To, Msg}, _, State, Actions) ->
%% TODO: handle send failure
_ = gen_cast(To, Msg, State),
{State, Actions};
handle_effect(RaftState, {reply, {Pid, _Tag} = From, Reply, Replier}, _,
State, Actions) ->
case Replier of
leader ->
leader when RaftState == leader ->
ok = gen_statem:reply(From, Reply);
leader ->
ok;
local ->
case can_execute_locally(RaftState, node(Pid), State) of
true ->
Expand All @@ -1451,15 +1459,17 @@ handle_effect(RaftState, {reply, {Pid, _Tag} = From, Reply, Replier}, _,
ok
end,
{State, Actions};
handle_effect(_, {reply, From, Reply}, _, State, Actions) ->
% reply directly
handle_effect(leader, {reply, From, Reply}, _, State, Actions) ->
% reply directly, this is only done from the leader
% this is like reply/4 above with the Replier=leader
ok = gen_statem:reply(From, Reply),
{State, Actions};
handle_effect(_, {reply, Reply}, {call, From}, State, Actions) ->
% reply directly
handle_effect(_RaftState, {reply, Reply}, {call, From}, State, Actions) ->
% this is the reply effect for replying to the current call, any state
% can use this
ok = gen_statem:reply(From, Reply),
{State, Actions};
handle_effect(_, {reply, _Reply}, _EvtType, State, Actions) ->
handle_effect(_RaftState, {reply, _From, _Reply}, _EvtType, State, Actions) ->
{State, Actions};
handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}}, _,
#state{server_state = SS0,
Expand Down Expand Up @@ -1548,10 +1558,21 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
true = erlang:garbage_collect(),
incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
{State, Actions};
handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _,
handle_effect(leader, {monitor, _ProcOrNode, PidOrNode}, _,
#state{monitors = Monitors} = State, Actions0) ->
%% this effect type is only emitted by state machines and thus will
%% only be monitored from the leader
{State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)},
Actions0};
handle_effect(leader, {monitor, _ProcOrNode, machine, PidOrNode}, _,
#state{monitors = Monitors} = State, Actions0) ->
{State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)},
Actions0};
handle_effect(_RaftState, {monitor, _ProcOrNode, machine, _PidOrNode}, _,
#state{} = State, Actions0) ->
%% AFAIK: there is nothing emitting this effect type but we have to
%% guard against it being actioned on the follower anyway
{State, Actions0};
handle_effect(_, {monitor, _ProcOrNode, Component, PidOrNode}, _,
#state{monitors = Monitors} = State, Actions0) ->
{State#state{monitors = ra_monitors:add(PidOrNode, Component, Monitors)},
Expand All @@ -1564,9 +1585,9 @@ handle_effect(_, {demonitor, _ProcOrNode, Component, PidOrNode}, _,
#state{monitors = Monitors0} = State, Actions) ->
Monitors = ra_monitors:remove(PidOrNode, Component, Monitors0),
{State#state{monitors = Monitors}, Actions};
handle_effect(_, {timer, Name, T}, _, State, Actions) ->
handle_effect(leader, {timer, Name, T}, _, State, Actions) ->
{State, [{{timeout, Name}, T, machine_timeout} | Actions]};
handle_effect(_, {mod_call, Mod, Fun, Args}, _,
handle_effect(leader, {mod_call, Mod, Fun, Args}, _,
State, Actions) ->
%% TODO: catch and log failures or rely on calling function never crashing
_ = erlang:apply(Mod, Fun, Args),
Expand All @@ -1583,6 +1604,8 @@ handle_effect(follower, {record_leader_msg, _LeaderId}, _, State0, Actions) ->
{State, [{state_timeout, infinity, undefined} | Actions]};
handle_effect(_, {record_leader_msg, _LeaderId}, _, State0, Actions) ->
%% non follower states don't need to reset state timeout after an effect
{State0, Actions};
handle_effect(_, _, _, State0, Actions) ->
{State0, Actions}.

send_rpcs(State0) ->
Expand Down Expand Up @@ -1954,9 +1977,9 @@ handle_tick_metrics(State) ->
can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
Membership = ra_server:get_membership(ServerState),
?DEBUG("~s membership ~s", [?FUNCTION_NAME, Membership]),
case RaftState of
follower when Membership == voter ->
_ when RaftState =/= leader andalso
Membership == voter ->
TargetNode == node();
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
Expand Down
25 changes: 13 additions & 12 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,16 @@ disconnected_node_catches_up(Config) ->

Self = self(),
SPid = erlang:spawn(DownNode,
fun () ->
erlang:register(snapshot_installed_proc, self()),
receive
{snapshot_installed, _Meta} = Evt ->
Self ! Evt,
ok
after 10000 ->
ok
end
end),
fun () ->
erlang:register(snapshot_installed_proc, self()),
receive
{snapshot_installed, _Meta} = Evt ->
Self ! Evt,
ok
after 10000 ->
ok
end
end),
await_condition(
fun () ->
ok == ra:restart_server(?SYS, DownServerId)
Expand All @@ -512,10 +512,11 @@ disconnected_node_catches_up(Config) ->

receive
{snapshot_installed, Meta} ->
ct:pal("snapshot installed receive ~p", [Meta]),
ct:pal("snapshot installed received ~p", [Meta]),
ok
after 10000 ->
erlang:exit(SPid, kill),
flush(),
ct:fail("snapshot_installed not received"),
ok
end,
Expand Down Expand Up @@ -1344,7 +1345,7 @@ snapshot_installed(#{machine_version := _,
undefined ->
[];
Pid ->
[{send_msg, Pid, {snapshot_installed, Meta}}]
[{send_msg, Pid, {snapshot_installed, Meta}, local}]
end.

node_setup(DataDir) ->
Expand Down
Loading

0 comments on commit 5053c42

Please sign in to comment.