Skip to content

Commit

Permalink
Report replication lag in bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-dp committed Nov 26, 2024
1 parent 2597867 commit e1fc9ac
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ defmodule Electric.Application do
children =
Enum.concat([
[
Electric.Telemetry,
{Registry, name: Registry.StackEvents, keys: :duplicate},
{Electric.StackSupervisor,
stack_id: stack_id,
Expand All @@ -64,6 +63,7 @@ defmodule Electric.Application do
pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)],
storage: Application.fetch_env!(:electric, :storage),
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)},
{Electric.Telemetry, stack_id: stack_id},
{Bandit,
plug: {Electric.Plug.Router, router_opts},
port: Application.fetch_env!(:electric, :service_port),
Expand Down
38 changes: 36 additions & 2 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ defmodule Electric.Connection.Manager do
:stack_events_registry,
:tweaks,
awaiting_active: [],
drop_slot_requested: false
drop_slot_requested: false,
monitoring_started?: false
]
end

Expand Down Expand Up @@ -145,6 +146,10 @@ defmodule Electric.Connection.Manager do
GenServer.cast(server, {:pg_info_looked_up, pg_info})
end

def query_replication_lag(server) do
GenServer.call(server, :query_replication_lag)
end

@impl true
def init(opts) do
# Because child processes are started via `start_link()` functions and due to how Postgrex
Expand Down Expand Up @@ -226,6 +231,11 @@ defmodule Electric.Connection.Manager do
{:reply, :ok, %{state | drop_slot_requested: true}}
end

def handle_call(:query_replication_lag, _from, state) do
report_replication_lag(state)
{:reply, :ok, state}
end

@impl true
def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do
case Electric.Postgres.LockConnection.start_link(
Expand Down Expand Up @@ -311,7 +321,12 @@ defmodule Electric.Connection.Manager do
log_collector_pid = lookup_log_collector_pid(shapes_sup_pid)
Process.monitor(log_collector_pid)

state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid}
state = %{
state
| pool_pid: pool_pid,
shape_log_collector_pid: log_collector_pid,
monitoring_started?: true
}

for awaiting <- state.awaiting_active do
GenServer.reply(awaiting, :ok)
Expand Down Expand Up @@ -634,4 +649,23 @@ defmodule Electric.Connection.Manager do
Logger.error("Failed to execute query: #{query}\nError: #{inspect(error)}")
end
end

defp report_replication_lag(%{monitoring_started?: false} = state), do: :ok

defp report_replication_lag(%{pool_pid: pool, stack_id: stack_id} = state) do
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)

query =
"SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS replication_lag_size FROM pg_replication_slots WHERE slot_name = $1;"

case Postgrex.query(pool, query, [slot_name]) do
{:ok, %Postgrex.Result{rows: [[lag]]}} ->
:telemetry.execute([:electric, :postgres, :replication], %{lag: lag})

{:error, error} ->
Logger.warning("Failed to query replication lag\nError: #{inspect(error)}")
end

:ok
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Electric.Postgres.ReplicationClient do
"""
use Postgrex.ReplicationConnection

alias Electric.Utils
alias Electric.Postgres.LogicalReplication.Decoder
alias Electric.Postgres.Lsn
alias Electric.Postgres.ReplicationClient.Collector
Expand Down
13 changes: 8 additions & 5 deletions packages/sync-service/lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ defmodule Electric.Telemetry do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def init(_) do
def init(stack_id: stack_id) do
children = [
{:telemetry_poller, measurements: periodic_measurements(), period: 2_000}
{:telemetry_poller, measurements: periodic_measurements(stack_id), period: 2_000}
]

children
Expand Down Expand Up @@ -72,7 +72,8 @@ defmodule Electric.Telemetry do
last_value("vm.memory.ets", unit: :byte),
last_value("vm.total_run_queue_lengths.total"),
last_value("vm.total_run_queue_lengths.cpu"),
last_value("vm.total_run_queue_lengths.io")
last_value("vm.total_run_queue_lengths.io"),
last_value("electric.postgres.replication.lag", unit: :byte)
# distribution("plug.router_dispatch.stop.duration",
# tags: [:route],
# unit: {:native, :millisecond}
Expand All @@ -88,10 +89,12 @@ defmodule Electric.Telemetry do
]
end

defp periodic_measurements do
defp periodic_measurements(stack_id) do
[
# A module, function and arguments to be invoked periodically.
{__MODULE__, :uptime_event, []}
{__MODULE__, :uptime_event, []},
{Electric.Connection.Manager, :query_replication_lag,
[Electric.Connection.Manager.name(stack_id)]}
]
end

Expand Down

0 comments on commit e1fc9ac

Please sign in to comment.