Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore (sync service): report replication lag #2043

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/khaki-meals-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Telemetry for reporting replication lag.
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ defmodule Electric.Application do
children =
Enum.concat([
[
Electric.Telemetry,
{Registry, name: Registry.StackEvents, keys: :duplicate},
{Electric.StackSupervisor,
stack_id: stack_id,
Expand All @@ -64,6 +63,7 @@ defmodule Electric.Application do
pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)],
storage: Application.fetch_env!(:electric, :storage),
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)},
{Electric.Telemetry, stack_id: stack_id},
{Bandit,
plug: {Electric.Plug.Router, router_opts},
port: Application.fetch_env!(:electric, :service_port),
Expand Down
38 changes: 36 additions & 2 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ defmodule Electric.Connection.Manager do
:stack_events_registry,
:tweaks,
awaiting_active: [],
drop_slot_requested: false
drop_slot_requested: false,
monitoring_started?: false
]
end

Expand Down Expand Up @@ -145,6 +146,10 @@ defmodule Electric.Connection.Manager do
GenServer.cast(server, {:pg_info_looked_up, pg_info})
end

def query_replication_lag(server) do
GenServer.call(server, :query_replication_lag)
end

@impl true
def init(opts) do
# Because child processes are started via `start_link()` functions and due to how Postgrex
Expand Down Expand Up @@ -226,6 +231,11 @@ defmodule Electric.Connection.Manager do
{:reply, :ok, %{state | drop_slot_requested: true}}
end

def handle_call(:query_replication_lag, _from, state) do
report_replication_lag(state)
{:reply, :ok, state}
end

@impl true
def handle_continue(:start_lock_connection, %State{lock_connection_pid: nil} = state) do
case Electric.Postgres.LockConnection.start_link(
Expand Down Expand Up @@ -311,7 +321,12 @@ defmodule Electric.Connection.Manager do
log_collector_pid = lookup_log_collector_pid(shapes_sup_pid)
Process.monitor(log_collector_pid)

state = %{state | pool_pid: pool_pid, shape_log_collector_pid: log_collector_pid}
state = %{
state
| pool_pid: pool_pid,
shape_log_collector_pid: log_collector_pid,
monitoring_started?: true
}

for awaiting <- state.awaiting_active do
GenServer.reply(awaiting, :ok)
Expand Down Expand Up @@ -634,4 +649,23 @@ defmodule Electric.Connection.Manager do
Logger.error("Failed to execute query: #{query}\nError: #{inspect(error)}")
end
end

defp report_replication_lag(%{monitoring_started?: false}), do: :ok

defp report_replication_lag(%{pool_pid: pool} = state) do
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)

query =
"SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS replication_lag_size FROM pg_replication_slots WHERE slot_name = $1;"

case Postgrex.query(pool, query, [slot_name]) do
{:ok, %Postgrex.Result{rows: [[lag]]}} ->
:telemetry.execute([:electric, :postgres, :replication], %{lag: lag})

{:error, error} ->
Logger.warning("Failed to query replication lag\nError: #{inspect(error)}")
end

:ok
end
end
19 changes: 19 additions & 0 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ defmodule Electric.Shapes.Consumer do

notify_listeners(registry, :new_changes, shape_handle, last_log_offset)

report_replication_lag(txn)

{:cont, notify(txn, %{state | log_state: new_log_state})}

true ->
Expand Down Expand Up @@ -423,4 +425,21 @@ defmodule Electric.Shapes.Consumer do
"shape.where": shape.where
]
end

defp report_replication_lag(%Transaction{commit_timestamp: commit_timestamp}) do
# Compute time elapsed since commit
# since we are comparing PG's clock with our own
# there may be a slight skew so we make sure not to report negative lag.
# Since the lag is only useful when it becomes significant, a slight skew doesn't matter.
now = DateTime.utc_now()
lag = Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond))

OpenTelemetry.with_span(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this just be an attribute on the handle transaction span?

"shape_write.consumer.do_handle_txn.report_replication_lag",
[lag: lag],
fn ->
lag
end
)
end
end
13 changes: 8 additions & 5 deletions packages/sync-service/lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ defmodule Electric.Telemetry do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def init(_) do
def init(stack_id: stack_id) do
children = [
{:telemetry_poller, measurements: periodic_measurements(), period: 2_000}
{:telemetry_poller, measurements: periodic_measurements(stack_id), period: 2_000}
]

