diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 54686b2015..44bf27b09a 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -49,7 +49,6 @@ defmodule Electric.Application do children = Enum.concat([ [ - Electric.Telemetry, {Registry, name: Registry.StackEvents, keys: :duplicate}, {Electric.StackSupervisor, stack_id: stack_id, @@ -64,6 +63,7 @@ defmodule Electric.Application do pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)], storage: Application.fetch_env!(:electric, :storage), chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)}, + {Electric.Telemetry, stack_id: stack_id}, {Bandit, plug: {Electric.Plug.Router, router_opts}, port: Application.fetch_env!(:electric, :service_port), diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 4a4b57b3aa..0f406ae7f3 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -63,7 +63,8 @@ defmodule Electric.Connection.Manager do :stack_events_registry, :tweaks, awaiting_active: [], - drop_slot_requested: false + drop_slot_requested: false, + monitoring_started?: false ] end @@ -145,6 +146,10 @@ defmodule Electric.Connection.Manager do GenServer.cast(server, {:pg_info_looked_up, pg_info}) end + def query_replication_lag(server) do + GenServer.call(server, :query_replication_lag) + end + @impl true def init(opts) do # Because child processes are started via `start_link()` functions and due to how Postgrex @@ -226,6 +231,11 @@ defmodule Electric.Connection.Manager do {:reply, :ok, %{state | drop_slot_requested: true}} end + def handle_call(:query_replication_lag, _from, state) do + report_replication_lag(state) + {:reply, :ok, state} + end + @impl true def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do case Electric.Postgres.LockConnection.start_link( @@ -311,7 +321,12 @@ defmodule Electric.Connection.Manager do log_collector_pid = lookup_log_collector_pid(shapes_sup_pid) Process.monitor(log_collector_pid) - state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid} + state = %{ + state + | pool_pid: pool_pid, + shape_log_collector_pid: log_collector_pid, + monitoring_started?: true + } for awaiting <- state.awaiting_active do GenServer.reply(awaiting, :ok) @@ -634,4 +649,23 @@ defmodule Electric.Connection.Manager do Logger.error("Failed to execute query: #{query}\nError: #{inspect(error)}") end end + + defp report_replication_lag(%{monitoring_started?: false} = state), do: :ok + + defp report_replication_lag(%{pool_pid: pool, stack_id: stack_id} = state) do + slot_name = Keyword.fetch!(state.replication_opts, :slot_name) + + query = + "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS replication_lag_size FROM pg_replication_slots WHERE slot_name = $1;" + + case Postgrex.query(pool, query, [slot_name]) do + {:ok, %Postgrex.Result{rows: [[lag]]}} -> + :telemetry.execute([:electric, :postgres, :replication], %{lag: lag}) + + {:error, error} -> + Logger.warning("Failed to query replication lag\nError: #{inspect(error)}") + end + + :ok + end end diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index fcf55a5258..c603ec7477 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -4,6 +4,7 @@ defmodule Electric.Postgres.ReplicationClient do """ use Postgrex.ReplicationConnection + alias Electric.Utils alias Electric.Postgres.LogicalReplication.Decoder alias Electric.Postgres.Lsn alias Electric.Postgres.ReplicationClient.Collector diff --git a/packages/sync-service/lib/electric/telemetry.ex b/packages/sync-service/lib/electric/telemetry.ex index 36068fbb62..e7fa7a6e73 100644 --- a/packages/sync-service/lib/electric/telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry.ex @@ -6,9 +6,9 @@ defmodule Electric.Telemetry do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) end - def init(_) do + def init(stack_id: stack_id) do children = [ - {:telemetry_poller, measurements: periodic_measurements(), period: 2_000} + {:telemetry_poller, measurements: periodic_measurements(stack_id), period: 2_000} ] children @@ -72,7 +72,8 @@ defmodule Electric.Telemetry do last_value("vm.memory.ets", unit: :byte), last_value("vm.total_run_queue_lengths.total"), last_value("vm.total_run_queue_lengths.cpu"), - last_value("vm.total_run_queue_lengths.io") + last_value("vm.total_run_queue_lengths.io"), + last_value("electric.postgres.replication.lag", unit: :byte) # distribution("plug.router_dispatch.stop.duration", # tags: [:route], # unit: {:native, :millisecond} @@ -88,10 +89,12 @@ defmodule Electric.Telemetry do ] end - defp periodic_measurements do + defp periodic_measurements(stack_id) do [ # A module, function and arguments to be invoked periodically. - {__MODULE__, :uptime_event, []} + {__MODULE__, :uptime_event, []}, + {Electric.Connection.Manager, :query_replication_lag, + [Electric.Connection.Manager.name(stack_id)]} ] end