Merge pull request #153 from pulibrary/index_metrics_tracker_poc
Measure Hydration/Transformation/Indexing Times
hackartisan authored Dec 4, 2024
2 parents 4c07022 + 5932639 commit 4890507
Showing 11 changed files with 535 additions and 7 deletions.
3 changes: 2 additions & 1 deletion lib/dpul_collections/application.ex
@@ -20,7 +20,8 @@ defmodule DpulCollections.Application do
# Start a worker by calling: DpulCollections.Worker.start_link(arg)
# {DpulCollections.Worker, arg},
# Start to serve requests, typically the last entry
DpulCollectionsWeb.Endpoint
DpulCollectionsWeb.Endpoint,
DpulCollections.IndexMetricsTracker
] ++ environment_children(Application.fetch_env!(:dpul_collections, :current_env))

# See https://hexdocs.pm/elixir/Supervisor.html
171 changes: 171 additions & 0 deletions lib/dpul_collections/index_metrics_tracker.ex
@@ -0,0 +1,171 @@
defmodule DpulCollections.IndexMetricsTracker do
use GenServer
alias DpulCollections.IndexingPipeline.Metrics
alias DpulCollections.IndexingPipeline.DatabaseProducer

@type processor_state :: %{
start_time: integer(),
end_time: integer(),
polling_started: boolean(),
acked_count: integer()
}
@type state :: %{(processor_key :: String.t()) => processor_state()}

def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end

@impl true
def init(_) do
:ok =
:telemetry.attach(
"metrics-ack-tracker",
[:database_producer, :ack, :done],
&ack_telemetry_callback/4,
nil
)

{:ok, %{}}
end

@spec register_fresh_start(source :: module()) :: term()
def register_fresh_start(source) do
GenServer.call(__MODULE__, {:fresh_index, source})
end

@spec register_polling_started(source :: module()) :: term()
def register_polling_started(source) do
GenServer.call(__MODULE__, {:poll_started, source})
end

@spec processor_durations(source :: module()) :: term()
def processor_durations(source) do
Metrics.index_metrics(source.processor_marker_key(), "full_index")
end

def reset() do
GenServer.call(__MODULE__, {:reset})
end

@impl true
@spec handle_call(term(), term(), state()) :: term()
def handle_call({:reset}, _, _state) do
{:reply, nil, %{}}
end

@impl true
@spec handle_call(term(), term(), state()) :: term()
def handle_call({:fresh_index, source}, _, state) do
new_state =
put_in(state, [source.processor_marker_key()], %{
start_time: :erlang.monotonic_time(),
acked_count: 0
})

{:reply, nil, new_state}
end

@spec handle_call(term(), term(), state()) :: term()
def handle_call({:poll_started, source}, _, state) do
# Record that polling has started if we've recorded a start time but not an
# end time for a source. Then the next time the source finishes acknowledgements
# we'll record an end time.
if get_in(state, [source.processor_marker_key(), :start_time]) != nil &&
get_in(state, [source.processor_marker_key(), :end_time]) == nil do
state = put_in(state, [source.processor_marker_key(), :polling_started], true)

{:reply, nil, state}
else
{:reply, nil, state}
end
end

@spec handle_call(term(), term(), state()) :: term()
def handle_call(
{:ack_received, metadata = %{processor_marker_key: processor_marker_key}},
_,
state
) do
state =
state
|> put_in(
[processor_marker_key],
handle_ack_received(metadata, Map.get(state, processor_marker_key))
)

{:reply, nil, state}
end

# If there's no stored info yet, do nothing.
@spec handle_ack_received(DatabaseProducer.ack_event_metadata(), processor_state()) ::
processor_state()
defp handle_ack_received(_event, nil), do: nil
# If there's a start and end time, do nothing
defp handle_ack_received(
_event,
processor_state = %{start_time: _start_time, end_time: _end_time}
),
do: processor_state

# If there's a start time, polling has started (the trigger for recording an end time), and the unacked_count is 0, record the end time and create the IndexMetric.
defp handle_ack_received(
%{
processor_marker_key: processor_marker_key,
acked_count: new_acked_count,
unacked_count: 0
},
processor_state = %{
start_time: _start_time,
polling_started: true,
acked_count: old_acked_count
}
) do
processor_state =
processor_state
|> put_in([:end_time], :erlang.monotonic_time())
|> Map.delete(:polling_started)
|> put_in([:acked_count], old_acked_count + new_acked_count)

duration = processor_state[:end_time] - processor_state[:start_time]

:telemetry.execute(
[:dpulc, :indexing_pipeline, event(processor_marker_key), :time_to_poll],
%{duration: duration},
%{source: processor_marker_key}
)

Metrics.create_index_metric(%{
type: processor_marker_key,
measurement_type: "full_index",
duration: System.convert_time_unit(duration, :native, :second),
records_acked: processor_state[:acked_count]
})

processor_state
end

# If there's a start time, record the acked_count
defp handle_ack_received(
%{acked_count: new_acked_count},
processor_state = %{start_time: _start_time, acked_count: old_acked_count}
) do
processor_state
|> put_in([:acked_count], old_acked_count + new_acked_count)
end

defp ack_telemetry_callback([:database_producer, :ack, :done], _measurements, metadata, _config) do
GenServer.call(__MODULE__, {:ack_received, metadata})
end

def event("figgy_hydrator") do
:hydrator
end

def event("figgy_transformer") do
:transformer
end

def event("figgy_indexer") do
:indexer
end
end
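
For orientation, a minimal usage sketch of the tracker's public API. The source module below is hypothetical; like the real pipeline sources (not shown in this diff), it only needs to expose processor_marker_key/0.

# Hypothetical source module for illustration only.
defmodule MyApp.ExampleHydrationSource do
  def processor_marker_key, do: "figgy_hydrator"
end

# During a full reindex the producer registers a fresh start when it reads from a
# nil marker, then registers that polling has started once it can't fulfill all
# stored demand. When the final ack arrives with unacked_count: 0, the tracker
# writes an IndexMetric row and emits a :time_to_poll telemetry event.
DpulCollections.IndexMetricsTracker.register_fresh_start(MyApp.ExampleHydrationSource)
DpulCollections.IndexMetricsTracker.register_polling_started(MyApp.ExampleHydrationSource)

# Stored full-index durations for that source, newest first:
DpulCollections.IndexMetricsTracker.processor_durations(MyApp.ExampleHydrationSource)

# Dashboards or tests can also subscribe to the duration event directly
# (handler id here is illustrative):
:telemetry.attach(
  "example-time-to-poll-handler",
  [:dpulc, :indexing_pipeline, :hydrator, :time_to_poll],
  fn _event, %{duration: duration}, %{source: source}, _config ->
    IO.inspect({source, System.convert_time_unit(duration, :native, :second)})
  end,
  nil
)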
27 changes: 23 additions & 4 deletions lib/dpul_collections/indexing_pipeline/database_producer.ex
@@ -64,6 +64,10 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do
records =
source_module.get_cache_entries_since!(last_queried_marker, total_demand, cache_version)

if last_queried_marker == nil && length(records) > 0 do
DpulCollections.IndexMetricsTracker.register_fresh_start(source_module)
end

new_state =
state
|> Map.put(
@@ -82,6 +86,7 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do

# Set a timer to try fulfilling demand again later
if new_state.stored_demand > 0 do
DpulCollections.IndexMetricsTracker.register_polling_started(source_module)
Process.send_after(self(), :check_for_updates, 50)
end

@@ -136,7 +141,12 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do
})
end

notify_ack(pending_markers |> length())
notify_ack(
pending_markers |> length(),
new_state.pulled_records |> length(),
state.source_module.processor_marker_key()
)

{:noreply, messages, new_state}
end

@@ -214,12 +224,21 @@ defmodule DpulCollections.IndexingPipeline.DatabaseProducer do

# This fires when ack is finished; we listen for this telemetry event in
# tests so we know when the Producer is done processing a message.
@spec notify_ack(integer()) :: any()
defp notify_ack(acked_message_count) do
@spec notify_ack(integer(), integer(), String.t()) :: any()
@type ack_event_metadata :: %{
acked_count: integer(),
unacked_count: integer(),
processor_marker_key: String.t()
}
defp notify_ack(acked_message_count, unacked_count, processor_marker_key) do
:telemetry.execute(
[:database_producer, :ack, :done],
%{},
%{acked_count: acked_message_count}
%{
acked_count: acked_message_count,
unacked_count: unacked_count,
processor_marker_key: processor_marker_key
}
)
end
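
For reference, a sketch of how a test might wait on this event. The handler id and assertion are illustrative, not taken from the PR's test files; only the event name and metadata shape come from notify_ack/3 above.

# Forward the ack-done event to the test process so the test can block on it.
test_pid = self()

:telemetry.attach(
  "test-ack-done-handler",
  [:database_producer, :ack, :done],
  fn _event, _measurements, metadata, _config ->
    send(test_pid, {:ack_done, metadata})
  end,
  nil
)

# The metadata carries enough context to assert per-processor progress.
assert_receive {:ack_done,
                %{acked_count: _, unacked_count: _, processor_marker_key: "figgy_hydrator"}}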

21 changes: 21 additions & 0 deletions lib/dpul_collections/indexing_pipeline/index_metric.ex
@@ -0,0 +1,21 @@
defmodule DpulCollections.IndexingPipeline.IndexMetric do
use Ecto.Schema
import Ecto.Changeset

schema "index_metrics" do
field :type, :string
field :measurement_type, :string
# Duration in seconds
field :duration, :integer
field :records_acked, :integer, default: 0

timestamps(type: :utc_datetime_usec)
end

@doc false
def changeset(index_metric, attrs) do
index_metric
|> cast(attrs, [:type, :measurement_type, :duration, :records_acked])
|> validate_required([:type, :measurement_type, :duration, :records_acked])
end
end
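
The migration backing the index_metrics table isn't among the files shown above. A sketch of what it plausibly looks like, inferred from the schema fields here rather than copied from the PR (the module name is a guess):

defmodule DpulCollections.Repo.Migrations.CreateIndexMetrics do
  use Ecto.Migration

  def change do
    create table(:index_metrics) do
      add :type, :string
      add :measurement_type, :string
      # Duration in seconds, converted from native time units before insert.
      add :duration, :integer
      add :records_acked, :integer, default: 0

      timestamps(type: :utc_datetime_usec)
    end
  end
end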
29 changes: 29 additions & 0 deletions lib/dpul_collections/indexing_pipeline/metrics.ex
@@ -0,0 +1,29 @@
defmodule DpulCollections.IndexingPipeline.Metrics do
import Ecto.Query, warn: false
alias DpulCollections.Repo
alias DpulCollections.IndexingPipeline.IndexMetric

@doc """
Creates an IndexMetric
"""
def create_index_metric(attrs \\ %{}) do
{:ok, index_metric} =
%IndexMetric{}
|> IndexMetric.changeset(attrs)
|> Repo.insert()

index_metric
end

@doc """
Get index metrics by type
"""
def index_metrics(type, measurement_type) do
query =
from r in IndexMetric,
where: r.type == ^type and r.measurement_type == ^measurement_type,
order_by: [desc: r.inserted_at]

Repo.all(query)
end
end
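
A small usage sketch of this context; the attribute values are illustrative.

alias DpulCollections.IndexingPipeline.Metrics

# Record a completed full-index run for the hydrator (values are made up).
Metrics.create_index_metric(%{
  type: "figgy_hydrator",
  measurement_type: "full_index",
  duration: 120,
  records_acked: 5_000
})

# Fetch all full-index runs for the hydrator, newest first.
Metrics.index_metrics("figgy_hydrator", "full_index")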
(The remaining changed files from this commit are not shown here.)