children
Expand Down Expand Up @@ -72,7 +72,8 @@ defmodule Electric.Telemetry do
last_value("vm.memory.ets", unit: :byte),
last_value("vm.total_run_queue_lengths.total"),
last_value("vm.total_run_queue_lengths.cpu"),
last_value("vm.total_run_queue_lengths.io")
last_value("vm.total_run_queue_lengths.io"),
last_value("electric.postgres.replication.lag", unit: :byte)
# distribution("plug.router_dispatch.stop.duration",
# tags: [:route],
# unit: {:native, :millisecond}
Expand All @@ -88,10 +89,12 @@ defmodule Electric.Telemetry do
]
end

defp periodic_measurements do
defp periodic_measurements(stack_id) do
[
# A module, function and arguments to be invoked periodically.
{__MODULE__, :uptime_event, []}
{__MODULE__, :uptime_event, []},
{Electric.Connection.Manager, :query_replication_lag,
[Electric.Connection.Manager.name(stack_id)]}
]
end

Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/test/electric/shape_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,8 @@ defmodule Electric.ShapeCacheTest do
xid: @xid,
last_log_offset: @change_offset,
lsn: @lsn,
affected_relations: MapSet.new([{"public", "items"}])
affected_relations: MapSet.new([{"public", "items"}]),
commit_timestamp: DateTime.utc_now()
},
context.shape_log_collector
)
Expand Down
56 changes: 48 additions & 8 deletions packages/sync-service/test/electric/shapes/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ defmodule Electric.Shapes.ConsumerTest do
Registry.register(ctx.registry, @shape_handle1, ref)

txn =
%Transaction{xid: xmin, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xmin,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
Expand Down Expand Up @@ -222,7 +227,12 @@ defmodule Electric.Shapes.ConsumerTest do
Registry.register(ctx.registry, @shape_handle2, ref2)

txn =
%Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xid,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
Expand Down Expand Up @@ -277,7 +287,12 @@ defmodule Electric.Shapes.ConsumerTest do
)

txn =
%Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xid,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
Expand Down Expand Up @@ -359,7 +374,12 @@ defmodule Electric.Shapes.ConsumerTest do
)

txn =
%Transaction{xid: xid, lsn: lsn, last_log_offset: last_log_offset}
%Transaction{
xid: xid,
lsn: lsn,
last_log_offset: last_log_offset,
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.TruncatedRelation{
relation: {"public", "test_table"}
})
Expand Down Expand Up @@ -509,7 +529,12 @@ defmodule Electric.Shapes.ConsumerTest do
lsn = Lsn.from_string("0/10")

txn =
%Transaction{xid: 150, lsn: lsn, last_log_offset: LogOffset.new(lsn, 0)}
%Transaction{
xid: 150,
lsn: lsn,
last_log_offset: LogOffset.new(lsn, 0),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "1"},
Expand Down Expand Up @@ -632,7 +657,12 @@ defmodule Electric.Shapes.ConsumerTest do
ref = Shapes.Consumer.monitor(ctx.stack_id, shape_handle)

txn =
%Transaction{xid: 11, lsn: lsn, last_log_offset: LogOffset.new(lsn, 2)}
%Transaction{
xid: 11,
lsn: lsn,
last_log_offset: LogOffset.new(lsn, 2),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "2"},
Expand Down Expand Up @@ -679,7 +709,12 @@ defmodule Electric.Shapes.ConsumerTest do
ref = Shapes.Consumer.monitor(ctx.stack_id, shape_handle)

txn1 =
%Transaction{xid: 9, lsn: lsn1, last_log_offset: LogOffset.new(lsn1, 2)}
%Transaction{
xid: 9,
lsn: lsn1,
last_log_offset: LogOffset.new(lsn1, 2),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "2"},
Expand All @@ -692,7 +727,12 @@ defmodule Electric.Shapes.ConsumerTest do
})

txn2 =
%Transaction{xid: 10, lsn: lsn2, last_log_offset: LogOffset.new(lsn2, 2)}
%Transaction{
xid: 10,
lsn: lsn2,
last_log_offset: LogOffset.new(lsn2, 2),
commit_timestamp: DateTime.utc_now()
}
|> Transaction.prepend_change(%Changes.NewRecord{
relation: {"public", "test_table"},
record: %{"id" => "2"},
Expand Down