diff --git a/Cargo.lock b/Cargo.lock index 81b2b455355..ace1e4af6da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,7 +147,7 @@ dependencies = [ "arrow-data", "arrow-schema", "arrow-select", - "base64", + "base64 0.21.7", "chrono", "half 2.4.0", "lexical-core", @@ -784,6 +784,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -1170,6 +1176,19 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.6.1" @@ -1469,6 +1488,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "gethostname" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ebd34e35c46e00bb73e81363248d627782724609fe1b6396f553f68fe3862e" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "getrandom" version = "0.2.12" @@ -1853,7 +1882,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edc3606fd16aca7989db2f84bb25684d0270c6d6fa1dbcd0025af7b4130523a6" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "chrono", "serde", @@ -1880,7 +1909,7 @@ version = "0.87.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "033450dfa0762130565890dadf2f8835faedf749376ca13345bcd8ecd6b5f29f" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "chrono", "either", @@ -2354,6 +2383,85 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8af72d59a4484654ea8eb183fea5ae4eb6a41d7ac3e3bae5f4d2a282a3a7d3ca" +dependencies = [ + "async-trait", + "futures", + "futures-util", + "http 0.2.12", + "opentelemetry", + "opentelemetry-proto", + "prost 0.11.9", + "thiserror", + "tokio", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "045f8eea8c0fa19f7d48e7bc3128a39c2e5c533d5c61298c548dfefc1064474c" +dependencies = [ + "futures", + "futures-util", + "opentelemetry", + "prost 0.11.9", + "tonic 0.8.3", +] + +[[package]] +name = "opentelemetry_api" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +dependencies = [ + "fnv", + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +dependencies = [ + "async-trait", + "crossbeam-channel", + "dashmap", + "fnv", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -2447,7 +2555,7 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" dependencies = [ - "base64", + "base64 0.21.7", "serde", ] @@ -2661,6 +2769,16 @@ dependencies = [ "proptest", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.12.3" @@ -2668,7 +2786,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.3", ] [[package]] @@ -2685,7 +2803,7 @@ dependencies = [ "once_cell", "petgraph", "prettyplease", - "prost", + "prost 0.12.3", "prost-types", "regex", "syn 2.0.52", @@ -2693,6 +2811,19 @@ dependencies = [ "which", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.12.3" @@ -2712,7 +2843,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" dependencies = [ - "prost", + "prost 0.12.3", ] [[package]] @@ -2958,7 +3089,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -3348,7 +3479,7 @@ dependencies = [ "aho-corasick", "arc-swap", "async-trait", - "base64", + "base64 0.21.7", "bitpacking", "byteorder", "census", @@ -3670,6 +3801,38 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.13.1", + "bytes", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost 0.11.9", + "prost-derive 0.11.9", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tonic" version = "0.10.2" @@ -3679,7 +3842,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.21.7", "bytes", "h2", "http 0.2.12", @@ -3688,7 +3851,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.3", "tokio", "tokio-stream", "tower", @@ -3736,7 +3899,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "base64", + "base64 0.21.7", "bitflags 2.4.2", "bytes", "futures-core", @@ -3786,6 +3949,24 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "tracing-bunyan-formatter" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5c266b9ac83dedf0e0385ad78514949e6d89491269e7065bee51d2bb8ec7373" +dependencies = [ + "ahash", + "gethostname", + "log", + "serde", + "serde_json", + "time", + "tracing", + "tracing-core", + "tracing-log 0.1.4", + "tracing-subscriber", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -3796,6 +3977,27 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + +[[package]] +name = "tracing-log" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -3807,6 +4009,20 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00a39dcf9bfc1742fa4d6215253b33a6e474be78275884c216fc2a06267b3600" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log 0.1.4", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -3822,7 +4038,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.2.0", ] [[package]] @@ -4279,10 +4495,12 @@ dependencies = [ "kube", "murmur3", "num_cpus", + "opentelemetry", + "opentelemetry-otlp", "parking_lot", "proptest", "proptest-state-machine", - "prost", + "prost 0.12.3", "prost-types", "rand", "rayon", @@ -4295,9 +4513,11 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tonic", + "tonic 0.10.2", "tonic-build", "tracing", + "tracing-bunyan-formatter", + "tracing-opentelemetry", "tracing-subscriber", "uuid", ] diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 688c6e9bdfa..bd248dd9638 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -45,7 +45,11 @@ arrow = "50.0.0" roaring = "0.10.3" tantivy = "0.21.1" tracing = "0.1" -tracing-subscriber = "0.3" +tracing-bunyan-formatter = "0.3.3" +tracing-opentelemetry = "0.19.0" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] } +opentelemetry-otlp = "0.12.0" [dev-dependencies] proptest = "1.4.0" diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 95eb1924932..453bad7c99e 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -4,6 +4,8 @@ # for now we nest it in the worker directory query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "10.244.0.9" my_port: 50051 assignment_policy: @@ -32,6 +34,8 @@ query_service: worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "10.244.0.9" my_port: 50051 assignment_policy: diff --git a/rust/worker/src/bin/query_service.rs b/rust/worker/src/bin/query_service.rs index ee4fe280c65..f3cfa4c8282 100644 --- a/rust/worker/src/bin/query_service.rs +++ b/rust/worker/src/bin/query_service.rs @@ -2,8 +2,5 @@ use worker::query_service_entrypoint; #[tokio::main] async fn main() { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) - .init(); query_service_entrypoint().await; } diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index a441074f146..b2e488085f2 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -93,6 +93,8 @@ impl RootConfig { /// Each submodule that needs to be configured from the config object should implement the Configurable trait and /// have its own field in this struct for its Config struct. pub(crate) struct QueryServiceConfig { + pub(crate) service_name: String, + pub(crate) otel_endpoint: String, pub(crate) my_ip: String, pub(crate) my_port: u16, pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, @@ -115,6 +117,8 @@ pub(crate) struct QueryServiceConfig { /// Each submodule that needs to be configured from the config object should implement the Configurable trait and /// have its own field in this struct for its Config struct. pub(crate) struct CompactionServiceConfig { + pub(crate) service_name: String, + pub(crate) otel_endpoint: String, pub(crate) my_ip: String, pub(crate) my_port: u16, pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, @@ -150,6 +154,8 @@ mod tests { "chroma_config.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -178,6 +184,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -227,6 +235,8 @@ mod tests { "random_path.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -255,6 +265,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -322,6 +334,8 @@ mod tests { "chroma_config.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -350,6 +364,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" my_ip: "192.0.0.1" my_port: 50051 assignment_policy: @@ -401,6 +417,8 @@ mod tests { "chroma_config.yaml", r#" query_service: + service_name: "query-service" + otel_endpoint: "http://jaeger:4317" assignment_policy: RendezvousHashing: hasher: Murmur3 @@ -427,6 +445,8 @@ mod tests { worker_queue_size: 100 compaction_service: + service_name: "compaction-service" + otel_endpoint: "http://jaeger:4317" assignment_policy: RendezvousHashing: hasher: Murmur3 diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index eb1f93120ad..1ae4fde5675 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -13,6 +13,7 @@ mod server; mod storage; mod sysdb; mod system; +mod tracing; mod types; use config::Configurable; @@ -35,6 +36,12 @@ pub async fn query_service_entrypoint() { }; let config = config.query_service; + + crate::tracing::opentelemetry_config::init_otel_tracing( + &config.service_name, + &config.otel_endpoint, + ); + let system: system::System = system::System::new(); let dispatcher = match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await { @@ -94,6 +101,12 @@ pub async fn compaction_service_entrypoint() { }; let config = config.compaction_service; + + crate::tracing::opentelemetry_config::init_otel_tracing( + &config.service_name, + &config.otel_endpoint, + ); + let system: system::System = system::System::new(); let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config( diff --git a/rust/worker/src/tracing/mod.rs b/rust/worker/src/tracing/mod.rs new file mode 100644 index 00000000000..5d70633af78 --- /dev/null +++ b/rust/worker/src/tracing/mod.rs @@ -0,0 +1 @@ +pub(crate) mod opentelemetry_config; diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs new file mode 100644 index 00000000000..2630123ebcc --- /dev/null +++ b/rust/worker/src/tracing/opentelemetry_config.rs @@ -0,0 +1,51 @@ +use opentelemetry::global; +use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::trace; +use opentelemetry_otlp::WithExportConfig; +use tracing_bunyan_formatter::BunyanFormattingLayer; +use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer}; + +pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { + println!( + "Registering jaeger subscriber for {} at endpoint {}", + service_name, otel_endpoint + ); + let resource = opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new( + "service.name", + service_name.clone(), + )]); + // Prepare trace config. + let trace_config = trace::config() + .with_sampler(opentelemetry::sdk::trace::Sampler::AlwaysOn) + .with_resource(resource); + // Prepare exporter. + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(otel_endpoint); + let otlp_tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .with_trace_config(trace_config) + .install_batch(opentelemetry::runtime::Tokio) + .expect("Error - Failed to create tracer."); + // Layer for adding our configured tracer. + // Export everything at this layer. The backend i.e. honeycomb or jaeger will filter at its end. + let exporter_layer = tracing_opentelemetry::layer() + .with_tracer(otlp_tracer) + .with_filter(tracing_subscriber::filter::LevelFilter::TRACE); + // Layer for printing spans to stdout. Only print INFO logs by default. + let stdout_layer = + BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout) + .with_filter(tracing_subscriber::filter::LevelFilter::INFO); + // global filter layer. Don't filter anything at global layer. + let global_layer = EnvFilter::new("TRACE"); + // Create subscriber. + let subscriber = tracing_subscriber::registry() + .with(global_layer) + .with(stdout_layer) + .with(exporter_layer); + global::set_text_map_propagator(TraceContextPropagator::new()); + tracing::subscriber::set_global_default(subscriber) + .expect("Set global default subscriber failed"); + println!("Set global subscriber for {}", service_name); +}