From ed27d3c261d8c679ef0001acc3382ba2bcdbd54f Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 12 Dec 2024 11:19:44 +0000 Subject: [PATCH] Detect when a segment has been modified. When executing a read plan it is possible that the read plan refers to indexes not in the index of an segment that is still being written to. This commit handles that change. --- src/ra_log_reader.erl | 15 ++++-- src/ra_log_segment.erl | 74 +++++++++++++++++++--------- src/ra_server.erl | 2 +- test/ra_log_2_SUITE.erl | 16 ++++++ test/ra_log_segment_SUITE.erl | 49 +++++++++++++++--- test/ra_log_segment_writer_SUITE.erl | 8 +-- 6 files changed, 125 insertions(+), 39 deletions(-) diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index 164e5ef9..20dffb36 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -224,9 +224,16 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0) end, lists:foldl( fun ({Idxs, BaseName}, {Acc1, Open1}) -> - {Seg, Open} = get_segment_ext(Dir, Open1, BaseName), - {_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1), - {Acc, Open} + {Seg, Open2} = get_segment_ext(Dir, Open1, BaseName), + case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of + {ok, _, Acc} -> + {Acc, Open2}; + {error, modified} -> + {_, Open3} = ra_flru:evict(BaseName, Open2), + {SegNew, Open} = get_segment_ext(Dir, Open3, BaseName), + {ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1), + {Acc, Open} + end end, {Acc0, Open0}, Plan). -spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}. @@ -335,7 +342,7 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs, lists:foldl( fun ({Idxs, Fn}, {Open0, C, En0}) -> {Seg, Open} = get_segment(Cfg, Open0, Fn), - {ReadSparseCount, Entries} = + {ok, ReadSparseCount, Entries} = ra_log_segment:read_sparse(Seg, Idxs, fun (I, T, B, Acc) -> [{I, T, binary_to_term(B)} | Acc] diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index a76e8c9a..d71945a4 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -12,6 +12,7 @@ append/4, sync/1, fold/6, + is_modified/1, read_sparse/4, term_query/2, close/1, @@ -27,6 +28,8 @@ -include("ra.hrl"). +-include_lib("kernel/include/file.hrl"). + -define(VERSION, 2). -define(MAGIC, "RASG"). -define(HEADER_SIZE, 4 + (16 div 8) + (16 div 8)). @@ -112,6 +115,7 @@ open(Filename, Options) -> end. process_file(true, Mode, Filename, Fd, Options) -> + AccessPattern = maps:get(access_pattern, Options, random), case read_header(Fd) of {ok, Version, MaxCount} -> MaxPending = maps:get(max_pending, Options, ?SEGMENT_MAX_PENDING), @@ -120,7 +124,6 @@ process_file(true, Mode, Filename, Fd, Options) -> {NumIndexRecords, DataOffset, Range, Index} = recover_index(Fd, Version, MaxCount), IndexOffset = ?HEADER_SIZE + NumIndexRecords * IndexRecordSize, - AccessPattern = maps:get(access_pattern, Options, random), Mode = maps:get(mode, Options, append), ComputeChecksums = maps:get(compute_checksums, Options, true), {ok, #state{cfg = #cfg{version = Version, @@ -184,16 +187,15 @@ append(#state{cfg = #cfg{max_pending = PendingCount}, append(#state{cfg = #cfg{version = Version, mode = append} = Cfg, index_offset = IndexOffset, - data_start = DataStart, data_offset = DataOffset, range = Range0, pending_count = PendCnt, pending_index = IdxPend0, pending_data = DataPend0} = State, Index, Term, {Length, Data}) -> - % check if file is full - case IndexOffset < DataStart of - true -> + + case is_full(State) of + false -> % TODO: check length is less than #FFFFFFFF ?? Checksum = compute_checksum(Cfg, Data), OSize = offset_size(Version), @@ -209,7 +211,7 @@ append(#state{cfg = #cfg{version = Version, pending_data = [DataPend0, Data], pending_count = PendCnt + 1} }; - false -> + true -> {error, full} end; append(State, Index, Term, Data) @@ -271,38 +273,58 @@ fold(#state{cfg = #cfg{mode = read} = Cfg, FromIdx, ToIdx, Fun, AccFun, Acc) -> fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc). +-spec is_modified(state()) -> boolean(). +is_modified(#state{cfg = #cfg{fd = Fd}, + data_offset = DataOffset} = State) -> + case is_full(State) of + true -> + %% a full segment cannot be appended to. + false; + false -> + %% get info and compare to data_offset + {ok, #file_info{size = Size}} = prim_file:read_handle_info(Fd), + Size > DataOffset + end. + -spec read_sparse(state(), [ra_index()], fun((ra:index(), ra_term(), binary(), Acc) -> Acc), Acc) -> - {NumRead :: non_neg_integer(), Acc} + {ok, NumRead :: non_neg_integer(), Acc} | {error, modified} when Acc :: term(). read_sparse(#state{index = Index, - cfg = Cfg}, Indexes, AccFun, Acc) -> - Cache0 = prepare_cache(Cfg, Indexes, Index), - read_sparse0(Cfg, Indexes, Index, Cache0, Acc, AccFun, 0). + cfg = #cfg{fd = Fd}} = State, + Indexes, AccFun, Acc) -> + case is_modified(State) of + true -> + {error, modified}; + false -> + Cache0 = prepare_cache(Fd, Indexes, Index), + read_sparse0(Fd, Indexes, Index, Cache0, Acc, AccFun, 0) + end. -read_sparse0(_Cfg, [], _Index, _Cache, Acc, _AccFun, Num) -> - {Num, Acc}; -read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num) +read_sparse0(_Fd, [], _Index, _Cache, Acc, _AccFun, Num) -> + {ok, Num, Acc}; +read_sparse0(Fd, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num) when is_map_key(NextIdx, Index) -> - {Term, Offset, Length, _} = map_get(NextIdx, Index), - case cache_read(Cache0, Offset, Length) of + {Term, Pos, Length, _} = map_get(NextIdx, Index), + case cache_read(Cache0, Pos, Length) of false -> - case prepare_cache(Cfg, Indexes, Index) of + case prepare_cache(Fd, Indexes, Index) of undefined -> - {ok, Data, _} = pread(Cfg, undefined, Offset, Length), - read_sparse0(Cfg, Rem, Index, undefined, + %% TODO: check for partial data? + {ok, Data} = file:pread(Fd, Pos, Length), + read_sparse0(Fd, Rem, Index, undefined, AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1); Cache -> - read_sparse0(Cfg, Indexes, Index, Cache, - Acc, AccFun, Num+1) + read_sparse0(Fd, Indexes, Index, Cache, + Acc, AccFun, Num) end; Data -> - read_sparse0(Cfg, Rem, Index, Cache0, + read_sparse0(Fd, Rem, Index, Cache0, AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1) end; -read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) -> +read_sparse0(_Fd, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) -> exit({missing_key, NextIdx}). cache_read({CPos, CLen, Bin}, Pos, Length) @@ -313,9 +335,9 @@ cache_read({CPos, CLen, Bin}, Pos, Length) cache_read(_, _, _) -> false. -prepare_cache(#cfg{} = _Cfg, [_], _SegIndex) -> +prepare_cache(_Fd, [_], _SegIndex) -> undefined; -prepare_cache(#cfg{fd = Fd} = _Cfg, [FirstIdx | Rem], SegIndex) -> +prepare_cache(Fd, [FirstIdx | Rem], SegIndex) -> case consec_run(FirstIdx, FirstIdx, Rem) of {Idx, Idx} -> %% no run, no cache; @@ -622,6 +644,10 @@ validate_checksum(0, _) -> validate_checksum(Crc, Data) -> Crc == erlang:crc32(Data). +is_full(#state{index_offset = IndexOffset, + data_start = DataStart}) -> + IndexOffset >= DataStart. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/src/ra_server.erl b/src/ra_server.erl index 877dd55f..407baa85 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -1728,7 +1728,7 @@ machine_query(QueryFun, #{cfg := #cfg{effective_machine_module = MacMod}, become(leader, OldRaftState, #{cluster := Cluster, cluster_change_permitted := CCP0, log := Log0} = State) -> - Log = ra_log:release_resources(maps:size(Cluster) + 2, random, Log0), + Log = ra_log:release_resources(maps:size(Cluster), sequential, Log0), CCP = case OldRaftState of await_condition -> CCP0; diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 2adcddfe..882668b3 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -56,6 +56,7 @@ all_tests() -> transient_writer_is_handled, read_opt, sparse_read, + read_plan_modified, read_plan, sparse_read_out_of_range, sparse_read_out_of_range_2, @@ -481,6 +482,21 @@ sparse_read(Config) -> {99, _, _}], _LogO3} = ra_log:sparse_read([1000,5,99], LogO2), ok. +read_plan_modified(Config) -> + Log0 = ra_log_init(Config), + Log1 = write_and_roll(1, 2, 1, Log0, 50), + Log2 = deliver_all_log_events(Log1, 100), + Plan = ra_log:partial_read([1], Log2, fun (_, _, Cmd) -> Cmd end), + {#{1 := _}, Flru} = ra_log_read_plan:execute(Plan, undefined), + + Log = deliver_all_log_events(write_and_roll(2, 3, 1, Log2, 50), 100), + Plan2 = ra_log:partial_read([1,2], Log, fun (_, _, Cmd) -> Cmd end), + %% assert we can read the newly appended item with the cached + %% segment + {#{1 := _, 2 := _}, _} = ra_log_read_plan:execute(Plan2, Flru), + ra_log:close(Log), + ok. + read_plan(Config) -> Num = 256 * 2, Div = 2, diff --git a/test/ra_log_segment_SUITE.erl b/test/ra_log_segment_SUITE.erl index 9c9589ed..5e2cd9aa 100644 --- a/test/ra_log_segment_SUITE.erl +++ b/test/ra_log_segment_SUITE.erl @@ -32,6 +32,7 @@ all_tests() -> overwrite, term_query, write_many, + read_sparse_append_read, open_invalid, corrupted_segment, large_segment, @@ -81,6 +82,7 @@ corrupted_segment(Config) -> %% ct:pal("DUMP PRE ~p", [ra_log_segment:dump_index(Fn)]), %% check that the current state throws a missing key {ok, SegR0} = ra_log_segment:open(Fn, #{mode => read}), + ?assertNot(ra_log_segment:is_modified(SegR0)), ?assertExit({missing_key, 2}, read_sparse(SegR0, [1, 2])), @@ -210,11 +212,13 @@ segref(Config) -> full_file(Config) -> Dir = ?config(data_dir, Config), Fn = filename:join(Dir, "seg1.seg"), - Data = make_data(1024), - {ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2}), + Data = make_data(10), + {ok, Seg0} = ra_log_segment:open(Fn, #{max_count => 2, + max_pending => 1}), {ok, Seg1} = ra_log_segment:append(Seg0, 1, 2, Data), {ok, Seg} = ra_log_segment:append(Seg1, 2, 2, Data), {error, full} = ra_log_segment:append(Seg, 3, 2, Data), + ?assertNot(ra_log_segment:is_modified(Seg)), {1,2} = ra_log_segment:range(Seg), ok = ra_log_segment:close(Seg), ok. @@ -396,6 +400,39 @@ write_many(Config) -> ct:pal("~p", [Result]), ok. + +read_sparse_append_read(Config) -> + Dir = ?config(data_dir, Config), + Fn = filename:join(Dir, <<"0000000.segment">>), + {ok, W0} = ra_log_segment:open(Fn, #{}), + Data = <<"banana">>, + Term = 1, + %% write two entries in term 1 + {ok, W1} = ra_log_segment:append(W0, 1, Term, Data), + {ok, W2} = ra_log_segment:append(W1, 2, Term, Data), + {ok, W3} = ra_log_segment:flush(W2), + + + {ok, R0} = ra_log_segment:open(Fn, #{mode => read}), + {ok, 2, [_, _]} = ra_log_segment:read_sparse(R0, [1, 2], + fun (I, _, _, Acc) -> + [I | Acc] + end, []), + + ?assertNot(ra_log_segment:is_modified(R0)), + %% overwrite in term 2 + {ok, W4} = ra_log_segment:append(W3, 2, 2, <<"apple">>), + {ok, W5} = ra_log_segment:append(W4, 3, 2, <<"apple">>), + {ok, W} = ra_log_segment:flush(W5), + ?assert(ra_log_segment:is_modified(R0)), + {error, modified} = ra_log_segment:read_sparse(R0, [2], + fun (_I, _, B, Acc) -> + [B | Acc] + end, []), + ra_log_segment:close(W), + ra_log_segment:close(R0), + ok. + write_until_full(Idx, Term, Data, Seg0) -> case ra_log_segment:append(Seg0, Idx, Term, Data) of {ok, Seg} -> @@ -410,8 +447,8 @@ make_data(Size) -> term_to_binary(crypto:strong_rand_bytes(Size)). read_sparse(R, Idxs) -> - {_, Entries} = ra_log_segment:read_sparse(R, Idxs, - fun (I, T, B, Acc) -> - [{I, T, B} | Acc] - end, []), + {ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs, + fun (I, T, B, Acc) -> + [{I, T, B} | Acc] + end, []), lists:reverse(Entries). diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index b19a4fe0..1d76e53a 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -800,10 +800,10 @@ segments_for(UId, DataDir) -> SegFiles. read_sparse(R, Idxs) -> - {_, Entries} = ra_log_segment:read_sparse(R, Idxs, - fun(I, T, B, Acc) -> - [{I, T, B} | Acc] - end, []), + {ok, _, Entries} = ra_log_segment:read_sparse(R, Idxs, + fun(I, T, B, Acc) -> + [{I, T, B} | Acc] + end, []), lists:reverse(Entries). get_names(System) when is_atom(System) ->