From fe66262b41ee52d918dbc75e53555143dbe2c5eb Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 3 Jan 2025 14:40:37 +0000 Subject: [PATCH 1/2] Extend WAL and segment file names from 8 to 16 characters In order to avoid running out of characters after a few years of consistent usage. The renames happens when the WAL and segment writer initialises respectively. The segment writer writes a marker after a successful upgrade in order to avoid listing files in all server directories every time. The WAL always checks the current set of wal files which should be ok given there is rarely more than one. --- src/ra_lib.erl | 46 +++++++++++++------- src/ra_log_segment_writer.erl | 32 ++++++++++++-- src/ra_log_wal.erl | 9 ++-- test/ra_dbg_SUITE.erl | 2 +- test/ra_log_segment_writer_SUITE.erl | 63 +++++++++++++++++++++++----- test/ra_log_wal_SUITE.erl | 36 +++++++++++++++- 6 files changed, 154 insertions(+), 34 deletions(-) diff --git a/src/ra_lib.erl b/src/ra_lib.erl index f24d0248..3fba3887 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -30,6 +30,7 @@ zpad_filename/3, zpad_filename_incr/1, zpad_extract_num/1, + zpad_upgrade/3, recursive_delete/1, make_uid/0, make_uid/1, @@ -146,29 +147,44 @@ zpad_hex(Num) -> lists:flatten(io_lib:format("~16.16.0B", [Num])). zpad_filename("", Ext, Num) -> - lists:flatten(io_lib:format("~8..0B.~ts", [Num, Ext])); + lists:flatten(io_lib:format("~16..0B.~ts", [Num, Ext])); zpad_filename(Prefix, Ext, Num) -> - lists:flatten(io_lib:format("~ts_~8..0B.~ts", [Prefix, Num, Ext])). + lists:flatten(io_lib:format("~ts_~16..0B.~ts", [Prefix, Num, Ext])). zpad_filename_incr(Fn) -> Base = filename:basename(Fn), Dir = filename:dirname(Fn), - case re:run(Base, "(.*)([0-9]{8})(.*)", + case re:run(Base, "(.*)([0-9]{16})(.*)", [{capture, all_but_first, list}]) of {match, [Prefix, NumStr, Ext]} -> Num = list_to_integer(NumStr), - filename:join(Dir, - lists:flatten( - io_lib:format("~ts~8..0B~ts", [Prefix, Num+1, Ext]))); + NewFn = lists:flatten(io_lib:format("~ts~16..0B~ts", + [Prefix, Num + 1, Ext])), + filename:join(Dir, NewFn); _ -> undefined end. zpad_extract_num(Fn) -> - {match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{8})(.*)", + {match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{16})(.*)", [{capture, all_but_first, list}]), list_to_integer(NumStr). +zpad_upgrade(Dir, File, Ext) -> + B = filename:basename(File, Ext), + case length(B) of + 8 -> + %% old format, convert and rename + F = "00000000" ++ B ++ Ext, + New = filename:join(Dir, F), + Old = filename:join(Dir, File), + ok = file:rename(Old, New), + F; + 16 -> + File + end. + + recursive_delete(Dir) -> case is_dir(Dir) of true -> @@ -427,7 +443,7 @@ lists_shuffle(List0) -> is_dir(Dir) -> case prim_file:read_file_info(Dir) of - {ok, #file_info{type=directory}} -> + {ok, #file_info{type = directory}} -> true; _ -> false @@ -435,8 +451,6 @@ is_dir(Dir) -> is_file(File) -> case prim_file:read_file_info(File) of - {ok, #file_info{type = directory}} -> - true; {ok, #file_info{type = regular}} -> true; _ -> @@ -516,17 +530,17 @@ make_uid_test() -> ok. zpad_filename_incr_test() -> - Fn = "/lib/blah/prefix_00000001.segment", - Ex = "/lib/blah/prefix_00000002.segment", + Fn = "/lib/blah/prefix_0000000000000001.segment", + Ex = "/lib/blah/prefix_0000000000000002.segment", Ex = zpad_filename_incr(Fn), - undefined = zpad_filename_incr("0000001"), + undefined = zpad_filename_incr("000000000000001"), ok. zpad_filename_incr_utf8_test() -> - Fn = "/lib/🐰/prefix/00000001.segment", - Ex = "/lib/🐰/prefix/00000002.segment", + Fn = "/lib/🐰/prefix/0000000000000001.segment", + Ex = "/lib/🐰/prefix/0000000000000002.segment", Ex = zpad_filename_incr(Fn), - undefined = zpad_filename_incr("0000001"), + undefined = zpad_filename_incr("000000000000001"), ok. derive_safe_string_test() -> diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index fe5c4079..5f789895 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -105,7 +105,9 @@ await(SegWriter) -> ok end. -%%%=================================================================== +-define(UPGRADE_MARKER, "segment_name_upgrade_marker"). + +%%%================================================================== %%% gen_server callbacks %%%=================================================================== @@ -115,6 +117,7 @@ init([#{data_dir := DataDir, process_flag(trap_exit, true), CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS), SegmentConf = maps:get(segment_conf, Conf, #{}), + maybe_upgrade_segment_file_names(System, DataDir), {ok, #state{system = System, data_dir = DataDir, counter = CRef, @@ -460,8 +463,8 @@ find_segment_files(Dir) -> segment_files(Dir) -> case prim_file:list_dir(Dir) of {ok, Files0} -> - Files = [filename:join(Dir, F) || F <- Files0, - filename:extension(F) == ".segment"], + Files = [filename:join(Dir, F) + || F <- Files0, filename:extension(F) =:= ".segment"], lists:sort(Files); {error, enoent} -> [] @@ -529,3 +532,26 @@ maybe_wait_for_segment_writer(SegWriter, TimeRemaining) maybe_wait_for_segment_writer(_SegWriter, _TimeRemaining) -> ok. +maybe_upgrade_segment_file_names(System, DataDir) -> + Marker = filename:join(DataDir, ?UPGRADE_MARKER), + case ra_lib:is_file(Marker) of + false -> + ?INFO("segment_writer: upgrading segment file names to " + "new format in dirctory ~ts", + [DataDir]), + [begin + Dir = filename:join(DataDir, UId), + case prim_file:list_dir(Dir) of + {ok, Files} -> + [ra_lib:zpad_upgrade(Dir, F, ".segment") + || F <- Files, filename:extension(F) =:= ".segment"]; + {error, enoent} -> + ok + end + end || {_, UId} <- ra_directory:list_registered(System)], + + ok = ra_lib:write_file(Marker, <<>>); + true -> + ok + end. + diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index d7ecceb2..87aa87e6 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -370,9 +370,12 @@ recover_wal(Dir, #conf{segment_writer = SegWriter, ok = ra_log_segment_writer:await(SegWriter), post_boot end, - {ok, Files} = file:list_dir(Dir), - WalFiles = lists:sort([F || F <- Files, - filename:extension(F) == ".wal"]), + {ok, Files0} = file:list_dir(Dir), + Files = [begin + ra_lib:zpad_upgrade(Dir, File, ".wal") + end || File <- Files0, + filename:extension(File) == ".wal"], + WalFiles = lists:sort(Files), AllWriters = [begin ?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]), diff --git a/test/ra_dbg_SUITE.erl b/test/ra_dbg_SUITE.erl index 0f2745be..38da1482 100644 --- a/test/ra_dbg_SUITE.erl +++ b/test/ra_dbg_SUITE.erl @@ -95,7 +95,7 @@ execute_state_machine() -> wal_file() -> {ok, RaDataDir} = application:get_env(ra, data_dir), - filename:join([RaDataDir, node(), "00000001.wal"]). + filename:join([RaDataDir, node(), "0000000000000001.wal"]). report(Pid, Count) -> receive diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 1d76e53a..07c0223b 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -38,6 +38,7 @@ all_tests() -> truncate_segments_with_pending_update, truncate_segments_with_pending_overwrite, my_segments, + upgrade_segment_name_format, skip_entries_lower_than_snapshot_index, skip_all_entries_lower_than_snapshot_index ]. @@ -61,7 +62,6 @@ init_per_testcase(TestCase, Config) -> SysCfg = ra_system:default_config(), ra_system:store(SysCfg), _ = ra_log_ets:start_link(SysCfg), - % ra_directory:init(default), ra_counters:init(), UId = atom_to_binary(TestCase, utf8), ok = ra_directory:register_name(default, UId, self(), undefined, @@ -314,23 +314,23 @@ accept_mem_tables_for_down_server(Config) -> ets:new(ra_log_closed_mem_tables, [named_table, bag, public]), Dir = ?config(wal_dir, Config), UId = ?config(uid, Config), - FakeUId = <<"down-uid">>, + DownUId = <<"down-uid">>, %% only insert into dets so that the server is shown as registered %% but not running ok = dets:insert(maps:get(directory_rev, get_names(default)), - {down_uid, FakeUId}), - ok = ra_lib:make_dir(filename:join(Dir, FakeUId)), + {down_uid, DownUId}), + ok = ra_lib:make_dir(filename:join(Dir, DownUId)), application:start(sasl), {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, name => ?SEGWR, data_dir => Dir}), % fake up a mem segment for Self Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}], - Mt = make_mem_table(FakeUId, Entries), + Mt = make_mem_table(DownUId, Entries), Mt2 = make_mem_table(UId, Entries), Tid = ra_mt:tid(Mt), Tid2 = ra_mt:tid(Mt2), - Ranges = #{FakeUId => [{Tid, {1, 3}}], + Ranges = #{DownUId => [{Tid, {1, 3}}], UId => [{Tid2, {1, 3}}]}, WalFile = filename:join(Dir, "00001.wal"), ok = file:write_file(WalFile, <<"waldata">>), @@ -348,10 +348,11 @@ accept_mem_tables_for_down_server(Config) -> end, %% validate fake uid entries were written ra_log_segment_writer:await(?SEGWR), - FakeSegmentFile = filename:join([?config(wal_dir, Config), - FakeUId, - "00000001.segment"]), - {ok, FakeSeg} = ra_log_segment:open(FakeSegmentFile, #{mode => read}), + DownFn = ra_lib:zpad_filename("", "segment", 1), + ct:pal("FakeFn ~s", [DownFn]), + DownSegmentFile = filename:join([?config(wal_dir, Config), + DownUId, DownFn]), + {ok, FakeSeg} = ra_log_segment:open(DownSegmentFile, #{mode => read}), % assert Entries have been fully transferred Entries = [{I, T, binary_to_term(B)} || {I, T, B} <- read_sparse(FakeSeg, [1, 2, 3])], @@ -701,6 +702,48 @@ my_segments(Config) -> proc_lib:stop(TblWriterPid), ok. +upgrade_segment_name_format(Config) -> + Dir = ?config(wal_dir, Config), + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{name => ?SEGWR, + system => default, + data_dir => Dir}), + UId = ?config(uid, Config), + % fake up a mem segment for Self + Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}], + Mt = make_mem_table(UId, Entries), + Ranges = #{UId => [{ra_mt:tid(Mt), ra_mt:range(Mt)}]}, + TidRanges = maps:get(UId, Ranges), + WalFile = make_wal(Config, "00001.wal"), + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile), + File = + receive + {ra_log_event, {segments, TidRanges, [{{1, 3}, _Fn}]}} -> + [MyFile] = ra_log_segment_writer:my_segments(?SEGWR,UId), + MyFile + after 2000 -> + flush(), + exit(ra_log_event_timeout) + end, + + %% stop segment writer and rename existing segment to old format + proc_lib:stop(TblWriterPid), + Root = filename:dirname(File), + Base = filename:basename(File), + {_, FileOld} = lists:split(8, Base), + ok = file:rename(File, filename:join(Root, FileOld)), + %% also remove upgrade marker file + ok = file:delete(filename:join(Dir, "segment_name_upgrade_marker")), + %% restart segment writer which should trigger upgrade process + {ok, Pid2} = ra_log_segment_writer:start_link(#{name => ?SEGWR, + system => default, + data_dir => Dir}), + %% validate the renamed segment has been renamed back to the new + %% 16 character format + [File] = ra_log_segment_writer:my_segments(?SEGWR, UId), + + proc_lib:stop(Pid2), + ok. + skip_entries_lower_than_snapshot_index(Config) -> Dir = ?config(wal_dir, Config), UId = ?config(uid, Config), diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index 7224a636..a538a52a 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -26,6 +26,7 @@ all() -> all_tests() -> [ basic_log_writes, + wal_filename_upgrade, same_uid_different_process, consecutive_terms_in_batch_should_result_in_two_written_events, overwrite_in_same_batch, @@ -143,7 +144,7 @@ basic_log_writes(Config) -> ra_log_wal:force_roll_over(Pid), receive {'$gen_cast', - {mem_tables, #{UId := [{Tid, {12, 13}}]}, "00000001.wal"}} -> + {mem_tables, #{UId := [{Tid, {12, 13}}]}, "0000000000000001.wal"}} -> ok after 5000 -> flush(), @@ -153,6 +154,39 @@ basic_log_writes(Config) -> meck:unload(), ok. +wal_filename_upgrade(Config) -> + meck:new(ra_log_segment_writer, [passthrough]), + meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), + Conf = ?config(wal_conf, Config), + #{dir := Dir} = Conf, + {UId, _} = WriterId = ?config(writer_id, Config), + Tid = ets:new(?FUNCTION_NAME, []), + {ok, Pid} = ra_log_wal:start_link(Conf#{segment_writer => self()}), + {ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 1, "value"), + ok = await_written(WriterId, 1, {12, 12}), + {ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 13, 1, "value2"), + ok = await_written(WriterId, 1, {13, 13}), + proc_lib:stop(Pid), + %% rename file to old 8 character format + Fn = filename:join(Dir, "0000000000000001.wal"), + FnOld = filename:join(Dir, "00000001.wal"), + ok = file:rename(Fn, FnOld), + % debugger:start(), + % int:i(ra_log_wal), + % int:break(ra_log_wal, 373), + {ok, Pid2} = ra_log_wal:start_link(Conf#{segment_writer => self()}), + receive + {'$gen_cast', + {mem_tables, #{UId := [{_Tid, {12, 13}}]}, "0000000000000001.wal"}} -> + ok + after 5000 -> + flush(), + ct:fail("receiving mem tables timed out") + end, + proc_lib:stop(Pid2), + meck:unload(), + ok. + same_uid_different_process(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), From cf8f270a7b15e4f173abb1567071fbd5859a5e2c Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 3 Jan 2025 15:38:21 +0000 Subject: [PATCH 2/2] Update test/ra_log_segment_writer_SUITE.erl Co-authored-by: Michael Davis --- test/ra_log_segment_writer_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 07c0223b..002089a9 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -349,7 +349,7 @@ accept_mem_tables_for_down_server(Config) -> %% validate fake uid entries were written ra_log_segment_writer:await(?SEGWR), DownFn = ra_lib:zpad_filename("", "segment", 1), - ct:pal("FakeFn ~s", [DownFn]), + ct:pal("DownFn ~s", [DownFn]), DownSegmentFile = filename:join([?config(wal_dir, Config), DownUId, DownFn]), {ok, FakeSeg} = ra_log_segment:open(DownSegmentFile, #{mode => read}),