Skip to content

Commit

Permalink
Extend WAL and segment file names from 8 to 16 characters
Browse files Browse the repository at this point in the history
In order to avoid running out of characters after a few years of
consistent usage.

The renames happen when the WAL and the segment writer initialise,
respectively. The segment writer writes a marker file after a successful
upgrade in order to avoid listing the files in all server directories
every time it starts. The WAL always checks the current set of WAL files,
which should be ok given that there is rarely more than one.
  • Loading branch information
kjnilsson committed Jan 3, 2025
1 parent dce73f0 commit 6f73dfe
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 31 deletions.
46 changes: 30 additions & 16 deletions src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
zpad_filename/3,
zpad_filename_incr/1,
zpad_extract_num/1,
zpad_upgrade/3,
recursive_delete/1,
make_uid/0,
make_uid/1,
Expand Down Expand Up @@ -146,29 +147,44 @@ zpad_hex(Num) ->
lists:flatten(io_lib:format("~16.16.0B", [Num])).

zpad_filename("", Ext, Num) ->
lists:flatten(io_lib:format("~8..0B.~ts", [Num, Ext]));
lists:flatten(io_lib:format("~16..0B.~ts", [Num, Ext]));
zpad_filename(Prefix, Ext, Num) ->
lists:flatten(io_lib:format("~ts_~8..0B.~ts", [Prefix, Num, Ext])).
lists:flatten(io_lib:format("~ts_~16..0B.~ts", [Prefix, Num, Ext])).

zpad_filename_incr(Fn) ->
Base = filename:basename(Fn),
Dir = filename:dirname(Fn),
case re:run(Base, "(.*)([0-9]{8})(.*)",
case re:run(Base, "(.*)([0-9]{16})(.*)",
[{capture, all_but_first, list}]) of
{match, [Prefix, NumStr, Ext]} ->
Num = list_to_integer(NumStr),
filename:join(Dir,
lists:flatten(
io_lib:format("~ts~8..0B~ts", [Prefix, Num+1, Ext])));
NewFn = lists:flatten(io_lib:format("~ts~16..0B~ts",
[Prefix, Num + 1, Ext])),
filename:join(Dir, NewFn);
_ ->
undefined
end.

zpad_extract_num(Fn) ->
{match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{8})(.*)",
{match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{16})(.*)",
[{capture, all_but_first, list}]),
list_to_integer(NumStr).

