diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 4158026778..0b4796b0f2 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -304,6 +304,8 @@ defmodule Electric.Shapes.Consumer do notify_listeners(registry, :new_changes, shape_handle, last_log_offset) + report_replication_lag(txn) + {:cont, notify(txn, %{state | log_state: new_log_state})} true -> @@ -423,4 +425,21 @@ defmodule Electric.Shapes.Consumer do "shape.where": shape.where ] end + + defp report_replication_lag(%Transaction{commit_timestamp: commit_timestamp}) do + # Compute time elapsed since commit + # since we are comparing PG's clock with our own + # there may be a slight skew so we make sure not to report negative lag. + # Since the lag is only useful when it becomes significant, a slight skew doesn't matter. + now = DateTime.utc_now() + lag = Kernel.max(0, DateTime.diff(now, commit_timestamp, :millisecond)) + + OpenTelemetry.with_span( + "shape_write.consumer.do_handle_txn.report_replication_lag", + [lag: lag], + fn -> + lag + end + ) + end end