diff --git a/src/simple_cache_server.erl b/src/simple_cache_server.erl index 783e2f6..8b5dedd 100644 --- a/src/simple_cache_server.erl +++ b/src/simple_cache_server.erl @@ -10,7 +10,11 @@ ]). -record(state, {table :: ets:tid() | atom(), - options :: list()}). + options :: list(), + buffer :: #{Expires :: non_neg_integer() => buffer_item()}, + buffer_limit :: non_neg_integer()}). + +-type buffer_item() :: {Keys :: list(), Count :: non_neg_integer()}. %%============================================================================= %% API Function Exports @@ -113,31 +117,34 @@ sync_flush() -> %%============================================================================= %% gen_server Function Definitions %%============================================================================= -init([]) -> - {ok, #state{table = ets:new(?SERVER, ?ETS_OPTIONS), - options = ?ETS_OPTIONS}}; init(CustomOptions) -> + BufferLimit = application:get_env(simple_cache, buffer_limit, 1), + %% Legacy behaviour: without buffering and batching send_after/3 calls + %% there is no need to send regular ticks. + BufferLimit > 1 andalso send_tick(), Options = merge_options(?ETS_OPTIONS, CustomOptions), {ok, #state{table = ets:new(?SERVER, Options), - options = Options}}. + options = Options, + buffer = #{}, + buffer_limit = BufferLimit}}. handle_call(ops_info, _From, #state{table = Table} = State) -> {reply, ets:info(Table), State}; handle_call(ops_list, _From, #state{table = Table} = State) -> {reply, ets:tab2list(Table), State}; handle_call({set, Key, Value, infinity}, _From, #state{table = Table} = State) -> - insert(Table, Key, Value, infinity), + insert_no_expire(Table, Key, Value), {reply, ok, State}; handle_call({set, Key, Value, Expires}, _From, #state{table = Table} = State) -> - insert(Table, Key, Value, Expires), - {reply, ok, State}; + NewState = insert_and_buffer_expiry(Table, Key, Value, Expires, State), + {reply, ok, NewState}; handle_call({set, Key, Value, Conditional, Expires}, _From, #state{table = Table} = State) -> Test = case lookup(Key) of {ok, OldValue} -> Conditional(OldValue); {error, not_found} -> true end, - Test andalso insert(Table, Key, Value, Expires), - {reply, {ok, Test}, State}; + NewState = maybe_insert(Table, Key, Value, Expires, Test, State), + {reply, {ok, Test}, NewState}; handle_call({flush, Key}, _From, #state{table = Table} = State) -> ets:delete(Table, Key), {reply, ok, State}; @@ -146,11 +153,11 @@ handle_call(flush, _From, #state{table = Table} = State) -> {reply, ok, State}. handle_cast({set, Key, Value, infinity}, #state{table = Table} = State) -> - insert(Table, Key, Value, infinity), + insert_no_expire(Table, Key, Value), {noreply, State}; handle_cast({set, Key, Value, Expires}, #state{table = Table} = State) -> - insert(Table, Key, Value, Expires), - {noreply, State}; + NewState = insert_and_buffer_expiry(Table, Key, Value, Expires, State), + {noreply, NewState}; handle_cast({flush, Key}, #state{table = Table} = State) -> ets:delete(Table, Key), {noreply, State}; @@ -158,10 +165,19 @@ handle_cast(flush, #state{table = Table} = State) -> ets:delete_all_objects(Table), {noreply, State}. +handle_info({expire, Keys}, #state{table = Table} = State) when is_list(Keys) -> + lists:foreach(fun(Key) -> ets:delete(Table, Key) end, Keys), + {noreply, State}; handle_info({expire, Key}, #state{table = Table} = State) -> ets:delete(Table, Key), - {noreply, State}. - + {noreply, State}; +handle_info(tick, #state{buffer = Buffer} = State) -> + maps:foreach(fun (Expires, {Keys, _Count}) -> + erlang:send_after(1000 * Expires, ?SERVER, {expire, Keys}) + end, + Buffer), + send_tick(), + {noreply, State#state{buffer = #{}}}. terminate(_Reason, _State) -> ok. @@ -171,11 +187,42 @@ code_change(_OldVsn, State, _Extra) -> %%============================================================================= %% Internal functionality %%============================================================================= -insert(Table, Key, Value, infinity) -> - ets:insert(Table, {Key, Value, infinity}); -insert(Table, Key, Value, Expires) -> +maybe_insert(_Table, _Key, _Value, _Expires, false, State) -> + State; +maybe_insert(Table, Key, Value, infinity, true, State) -> + insert_no_expire(Table, Key, Value), + State; +maybe_insert(Table, Key, Value, Expires, true, State) -> + insert_and_buffer_expiry(Table, Key, Value, Expires, State). + +insert_no_expire(Table, Key, Value) -> + ets:insert(Table, {Key, Value, infinity}). + +insert_and_buffer_expiry(Table, Key, Value, Expires, State) -> ets:insert(Table, {Key, Value, Expires}), - erlang:send_after(1000 * Expires, ?SERVER, {expire, Key}). + buffer_expiry(Key, Expires, State). + +buffer_expiry(Key, Expires, #state{buffer_limit = Limit} = State) when Limit < 2 -> + %% Do not buffer + erlang:send_after(1000 * Expires, ?SERVER, {expire, Key}), + State; +buffer_expiry(Key, Expires, #state{buffer = Buffer, buffer_limit = Limit} = State) -> + case Buffer of + #{Expires := {Keys, Count}} when Count == Limit -> + erlang:send_after(1000 * Expires, ?SERVER, {expire, [Key | Keys]}), + NewBuffer = maps:remove(Expires, Buffer), + State#state{buffer = NewBuffer}; + _ -> + NewBuffer = maps:update_with(Expires, + fun ({Keys, Count}) -> {[Key | Keys], Count + 1} end, + {[Key], 1}, % initial value + Buffer), + State#state{buffer = NewBuffer} + end. + +send_tick() -> + TickTime = application:get_env(simple_cache, expiry_tick_ms, 10 * 1000), + erlang:send_after(TickTime, ?SERVER, tick). get_by_key(Table, Key) -> case ets:lookup(Table, Key) of @@ -185,7 +232,9 @@ get_by_key(Table, Key) -> {error, not_found} end. +merge_options(ExistingOptions, NewOptions) when NewOptions == [] -> + ExistingOptions; merge_options(ExistingOptions, NewOptions) -> orddict:merge(fun (_, X, Y) -> X + Y end, orddict:from_list(ExistingOptions), - orddict:from_list(NewOptions)). + orddict:from_list(NewOptions)). \ No newline at end of file