Skip to content

Commit

Permalink
Tracing and host metrics with Opentelemetry (#1487)
Browse files Browse the repository at this point in the history
- [x] use otel collector to collect and export metric
- [x] add new docker-compose for telemetry services
- [x] ~~Do something with statsd metrics~~ collect and export metrics
with openTelemetry collector
- [x] Add other exporters for local use
- [x] Learn what was done in our prev observability
[PR](febb710),
which seems a lot more comprehensive
- [x] Need to document how to setup
- [x] Need to replace host metric collector with prometheus-based one
- [x] Discuss with team issue with opentelemetry_bandit

Closes #1472.

---------

Co-authored-by: Valter Balegas <[email protected]>
Co-authored-by: Ilia Borovitinov <[email protected]>
  • Loading branch information
3 people authored Aug 7, 2024
1 parent 07ce5e1 commit f413a68
Show file tree
Hide file tree
Showing 19 changed files with 477 additions and 69 deletions.
3 changes: 2 additions & 1 deletion docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ export default defineConfig({
// { text: 'Usage', link: '/guides/usage' },
{ text: 'Shapes', link: '/guides/shapes' },
// { text: 'Deployment', link: '/guides/deployment' },
// { text: 'Writing clients', link: '/guides/write-your-own-client' }
// { text: 'Writing clients', link: '/guides/write-your-own-client' },
{ text: 'Telemetry', link: '/guides/telemetry' }
]
},
{
Expand Down
35 changes: 35 additions & 0 deletions docs/guides/telemetry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Telemetry

Electric provides telemetry data — such as traces, logs, and metrics — for real-time system monitoring.

## Metrics
Metrics are reported in StatsD and Prometheus formats. To configure Electric to expose metric information in those formats, use the following environment variables.

| VARIABLE| Description |
|---------------|------------|
| STATSD_HOST | The address of the StatsD server |
| PROMETHEUS_PORT | The scrape port for Prometheus |

You can get the current status of the service by calling the `http://electric-hostname:PROMETHEUS_PORT/metrics` endpoint.

## OpenTelemetry
Metrics, traces and logs are exported using the OpenTelemetry Protocol (OTLP). You can configure the OpenTelemetry Exporter for Electric using the following environment variables.

| VARIABLE| Description |
|---------------|------------|
| OTEL_EXPORT | `debug` outputs telemetry data to stdout |
| | `otlp` sends the telemetry data to an OTLP endpoint |
| OTLP_ENDPOINT | The exporter endpoint url |

Electric always adds the following resource attributes to events:
```elixir
%{service: %{name: service_name, version: version}, instance: %{id: instance_id}}
```

The `service_name` and `instance_id` attributes can be overridden with the `ELECTRIC_SERVICE_NAME` and `ELECTRIC_INSTANCE_ID` environment variables, respectively. By default, `instance_id` is a randomly generated UUID.

Electric will also load additional resource attributes from `OTEL_RESOURCE_ATTRIBUTES`. Learn more about resource attributes in [OpenTelemetry documentation](https://opentelemetry.io/docs/languages/js/resources/).

## Example
You can find an example Docker Compose file under `packages/sync-service/dev` that runs Electric with an OpenTelemetry Collector agent sending telemetry data to Honeycomb. Set `HNY_DATASET` and `HNY_API_KEY` to your Honeycomb information, then start it with `docker compose -f docker-compose-otel.yml up`.

2 changes: 2 additions & 0 deletions packages/sync-service/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ electric-*.tar

# The shape database, created when the Sync Service runs
/shapes/

.env.dev.local
34 changes: 33 additions & 1 deletion packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,39 @@ if config_env() in [:dev, :test] do
source!([".env.#{config_env()}", ".env.#{config_env()}.local", System.get_env()])
end

# Identity attributes attached to all exported telemetry. Both have env-var
# overrides; instance_id falls back to a random UUID per boot.
service_name = env!("ELECTRIC_SERVICE_NAME", :string, "electric")
instance_id = env!("ELECTRIC_INSTANCE_ID", :string, Electric.Utils.uuid4())
version = Electric.version()

# Polling period for :telemetry_poller's default measurements (in ms).
config :telemetry_poller, :default, period: 500

# OpenTelemetry resource: merged from the env-var and app-env detectors plus
# the explicit service/instance attributes below.
config :opentelemetry,
  resource_detectors: [:otel_resource_env_var, :otel_resource_app_env],
  resource: %{service: %{name: service_name, version: version}, instance: %{id: instance_id}}

otel_export = env!("OTEL_EXPORT", :string, nil)
prometheus_port = env!("PROMETHEUS_PORT", :integer, nil)

# Choose the OpenTelemetry exporter based on OTEL_EXPORT:
#   "otlp"  - send to OTLP_ENDPOINT over HTTP/protobuf with gzip compression
#             (no-op if OTLP_ENDPOINT is unset)
#   "debug" - print each span to stdout as it ends
#   other   - disable trace exporting entirely
case otel_export do
  "otlp" ->
    if endpoint = env!("OTLP_ENDPOINT", :string, nil) do
      config :opentelemetry_exporter,
        otlp_protocol: :http_protobuf,
        otlp_endpoint: endpoint,
        otlp_compression: :gzip
    end

  "debug" ->
    # In this mode, each span is printed to stdout as soon as it ends, without batching.
    config :opentelemetry, :processors,
      otel_simple_processor: %{exporter: {:otel_exporter_stdout, []}}

  _ ->
    config :opentelemetry,
      processors: [],
      traces_exporter: :none
end

if Config.config_env() == :test do
config :electric,
connection_opts: [
Expand Down Expand Up @@ -66,6 +97,7 @@ config :electric,
cache_stale_age: cache_stale_age,
# Used in telemetry
environment: config_env(),
instance_id: env!("ELECTRIC_INSTANCE_ID", :string, Electric.Utils.uuid4()),
instance_id: instance_id,
telemetry_statsd_host: statsd_host,
prometheus_port: prometheus_port,
storage: storage
60 changes: 60 additions & 0 deletions packages/sync-service/dev/docker-compose-otel.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
version: "3.8"
name: "electric_dev_otel"

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: electric
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "54321:5432"
    volumes:
      - ./postgres.conf:/etc/postgresql.conf:ro
      - ./init.sql:/docker-entrypoint-initdb.d/00_shared_init.sql:ro
    # Disposable dev database: data lives on tmpfs and vanishes with the container.
    tmpfs:
      - /var/lib/postgresql/data
      - /tmp
    entrypoint:
      - docker-entrypoint.sh
      - -c
      - config_file=/etc/postgresql.conf

  electric:
    image: electricsql/electric-next:latest
    environment:
      # Export telemetry over OTLP by default; "debug" prints spans to stdout instead.
      OTEL_EXPORT: ${OTEL_EXPORT:-otlp}
      # OTLP/HTTP ingest of the collector service below.
      OTLP_ENDPOINT: http://otel:4318/
      PROMETHEUS_PORT: ${PROMETHEUS_PORT:-4000}
      DATABASE_URL: postgresql://postgres:password@postgres:5432/electric
      OTEL_RESOURCE_ATTRIBUTES: ${OTEL_RESOURCE_ATTRIBUTES_FOR_DOCKER_COMPOSE}
    ports:
      - 3000:3000
      - 4000:4000

  # OpenTelemetry Collector: scrapes Electric's Prometheus endpoint and receives
  # OTLP traffic, forwarding both to Honeycomb (see the mounted config file).
  otel:
    image: otel/opentelemetry-collector-contrib:0.105.0
    environment:
      HNY_DATASET: ${HNY_DATASET}
      HNY_API_KEY: ${HNY_API_KEY}
      PROMETHEUS_SCRAPE_ENDPOINT: electric:4000
      # Need to set OTEL_RESOURCE_ATTRIBUTES_FOR_DOCKER_COMPOSE
      # see https://github.com/docker/cli/issues/4958
      OTEL_RESOURCE_ATTRIBUTES: ${OTEL_RESOURCE_ATTRIBUTES_FOR_DOCKER_COMPOSE}
    ports:
      - 4317:4317
      - 4318:4318
    command: ['--config=/conf/otel-collector-config.yaml']
    volumes:
      - ./otel-collector-honeycomb-config.yaml:/conf/otel-collector-config.yaml
    depends_on:
      - electric

  nginx:
    image: nginx:latest
    ports:
      - "3002:3002"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    extra_hosts:
      - "host.docker.internal:host-gateway"
    depends_on:
      - electric
1 change: 0 additions & 1 deletion packages/sync-service/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ services:
- docker-entrypoint.sh
- -c
- config_file=/etc/postgresql.conf

nginx:
image: nginx:latest
ports:
Expand Down
57 changes: 57 additions & 0 deletions packages/sync-service/dev/otel-collector-honeycomb-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
exporters:
  # Honeycomb OTLP ingest; credentials are read from the container environment.
  otlp/honeycomb:
    endpoint: api.honeycomb.io:443
    headers:
      x-honeycomb-team: ${env:HNY_API_KEY}
      x-honeycomb-dataset: ${env:HNY_DATASET}
  # Also log exported telemetry at info level, for local debugging.
  logging:
    loglevel: info

receivers:
  # Accept OTLP traffic on the standard gRPC and HTTP ports.
  otlp:
    protocols:
      grpc:
        endpoint: '0.0.0.0:4317'
      http:
        endpoint: '0.0.0.0:4318'
  # Scrape Electric's Prometheus /metrics endpoint every 5s.
  prometheus:
    config:
      scrape_configs:
        - job_name: otel-prometheus
          scrape_interval: 5s
          static_configs:
            - targets: ["${env:PROMETHEUS_SCRAPE_ENDPOINT}"]

processors:
  # Enrich telemetry with resource attributes detected from env and host system.
  resourcedetection:
    detectors:
      - env
      - system
  transform:
    error_mode: ignore
    metric_statements:
      - context: datapoint
        statements:
          # Truncate datapoint timestamps to whole seconds.
          - set(time, TruncateTime(time, Duration("1s")))
  batch:
    send_batch_size: 8192
    timeout: 200ms

service:
  pipelines:
    # Metrics: Prometheus scrape -> transform/enrich/batch -> Honeycomb + log.
    metrics:
      receivers:
        - prometheus
      processors:
        - transform
        - resourcedetection
        - batch
      exporters:
        - otlp/honeycomb
        - logging
    # Traces: OTLP in -> enrich/batch -> Honeycomb + log.
    traces:
      receivers:
        - otlp
      processors:
        - resourcedetection
        - batch
      exporters:
        - otlp/honeycomb
        - logging
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Electric do
@type relation :: {schema :: String.t(), table :: String.t()}

@current_vsn Mix.Project.config()[:version]
def vsn do
def version do
@current_vsn
end
end
13 changes: 13 additions & 0 deletions packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ defmodule Electric.Application do
allow_shape_deletion: Application.get_env(:electric, :allow_shape_deletion, false)},
port: 3000}
]
|> add_prometheus_router(Application.fetch_env!(:electric, :prometheus_port))
else
[]
end
Expand All @@ -68,4 +69,16 @@ defmodule Electric.Application do
Supervisor.start_link(children, opts)
end
end

# No Prometheus port configured: leave the child spec list untouched.
defp add_prometheus_router(children, nil), do: children

# Append a Bandit server that serves the utility (metrics) router on `port`.
defp add_prometheus_router(children, port) do
  metrics_server = {Bandit, plug: {Electric.Plug.UtilityRouter, []}, port: port}
  children ++ [metrics_server]
end
end
2 changes: 1 addition & 1 deletion packages/sync-service/lib/electric/plug/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Electric.Plug.Router do
use Plug.Router, copy_opts_to_assign: :config

plug Plug.RequestId, assign_as: :plug_request_id
plug :server_header, Electric.vsn()
plug :server_header, Electric.version()
plug :match
plug Electric.Plug.LabelProcessPlug
plug Plug.Telemetry, event_prefix: [:electric, :routing]
Expand Down
8 changes: 8 additions & 0 deletions packages/sync-service/lib/electric/plug/utility_router.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Electric.Plug.UtilityRouter do
  @moduledoc """
  Minimal HTTP router for operational endpoints, served on the port configured
  via `PROMETHEUS_PORT` (see `Electric.Application.add_prometheus_router/2`).

  Currently exposes a single route, `GET /metrics`, which returns the current
  metrics scraped from `TelemetryMetricsPrometheus.Core`.
  """
  use Plug.Router

  plug :match
  plug :dispatch

  # Prometheus scrape endpoint: respond 200 with the current metrics payload.
  get "/metrics", do: resp(conn, 200, TelemetryMetricsPrometheus.Core.scrape())
end
65 changes: 37 additions & 28 deletions packages/sync-service/lib/electric/shape_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ defmodule Electric.ShapeCache do
alias Electric.Shapes.Querying
alias Electric.Shapes.Shape
alias Electric.Replication.LogOffset
alias Electric.Telemetry.OpenTelemetry

use GenServer
@behaviour Electric.ShapeCacheBehaviour

Expand Down Expand Up @@ -288,16 +290,21 @@ defmodule Electric.ShapeCache do

affected_tables = Shape.affected_tables(shape)

Task.start(fn ->
try do
Utils.apply_fn_or_mfa(prepare_tables_fn_or_mfa, [pool, affected_tables])

apply(create_snapshot_fn, [parent, shape_id, shape, pool, storage])
GenServer.cast(parent, {:snapshot_ready, shape_id})
rescue
error -> GenServer.cast(parent, {:snapshot_failed, shape_id, error, __STACKTRACE__})
OpenTelemetry.async_fun(
"shape_cache.create_snapshot_task",
[],
fn ->
try do
Utils.apply_fn_or_mfa(prepare_tables_fn_or_mfa, [pool, affected_tables])

apply(create_snapshot_fn, [parent, shape_id, shape, pool, storage])
GenServer.cast(parent, {:snapshot_ready, shape_id})
rescue
error -> GenServer.cast(parent, {:snapshot_failed, shape_id, error, __STACKTRACE__})
end
end
end)
)
|> Task.start()

add_waiter(state, shape_id, nil)
else
Expand All @@ -321,25 +328,27 @@ defmodule Electric.ShapeCache do
@doc false
def query_in_readonly_txn(parent, shape_id, shape, db_pool, storage) do
Postgrex.transaction(db_pool, fn conn ->
Postgrex.query!(
conn,
"SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY",
[]
)

%{rows: [[xmin]]} =
Postgrex.query!(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())", [])

# Enforce display settings *before* querying initial data to maintain consistent
# formatting between snapshot and live log entries.
Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, []))

GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin})
{query, stream} = Querying.stream_initial_data(conn, shape)

# could pass the shape and then make_new_snapshot! can pass it to row_to_snapshot_item
# that way it has the relation, but it is still missing the pk_cols
Storage.make_new_snapshot!(shape_id, shape, query, stream, storage)
OpenTelemetry.with_span("shape_cache.query_in_readonly_txn", [], fn ->
Postgrex.query!(
conn,
"SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY",
[]
)

%{rows: [[xmin]]} =
Postgrex.query!(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())", [])

# Enforce display settings *before* querying initial data to maintain consistent
# formatting between snapshot and live log entries.
Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, []))

GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin})
{query, stream} = Querying.stream_initial_data(conn, shape)

# could pass the shape and then make_new_snapshot! can pass it to row_to_snapshot_item
# that way it has the relation, but it is still missing the pk_cols
Storage.make_new_snapshot!(shape_id, shape, query, stream, storage)
end)
end)
end

Expand Down
Loading

0 comments on commit f413a68

Please sign in to comment.