Skip to content

Commit

Permalink
Version 2 of persistentwork list
Browse files Browse the repository at this point in the history
WIP - TESTS FAIL
No rewriting of item files on recovery
Using File.exists?/1 to test for expired items
Monotonic bottom and next indices
  • Loading branch information
jfcloutier committed Jan 31, 2022
1 parent 32323e8 commit d7b095f
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 105 deletions.
193 changes: 97 additions & 96 deletions lib/jackalope/persistent_work_list.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ defmodule Jackalope.PersistentWorkList do
@type t :: %__MODULE__{
# The maximum number of items that can be persisted as files
max_size: non_neg_integer(),
# The lowest index of an unexpired, not-pending item. No pending item has an index >= bottom_index.
# The lowest index of an unexpired, not-pending item. No pending item has an index >= bottom_index. Monotonic.
bottom_index: non_neg_integer(),
# The index at which the next item will be pushed
# The index at which the next item will be pushed. Monotonic.
next_index: non_neg_integer(),
# Indices greater than bottom_index of (maybe prematurely and forcibly) expired items.
# Used to get a correct count of work items (i.e. not pending).
expired: [],
# Cache of item expiration times for all persisted items (pending and not)
expirations: %{required(non_neg_integer()) => integer},
# Indices of pending items mapped by their references.
Expand All @@ -32,19 +29,21 @@ defmodule Jackalope.PersistentWorkList do
data_dir: String.t(),
# The function to use to get an expiration given the item
expiration_fn: fun(),
# The function to use to update an item's expiration
update_expiration_fn: fun()
# Delta between the (approximate) monotonic time at last reboot and the monotomic time at startup
delta_time: integer(),
# The highest index of a recovered item, -1 if none
recovery_index: integer
}

defstruct bottom_index: 0,
next_index: 0,
expired: [],
expirations: %{},
pending: %{},
data_dir: nil,
max_size: nil,
expiration_fn: nil,
update_expiration_fn: nil
delta_time: 0,
recovery_index: -1
end

@doc "Create a new work list"
Expand All @@ -63,11 +62,11 @@ defmodule Jackalope.PersistentWorkList do
%State{
max_size: Keyword.get(opts, :max_size),
data_dir: Keyword.fetch!(opts, :data_dir),
expiration_fn: Keyword.fetch!(opts, :expiration_fn),
update_expiration_fn: Keyword.fetch!(opts, :update_expiration_fn)
expiration_fn: Keyword.fetch!(opts, :expiration_fn)
}
|> recover()

record_time_now(initial_state)
{:ok, initial_state}
end

Expand Down Expand Up @@ -121,10 +120,7 @@ defmodule Jackalope.PersistentWorkList do

index ->
{:ok, item} = stored_item_at(index, state, remove: true)

updated_state =
%State{state | pending: Map.delete(state.pending, ref)}
|> clean_up()
updated_state = %State{state | pending: Map.delete(state.pending, ref)}

{:reply, item, updated_state}
end
Expand Down Expand Up @@ -156,7 +152,13 @@ defmodule Jackalope.PersistentWorkList do
## PRIVATE

defp count(state) do
(state.next_index - state.bottom_index - length(state.expired))
expired_count =
Enum.count(
state.bottom_index..(state.next_index - 1),
&expired?(&1, state)
)

(state.next_index - state.bottom_index - expired_count)
|> max(0)
end

Expand All @@ -175,14 +177,12 @@ defmodule Jackalope.PersistentWorkList do

# Peek at oldest non-pending work item
defp peek_oldest(state) do
cond do
empty?(state) ->
nil

true ->
# If this fails, let it crash
{:ok, item} = stored_item_at(state.bottom_index, state)
item
if empty?(state) do
nil
else
# If this fails, let it crash
{:ok, item} = stored_item_at(state.bottom_index, state)
item
end
end

Expand All @@ -206,7 +206,6 @@ defmodule Jackalope.PersistentWorkList do

%State{state | expirations: Map.delete(state.expirations, index)}
|> move_bottom_index()
|> clean_up()
end

# Move bottom index up until it is not an expired
Expand All @@ -217,7 +216,7 @@ defmodule Jackalope.PersistentWorkList do
next_bottom_index > state.next_index ->
state

