From 16d3541dede0e420396a2aedad744b452dfe5ed6 Mon Sep 17 00:00:00 2001 From: itowlson Date: Thu, 21 Nov 2024 13:34:49 +1300 Subject: [PATCH 1/2] Upgrade OTel to stop Tokio being pinned Signed-off-by: itowlson --- Cargo.lock | 39 ++++++++++++----- Cargo.toml | 6 +-- crates/telemetry/Cargo.toml | 3 +- crates/telemetry/src/lib.rs | 78 ++++++++++++++++++--------------- crates/telemetry/src/logs.rs | 20 ++++----- crates/telemetry/src/metrics.rs | 20 ++++----- crates/telemetry/src/traces.rs | 26 +++++++---- 7 files changed, 110 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7ca403378..0b654f6b97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5470,9 +5470,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.25.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "803801d3d3b71cd026851a53f974ea03df3d179cb758b260136a6c9e22e196af" +checksum = "0f3cebff57f7dbd1255b44d8bddc2cebeb0ea677dbaa2e25a3070a91b318f660" dependencies = [ "futures-core", "futures-sink", @@ -5482,11 +5482,23 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab5feffc321035ad94088a7e5333abb4d84a8726e54a802e736ce9dd7237e85b" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "opentelemetry-http" -version = "0.25.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d8c2b76e5f7848a289aa9666dbe56b16f8a22a4c5246ef37a14941818d2913" +checksum = "10a8a7f5f6ba7c1b286c2fbca0454eaba116f63bbe69ed250b642d36fbb04d80" dependencies = [ "async-trait", "bytes", @@ -5497,9 +5509,9 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.25.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "596b1719b3cab83addb20bcbffdf21575279d9436d9ccccfe651a3bf0ab5ab06" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" dependencies = [ "async-trait", "futures-core", @@ -5513,13 +5525,14 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tonic", + "tracing", ] [[package]] name = "opentelemetry-proto" -version = "0.25.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c43620e8f93359eb7e627a3b16ee92d8585774986f24f2ab010817426c5ce61" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" dependencies = [ "opentelemetry", "opentelemetry_sdk", @@ -5529,9 +5542,9 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.25.0" +version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0da0d6b47a3dbc6e9c9e36a0520e25cf943e046843818faaa3f87365a548c82" +checksum = "27b742c1cae4693792cc564e58d75a2a0ba29421a34a85b50da92efa89ecb2bc" dependencies = [ "async-trait", "futures-channel", @@ -5546,6 +5559,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", + "tracing", ] [[package]] @@ -8326,6 +8340,7 @@ dependencies = [ "http 0.2.12", "http 1.1.0", "opentelemetry", + "opentelemetry-appender-tracing", "opentelemetry-otlp", "opentelemetry_sdk", "terminal", @@ -9309,9 +9324,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.26.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"5eabc56d23707ad55ba2a0750fc24767125d5a0f51993ba41ad2c441cc7b8dea" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" dependencies = [ "js-sys", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index d54b0a3f04..faee0c509b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -127,8 +127,8 @@ http = "1" http-body-util = "0.1" hyper = { version = "1", features = ["full"] } itertools = "0.13" -opentelemetry = { version = "0.25", features = ["metrics", "trace", "logs"] } -opentelemetry_sdk = { version = "0.25", features = ["rt-tokio", "logs_level_enabled", "metrics"] } +opentelemetry = { version = "0.27", features = ["metrics", "trace", "logs"] } +opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "spec_unstable_logs_enabled", "metrics"] } rand = "0.8" regex = "1" reqwest = { version = "0.12", features = ["stream", "blocking"] } @@ -144,7 +144,7 @@ thiserror = "1" tokio = "1" toml = "0.8" tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = { version = "0.26", default-features = false, features = ["metrics"] } +tracing-opentelemetry = { version = "0.28", default-features = false, features = ["metrics"] } url = "2" wasi-common-preview1 = { version = "25.0.0", package = "wasi-common", features = [ "tokio", diff --git a/crates/telemetry/Cargo.toml b/crates/telemetry/Cargo.toml index c33e0eff29..fce408986d 100644 --- a/crates/telemetry/Cargo.toml +++ b/crates/telemetry/Cargo.toml @@ -9,7 +9,8 @@ anyhow = { workspace = true } http0 = { version = "0.2.9", package = "http" } http1 = { version = "1.0.0", package = "http" } opentelemetry = { workspace = true } -opentelemetry-otlp = { version = "0.25", features = ["http-proto", "http", "reqwest-client"] } +opentelemetry-appender-tracing = "0.27" +opentelemetry-otlp = { version = "0.27", features = ["http-proto", "http", "reqwest-client"] } opentelemetry_sdk = { workspace = true } terminal = { path = "../terminal" } tracing = { workspace = true } diff --git a/crates/telemetry/src/lib.rs b/crates/telemetry/src/lib.rs index 6c54318592..0cdf2c8b13 100644 --- a/crates/telemetry/src/lib.rs +++ b/crates/telemetry/src/lib.rs @@ -1,12 +1,11 @@ -use std::cell::Cell; use std::io::IsTerminal; -use std::time::Duration; -use std::time::Instant; use env::otel_logs_enabled; use env::otel_metrics_enabled; use env::otel_tracing_enabled; use opentelemetry_sdk::propagation::TraceContextPropagator; +use tracing_subscriber::filter::filter_fn; +use tracing_subscriber::fmt::Layer as OtelFmtLayer; use tracing_subscriber::{fmt, prelude::*, registry, EnvFilter, Layer}; pub mod detector; @@ -67,8 +66,15 @@ pub fn init(spin_version: String) -> anyhow::Result { .add_directive("[{app_log_non_utf8}]=off".parse()?), ); - // Even if metrics or tracing aren't enabled we're okay to turn on the global error handler - opentelemetry::global::set_error_handler(otel_error_handler)?; + let opentelemetry_layer = OtelFmtLayer::new() + .with_writer(std::io::stderr) + .with_filter(filter_fn(|metadata| { + metadata.target().starts_with("opentelemetry") + })); + + // let non_opentelemetry_filter = filter_fn(|metadata| !metadata.target().starts_with("opentelemetry")); + // let otel_bridge_layer = layer::OpenTelemetryTracingBridge::new(&cloned_provider) + // .with_filter(non_opentelemetry_filter); let otel_tracing_layer = if otel_tracing_enabled() { Some(traces::otel_tracing_layer(spin_version.clone())?) 
@@ -87,6 +93,8 @@ pub fn init(spin_version: String) -> anyhow::Result { .with(otel_tracing_layer) .with(otel_metrics_layer) .with(fmt_layer) + .with(opentelemetry_layer) + // .with(otel_bridge_layer) .init(); // Used to propagate trace information in the standard W3C TraceContext format. Even if the otel @@ -100,36 +108,36 @@ pub fn init(spin_version: String) -> anyhow::Result { Ok(ShutdownGuard) } -fn otel_error_handler(err: opentelemetry::global::Error) { - // Track the error count - let signal = match err { - opentelemetry::global::Error::Metric(_) => "metrics", - opentelemetry::global::Error::Trace(_) => "traces", - opentelemetry::global::Error::Log(_) => "logs", - _ => "unknown", - }; - metrics::monotonic_counter!(spin.otel_error_count = 1, signal = signal); - - // Only log the first error at ERROR level, subsequent errors will be logged at higher levels and rate limited - static FIRST_OTEL_ERROR: std::sync::Once = std::sync::Once::new(); - FIRST_OTEL_ERROR.call_once(|| { - tracing::error!(?err, "OpenTelemetry error"); - tracing::error!("There has been an error with the OpenTelemetry system. Traces, logs or metrics are likely failing to export."); - tracing::error!("Further OpenTelemetry errors will be available at WARN level (rate limited) or at TRACE level."); - }); - - // Rate limit the logging of the OTel errors to not occur more frequently on each thread than OTEL_ERROR_INTERVAL - const OTEL_ERROR_INTERVAL: Duration = Duration::from_millis(5000); - thread_local! { - static LAST_OTEL_ERROR: Cell = Cell::new(Instant::now() - OTEL_ERROR_INTERVAL); - } - if LAST_OTEL_ERROR.get().elapsed() > OTEL_ERROR_INTERVAL { - LAST_OTEL_ERROR.set(Instant::now()); - tracing::warn!(?err, "OpenTelemetry error"); - } else { - tracing::trace!(?err, "OpenTelemetry error"); - } -} +// fn otel_error_handler(err: opentelemetry::global::Error) { +// // Track the error count +// let signal = match err { +// opentelemetry::global::Error::Metric(_) => "metrics", +// opentelemetry::global::Error::Trace(_) => "traces", +// opentelemetry::global::Error::Log(_) => "logs", +// _ => "unknown", +// }; +// metrics::monotonic_counter!(spin.otel_error_count = 1, signal = signal); + +// // Only log the first error at ERROR level, subsequent errors will be logged at higher levels and rate limited +// static FIRST_OTEL_ERROR: std::sync::Once = std::sync::Once::new(); +// FIRST_OTEL_ERROR.call_once(|| { +// tracing::error!(?err, "OpenTelemetry error"); +// tracing::error!("There has been an error with the OpenTelemetry system. Traces, logs or metrics are likely failing to export."); +// tracing::error!("Further OpenTelemetry errors will be available at WARN level (rate limited) or at TRACE level."); +// }); + +// // Rate limit the logging of the OTel errors to not occur more frequently on each thread than OTEL_ERROR_INTERVAL +// const OTEL_ERROR_INTERVAL: Duration = Duration::from_millis(5000); +// thread_local! { +// static LAST_OTEL_ERROR: Cell = Cell::new(Instant::now() - OTEL_ERROR_INTERVAL); +// } +// if LAST_OTEL_ERROR.get().elapsed() > OTEL_ERROR_INTERVAL { +// LAST_OTEL_ERROR.set(Instant::now()); +// tracing::warn!(?err, "OpenTelemetry error"); +// } else { +// tracing::trace!(?err, "OpenTelemetry error"); +// } +// } /// An RAII implementation for connection to open telemetry services. 
/// diff --git a/crates/telemetry/src/logs.rs b/crates/telemetry/src/logs.rs index 93f2cd3645..338fc56fee 100644 --- a/crates/telemetry/src/logs.rs +++ b/crates/telemetry/src/logs.rs @@ -2,7 +2,6 @@ use std::{ascii::escape_default, sync::OnceLock, time::Duration}; use anyhow::bail; use opentelemetry::logs::{LogRecord, Logger, LoggerProvider}; -use opentelemetry_otlp::LogExporterBuilder; use opentelemetry_sdk::{ logs::{BatchConfigBuilder, BatchLogProcessor, Logger as SdkLogger}, resource::{EnvResourceDetector, TelemetryResourceDetector}, @@ -86,21 +85,22 @@ pub(crate) fn init_otel_logging_backend(spin_version: String) -> anyhow::Result< // currently default to using the HTTP exporter but in the future we could select off of the // combination of OTEL_EXPORTER_OTLP_PROTOCOL and OTEL_EXPORTER_OTLP_LOGS_PROTOCOL to // determine whether we should use http/protobuf or grpc. - let exporter_builder: LogExporterBuilder = match OtlpProtocol::logs_protocol_from_env() { - OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(), - OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(), + let exporter = match OtlpProtocol::logs_protocol_from_env() { + OtlpProtocol::Grpc => opentelemetry_otlp::LogExporter::builder() + .with_tonic() + .build()?, + OtlpProtocol::HttpProtobuf => opentelemetry_otlp::LogExporter::builder() + .with_http() + .build()?, OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"), }; let provider = opentelemetry_sdk::logs::LoggerProvider::builder() .with_resource(resource) .with_log_processor( - BatchLogProcessor::builder( - exporter_builder.build_log_exporter()?, - opentelemetry_sdk::runtime::Tokio, - ) - .with_batch_config(BatchConfigBuilder::default().build()) - .build(), + BatchLogProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio) + .with_batch_config(BatchConfigBuilder::default().build()) + .build(), ) .build(); diff --git a/crates/telemetry/src/metrics.rs b/crates/telemetry/src/metrics.rs index 58e62826f5..6a445e5902 100644 --- a/crates/telemetry/src/metrics.rs +++ b/crates/telemetry/src/metrics.rs @@ -2,12 +2,8 @@ use std::time::Duration; use anyhow::{bail, Result}; use opentelemetry::global; -use opentelemetry_otlp::MetricsExporterBuilder; use opentelemetry_sdk::{ - metrics::{ - reader::{DefaultAggregationSelector, DefaultTemporalitySelector}, - PeriodicReader, SdkMeterProvider, - }, + metrics::{PeriodicReader, SdkMeterProvider}, resource::{EnvResourceDetector, TelemetryResourceDetector}, runtime, Resource, }; @@ -42,15 +38,15 @@ pub(crate) fn otel_metrics_layer LookupSpan<'span>>( // currently default to using the HTTP exporter but in the future we could select off of the // combination of OTEL_EXPORTER_OTLP_PROTOCOL and OTEL_EXPORTER_OTLP_TRACES_PROTOCOL to // determine whether we should use http/protobuf or grpc. 
- let exporter_builder: MetricsExporterBuilder = match OtlpProtocol::metrics_protocol_from_env() { - OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(), - OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(), + let exporter = match OtlpProtocol::metrics_protocol_from_env() { + OtlpProtocol::Grpc => opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .build()?, + OtlpProtocol::HttpProtobuf => opentelemetry_otlp::MetricExporter::builder() + .with_http() + .build()?, OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"), }; - let exporter = exporter_builder.build_metrics_exporter( - Box::new(DefaultTemporalitySelector::new()), - Box::new(DefaultAggregationSelector::new()), - )?; let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); let meter_provider = SdkMeterProvider::builder() diff --git a/crates/telemetry/src/traces.rs b/crates/telemetry/src/traces.rs index 213684fe9d..25fa2e07f2 100644 --- a/crates/telemetry/src/traces.rs +++ b/crates/telemetry/src/traces.rs @@ -2,7 +2,6 @@ use std::time::Duration; use anyhow::bail; use opentelemetry::{global, trace::TracerProvider}; -use opentelemetry_otlp::SpanExporterBuilder; use opentelemetry_sdk::{ resource::{EnvResourceDetector, TelemetryResourceDetector}, Resource, @@ -35,17 +34,26 @@ pub(crate) fn otel_tracing_layer LookupSpan<'span>>( ); // This will configure the exporter based on the OTEL_EXPORTER_* environment variables. - let exporter_builder: SpanExporterBuilder = match OtlpProtocol::traces_protocol_from_env() { - OtlpProtocol::Grpc => opentelemetry_otlp::new_exporter().tonic().into(), - OtlpProtocol::HttpProtobuf => opentelemetry_otlp::new_exporter().http().into(), + let exporter = match OtlpProtocol::traces_protocol_from_env() { + OtlpProtocol::Grpc => opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .build()?, + OtlpProtocol::HttpProtobuf => opentelemetry_otlp::SpanExporter::builder() + .with_http() + .build()?, OtlpProtocol::HttpJson => bail!("http/json OTLP protocol is not supported"), }; - let tracer_provider = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(exporter_builder) - .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(resource)) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; + let span_processor = opentelemetry_sdk::trace::BatchSpanProcessor::builder( + exporter, + opentelemetry_sdk::runtime::Tokio, + ) + .build(); + + let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder() + .with_config(opentelemetry_sdk::trace::Config::default().with_resource(resource)) + .with_span_processor(span_processor) + .build(); global::set_tracer_provider(tracer_provider.clone()); From ce38299c999639be414cef3673e3d4bfab17c363 Mon Sep 17 00:00:00 2001 From: Caleb Schoepp Date: Thu, 21 Nov 2024 10:57:36 -0700 Subject: [PATCH 2/2] Fix OTel error handler migration Signed-off-by: Caleb Schoepp --- crates/telemetry/src/lib.rs | 45 ------------------------------------- 1 file changed, 45 deletions(-) diff --git a/crates/telemetry/src/lib.rs b/crates/telemetry/src/lib.rs index 0cdf2c8b13..69ba52ee76 100644 --- a/crates/telemetry/src/lib.rs +++ b/crates/telemetry/src/lib.rs @@ -4,8 +4,6 @@ use env::otel_logs_enabled; use env::otel_metrics_enabled; use env::otel_tracing_enabled; use opentelemetry_sdk::propagation::TraceContextPropagator; -use tracing_subscriber::filter::filter_fn; -use tracing_subscriber::fmt::Layer as OtelFmtLayer; use tracing_subscriber::{fmt, prelude::*, 
registry, EnvFilter, Layer}; pub mod detector; @@ -66,16 +64,6 @@ pub fn init(spin_version: String) -> anyhow::Result { .add_directive("[{app_log_non_utf8}]=off".parse()?), ); - let opentelemetry_layer = OtelFmtLayer::new() - .with_writer(std::io::stderr) - .with_filter(filter_fn(|metadata| { - metadata.target().starts_with("opentelemetry") - })); - - // let non_opentelemetry_filter = filter_fn(|metadata| !metadata.target().starts_with("opentelemetry")); - // let otel_bridge_layer = layer::OpenTelemetryTracingBridge::new(&cloned_provider) - // .with_filter(non_opentelemetry_filter); - let otel_tracing_layer = if otel_tracing_enabled() { Some(traces::otel_tracing_layer(spin_version.clone())?) } else { @@ -93,8 +81,6 @@ pub fn init(spin_version: String) -> anyhow::Result { .with(otel_tracing_layer) .with(otel_metrics_layer) .with(fmt_layer) - .with(opentelemetry_layer) - // .with(otel_bridge_layer) .init(); // Used to propagate trace information in the standard W3C TraceContext format. Even if the otel @@ -108,37 +94,6 @@ pub fn init(spin_version: String) -> anyhow::Result { Ok(ShutdownGuard) } -// fn otel_error_handler(err: opentelemetry::global::Error) { -// // Track the error count -// let signal = match err { -// opentelemetry::global::Error::Metric(_) => "metrics", -// opentelemetry::global::Error::Trace(_) => "traces", -// opentelemetry::global::Error::Log(_) => "logs", -// _ => "unknown", -// }; -// metrics::monotonic_counter!(spin.otel_error_count = 1, signal = signal); - -// // Only log the first error at ERROR level, subsequent errors will be logged at higher levels and rate limited -// static FIRST_OTEL_ERROR: std::sync::Once = std::sync::Once::new(); -// FIRST_OTEL_ERROR.call_once(|| { -// tracing::error!(?err, "OpenTelemetry error"); -// tracing::error!("There has been an error with the OpenTelemetry system. Traces, logs or metrics are likely failing to export."); -// tracing::error!("Further OpenTelemetry errors will be available at WARN level (rate limited) or at TRACE level."); -// }); - -// // Rate limit the logging of the OTel errors to not occur more frequently on each thread than OTEL_ERROR_INTERVAL -// const OTEL_ERROR_INTERVAL: Duration = Duration::from_millis(5000); -// thread_local! { -// static LAST_OTEL_ERROR: Cell = Cell::new(Instant::now() - OTEL_ERROR_INTERVAL); -// } -// if LAST_OTEL_ERROR.get().elapsed() > OTEL_ERROR_INTERVAL { -// LAST_OTEL_ERROR.set(Instant::now()); -// tracing::warn!(?err, "OpenTelemetry error"); -// } else { -// tracing::trace!(?err, "OpenTelemetry error"); -// } -// } - /// An RAII implementation for connection to open telemetry services. /// /// Shutdown of the open telemetry services will happen on `Drop`.
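
Note (illustration only, not part of either patch): both commits converge on the 0.27 builder-style OTLP setup, in which the removed `opentelemetry_otlp::new_exporter()` / `new_pipeline()` helpers give way to per-signal exporter builders plus an explicitly assembled provider. The sketch below shows the traces path under that API, assuming the tonic/gRPC exporter feature and a Tokio runtime; the helper name, the defaulted Resource, and the fixed protocol choice are illustrative simplifications of what traces.rs actually derives from the OTEL_* environment variables.

use opentelemetry::global;
use opentelemetry_sdk::{runtime, Resource};

// Illustrative helper, not code from the patch.
fn install_tracer_provider() -> anyhow::Result<opentelemetry_sdk::trace::TracerProvider> {
    // 0.27: build the OTLP span exporter directly instead of going through
    // `opentelemetry_otlp::new_exporter().tonic()`. Here gRPC is hard-coded;
    // the patch selects gRPC or http/protobuf from the environment.
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .build()?;

    // Batch export on the Tokio runtime, mirroring the patch's use of
    // `opentelemetry_sdk::runtime::Tokio`.
    let span_processor =
        opentelemetry_sdk::trace::BatchSpanProcessor::builder(exporter, runtime::Tokio).build();

    // Assemble the provider explicitly; the real code supplies a detected
    // Resource here rather than Resource::default().
    let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
        .with_config(
            opentelemetry_sdk::trace::Config::default().with_resource(Resource::default()),
        )
        .with_span_processor(span_processor)
        .build();

    global::set_tracer_provider(tracer_provider.clone());
    Ok(tracer_provider)
}

The same builder-then-provider shape applies to the LogExporter and MetricExporter paths changed in logs.rs and metrics.rs.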