diff --git a/.changeset/five-masks-check.md b/.changeset/five-masks-check.md new file mode 100644 index 0000000000..59af5ab8a0 --- /dev/null +++ b/.changeset/five-masks-check.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +feat: add more telemetry diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index b1b6b79f2b..db4c0b2c46 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -206,4 +206,6 @@ config :electric, prometheus_port: prometheus_port, storage: storage, persistent_kv: persistent_kv, - listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false) + listen_on_ipv6?: env!("ELECTRIC_LISTEN_ON_IPV6", :boolean, false), + call_home_telemetry: env!("ELECTRIC_USAGE_REPORTING", :boolean, Mix.env() == :prod), + telemetry_url: "https://checkpoint.electric-sql.com" diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 44bf27b09a..5c887d4081 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -63,7 +63,8 @@ defmodule Electric.Application do pool_opts: [pool_size: Application.fetch_env!(:electric, :db_pool_size)], storage: Application.fetch_env!(:electric, :storage), chunk_bytes_threshold: Application.fetch_env!(:electric, :chunk_bytes_threshold)}, - {Electric.Telemetry, stack_id: stack_id}, + {Electric.Telemetry, + stack_id: stack_id, storage: Application.fetch_env!(:electric, :storage)}, {Bandit, plug: {Electric.Plug.Router, router_opts}, port: Application.fetch_env!(:electric, :service_port), diff --git a/packages/sync-service/lib/electric/connection/manager.ex b/packages/sync-service/lib/electric/connection/manager.ex index e9438ee033..d369c54103 100644 --- a/packages/sync-service/lib/electric/connection/manager.ex +++ b/packages/sync-service/lib/electric/connection/manager.ex @@ -234,7 +234,7 @@ defmodule Electric.Connection.Manager do def handle_call(:report_retained_wal_size, _from, state) do if state.monitoring_started? do slot_name = Keyword.fetch!(state.replication_opts, :slot_name) - query_and_report_retained_wal_size(state.pool_pid, slot_name) + query_and_report_retained_wal_size(state.pool_pid, slot_name, state.stack_id) end {:reply, :ok, state} @@ -426,6 +426,16 @@ defmodule Electric.Connection.Manager do end def handle_cast({:pg_info_looked_up, {server_version, system_identifier, timeline_id}}, state) do + :telemetry.execute( + [:electric, :postgres, :info_looked_up], + %{ + pg_version: server_version, + pg_system_identifier: system_identifier, + pg_timeline_id: timeline_id + }, + %{stack_id: state.stack_id} + ) + {:noreply, %{ state @@ -654,7 +664,7 @@ defmodule Electric.Connection.Manager do end end - defp query_and_report_retained_wal_size(pool, slot_name) do + defp query_and_report_retained_wal_size(pool, slot_name, stack_id) do query = """ SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) @@ -666,7 +676,9 @@ defmodule Electric.Connection.Manager do case Postgrex.query(pool, query, [slot_name]) do {:ok, %Postgrex.Result{rows: [[wal_size]]}} -> - :telemetry.execute([:electric, :postgres, :replication], %{wal_size: wal_size}) + :telemetry.execute([:electric, :postgres, :replication], %{wal_size: wal_size}, %{ + stack_id: stack_id + }) {:error, error} -> Logger.warning("Failed to query retained WAL size\nError: #{inspect(error)}") diff --git a/packages/sync-service/lib/electric/plug/router.ex b/packages/sync-service/lib/electric/plug/router.ex index 4cd912a69f..4cd20c0c98 100644 --- a/packages/sync-service/lib/electric/plug/router.ex +++ b/packages/sync-service/lib/electric/plug/router.ex @@ -8,6 +8,7 @@ defmodule Electric.Plug.Router do plug :server_header, Electric.version() # converts HEAD requests to GET requests plug Plug.Head + plug RemoteIp plug :match plug Electric.Plug.LabelProcessPlug plug Electric.Plug.TraceContextPlug diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index f2236c366a..2cdd2200d6 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -577,11 +577,7 @@ defmodule Electric.Plug.ServeShapePlug do defp open_telemetry_attrs(%Conn{assigns: assigns} = conn) do shape_handle = - if is_struct(conn.query_params, Plug.Conn.Unfetched) do - assigns[:active_shape_handle] || assigns[:shape_handle] - else - conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:shape_handle] - end + conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:handle] maybe_up_to_date = if up_to_date = assigns[:up_to_date], do: up_to_date != [] @@ -626,7 +622,24 @@ defmodule Electric.Plug.ServeShapePlug do # We want to have all the relevant HTTP and shape request attributes on the root span. This # is the place to assign them because we keep this plug last in the "plug pipeline" defined # in this module. - defp end_telemetry_span(conn, _ \\ nil) do + defp end_telemetry_span(%Conn{assigns: assigns} = conn, _ \\ nil) do + :telemetry.execute( + [:electric, :plug, :serve_shape], + %{ + count: 1, + bytes: assigns[:streaming_bytes_sent] || 0, + monotonic_time: System.monotonic_time() + }, + %{ + live: assigns[:live], + shape_handle: + conn.query_params["handle"] || assigns[:active_shape_handle] || assigns[:handle], + client_ip: conn.remote_ip, + status: conn.status, + stack_id: assigns.config[:stack_id] + } + ) + add_span_attrs_from_conn(conn) OpentelemetryTelemetry.end_telemetry_span(OpenTelemetry, %{}) conn diff --git a/packages/sync-service/lib/electric/plug/utils.ex b/packages/sync-service/lib/electric/plug/utils.ex index 810d944805..9dd7d1104a 100644 --- a/packages/sync-service/lib/electric/plug/utils.ex +++ b/packages/sync-service/lib/electric/plug/utils.ex @@ -87,7 +87,7 @@ defmodule Electric.Plug.Utils do "error.type" => assigns[:error_str], "http.request_id" => assigns[:plug_request_id], "http.query_string" => conn.query_string, - SC.ClientAttributes.client_address() => client_ip(conn), + SC.ClientAttributes.client_address() => conn.remote_ip, SC.ServerAttributes.server_address() => conn.host, SC.ServerAttributes.server_port() => conn.port, SC.HTTPAttributes.http_request_method() => conn.method, @@ -115,18 +115,6 @@ defmodule Electric.Plug.Utils do |> Map.merge(Map.new(conn.resp_headers, fn {k, v} -> {"http.response.header.#{k}", v} end)) end - defp client_ip(%Plug.Conn{remote_ip: remote_ip} = conn) do - case Plug.Conn.get_req_header(conn, "x-forwarded-for") do - [] -> - remote_ip - |> :inet_parse.ntoa() - |> to_string() - - [ip_address | _] -> - ip_address - end - end - defp user_agent(%Plug.Conn{} = conn) do case Plug.Conn.get_req_header(conn, "user-agent") do [] -> "" diff --git a/packages/sync-service/lib/electric/postgres/replication_client.ex b/packages/sync-service/lib/electric/postgres/replication_client.ex index c084e2573c..edededbd80 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client.ex @@ -4,6 +4,7 @@ defmodule Electric.Postgres.ReplicationClient do """ use Postgrex.ReplicationConnection + alias Electric.Replication.Changes.Transaction alias Electric.Postgres.LogicalReplication.Decoder alias Electric.Postgres.Lsn alias Electric.Postgres.ReplicationClient.Collector @@ -26,6 +27,7 @@ defmodule Electric.Postgres.ReplicationClient do defmodule State do @enforce_keys [:transaction_received, :relation_received, :publication_name] defstruct [ + :stack_id, :connection_manager, :transaction_received, :relation_received, @@ -50,6 +52,7 @@ defmodule Electric.Postgres.ReplicationClient do ] @type t() :: %__MODULE__{ + stack_id: String.t(), connection_manager: pid(), transaction_received: {module(), atom(), [term()]}, relation_received: {module(), atom(), [term()]}, @@ -66,6 +69,7 @@ defmodule Electric.Postgres.ReplicationClient do } @opts_schema NimbleOptions.new!( + stack_id: [required: true, type: :string], connection_manager: [required: true, type: :pid], transaction_received: [required: true, type: :mfa], relation_received: [required: true, type: :mfa], @@ -105,7 +109,11 @@ defmodule Electric.Postgres.ReplicationClient do auto_reconnect: false ] ++ Electric.Utils.deobfuscate_password(config.connection_opts) - Postgrex.ReplicationConnection.start_link(__MODULE__, config.replication_opts, start_opts) + Postgrex.ReplicationConnection.start_link( + __MODULE__, + config.replication_opts ++ [stack_id: config.stack_id], + start_opts + ) end def name(stack_id) do @@ -240,11 +248,22 @@ defmodule Electric.Postgres.ReplicationClient do {:noreply, %{state | txn_collector: txn_collector}} - {txn, %Collector{} = txn_collector} -> + {%Transaction{} = txn, %Collector{} = txn_collector} -> state = %{state | txn_collector: txn_collector} {m, f, args} = state.transaction_received + :telemetry.execute( + [:electric, :postgres, :replication, :transaction_received], + %{ + monotonic_time: System.monotonic_time(), + bytes: byte_size(data), + count: 1, + operations: txn.num_changes + }, + %{stack_id: state.stack_id} + ) + # this will block until all the consumers have processed the transaction because # the log collector uses manual demand, and only replies to the `call` once it # receives more demand. @@ -259,7 +278,7 @@ defmodule Electric.Postgres.ReplicationClient do # individual storage write can timeout the entire batch. OpenTelemetry.with_span( "pg_txn.replication_client.transaction_received", - [num_changes: length(txn.changes), num_relations: MapSet.size(txn.affected_relations)], + [num_changes: txn.num_changes, num_relations: MapSet.size(txn.affected_relations)], fn -> apply(m, f, [txn | args]) end ) |> case do diff --git a/packages/sync-service/lib/electric/postgres/replication_client/collector.ex b/packages/sync-service/lib/electric/postgres/replication_client/collector.ex index f7bf5d1e1e..bfda250e48 100644 --- a/packages/sync-service/lib/electric/postgres/replication_client/collector.ex +++ b/packages/sync-service/lib/electric/postgres/replication_client/collector.ex @@ -31,7 +31,7 @@ defmodule Electric.Postgres.ReplicationClient.Collector do Handle incoming logical replication message by either building up a transaction or returning a complete built up transaction. """ - @spec handle_message(LR.message(), t()) :: t() | {Transaction.t(), t()} + @spec handle_message(LR.message(), t()) :: t() | {Transaction.t() | Relation.t(), t()} def handle_message(%LR.Message{} = msg, state) do Logger.info("Got a message from PG via logical replication: #{inspect(msg)}") diff --git a/packages/sync-service/lib/electric/replication/changes.ex b/packages/sync-service/lib/electric/replication/changes.ex index 7fef2340da..40ae58d43d 100644 --- a/packages/sync-service/lib/electric/replication/changes.ex +++ b/packages/sync-service/lib/electric/replication/changes.ex @@ -41,6 +41,7 @@ defmodule Electric.Replication.Changes do @type t() :: %__MODULE__{ xid: Changes.xid() | nil, changes: [Changes.change()], + num_changes: non_neg_integer(), affected_relations: MapSet.t(Changes.relation_name()), commit_timestamp: DateTime.t(), lsn: Electric.Postgres.Lsn.t(), @@ -53,6 +54,7 @@ defmodule Electric.Replication.Changes do :lsn, :last_log_offset, changes: [], + num_changes: 0, affected_relations: MapSet.new() ] @@ -70,6 +72,7 @@ defmodule Electric.Replication.Changes do %{ txn | changes: [change | changes], + num_changes: txn.num_changes + 1, affected_relations: MapSet.put(rels, rel) } end @@ -82,7 +85,7 @@ defmodule Electric.Replication.Changes do relation: Changes.relation_name(), record: Changes.record(), log_offset: LogOffset.t(), - key: String.t() + key: String.t() | nil } end @@ -103,8 +106,8 @@ defmodule Electric.Replication.Changes do old_record: Changes.record() | nil, record: Changes.record(), log_offset: LogOffset.t(), - key: String.t(), - old_key: String.t(), + key: String.t() | nil, + old_key: String.t() | nil, tags: [Changes.tag()], changed_columns: MapSet.t() } @@ -148,7 +151,7 @@ defmodule Electric.Replication.Changes do relation: Changes.relation_name(), old_record: Changes.record(), log_offset: LogOffset.t(), - key: String.t(), + key: String.t() | nil, tags: [Changes.tag()] } end diff --git a/packages/sync-service/lib/electric/shape_cache.ex b/packages/sync-service/lib/electric/shape_cache.ex index aec0b9ef73..c4359a71eb 100644 --- a/packages/sync-service/lib/electric/shape_cache.ex +++ b/packages/sync-service/lib/electric/shape_cache.ex @@ -13,14 +13,14 @@ defmodule Electric.ShapeCacheBehaviour do @callback update_shape_latest_offset(shape_handle(), LogOffset.t(), keyword()) :: :ok @callback get_shape(shape_def(), opts :: keyword()) :: - {shape_handle(), current_snapshot_offset :: LogOffset.t()} + {shape_handle(), current_snapshot_offset :: LogOffset.t()} | nil @callback get_or_create_shape_handle(shape_def(), opts :: keyword()) :: {shape_handle(), current_snapshot_offset :: LogOffset.t()} - @callback list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_handle(), Shape.t()}] + @callback list_shapes(keyword() | map()) :: [{shape_handle(), Shape.t()}] @callback await_snapshot_start(shape_handle(), opts :: keyword()) :: :started | {:error, term()} @callback handle_truncate(shape_handle(), keyword()) :: :ok @callback clean_shape(shape_handle(), keyword()) :: :ok - @callback clean_all_shapes(GenServer.name()) :: :ok + @callback clean_all_shapes(keyword()) :: :ok @callback has_shape?(shape_handle(), keyword()) :: boolean() end @@ -132,10 +132,12 @@ defmodule Electric.ShapeCache do end @impl Electric.ShapeCacheBehaviour - @spec list_shapes(Electric.ShapeCache.ShapeStatus.t()) :: [{shape_handle(), Shape.t()}] + @spec list_shapes(keyword()) :: [{shape_handle(), Shape.t()}] def list_shapes(opts) do shape_status = Access.get(opts, :shape_status, ShapeStatus) - shape_status.list_shapes(opts) + shape_status.list_shapes(%ShapeStatus{shape_meta_table: get_shape_meta_table(opts)}) + rescue + ArgumentError -> [] end @impl Electric.ShapeCacheBehaviour @@ -364,5 +366,6 @@ defmodule Electric.ShapeCache do end end - def get_shape_meta_table(opts), do: :"#{opts[:stack_id]}:shape_meta_table" + def get_shape_meta_table(opts), + do: opts[:shape_meta_table] || :"#{opts[:stack_id]}:shape_meta_table" end diff --git a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex b/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex index 33da9068de..79cfb25e58 100644 --- a/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex @@ -13,6 +13,7 @@ defmodule Electric.ShapeCache.CrashingFileStorage do defdelegate start_link(opts), to: FileStorage defdelegate set_shape_definition(shape, opts), to: FileStorage defdelegate get_all_stored_shapes(opts), to: FileStorage + defdelegate get_total_disk_usage(opts), to: FileStorage defdelegate get_current_position(opts), to: FileStorage defdelegate set_snapshot_xmin(xmin, opts), to: FileStorage defdelegate snapshot_started?(opts), to: FileStorage diff --git a/packages/sync-service/lib/electric/shape_cache/file_storage.ex b/packages/sync-service/lib/electric/shape_cache/file_storage.ex index ae92a93f21..a4c32b4284 100644 --- a/packages/sync-service/lib/electric/shape_cache/file_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/file_storage.ex @@ -154,6 +154,31 @@ defmodule Electric.ShapeCache.FileStorage do end end + @impl Electric.ShapeCache.Storage + def get_total_disk_usage(%{base_path: shapes_dir} = opts) do + case File.ls(shapes_dir) do + {:ok, shape_handles} -> + shape_handles + |> Enum.map(&for_shape(&1, opts)) + |> Enum.map(fn fs -> + maybe_get_size(shape_definition_path(fs)) + + maybe_get_size(shape_snapshot_path(fs)) + + maybe_get_size(CubDB.current_db_file(fs.db)) + end) + |> Enum.sum() + + _ -> + 0 + end + end + + defp maybe_get_size(path) do + case File.stat(path) do + {:ok, stat} -> stat.size + {:error, _} -> 0 + end + end + @impl Electric.ShapeCache.Storage def get_current_position(%FS{} = opts) do {:ok, latest_offset(opts), snapshot_xmin(opts)} diff --git a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex index 2c363d0426..13587be332 100644 --- a/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex @@ -123,6 +123,11 @@ defmodule Electric.ShapeCache.InMemoryStorage do {:ok, %{}} end + @impl Electric.ShapeCache.Storage + def get_total_disk_usage(_opts) do + 0 + end + @impl Electric.ShapeCache.Storage def snapshot_started?(%MS{} = opts) do try do diff --git a/packages/sync-service/lib/electric/shape_cache/log_chunker.ex b/packages/sync-service/lib/electric/shape_cache/log_chunker.ex index 64573c58b2..9306c491ff 100644 --- a/packages/sync-service/lib/electric/shape_cache/log_chunker.ex +++ b/packages/sync-service/lib/electric/shape_cache/log_chunker.ex @@ -4,26 +4,28 @@ defmodule Electric.ShapeCache.LogChunker do @default_threshold 10 * 1024 * 1024 @doc """ - Add bytes to the current chunk of a given shape - if the chunk exceeds the specified - byte size threshold, a new chunk is reset and `:threshold_exceeded` is returned. + Check if adding the given number of bytes to the current chunk would exceed the threshold. + + Returns either an ok-tuple with the new total chunk size or a threshold_exceeded-tuple with the + new chunk size of 0. """ - @spec add_to_chunk(bitstring(), non_neg_integer(), non_neg_integer()) :: + @spec fit_into_chunk(non_neg_integer(), non_neg_integer(), non_neg_integer()) :: {:ok | :threshold_exceeded, non_neg_integer()} - def add_to_chunk(chunk_bytes, total_chunk_size, chunk_bytes_threshold \\ @default_threshold) + def fit_into_chunk(chunk_bytes, total_chunk_size, chunk_bytes_threshold \\ @default_threshold) - # Ignore zero-length bytestrings - they can always be "added" to an existing chunk - def add_to_chunk(_chunk_bytes = <<>>, total_chunk_byte_size, _chunk_bytes_threshold), + def fit_into_chunk(0, total_chunk_byte_size, _chunk_bytes_threshold), do: {:ok, total_chunk_byte_size} - def add_to_chunk(chunk_bytes, total_chunk_byte_size, chunk_bytes_threshold) - when is_number(chunk_bytes_threshold) do - chunk_bytes_size = byte_size(chunk_bytes) - total_chunk_byte_size = total_chunk_byte_size + chunk_bytes_size + def fit_into_chunk(chunk_bytes_size, total_chunk_byte_size, chunk_bytes_threshold) + when is_number(chunk_bytes_size) and is_number(total_chunk_byte_size) and + is_number(chunk_bytes_threshold) and + total_chunk_byte_size + chunk_bytes_size >= chunk_bytes_threshold, + do: {:threshold_exceeded, 0} - if total_chunk_byte_size >= chunk_bytes_threshold, - do: {:threshold_exceeded, 0}, - else: {:ok, total_chunk_byte_size} - end + def fit_into_chunk(chunk_bytes_size, total_chunk_byte_size, chunk_bytes_threshold) + when is_number(chunk_bytes_size) and is_number(total_chunk_byte_size) and + is_number(chunk_bytes_threshold), + do: {:ok, total_chunk_byte_size + chunk_bytes_size} @spec default_chunk_size_threshold() :: non_neg_integer() def default_chunk_size_threshold(), do: @default_threshold diff --git a/packages/sync-service/lib/electric/shape_cache/storage.ex b/packages/sync-service/lib/electric/shape_cache/storage.ex index 5f7e431e22..23773860e3 100644 --- a/packages/sync-service/lib/electric/shape_cache/storage.ex +++ b/packages/sync-service/lib/electric/shape_cache/storage.ex @@ -16,13 +16,12 @@ defmodule Electric.ShapeCache.Storage do @type shape_storage :: {module(), shape_opts()} @type log_item :: {LogOffset.t(), Querying.json_iodata()} | {:chunk_boundary | LogOffset.t()} - @type log_state :: %{current_chunk_byte_size: non_neg_integer()} @type log :: Enumerable.t(Querying.json_iodata()) @type row :: list() @doc "Validate and initialise storage base configuration from application configuration" - @callback shared_opts(Keyword.t()) :: compiled_opts() + @callback shared_opts(term()) :: compiled_opts() @doc "Initialise shape-specific opts from the shared, global, configuration" @callback for_shape(shape_handle(), compiled_opts()) :: shape_opts() @@ -40,6 +39,9 @@ defmodule Electric.ShapeCache.Storage do @callback get_all_stored_shapes(compiled_opts()) :: {:ok, %{shape_handle() => Shape.t()}} | {:error, term()} + @doc "Get the total disk usage for all shapes" + @callback get_total_disk_usage(compiled_opts()) :: non_neg_integer() + @doc """ Get the current xmin and offset for the shape storage. @@ -145,6 +147,11 @@ defmodule Electric.ShapeCache.Storage do mod.get_all_stored_shapes(opts) end + @impl __MODULE__ + def get_total_disk_usage({mod, opts}) do + mod.get_total_disk_usage(opts) + end + @impl __MODULE__ def get_current_position({mod, shape_opts}) do mod.get_current_position(shape_opts) diff --git a/packages/sync-service/lib/electric/shapes.ex b/packages/sync-service/lib/electric/shapes.ex index b868390ca5..5aa7131556 100644 --- a/packages/sync-service/lib/electric/shapes.ex +++ b/packages/sync-service/lib/electric/shapes.ex @@ -42,7 +42,7 @@ defmodule Electric.Shapes do @doc """ Get the shape that corresponds to this shape definition and return it along with the latest offset of the shape """ - @spec get_shape(keyword(), Shape.t()) :: {shape_handle(), LogOffset.t()} + @spec get_shape(keyword(), Shape.t()) :: {shape_handle(), LogOffset.t()} | nil def get_shape(config, shape_def) do {shape_cache, opts} = Access.get(config, :shape_cache, {ShapeCache, []}) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 7e694eba33..c0c79e0d7f 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -14,7 +14,11 @@ defmodule Electric.Shapes.Consumer do require Logger - @initial_log_state %{current_chunk_byte_size: 0} + @initial_log_state %{current_chunk_byte_size: 0, current_txn_bytes: 0} + @type log_state :: %{ + current_chunk_byte_size: non_neg_integer(), + current_txn_bytes: non_neg_integer() + } def name(%{ stack_id: stack_id, @@ -252,7 +256,7 @@ defmodule Electric.Shapes.Consumer do defp handle_txn(%Transaction{} = txn, state) do ot_attrs = - [xid: txn.xid, num_changes: length(txn.changes)] ++ + [xid: txn.xid, total_num_changes: txn.num_changes] ++ shape_attrs(state.shape_handle, state.shape) OpenTelemetry.with_span("shape_write.consumer.handle_txn", ot_attrs, fn -> @@ -275,10 +279,20 @@ defmodule Electric.Shapes.Consumer do %{xid: xid, changes: changes, lsn: _lsn, last_log_offset: last_log_offset} = txn - relevant_changes = Enum.flat_map(changes, &Shape.convert_change(shape, &1)) + {relevant_changes, {num_changes, has_truncate?}} = + Enum.flat_map_reduce(changes, {0, false}, fn + %Changes.TruncatedRelation{}, _ -> + {:halt, {0, true}} + + change, {ops, false} -> + case Shape.convert_change(shape, change) do + [] -> {[], {ops, false}} + [change] -> {[change], {ops + 1, false}} + end + end) cond do - Enum.any?(relevant_changes, &is_struct(&1, Changes.TruncatedRelation)) -> + has_truncate? -> # TODO: This is a very naive way to handle truncations: if ANY relevant truncates are # present in the transaction, we're considering the whole transaction empty, and # just rotate the shape handle. "Correct" way to handle truncates is to be designed. @@ -292,14 +306,32 @@ defmodule Electric.Shapes.Consumer do {:halt, {:truncate, notify(txn, %{state | log_state: @initial_log_state})}} - relevant_changes != [] -> + num_changes > 0 -> {log_entries, new_log_state} = prepare_log_entries(relevant_changes, xid, shape, log_state, chunk_bytes_threshold) + timestamp = System.monotonic_time() + # TODO: what's a graceful way to handle failure to append to log? # Right now we'll just fail everything :ok = ShapeCache.Storage.append_to_log!(log_entries, storage) + OpenTelemetry.add_span_attributes(%{ + num_bytes: new_log_state.current_txn_bytes, + actual_num_changes: num_changes + }) + + :telemetry.execute( + [:electric, :storage, :transaction_stored], + %{ + duration: System.monotonic_time() - timestamp, + bytes: new_log_state.current_txn_bytes, + count: 1, + operations: num_changes + }, + Map.new(shape_attrs(state.shape_handle, state.shape)) + ) + shape_cache.update_shape_latest_offset(shape_handle, last_log_offset, shape_cache_opts) notify_listeners(registry, :new_changes, shape_handle, last_log_offset) @@ -383,9 +415,9 @@ defmodule Electric.Shapes.Consumer do Enumerable.t(Electric.Replication.Changes.data_change()), non_neg_integer() | nil, Shape.t(), - ShapeCache.Storage.log_state(), + log_state(), non_neg_integer() - ) :: {Enumerable.t(ShapeCache.Storage.log_item()), ShapeCache.Storage.log_state()} + ) :: {Enumerable.t(ShapeCache.Storage.log_item()), log_state()} defp prepare_log_entries( changes, xid, @@ -393,16 +425,27 @@ defmodule Electric.Shapes.Consumer do log_state, chunk_bytes_threshold ) do + log_state = %{ + current_chunk_byte_size: log_state.current_chunk_byte_size, + current_txn_bytes: 0 + } + {log_items, new_log_state} = changes |> Stream.flat_map( &LogItems.from_change(&1, xid, Shape.pk(shape, &1.relation), shape.replica) ) |> Enum.flat_map_reduce(log_state, fn log_item, - %{current_chunk_byte_size: chunk_size} = state -> + %{ + current_chunk_byte_size: chunk_size, + current_txn_bytes: txn_bytes + } = state -> json_log_item = Jason.encode!(log_item) + item_byte_size = byte_size(json_log_item) + + state = %{state | current_txn_bytes: txn_bytes + item_byte_size} - case LogChunker.add_to_chunk(json_log_item, chunk_size, chunk_bytes_threshold) do + case LogChunker.fit_into_chunk(item_byte_size, chunk_size, chunk_bytes_threshold) do {:ok, new_chunk_size} -> {[{log_item.offset, json_log_item}], %{state | current_chunk_byte_size: new_chunk_size}} diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b8c669e29e..3ee3efcc9d 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -133,7 +133,8 @@ defmodule Electric.StackSupervisor do ] end - defp storage_mod_arg(%{stack_id: stack_id, storage: {mod, arg}}) do + @doc false + def storage_mod_arg(%{stack_id: stack_id, storage: {mod, arg}}) do {mod, arg |> Keyword.put(:stack_id, stack_id) |> mod.shared_opts()} end diff --git a/packages/sync-service/lib/electric/telemetry.ex b/packages/sync-service/lib/electric/telemetry.ex index 43288168ed..4aaba54973 100644 --- a/packages/sync-service/lib/electric/telemetry.ex +++ b/packages/sync-service/lib/electric/telemetry.ex @@ -6,17 +6,105 @@ defmodule Electric.Telemetry do Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) end - def init(stack_id: stack_id) do + def init(opts) do children = [ - {:telemetry_poller, measurements: periodic_measurements(stack_id), period: 2_000} + {:telemetry_poller, measurements: periodic_measurements(opts), period: 2_000} ] children |> add_statsd_reporter(Application.fetch_env!(:electric, :telemetry_statsd_host)) |> add_prometheus_reporter(Application.fetch_env!(:electric, :prometheus_port)) + |> add_call_home_reporter(Application.fetch_env!(:electric, :call_home_telemetry)) |> Supervisor.init(strategy: :one_for_one) end + defp add_call_home_reporter(children, false), do: children + + defp add_call_home_reporter(children, true) do + children ++ + [ + {Electric.Telemetry.CallHomeReporter, + static_info: static_info(), + metrics: call_home_metrics(), + first_report_in: {2, :minute}, + reporting_period: {30, :minute}, + reporter_fn: &Electric.Telemetry.CallHomeReporter.report_home/1} + ] + end + + def static_info() do + {total_mem, _, _} = :memsup.get_memory_data() + processors = :erlang.system_info(:logical_processors) + {os_family, os_name} = :os.type() + arch = :erlang.system_info(:system_architecture) + + %{ + electric_version: to_string(Electric.version()), + environment: %{ + os: %{family: os_family, name: os_name}, + arch: to_string(arch), + cores: processors, + ram: total_mem, + electric_instance_id: Electric.instance_id() + } + } + end + + def call_home_metrics() do + [ + environment: [ + pg_version: + last_value("electric.postgres.info_looked_up.pg_version", + reporter_options: [persist_between_sends: true] + ) + ], + resources: [ + uptime: + last_value("vm.uptime.total", + unit: :second, + measurement: &:erlang.convert_time_unit(&1.total, :native, :second) + ), + used_memory: summary("vm.memory.total", unit: :byte), + run_queue_total: summary("vm.total_run_queue_lengths.total"), + run_queue_cpu: summary("vm.total_run_queue_lengths.cpu"), + run_queue_io: summary("vm.total_run_queue_lengths.io") + ], + usage: [ + inbound_bytes: + sum("electric.postgres.replication.transaction_received.bytes", unit: :byte), + inbound_transactions: sum("electric.postgres.replication.transaction_received.count"), + inbound_operations: sum("electric.postgres.replication.transaction_received.operations"), + stored_bytes: sum("electric.storage.transaction_stored.bytes", unit: :byte), + stored_transactions: sum("electric.storage.transaction_stored.count"), + stored_operations: sum("electric.storage.transaction_stored.operations"), + total_used_storage_kb: last_value("electric.storage.used", unit: {:byte, :kilobyte}), + total_shapes: last_value("electric.shapes.total_shapes.count"), + active_shapes: + summary("electric.plug.serve_shape.monotonic_time", + unit: :unique, + reporter_options: [count_unique: :shape_handle], + keep: &(&1.status < 300) + ), + unique_clients: + summary("electric.plug.serve_shape.monotonic_time", + unit: :unique, + reporter_options: [count_unique: :client_ip], + keep: &(&1.status < 300) + ), + sync_requests: + counter("electric.plug.serve_shape.monotonic_time", + drop: &(Map.get(&1, :live, false) || false) + ), + live_requests: + counter("electric.plug.serve_shape.monotonic_time", + keep: &(Map.get(&1, :live, false) || false) + ), + served_bytes: sum("electric.plug.serve_shape.bytes", unit: :byte), + wal_size: summary("electric.postgres.replication.wal_size", unit: :byte) + ] + ] + end + defp add_statsd_reporter(children, nil), do: children defp add_statsd_reporter(children, host) do @@ -89,10 +177,14 @@ defmodule Electric.Telemetry do ] end - defp periodic_measurements(stack_id) do + defp periodic_measurements(opts) do + stack_id = Keyword.fetch!(opts, :stack_id) + [ # A module, function and arguments to be invoked periodically. {__MODULE__, :uptime_event, []}, + {__MODULE__, :count_shapes, [stack_id]}, + {__MODULE__, :get_total_disk_usage, [opts]}, {Electric.Connection.Manager, :report_retained_wal_size, [Electric.Connection.Manager.name(stack_id)]} ] @@ -103,4 +195,26 @@ defmodule Electric.Telemetry do total: :erlang.monotonic_time() - :erlang.system_info(:start_time) }) end + + def count_shapes(stack_id) do + Electric.ShapeCache.list_shapes(stack_id: stack_id) + |> length() + |> then( + &:telemetry.execute([:electric, :shapes, :total_shapes], %{count: &1}, %{stack_id: stack_id}) + ) + end + + def get_total_disk_usage(opts) do + storage = Electric.StackSupervisor.storage_mod_arg(Map.new(opts)) + + Electric.ShapeCache.Storage.get_total_disk_usage(storage) + |> then( + &:telemetry.execute([:electric, :storage], %{used: &1}, %{ + stack_id: opts[:stack_id] + }) + ) + catch + :exit, {:noproc, _} -> + :ok + end end diff --git a/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex b/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex new file mode 100644 index 0000000000..b2d1ae09bd --- /dev/null +++ b/packages/sync-service/lib/electric/telemetry/call_home_reporter.ex @@ -0,0 +1,327 @@ +defmodule Electric.Telemetry.CallHomeReporter do + @moduledoc """ + Reporter that collects runtime telemetry information and sends it to a configured + home server once in a while. The information is aggregated over a period of time, + with percentile values calculated for the metrics that have them. + """ + + use GenServer + require Logger + alias Telemetry.Metrics + + @type metric :: Telemetry.Metrics.t() + @type report_format :: keyword(metric() | report_format()) + + def start_link(opts) do + name = Keyword.get(opts, :name, __MODULE__) + metrics = Keyword.fetch!(opts, :metrics) + static_info = Keyword.get(opts, :static_info, %{}) + first_report_in = cast_time_to_ms(Keyword.fetch!(opts, :first_report_in)) + reporting_period = cast_time_to_ms(Keyword.fetch!(opts, :reporting_period)) + reporter_fn = Keyword.fetch!(opts, :reporter_fn) + + GenServer.start_link( + __MODULE__, + {metrics, first_report_in, reporting_period, name, static_info, reporter_fn}, + name: name + ) + end + + def report_home(results) do + url = Application.fetch_env!(:electric, :telemetry_url) + + Req.post!(url, json: results, retry: :transient) + :ok + end + + def print_stats(name \\ __MODULE__) do + GenServer.call(name, :print_stats) + end + + defp cast_time_to_ms({time, :minute}), do: time * 60 * 1000 + defp cast_time_to_ms({time, :second}), do: time * 1000 + + @impl GenServer + def init({metrics, first_report_in, reporting_period, name, static_info, reporter_fn}) do + # We need to trap exits here so that `terminate/2` callback has more chances to run + # and send data before crash/shutdown + Process.flag(:trap_exit, true) + Process.set_label({:call_home_reporter, name}) + + Logger.notice( + "Starting telemetry reporter. Electric will send anonymous usage data to #{Application.fetch_env!(:electric, :telemetry_url)}. You can configure this with `ELECTRIC_USAGE_REPORTING` environment variable, see https://electric-sql.com/docs/reference/telemetry for more information." + ) + + metrics = save_target_path_to_options(metrics) + + groups = Enum.group_by(metrics, & &1.event_name) + + aggregates_table = create_table(name, :set) + summary_table = create_table(String.to_atom("#{name}_summary"), :duplicate_bag) + + context = %{ + table: aggregates_table, + summary_table: summary_table + } + + # Attach a listener per event + for {event, metrics} <- groups do + id = {__MODULE__, event, self()} + :telemetry.attach(id, event, &__MODULE__.handle_event/4, {metrics, context}) + end + + # Save some information about the metrics to use when building an output object + summary_types = + metrics + |> Enum.flat_map(fn + %Metrics.Summary{unit: :unique} = m -> [{get_result_path(m), :count_unique}] + %Metrics.Summary{} = m -> [{get_result_path(m), :summary}] + _ -> [] + end) + + all_paths = Enum.map(metrics, &get_result_path/1) + + persisted_paths = + metrics + |> Enum.filter(&Keyword.get(&1.reporter_options, :persist_between_sends, false)) + |> Enum.map(&get_result_path/1) + + Process.send_after(self(), :report, first_report_in) + + {:ok, + Map.merge(context, %{ + event_ids: Map.keys(groups), + summary_types: summary_types, + all_paths: all_paths, + reporting_period: reporting_period, + static_info: static_info, + persisted_paths: persisted_paths, + reporter_fn: reporter_fn, + last_reported: DateTime.utc_now() + })} + end + + @impl GenServer + def terminate(_, state) do + for id <- state.event_ids do + :telemetry.detach(id) + end + + # On shutdown try to push all the data we still can. + state.reporter_fn.(build_report(state)) + end + + @empty_summary %{ + min: 0, + max: 0, + mean: 0, + median: 0, + mode: nil + } + + @impl GenServer + def handle_call(:print_stats, _from, state) do + {:reply, build_stats(state), state} + end + + @impl GenServer + def handle_info(:report, state) do + full_report = build_report(state) + + state = + try do + :ok = state.reporter_fn.(full_report) + clear_stats(%{state | last_reported: full_report.timestamp}) + rescue + e -> + Logger.warning( + "Reporter function failed while trying to send telemetry data.\nError: #{Exception.format(:error, e, __STACKTRACE__)}" + ) + + state + end + + # If we've failed to send the results for more than 24 hours, then drop current stats + # to save memory + state = + if DateTime.diff(DateTime.utc_now(), state.last_reported, :hour) >= 24 do + clear_stats(%{state | last_reported: DateTime.utc_now()}) + else + state + end + + Process.send_after(self(), :report, state.reporting_period) + {:noreply, state} + end + + defp build_report(state) do + %{ + last_reported: state.last_reported, + timestamp: DateTime.utc_now(), + report_version: 2, + data: build_stats(state) + } + end + + defp build_stats(state) do + result = empty_result(state.all_paths, Map.new(state.summary_types)) + + result = + :ets.tab2list(state.table) + |> fill_map_from_path_tuples(result) + + result = + state.summary_types + |> Enum.map(&{elem(&1, 0), calculate_summary(&1, state.summary_table)}) + |> fill_map_from_path_tuples(result) + + deep_merge(result, state.static_info) + end + + defp clear_stats(state) do + for key <- state.all_paths -- state.persisted_paths do + table = + if(is_map_key(Map.new(state.summary_types), key), + do: state.summary_table, + else: state.table + ) + + :ets.delete(table, key) + end + + state + end + + defp calculate_summary({path, :count_unique}, table) do + :ets.lookup_element(table, path, 2) + |> Enum.uniq() + |> Enum.count() + rescue + ArgumentError -> 0 + end + + defp calculate_summary({path, :summary}, table) do + items = :ets.lookup_element(table, path, 2) + + length = length(items) + + {min, max} = Enum.min_max(items) + + %{ + min: min, + max: max, + mean: mean(items, length), + median: median(items, length), + mode: mode(items) + } + rescue + [ArgumentError, Enum.EmptyError, ArithmeticError] -> + # Enum.EmptyError may be raised when there are no elements in the ETS table under the key `path` + # ArithmeticError may be raised when an element in the ETS table is `nil` + @empty_summary + end + + defp mean(elements, length), do: Enum.sum(elements) / length + + defp median(elements, length) when rem(length, 2) == 1 do + Enum.at(elements, div(length, 2)) + end + + defp median(elements, length) when rem(length, 2) == 0 do + Enum.slice(elements, div(length, 2) - 1, 2) |> mean(length) + end + + defp mode(elements), do: Enum.frequencies(elements) |> Enum.max_by(&elem(&1, 1)) |> elem(0) + + defp empty_result(all_paths, summary_types) do + all_paths + |> Enum.map(fn path -> + case summary_types do + %{^path => :summary} -> {path, @empty_summary} + %{^path => :unique_count} -> {path, 0} + _ -> {path, 0} + end + end) + |> fill_map_from_path_tuples() + end + + @spec fill_map_from_path_tuples([{tuple(), term()}], map()) :: map() + defp fill_map_from_path_tuples(tuples, into \\ %{}) do + Enum.reduce(tuples, into, fn {path, val}, acc -> + path = path |> Tuple.to_list() |> Enum.map(&Access.key(&1, %{})) + put_in(acc, path, val) + end) + end + + @spec create_table(name :: atom, type :: atom) :: :ets.tid() | atom + defp create_table(name, type) do + :ets.new(name, [:named_table, :public, type, {:write_concurrency, true}]) + end + + def handle_event(_event_name, measurements, metadata, {metrics, context}) do + for %{reporter_options: opts} = metric <- metrics, keep?(metric, metadata) do + path = Keyword.fetch!(opts, :result_path) + measurement = extract_measurement(metric, measurements, metadata) + + case metric do + %Metrics.Counter{} -> + :ets.update_counter(context.table, path, 1, {path, 0}) + + %Metrics.Sum{} -> + :ets.update_counter(context.table, path, measurement, {path, 0}) + + %Metrics.LastValue{} -> + :ets.insert(context.table, {path, measurement}) + + %Metrics.Summary{unit: :unique} -> + add_to_summary(path, context, metadata[Keyword.fetch!(opts, :count_unique)]) + + %Metrics.Summary{} -> + add_to_summary(path, context, measurement) + end + end + end + + defp add_to_summary(path, %{summary_table: tbl}, value) do + :ets.insert(tbl, {path, value}) + end + + defp keep?(%{keep: nil}, _metadata), do: true + defp keep?(metric, metadata), do: metric.keep.(metadata) + + defp extract_measurement(metric, measurements, metadata) do + case metric.measurement do + fun when is_function(fun, 2) -> fun.(measurements, metadata) + fun when is_function(fun, 1) -> fun.(measurements) + key -> measurements[key] + end + end + + @spec save_target_path_to_options(report_format()) :: [metric()] + defp save_target_path_to_options(report, prefix \\ []) when is_list(report) do + Enum.flat_map(report, fn + {k, v} when is_list(v) -> + save_target_path_to_options(v, prefix ++ [k]) + + {k, v} -> + if v.tags != [], do: raise("Call home reporter doesn't support splitting metrics by tags") + + [ + Map.update!( + v, + :reporter_options, + &Keyword.put(&1, :result_path, List.to_tuple(prefix ++ [k])) + ) + ] + end) + end + + defp get_result_path(%{reporter_options: opts}), do: Keyword.fetch!(opts, :result_path) + + def deep_merge(left, right) do + Map.merge(left, right, fn + _, %{} = l, %{} = r -> deep_merge(l, r) + _, _, r -> r + end) + end +end diff --git a/packages/sync-service/mix.exs b/packages/sync-service/mix.exs index a71fee8243..5a039cf41a 100644 --- a/packages/sync-service/mix.exs +++ b/packages/sync-service/mix.exs @@ -54,7 +54,7 @@ defmodule Electric.MixProject do def application do [ - extra_applications: [:logger, :tls_certificate_check], + extra_applications: [:logger, :tls_certificate_check, :os_mon, :runtime_tools], # Using a compile-time flag to select the application module or lack thereof allows # using this app as a dependency with this additional flag mod: @@ -90,6 +90,8 @@ defmodule Electric.MixProject do {:plug, "~> 1.16"}, {:postgrex, "~> 0.19"}, {:retry, "~> 0.18"}, + {:remote_ip, "~> 1.2"}, + {:req, "~> 0.5"}, {:telemetry_metrics_prometheus_core, "~> 1.1"}, {:telemetry_metrics_statsd, "~> 0.7"}, {:telemetry_poller, "~> 1.1"}, diff --git a/packages/sync-service/mix.lock b/packages/sync-service/mix.lock index 5b3ba7842a..d3595f6fcc 100644 --- a/packages/sync-service/mix.lock +++ b/packages/sync-service/mix.lock @@ -3,6 +3,7 @@ "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, "bandit": {:hex, :bandit, "1.5.5", "df28f1c41f745401fe9e85a6882033f5f3442ab6d30c8a2948554062a4ab56e0", [:mix], [{:hpax, "~> 0.2.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "f21579a29ea4bc08440343b2b5f16f7cddf2fea5725d31b72cf973ec729079e1"}, "chatterbox": {:hex, :ts_chatterbox, "0.15.1", "5cac4d15dd7ad61fc3c4415ce4826fc563d4643dee897a558ec4ea0b1c835c9c", [:rebar3], [{:hpack, "~> 0.3.0", [hex: :hpack_erl, repo: "hexpm", optional: false]}], "hexpm", "4f75b91451338bc0da5f52f3480fa6ef6e3a2aeecfc33686d6b3d0a0948f31aa"}, + "combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"}, "ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"}, "cubdb": {:hex, :cubdb, "2.0.2", "d4253885084dae37a8ff73887d232864eb38ecac962aa08543e686b0183a1d62", [:mix], [], "hexpm", "c99cc8f9e6c4deb98d16cca5ded1928edd22e48b4736b76e8a1a85367d7fe921"}, "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, @@ -41,6 +42,7 @@ "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, "postgrex": {:hex, :postgrex, "0.19.0", "f7d50e50cb42e0a185f5b9a6095125a9ab7e4abccfbe2ab820ab9aa92b71dbab", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "dba2d2a0a8637defbf2307e8629cb2526388ba7348f67d04ec77a5d6a72ecfae"}, "protox": {:hex, :protox, "1.7.3", "dff5488a648850c95cbd1cca5430be7ccedc99e4102aa934dbf60abfa30e64c1", [:mix], [{:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "b936c0654b68b306c4be853db23bb5623e2be89e11238908f2ff6da10fc0275f"}, + "remote_ip": {:hex, :remote_ip, "1.2.0", "fb078e12a44414f4cef5a75963c33008fe169b806572ccd17257c208a7bc760f", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2ff91de19c48149ce19ed230a81d377186e4412552a597d6a5137373e5877cb7"}, "req": {:hex, :req, "0.5.7", "b722680e03d531a2947282adff474362a48a02aa54b131196fbf7acaff5e4cee", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "c6035374615120a8923e8089d0c21a3496cf9eda2d287b806081b8f323ceee29"}, "retry": {:hex, :retry, "0.18.0", "dc58ebe22c95aa00bc2459f9e0c5400e6005541cf8539925af0aa027dc860543", [:mix], [], "hexpm", "9483959cc7bf69c9e576d9dfb2b678b71c045d3e6f39ab7c9aa1489df4492d73"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, diff --git a/packages/sync-service/test/electric/postgres/replication_client_test.exs b/packages/sync-service/test/electric/postgres/replication_client_test.exs index 1e0ef5d8ec..bdc6b83a9b 100644 --- a/packages/sync-service/test/electric/postgres/replication_client_test.exs +++ b/packages/sync-service/test/electric/postgres/replication_client_test.exs @@ -319,6 +319,7 @@ defmodule Electric.Postgres.ReplicationClientTest do test "correctly responds to a status update request message from PG", ctx do state = ReplicationClient.State.new( + stack_id: ctx.stack_id, transaction_received: nil, relation_received: nil, publication_name: "", diff --git a/packages/sync-service/test/electric/shape_cache/log_chunker_test.exs b/packages/sync-service/test/electric/shape_cache/log_chunker_test.exs index ef7be534a0..e010a73942 100644 --- a/packages/sync-service/test/electric/shape_cache/log_chunker_test.exs +++ b/packages/sync-service/test/electric/shape_cache/log_chunker_test.exs @@ -2,20 +2,21 @@ defmodule Electric.ShapeCache.LogChunkerTest do use ExUnit.Case, async: true alias Electric.ShapeCache.LogChunker - describe "add_chunk/3" do + describe "fit_into_chunk/3" do test "should reset counter upon exceeding threshold", _ do - chunk_bytes = "test" - chunk_byte_size = byte_size(chunk_bytes) + chunk_byte_size = byte_size("test") threshold = 3 * chunk_byte_size - assert {:ok, new_size} = LogChunker.add_to_chunk(chunk_bytes, 0, threshold) + assert {:ok, new_size} = LogChunker.fit_into_chunk(chunk_byte_size, 0, threshold) assert new_size == chunk_byte_size - assert {:ok, new_size} = LogChunker.add_to_chunk(chunk_bytes, chunk_byte_size, threshold) + assert {:ok, new_size} = + LogChunker.fit_into_chunk(chunk_byte_size, chunk_byte_size, threshold) + assert new_size == 2 * chunk_byte_size assert {:threshold_exceeded, 0} = - LogChunker.add_to_chunk(chunk_bytes, 2 * chunk_byte_size, threshold) + LogChunker.fit_into_chunk(chunk_byte_size, 2 * chunk_byte_size, threshold) end test "should ignore zero length bytestrings", _ do @@ -25,20 +26,19 @@ defmodule Electric.ShapeCache.LogChunkerTest do # despite next chunk already being full from the large string, if not # bytes are added we should not exceed the threshold assert {:ok, ^just_below_threshold} = - LogChunker.add_to_chunk("", just_below_threshold, threshold) - - assert {:ok, ^just_below_threshold} = - LogChunker.add_to_chunk(<<>>, just_below_threshold, threshold) + LogChunker.fit_into_chunk(0, just_below_threshold, threshold) # adding a single byte should make it exceed assert {:threshold_exceeded, 0} = - LogChunker.add_to_chunk(<<0>>, threshold - 1, threshold) + LogChunker.fit_into_chunk(1, threshold - 1, threshold) end test "should reset threshold with single very large values", _ do threshold = 10 large_string = String.duplicate("a", threshold * 2) - assert {:threshold_exceeded, 0} = LogChunker.add_to_chunk(large_string, 0, threshold) + + assert {:threshold_exceeded, 0} = + LogChunker.fit_into_chunk(byte_size(large_string), 0, threshold) end end end diff --git a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs index a0bffbb0d8..6392c453c2 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_implementations_test.exs @@ -567,13 +567,34 @@ defmodule Electric.ShapeCache.StorageImplimentationsTest do refute File.exists?(opts.data_dir) end end + + describe "#{module_name}.get_total_disk_usage/1" do + setup do + {:ok, %{module: unquote(module)}} + end + + test "returns 0 if no shapes exist", %{module: module} = context do + opts = module |> opts(context) |> module.shared_opts() + + assert 0 = Electric.ShapeCache.Storage.get_total_disk_usage({module, opts}) + end + + test "returns the total disk usage for all shapes", %{module: storage} = context do + {:ok, %{opts: shape_opts, shared_opts: opts}} = start_storage(context) + + storage.initialise(shape_opts) + storage.set_shape_definition(@shape, shape_opts) + + assert 2274 = Electric.ShapeCache.Storage.get_total_disk_usage({storage, opts}) + end + end end defp start_storage(%{module: module} = context) do opts = module |> opts(context) |> module.shared_opts() shape_opts = module.for_shape(@shape_handle, opts) {:ok, pid} = module.start_link(shape_opts) - {:ok, %{module: module, opts: shape_opts, pid: pid}} + {:ok, %{opts: shape_opts, shared_opts: opts, pid: pid}} end defp opts(InMemoryStorage, %{stack_id: stack_id}) do diff --git a/packages/sync-service/test/electric/shape_cache/storage_test.exs b/packages/sync-service/test/electric/shape_cache/storage_test.exs index 0ba742dd2f..023541b94c 100644 --- a/packages/sync-service/test/electric/shape_cache/storage_test.exs +++ b/packages/sync-service/test/electric/shape_cache/storage_test.exs @@ -19,6 +19,7 @@ defmodule Electric.ShapeCache.StorageTest do |> Mox.expect(:snapshot_started?, fn {^shape_handle, :opts} -> true end) |> Mox.expect(:get_snapshot, fn {^shape_handle, :opts} -> {1, []} end) |> Mox.expect(:append_to_log!, fn _, {^shape_handle, :opts} -> :ok end) + |> Mox.expect(:get_total_disk_usage, fn :opts -> 0 end) |> Mox.expect(:get_log_stream, fn _, _, {^shape_handle, :opts} -> [] end) shape_storage = Storage.for_shape(shape_handle, storage) @@ -28,6 +29,7 @@ defmodule Electric.ShapeCache.StorageTest do Storage.get_snapshot(shape_storage) Storage.append_to_log!([], shape_storage) Storage.get_log_stream(LogOffset.first(), shape_storage) + Storage.get_total_disk_usage(storage) end test "get_log_stream/4 correctly guards offset ordering" do diff --git a/packages/sync-service/test/support/test_storage.ex b/packages/sync-service/test/support/test_storage.ex index bcda9360dd..4efbf2c892 100644 --- a/packages/sync-service/test/support/test_storage.ex +++ b/packages/sync-service/test/support/test_storage.ex @@ -76,6 +76,12 @@ defmodule Support.TestStorage do Storage.get_all_stored_shapes(storage) end + @impl Electric.ShapeCache.Storage + def get_total_disk_usage({parent, _init, storage}) do + send(parent, {__MODULE__, :get_total_disk_usage}) + Storage.get_total_disk_usage(storage) + end + @impl Electric.ShapeCache.Storage def get_current_position({parent, shape_handle, _, storage}) do send(parent, {__MODULE__, :get_current_position, shape_handle}) diff --git a/website/docs/api/config.md b/website/docs/api/config.md index 404351d9d0..0b6b2de641 100644 --- a/website/docs/api/config.md +++ b/website/docs/api/config.md @@ -230,6 +230,8 @@ Path to root folder for storing data on the filesystem. ## Telemetry +These environment variables allow configuration of metric and trace export for visibility into performance of the Electric instance. + ### ELECTRIC_OTLP_ENDPOINT + +## Usage reporting + +### ELECTRIC_USAGE_REPORTING + +These environment variables allow configuration of anonymous usage data reporting back to https://electric-sql.com + + + +Configure anonymous usage data about the instance being sent to a central checkpoint service. Collected information is anonymised and doesn't contain any information from the replicated data. You can read more about it in our [telemetry docs](../reference/telemetry.md#anonymous-usage-data). + + diff --git a/website/docs/reference/telemetry.md b/website/docs/reference/telemetry.md index a1ab752cbb..dab053df45 100644 --- a/website/docs/reference/telemetry.md +++ b/website/docs/reference/telemetry.md @@ -7,7 +7,7 @@ outline: deep # Telemetry -Electric provides telemetry data — such as traces, logs, and metrics — for real-time system monitoring. +Electric provides telemetry data — such as traces, logs, and metrics — for real-time system monitoring. Self-hosted Electric instances are also configured by default to send aggregated, anonymous usage data to ElectricSQL to help us understand how our software is being used. You can opt-out of this reporting by setting an environment variable. See the [Anonymous usage data](#anonymous-usage-data) section below for more details. ## Metrics @@ -22,7 +22,7 @@ You can get the current status of the service by calling the `http://electric-ho ## OpenTelemetry -Metrics, traces and logs are exported using the OpenTelemetry Protocol (OTLP). You can configure the OpenTelemetry Exporter for Electric using the following environment variables. +Traces are exported using the OpenTelemetry Protocol (OTLP). You can configure the OpenTelemetry Exporter for Electric using the following environment variables. | VARIABLE | Type | Description | |---------------|-----------|-----------------| @@ -56,3 +56,11 @@ Set `ELECTRIC_HNY_DATASET` and `ELECTRIC_HNY_API_KEY` environment variables in a ```shell docker compose -f docker-compose-otel.yml up ``` + +## Anonymous usage data + +Electric instances are configured by default to send anonymized usage data to checkpoint.electric-sql.com to help us understand how the software is being used. Absolutely no information about transaction contents is sent. I.e.: none of your data that you're using Electric to replicate is captured in the telemetry information or shared with the Electric checkpoint service. Captured data includes the disk usage by the shape cache, CPU/memory information about the running Electric instance, Postgres version, number of shapes, amount of distinct shape requests, and numerical information about processed transactions: byte size, amount of operations, and percentiles of response times. Essentially, what kind of load Electric was under, and how did it cope. + +Aggregated statistics are sent every 30 minutes. + +To disable anonymous usage data, set the `ELECTRIC_USAGE_REPORTING` environment variable to `false`. We encourage everyone to keep this enabled so we can get a better understanding of how Electric is used. If you have any further questions about what we collect, feel free to ask on our [open community Discord](https://discord.electric-sql.com) or reach out to us via email at [info@electric-sql.com](mailto:info@electric-sql.com).