diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index ebc23c4..7540164 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -17,13 +17,10 @@ defmodule Jackalope.PersistentWorkList do @type t :: %__MODULE__{ # The maximum number of items that can be persisted as files max_size: non_neg_integer(), - # The lowest index of an unexpired, not-pending item. No pending item has an index >= bottom_index. + # The lowest index of an unexpired, not-pending item. No pending item has an index >= bottom_index. Monotonic. bottom_index: non_neg_integer(), - # The index at which the next item will be pushed + # The index at which the next item will be pushed. Monotonic. next_index: non_neg_integer(), - # Indices greater than bottom_index of (maybe prematurely and forcibly) expired items. - # Used to get a correct count of work items (i.e. not pending). - expired: [], # Cache of item expiration times for all persisted items (pending and not) expirations: %{required(non_neg_integer()) => integer}, # Indices of pending items mapped by their references. @@ -32,19 +29,21 @@ defmodule Jackalope.PersistentWorkList do data_dir: String.t(), # The function to use to get an expiration given the item expiration_fn: fun(), - # The function to use to update an item's expiration - update_expiration_fn: fun() + # Delta between the (approximate) monotonic time at last reboot and the monotomic time at startup + delta_time: integer(), + # The highest index of a recovered item, -1 if none + recovery_index: integer } defstruct bottom_index: 0, next_index: 0, - expired: [], expirations: %{}, pending: %{}, data_dir: nil, max_size: nil, expiration_fn: nil, - update_expiration_fn: nil + delta_time: 0, + recovery_index: -1 end @doc "Create a new work list" @@ -63,11 +62,11 @@ defmodule Jackalope.PersistentWorkList do %State{ max_size: Keyword.get(opts, :max_size), data_dir: Keyword.fetch!(opts, :data_dir), - expiration_fn: Keyword.fetch!(opts, :expiration_fn), - update_expiration_fn: Keyword.fetch!(opts, :update_expiration_fn) + expiration_fn: Keyword.fetch!(opts, :expiration_fn) } |> recover() + record_time_now(initial_state) {:ok, initial_state} end @@ -121,10 +120,7 @@ defmodule Jackalope.PersistentWorkList do index -> {:ok, item} = stored_item_at(index, state, remove: true) - - updated_state = - %State{state | pending: Map.delete(state.pending, ref)} - |> clean_up() + updated_state = %State{state | pending: Map.delete(state.pending, ref)} {:reply, item, updated_state} end @@ -156,7 +152,13 @@ defmodule Jackalope.PersistentWorkList do ## PRIVATE defp count(state) do - (state.next_index - state.bottom_index - length(state.expired)) + expired_count = + Enum.count( + state.bottom_index..(state.next_index - 1), + &expired?(&1, state) + ) + + (state.next_index - state.bottom_index - expired_count) |> max(0) end @@ -175,14 +177,12 @@ defmodule Jackalope.PersistentWorkList do # Peek at oldest non-pending work item defp peek_oldest(state) do - cond do - empty?(state) -> - nil - - true -> - # If this fails, let it crash - {:ok, item} = stored_item_at(state.bottom_index, state) - item + if empty?(state) do + nil + else + # If this fails, let it crash + {:ok, item} = stored_item_at(state.bottom_index, state) + item end end @@ -206,7 +206,6 @@ defmodule Jackalope.PersistentWorkList do %State{state | expirations: Map.delete(state.expirations, index)} |> move_bottom_index() - |> clean_up() end # Move bottom index up until it is not an expired @@ -217,7 +216,7 @@ defmodule Jackalope.PersistentWorkList do next_bottom_index > state.next_index -> state - next_bottom_index in state.expired -> + expired?(next_bottom_index, state) -> %State{ state | bottom_index: next_bottom_index @@ -235,32 +234,13 @@ defmodule Jackalope.PersistentWorkList do # No non-pending items? defp empty?(state), do: state.bottom_index == state.next_index - defp bottom_pending_index(state) do - if Enum.empty?(state.pending), do: nil, else: Enum.min(Map.values(state.pending)) - end - - # Maybe reset indices to initial values and cleanup expired list - defp clean_up(state) do - if empty?(state) and Enum.empty?(state.pending) do - %State{state | bottom_index: 0, next_index: 0, expired: []} - else - cleanup_expired(state) - end + defp expired?(index, state) do + path = item_file_path(index, state) + not File.exists?(path) end - # Remove from expired all indices smaller than the smallest index of a persisted item - defp cleanup_expired(state) do - expired_index_min = - case bottom_pending_index(state) do - nil -> state.bottom_index - index -> min(index, state.bottom_index) - end - - updated_expired = - state.expired - |> Enum.reject(&(&1 < expired_index_min)) - - %State{state | expired: updated_expired} + defp bottom_pending_index(state) do + if Enum.empty?(state.pending), do: nil, else: Enum.min(Map.values(state.pending)) end defp store_item(item, index, state) do @@ -330,11 +310,16 @@ defmodule Jackalope.PersistentWorkList do end end + defp expiration(index, state) do + expiration = Map.fetch!(state.expirations, index) + if index <= state.recovery_index, do: expiration + state.delta_time, else: expiration + end + defp maybe_expire(index, state) do - if index in state.expired do + if expired?(index, state) do state else - expiration = Map.fetch!(state.expirations, index) + expiration = expiration(index, state) if Expiration.after?(expiration, Expiration.now()) do state @@ -355,9 +340,9 @@ defmodule Jackalope.PersistentWorkList do else live_indices = state.bottom_index..(state.next_index - 1) - |> Enum.reject(&(&1 in state.expired)) + |> Enum.reject(&expired?(&1, state)) |> Enum.sort(fn index1, index2 -> - Map.fetch!(state.expirations, index1) <= Map.fetch!(state.expirations, index2) + expiration(index1, state) <= expiration(index2, state) end) |> Enum.take(excess_count) @@ -383,63 +368,79 @@ defmodule Jackalope.PersistentWorkList do move_bottom_index(state) true -> - %State{state | expired: [index | state.expired]} + state end %State{updated_state | expirations: Map.delete(state.expirations, index)} - |> clean_up() end defp pending_item?(index, state), do: Map.has_key?(state.pending, index) defp recover(state) do :ok = File.mkdir_p!(state.data_dir) - now = Expiration.now() - - item_files = - File.ls!(state.data_dir) |> Enum.filter(&Regex.match?(~r/.*\.item/, &1)) |> Enum.sort() - item_files - |> Enum.reduce( - reset_state(state), - fn file, acc -> - recover_file(Path.join(state.data_dir, file), now, acc) - end - ) - |> bound_items() - end + delta_time = Expiration.now() - latest_time(state) + + expirations = + File.ls!(state.data_dir) + |> Enum.filter(&Regex.match?(~r/.*\.item/, &1)) + |> Enum.reduce( + [], + fn item_file, acc -> + index = index_of_item_file(item_file) + + case stored_item_at(index, state) do + {:ok, item} -> + expiration = state.expiration_fn.(item) + [{index, expiration} | acc] + + {:error, reason} -> + Logger.warn( + "Failed to recover item in #{inspect(item_file)}: #{inspect(reason)}. Removing it." + ) + + _ = File.rm(item_file_path(index, state)) + acc + end + end + ) + |> Enum.into(%{}) - defp reset_state(state) do - %State{state | bottom_index: 0, next_index: 0, expired: [], pending: %{}, expirations: %{}} - end + item_indices = Map.keys(expirations) - defp recover_file(file, now, state) do - binary = File.read!(file) - :ok = File.rm!(file) + if Enum.empty?(item_indices) do + reset_state(state) + else + bottom_index = Enum.min(item_indices) + last_index = Enum.max(item_indices) - updated_state = - case item_from_binary(binary) do - {:ok, item} -> - index = state.next_index - # TODO - do some version checking... - rebased_expiration = - Expiration.rebase_expiration(state.expiration_fn.(item), latest_time(state), now) - - rebased_item = state.update_expiration_fn.(item, rebased_expiration) - :ok = store_item(rebased_item, index, state) - - %State{ - state - | next_index: index + 1, - expirations: Map.put(state.expirations, index, rebased_expiration) - } + %State{ + state + | bottom_index: bottom_index, + next_index: last_index + 1, + recovery_index: last_index, + delta_time: delta_time, + expirations: expirations + } + |> bound_items() + end + end - {:error, :invalid} -> - # Ignore invalid item file - state - end + defp index_of_item_file(item_file) do + [index_s, _] = String.split(item_file, ".") + {index, _} = Integer.parse(index_s) + index + end - updated_state + defp reset_state(state) do + %State{ + state + | bottom_index: 0, + next_index: 0, + pending: %{}, + expirations: %{}, + recovery_index: -1 + } end defp latest_time(state) do diff --git a/lib/jackalope/session.ex b/lib/jackalope/session.ex index 80731c7..dd26f86 100644 --- a/lib/jackalope/session.ex +++ b/lib/jackalope/session.ex @@ -79,9 +79,6 @@ defmodule Jackalope.Session do work_list = Keyword.merge(opts, expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - update_expiration_fn: fn {cmd, opts}, expiration -> - {cmd, Keyword.put(opts, :expiration, expiration)} - end, max_size: max_work_list_size ) |> work_list_mod.new() diff --git a/test/jackalope_test.exs b/test/jackalope_test.exs index d4751e2..2c54bf2 100644 --- a/test/jackalope_test.exs +++ b/test/jackalope_test.exs @@ -175,9 +175,6 @@ defmodule JackalopeTest do work_list = Jackalope.PersistentWorkList.new( expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - update_expiration_fn: fn {cmd, opts}, expiration -> - {cmd, Keyword.put(opts, :expiration, expiration)} - end, max_size: 10, data_dir: "/tmp/jackalope" ) @@ -193,6 +190,7 @@ defmodule JackalopeTest do ) end) + assert WorkList.count(work_list) == 10 ref = make_ref() work_list = WorkList.pending(work_list, ref) :ok = GenServer.stop(work_list, :normal) @@ -200,9 +198,6 @@ defmodule JackalopeTest do work_list = Jackalope.PersistentWorkList.new( expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - update_expiration_fn: fn {cmd, opts}, expiration -> - {cmd, Keyword.put(opts, :expiration, expiration)} - end, max_size: 5, data_dir: "/tmp/jackalope" )