Skip to content

Commit

Permalink
Add configurable poll interval for Hydrator.
Browse files Browse the repository at this point in the history
  • Loading branch information
tpendragon committed Sep 24, 2024
1 parent 396fd03 commit fb8031e
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 5 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ config :logger, :console,
# Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason

config :dpul_collections, :figgy_hydrator, poll_interval: 60000

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{config_env()}.exs"
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ config :dpul_collections, :solr, %{
username: System.get_env("SOLR_USERNAME"),
password: System.get_env("SOLR_PASSWORD")
}

# Set this poll interval really small so it triggers in test.
config :dpul_collections, :figgy_hydrator, poll_interval: 50
40 changes: 36 additions & 4 deletions lib/dpul_collections/indexing_pipeline/figgy/hydration_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
state = %{
last_queried_marker: last_queried_marker,
pulled_records: pulled_records,
acked_records: acked_records
acked_records: acked_records,
stored_demand: stored_demand
}
)
when demand > 0 do
records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, demand)
) do
total_demand = stored_demand + demand

records = IndexingPipeline.get_figgy_resources_since!(last_queried_marker, total_demand)

new_state =
state
Expand All @@ -57,10 +59,40 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducer do
Enum.concat(pulled_records, Enum.map(records, &Figgy.ResourceMarker.from/1))
)
|> Map.put(:acked_records, acked_records)
|> Map.put(:stored_demand, calculate_stored_demand(total_demand, length(records)))

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
Process.send_after(
self(),
:check_for_updates,
Application.get_env(:dpul_collections, :figgy_hydrator)[:poll_interval]
)
end

{:noreply, Enum.map(records, &wrap_record/1), new_state}
end

defp calculate_stored_demand(total_demand, fulfilled_demand)
when total_demand == fulfilled_demand do
0
end

defp calculate_stored_demand(total_demand, fulfilled_demand)
when total_demand > fulfilled_demand do
total_demand - fulfilled_demand
end

def handle_info(:check_for_updates, state = %{stored_demand: demand})
when demand > 0 do
new_demand = 0
handle_demand(new_demand, state)
end

def handle_info(:check_for_updates, state) do
{:noreply, [], state}
end

@impl GenStage
def handle_info({:ack, :figgy_producer_ack, pending_markers}, state) do
messages = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do
pulled_records: [],
acked_records: [],
cache_version: 0,
stored_demand: 0
stored_demand: 1
}

assert new_state == expected_state
Expand Down Expand Up @@ -405,5 +405,10 @@ defmodule DpulCollections.IndexingPipeline.Figgy.HydrationProducerTest do

assert processor_marker == marker2
end

test ".handle_info(:check_for_updates) with no stored demand" do
assert Figgy.HydrationProducer.handle_info(:check_for_updates, %{stored_demand: 0}) ==
{:noreply, [], %{stored_demand: 0}}
end
end
end

0 comments on commit fb8031e

Please sign in to comment.