diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index ebc23c4..a2a03c0 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -19,11 +19,8 @@ defmodule Jackalope.PersistentWorkList do max_size: non_neg_integer(), # The lowest index of an unexpired, not-pending item. No pending item has an index >= bottom_index. bottom_index: non_neg_integer(), - # The index at which the next item will be pushed + # The index at which the next item will be pushed. next_index: non_neg_integer(), - # Indices greater than bottom_index of (maybe prematurely and forcibly) expired items. - # Used to get a correct count of work items (i.e. not pending). - expired: [], # Cache of item expiration times for all persisted items (pending and not) expirations: %{required(non_neg_integer()) => integer}, # Indices of pending items mapped by their references. @@ -32,19 +29,21 @@ defmodule Jackalope.PersistentWorkList do data_dir: String.t(), # The function to use to get an expiration given the item expiration_fn: fun(), - # The function to use to update an item's expiration - update_expiration_fn: fun() + # Delta between the (approximate) monotonic time at last reboot and the monotomic time at startup + delta_time: integer(), + # The highest index of a recovered item, -1 if none + recovery_index: integer } defstruct bottom_index: 0, next_index: 0, - expired: [], expirations: %{}, pending: %{}, data_dir: nil, max_size: nil, expiration_fn: nil, - update_expiration_fn: nil + delta_time: 0, + recovery_index: -1 end @doc "Create a new work list" @@ -63,11 +62,11 @@ defmodule Jackalope.PersistentWorkList do %State{ max_size: Keyword.get(opts, :max_size), data_dir: Keyword.fetch!(opts, :data_dir), - expiration_fn: Keyword.fetch!(opts, :expiration_fn), - update_expiration_fn: Keyword.fetch!(opts, :update_expiration_fn) + expiration_fn: Keyword.fetch!(opts, :expiration_fn) } |> recover() + record_time_now(initial_state) {:ok, initial_state} end @@ -79,12 +78,8 @@ defmodule Jackalope.PersistentWorkList do end @impl GenServer - def handle_call(:count, _from, state) do - {:reply, count(state), state} - end - - def handle_call(:count_pending, _from, state) do - {:reply, count_pending(state), state} + def handle_call(:info, _from, state) do + {:reply, info(state), state} end def handle_call(:peek, _from, state) do @@ -121,10 +116,7 @@ defmodule Jackalope.PersistentWorkList do index -> {:ok, item} = stored_item_at(index, state, remove: true) - - updated_state = - %State{state | pending: Map.delete(state.pending, ref)} - |> clean_up() + updated_state = %State{state | pending: Map.delete(state.pending, ref)} {:reply, item, updated_state} end @@ -155,8 +147,18 @@ defmodule Jackalope.PersistentWorkList do ## PRIVATE + defp info(state) do + %{count_waiting: count(state), count_pending: count_pending(state)} + end + defp count(state) do - (state.next_index - state.bottom_index - length(state.expired)) + expired_count = + Enum.count( + state.bottom_index..(state.next_index - 1), + &expired?(&1, state) + ) + + (state.next_index - state.bottom_index - expired_count) |> max(0) end @@ -175,14 +177,12 @@ defmodule Jackalope.PersistentWorkList do # Peek at oldest non-pending work item defp peek_oldest(state) do - cond do - empty?(state) -> - nil - - true -> - # If this fails, let it crash - {:ok, item} = stored_item_at(state.bottom_index, state) - item + if empty?(state) do + nil + else + # If this fails, let it crash + {:ok, item} = stored_item_at(state.bottom_index, state) + item end end @@ -206,7 +206,6 @@ defmodule Jackalope.PersistentWorkList do %State{state | expirations: Map.delete(state.expirations, index)} |> move_bottom_index() - |> clean_up() end # Move bottom index up until it is not an expired @@ -217,7 +216,7 @@ defmodule Jackalope.PersistentWorkList do next_bottom_index > state.next_index -> state - next_bottom_index in state.expired -> + expired?(next_bottom_index, state) -> %State{ state | bottom_index: next_bottom_index @@ -235,32 +234,13 @@ defmodule Jackalope.PersistentWorkList do # No non-pending items? defp empty?(state), do: state.bottom_index == state.next_index - defp bottom_pending_index(state) do - if Enum.empty?(state.pending), do: nil, else: Enum.min(Map.values(state.pending)) - end - - # Maybe reset indices to initial values and cleanup expired list - defp clean_up(state) do - if empty?(state) and Enum.empty?(state.pending) do - %State{state | bottom_index: 0, next_index: 0, expired: []} - else - cleanup_expired(state) - end + defp expired?(index, state) do + path = item_file_path(index, state) + not File.exists?(path) end - # Remove from expired all indices smaller than the smallest index of a persisted item - defp cleanup_expired(state) do - expired_index_min = - case bottom_pending_index(state) do - nil -> state.bottom_index - index -> min(index, state.bottom_index) - end - - updated_expired = - state.expired - |> Enum.reject(&(&1 < expired_index_min)) - - %State{state | expired: updated_expired} + defp bottom_pending_index(state) do + if Enum.empty?(state.pending), do: nil, else: Enum.min(Map.values(state.pending)) end defp store_item(item, index, state) do @@ -330,11 +310,16 @@ defmodule Jackalope.PersistentWorkList do end end + defp expiration(index, state) do + expiration = Map.fetch!(state.expirations, index) + if index <= state.recovery_index, do: expiration + state.delta_time, else: expiration + end + defp maybe_expire(index, state) do - if index in state.expired do + if expired?(index, state) do state else - expiration = Map.fetch!(state.expirations, index) + expiration = expiration(index, state) if Expiration.after?(expiration, Expiration.now()) do state @@ -355,9 +340,9 @@ defmodule Jackalope.PersistentWorkList do else live_indices = state.bottom_index..(state.next_index - 1) - |> Enum.reject(&(&1 in state.expired)) + |> Enum.reject(&expired?(&1, state)) |> Enum.sort(fn index1, index2 -> - Map.fetch!(state.expirations, index1) <= Map.fetch!(state.expirations, index2) + expiration(index1, state) <= expiration(index2, state) end) |> Enum.take(excess_count) @@ -383,63 +368,82 @@ defmodule Jackalope.PersistentWorkList do move_bottom_index(state) true -> - %State{state | expired: [index | state.expired]} + state end %State{updated_state | expirations: Map.delete(state.expirations, index)} - |> clean_up() end defp pending_item?(index, state), do: Map.has_key?(state.pending, index) defp recover(state) do :ok = File.mkdir_p!(state.data_dir) - now = Expiration.now() - item_files = - File.ls!(state.data_dir) |> Enum.filter(&Regex.match?(~r/.*\.item/, &1)) |> Enum.sort() + delta_time = Expiration.now() - latest_time(state) - item_files - |> Enum.reduce( - reset_state(state), - fn file, acc -> - recover_file(Path.join(state.data_dir, file), now, acc) - end - ) - |> bound_items() - end + item_files = + File.ls!(state.data_dir) + |> Enum.filter(&Regex.match?(~r/.*\.item/, &1)) + + expirations = + item_files + |> Enum.reduce( + [], + fn item_file, acc -> + index = index_of_item_file(item_file) + + case stored_item_at(index, state) do + {:ok, item} -> + expiration = state.expiration_fn.(item) + [{index, expiration} | acc] + + {:error, reason} -> + Logger.warn( + "Failed to recover item in #{inspect(item_file)}: #{inspect(reason)}. Removing it." + ) + + _ = File.rm(item_file_path(index, state)) + acc + end + end + ) + |> Enum.into(%{}) - defp reset_state(state) do - %State{state | bottom_index: 0, next_index: 0, expired: [], pending: %{}, expirations: %{}} - end + item_indices = Map.keys(expirations) - defp recover_file(file, now, state) do - binary = File.read!(file) - :ok = File.rm!(file) + if Enum.empty?(item_files) do + reset_state(state) + else + bottom_index = Enum.min(item_indices) + last_index = Enum.max(item_indices) - updated_state = - case item_from_binary(binary) do - {:ok, item} -> - index = state.next_index - # TODO - do some version checking... - rebased_expiration = - Expiration.rebase_expiration(state.expiration_fn.(item), latest_time(state), now) - - rebased_item = state.update_expiration_fn.(item, rebased_expiration) - :ok = store_item(rebased_item, index, state) - - %State{ - state - | next_index: index + 1, - expirations: Map.put(state.expirations, index, rebased_expiration) - } + %State{ + state + | bottom_index: bottom_index, + next_index: last_index + 1, + recovery_index: last_index, + delta_time: delta_time, + expirations: expirations + } + |> bound_items() + end + end - {:error, :invalid} -> - # Ignore invalid item file - state - end + defp index_of_item_file(item_file) do + [index_s, _] = String.split(item_file, ".") + {index, _} = Integer.parse(index_s) + index + end - updated_state + defp reset_state(state) do + %State{ + state + | bottom_index: 0, + next_index: 0, + pending: %{}, + expirations: %{}, + recovery_index: -1 + } end defp latest_time(state) do @@ -500,18 +504,10 @@ defimpl Jackalope.WorkList, for: PID do end @impl Jackalope.WorkList - def count(work_list) do - GenServer.call(work_list, :count) + def info(work_list) do + GenServer.call(work_list, :info) end - @impl Jackalope.WorkList - def count_pending(work_list) do - GenServer.call(work_list, :count_pending) - end - - @impl Jackalope.WorkList - def empty?(work_list), do: peek(work_list) == nil - @impl Jackalope.WorkList def remove_all(work_list) do :ok = GenServer.call(work_list, :remove_all) diff --git a/lib/jackalope/session.ex b/lib/jackalope/session.ex index 80731c7..dd26f86 100644 --- a/lib/jackalope/session.ex +++ b/lib/jackalope/session.ex @@ -79,9 +79,6 @@ defmodule Jackalope.Session do work_list = Keyword.merge(opts, expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - update_expiration_fn: fn {cmd, opts}, expiration -> - {cmd, Keyword.put(opts, :expiration, expiration)} - end, max_size: max_work_list_size ) |> work_list_mod.new() diff --git a/lib/jackalope/transient_work_list.ex b/lib/jackalope/transient_work_list.ex index 9638ad3..cc1830b 100644 --- a/lib/jackalope/transient_work_list.ex +++ b/lib/jackalope/transient_work_list.ex @@ -169,18 +169,8 @@ defimpl Jackalope.WorkList, for: Jackalope.TransientWorkList do end @impl Jackalope.WorkList - def count(work_list) do - length(work_list.items) - end - - @impl Jackalope.WorkList - def count_pending(work_list) do - Enum.count(work_list.pending) - end - - @impl Jackalope.WorkList - def empty?(work_list) do - work_list.items == [] + def info(work_list) do + %{count_waiting: length(work_list.items), count_pending: Enum.count(work_list.pending)} end @impl Jackalope.WorkList diff --git a/lib/jackalope/work_list.ex b/lib/jackalope/work_list.ex index 45260f5..892d4cc 100644 --- a/lib/jackalope/work_list.ex +++ b/lib/jackalope/work_list.ex @@ -25,15 +25,12 @@ defprotocol Jackalope.WorkList do @doc "Remove an item from the pending set" @spec done(work_list(), reference()) :: {work_list(), item()} def done(work_list, ref) - @doc "Count the number of work items" - @spec count(work_list()) :: non_neg_integer() - def count(work_list) - @doc "Count the number of pending work items" - @spec count_pending(work_list()) :: non_neg_integer() - def count_pending(work_list) - @doc "Are there work items" - @spec empty?(work_list()) :: boolean - def empty?(work_list) + @doc "Info about the work list" + @spec info(work_list()) :: %{ + required(:count_waiting) => non_neg_integer(), + required(:count_pending) => non_neg_integer() + } + def info(work_list) @doc "Remove all work items" @spec remove_all(work_list()) :: work_list() def remove_all(work_list) diff --git a/test/jackalope_test.exs b/test/jackalope_test.exs index d4751e2..f1a212f 100644 --- a/test/jackalope_test.exs +++ b/test/jackalope_test.exs @@ -85,7 +85,7 @@ defmodule JackalopeTest do ) end) - assert WorkList.count(work_list) == 10 + assert count(work_list) == 10 end test "pending and done work items #{work_list_mod}", context do @@ -103,7 +103,7 @@ defmodule JackalopeTest do ) end) - assert WorkList.count(work_list) == 5 + assert count(work_list) == 5 ref = make_ref() @@ -112,7 +112,7 @@ defmodule JackalopeTest do |> WorkList.pending(ref) |> WorkList.done(ref) - assert WorkList.count(work_list) == 4 + assert count(work_list) == 4 end test "dropping work items #{work_list_mod}", context do @@ -130,7 +130,7 @@ defmodule JackalopeTest do ) end) - assert WorkList.count(work_list) == 10 + assert count(work_list) == 10 end test "reset_pending work items #{work_list_mod}", context do @@ -151,9 +151,9 @@ defmodule JackalopeTest do ref = make_ref() work_list = WorkList.pending(work_list, ref) - assert WorkList.count(work_list) == 4 + assert count(work_list) == 4 work_list = WorkList.reset_pending(work_list) - assert WorkList.count(work_list) == 5 + assert count(work_list) == 5 end end @@ -175,9 +175,6 @@ defmodule JackalopeTest do work_list = Jackalope.PersistentWorkList.new( expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - update_expiration_fn: fn {cmd, opts}, expiration -> - {cmd, Keyword.put(opts, :expiration, expiration)} - end, max_size: 10, data_dir: "/tmp/jackalope" ) @@ -193,6 +190,7 @@ defmodule JackalopeTest do ) end) + assert count(work_list) == 10 ref = make_ref() work_list = WorkList.pending(work_list, ref) :ok = GenServer.stop(work_list, :normal) @@ -200,14 +198,16 @@ defmodule JackalopeTest do work_list = Jackalope.PersistentWorkList.new( expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - update_expiration_fn: fn {cmd, opts}, expiration -> - {cmd, Keyword.put(opts, :expiration, expiration)} - end, max_size: 5, data_dir: "/tmp/jackalope" ) - assert WorkList.count(work_list) == 5 + assert count(work_list) == 5 + end + + defp count(work_list) do + %{count_waiting: count} = WorkList.info(work_list) + count end defp get_session_work_list() do @@ -260,7 +260,9 @@ defmodule JackalopeTest do if reset? do WorkList.remove_all(work_list) - assert WorkList.empty?(work_list) + info = WorkList.info(work_list) + assert info.count_waiting == 0 + assert info.count_pending == 0 end end