Skip to content

Commit

Permalink
Merge pull request #97 from pulibrary/75-hydrator-incrementals
Browse files Browse the repository at this point in the history
Figgy Hydrator Polling
  • Loading branch information
eliotjordan authored Sep 25, 2024
2 parents 356a4b2 + fb8031e commit 586f4b1
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 25 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ config :logger, :console,
# Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason

# How long (in milliseconds) the Figgy hydration producer waits before
# re-checking for new records when it still has unfulfilled demand.
config :dpul_collections, :figgy_hydrator, poll_interval: 60_000

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{config_env()}.exs"
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ config :dpul_collections, :solr, %{
username: System.get_env("SOLR_USERNAME"),
password: System.get_env("SOLR_PASSWORD")
}

# Use a very short poll interval (50 ms) so the hydration producer's
# :check_for_updates timer fires within the lifetime of a test.
config :dpul_collections, :figgy_hydrator, poll_interval: 50
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
last_queried_marker: Figgy.ResourceMarker.t(),
pulled_records: [Figgy.ResourceMarker.t()],
acked_records: [Figgy.ResourceMarker.t()],
cache_version: Integer
cache_version: Integer,
stored_demand: Integer
}
def init(cache_version) do
last_queried_marker = IndexingPipeline.get_processor_marker!("hydrator", cache_version)
Expand All @@ -26,7 +27,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
last_queried_marker: last_queried_marker |> Figgy.ResourceMarker.from(),
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:producer, initial_state}
Expand All @@ -38,11 +40,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
state = %{
last_queried_marker: last_queried_marker,
pulled_records: pulled_records,
acked_records: acked_records
acked_records: acked_records,
stored_demand: stored_demand
}
)
when demand > 0 do
records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, demand)
) do
total_demand = stored_demand + demand

records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, total_demand)

new_state =
state
Expand All @@ -55,10 +59,40 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
Enum.concat(pulled_records, Enum.map(records, &Figgy.ResourceMarker.from/1))
)
|> Map.put(:acked_records, acked_records)
|> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
Process.send_after(
self(),
:check_for_updates,
Application.get_env(:dpul_collections, :figgy_hydrator)[:poll_interval]
)
end

{:noreply, Enum.map(records, &wrap_record/1), new_state}
end

# Returns how much demand is still outstanding after `fulfilled_demand`
# records were produced against `total_demand`.
#
# The result is clamped at 0: if more records come back than were asked for,
# there is simply nothing left to store, rather than no clause matching and
# the producer crashing with a FunctionClauseError.
defp calculate_stored_demand(total_demand, fulfilled_demand) do
  max(total_demand - fulfilled_demand, 0)
end

# Handles the :check_for_updates timer message that handle_demand/2 schedules
# whenever it is left with unfulfilled (stored) demand.
#
# With stored demand pending, the query is retried through handle_demand/2;
# otherwise the message is a no-op and no events are emitted.
def handle_info(:check_for_updates, state) do
  case state do
    %{stored_demand: pending} when pending > 0 ->
      # No *new* demand arrived; handle_demand/2 adds this 0 to the stored
      # demand already kept in state.
      handle_demand(0, state)

    _ ->
      {:noreply, [], state}
  end
end

@impl GenStage
def handle_info({:ack, :figgy_producer_ack, pending_markers}, state) do
messages = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -39,7 +40,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -56,7 +58,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -79,7 +82,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
fabricated_marker
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -95,7 +99,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

assert new_state == expected_state
Expand All @@ -114,7 +119,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: fabricated_marker,
pulled_records: [],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 0
}

{:noreply, messages, new_state} = Figgy.HydrationProducer.handle_demand(1, initial_state)
Expand All @@ -126,7 +132,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: fabricated_marker,
pulled_records: [],
acked_records: [],
cache_version: 0
cache_version: 0,
stored_demand: 1
}

assert new_state == expected_state
Expand All @@ -144,7 +151,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

acked_markers =
Expand All @@ -163,7 +171,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker3
],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -187,7 +196,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: cache_version
cache_version: cache_version,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand Down Expand Up @@ -217,7 +227,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker3
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -236,7 +247,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -257,7 +269,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -270,7 +283,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker3,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -296,7 +310,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

acked_markers =
Expand All @@ -314,7 +329,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
acked_records: [
marker2
],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -341,7 +357,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

first_ack =
Expand All @@ -355,7 +372,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
marker2
],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -373,7 +391,8 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
last_queried_marker: marker2,
pulled_records: [],
acked_records: [],
cache_version: 1
cache_version: 1,
stored_demand: 0
}

{:noreply, [], new_state} =
Expand All @@ -386,5 +405,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do

assert processor_marker == marker2
end

test ".handle_info(:check_for_updates) with no stored demand" do
  # With nothing stored, the timer message must emit no events and leave the
  # state untouched.
  state = %{stored_demand: 0}

  result = Figgy.HydrationProducer.handle_info(:check_for_updates, state)

  assert result == {:noreply, [], state}
end
end
end

0 comments on commit 586f4b1

Please sign in to comment.