diff --git a/src/ra_log.erl b/src/ra_log.erl index ecc68707..983b5d64 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -21,7 +21,7 @@ fold/5, sparse_read/2, partial_read/3, - execute_read_plan/3, + execute_read_plan/4, read_plan_info/1, last_index_term/1, set_last_index/2, @@ -210,7 +210,7 @@ init(#{uid := UId, Curr -> Curr end, - AccessPattern = maps:get(initial_access_pattern, Conf, random), + AccessPattern = maps:get(initial_access_pattern, Conf, sequential), {ok, Mt0} = ra_log_ets:mem_table_please(Names, UId), % recover current range and any references to segments % this queries the segment writer and thus blocks until any @@ -558,12 +558,15 @@ partial_read(Indexes0, #?MODULE{cfg = Cfg, -spec execute_read_plan(read_plan(), undefined | ra_flru:state(), - TransformFun :: transform_fun()) -> + TransformFun :: transform_fun(), + ra_log_reader:read_plan_options()) -> {#{ra_index() => Command :: term()}, ra_flru:state()}. execute_read_plan(#read_plan{dir = Dir, read = Read, - plan = Plan}, Flru0, TransformFun) -> - ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, Read). + plan = Plan}, Flru0, TransformFun, + Options) -> + ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, + Options, Read). -spec read_plan_info(read_plan()) -> map(). read_plan_info(#read_plan{read = Read, diff --git a/src/ra_log_read_plan.erl b/src/ra_log_read_plan.erl index 16d69034..16a7148f 100644 --- a/src/ra_log_read_plan.erl +++ b/src/ra_log_read_plan.erl @@ -8,12 +8,22 @@ -export([execute/2, + execute/3, info/1]). -spec execute(ra_log:read_plan(), undefined | ra_flru:state()) -> {#{ra:index() => Command :: term()}, ra_flru:state()}. execute(Plan, Flru) -> - ra_log:execute_read_plan(Plan, Flru, fun ra_server:transform_for_partial_read/3). + execute(Plan, Flru, #{access_pattern => random, + file_advise => normal}). + +-spec execute(ra_log:read_plan(), undefined | ra_flru:state(), + ra_log_reader:read_plan_options()) -> + {#{ra:index() => Command :: term()}, ra_flru:state()}. +execute(Plan, Flru, Options) -> + ra_log:execute_read_plan(Plan, Flru, + fun ra_server:transform_for_partial_read/3, + Options). -spec info(ra_log:read_plan()) -> map(). info(Plan) -> diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index 20dffb36..daa60147 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -22,7 +22,7 @@ fold/5, sparse_read/3, read_plan/2, - exec_read_plan/5, + exec_read_plan/6, fetch_term/2 ]). @@ -47,11 +47,14 @@ -opaque state() :: #?STATE{}. -type read_plan() :: [{BaseName :: file:filename_all(), [ra:index()]}]. +-type read_plan_options() :: #{access_pattern => random | sequential, + file_advise => ra_log_segment:posix_file_advise()}. -export_type([ state/0, - read_plan/0 + read_plan/0, + read_plan_options/0 ]). %% PUBLIC @@ -209,14 +212,17 @@ read_plan(#?STATE{segment_refs = SegRefs}, Indexes) -> %% TODO: add counter for number of read plans requested segment_read_plan(SegRefs, Indexes, []). --spec exec_read_plan(file:filename_all(), read_plan(), undefined | ra_flru:state(), - TransformFun :: fun(), +-spec exec_read_plan(file:filename_all(), + read_plan(), + undefined | ra_flru:state(), + TransformFun :: fun((ra_index(), ra_term(), binary()) -> term()), + read_plan_options(), #{ra_index() => Command :: term()}) -> {#{ra_index() => Command :: term()}, ra_flru:state()}. -exec_read_plan(Dir, Plan, undefined, TransformFun, Acc0) -> +exec_read_plan(Dir, Plan, undefined, TransformFun, Options, Acc0) -> Open = ra_flru:new(1, fun({_, Seg}) -> ra_log_segment:close(Seg) end), - exec_read_plan(Dir, Plan, Open, TransformFun, Acc0); -exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0) + exec_read_plan(Dir, Plan, Open, TransformFun, Options, Acc0); +exec_read_plan(Dir, Plan, Open0, TransformFun, Options, Acc0) when is_list(Plan) -> Fun = fun (I, T, B, Acc) -> E = TransformFun(I, T, binary_to_term(B)), @@ -224,13 +230,13 @@ exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0) end, lists:foldl( fun ({Idxs, BaseName}, {Acc1, Open1}) -> - {Seg, Open2} = get_segment_ext(Dir, Open1, BaseName), + {Seg, Open2} = get_segment_ext(Dir, Open1, BaseName, Options), case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of {ok, _, Acc} -> {Acc, Open2}; {error, modified} -> {_, Open3} = ra_flru:evict(BaseName, Open2), - {SegNew, Open} = get_segment_ext(Dir, Open3, BaseName), + {SegNew, Open} = get_segment_ext(Dir, Open3, BaseName, Options), {ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1), {Acc, Open} end @@ -382,15 +388,14 @@ get_segment(#cfg{directory = Dir, end end. -get_segment_ext(Dir, Open0, Fn) -> +get_segment_ext(Dir, Open0, Fn, Options) -> case ra_flru:fetch(Fn, Open0) of {ok, S, Open1} -> {S, Open1}; error -> AbsFn = filename:join(Dir, Fn), case ra_log_segment:open(AbsFn, - #{mode => read, - access_pattern => random}) + Options#{mode => read}) of {ok, S} -> {S, ra_flru:insert(Fn, S, Open0)}; diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index d2a1988d..269da3fc 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -52,6 +52,7 @@ fd :: option(file:io_device()), index_size :: pos_integer(), access_pattern :: sequential | random, + file_advise = normal :: posix_file_advise(), mode = append :: read | append, compute_checksums = true :: boolean()}). @@ -70,14 +71,19 @@ cache :: undefined | {non_neg_integer(), non_neg_integer(), binary()} }). +-type posix_file_advise() :: 'normal' | 'sequential' | 'random' + | 'no_reuse' | 'will_need' | 'dont_need'. + -type ra_log_segment_options() :: #{max_count => non_neg_integer(), max_pending => non_neg_integer(), compute_checksums => boolean(), mode => append | read, - access_pattern => sequential | random}. + access_pattern => sequential | random, + file_advise => posix_file_advise()}. -opaque state() :: #state{}. -export_type([state/0, + posix_file_advise/0, ra_log_segment_options/0]). -spec open(Filename :: file:filename_all()) -> @@ -116,9 +122,9 @@ open(Filename, Options) -> process_file(true, Mode, Filename, Fd, Options) -> AccessPattern = maps:get(access_pattern, Options, random), - if AccessPattern == random andalso + FileAdvise = maps:get(file_advise, Options, normal), + if FileAdvise == random andalso Mode == read -> - %% this is a guess using the defaults Offs = maps:get(max_count, Options, ?SEGMENT_MAX_ENTRIES) * ?INDEX_RECORD_SIZE_V2, _ = file:advise(Fd, Offs, 0, random), ok; @@ -133,7 +139,6 @@ process_file(true, Mode, Filename, Fd, Options) -> {NumIndexRecords, DataOffset, Range, Index} = recover_index(Fd, Version, MaxCount), IndexOffset = ?HEADER_SIZE + NumIndexRecords * IndexRecordSize, - Mode = maps:get(mode, Options, append), ComputeChecksums = maps:get(compute_checksums, Options, true), {ok, #state{cfg = #cfg{version = Version, max_count = MaxCount, @@ -142,6 +147,7 @@ process_file(true, Mode, Filename, Fd, Options) -> mode = Mode, index_size = IndexSize, access_pattern = AccessPattern, + file_advise = FileAdvise, compute_checksums = ComputeChecksums, fd = Fd}, data_start = ?HEADER_SIZE + IndexSize, @@ -165,6 +171,7 @@ process_file(false, Mode, Filename, Fd, Options) -> ComputeChecksums = maps:get(compute_checksums, Options, true), IndexSize = MaxCount * ?INDEX_RECORD_SIZE_V2, ok = write_header(MaxCount, Fd), + FileAdvise = maps:get(file_advise, Options, dont_need), {ok, #state{cfg = #cfg{version = ?VERSION, max_count = MaxCount, max_pending = MaxPending, @@ -173,6 +180,7 @@ process_file(false, Mode, Filename, Fd, Options) -> index_size = IndexSize, fd = Fd, compute_checksums = ComputeChecksums, + file_advise = FileAdvise, access_pattern = random}, index_write_offset = ?HEADER_SIZE, index_offset = ?HEADER_SIZE, @@ -431,13 +439,15 @@ is_same_as(#state{cfg = #cfg{filename = Fn0}}, Fn) -> is_same_filename_all(Fn0, Fn). -spec close(state()) -> ok. -close(#state{cfg = #cfg{fd = Fd, mode = append}} = State) -> +close(#state{cfg = #cfg{fd = Fd, + mode = append, + file_advise = FileAdvise}} = State) -> % close needs to be defensive and idempotent so we ignore the return % values here _ = sync(State), case is_full(State) of true -> - _ = file:advise(Fd, 0, 0, dont_need); + _ = file:advise(Fd, 0, 0, FileAdvise); false -> ok end, @@ -684,3 +694,4 @@ cache_length_test() -> -endif. + diff --git a/src/ra_server.erl b/src/ra_server.erl index 407baa85..8f2199d9 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -409,12 +409,10 @@ recover(#{cfg := #cfg{log_id = LogId, FromScan = CommitIndex + 1, {ToScan, _} = ra_log:last_index_term(Log0), ?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]), - {State, Log1} = ra_log:fold(FromScan, ToScan, - fun cluster_scan_fun/2, - State1, Log0), + {State, Log} = ra_log:fold(FromScan, ToScan, + fun cluster_scan_fun/2, + State1, Log0), - %% disable segment read cache by setting random access pattern - Log = ra_log:release_resources(1, random, Log1), put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0), State#{log => Log, %% reset commit latency as recovery may calculate a very old value