Skip to content

Commit

Permalink
fix: update tracing to support new otel api
Browse files Browse the repository at this point in the history
  • Loading branch information
dolcalmi committed Nov 26, 2024
1 parent 224a93f commit a349c2c
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/fees/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl FeesClient {
};
tracing::Span::current().record(
"fee_rate",
&tracing::field::display(format!("{:?}", fee_rate)),
tracing::field::display(format!("{:?}", fee_rate)),
);
Ok(fee_rate)
}
Expand Down
2 changes: 1 addition & 1 deletion src/job/batch_broadcasting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn execute(
let blockchain = init_electrum(&blockchain_cfg.electrum_url).await?;
let batch = batches.find_by_id(data.account_id, data.batch_id).await?;
let span = tracing::Span::current();
span.record("txid", &tracing::field::display(batch.bitcoin_tx_id));
span.record("txid", tracing::field::display(batch.bitcoin_tx_id));
if batch.accounting_complete() {
if let Some(tx) = batch.signed_tx {
blockchain.broadcast(&tx).map_err(BdkError::BdkLibError)?;
Expand Down
6 changes: 3 additions & 3 deletions src/job/batch_signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn execute(
let mut new_sessions = HashMap::new();
let mut account_xpubs = HashMap::new();
let batch = batches.find_by_id(data.account_id, data.batch_id).await?;
span.record("tx_id", &tracing::field::display(batch.bitcoin_tx_id));
span.record("tx_id", tracing::field::display(batch.bitcoin_tx_id));
let unsigned_psbt = batch.unsigned_psbt;
for (wallet_id, summary) in batch.wallet_summaries {
let wallet = wallets.find_by_id(wallet_id).await?;
Expand Down Expand Up @@ -132,7 +132,7 @@ pub async fn execute(
}
let mut sessions = sessions.into_values();

span.record("stalled", &tracing::field::display(stalled));
span.record("stalled", tracing::field::display(stalled));
if let Some(mut first_signed_psbt) = sessions.find_map(|s| s.signed_psbt().cloned()) {
for s in sessions {
if let Some(psbt) = s.signed_psbt() {
Expand All @@ -141,7 +141,7 @@ pub async fn execute(
}
if current_keychain.is_none() {
let batch = batches.find_by_id(data.account_id, data.batch_id).await?;
span.record("tx_id", &tracing::field::display(batch.bitcoin_tx_id));
span.record("tx_id", tracing::field::display(batch.bitcoin_tx_id));
let wallet_id = batch.wallet_summaries.into_keys().next().unwrap();
let wallet = wallets.find_by_id(wallet_id).await?;
current_keychain = Some(wallet.current_keychain_wallet(&pool));
Expand Down
23 changes: 10 additions & 13 deletions src/job/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,14 @@ impl<'a> JobExecutor<'a> {
}

async fn handle_error<E: JobExecutionError>(&mut self, meta: JobMeta, error: &E) {
Span::current().record("error", &tracing::field::display("true"));
Span::current().record("error.message", &tracing::field::display(&error));
Span::current().record("error", tracing::field::display("true"));
Span::current().record("error.message", tracing::field::display(&error));
if meta.attempts <= self.warn_retries {
Span::current().record(
"error.level",
&tracing::field::display(tracing::Level::WARN),
);
Span::current().record("error.level", tracing::field::display(tracing::Level::WARN));
} else {
Span::current().record(
"error.level",
&tracing::field::display(tracing::Level::ERROR),
tracing::field::display(tracing::Level::ERROR),
);
}
}
Expand All @@ -133,12 +130,12 @@ impl<'a> JobExecutor<'a> {
data.job_meta.attempts += 1;
data.job_meta.tracing_data = Some(extract_tracing_data());

span.record("job_id", &tracing::field::display(self.job.id()));
span.record("job_name", &tracing::field::display(self.job.name()));
span.record("attempt", &tracing::field::display(data.job_meta.attempts));
span.record("job_id", tracing::field::display(self.job.id()));
span.record("job_name", tracing::field::display(self.job.name()));
span.record("attempt", tracing::field::display(data.job_meta.attempts));
span.record(
"checkpoint_json",
&tracing::field::display(serde_json::to_string(&data).expect("Couldn't checkpoint")),
tracing::field::display(serde_json::to_string(&data).expect("Couldn't checkpoint")),
);

let mut checkpoint =
Expand Down Expand Up @@ -169,11 +166,11 @@ impl<'a> JobExecutor<'a> {
.await?;

if data.job_meta.attempts >= self.max_attempts {
span.record("last_attempt", &tracing::field::display(true));
span.record("last_attempt", tracing::field::display(true));
self.job.complete().await?;
Ok(true)
} else {
span.record("last_attempt", &tracing::field::display(false));
span.record("last_attempt", tracing::field::display(false));
Ok(false)
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/job/process_payout_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,23 @@ pub(super) async fn execute<'a>(

let span = tracing::Span::current();
if let (Some(tx_id), Some(psbt)) = (tx_id, psbt) {
span.record("tx_id", &tracing::field::display(tx_id));
span.record("psbt", &tracing::field::display(&psbt));
span.record("tx_id", tracing::field::display(tx_id));
span.record("psbt", tracing::field::display(&psbt));

let wallet_ids = wallet_totals.keys().copied().collect();
span.record("batch_id", &tracing::field::display(data.batch_id));
span.record("total_fee_sats", &tracing::field::display(fee_satoshis));
span.record("batch_id", tracing::field::display(data.batch_id));
span.record("total_fee_sats", tracing::field::display(fee_satoshis));
span.record(
"total_change_sats",
&tracing::field::display(
tracing::field::display(
wallet_totals
.values()
.fold(Satoshis::ZERO, |acc, v| acc + v.change_satoshis),
),
);
span.record(
"cpfp_fee_sats",
&tracing::field::display(
tracing::field::display(
wallet_totals
.values()
.fold(Satoshis::ZERO, |acc, v| acc + v.cpfp_fee_satoshis),
Expand Down Expand Up @@ -188,7 +188,7 @@ pub async fn construct_psbt(
..
} = payout_queue;
span.record("payout_queue_name", queue_name);
span.record("payout_queue_id", &tracing::field::display(queue_id));
span.record("payout_queue_id", tracing::field::display(queue_id));
span.record("n_unbatched_payouts", unbatched_payouts.n_payouts());

let wallets = wallets.find_by_ids(unbatched_payouts.wallet_ids()).await?;
Expand Down
4 changes: 2 additions & 2 deletions src/outbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ impl Outbox {

current_span.record(
"ledger_event",
&tracing::field::display(
tracing::field::display(
serde_json::to_string(&ledger_event).expect("Couldn't serialize JournalEvent"),
),
);
let payloads = Vec::<OutboxEventPayload>::from(ledger_event.metadata);
let sequences = self.sequences_for(ledger_event.account_id).await?;
let mut write_sequences = sequences.write().await;
let mut sequence = write_sequences.0;
current_span.record("next_sequence", &tracing::field::display(sequence));
current_span.record("next_sequence", tracing::field::display(sequence));
let events: Vec<OutboxEvent<_>> = payloads
.into_iter()
.map(|payload| {
Expand Down
45 changes: 25 additions & 20 deletions src/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use opentelemetry::trace::TracerProvider as _;
use opentelemetry::{propagation::TextMapPropagator, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Sampler};
use opentelemetry_sdk::trace::{Config, Sampler, TracerProvider};
use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::Span;
Expand Down Expand Up @@ -31,22 +33,25 @@ impl Default for TracingConfig {
pub fn init_tracer(config: TracingConfig) -> anyhow::Result<()> {
let tracing_endpoint = format!("http://{}:{}", config.host, config.port);
println!("Sending traces to {tracing_endpoint}");
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(tracing_endpoint),
)
.with_trace_config(
opentelemetry_sdk::trace::config()
.with_sampler(Sampler::AlwaysOn)
.with_resource(opentelemetry_sdk::Resource::new(vec![KeyValue::new(
"service.name",
config.service_name,
)])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)?;

let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(tracing_endpoint)
.build()?;

let config = Config::default()
.with_sampler(Sampler::AlwaysOn)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
config.service_name,
)]));

let provider = TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(config)
.build();
let tracer = provider.tracer("readme_example");

let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

let fmt_layer = fmt::layer().json();
Expand Down Expand Up @@ -77,9 +82,9 @@ pub fn inject_tracing_data(span: &Span, tracing_data: &HashMap<String, String>)
}

pub fn insert_error_fields(level: tracing::Level, error: impl std::fmt::Display) {
Span::current().record("error", &tracing::field::display("true"));
Span::current().record("error.level", &tracing::field::display(level));
Span::current().record("error.message", &tracing::field::display(error));
Span::current().record("error", tracing::field::display("true"));
Span::current().record("error.level", tracing::field::display(level));
Span::current().record("error.message", tracing::field::display(error));
}

pub async fn record_error<
Expand Down

0 comments on commit a349c2c

Please sign in to comment.