diff --git a/src/ra_lib.erl b/src/ra_lib.erl index f24d0248..3fba3887 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -30,6 +30,7 @@ zpad_filename/3, zpad_filename_incr/1, zpad_extract_num/1, + zpad_upgrade/3, recursive_delete/1, make_uid/0, make_uid/1, @@ -146,29 +147,44 @@ zpad_hex(Num) -> lists:flatten(io_lib:format("~16.16.0B", [Num])). zpad_filename("", Ext, Num) -> - lists:flatten(io_lib:format("~8..0B.~ts", [Num, Ext])); + lists:flatten(io_lib:format("~16..0B.~ts", [Num, Ext])); zpad_filename(Prefix, Ext, Num) -> - lists:flatten(io_lib:format("~ts_~8..0B.~ts", [Prefix, Num, Ext])). + lists:flatten(io_lib:format("~ts_~16..0B.~ts", [Prefix, Num, Ext])). zpad_filename_incr(Fn) -> Base = filename:basename(Fn), Dir = filename:dirname(Fn), - case re:run(Base, "(.*)([0-9]{8})(.*)", + case re:run(Base, "(.*)([0-9]{16})(.*)", [{capture, all_but_first, list}]) of {match, [Prefix, NumStr, Ext]} -> Num = list_to_integer(NumStr), - filename:join(Dir, - lists:flatten( - io_lib:format("~ts~8..0B~ts", [Prefix, Num+1, Ext]))); + NewFn = lists:flatten(io_lib:format("~ts~16..0B~ts", + [Prefix, Num + 1, Ext])), + filename:join(Dir, NewFn); _ -> undefined end. zpad_extract_num(Fn) -> - {match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{8})(.*)", + {match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{16})(.*)", [{capture, all_but_first, list}]), list_to_integer(NumStr). +zpad_upgrade(Dir, File, Ext) -> + B = filename:basename(File, Ext), + case length(B) of + 8 -> + %% old format, convert and rename + F = "00000000" ++ B ++ Ext, + New = filename:join(Dir, F), + Old = filename:join(Dir, File), + ok = file:rename(Old, New), + F; + 16 -> + File + end. + + recursive_delete(Dir) -> case is_dir(Dir) of true -> @@ -427,7 +443,7 @@ lists_shuffle(List0) -> is_dir(Dir) -> case prim_file:read_file_info(Dir) of - {ok, #file_info{type=directory}} -> + {ok, #file_info{type = directory}} -> true; _ -> false @@ -435,8 +451,6 @@ is_dir(Dir) -> is_file(File) -> case prim_file:read_file_info(File) of - {ok, #file_info{type = directory}} -> - true; {ok, #file_info{type = regular}} -> true; _ -> @@ -516,17 +530,17 @@ make_uid_test() -> ok. zpad_filename_incr_test() -> - Fn = "/lib/blah/prefix_00000001.segment", - Ex = "/lib/blah/prefix_00000002.segment", + Fn = "/lib/blah/prefix_0000000000000001.segment", + Ex = "/lib/blah/prefix_0000000000000002.segment", Ex = zpad_filename_incr(Fn), - undefined = zpad_filename_incr("0000001"), + undefined = zpad_filename_incr("000000000000001"), ok. zpad_filename_incr_utf8_test() -> - Fn = "/lib/🐰/prefix/00000001.segment", - Ex = "/lib/🐰/prefix/00000002.segment", + Fn = "/lib/🐰/prefix/0000000000000001.segment", + Ex = "/lib/🐰/prefix/0000000000000002.segment", Ex = zpad_filename_incr(Fn), - undefined = zpad_filename_incr("0000001"), + undefined = zpad_filename_incr("000000000000001"), ok. derive_safe_string_test() -> diff --git a/src/ra_log_segment_writer.erl b/src/ra_log_segment_writer.erl index fe5c4079..5f789895 100644 --- a/src/ra_log_segment_writer.erl +++ b/src/ra_log_segment_writer.erl @@ -105,7 +105,9 @@ await(SegWriter) -> ok end. -%%%=================================================================== +-define(UPGRADE_MARKER, "segment_name_upgrade_marker"). + +%%%================================================================== %%% gen_server callbacks %%%=================================================================== @@ -115,6 +117,7 @@ init([#{data_dir := DataDir, process_flag(trap_exit, true), CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS), SegmentConf = maps:get(segment_conf, Conf, #{}), + maybe_upgrade_segment_file_names(System, DataDir), {ok, #state{system = System, data_dir = DataDir, counter = CRef, @@ -460,8 +463,8 @@ find_segment_files(Dir) -> segment_files(Dir) -> case prim_file:list_dir(Dir) of {ok, Files0} -> - Files = [filename:join(Dir, F) || F <- Files0, - filename:extension(F) == ".segment"], + Files = [filename:join(Dir, F) + || F <- Files0, filename:extension(F) =:= ".segment"], lists:sort(Files); {error, enoent} -> [] @@ -529,3 +532,26 @@ maybe_wait_for_segment_writer(SegWriter, TimeRemaining) maybe_wait_for_segment_writer(_SegWriter, _TimeRemaining) -> ok. +maybe_upgrade_segment_file_names(System, DataDir) -> + Marker = filename:join(DataDir, ?UPGRADE_MARKER), + case ra_lib:is_file(Marker) of + false -> + ?INFO("segment_writer: upgrading segment file names to " + "new format in dirctory ~ts", + [DataDir]), + [begin + Dir = filename:join(DataDir, UId), + case prim_file:list_dir(Dir) of + {ok, Files} -> + [ra_lib:zpad_upgrade(Dir, F, ".segment") + || F <- Files, filename:extension(F) =:= ".segment"]; + {error, enoent} -> + ok + end + end || {_, UId} <- ra_directory:list_registered(System)], + + ok = ra_lib:write_file(Marker, <<>>); + true -> + ok + end. + diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index d7ecceb2..87aa87e6 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -370,9 +370,12 @@ recover_wal(Dir, #conf{segment_writer = SegWriter, ok = ra_log_segment_writer:await(SegWriter), post_boot end, - {ok, Files} = file:list_dir(Dir), - WalFiles = lists:sort([F || F <- Files, - filename:extension(F) == ".wal"]), + {ok, Files0} = file:list_dir(Dir), + Files = [begin + ra_lib:zpad_upgrade(Dir, File, ".wal") + end || File <- Files0, + filename:extension(File) == ".wal"], + WalFiles = lists:sort(Files), AllWriters = [begin ?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]), diff --git a/test/ra_dbg_SUITE.erl b/test/ra_dbg_SUITE.erl index 0f2745be..38da1482 100644 --- a/test/ra_dbg_SUITE.erl +++ b/test/ra_dbg_SUITE.erl @@ -95,7 +95,7 @@ execute_state_machine() -> wal_file() -> {ok, RaDataDir} = application:get_env(ra, data_dir), - filename:join([RaDataDir, node(), "00000001.wal"]). + filename:join([RaDataDir, node(), "0000000000000001.wal"]). report(Pid, Count) -> receive diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 1d76e53a..002089a9 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -38,6 +38,7 @@ all_tests() -> truncate_segments_with_pending_update, truncate_segments_with_pending_overwrite, my_segments, + upgrade_segment_name_format, skip_entries_lower_than_snapshot_index, skip_all_entries_lower_than_snapshot_index ]. @@ -61,7 +62,6 @@ init_per_testcase(TestCase, Config) -> SysCfg = ra_system:default_config(), ra_system:store(SysCfg), _ = ra_log_ets:start_link(SysCfg), - % ra_directory:init(default), ra_counters:init(), UId = atom_to_binary(TestCase, utf8), ok = ra_directory:register_name(default, UId, self(), undefined, @@ -314,23 +314,23 @@ accept_mem_tables_for_down_server(Config) -> ets:new(ra_log_closed_mem_tables, [named_table, bag, public]), Dir = ?config(wal_dir, Config), UId = ?config(uid, Config), - FakeUId = <<"down-uid">>, + DownUId = <<"down-uid">>, %% only insert into dets so that the server is shown as registered %% but not running ok = dets:insert(maps:get(directory_rev, get_names(default)), - {down_uid, FakeUId}), - ok = ra_lib:make_dir(filename:join(Dir, FakeUId)), + {down_uid, DownUId}), + ok = ra_lib:make_dir(filename:join(Dir, DownUId)), application:start(sasl), {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default, name => ?SEGWR, data_dir => Dir}), % fake up a mem segment for Self Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}], - Mt = make_mem_table(FakeUId, Entries), + Mt = make_mem_table(DownUId, Entries), Mt2 = make_mem_table(UId, Entries), Tid = ra_mt:tid(Mt), Tid2 = ra_mt:tid(Mt2), - Ranges = #{FakeUId => [{Tid, {1, 3}}], + Ranges = #{DownUId => [{Tid, {1, 3}}], UId => [{Tid2, {1, 3}}]}, WalFile = filename:join(Dir, "00001.wal"), ok = file:write_file(WalFile, <<"waldata">>), @@ -348,10 +348,11 @@ accept_mem_tables_for_down_server(Config) -> end, %% validate fake uid entries were written ra_log_segment_writer:await(?SEGWR), - FakeSegmentFile = filename:join([?config(wal_dir, Config), - FakeUId, - "00000001.segment"]), - {ok, FakeSeg} = ra_log_segment:open(FakeSegmentFile, #{mode => read}), + DownFn = ra_lib:zpad_filename("", "segment", 1), + ct:pal("DownFn ~s", [DownFn]), + DownSegmentFile = filename:join([?config(wal_dir, Config), + DownUId, DownFn]), + {ok, FakeSeg} = ra_log_segment:open(DownSegmentFile, #{mode => read}), % assert Entries have been fully transferred Entries = [{I, T, binary_to_term(B)} || {I, T, B} <- read_sparse(FakeSeg, [1, 2, 3])], @@ -701,6 +702,48 @@ my_segments(Config) -> proc_lib:stop(TblWriterPid), ok. +upgrade_segment_name_format(Config) -> + Dir = ?config(wal_dir, Config), + {ok, TblWriterPid} = ra_log_segment_writer:start_link(#{name => ?SEGWR, + system => default, + data_dir => Dir}), + UId = ?config(uid, Config), + % fake up a mem segment for Self + Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}], + Mt = make_mem_table(UId, Entries), + Ranges = #{UId => [{ra_mt:tid(Mt), ra_mt:range(Mt)}]}, + TidRanges = maps:get(UId, Ranges), + WalFile = make_wal(Config, "00001.wal"), + ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile), + File = + receive + {ra_log_event, {segments, TidRanges, [{{1, 3}, _Fn}]}} -> + [MyFile] = ra_log_segment_writer:my_segments(?SEGWR,UId), + MyFile + after 2000 -> + flush(), + exit(ra_log_event_timeout) + end, + + %% stop segment writer and rename existing segment to old format + proc_lib:stop(TblWriterPid), + Root = filename:dirname(File), + Base = filename:basename(File), + {_, FileOld} = lists:split(8, Base), + ok = file:rename(File, filename:join(Root, FileOld)), + %% also remove upgrade marker file + ok = file:delete(filename:join(Dir, "segment_name_upgrade_marker")), + %% restart segment writer which should trigger upgrade process + {ok, Pid2} = ra_log_segment_writer:start_link(#{name => ?SEGWR, + system => default, + data_dir => Dir}), + %% validate the renamed segment has been renamed back to the new + %% 16 character format + [File] = ra_log_segment_writer:my_segments(?SEGWR, UId), + + proc_lib:stop(Pid2), + ok. + skip_entries_lower_than_snapshot_index(Config) -> Dir = ?config(wal_dir, Config), UId = ?config(uid, Config), diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index 7224a636..a538a52a 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -26,6 +26,7 @@ all() -> all_tests() -> [ basic_log_writes, + wal_filename_upgrade, same_uid_different_process, consecutive_terms_in_batch_should_result_in_two_written_events, overwrite_in_same_batch, @@ -143,7 +144,7 @@ basic_log_writes(Config) -> ra_log_wal:force_roll_over(Pid), receive {'$gen_cast', - {mem_tables, #{UId := [{Tid, {12, 13}}]}, "00000001.wal"}} -> + {mem_tables, #{UId := [{Tid, {12, 13}}]}, "0000000000000001.wal"}} -> ok after 5000 -> flush(), @@ -153,6 +154,39 @@ basic_log_writes(Config) -> meck:unload(), ok. +wal_filename_upgrade(Config) -> + meck:new(ra_log_segment_writer, [passthrough]), + meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), + Conf = ?config(wal_conf, Config), + #{dir := Dir} = Conf, + {UId, _} = WriterId = ?config(writer_id, Config), + Tid = ets:new(?FUNCTION_NAME, []), + {ok, Pid} = ra_log_wal:start_link(Conf#{segment_writer => self()}), + {ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 1, "value"), + ok = await_written(WriterId, 1, {12, 12}), + {ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 13, 1, "value2"), + ok = await_written(WriterId, 1, {13, 13}), + proc_lib:stop(Pid), + %% rename file to old 8 character format + Fn = filename:join(Dir, "0000000000000001.wal"), + FnOld = filename:join(Dir, "00000001.wal"), + ok = file:rename(Fn, FnOld), + % debugger:start(), + % int:i(ra_log_wal), + % int:break(ra_log_wal, 373), + {ok, Pid2} = ra_log_wal:start_link(Conf#{segment_writer => self()}), + receive + {'$gen_cast', + {mem_tables, #{UId := [{_Tid, {12, 13}}]}, "0000000000000001.wal"}} -> + ok + after 5000 -> + flush(), + ct:fail("receiving mem tables timed out") + end, + proc_lib:stop(Pid2), + meck:unload(), + ok. + same_uid_different_process(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),