next_bottom_index in state.expired ->
expired?(next_bottom_index, state) ->
%State{
state
| bottom_index: next_bottom_index
Expand All @@ -235,32 +234,13 @@ defmodule Jackalope.PersistentWorkList do
# No non-pending items?
defp empty?(state), do: state.bottom_index == state.next_index

defp bottom_pending_index(state) do
if Enum.empty?(state.pending), do: nil, else: Enum.min(Map.values(state.pending))
end

# Maybe reset indices to initial values and cleanup expired list
defp clean_up(state) do
if empty?(state) and Enum.empty?(state.pending) do
%State{state | bottom_index: 0, next_index: 0, expired: []}
else
cleanup_expired(state)
end
defp expired?(index, state) do
path = item_file_path(index, state)
not File.exists?(path)
end

# Remove from expired all indices smaller than the smallest index of a persisted item
defp cleanup_expired(state) do
expired_index_min =
case bottom_pending_index(state) do
nil -> state.bottom_index
index -> min(index, state.bottom_index)
end

updated_expired =
state.expired
|> Enum.reject(&(&1 < expired_index_min))

%State{state | expired: updated_expired}
defp bottom_pending_index(state) do
if Enum.empty?(state.pending), do: nil, else: Enum.min(Map.values(state.pending))
end

defp store_item(item, index, state) do
Expand Down Expand Up @@ -330,11 +310,16 @@ defmodule Jackalope.PersistentWorkList do
end
end

defp expiration(index, state) do
expiration = Map.fetch!(state.expirations, index)
if index <= state.recovery_index, do: expiration + state.delta_time, else: expiration
end

defp maybe_expire(index, state) do
if index in state.expired do
if expired?(index, state) do
state
else
expiration = Map.fetch!(state.expirations, index)
expiration = expiration(index, state)

if Expiration.after?(expiration, Expiration.now()) do
state
Expand All @@ -355,9 +340,9 @@ defmodule Jackalope.PersistentWorkList do
else
live_indices =
state.bottom_index..(state.next_index - 1)
|> Enum.reject(&(&1 in state.expired))
|> Enum.reject(&expired?(&1, state))
|> Enum.sort(fn index1, index2 ->
Map.fetch!(state.expirations, index1) <= Map.fetch!(state.expirations, index2)
expiration(index1, state) <= expiration(index2, state)
end)
|> Enum.take(excess_count)

Expand All @@ -383,63 +368,79 @@ defmodule Jackalope.PersistentWorkList do
move_bottom_index(state)

true ->
%State{state | expired: [index | state.expired]}
state
end

%State{updated_state | expirations: Map.delete(state.expirations, index)}
|> clean_up()
end

defp pending_item?(index, state), do: Map.has_key?(state.pending, index)

defp recover(state) do
:ok = File.mkdir_p!(state.data_dir)
now = Expiration.now()

item_files =
File.ls!(state.data_dir) |> Enum.filter(&Regex.match?(~r/.*\.item/, &1)) |> Enum.sort()

item_files
|> Enum.reduce(
reset_state(state),
fn file, acc ->
recover_file(Path.join(state.data_dir, file), now, acc)
end
)
|> bound_items()
end
delta_time = Expiration.now() - latest_time(state)

expirations =
File.ls!(state.data_dir)
|> Enum.filter(&Regex.match?(~r/.*\.item/, &1))
|> Enum.reduce(
[],
fn item_file, acc ->
index = index_of_item_file(item_file)

case stored_item_at(index, state) do
{:ok, item} ->
expiration = state.expiration_fn.(item)
[{index, expiration} | acc]

{:error, reason} ->
Logger.warn(
"Failed to recover item in #{inspect(item_file)}: #{inspect(reason)}. Removing it."
)

_ = File.rm(item_file_path(index, state))
acc
end
end
)
|> Enum.into(%{})

defp reset_state(state) do
%State{state | bottom_index: 0, next_index: 0, expired: [], pending: %{}, expirations: %{}}
end
item_indices = Map.keys(expirations)

defp recover_file(file, now, state) do
binary = File.read!(file)
:ok = File.rm!(file)
if Enum.empty?(item_indices) do
reset_state(state)
else
bottom_index = Enum.min(item_indices)
last_index = Enum.max(item_indices)

updated_state =
case item_from_binary(binary) do
{:ok, item} ->
index = state.next_index
# TODO - do some version checking...
rebased_expiration =
Expiration.rebase_expiration(state.expiration_fn.(item), latest_time(state), now)

rebased_item = state.update_expiration_fn.(item, rebased_expiration)
:ok = store_item(rebased_item, index, state)

%State{
state
| next_index: index + 1,
expirations: Map.put(state.expirations, index, rebased_expiration)
}
%State{
state
| bottom_index: bottom_index,
next_index: last_index + 1,
recovery_index: last_index,
delta_time: delta_time,
expirations: expirations
}
|> bound_items()
end
end

{:error, :invalid} ->
# Ignore invalid item file
state
end
defp index_of_item_file(item_file) do
[index_s, _] = String.split(item_file, ".")
{index, _} = Integer.parse(index_s)
index
end

updated_state
defp reset_state(state) do
%State{
state
| bottom_index: 0,
next_index: 0,
pending: %{},
expirations: %{},
recovery_index: -1
}
end

defp latest_time(state) do
Expand Down
3 changes: 0 additions & 3 deletions lib/jackalope/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ defmodule Jackalope.Session do
work_list =
Keyword.merge(opts,
expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end,
update_expiration_fn: fn {cmd, opts}, expiration ->
{cmd, Keyword.put(opts, :expiration, expiration)}
end,
max_size: max_work_list_size
)
|> work_list_mod.new()
Expand Down
7 changes: 1 addition & 6 deletions test/jackalope_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,6 @@ defmodule JackalopeTest do
work_list =
Jackalope.PersistentWorkList.new(
expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end,
update_expiration_fn: fn {cmd, opts}, expiration ->
{cmd, Keyword.put(opts, :expiration, expiration)}
end,
max_size: 10,
data_dir: "/tmp/jackalope"
)
Expand All @@ -193,16 +190,14 @@ defmodule JackalopeTest do
)
end)

assert WorkList.count(work_list) == 10
ref = make_ref()
work_list = WorkList.pending(work_list, ref)
:ok = GenServer.stop(work_list, :normal)

work_list =
Jackalope.PersistentWorkList.new(
expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end,
update_expiration_fn: fn {cmd, opts}, expiration ->
{cmd, Keyword.put(opts, :expiration, expiration)}
end,
max_size: 5,
data_dir: "/tmp/jackalope"
)
Expand Down

0 comments on commit d7b095f

Please sign in to comment.