Skip to content

Commit

Permalink
chore (sync service): store OTEL attributes per stack in a persistent…
Browse files Browse the repository at this point in the history
… term (#2083)

This PR is an alternative to
#2077.
Instead of injecting the OTEL attributes and passing them around
everywhere, we store each stack's OTEL attributes in a dedicated
persistent term. When creating a span we lookup the OTEL attributes for
that stack and add them to the span.
  • Loading branch information
kevin-dp authored Dec 4, 2024
1 parent 743b00e commit e840b5f
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 74 deletions.
60 changes: 37 additions & 23 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ defmodule Electric.Plug.ServeShapePlug do
end
end

defp load_shape_info(%Conn{} = conn, _) do
OpenTelemetry.with_span("shape_get.plug.load_shape_info", [], fn ->
defp load_shape_info(%Conn{assigns: %{config: config}} = conn, _) do
OpenTelemetry.with_span("shape_get.plug.load_shape_info", [], config[:stack_id], fn ->
shape_info = get_or_create_shape_handle(conn.assigns)
handle_shape_info(conn, shape_info)
end)
Expand Down Expand Up @@ -421,13 +421,20 @@ defmodule Electric.Plug.ServeShapePlug do
)

# If offset is -1, we're serving a snapshot
defp serve_log_or_snapshot(%Conn{assigns: %{offset: @before_all_offset}} = conn, _) do
OpenTelemetry.with_span("shape_get.plug.serve_snapshot", [], fn -> serve_snapshot(conn) end)
defp serve_log_or_snapshot(
%Conn{assigns: %{offset: @before_all_offset, config: config}} = conn,
_
) do
OpenTelemetry.with_span("shape_get.plug.serve_snapshot", [], config[:stack_id], fn ->
serve_snapshot(conn)
end)
end

# Otherwise, serve log since that offset
defp serve_log_or_snapshot(conn, _) do
OpenTelemetry.with_span("shape_get.plug.serve_shape_log", [], fn -> serve_shape_log(conn) end)
defp serve_log_or_snapshot(%Conn{assigns: %{config: config}} = conn, _) do
OpenTelemetry.with_span("shape_get.plug.serve_shape_log", [], config[:stack_id], fn ->
serve_shape_log(conn)
end)
end

defp serve_snapshot(
Expand Down Expand Up @@ -513,29 +520,35 @@ defmodule Electric.Plug.ServeShapePlug do
end

defp send_stream(stream, conn, status) do
stack_id = conn.assigns.config[:stack_id]
conn = send_chunked(conn, status)

{conn, bytes_sent} =
Enum.reduce_while(stream, {conn, 0}, fn chunk, {conn, bytes_sent} ->
chunk_size = IO.iodata_length(chunk)

OpenTelemetry.with_span("shape_get.plug.stream_chunk", [chunk_size: chunk_size], fn ->
case chunk(conn, chunk) do
{:ok, conn} ->
{:cont, {conn, bytes_sent + chunk_size}}

{:error, "closed"} ->
error_str = "Connection closed unexpectedly while streaming response"
conn = assign(conn, :error_str, error_str)
{:halt, {conn, bytes_sent}}

{:error, reason} ->
error_str = "Error while streaming response: #{inspect(reason)}"
Logger.error(error_str)
conn = assign(conn, :error_str, error_str)
{:halt, {conn, bytes_sent}}
OpenTelemetry.with_span(
"shape_get.plug.stream_chunk",
[chunk_size: chunk_size],
stack_id,
fn ->
case chunk(conn, chunk) do
{:ok, conn} ->
{:cont, {conn, bytes_sent + chunk_size}}

{:error, "closed"} ->
error_str = "Connection closed unexpectedly while streaming response"
conn = assign(conn, :error_str, error_str)
{:halt, {conn, bytes_sent}}

{:error, reason} ->
error_str = "Error while streaming response: #{inspect(reason)}"
Logger.error(error_str)
conn = assign(conn, :error_str, error_str)
{:halt, {conn, bytes_sent}}
end
end
end)
)
end)

assign(conn, :streaming_bytes_sent, bytes_sent)
Expand Down Expand Up @@ -599,7 +612,8 @@ defmodule Electric.Plug.ServeShapePlug do

maybe_up_to_date = if up_to_date = assigns[:up_to_date], do: up_to_date != []

Electric.Plug.Utils.common_open_telemetry_attrs(conn)
Electric.Telemetry.OpenTelemetry.get_stack_span_attrs(assigns.config[:stack_id])
|> Map.merge(Electric.Plug.Utils.common_open_telemetry_attrs(conn))
|> Map.merge(%{
"shape.handle" => shape_handle,
"shape.where" => assigns[:where],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ defmodule Electric.Postgres.ReplicationClient do

Postgrex.ReplicationConnection.start_link(
__MODULE__,
config.replication_opts ++ [stack_id: config.stack_id],
config.replication_opts,
start_opts
)
end
Expand Down Expand Up @@ -196,11 +196,12 @@ defmodule Electric.Postgres.ReplicationClient do
{:noreply, State.t()} | {:noreply, list(binary()), State.t()}
def handle_data(
<<@repl_msg_x_log_data, _wal_start::64, wal_end::64, _clock::64, rest::binary>>,
%State{} = state
%State{stack_id: stack_id} = state
) do
OpenTelemetry.with_span(
"pg_txn.replication_client.process_x_log_data",
[msg_size: byte_size(rest)],
stack_id,
fn -> process_x_log_data(rest, wal_end, state) end
)
end
Expand All @@ -220,7 +221,7 @@ defmodule Electric.Postgres.ReplicationClient do
end
end

defp process_x_log_data(data, wal_end, %State{} = state) do
defp process_x_log_data(data, wal_end, %State{stack_id: stack_id} = state) do
OpenTelemetry.timed_fun("decode_message_duration", fn -> decode_message(data) end)
# # Useful for debugging:
# |> tap(fn %struct{} = msg ->
Expand All @@ -243,6 +244,7 @@ defmodule Electric.Postgres.ReplicationClient do
OpenTelemetry.with_span(
"pg_txn.replication_client.relation_received",
["rel.id": rel.id, "rel.schema": rel.schema, "rel.table": rel.table],
stack_id,
fn -> apply(m, f, [rel | args]) end
)

Expand Down Expand Up @@ -279,6 +281,7 @@ defmodule Electric.Postgres.ReplicationClient do
OpenTelemetry.with_span(
"pg_txn.replication_client.transaction_received",
[num_changes: txn.num_changes, num_relations: MapSet.size(txn.affected_relations)],
stack_id,
fn -> apply(m, f, [txn | args]) end
)
|> case do
Expand Down
4 changes: 2 additions & 2 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ defmodule Electric.ShapeCache do
],
prepare_tables_fn: [type: {:or, [:mfa, {:fun, 2}]}, required: true],
create_snapshot_fn: [
type: {:fun, 5},
default: &Shapes.Consumer.Snapshotter.query_in_readonly_txn/5
type: {:fun, 6},
default: &Shapes.Consumer.Snapshotter.query_in_readonly_txn/6
],
purge_all_shapes?: [type: :boolean, required: false]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,11 @@ defmodule Electric.ShapeCache.FileStorage do
defp offset({_, tuple_offset}), do: LogOffset.new(tuple_offset)

@impl Electric.ShapeCache.Storage
def make_new_snapshot!(data_stream, %FS{} = opts) do
def make_new_snapshot!(data_stream, %FS{stack_id: stack_id} = opts) do
OpenTelemetry.with_span(
"storage.make_new_snapshot",
[storage_impl: "mixed_disk", "shape.handle": opts.shape_handle],
stack_id,
fn ->
data_stream
|> Stream.map(&[&1, ?\n])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ defmodule Electric.ShapeCache.InMemoryStorage do
end

@impl Electric.ShapeCache.Storage
def make_new_snapshot!(data_stream, %MS{} = opts) do
def make_new_snapshot!(data_stream, %MS{stack_id: stack_id} = opts) do
OpenTelemetry.with_span(
"storage.make_new_snapshot",
[storage_impl: "in_memory", "shape.handle": opts.shape_handle],
stack_id,
fn ->
table = opts.snapshot_table

Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ defmodule Electric.Shapes.Consumer do
OpenTelemetry.with_span(
"shape_write.consumer.handle_txns",
[snapshot_xmin: state.snapshot_xmin],
state.stack_id,
fn -> handle_txns(txns, state) end
)
end
Expand All @@ -239,7 +240,7 @@ defmodule Electric.Shapes.Consumer do
[xid: txn.xid, total_num_changes: txn.num_changes] ++
shape_attrs(state.shape_handle, state.shape)

OpenTelemetry.with_span("shape_write.consumer.handle_txn", ot_attrs, fn ->
OpenTelemetry.with_span("shape_write.consumer.handle_txn", ot_attrs, state.stack_id, fn ->
do_handle_txn(txn, state)
end)
end
Expand Down
29 changes: 22 additions & 7 deletions packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,16 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
storage: storage,
run_with_conn_fn: run_with_conn_fn,
create_snapshot_fn: create_snapshot_fn,
prepare_tables_fn: prepare_tables_fn_or_mfa
prepare_tables_fn: prepare_tables_fn_or_mfa,
stack_id: stack_id
} = state

affected_tables = Shape.affected_tables(shape)

OpenTelemetry.with_span(
"shape_snapshot.create_snapshot_task",
shape_attrs(shape_handle, shape),
stack_id,
fn ->
try do
# Grab the same connection from the pool for both operations to
Expand All @@ -66,6 +68,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
OpenTelemetry.with_span(
"shape_snapshot.prepare_tables",
shape_attrs(shape_handle, shape),
stack_id,
fn ->
Utils.apply_fn_or_mfa(prepare_tables_fn_or_mfa, [
pool_conn,
Expand All @@ -74,7 +77,14 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
end
)

apply(create_snapshot_fn, [consumer, shape_handle, shape, pool_conn, storage])
apply(create_snapshot_fn, [
consumer,
shape_handle,
shape,
pool_conn,
storage,
stack_id
])
end
])
rescue
Expand Down Expand Up @@ -115,7 +125,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
end

@doc false
def query_in_readonly_txn(parent, shape_handle, shape, db_pool, storage) do
def query_in_readonly_txn(parent, shape_handle, shape, db_pool, storage, stack_id) do
shape_attrs = shape_attrs(shape_handle, shape)

Postgrex.transaction(
Expand All @@ -124,13 +134,15 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
OpenTelemetry.with_span(
"shape_snapshot.query_in_readonly_txn",
shape_attrs,
stack_id,
fn ->
query_span!(
conn,
"shape_snapshot.start_readonly_txn",
shape_attrs,
"SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY",
[]
[],
stack_id
)

%{rows: [[xmin]]} =
Expand All @@ -139,7 +151,8 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
"shape_snapshot.get_snapshot_xmin",
shape_attrs,
"SELECT pg_snapshot_xmin(pg_current_snapshot())",
[]
[],
stack_id
)

GenServer.cast(parent, {:snapshot_xmin_known, shape_handle, xmin})
Expand All @@ -149,12 +162,13 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
OpenTelemetry.with_span(
"shape_snapshot.set_display_settings",
shape_attrs,
stack_id,
fn ->
Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, []))
end
)

stream = Querying.stream_initial_data(conn, shape)
stream = Querying.stream_initial_data(conn, stack_id, shape)

GenServer.cast(parent, {:snapshot_started, shape_handle})

Expand All @@ -168,10 +182,11 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
)
end

defp query_span!(conn, span_name, span_attrs, query, params) do
defp query_span!(conn, span_name, span_attrs, query, params, stack_id) do
OpenTelemetry.with_span(
span_name,
span_attrs,
stack_id,
fn -> Postgrex.query!(conn, query, params) end
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ defmodule Electric.Shapes.ConsumerSupervisor do
db_pool: [type: {:or, [:atom, :pid, @name_schema_tuple]}, required: true],
prepare_tables_fn: [type: {:or, [:mfa, {:fun, 2}]}, required: true],
create_snapshot_fn: [
type: {:fun, 5},
default: &Electric.Shapes.Consumer.Snapshotter.query_in_readonly_txn/5
type: {:fun, 6},
default: &Electric.Shapes.Consumer.Snapshotter.query_in_readonly_txn/6
]
)

Expand Down
6 changes: 3 additions & 3 deletions packages/sync-service/lib/electric/shapes/querying.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ defmodule Electric.Shapes.Querying do

@type json_result_stream :: Enumerable.t(json_iodata())

@spec stream_initial_data(DBConnection.t(), Shape.t()) :: json_result_stream()
def stream_initial_data(conn, %Shape{root_table: root_table} = shape) do
OpenTelemetry.with_span("shape_read.stream_initial_data", [], fn ->
@spec stream_initial_data(DBConnection.t(), String.t(), Shape.t()) :: json_result_stream()
def stream_initial_data(conn, stack_id, %Shape{root_table: root_table} = shape) do
OpenTelemetry.with_span("shape_read.stream_initial_data", [], stack_id, fn ->
table = Utils.relation_to_sql(root_table)

where =
Expand Down
15 changes: 15 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ defmodule Electric.StackSupervisor do
keys: [
registry_partitions: [type: :non_neg_integer, required: false]
]
],
telemetry_span_attrs: [
type: {:map, :string, {:or, [:string, :integer, :float, :boolean]}},
required: false
]
)

Expand Down Expand Up @@ -192,6 +196,7 @@ defmodule Electric.StackSupervisor do
stack_events_registry: config.stack_events_registry,
replication_opts:
[
stack_id: stack_id,
transaction_received:
{Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]},
relation_received:
Expand Down Expand Up @@ -221,6 +226,16 @@ defmodule Electric.StackSupervisor do
{Electric.Connection.Supervisor, new_connection_manager_opts}
]

# Store the telemetry span attributes in the persistent term for this stack
telemetry_span_attrs = Access.get(config, :telemetry_span_attrs, %{})

if telemetry_span_attrs != %{},
do:
Electric.Telemetry.OpenTelemetry.set_stack_span_attrs(
stack_id,
telemetry_span_attrs
)

Supervisor.init(children, strategy: :one_for_one, auto_shutdown: :any_significant)
end
end
Loading

0 comments on commit e840b5f

Please sign in to comment.