From e510720a46f99f4493f50b9c973e9b0f2651271e Mon Sep 17 00:00:00 2001 From: Mariusz Reichert Date: Fri, 20 Sep 2024 12:00:00 +0200 Subject: [PATCH] Opentelemetry instrumentation implementation --- Cargo.lock | 33 +------------ Cargo.toml | 24 +++++++--- src/bin/electrs.rs | 17 ++++++- src/bin/tx-fingerprint-stats.rs | 16 ++++--- src/config.rs | 1 + src/daemon.rs | 35 ++++++++++++++ src/electrum/discovery.rs | 44 ++++++++++------- src/electrum/server.rs | 11 +++++ src/lib.rs | 9 ++++ src/new_index/fetch.rs | 8 ++++ src/new_index/mempool.rs | 34 ++++++++++--- src/new_index/precache.rs | 4 ++ src/new_index/query.rs | 20 ++++++++ src/new_index/schema.rs | 3 +- src/otlp_trace.rs | 85 +++++++++++++++++++++++++++++++++ src/rest.rs | 4 ++ src/util/block.rs | 7 +++ src/util/electrum_merkle.rs | 6 ++- src/util/fees.rs | 3 ++ 19 files changed, 293 insertions(+), 71 deletions(-) create mode 100644 src/otlp_trace.rs diff --git a/Cargo.lock b/Cargo.lock index cb397384f..226becc4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1793,16 +1793,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi 0.3.9", -] - [[package]] name = "num-conv" version = "0.1.0" @@ -1974,12 +1964,6 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "page_size" version = "0.6.0" @@ -3345,7 +3329,6 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3383,17 +3366,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - [[package]] name = "tracing-opentelemetry" version = "0.21.0" @@ -3406,7 +3378,7 @@ dependencies = [ "smallvec", "tracing", "tracing-core", - "tracing-log 0.1.4", + "tracing-log", "tracing-subscriber", ] @@ -3417,15 +3389,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", - "nu-ansi-term", "once_cell", "regex", "sharded-slab", - "smallvec", "thread_local", "tracing", "tracing-core", - "tracing-log 0.2.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1c0644a95..a10193966 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,18 @@ default-run = "electrs" liquid = ["elements"] electrum-discovery = ["electrum-client"] bench = [] +default = ["no-otlp-tracing"] +otlp-tracing = [ + "tracing/max_level_trace", + "tracing-subscriber", + "opentelemetry", + "tracing-opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions" +] +no-otlp-tracing = [ + "tracing/max_level_off" +] [dependencies] arraydeque = "0.5.1" @@ -54,12 +66,12 @@ hyper = "0.14" hyperlocal = "0.8" # close to same tokio version as dependent by hyper v0.14 and hyperlocal 0.8 -- things can go awry if they mismatch tokio = { version = "1", features = ["sync", "macros", "rt-multi-thread", "rt"] } -opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } -tracing-opentelemetry = "0.21.0" -opentelemetry-otlp = { version = "0.13.0", default-features = false, features = ["http-proto", "reqwest-client"] } -tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } -opentelemetry-semantic-conventions = "0.12.0" -tracing = { version = "0.1.40", features = ["async-await", "log"] } +opentelemetry = { version = "0.20.0", features = ["rt-tokio"], optional = true } +tracing-opentelemetry = { version = "0.21.0", optional = true } +opentelemetry-otlp = { version = "0.13.0", default-features = false, features = ["http-proto", "reqwest-client"], optional = true } +tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt"], optional = true } +opentelemetry-semantic-conventions = { version = "0.12.0", optional = true } +tracing = { version = "0.1.40", default-features = false, features = ["attributes"] } # optional dependencies for electrum-discovery electrum-client = { version = "0.8", optional = true } diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 4d4eb1c17..dd094754b 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -20,6 +20,9 @@ use electrs::{ signal::Waiter, }; +#[cfg(feature = "otlp-tracing")] +use electrs::otlp_trace; + #[cfg(feature = "liquid")] use electrs::elements::AssetRegistry; use electrs::metrics::MetricOpts; @@ -142,10 +145,22 @@ fn run_server(config: Arc) -> Result<()> { Ok(()) } -fn main() { +fn main_() { let config = Arc::new(Config::from_args()); if let Err(e) = run_server(config) { error!("server failed: {}", e.display_chain()); process::exit(1); } } + +#[cfg(feature = "no-otlp-tracing")] +fn main() { + main_(); +} + +#[cfg(feature = "otlp-tracing")] +#[tokio::main] +async fn main() { + let _tracing_guard = otlp_trace::init_tracing("electrs"); + main_() +} diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 94a3821ab..5be313d77 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -83,13 +83,15 @@ fn main() { //info!("{:?},{:?}", txid, blockid); - let prevouts = chain.lookup_txos( - tx.input - .iter() - .filter(|txin| has_prevout(txin)) - .map(|txin| txin.previous_output) - .collect(), - ).unwrap(); + let prevouts = chain + .lookup_txos( + tx.input + .iter() + .filter(|txin| has_prevout(txin)) + .map(|txin| txin.previous_output) + .collect(), + ) + .unwrap(); let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum(); let small_out = tx diff --git a/src/config.rs b/src/config.rs index 4a1066f17..bfac177bd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -396,6 +396,7 @@ impl Config { stderrlog::Timestamp::Off }); log.init().expect("logging initialization failed"); + let config = Config { log, network_type, diff --git a/src/daemon.rs b/src/daemon.rs index 6190f15f5..35341013c 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -19,6 +19,8 @@ use bitcoin::consensus::encode::{deserialize, serialize_hex}; #[cfg(feature = "liquid")] use elements::encode::{deserialize, serialize_hex}; +use tracing::instrument; + use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid}; use crate::metrics::{HistogramOpts, HistogramVec, Metrics}; use crate::signal::Waiter; @@ -41,6 +43,7 @@ lazy_static! { const MAX_ATTEMPTS: u32 = 5; const RETRY_WAIT_DURATION: Duration = Duration::from_secs(1); +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn parse_hash(value: &Value) -> Result where T: FromStr, @@ -54,6 +57,7 @@ where .chain_err(|| format!("non-hex value: {}", value))?) } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn header_from_value(value: Value) -> Result { let header_hex = value .as_str() @@ -145,6 +149,7 @@ struct Connection { signal: Waiter, } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result { loop { match TcpStream::connect_timeout(&addr, *DAEMON_CONNECTION_TIMEOUT) { @@ -167,6 +172,7 @@ fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result { } impl Connection { + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn new( addr: SocketAddr, cookie_getter: Arc, @@ -186,10 +192,12 @@ impl Connection { }) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn reconnect(&self) -> Result { Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone()) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn send(&mut self, request: &str) -> Result<()> { let cookie = &self.cookie_getter.get()?; let msg = format!( @@ -203,6 +211,7 @@ impl Connection { }) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn recv(&mut self) -> Result { // TODO: use proper HTTP parser. let mut in_header = true; @@ -368,6 +377,7 @@ impl Daemon { Ok(daemon) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn reconnect(&self) -> Result { Ok(Daemon { daemon_dir: self.daemon_dir.clone(), @@ -382,6 +392,7 @@ impl Daemon { }) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn list_blk_files(&self) -> Result> { let path = self.blocks_dir.join("blk*.dat"); debug!("listing block files at {:?}", path); @@ -397,6 +408,7 @@ impl Daemon { self.network.magic() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn call_jsonrpc(&self, method: &str, request: &Value) -> Result { let mut conn = self.conn.lock().unwrap(); let timer = self.latency.with_label_values(&[method]).start_timer(); @@ -414,6 +426,7 @@ impl Daemon { Ok(result) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!(), method = %method))] fn handle_request(&self, method: &str, params: &Value) -> Result { let id = self.message_id.next(); let req = json!({"method": method, "params": params, "id": id}); @@ -436,10 +449,12 @@ impl Daemon { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn request(&self, method: &str, params: Value) -> Result { self.retry_request(method, ¶ms) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn retry_reconnect(&self) -> Daemon { // XXX add a max reconnection attempts limit? loop { @@ -454,12 +469,14 @@ impl Daemon { // Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching), // buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned. + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn requests(&self, method: &str, params_list: Vec) -> Result> { self.requests_iter(method, params_list).collect() } // Send requests in parallel over multiple RPC connections, iterating over the results without buffering them. // Errors are included in the iterator and do not terminate other pending requests. + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn requests_iter<'a>( &'a self, method: &'a str, @@ -482,24 +499,29 @@ impl Daemon { // bitcoind JSONRPC API: + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getblockchaininfo(&self) -> Result { let info: Value = self.request("getblockchaininfo", json!([]))?; Ok(from_value(info).chain_err(|| "invalid blockchain info")?) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn getnetworkinfo(&self) -> Result { let info: Value = self.request("getnetworkinfo", json!([]))?; Ok(from_value(info).chain_err(|| "invalid network info")?) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getbestblockhash(&self) -> Result { parse_hash(&self.request("getbestblockhash", json!([]))?) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getblockheader(&self, blockhash: &BlockHash) -> Result { header_from_value(self.request("getblockheader", json!([blockhash, /*verbose=*/ false]))?) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getblockheaders(&self, heights: &[usize]) -> Result> { let heights: Vec = heights.iter().map(|height| json!([height])).collect(); let params_list: Vec = self @@ -514,6 +536,7 @@ impl Daemon { Ok(result) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getblock(&self, blockhash: &BlockHash) -> Result { let block = block_from_value(self.request("getblock", json!([blockhash, /*verbose=*/ false]))?)?; @@ -521,10 +544,12 @@ impl Daemon { Ok(block) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result { self.request("getblock", json!([blockhash, verbose])) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result> { let params_list: Vec = blockhashes .iter() @@ -561,6 +586,7 @@ impl Daemon { /// Fetch the given transactions in parallel over multiple threads and RPC connections, /// ignoring any missing ones and returning whatever is available. + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result> { const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5; @@ -585,6 +611,7 @@ impl Daemon { .collect() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn gettransaction_raw( &self, txid: &Txid, @@ -594,20 +621,24 @@ impl Daemon { self.request("getrawtransaction", json!([txid, verbose, blockhash])) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getmempooltx(&self, txhash: &Txid) -> Result { let value = self.request("getrawtransaction", json!([txhash, /*verbose=*/ false]))?; tx_from_value(value) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn getmempooltxids(&self) -> Result> { let res = self.request("getrawmempool", json!([/*verbose=*/ false]))?; Ok(serde_json::from_value(res).chain_err(|| "invalid getrawmempool reply")?) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn broadcast(&self, tx: &Transaction) -> Result { self.broadcast_raw(&serialize_hex(tx)) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn broadcast_raw(&self, txhex: &str) -> Result { let txid = self.request("sendrawtransaction", json!([txhex]))?; Ok( @@ -619,6 +650,7 @@ impl Daemon { // Get estimated feerates for the provided confirmation targets using a batch RPC request // Missing estimates are logged but do not cause a failure, whatever is available is returned #[allow(clippy::float_cmp)] + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn estimatesmartfee_batch(&self, conf_targets: &[u16]) -> Result> { let params_list: Vec = conf_targets .iter() @@ -653,6 +685,7 @@ impl Daemon { .collect()) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn get_all_headers(&self, tip: &BlockHash) -> Result> { let info: Value = self.request("getblockheader", json!([tip]))?; let tip_height = info @@ -680,6 +713,7 @@ impl Daemon { } // Returns a list of BlockHeaders in ascending height (i.e. the tip is last). + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_new_headers( &self, indexed_headers: &HeaderList, @@ -712,6 +746,7 @@ impl Daemon { Ok(new_headers) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_relayfee(&self) -> Result { let relayfee = self.getnetworkinfo()?.relayfee; diff --git a/src/electrum/discovery.rs b/src/electrum/discovery.rs index cf70221f6..f5bb85588 100644 --- a/src/electrum/discovery.rs +++ b/src/electrum/discovery.rs @@ -547,23 +547,33 @@ mod tests { false, None, )); - discovery.add_default_server( - "electrum.blockstream.info".into(), - vec![Service::Tcp(60001)], - ).unwrap(); - discovery.add_default_server("testnet.hsmiths.com".into(), vec![Service::Ssl(53012)]).unwrap(); - discovery.add_default_server( - "tn.not.fyi".into(), - vec![Service::Tcp(55001), Service::Ssl(55002)], - ).unwrap(); - discovery.add_default_server( - "electrum.blockstream.info".into(), - vec![Service::Tcp(60001), Service::Ssl(60002)], - ).unwrap(); - discovery.add_default_server( - "explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion".into(), - vec![Service::Tcp(143)], - ).unwrap(); + discovery + .add_default_server( + "electrum.blockstream.info".into(), + vec![Service::Tcp(60001)], + ) + .unwrap(); + discovery + .add_default_server("testnet.hsmiths.com".into(), vec![Service::Ssl(53012)]) + .unwrap(); + discovery + .add_default_server( + "tn.not.fyi".into(), + vec![Service::Tcp(55001), Service::Ssl(55002)], + ) + .unwrap(); + discovery + .add_default_server( + "electrum.blockstream.info".into(), + vec![Service::Tcp(60001), Service::Ssl(60002)], + ) + .unwrap(); + discovery + .add_default_server( + "explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion".into(), + vec![Service::Tcp(143)], + ) + .unwrap(); debug!("{:#?}", discovery); diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 2beae0855..dac8b4636 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -13,6 +13,8 @@ use error_chain::ChainedError; use hex::{self, DisplayHex}; use serde_json::{from_str, Value}; +use tracing::instrument; + #[cfg(not(feature = "liquid"))] use bitcoin::consensus::encode::serialize_hex; #[cfg(feature = "liquid")] @@ -69,6 +71,7 @@ fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result< } // TODO: implement caching and delta updates +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn get_status_hash(txs: Vec<(Txid, Option)>, query: &Query) -> Option { if txs.is_empty() { None @@ -261,6 +264,7 @@ impl Connection { })) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn blockchain_estimatefee(&self, params: &[Value]) -> Result { let conf_target = usize_from_value(params.get(0), "blocks_count")?; let fee_rate = self @@ -388,6 +392,7 @@ impl Connection { Ok(json!(rawtx.to_lower_hex_string())) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result { let txid = Txid::from(hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?); let height = usize_from_value(params.get(1), "height")?; @@ -425,12 +430,14 @@ impl Connection { })) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!(), method = %method))] fn handle_command(&mut self, method: &str, params: &[Value], id: &Value) -> Result { let timer = self .stats .latency .with_label_values(&[method]) .start_timer(); + let result = match method { "blockchain.block.header" => self.blockchain_block_header(¶ms), "blockchain.block.headers" => self.blockchain_block_headers(¶ms), @@ -480,6 +487,7 @@ impl Connection { }) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn update_subscriptions(&mut self) -> Result> { let timer = self .stats @@ -537,6 +545,7 @@ impl Connection { Ok(()) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn handle_replies(&mut self, receiver: Receiver) -> Result<()> { let empty_params = json!([]); loop { @@ -601,6 +610,7 @@ impl Connection { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn parse_requests(mut reader: BufReader, tx: &SyncSender) -> Result<()> { loop { let mut line = Vec::::new(); @@ -663,6 +673,7 @@ impl Connection { } } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn get_history( query: &Query, scripthash: &[u8], diff --git a/src/lib.rs b/src/lib.rs index 9dbc58153..249f13d15 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,3 +35,12 @@ pub mod util; #[cfg(feature = "liquid")] pub mod elements; + +#[cfg(not(any(feature = "otlp-tracing", feature = "no-otlp-tracing")))] +compile_error!("Must enable one of the 'otlp-tracing' or 'no-otlp-tracing' features"); + +#[cfg(all(feature = "otlp-tracing", feature = "no-otlp-tracing"))] +compile_error!("Cannot enable both the 'otlp-tracing' and 'no-otlp-tracing' (default) features"); + +#[cfg(feature = "otlp-tracing")] +pub mod otlp_trace; diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index d7637ee5c..02dd7a736 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -14,6 +14,8 @@ use std::path::PathBuf; use std::sync::mpsc::Receiver; use std::thread; +use tracing::instrument; + use crate::chain::{Block, BlockHash}; use crate::daemon::Daemon; use crate::errors::*; @@ -25,6 +27,7 @@ pub enum FetchFrom { BlkFiles, } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn start_fetcher( from: FetchFrom, daemon: &Daemon, @@ -67,6 +70,7 @@ impl Fetcher { } } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn bitcoind_fetcher( daemon: &Daemon, new_headers: Vec, @@ -105,6 +109,7 @@ fn bitcoind_fetcher( )) } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn blkfiles_fetcher( daemon: &Daemon, new_headers: Vec, @@ -151,6 +156,7 @@ fn blkfiles_fetcher( )) } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn blkfiles_reader(blk_files: Vec) -> Fetcher> { let chan = SyncChannel::new(1); let sender = chan.sender(); @@ -170,6 +176,7 @@ fn blkfiles_reader(blk_files: Vec) -> Fetcher> { ) } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher> { let chan = SyncChannel::new(1); let sender = chan.sender(); @@ -188,6 +195,7 @@ fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher, magic: u32) -> Result> { let mut cursor = Cursor::new(&blob); let mut slices = vec![]; diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 4ccd4cd59..494c0820e 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -11,6 +11,7 @@ use std::iter::FromIterator; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; +use tracing::instrument; use crate::chain::{deserialize, BlockHash, Network, OutPoint, Transaction, TxOut, Txid}; use crate::config::Config; use crate::daemon::Daemon; @@ -107,6 +108,7 @@ impl Mempool { self.txstore.get(txid).map(serialize) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { self.edges.get(outpoint).map(|(txid, vin)| SpendingInput { txid: *txid, @@ -123,6 +125,7 @@ impl Mempool { Some(self.feeinfo.get(txid)?.fee) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn has_unconfirmed_parents(&self, txid: &Txid) -> bool { let tx = match self.txstore.get(txid) { Some(tx) => tx, @@ -133,6 +136,7 @@ impl Mempool { .any(|txin| self.txstore.contains_key(&txin.previous_output.txid)) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn history(&self, scripthash: &[u8], limit: usize) -> Vec { let _timer = self.latency.with_label_values(&["history"]).start_timer(); self.history @@ -140,6 +144,7 @@ impl Mempool { .map_or_else(|| vec![], |entries| self._history(entries, limit)) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn _history(&self, entries: &[TxHistoryInfo], limit: usize) -> Vec { entries .iter() @@ -151,6 +156,7 @@ impl Mempool { .collect() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec { let _timer = self .latency @@ -167,6 +173,7 @@ impl Mempool { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn utxo(&self, scripthash: &[u8]) -> Vec { let _timer = self.latency.with_label_values(&["utxo"]).start_timer(); let entries = match self.history.get(scripthash) { @@ -209,6 +216,7 @@ impl Mempool { .collect() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] // @XXX avoid code duplication with ChainQuery::stats()? pub fn stats(&self, scripthash: &[u8]) -> ScriptStats { let _timer = self.latency.with_label_values(&["stats"]).start_timer(); @@ -258,12 +266,14 @@ impl Mempool { stats } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] // Get all txids in the mempool pub fn txids(&self) -> Vec<&Txid> { let _timer = self.latency.with_label_values(&["txids"]).start_timer(); self.txstore.keys().collect() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] // Get an overview of the most recent transactions pub fn recent_txs_overview(&self) -> Vec<&TxOverview> { // We don't bother ever deleting elements from the recent list. @@ -272,14 +282,17 @@ impl Mempool { self.recent.iter().collect() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn backlog_stats(&self) -> &BacklogStats { &self.backlog_stats.0 } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn txids_set(&self) -> HashSet { return HashSet::from_iter(self.txstore.keys().cloned()); } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn update_backlog_stats(&mut self) { let _timer = self .latency @@ -288,6 +301,7 @@ impl Mempool { self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now()); } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn add_by_txid(&mut self, daemon: &Daemon, txid: Txid) -> Result<()> { if self.txstore.get(&txid).is_none() { if let Ok(tx) = daemon.getmempooltx(&txid) { @@ -302,6 +316,7 @@ impl Mempool { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn add(&mut self, txs_map: HashMap) -> Result<()> { self.delta .with_label_values(&["add"]) @@ -414,12 +429,14 @@ impl Mempool { Ok(()) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn lookup_txo(&self, outpoint: &OutPoint) -> Option { self.txstore .get(&outpoint.txid) .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self .latency @@ -427,12 +444,13 @@ impl Mempool { .start_timer(); // Get the txos available in the mempool, skipping over (and collecting) missing ones - let (mut txos, remain_outpoints): (HashMap<_, _>, _) = outpoints - .into_iter() - .partition_map(|outpoint| match self.lookup_txo(&outpoint) { - Some(txout) => Either::Left((outpoint, txout)), - None => Either::Right(outpoint), - }); + let (mut txos, remain_outpoints): (HashMap<_, _>, _) = + outpoints + .into_iter() + .partition_map(|outpoint| match self.lookup_txo(&outpoint) { + Some(txout) => Either::Left((outpoint, txout)), + None => Either::Right(outpoint), + }); // Get the remaining txos from the chain (fails if any are missing) txos.extend(self.chain.lookup_txos(remain_outpoints)?); @@ -440,6 +458,7 @@ impl Mempool { Ok(txos) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn remove(&mut self, to_remove: HashSet<&Txid>) { self.delta .with_label_values(&["remove"]) @@ -475,6 +494,7 @@ impl Mempool { } #[cfg(feature = "liquid")] + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn asset_history(&self, asset_id: &AssetId, limit: usize) -> Vec { let _timer = self .latency @@ -487,6 +507,7 @@ impl Mempool { /// Sync our local view of the mempool with the bitcoind Daemon RPC. If the chain tip moves before /// the mempool is fetched in full, syncing is aborted and an Ok(false) is returned. + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn update( mempool: &Arc>, daemon: &Daemon, @@ -589,6 +610,7 @@ impl BacklogStats { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn new(feeinfo: &HashMap) -> Self { let (count, vsize, total_fee) = feeinfo .values() diff --git a/src/new_index/precache.rs b/src/new_index/precache.rs index 4db40c27d..402f107dd 100644 --- a/src/new_index/precache.rs +++ b/src/new_index/precache.rs @@ -13,6 +13,9 @@ use std::io; use std::io::prelude::*; use std::str::FromStr; +use tracing::instrument; + +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn precache(chain: &ChainQuery, scripthashes: Vec) { let total = scripthashes.len(); info!("Pre-caching stats and utxo set for {} scripthashes", total); @@ -36,6 +39,7 @@ pub fn precache(chain: &ChainQuery, scripthashes: Vec) { }); } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn scripthashes_from_file(path: String) -> Result> { let reader = io::BufReader::new(File::open(path).chain_err(|| "cannot open precache scripthash file")?); diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 26d983734..d10c468e4 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -11,6 +11,8 @@ use crate::errors::*; use crate::new_index::{ChainQuery, Mempool, ScriptStats, SpendingInput, Utxo}; use crate::util::{is_spendable, BlockId, Bytes, TransactionStatus}; +use tracing::instrument; + #[cfg(feature = "liquid")] use crate::{ chain::AssetId, @@ -69,6 +71,7 @@ impl Query { self.mempool.read().unwrap() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn broadcast_raw(&self, txhex: &str) -> Result { let txid = self.daemon.broadcast_raw(txhex)?; let _ = self @@ -79,6 +82,7 @@ impl Query { Ok(txid) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn utxo(&self, scripthash: &[u8]) -> Result> { let mut utxos = self.chain.utxo(scripthash, self.config.utxos_limit)?; let mempool = self.mempool(); @@ -87,6 +91,7 @@ impl Query { Ok(utxos) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn history_txids(&self, scripthash: &[u8], limit: usize) -> Vec<(Txid, Option)> { let confirmed_txids = self.chain.history_txids(scripthash, limit); let confirmed_len = confirmed_txids.len(); @@ -108,17 +113,21 @@ impl Query { ) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_txn(&self, txid: &Txid) -> Option { self.chain .lookup_txn(txid, None) .or_else(|| self.mempool().lookup_txn(txid)) } + + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_raw_txn(&self, txid: &Txid) -> Option { self.chain .lookup_raw_txn(txid, None) .or_else(|| self.mempool().lookup_raw_txn(txid)) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_txos(&self, outpoints: BTreeSet) -> HashMap { // the mempool lookup_txos() internally looks up confirmed txos as well self.mempool() @@ -126,12 +135,14 @@ impl Query { .expect("failed loading txos") } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { self.chain .lookup_spend(outpoint) .or_else(|| self.mempool().lookup_spend(outpoint)) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_tx_spends(&self, tx: Transaction) -> Vec> { let txid = tx.txid(); @@ -151,18 +162,22 @@ impl Query { .collect() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_tx_status(&self, txid: &Txid) -> TransactionStatus { TransactionStatus::from(self.chain.tx_confirming_block(txid)) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_mempool_tx_fee(&self, txid: &Txid) -> Option { self.mempool().get_tx_fee(txid) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn has_unconfirmed_parents(&self, txid: &Txid) -> bool { self.mempool().has_unconfirmed_parents(txid) } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn estimate_fee(&self, conf_target: u16) -> Option { if self.config.network_type.is_regtest() { return self.get_relayfee().ok(); @@ -182,6 +197,7 @@ impl Query { .copied() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn estimate_fee_map(&self) -> HashMap { if let (ref cache, Some(cache_time)) = *self.cached_estimates.read().unwrap() { if cache_time.elapsed() < Duration::from_secs(FEE_ESTIMATES_TTL) { @@ -193,6 +209,7 @@ impl Query { self.cached_estimates.read().unwrap().0.clone() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn update_fee_estimates(&self) { match self.daemon.estimatesmartfee_batch(&CONF_TARGETS) { Ok(estimates) => { @@ -204,6 +221,7 @@ impl Query { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_relayfee(&self) -> Result { if let Some(cached) = *self.cached_relayfee.read().unwrap() { return Ok(cached); @@ -234,11 +252,13 @@ impl Query { } #[cfg(feature = "liquid")] + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn lookup_asset(&self, asset_id: &AssetId) -> Result> { lookup_asset(&self, self.asset_db.as_ref(), asset_id, None) } #[cfg(feature = "liquid")] + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn list_registry_assets( &self, start_index: usize, diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 1ea2a97ef..c2de00f8c 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -377,7 +377,6 @@ impl ChainQuery { pub fn get_block_txids(&self, hash: &BlockHash) -> Option> { let _timer = self.start_timer("get_block_txids"); - if self.light_mode { // TODO fetch block as binary from REST API instead of as hex let mut blockinfo = self.daemon.getblock_raw(hash, 1).ok()?; @@ -458,6 +457,7 @@ impl ChainQuery { &TxHistoryRow::prefix_height(code, &hash[..], start_height as u32), ) } + fn history_iter_scan_reverse(&self, code: u8, hash: &[u8]) -> ReverseScanIterator { self.store.history_db.iter_scan_reverse( &TxHistoryRow::filter(code, &hash[..]), @@ -879,6 +879,7 @@ impl ChainQuery { }) }) } + pub fn tx_confirming_block(&self, txid: &Txid) -> Option { let _timer = self.start_timer("tx_confirming_block"); let headers = self.store.indexed_headers.read().unwrap(); diff --git a/src/otlp_trace.rs b/src/otlp_trace.rs new file mode 100644 index 000000000..834d56737 --- /dev/null +++ b/src/otlp_trace.rs @@ -0,0 +1,85 @@ +use opentelemetry::{ + runtime, + sdk::{ + trace::{BatchConfig, RandomIdGenerator, Sampler, Tracer}, + Resource, + }, + KeyValue, +}; +use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig}; +use opentelemetry_semantic_conventions::{ + resource::{SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use std::env::var; +use std::time::Duration; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +fn init_tracer(resource: Resource, endpoint: &str) -> Tracer { + let export_config = ExportConfig { + endpoint: endpoint.to_string(), + timeout: Duration::from_secs(3), + protocol: Protocol::Grpc, + }; + + opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config( + opentelemetry::sdk::trace::Config::default() + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource), + ) + .with_batch_config(BatchConfig::default()) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint) + .with_export_config(export_config), + ) + .install_batch(runtime::Tokio) + .unwrap() +} + +fn init_tracing_subscriber(service_name: &str) -> OtelGuard { + let resource = Resource::from_schema_url( + [ + KeyValue::new(SERVICE_NAME, service_name.to_owned()), + KeyValue::new(SERVICE_VERSION, "0.4.1"), + ], + SCHEMA_URL, + ); + + let env_filter = EnvFilter::from_default_env(); + + let reg = tracing_subscriber::registry().with(env_filter).with( + tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_ansi(false) + .compact(), + ); + let _ = if let Ok(endpoint) = var("OTLP_ENDPOINT") { + reg.with(OpenTelemetryLayer::new(init_tracer(resource, &endpoint))) + .try_init() + } else { + reg.try_init() + }; + + log::debug!("Initialized tracing"); + + OtelGuard {} +} + +pub fn init_tracing(service_name: &str) -> OtelGuard { + init_tracing_subscriber(service_name) +} + +pub struct OtelGuard {} +impl Drop for OtelGuard { + fn drop(&mut self) { + opentelemetry::global::shutdown_tracer_provider(); + } +} diff --git a/src/rest.rs b/src/rest.rs index 43eab5c60..fb07fd940 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -24,6 +24,8 @@ use tokio::sync::oneshot; use std::fs; use std::str::FromStr; +use tracing::instrument; + #[cfg(feature = "liquid")] use { crate::elements::{ebcompact::*, peg::PegoutValue, AssetSorting, IssuanceValue}, @@ -579,6 +581,7 @@ impl Handle { } } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn handle_request( method: Method, uri: hyper::Uri, @@ -1154,6 +1157,7 @@ fn json_response(value: T, ttl: u32) -> Result, Htt .unwrap()) } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] fn blocks(query: &Query, start_height: Option) -> Result, HttpError> { let mut values = Vec::new(); let mut current_hash = match start_height { diff --git a/src/util/block.rs b/src/util/block.rs index de28cd389..bbd219895 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -9,6 +9,8 @@ use std::slice; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime as DateTime; +use tracing::instrument; + const MTP_SPAN: usize = 11; lazy_static! { @@ -92,6 +94,7 @@ impl HeaderList { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn new( mut headers_map: HashMap, tip_hash: BlockHash, @@ -129,6 +132,7 @@ impl HeaderList { headers } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn order(&self, new_headers: Vec) -> Vec { // header[i] -> header[i-1] (i.e. header.last() is the tip) struct HashedHeader { @@ -168,6 +172,7 @@ impl HeaderList { .collect() } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn apply(&mut self, new_headers: Vec) { // new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip) for i in 1..new_headers.len() { @@ -205,6 +210,7 @@ impl HeaderList { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> { let height = self.heights.get(blockhash)?; let header = self.headers.get(*height)?; @@ -215,6 +221,7 @@ impl HeaderList { } } + #[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn header_by_height(&self, height: usize) -> Option<&HeaderEntry> { self.headers.get(height).map(|entry| { assert_eq!(entry.height(), height); diff --git a/src/util/electrum_merkle.rs b/src/util/electrum_merkle.rs index 954782a7d..0ad6eeae0 100644 --- a/src/util/electrum_merkle.rs +++ b/src/util/electrum_merkle.rs @@ -3,6 +3,9 @@ use crate::errors::*; use crate::new_index::ChainQuery; use bitcoin::hashes::{sha256d::Hash as Sha256dHash, Hash}; +use tracing::instrument; + +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_tx_merkle_proof( chain: &ChainQuery, tx_hash: &Txid, @@ -21,6 +24,7 @@ pub fn get_tx_merkle_proof( Ok((branch, pos)) } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_header_merkle_proof( chain: &ChainQuery, height: usize, @@ -49,7 +53,7 @@ pub fn get_header_merkle_proof( let header_hashes = header_hashes.into_iter().map(Sha256dHash::from).collect(); Ok(create_merkle_branch_and_root(header_hashes, height)) } - +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn get_id_from_pos( chain: &ChainQuery, height: usize, diff --git a/src/util/fees.rs b/src/util/fees.rs index 9cbe6c1d7..a3f62c351 100644 --- a/src/util/fees.rs +++ b/src/util/fees.rs @@ -1,6 +1,8 @@ use crate::chain::{Network, Transaction, TxOut}; use std::collections::HashMap; +use tracing::instrument; + const VSIZE_BIN_WIDTH: u64 = 50_000; // in vbytes pub struct TxFeeInfo { @@ -46,6 +48,7 @@ pub fn get_tx_fee(tx: &Transaction, _prevouts: &HashMap, network: N tx.fee_in(*network.native_asset()) } +#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))] pub fn make_fee_histogram(mut entries: Vec<&TxFeeInfo>) -> Vec<(f64, u64)> { entries.sort_unstable_by(|e1, e2| e1.fee_per_vbyte.partial_cmp(&e2.fee_per_vbyte).unwrap());