From 47279a14c22e4a23431bbea16de89a615be8f9e3 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Thu, 2 May 2024 11:47:01 -0700 Subject: [PATCH 1/5] Hooking Jaeger --- Cargo.lock | 221 ++++++++++++++++-- rust/worker/Cargo.toml | 3 + rust/worker/src/bin/query_service.rs | 3 - rust/worker/src/lib.rs | 3 + rust/worker/src/tracing/mod.rs | 1 + .../src/tracing/opentelemetry_config.rs | 33 +++ 6 files changed, 246 insertions(+), 18 deletions(-) create mode 100644 rust/worker/src/tracing/mod.rs create mode 100644 rust/worker/src/tracing/opentelemetry_config.rs diff --git a/Cargo.lock b/Cargo.lock index 81b2b455355..a9afb35aee5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,7 +147,7 @@ dependencies = [ "arrow-data", "arrow-schema", "arrow-select", - "base64", + "base64 0.21.7", "chrono", "half 2.4.0", "lexical-core", @@ -784,6 +784,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -1170,6 +1176,19 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.6.1" @@ -1853,7 +1872,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edc3606fd16aca7989db2f84bb25684d0270c6d6fa1dbcd0025af7b4130523a6" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "chrono", "serde", @@ -1880,7 +1899,7 @@ version = "0.87.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "033450dfa0762130565890dadf2f8835faedf749376ca13345bcd8ecd6b5f29f" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "chrono", "either", @@ -2354,6 +2373,85 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca" +dependencies = [ + "async-trait", + "futures", + "futures-util", + "http 0.2.12", + "opentelemetry", + "opentelemetry-proto", + "prost 0.11.9", + "thiserror", + "tokio", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c" +dependencies = [ + "futures", + "futures-util", + "opentelemetry", + "prost 0.11.9", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry_api" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -2447,7 +2545,7 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" dependencies = [ - "base64", + "base64 0.21.7", "serde", ] @@ -2661,6 +2759,16 @@ dependencies = [ "proptest", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.12.3" @@ -2668,7 +2776,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.3", ] [[package]] @@ -2685,7 +2793,7 @@ dependencies = [ "once_cell", "petgraph", "prettyplease", - "prost", + "prost 0.12.3", "prost-types", "regex", "syn 2.0.52", @@ -2693,6 +2801,19 @@ dependencies = [ "which", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.12.3" @@ -2712,7 +2833,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" dependencies = [ - "prost", + "prost 0.12.3", ] [[package]] @@ -2958,7 +3079,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -3348,7 +3469,7 @@ dependencies = [ "aho-corasick", "arc-swap", "async-trait", - "base64", + "base64 0.21.7", "bitpacking", "byteorder", "census", @@ -3670,6 +3791,38 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.9", + "prost-derive 0.11.9", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tonic" version = "0.10.2" @@ -3679,7 +3832,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.21.7", "bytes", "h2", "http 0.2.12", @@ -3688,7 +3841,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.3", "tokio", "tokio-stream", "tower", @@ -3736,7 +3889,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "base64", + "base64 0.21.7", "bitflags 2.4.2", "bytes", "futures-core", @@ -3796,6 +3949,27 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "tracing-log" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -3807,6 +3981,20 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00a39dcf9bfc1742fa4d6215253b33a6e474be78275884c216fc2a06267b3600" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log 0.1.4", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -3822,7 +4010,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.2.0", ] [[package]] @@ -4279,10 +4467,12 @@ dependencies = [ "kube", "murmur3", "num_cpus", + "opentelemetry", + "opentelemetry-otlp", "parking_lot", "proptest", "proptest-state-machine", - "prost", + "prost 0.12.3", "prost-types", "rand", "rayon", @@ -4295,9 +4485,10 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tonic", + "tonic 0.10.2", "tonic-build", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "uuid", ] diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 688c6e9bdfa..93a5130d410 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -45,7 +45,10 @@ arrow = "50.0.0" roaring = "0.10.3" tantivy = "0.21.1" tracing = "0.1" +tracing-opentelemetry = "0.19.0" tracing-subscriber = "0.3" +opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] } +opentelemetry-otlp = "0.12.0" [dev-dependencies] proptest = "1.4.0" diff --git a/rust/worker/src/bin/query_service.rs b/rust/worker/src/bin/query_service.rs index ee4fe280c65..f3cfa4c8282 100644 --- a/rust/worker/src/bin/query_service.rs +++ b/rust/worker/src/bin/query_service.rs @@ -2,8 +2,5 @@ use worker::query_service_entrypoint; #[tokio::main] async fn main() { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) - .init(); query_service_entrypoint().await; } diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index eb1f93120ad..47292a5d515 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -13,6 +13,7 @@ mod server; mod storage; mod sysdb; mod system; +mod tracing; mod types; use config::Configurable; @@ -34,6 +35,8 @@ pub async fn query_service_entrypoint() { Err(_) => config::RootConfig::load(), }; + crate::tracing::opentelemetry_config::init_oltp_tracing(); + let config = config.query_service; let system: system::System = system::System::new(); let dispatcher = diff --git a/rust/worker/src/tracing/mod.rs b/rust/worker/src/tracing/mod.rs new file mode 100644 index 00000000000..5d70633af78 --- /dev/null +++ b/rust/worker/src/tracing/mod.rs @@ -0,0 +1 @@ +pub(crate) mod opentelemetry_config; diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs new file mode 100644 index 00000000000..ac8c01dded1 --- /dev/null +++ b/rust/worker/src/tracing/opentelemetry_config.rs @@ -0,0 +1,33 @@ +use opentelemetry::global; +use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::trace; +use opentelemetry_otlp::WithExportConfig; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::Registry; + +pub(crate) fn init_oltp_tracing() { + let resource = opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new( + "service.name", + "sanket-test", + )]); + // Prepare trace config. + let trace_config = trace::config() + .with_sampler(opentelemetry::sdk::trace::Sampler::AlwaysOn) + .with_resource(resource); + // Prepare exporter. Jaeger only for now. + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint("http://jaeger:4317"); + let otlp_tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .with_trace_config(trace_config) + .install_batch(opentelemetry::runtime::Tokio) + .expect("Error - Failed to create tracer."); + // Layer for adding our configured tracer. + let tracing_layer = tracing_opentelemetry::layer().with_tracer(otlp_tracer); + global::set_text_map_propagator(TraceContextPropagator::new()); + + Registry::default().with(tracing_layer).init(); +} From 3b58516e51db1c7d6d5cf222d5f1004d0dabf7e0 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Thu, 2 May 2024 15:56:23 -0700 Subject: [PATCH 2/5] Add config for export endpoint and service name --- rust/worker/Cargo.toml | 2 +- rust/worker/chroma_config.yaml | 4 ++++ rust/worker/src/config.rs | 4 ++++ rust/worker/src/lib.rs | 14 ++++++++++++-- .../worker/src/tracing/opentelemetry_config.rs | 18 +++++++++++------- 5 files changed, 32 insertions(+), 10 deletions(-) diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 93a5130d410..a70028e8c05 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -46,7 +46,7 @@ roaring = "0.10.3" tantivy = "0.21.1" tracing = "0.1" tracing-opentelemetry = "0.19.0" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] } opentelemetry-otlp = "0.12.0" diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 95eb1924932..565fc370016 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -4,6 +4,8 @@ # for now we nest it in the worker directory query_service: + svc_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "10.244.0.9" my_port: 50051 assignment_policy: @@ -32,6 +34,8 @@ query_service: worker_queue_size: 100 compaction_service: + svc_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "10.244.0.9" my_port: 50051 assignment_policy: diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index a441074f146..3c23aba0dca 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -93,6 +93,8 @@ impl RootConfig { /// Each submodule that needs to be configured from the config object should implement the Configurable trait and /// have its own field in this struct for its Config struct. pub(crate) struct QueryServiceConfig { + pub(crate) svc_name: String, + pub(crate) otel_endpoint: String, pub(crate) my_ip: String, pub(crate) my_port: u16, pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, @@ -115,6 +117,8 @@ pub(crate) struct QueryServiceConfig { /// Each submodule that needs to be configured from the config object should implement the Configurable trait and /// have its own field in this struct for its Config struct. pub(crate) struct CompactionServiceConfig { + pub(crate) svc_name: String, + pub(crate) otel_endpoint: String, pub(crate) my_ip: String, pub(crate) my_port: u16, pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index 47292a5d515..d28e4487a66 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -35,9 +35,13 @@ pub async fn query_service_entrypoint() { Err(_) => config::RootConfig::load(), }; - crate::tracing::opentelemetry_config::init_oltp_tracing(); - let config = config.query_service; + + crate::tracing::opentelemetry_config::init_otel_tracing( + &config.svc_name, + &config.otel_endpoint, + ); + let system: system::System = system::System::new(); let dispatcher = match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await { @@ -97,6 +101,12 @@ pub async fn compaction_service_entrypoint() { }; let config = config.compaction_service; + + crate::tracing::opentelemetry_config::init_otel_tracing( + &config.svc_name, + &config.otel_endpoint, + ); + let system: system::System = system::System::new(); let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config( diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs index ac8c01dded1..36a0f7cd352 100644 --- a/rust/worker/src/tracing/opentelemetry_config.rs +++ b/rust/worker/src/tracing/opentelemetry_config.rs @@ -3,13 +3,12 @@ use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::trace; use opentelemetry_otlp::WithExportConfig; use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::Registry; +use tracing_subscriber::{EnvFilter, Registry}; -pub(crate) fn init_oltp_tracing() { +pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { let resource = opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new( "service.name", - "sanket-test", + service_name.clone(), )]); // Prepare trace config. let trace_config = trace::config() @@ -18,7 +17,7 @@ pub(crate) fn init_oltp_tracing() { // Prepare exporter. Jaeger only for now. let exporter = opentelemetry_otlp::new_exporter() .tonic() - .with_endpoint("http://jaeger:4317"); + .with_endpoint(otel_endpoint); let otlp_tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter(exporter) @@ -27,7 +26,12 @@ pub(crate) fn init_oltp_tracing() { .expect("Error - Failed to create tracer."); // Layer for adding our configured tracer. let tracing_layer = tracing_opentelemetry::layer().with_tracer(otlp_tracer); + // Level filter layer to filter traces based on level (trace, debug, info, warn, error). + let level_filter_layer = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); + let subscriber = Registry::default() + .with(tracing_layer) + .with(level_filter_layer); global::set_text_map_propagator(TraceContextPropagator::new()); - - Registry::default().with(tracing_layer).init(); + tracing::subscriber::set_global_default(subscriber) + .expect("Set global default subscriber failed"); } From 8b7507af02129e456acaaba179d666fde1d73849 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Sat, 4 May 2024 13:25:32 -0700 Subject: [PATCH 3/5] Add stdout layer + review comments --- Cargo.lock | 29 +++++++++++++++++ rust/worker/Cargo.toml | 1 + rust/worker/chroma_config.yaml | 4 +-- rust/worker/src/config.rs | 4 +-- rust/worker/src/lib.rs | 4 +-- .../src/tracing/opentelemetry_config.rs | 32 +++++++++++++------ 6 files changed, 59 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9afb35aee5..ace1e4af6da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1488,6 +1488,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "gethostname" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ebd34e35c46e00bb73e81363248d627782724609fe1b6396f553f68fe3862e" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "getrandom" version = "0.2.12" @@ -3939,6 +3949,24 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "tracing-bunyan-formatter" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373" +dependencies = [ + "ahash", + "gethostname", + "log", + "serde", + "serde_json", + "time", + "tracing", + "tracing-core", + "tracing-log 0.1.4", + "tracing-subscriber", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -4488,6 +4516,7 @@ dependencies = [ "tonic 0.10.2", "tonic-build", "tracing", + "tracing-bunyan-formatter", "tracing-opentelemetry", "tracing-subscriber", "uuid", diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index a70028e8c05..bd248dd9638 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -45,6 +45,7 @@ arrow = "50.0.0" roaring = "0.10.3" tantivy = "0.21.1" tracing = "0.1" +tracing-bunyan-formatter = "0.3.3" tracing-opentelemetry = "0.19.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] } diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 565fc370016..453bad7c99e 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -4,7 +4,7 @@ # for now we nest it in the worker directory query_service: - svc_name: "query-service" + service_name: "query-service" otel_endpoint: "http://jaeger:4317" my_ip: "10.244.0.9" my_port: 50051 @@ -34,7 +34,7 @@ query_service: worker_queue_size: 100 compaction_service: - svc_name: "compaction-service" + service_name: "compaction-service" otel_endpoint: "http://jaeger:4317" my_ip: "10.244.0.9" my_port: 50051 diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 3c23aba0dca..eb53759f5d4 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -93,7 +93,7 @@ impl RootConfig { /// Each submodule that needs to be configured from the config object should implement the Configurable trait and /// have its own field in this struct for its Config struct. pub(crate) struct QueryServiceConfig { - pub(crate) svc_name: String, + pub(crate) service_name: String, pub(crate) otel_endpoint: String, pub(crate) my_ip: String, pub(crate) my_port: u16, @@ -117,7 +117,7 @@ pub(crate) struct QueryServiceConfig { /// Each submodule that needs to be configured from the config object should implement the Configurable trait and /// have its own field in this struct for its Config struct. pub(crate) struct CompactionServiceConfig { - pub(crate) svc_name: String, + pub(crate) service_name: String, pub(crate) otel_endpoint: String, pub(crate) my_ip: String, pub(crate) my_port: u16, diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index d28e4487a66..1ae4fde5675 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -38,7 +38,7 @@ pub async fn query_service_entrypoint() { let config = config.query_service; crate::tracing::opentelemetry_config::init_otel_tracing( - &config.svc_name, + &config.service_name, &config.otel_endpoint, ); @@ -103,7 +103,7 @@ pub async fn compaction_service_entrypoint() { let config = config.compaction_service; crate::tracing::opentelemetry_config::init_otel_tracing( - &config.svc_name, + &config.service_name, &config.otel_endpoint, ); diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs index 36a0f7cd352..2630123ebcc 100644 --- a/rust/worker/src/tracing/opentelemetry_config.rs +++ b/rust/worker/src/tracing/opentelemetry_config.rs @@ -2,10 +2,14 @@ use opentelemetry::global; use opentelemetry::sdk::propagation::TraceContextPropagator; use opentelemetry::sdk::trace; use opentelemetry_otlp::WithExportConfig; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::{EnvFilter, Registry}; +use tracing_bunyan_formatter::BunyanFormattingLayer; +use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer}; pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { + println!( + "Registering jaeger subscriber for {} at endpoint {}", + service_name, otel_endpoint + ); let resource = opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new( "service.name", service_name.clone(), @@ -14,7 +18,7 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { let trace_config = trace::config() .with_sampler(opentelemetry::sdk::trace::Sampler::AlwaysOn) .with_resource(resource); - // Prepare exporter. Jaeger only for now. + // Prepare exporter. let exporter = opentelemetry_otlp::new_exporter() .tonic() .with_endpoint(otel_endpoint); @@ -25,13 +29,23 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { .install_batch(opentelemetry::runtime::Tokio) .expect("Error - Failed to create tracer."); // Layer for adding our configured tracer. - let tracing_layer = tracing_opentelemetry::layer().with_tracer(otlp_tracer); - // Level filter layer to filter traces based on level (trace, debug, info, warn, error). - let level_filter_layer = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); - let subscriber = Registry::default() - .with(tracing_layer) - .with(level_filter_layer); + // Export everything at this layer. The backend i.e. honeycomb or jaeger will filter at its end. + let exporter_layer = tracing_opentelemetry::layer() + .with_tracer(otlp_tracer) + .with_filter(tracing_subscriber::filter::LevelFilter::TRACE); + // Layer for printing spans to stdout. Only print INFO logs by default. + let stdout_layer = + BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout) + .with_filter(tracing_subscriber::filter::LevelFilter::INFO); + // global filter layer. Don't filter anything at global layer. + let global_layer = EnvFilter::new("TRACE"); + // Create subscriber. + let subscriber = tracing_subscriber::registry() + .with(global_layer) + .with(stdout_layer) + .with(exporter_layer); global::set_text_map_propagator(TraceContextPropagator::new()); tracing::subscriber::set_global_default(subscriber) .expect("Set global default subscriber failed"); + println!("Set global subscriber for {}", service_name); } From 508f73716ecf4f37983b94b6a696f0818a2a70eb Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Sat, 4 May 2024 13:45:07 -0700 Subject: [PATCH 4/5] Fix test --- rust/worker/src/config.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index eb53759f5d4..602723fcc85 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -231,6 +231,8 @@ mod tests { "random_path.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -259,6 +261,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: From 1e27d6f7f5976f9bfaf30631fc8bb8f60352641e Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Sat, 4 May 2024 15:00:56 -0700 Subject: [PATCH 5/5] fix test --- rust/worker/src/config.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 602723fcc85..b2e488085f2 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -154,6 +154,8 @@ mod tests { "chroma_config.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -182,6 +184,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -330,6 +334,8 @@ mod tests { "chroma_config.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -358,6 +364,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -409,6 +417,8 @@ mod tests { "chroma_config.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" assignment_policy: RendezvousHashing: hasher: Murmur3 @@ -435,6 +445,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" assignment_policy: RendezvousHashing: hasher: Murmur3