%% @doc Upgrades a zero padded file name from the old 8 character format
%% to the current 16 character format.
%%
%% `File' is the name of a file inside `Dir' (no directory component) and
%% `Ext' is its extension including the leading dot, e.g. ".wal".
%%
%% When the base name (the name with `Ext' stripped) consists of exactly
%% 8 decimal digits the file is renamed on disk by prepending 8 zeros and
%% the new file name is returned. Any other name, i.e. one that is already
%% in the 16 character format or that is not a plain zero padded name, is
%% returned unchanged and nothing on disk is touched.
%%
%% Fails with a badmatch if the rename itself does not return `ok'.
zpad_upgrade(Dir, File, Ext) ->
    Base = filename:basename(File, Ext),
    IsDigit = fun(C) -> C >= $0 andalso C =< $9 end,
    case length(Base) =:= 8 andalso lists:all(IsDigit, Base) of
        true ->
            %% old format, left pad with zeros and rename on disk
            Upgraded = "00000000" ++ Base ++ Ext,
            Old = filename:join(Dir, File),
            New = filename:join(Dir, Upgraded),
            ok = file:rename(Old, New),
            Upgraded;
        false ->
            %% already upgraded or not one of our zero padded names:
            %% leave the file alone rather than crashing the caller
            File
    end.


recursive_delete(Dir) ->
case is_dir(Dir) of
true ->
Expand Down Expand Up @@ -427,16 +443,14 @@ lists_shuffle(List0) ->

is_dir(Dir) ->
case prim_file:read_file_info(Dir) of
{ok, #file_info{type=directory}} ->
{ok, #file_info{type = directory}} ->
true;
_ ->
false
end.

is_file(File) ->
case prim_file:read_file_info(File) of
{ok, #file_info{type = directory}} ->
true;
{ok, #file_info{type = regular}} ->
true;
_ ->
Expand Down Expand Up @@ -516,17 +530,17 @@ make_uid_test() ->
ok.

zpad_filename_incr_test() ->
Fn = "/lib/blah/prefix_00000001.segment",
Ex = "/lib/blah/prefix_00000002.segment",
Fn = "/lib/blah/prefix_0000000000000001.segment",
Ex = "/lib/blah/prefix_0000000000000002.segment",
Ex = zpad_filename_incr(Fn),
undefined = zpad_filename_incr("0000001"),
undefined = zpad_filename_incr("000000000000001"),
ok.

zpad_filename_incr_utf8_test() ->
Fn = "/lib/🐰/prefix/00000001.segment",
Ex = "/lib/🐰/prefix/00000002.segment",
Fn = "/lib/🐰/prefix/0000000000000001.segment",
Ex = "/lib/🐰/prefix/0000000000000002.segment",
Ex = zpad_filename_incr(Fn),
undefined = zpad_filename_incr("0000001"),
undefined = zpad_filename_incr("000000000000001"),
ok.

derive_safe_string_test() ->
Expand Down
26 changes: 26 additions & 0 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ await(SegWriter) ->
ok
end.

-define(UPGRADE_MARKER, "segment_name_upgrade_marker").

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand All @@ -115,6 +117,7 @@ init([#{data_dir := DataDir,
process_flag(trap_exit, true),
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS),
SegmentConf = maps:get(segment_conf, Conf, #{}),
maybe_upgrade_segment_file_names(System, DataDir),
{ok, #state{system = System,
data_dir = DataDir,
counter = CRef,
Expand Down Expand Up @@ -529,3 +532,26 @@ maybe_wait_for_segment_writer(SegWriter, TimeRemaining)
maybe_wait_for_segment_writer(_SegWriter, _TimeRemaining) ->
ok.

%% Upgrades the names of all ".segment" files of the servers registered in
%% `System' using ra_lib:zpad_upgrade/3.
%%
%% Each registered server is expected to keep its segments in the directory
%% `DataDir/UId'; a missing server directory is skipped. Once every
%% directory has been processed a marker file is written to `DataDir' and
%% when that marker is already present the whole scan is skipped, which
%% avoids listing all server directories on every start.
%% Returns `ok'.
maybe_upgrade_segment_file_names(System, DataDir) ->
    Marker = filename:join(DataDir, ?UPGRADE_MARKER),
    case ra_lib:is_file(Marker) of
        true ->
            %% upgrade has already completed for this data directory
            ok;
        false ->
            ?INFO("segment_writer: upgrading segment file names to "
                  "new format in directory ~ts",
                  [DataDir]),
            UIds = [UId || {_, UId} <- ra_directory:list_registered(System)],
            lists:foreach(
              fun(UId) ->
                      Dir = filename:join(DataDir, UId),
                      case file:list_dir(Dir) of
                          {ok, Files} ->
                              Segments = [F || F <- Files,
                                               filename:extension(F) =:= ".segment"],
                              lists:foreach(
                                fun(Segment) ->
                                        ra_lib:zpad_upgrade(Dir, Segment,
                                                            ".segment")
                                end, Segments);
                          {error, enoent} ->
                              %% registered server without a directory
                              ok
                      end
              end, UIds),
            %% only written after all directories were processed
            ok = ra_lib:write_file(Marker, <<>>)
    end.

9 changes: 6 additions & 3 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,12 @@ recover_wal(Dir, #conf{segment_writer = SegWriter,
ok = ra_log_segment_writer:await(SegWriter),
post_boot
end,
{ok, Files} = file:list_dir(Dir),
WalFiles = lists:sort([F || F <- Files,
filename:extension(F) == ".wal"]),
{ok, Files0} = file:list_dir(Dir),
Files = [begin
ra_lib:zpad_upgrade(Dir, File, ".wal")
end || File <- Files0,
filename:extension(File) == ".wal"],
WalFiles = lists:sort(Files),
AllWriters =
[begin
?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]),
Expand Down
2 changes: 1 addition & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ execute_state_machine() ->

wal_file() ->
{ok, RaDataDir} = application:get_env(ra, data_dir),
filename:join([RaDataDir, node(), "00000001.wal"]).
filename:join([RaDataDir, node(), "0000000000000001.wal"]).

report(Pid, Count) ->
receive
Expand Down
63 changes: 53 additions & 10 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ all_tests() ->
truncate_segments_with_pending_update,
truncate_segments_with_pending_overwrite,
my_segments,
upgrade_segment_name_format,
skip_entries_lower_than_snapshot_index,
skip_all_entries_lower_than_snapshot_index
].
Expand All @@ -61,7 +62,6 @@ init_per_testcase(TestCase, Config) ->
SysCfg = ra_system:default_config(),
ra_system:store(SysCfg),
_ = ra_log_ets:start_link(SysCfg),
% ra_directory:init(default),
ra_counters:init(),
UId = atom_to_binary(TestCase, utf8),
ok = ra_directory:register_name(default, UId, self(), undefined,
Expand Down Expand Up @@ -314,23 +314,23 @@ accept_mem_tables_for_down_server(Config) ->
ets:new(ra_log_closed_mem_tables, [named_table, bag, public]),
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
FakeUId = <<"down-uid">>,
DownUId = <<"down-uid">>,
%% only insert into dets so that the server is shown as registered
%% but not running
ok = dets:insert(maps:get(directory_rev, get_names(default)),
{down_uid, FakeUId}),
ok = ra_lib:make_dir(filename:join(Dir, FakeUId)),
{down_uid, DownUId}),
ok = ra_lib:make_dir(filename:join(Dir, DownUId)),
application:start(sasl),
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
name => ?SEGWR,
data_dir => Dir}),
% fake up a mem segment for Self
Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}],
Mt = make_mem_table(FakeUId, Entries),
Mt = make_mem_table(DownUId, Entries),
Mt2 = make_mem_table(UId, Entries),
Tid = ra_mt:tid(Mt),
Tid2 = ra_mt:tid(Mt2),
Ranges = #{FakeUId => [{Tid, {1, 3}}],
Ranges = #{DownUId => [{Tid, {1, 3}}],
UId => [{Tid2, {1, 3}}]},
WalFile = filename:join(Dir, "00001.wal"),
ok = file:write_file(WalFile, <<"waldata">>),
Expand All @@ -348,10 +348,11 @@ accept_mem_tables_for_down_server(Config) ->
end,
%% validate fake uid entries were written
ra_log_segment_writer:await(?SEGWR),
FakeSegmentFile = filename:join([?config(wal_dir, Config),
FakeUId,
"00000001.segment"]),
{ok, FakeSeg} = ra_log_segment:open(FakeSegmentFile, #{mode => read}),
DownFn = ra_lib:zpad_filename("", "segment", 1),
ct:pal("FakeFn ~s", [DownFn]),
DownSegmentFile = filename:join([?config(wal_dir, Config),
DownUId, DownFn]),
{ok, FakeSeg} = ra_log_segment:open(DownSegmentFile, #{mode => read}),
% assert Entries have been fully transferred
Entries = [{I, T, binary_to_term(B)}
|| {I, T, B} <- read_sparse(FakeSeg, [1, 2, 3])],
Expand Down Expand Up @@ -701,6 +702,48 @@ my_segments(Config) ->
proc_lib:stop(TblWriterPid),
ok.

upgrade_segment_name_format(Config) ->
    %% Checks that a segment file using the old 8 character name is renamed
    %% back to the 16 character format when the segment writer restarts
    %% without its upgrade marker.
    Dir = ?config(wal_dir, Config),
    UId = ?config(uid, Config),
    WriterConf = #{name => ?SEGWR,
                   system => default,
                   data_dir => Dir},
    {ok, Writer1} = ra_log_segment_writer:start_link(WriterConf),
    %% fake up a mem table for this server and have it flushed to a segment
    Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}],
    Mt = make_mem_table(UId, Entries),
    TidRanges = [{ra_mt:tid(Mt), ra_mt:range(Mt)}],
    Ranges = #{UId => TidRanges},
    WalFile = make_wal(Config, "00001.wal"),
    ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile),
    File =
        receive
            {ra_log_event, {segments, TidRanges, [{{1, 3}, _}]}} ->
                [Segment] = ra_log_segment_writer:my_segments(?SEGWR, UId),
                Segment
        after 2000 ->
                  flush(),
                  exit(ra_log_event_timeout)
        end,

    %% stop the segment writer and give the segment its old style name by
    %% dropping the first 8 characters of the file name
    proc_lib:stop(Writer1),
    OldName = lists:nthtail(8, filename:basename(File)),
    ok = file:rename(File, filename:join(filename:dirname(File), OldName)),
    %% without the marker file the next start has to run the upgrade again
    ok = file:delete(filename:join(Dir, "segment_name_upgrade_marker")),
    {ok, Writer2} = ra_log_segment_writer:start_link(WriterConf),
    %% the segment must be reported under its original 16 character name
    [File] = ra_log_segment_writer:my_segments(?SEGWR, UId),

    proc_lib:stop(Writer2),
    ok.

skip_entries_lower_than_snapshot_index(Config) ->
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
Expand Down
36 changes: 35 additions & 1 deletion test/ra_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ all() ->
all_tests() ->
[
basic_log_writes,
wal_filename_upgrade,
same_uid_different_process,
consecutive_terms_in_batch_should_result_in_two_written_events,
overwrite_in_same_batch,
Expand Down Expand Up @@ -143,7 +144,7 @@ basic_log_writes(Config) ->
ra_log_wal:force_roll_over(Pid),
receive
{'$gen_cast',
{mem_tables, #{UId := [{Tid, {12, 13}}]}, "00000001.wal"}} ->
{mem_tables, #{UId := [{Tid, {12, 13}}]}, "0000000000000001.wal"}} ->
ok
after 5000 ->
flush(),
Expand All @@ -153,6 +154,39 @@ basic_log_writes(Config) ->
meck:unload(),
ok.

wal_filename_upgrade(Config) ->
    %% Checks that a WAL file using the old 8 character name is picked up
    %% and reported under the 16 character name when the WAL restarts.
    meck:new(ra_log_segment_writer, [passthrough]),
    meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
    #{dir := Dir} = Conf = ?config(wal_conf, Config),
    {UId, _} = WriterId = ?config(writer_id, Config),
    Tid = ets:new(?FUNCTION_NAME, []),
    %% this test process stands in for the segment writer
    WalConf = Conf#{segment_writer => self()},
    {ok, Wal1} = ra_log_wal:start_link(WalConf),
    lists:foreach(
      fun({Idx, Value}) ->
              {ok, _} = ra_log_wal:write(Wal1, WriterId, Tid, Idx, 1, Value),
              ok = await_written(WriterId, 1, {Idx, Idx})
      end, [{12, "value"}, {13, "value2"}]),
    proc_lib:stop(Wal1),
    %% rename the file to the old 8 character format
    NewName = filename:join(Dir, "0000000000000001.wal"),
    OldName = filename:join(Dir, "00000001.wal"),
    ok = file:rename(NewName, OldName),
    {ok, Wal2} = ra_log_wal:start_link(WalConf),
    receive
        {'$gen_cast',
         {mem_tables, #{UId := [{_, {12, 13}}]}, "0000000000000001.wal"}} ->
            ok
    after 5000 ->
              flush(),
              ct:fail("receiving mem tables timed out")
    end,
    proc_lib:stop(Wal2),
    meck:unload(),
    ok.

same_uid_different_process(Config) ->
meck:new(ra_log_segment_writer, [passthrough]),
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
Expand Down

0 comments on commit 6f73dfe

Please sign in to comment.