Skip to content

Commit

Permalink
Rename "replication lag" to "retained WAL size" (#2056)
Browse files Browse the repository at this point in the history
The thing being measured here is the size of the WAL that Electric's
replication slot retains, so I think the new name describes it better.

This change also restructures the code a bit to make the dependency on
the DB connection pool crystal clear.
  • Loading branch information
alco authored Nov 29, 2024
1 parent 704ac91 commit d0b15e3
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
34 changes: 20 additions & 14 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ defmodule Electric.Connection.Manager do
GenServer.cast(server, {:pg_info_looked_up, pg_info})
end

def query_replication_lag(server) do
GenServer.call(server, :query_replication_lag)
def report_retained_wal_size(server) do
GenServer.call(server, :report_retained_wal_size)
end

@impl true
Expand Down Expand Up @@ -231,8 +231,12 @@ defmodule Electric.Connection.Manager do
{:reply, :ok, %{state | drop_slot_requested: true}}
end

def handle_call(:query_replication_lag, _from, state) do
report_replication_lag(state)
def handle_call(:report_retained_wal_size, _from, state) do
if state.monitoring_started? do
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)
query_and_report_retained_wal_size(state.pool_pid, slot_name)
end

{:reply, :ok, state}
end

Expand Down Expand Up @@ -650,20 +654,22 @@ defmodule Electric.Connection.Manager do
end
end

defp report_replication_lag(%{monitoring_started?: false}), do: :ok

defp report_replication_lag(%{pool_pid: pool} = state) do
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)

query =
"SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS replication_lag_size FROM pg_replication_slots WHERE slot_name = $1;"
defp query_and_report_retained_wal_size(pool, slot_name) do
query = """
SELECT
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
FROM
pg_replication_slots
WHERE
slot_name = $1
"""

case Postgrex.query(pool, query, [slot_name]) do
{:ok, %Postgrex.Result{rows: [[lag]]}} ->
:telemetry.execute([:electric, :postgres, :replication], %{lag: lag})
{:ok, %Postgrex.Result{rows: [[wal_size]]}} ->
:telemetry.execute([:electric, :postgres, :replication], %{wal_size: wal_size})

{:error, error} ->
Logger.warning("Failed to query replication lag\nError: #{inspect(error)}")
Logger.warning("Failed to query retained WAL size\nError: #{inspect(error)}")
end

:ok
Expand Down
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ defmodule Electric.Telemetry do
[
# A module, function and arguments to be invoked periodically.
{__MODULE__, :uptime_event, []},
{Electric.Connection.Manager, :query_replication_lag,
{Electric.Connection.Manager, :report_retained_wal_size,
[Electric.Connection.Manager.name(stack_id)]}
]
end
Expand Down

0 comments on commit d0b15e3

Please sign in to comment.