Skip to content

Commit

Permalink
chore (sync service): remove redundant spans (#2037)
Browse files Browse the repository at this point in the history
Part of #2032.
I removed 2 spans that seem redundant:

- `shape_write.log_collector.handle_txn`: this span wraps the
`handle_transaction` function. However, there is already a span
`pg_txn.replication_client.transaction_received` that in fact calls into
`handle_transaction`.
- `shape_write.log_collector.handle_relation`: this span wraps the
handling of relation messages. Similarly to transactions, there is a
`pg_txn.replication_client.relation_received` span that ends up calling
into the `handle_relation` function.

Here are the relevant code snippets:

`Electric.Postgres.ReplicationClient`:
```ex
{m, f, args} = state.transaction_received

OpenTelemetry.with_span(
  "pg_txn.replication_client.transaction_received",
  [num_changes: length(txn.changes), num_relations: MapSet.size(txn.affected_relations)],
  fn -> apply(m, f, [txn | args]) end
)
```

The call to `apply` is a chain of calls that eventually ends up calling
`handle_transaction` (and does nothing more):

`Electric.StackSupervisor`:
```ex
transaction_received: {Electric.Replication.ShapeLogCollector, :store_transaction, [shape_log_collector]}
```

`Electric.Replication.ShapeLogCollector.ex`:
```ex
def store_transaction(%Transaction{} = txn, server) do
  ot_span_ctx = OpenTelemetry.get_current_context()
  GenStage.call(server, {:new_txn, txn, ot_span_ctx}, :infinity)
end

def handle_call({:new_txn, %Transaction{xid: xid, lsn: lsn} = txn, ot_span_ctx}, from, state) do
  OpenTelemetry.set_current_context(ot_span_ctx)
  Logger.info("Received transaction #{xid} from Postgres at #{lsn}")
  Logger.debug(fn -> "Txn received in ShapeLogCollector: #{inspect(txn)}" end)

  OpenTelemetry.with_span("shape_write.log_collector.handle_txn", [], fn ->
    handle_transaction(txn, from, state) 
  end)
end
```

### Question

I removed the spans from `Electric.Replication.ShapeLogCollector`,
another option would be to remove the spans from
`Electric.Postgres.ReplicationClient`, any preference here?
  • Loading branch information
kevin-dp authored Dec 2, 2024
1 parent 32d4b50 commit 0dc844f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changeset/few-gorillas-fetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Remove redundant spans in open telemetry tracing.
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ defmodule Electric.Replication.ShapeLogCollector do
# determining how long a write should reasonably take and if that fails
# it should raise.
def store_transaction(%Transaction{} = txn, server) do
ot_span_ctx = OpenTelemetry.get_current_context()
GenStage.call(server, {:new_txn, txn, ot_span_ctx}, :infinity)
GenStage.call(server, {:new_txn, txn}, :infinity)
end

def handle_relation_msg(%Changes.Relation{} = rel, server) do
ot_span_ctx = OpenTelemetry.get_current_context()
GenServer.call(server, {:relation_msg, rel, ot_span_ctx}, :infinity)
GenServer.call(server, {:relation_msg, rel}, :infinity)
end

def init(opts) do
Expand Down Expand Up @@ -93,26 +91,18 @@ defmodule Electric.Replication.ShapeLogCollector do
{:noreply, [], remove_subscription(from, state)}
end

def handle_call({:new_txn, %Transaction{xid: xid, lsn: lsn} = txn, ot_span_ctx}, from, state) do
OpenTelemetry.set_current_context(ot_span_ctx)

def handle_call({:new_txn, %Transaction{xid: xid, lsn: lsn} = txn}, from, state) do
Logger.info("Received transaction #{xid} from Postgres at #{lsn}")
Logger.debug(fn -> "Txn received in ShapeLogCollector: #{inspect(txn)}" end)

OpenTelemetry.with_span("shape_write.log_collector.handle_txn", [], fn ->
handle_transaction(txn, from, state)
end)
handle_transaction(txn, from, state)
end

def handle_call({:relation_msg, %Relation{} = rel, ot_span_ctx}, from, state) do
OpenTelemetry.set_current_context(ot_span_ctx)

def handle_call({:relation_msg, %Relation{} = rel}, from, state) do
Logger.info("Received relation #{inspect(rel.schema)}.#{inspect(rel.table)}")
Logger.debug(fn -> "Relation received in ShapeLogCollector: #{inspect(rel)}" end)

OpenTelemetry.with_span("shape_write.log_collector.handle_relation", [], fn ->
handle_relation(rel, from, state)
end)
handle_relation(rel, from, state)
end

# If no-one is listening to the replication stream, then just return without
Expand Down

0 comments on commit 0dc844f

Please sign in to comment.