Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use an LRU cache for standalone funs #82

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/khepri_fun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ fun((#{calls := #{Call :: mfa() => true},
should_process_function =>
should_process_function_fun(),
is_standalone_fun_still_needed =>
is_standalone_fun_still_needed_fun()}.
is_standalone_fun_still_needed_fun(),
standalone_fun_cache => module()}.
%% Options to tune the extraction of an anonymous function.
%%
%% <ul>
Expand Down Expand Up @@ -452,10 +453,11 @@ standalone_fun_cache_key(Module, Name, Arity, Checksum, Options) ->
%% @private

get_cached_standalone_fun(
#state{fun_info = #{module := Module}} = State)
#state{fun_info = #{module := Module}, options = Options} = State)
when Module =/= erl_eval ->
Key = standalone_fun_cache_key(State),
case persistent_term:get(Key, undefined) of
FunctionCache = maps:get(standalone_fun_cache, Options, khepri_fun_cache),
case FunctionCache:get(Key, undefined) of
#{standalone_fun := StandaloneFunWithoutEnv,
checksums := Checksums,
counters := Counters} ->
Expand Down Expand Up @@ -531,6 +533,8 @@ cache_standalone_fun(
%% currently.
Counters = counters:new(1, [write_concurrency]),

FunctionCache = maps:get(standalone_fun_cache, Options, khepri_fun_cache),

case StandaloneFun of
#standalone_fun{} ->
%% The standalone function is stored in the cache without its
Expand All @@ -550,11 +554,11 @@ cache_standalone_fun(

%% TODO: We need to add some memory management here to clear the
%% cache if there are many different standalone functions.
persistent_term:put(Key, Value);
FunctionCache:put(Key, Value);
fun_kept ->
Value = #{fun_kept => true,
counters => Counters},
persistent_term:put(Key, Value)
FunctionCache:put(Key, Value)
end,
ok.

Expand Down
111 changes: 111 additions & 0 deletions src/khepri_fun_cache.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
%%

%% @doc A cache interface for transaction functions
%%
%% The default cache implementation is an `ets'-based LRU cache.
%% The capacity of the cache can be configured by setting
%% `application:set_env(khepri, fun_cache_capacity, Capacity)'.
%% By default, the capacity is `10_000' elements.

-module(khepri_fun_cache).

-export([get/2, put/2]).

%% Exported for testing
-export([clear/0]).

-define(CACHE, '$__KHEPRI_FUN_CACHE__').
-define(RANKS, '$__KHEPRI_FUN_RANKS__').

-ifdef(TEST).
-define(CAPACITY, 3).
-else.
-define(CAPACITY, 10_000).
-endif.

-callback get(Key :: term(), Default :: term()) -> term().
-callback put(Key :: term(), Fun :: term()) -> ok.

get(Key, Default) ->
case ets:whereis(?CACHE) of
undefined ->
setup_tables(),
Default;
_Tid ->
get1(Key, Default)
end.

get1(Key, Default) ->
case ets:lookup(?CACHE, Key) of
[{Key, Rank, Value}] ->
update_rank(Rank, Key),
Value;
[] ->
Default
end.

put(Key, Value) ->
case ets:whereis(?CACHE) of
undefined ->
setup_tables(),
insert_key(Key, Value),
ok;
_Tid ->
put1(Key, Value)
end.

put1(Key, Value) ->
case ets:lookup(?CACHE, Key) of
[{Key, _Rank, Value}] ->
ok;
[{Key, Rank, _OtherValue}] ->
update_rank(Rank, Key),
ets:update_element(?CACHE, Key, {3, Value}),
ok;
[] ->
insert_key(Key, Value),
ok
end.

clear() ->
ets:delete(?CACHE),
ets:delete(?RANKS),
ok.

setup_tables() ->
_ = ets:new(?CACHE, [public, named_table, set]),
_ = ets:new(?RANKS, [public, named_table, ordered_set]),
ok.

update_rank(Rank, Key) ->
NextRank = next_rank(),
ets:delete(?RANKS, Rank),
ets:insert(?RANKS, {NextRank, Key}),
ets:update_element(?CACHE, Key, {2, NextRank}),
ok.

insert_key(Key, Value) ->
case ets:info(?CACHE, size) >= capacity() of
true ->
%% The cache is full so we discard the least-recently used item.
OldestRank = ets:first(?RANKS),
[{OldestRank, OldestKey}] = ets:take(?RANKS, OldestRank),
ets:delete(?CACHE, OldestKey);
_ ->
ok
end,
NextRank = next_rank(),
ets:insert(?CACHE, {Key, NextRank, Value}),
ets:insert(?RANKS, {NextRank, Key}),
ok.

capacity() ->
application:get_env(khepri, fun_cache_capacity, ?CAPACITY).

next_rank() ->
erlang:unique_integer([monotonic]).
64 changes: 56 additions & 8 deletions test/sf_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@ standalone_fun_is_cached_test() ->
Module, Name, Arity, Checksum, Options),

StandaloneFun1 = khepri_fun:to_standalone_fun(Fun, Options),
CacheEntry1 = persistent_term:get(Key, undefined),
CacheEntry1 = khepri_fun_cache:get(Key, undefined),
?assertMatch(#standalone_fun{}, StandaloneFun1),
?assertMatch(#{standalone_fun := StandaloneFun1}, CacheEntry1),
#{counters := Counters} = CacheEntry1,
?assertEqual(0, counters:get(Counters, 1)),

StandaloneFun2 = khepri_fun:to_standalone_fun(Fun, Options),
CacheEntry2 = persistent_term:get(Key, undefined),
CacheEntry2 = khepri_fun_cache:get(Key, undefined),
?assertEqual(StandaloneFun1, StandaloneFun2),
?assertEqual(CacheEntry1, CacheEntry2),
?assertEqual(1, counters:get(Counters, 1)),

StandaloneFun3 = khepri_fun:to_standalone_fun(Fun, Options),
CacheEntry3 = persistent_term:get(Key, undefined),
CacheEntry3 = khepri_fun_cache:get(Key, undefined),
?assertEqual(StandaloneFun1, StandaloneFun3),
?assertEqual(CacheEntry1, CacheEntry3),
?assertEqual(2, counters:get(Counters, 1)).
Expand All @@ -65,20 +65,20 @@ kept_fun_is_cached_test() ->
Module, Name, Arity, Checksum, Options),

StandaloneFun1 = khepri_fun:to_standalone_fun(Fun, Options),
CacheEntry1 = persistent_term:get(Key, undefined),
CacheEntry1 = khepri_fun_cache:get(Key, undefined),
?assertEqual(Fun, StandaloneFun1),
?assertMatch(#{fun_kept := true}, CacheEntry1),
#{counters := Counters} = CacheEntry1,
?assertEqual(0, counters:get(Counters, 1)),

StandaloneFun2 = khepri_fun:to_standalone_fun(Fun, Options),
CacheEntry2 = persistent_term:get(Key, undefined),
CacheEntry2 = khepri_fun_cache:get(Key, undefined),
?assertEqual(StandaloneFun1, StandaloneFun2),
?assertEqual(CacheEntry1, CacheEntry2),
?assertEqual(1, counters:get(Counters, 1)),

StandaloneFun3 = khepri_fun:to_standalone_fun(Fun, Options),
CacheEntry3 = persistent_term:get(Key, undefined),
CacheEntry3 = khepri_fun_cache:get(Key, undefined),
?assertEqual(StandaloneFun1, StandaloneFun3),
?assertEqual(CacheEntry1, CacheEntry3),
?assertEqual(2, counters:get(Counters, 1)).
Expand Down Expand Up @@ -153,7 +153,7 @@ modified_module_causes_cache_miss_test() ->
Module, Name1, Arity1, Checksum1, Options),

StandaloneFun1 = khepri_fun:to_standalone_fun(Fun1, Options),
CacheEntry1 = persistent_term:get(Key1, undefined),
CacheEntry1 = khepri_fun_cache:get(Key1, undefined),
?assertMatch(#standalone_fun{}, StandaloneFun1),
?assertEqual(1, khepri_fun:exec(StandaloneFun1, [])),
#{counters := Counters1} = CacheEntry1,
Expand All @@ -179,11 +179,59 @@ modified_module_causes_cache_miss_test() ->
?assertNotEqual(Checksum1, Checksum2),

StandaloneFun2 = khepri_fun:to_standalone_fun(Fun2, Options),
CacheEntry2 = persistent_term:get(Key2, undefined),
CacheEntry2 = khepri_fun_cache:get(Key2, undefined),
?assertMatch(#standalone_fun{}, StandaloneFun2),
?assertEqual(2, khepri_fun:exec(StandaloneFun2, [])),
#{counters := Counters2} = CacheEntry2,
?assertEqual(0, counters:get(Counters2, 1)),

true = code:delete(Module),
_ = code:purge(Module).

lru_cache_evicts_least_recently_used_item_test() ->
%% In test mode, the cache capacity is 3.
khepri_fun_cache:clear(),
khepri_fun_cache:put(1, one),
khepri_fun_cache:put(2, two),
khepri_fun_cache:put(3, three),
?assertEqual(one, khepri_fun_cache:get(1, undefined)),
?assertEqual(two, khepri_fun_cache:get(2, undefined)),
?assertEqual(three, khepri_fun_cache:get(3, undefined)),
%% 1 is evicted.
khepri_fun_cache:put(4, four),
?assertEqual(four, khepri_fun_cache:get(4, undefined)),
?assertEqual(undefined, khepri_fun_cache:get(1, undefined)),
?assertEqual(two, khepri_fun_cache:get(2, undefined)),
%% Then 3 is evicted because 2 was just accessed.
khepri_fun_cache:put(5, five),
?assertEqual(undefined, khepri_fun_cache:get(3, undefined)),
?assertEqual(two, khepri_fun_cache:get(2, undefined)).

lru_cache_updating_element_resets_rank_test() ->
%% Updating an existing element with a new Value resets its rank.
khepri_fun_cache:clear(),
khepri_fun_cache:put(1, one),
khepri_fun_cache:put(2, two),
khepri_fun_cache:put(3, three),
%% 1's rank is reset.
khepri_fun_cache:put(1, one_and_a_half),
%% 2 is evicted.
khepri_fun_cache:put(4, four),
?assertEqual(one_and_a_half, khepri_fun_cache:get(1, undefined)),
?assertEqual(undefined, khepri_fun_cache:get(2, undefined)),
?assertEqual(three, khepri_fun_cache:get(3, undefined)),
?assertEqual(four, khepri_fun_cache:get(4, undefined)).

lru_cache_updating_element_with_same_value_does_not_reset_rank_test() ->
%% `put/2'-ing an element with the same Value does not reset the rank
khepri_fun_cache:clear(),
khepri_fun_cache:put(1, one),
khepri_fun_cache:put(2, two),
khepri_fun_cache:put(3, three),
khepri_fun_cache:put(1, one),
%% 1 is evicted.
khepri_fun_cache:put(4, four),
?assertEqual(undefined, khepri_fun_cache:get(1, undefined)),
?assertEqual(two, khepri_fun_cache:get(2, undefined)),
?assertEqual(three, khepri_fun_cache:get(3, undefined)),
?assertEqual(four, khepri_fun_cache:get(4, undefined)).