Skip to content

Commit

Permalink
feat: add more metrics and expose them as usage telemetry (#2057)
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter authored Dec 3, 2024
1 parent bb27ee9 commit a16ab24
Show file tree
Hide file tree
Showing 30 changed files with 715 additions and 84 deletions.
5 changes: 5 additions & 0 deletions .changeset/five-masks-check.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

feat: add more telemetry
4 changes: 3 additions & 1 deletion packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,6 @@ config :electric,
prometheus_port: prometheus_port,
storage: storage,
persistent_kv: persistent_kv,
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false)
listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false),
call_home_telemetry: env!("ELECTRIC_USAGE_REPORTING", :boolean, Mix.env() == :prod),
telemetry_url: "https://checkpoint.electric-sql.com"
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ defmodule Electric.Application do
pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)],
storage: Application.fetch_env!(:electric, :storage),
chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)},
{Electric.Telemetry, stack_id: stack_id},
{Electric.Telemetry,
stack_id: stack_id, storage: Application.fetch_env!(:electric, :storage)},
{Bandit,
plug: {Electric.Plug.Router, router_opts},
port: Application.fetch_env!(:electric, :service_port),
Expand Down
18 changes: 15 additions & 3 deletions packages/sync-service/lib/electric/connection/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ defmodule Electric.Connection.Manager do
def handle_call(:report_retained_wal_size, _from, state) do
if state.monitoring_started? do
slot_name = Keyword.fetch!(state.replication_opts, :slot_name)
query_and_report_retained_wal_size(state.pool_pid, slot_name)
query_and_report_retained_wal_size(state.pool_pid, slot_name, state.stack_id)
end

{:reply, :ok, state}
Expand Down Expand Up @@ -426,6 +426,16 @@ defmodule Electric.Connection.Manager do
end

def handle_cast({:pg_info_looked_up, {server_version, system_identifier, timeline_id}}, state) do
:telemetry.execute(
[:electric, :postgres, :info_looked_up],
%{
pg_version: server_version,
pg_system_identifier: system_identifier,
pg_timeline_id: timeline_id
},
%{stack_id: state.stack_id}
)

{:noreply,
%{
state
Expand Down Expand Up @@ -654,7 +664,7 @@ defmodule Electric.Connection.Manager do
end
end

defp query_and_report_retained_wal_size(pool, slot_name) do
defp query_and_report_retained_wal_size(pool, slot_name, stack_id) do
query = """
SELECT
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
Expand All @@ -666,7 +676,9 @@ defmodule Electric.Connection.Manager do

case Postgrex.query(pool, query, [slot_name]) do
{:ok, %Postgrex.Result{rows: [[wal_size]]}} ->
:telemetry.execute([:electric, :postgres, :replication], %{wal_size: wal_size})
:telemetry.execute([:electric, :postgres, :replication], %{wal_size: wal_size}, %{
stack_id: stack_id
})

{:error, error} ->
Logger.warning("Failed to query retained WAL size\nError: #{inspect(error)}")
Expand Down
1 change: 1 addition & 0 deletions packages/sync-service/lib/electric/plug/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Electric.Plug.Router do
plug :server_header, Electric.version()
# converts HEAD requests to GET requests
plug Plug.Head
plug RemoteIp
plug :match
plug Electric.Plug.LabelProcessPlug
plug Electric.Plug.TraceContextPlug
Expand Down
25 changes: 19 additions & 6 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,7 @@ defmodule Electric.Plug.ServeShapePlug do

defp open_telemetry_attrs(%Conn{assigns: assigns} = conn) do
shape_handle =
if is_struct(conn.query_params, Plug.Conn.Unfetched) do
assigns[:active_shape_handle] || assigns[:shape_handle]
else
conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:shape_handle]
end
conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:handle]

maybe_up_to_date = if up_to_date = assigns[:up_to_date], do: up_to_date != []

Expand Down Expand Up @@ -626,7 +622,24 @@ defmodule Electric.Plug.ServeShapePlug do
# We want to have all the relevant HTTP and shape request attributes on the root span. This
# is the place to assign them because we keep this plug last in the "plug pipeline" defined
# in this module.
defp end_telemetry_span(conn, _ \\ nil) do
defp end_telemetry_span(%Conn{assigns: assigns} = conn, _ \\ nil) do
:telemetry.execute(
[:electric, :plug, :serve_shape],
%{
count: 1,
bytes: assigns[:streaming_bytes_sent] || 0,
monotonic_time: System.monotonic_time()
},
%{
live: assigns[:live],
shape_handle:
conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:handle],
client_ip: conn.remote_ip,
status: conn.status,
stack_id: assigns.config[:stack_id]
}
)

add_span_attrs_from_conn(conn)
OpentelemetryTelemetry.end_telemetry_span(OpenTelemetry, %{})
conn
Expand Down
14 changes: 1 addition & 13 deletions packages/sync-service/lib/electric/plug/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ defmodule Electric.Plug.Utils do
"error.type" => assigns[:error_str],
"http.request_id" => assigns[:plug_request_id],
"http.query_string" => conn.query_string,
SC.ClientAttributes.client_address() => client_ip(conn),
SC.ClientAttributes.client_address() => conn.remote_ip,
SC.ServerAttributes.server_address() => conn.host,
SC.ServerAttributes.server_port() => conn.port,
SC.HTTPAttributes.http_request_method() => conn.method,
Expand Down Expand Up @@ -115,18 +115,6 @@ defmodule Electric.Plug.Utils do
|> Map.merge(Map.new(conn.resp_headers, fn {k, v} -> {"http.response.header.#{k}", v} end))
end

defp client_ip(%Plug.Conn{remote_ip: remote_ip} = conn) do
case Plug.Conn.get_req_header(conn, "x-forwarded-for") do
[] ->
remote_ip
|> :inet_parse.ntoa()
|> to_string()

[ip_address | _] ->
ip_address
end
end

defp user_agent(%Plug.Conn{} = conn) do
case Plug.Conn.get_req_header(conn, "user-agent") do
[] -> ""
Expand Down
25 changes: 22 additions & 3 deletions packages/sync-service/lib/electric/postgres/replication_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Electric.Postgres.ReplicationClient do
"""
use Postgrex.ReplicationConnection

alias Electric.Replication.Changes.Transaction
alias Electric.Postgres.LogicalReplication.Decoder
alias Electric.Postgres.Lsn
alias Electric.Postgres.ReplicationClient.Collector
Expand All @@ -26,6 +27,7 @@ defmodule Electric.Postgres.ReplicationClient do
defmodule State do
@enforce_keys [:transaction_received, :relation_received, :publication_name]
defstruct [
:stack_id,
:connection_manager,
:transaction_received,
:relation_received,
Expand All @@ -50,6 +52,7 @@ defmodule Electric.Postgres.ReplicationClient do
]

@type t() :: %__MODULE__{
stack_id: String.t(),
connection_manager: pid(),
transaction_received: {module(), atom(), [term()]},
relation_received: {module(), atom(), [term()]},
Expand All @@ -66,6 +69,7 @@ defmodule Electric.Postgres.ReplicationClient do
}

@opts_schema NimbleOptions.new!(
stack_id: [required: true, type: :string],
connection_manager: [required: true, type: :pid],
transaction_received: [required: true, type: :mfa],
relation_received: [required: true, type: :mfa],
Expand Down Expand Up @@ -105,7 +109,11 @@ defmodule Electric.Postgres.ReplicationClient do
auto_reconnect: false
] ++ Electric.Utils.deobfuscate_password(config.connection_opts)

Postgrex.ReplicationConnection.start_link(__MODULE__, config.replication_opts, start_opts)
Postgrex.ReplicationConnection.start_link(
__MODULE__,
config.replication_opts ++ [stack_id: config.stack_id],
start_opts
)
end

def name(stack_id) do
Expand Down Expand Up @@ -240,11 +248,22 @@ defmodule Electric.Postgres.ReplicationClient do

{:noreply, %{state | txn_collector: txn_collector}}

{txn, %Collector{} = txn_collector} ->
{%Transaction{} = txn, %Collector{} = txn_collector} ->
state = %{state | txn_collector: txn_collector}

{m, f, args} = state.transaction_received

:telemetry.execute(
[:electric, :postgres, :replication, :transaction_received],
%{
monotonic_time: System.monotonic_time(),
bytes: byte_size(data),
count: 1,
operations: txn.num_changes
},
%{stack_id: state.stack_id}
)

# this will block until all the consumers have processed the transaction because
# the log collector uses manual demand, and only replies to the `call` once it
# receives more demand.
Expand All @@ -259,7 +278,7 @@ defmodule Electric.Postgres.ReplicationClient do
# individual storage write can timeout the entire batch.
OpenTelemetry.with_span(
"pg_txn.replication_client.transaction_received",
[num_changes: length(txn.changes), num_relations: MapSet.size(txn.affected_relations)],
[num_changes: txn.num_changes, num_relations: MapSet.size(txn.affected_relations)],
fn -> apply(m, f, [txn | args]) end
)
|> case do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Electric.Postgres.ReplicationClient.Collector do
Handle incoming logical replication message by either building up a transaction or
returning a complete built up transaction.
"""
@spec handle_message(LR.message(), t()) :: t() | {Transaction.t(), t()}
@spec handle_message(LR.message(), t()) :: t() | {Transaction.t() | Relation.t(), t()}
def handle_message(%LR.Message{} = msg, state) do
Logger.info("Got a message from PG via logical replication: #{inspect(msg)}")

Expand Down
11 changes: 7 additions & 4 deletions packages/sync-service/lib/electric/replication/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule Electric.Replication.Changes do
@type t() :: %__MODULE__{
xid: Changes.xid() | nil,
changes: [Changes.change()],
num_changes: non_neg_integer(),
affected_relations: MapSet.t(Changes.relation_name()),
commit_timestamp: DateTime.t(),
lsn: Electric.Postgres.Lsn.t(),
Expand All @@ -53,6 +54,7 @@ defmodule Electric.Replication.Changes do
:lsn,
:last_log_offset,
changes: [],
num_changes: 0,
affected_relations: MapSet.new()
]

Expand All @@ -70,6 +72,7 @@ defmodule Electric.Replication.Changes do
%{
txn
| changes: [change | changes],
num_changes: txn.num_changes + 1,
affected_relations: MapSet.put(rels, rel)
}
end
Expand All @@ -82,7 +85,7 @@ defmodule Electric.Replication.Changes do
relation: Changes.relation_name(),
record: Changes.record(),
log_offset: LogOffset.t(),
key: String.t()
key: String.t() | nil
}
end

Expand All @@ -103,8 +106,8 @@ defmodule Electric.Replication.Changes do
old_record: Changes.record() | nil,
record: Changes.record(),
log_offset: LogOffset.t(),
key: String.t(),
old_key: String.t(),
key: String.t() | nil,
old_key: String.t() | nil,
tags: [Changes.tag()],
changed_columns: MapSet.t()
}
Expand Down Expand Up @@ -148,7 +151,7 @@ defmodule Electric.Replication.Changes do
relation: Changes.relation_name(),
old_record: Changes.record(),
log_offset: LogOffset.t(),
key: String.t(),
key: String.t() | nil,
tags: [Changes.tag()]
}
end
Expand Down
15 changes: 9 additions & 6 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ defmodule Electric.ShapeCacheBehaviour do
@callback update_shape_latest_offset(shape_handle(), LogOffset.t(), keyword()) :: :ok

@callback get_shape(shape_def(), opts :: keyword()) ::
{shape_handle(), current_snapshot_offset :: LogOffset.t()}
{shape_handle(), current_snapshot_offset :: LogOffset.t()} | nil
@callback get_or_create_shape_handle(shape_def(), opts :: keyword()) ::
{shape_handle(), current_snapshot_offset :: LogOffset.t()}
@callback list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_handle(), Shape.t()}]
@callback list_shapes(keyword() | map()) :: [{shape_handle(), Shape.t()}]
@callback await_snapshot_start(shape_handle(), opts :: keyword()) :: :started | {:error, term()}
@callback handle_truncate(shape_handle(), keyword()) :: :ok
@callback clean_shape(shape_handle(), keyword()) :: :ok
@callback clean_all_shapes(GenServer.name()) :: :ok
@callback clean_all_shapes(keyword()) :: :ok
@callback has_shape?(shape_handle(), keyword()) :: boolean()
end

Expand Down Expand Up @@ -132,10 +132,12 @@ defmodule Electric.ShapeCache do
end

@impl Electric.ShapeCacheBehaviour
@spec list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_handle(), Shape.t()}]
@spec list_shapes(keyword()) :: [{shape_handle(), Shape.t()}]
def list_shapes(opts) do
shape_status = Access.get(opts, :shape_status, ShapeStatus)
shape_status.list_shapes(opts)
shape_status.list_shapes(%ShapeStatus{shape_meta_table: get_shape_meta_table(opts)})
rescue
ArgumentError -> []
end

@impl Electric.ShapeCacheBehaviour
Expand Down Expand Up @@ -364,5 +366,6 @@ defmodule Electric.ShapeCache do
end
end

def get_shape_meta_table(opts), do: :"#{opts[:stack_id]}:shape_meta_table"
def get_shape_meta_table(opts),
do: opts[:shape_meta_table] || :"#{opts[:stack_id]}:shape_meta_table"
end
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Electric.ShapeCache.CrashingFileStorage do
defdelegate start_link(opts), to: FileStorage
defdelegate set_shape_definition(shape, opts), to: FileStorage
defdelegate get_all_stored_shapes(opts), to: FileStorage
defdelegate get_total_disk_usage(opts), to: FileStorage
defdelegate get_current_position(opts), to: FileStorage
defdelegate set_snapshot_xmin(xmin, opts), to: FileStorage
defdelegate snapshot_started?(opts), to: FileStorage
Expand Down
25 changes: 25 additions & 0 deletions packages/sync-service/lib/electric/shape_cache/file_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,31 @@ defmodule Electric.ShapeCache.FileStorage do
end
end

@impl Electric.ShapeCache.Storage
def get_total_disk_usage(%{base_path: shapes_dir} = opts) do
case File.ls(shapes_dir) do
{:ok, shape_handles} ->
shape_handles
|> Enum.map(&for_shape(&1, opts))
|> Enum.map(fn fs ->
maybe_get_size(shape_definition_path(fs)) +
maybe_get_size(shape_snapshot_path(fs)) +
maybe_get_size(CubDB.current_db_file(fs.db))
end)
|> Enum.sum()

_ ->
0
end
end

defp maybe_get_size(path) do
case File.stat(path) do
{:ok, stat} -> stat.size
{:error, _} -> 0
end
end

@impl Electric.ShapeCache.Storage
def get_current_position(%FS{} = opts) do
{:ok, latest_offset(opts), snapshot_xmin(opts)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ defmodule Electric.ShapeCache.InMemoryStorage do
{:ok, %{}}
end

@impl Electric.ShapeCache.Storage
def get_total_disk_usage(_opts) do
0
end

@impl Electric.ShapeCache.Storage
def snapshot_started?(%MS{} = opts) do
try do
Expand Down
Loading

0 comments on commit a16ab24

Please sign in to comment.