Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Add otel support for query and compaction service #2122

Merged
merged 5 commits into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 235 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ arrow = "50.0.0"
roaring = "0.10.3"
tantivy = "0.21.1"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-bunyan-formatter = "0.3.3"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-otlp = "0.12.0"

[dev-dependencies]
proptest = "1.4.0"
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# for now we nest it in the worker directory

query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "10.244.0.9"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -32,6 +34,8 @@ query_service:
worker_queue_size: 100

compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "10.244.0.9"
my_port: 50051
assignment_policy:
Expand Down
3 changes: 0 additions & 3 deletions rust/worker/src/bin/query_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,5 @@ use worker::query_service_entrypoint;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
query_service_entrypoint().await;
}
20 changes: 20 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ impl RootConfig {
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and
/// have its own field in this struct for its Config struct.
pub(crate) struct QueryServiceConfig {
pub(crate) service_name: String,
pub(crate) otel_endpoint: String,
pub(crate) my_ip: String,
pub(crate) my_port: u16,
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
Expand All @@ -115,6 +117,8 @@ pub(crate) struct QueryServiceConfig {
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and
/// have its own field in this struct for its Config struct.
pub(crate) struct CompactionServiceConfig {
pub(crate) service_name: String,
pub(crate) otel_endpoint: String,
pub(crate) my_ip: String,
pub(crate) my_port: u16,
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
Expand Down Expand Up @@ -150,6 +154,8 @@ mod tests {
"chroma_config.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -178,6 +184,8 @@ mod tests {
worker_queue_size: 100

compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -227,6 +235,8 @@ mod tests {
"random_path.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -255,6 +265,8 @@ mod tests {
worker_queue_size: 100

compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -322,6 +334,8 @@ mod tests {
"chroma_config.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -350,6 +364,8 @@ mod tests {
worker_queue_size: 100

compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -401,6 +417,8 @@ mod tests {
"chroma_config.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
Expand All @@ -427,6 +445,8 @@ mod tests {
worker_queue_size: 100

compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
Expand Down
13 changes: 13 additions & 0 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod server;
mod storage;
mod sysdb;
mod system;
mod tracing;
mod types;

use config::Configurable;
Expand All @@ -35,6 +36,12 @@ pub async fn query_service_entrypoint() {
};

let config = config.query_service;

crate::tracing::opentelemetry_config::init_otel_tracing(
&config.service_name,
&config.otel_endpoint,
);

let system: system::System = system::System::new();
let dispatcher =
match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await {
Expand Down Expand Up @@ -94,6 +101,12 @@ pub async fn compaction_service_entrypoint() {
};

let config = config.compaction_service;

crate::tracing::opentelemetry_config::init_otel_tracing(
&config.service_name,
&config.otel_endpoint,
);

let system: system::System = system::System::new();

let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config(
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/tracing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(crate) mod opentelemetry_config;
51 changes: 51 additions & 0 deletions rust/worker/src/tracing/opentelemetry_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace;
use opentelemetry_otlp::WithExportConfig;
use tracing_bunyan_formatter::BunyanFormattingLayer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};

pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
println!(
"Registering jaeger subscriber for {} at endpoint {}",
service_name, otel_endpoint
);
let resource = opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new(
"service.name",
service_name.clone(),
)]);
// Prepare trace config.
let trace_config = trace::config()
.with_sampler(opentelemetry::sdk::trace::Sampler::AlwaysOn)
.with_resource(resource);
// Prepare exporter.
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otel_endpoint);
let otlp_tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.with_trace_config(trace_config)
.install_batch(opentelemetry::runtime::Tokio)
.expect("Error - Failed to create tracer.");
// Layer for adding our configured tracer.
// Export everything at this layer. The backend i.e. honeycomb or jaeger will filter at its end.
let exporter_layer = tracing_opentelemetry::layer()
.with_tracer(otlp_tracer)
.with_filter(tracing_subscriber::filter::LevelFilter::TRACE);
// Layer for printing spans to stdout. Only print INFO logs by default.
let stdout_layer =
BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout)
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
// global filter layer. Don't filter anything at global layer.
let global_layer = EnvFilter::new("TRACE");
// Create subscriber.
let subscriber = tracing_subscriber::registry()
.with(global_layer)
.with(stdout_layer)
.with(exporter_layer);
global::set_text_map_propagator(TraceContextPropagator::new());
tracing::subscriber::set_global_default(subscriber)
.expect("Set global default subscriber failed");
println!("Set global subscriber for {}", service_name);
}
Loading