diff --git a/.changeset/khaki-meals-accept.md b/.changeset/khaki-meals-accept.md new file mode 100644 index 0000000000..454dedb625 --- /dev/null +++ b/.changeset/khaki-meals-accept.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Telemetry for reporting replication lag. diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 54686b2015..44bf27b09a 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -49,7 +49,6 @@ defmodule Electric.Application do children = Enum.concat([ [ - Electric.Telemetry, {Registry, name: Registry.StackEvents, keys: :duplicate}, {Electric.StackSupervisor, stack_id: stack_id, @@ -64,6 +63,7 @@ defmodule Electric.Application do pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)], storage: Application.fetch_env!(:electric, :storage), chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)}, + {Electric.Telemetry, stack_id: stack_id}, {Bandit, plug: {Electric.Plug.Router, router_opts}, port: Application.fetch_env!(:electric, :service_port), diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index 4a4b57b3aa..760c2456cd 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -63,7 +63,8 @@ defmodule Electric.Connection.Manager do :stack_events_registry, :tweaks, awaiting_active: [], - drop_slot_requested: false + drop_slot_requested: false, + monitoring_started?: false ] end @@ -145,6 +146,10 @@ defmodule Electric.Connection.Manager do GenServer.cast(server, {:pg_info_looked_up, pg_info}) end + def query_replication_lag(server) do + GenServer.call(server, :query_replication_lag) + end + @impl true def init(opts) do # Because child processes are started via `start_link()` functions and due to how Postgrex @@ -226,6 +231,11 @@ defmodule Electric.Connection.Manager do {:reply, :ok, %{state | drop_slot_requested: true}} end + def handle_call(:query_replication_lag, _from, state) do + report_replication_lag(state) + {:reply, :ok, state} + end + @impl true def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do case Electric.Postgres.LockConnection.start_link( @@ -311,7 +321,12 @@ defmodule Electric.Connection.Manager do log_collector_pid = lookup_log_collector_pid(shapes_sup_pid) Process.monitor(log_collector_pid) - state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid} + state = %{ + state + | pool_pid: pool_pid, + shape_log_collector_pid: log_collector_pid, + monitoring_started?: true + } for awaiting <- state.awaiting_active do GenServer.reply(awaiting, :ok) @@ -634,4 +649,23 @@ defmodule Electric.Connection.Manager do Logger.error("Failed to execute query: #{query}\nError: #{inspect(error)}") end end + + defp report_replication_lag(%{monitoring_started?: false}), do: :ok + + defp report_replication_lag(%{pool_pid: pool} = state) do + slot_name = Keyword.fetch!(state.replication_opts, :slot_name) + + query = + "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS replication_lag_size FROM pg_replication_slots WHERE slot_name = $1;" + + case Postgrex.query(pool, query, [slot_name]) do + {:ok, %Postgrex.Result{rows: [[lag]]}} -> + :telemetry.execute([:electric, :postgres, :replication], %{lag: lag}) + + {:error, error} -> + Logger.warning("Failed to query replication lag\nError: #{inspect(error)}") + end + + :ok + end end diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 4158026778..0b4796b0f2 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -304,6 +304,8 @@ defmodule Electric.Shapes.Consumer do notify_listeners(registry, :new_changes, shape_handle, last_log_offset) + report_replication_lag(txn) + {:cont, notify(txn, %{state | log_state: new_log_state})} true -> @@ -423,4 +425,21 @@ defmodule Electric.Shapes.Consumer do "shape.where": shape.where ] end + + defp report_replication_lag(%Transaction{commit_timestamp: commit_timestamp}) do + # Compute time elapsed since commit + # since we are comparing PG's clock with our own + # there may be a slight skew so we make sure not to report negative lag. + # Since the lag is only useful when it becomes significant, a slight skew doesn't matter. + now = DateTime.utc_now() + lag = Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond)) + + OpenTelemetry.with_span( + "shape_write.consumer.do_handle_txn.report_replication_lag", + [lag: lag], + fn -> + lag + end + ) + end end diff --git a/packages/sync-service/lib/electric/telemetry.ex b/packages/sync-service/lib/electric/telemetry.ex index 36068fbb62..e7fa7a6e73 100644 --- a/packages/sync-service/lib/electric/telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry.ex @@ -6,9 +6,9 @@ defmodule Electric.Telemetry do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) end - def init(_) do + def init(stack_id: stack_id) do children = [ - {:telemetry_poller, measurements: periodic_measurements(), period: 2_000} + {:telemetry_poller, measurements: periodic_measurements(stack_id), period: 2_000} ] children @@ -72,7 +72,8 @@ defmodule Electric.Telemetry do last_value("vm.memory.ets", unit: :byte), last_value("vm.total_run_queue_lengths.total"), last_value("vm.total_run_queue_lengths.cpu"), - last_value("vm.total_run_queue_lengths.io") + last_value("vm.total_run_queue_lengths.io"), + last_value("electric.postgres.replication.lag", unit: :byte) # distribution("plug.router_dispatch.stop.duration", # tags: [:route], # unit: {:native, :millisecond} @@ -88,10 +89,12 @@ defmodule Electric.Telemetry do ] end - defp periodic_measurements do + defp periodic_measurements(stack_id) do [ # A module, function and arguments to be invoked periodically. - {__MODULE__, :uptime_event, []} + {__MODULE__, :uptime_event, []}, + {Electric.Connection.Manager, :query_replication_lag, + [Electric.Connection.Manager.name(stack_id)]} ] end diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 67e11e57be..1dace24830 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -857,7 +857,8 @@ defmodule Electric.ShapeCacheTest do xid: @xid, last_log_offset: @change_offset, lsn: @lsn, - affected_relations: MapSet.new([{"public", "items"}]) + affected_relations: MapSet.new([{"public", "items"}]), + commit_timestamp: DateTime.utc_now() }, context.shape_log_collector ) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 030209e87b..728566f39b 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -181,7 +181,12 @@ defmodule Electric.Shapes.ConsumerTest do Registry.register(ctx.registry, @shape_handle1, ref) txn = - %Transaction{xid: xmin, lsn: lsn, last_log_offset: last_log_offset} + %Transaction{ + xid: xmin, + lsn: lsn, + last_log_offset: last_log_offset, + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, record: %{"id" => "1"}, @@ -222,7 +227,12 @@ defmodule Electric.Shapes.ConsumerTest do Registry.register(ctx.registry, @shape_handle2, ref2) txn = - %Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset} + %Transaction{ + xid: xid, + lsn: lsn, + last_log_offset: last_log_offset, + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, record: %{"id" => "1"}, @@ -277,7 +287,12 @@ defmodule Electric.Shapes.ConsumerTest do ) txn = - %Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset} + %Transaction{ + xid: xid, + lsn: lsn, + last_log_offset: last_log_offset, + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, record: %{"id" => "1"}, @@ -359,7 +374,12 @@ defmodule Electric.Shapes.ConsumerTest do ) txn = - %Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset} + %Transaction{ + xid: xid, + lsn: lsn, + last_log_offset: last_log_offset, + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.TruncatedRelation{ relation: {"public", "test_table"} }) @@ -509,7 +529,12 @@ defmodule Electric.Shapes.ConsumerTest do lsn = Lsn.from_string("0/10") txn = - %Transaction{xid: 150, lsn: lsn, last_log_offset: LogOffset.new(lsn, 0)} + %Transaction{ + xid: 150, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 0), + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, record: %{"id" => "1"}, @@ -632,7 +657,12 @@ defmodule Electric.Shapes.ConsumerTest do ref = Shapes.Consumer.monitor(ctx.stack_id, shape_handle) txn = - %Transaction{xid: 11, lsn: lsn, last_log_offset: LogOffset.new(lsn, 2)} + %Transaction{ + xid: 11, + lsn: lsn, + last_log_offset: LogOffset.new(lsn, 2), + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, record: %{"id" => "2"}, @@ -679,7 +709,12 @@ defmodule Electric.Shapes.ConsumerTest do ref = Shapes.Consumer.monitor(ctx.stack_id, shape_handle) txn1 = - %Transaction{xid: 9, lsn: lsn1, last_log_offset: LogOffset.new(lsn1, 2)} + %Transaction{ + xid: 9, + lsn: lsn1, + last_log_offset: LogOffset.new(lsn1, 2), + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, record: %{"id" => "2"}, @@ -692,7 +727,12 @@ defmodule Electric.Shapes.ConsumerTest do }) txn2 = - %Transaction{xid: 10, lsn: lsn2, last_log_offset: LogOffset.new(lsn2, 2)} + %Transaction{ + xid: 10, + lsn: lsn2, + last_log_offset: LogOffset.new(lsn2, 2), + commit_timestamp: DateTime.utc_now() + } |> Transaction.prepend_change(%Changes.NewRecord{ relation: {"public", "test_table"}, record: %{"id" => "2"},