Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling tracing by usage of opentelemetry #128

Draft
wants to merge 2 commits into
base: new-index
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,576 changes: 1,243 additions & 333 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 19 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ default-run = "electrs"
liquid = ["elements"]
electrum-discovery = ["electrum-client"]
bench = []
default = ["no-otlp-tracing"]
otlp-tracing = [
"tracing/max_level_trace",
"tracing-subscriber",
"opentelemetry",
"tracing-opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions"
]
no-otlp-tracing = [
"tracing/max_level_off"
]

[dependencies]
arraydeque = "0.5.1"
Expand Down Expand Up @@ -53,7 +65,13 @@ url = "2.2.0"
hyper = "0.14"
hyperlocal = "0.8"
# close to same tokio version as dependent by hyper v0.14 and hyperlocal 0.8 -- things can go awry if they mismatch
tokio = { version = "1", features = ["sync", "macros"] }
tokio = { version = "1", features = ["sync", "macros", "rt-multi-thread", "rt"] }
opentelemetry = { version = "0.20.0", features = ["rt-tokio"], optional = true }
tracing-opentelemetry = { version = "0.21.0", optional = true }
opentelemetry-otlp = { version = "0.13.0", default-features = false, features = ["http-proto", "reqwest-client"], optional = true }
tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt"], optional = true }
opentelemetry-semantic-conventions = { version = "0.12.0", optional = true }
tracing = { version = "0.1.40", default-features = false, features = ["attributes"] }

# optional dependencies for electrum-discovery
electrum-client = { version = "0.8", optional = true }
Expand Down
17 changes: 16 additions & 1 deletion src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use electrs::{
signal::Waiter,
};

#[cfg(feature = "otlp-tracing")]
use electrs::otlp_trace;

#[cfg(feature = "liquid")]
use electrs::elements::AssetRegistry;
use electrs::metrics::MetricOpts;
Expand Down Expand Up @@ -148,10 +151,22 @@ fn run_server(config: Arc<Config>) -> Result<()> {
Ok(())
}

fn main() {
fn main_() {
let config = Arc::new(Config::from_args());
if let Err(e) = run_server(config) {
error!("server failed: {}", e.display_chain());
process::exit(1);
}
}

#[cfg(feature = "no-otlp-tracing")]
fn main() {
main_();
}

#[cfg(feature = "otlp-tracing")]
#[tokio::main]
async fn main() {
let _tracing_guard = otlp_trace::init_tracing("electrs");
main_()
}
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ impl Config {
stderrlog::Timestamp::Off
});
log.init().expect("logging initialization failed");

let config = Config {
log,
network_type,
Expand Down
35 changes: 35 additions & 0 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use bitcoin::consensus::encode::{deserialize, serialize_hex};
#[cfg(feature = "liquid")]
use elements::encode::{deserialize, serialize_hex};

use tracing::instrument;

use crate::chain::{Block, BlockHash, BlockHeader, Network, Transaction, Txid};
use crate::metrics::{HistogramOpts, HistogramVec, Metrics};
use crate::signal::Waiter;
Expand All @@ -42,6 +44,7 @@ lazy_static! {
const MAX_ATTEMPTS: u32 = 5;
const RETRY_WAIT_DURATION: Duration = Duration::from_secs(1);

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn parse_hash<T>(value: &Value) -> Result<T>
where
T: FromStr,
Expand All @@ -55,6 +58,7 @@ where
.chain_err(|| format!("non-hex value: {}", value))?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn header_from_value(value: Value) -> Result<BlockHeader> {
let header_hex = value
.as_str()
Expand Down Expand Up @@ -149,6 +153,7 @@ struct Connection {
signal: Waiter,
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
loop {
match TcpStream::connect_timeout(&addr, *DAEMON_CONNECTION_TIMEOUT) {
Expand All @@ -171,6 +176,7 @@ fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
}

impl Connection {
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn new(
addr: SocketAddr,
cookie_getter: Arc<dyn CookieGetter>,
Expand All @@ -190,10 +196,12 @@ impl Connection {
})
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn reconnect(&self) -> Result<Connection> {
Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone())
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn send(&mut self, request: &str) -> Result<()> {
let cookie = &self.cookie_getter.get()?;
let msg = format!(
Expand All @@ -207,6 +215,7 @@ impl Connection {
})
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn recv(&mut self) -> Result<String> {
// TODO: use proper HTTP parser.
let mut in_header = true;
Expand Down Expand Up @@ -372,6 +381,7 @@ impl Daemon {
Ok(daemon)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn reconnect(&self) -> Result<Daemon> {
Ok(Daemon {
daemon_dir: self.daemon_dir.clone(),
Expand All @@ -386,6 +396,7 @@ impl Daemon {
})
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn list_blk_files(&self) -> Result<Vec<PathBuf>> {
let path = self.blocks_dir.join("blk*.dat");
debug!("listing block files at {:?}", path);
Expand Down Expand Up @@ -421,6 +432,7 @@ impl Daemon {
self.network.magic()
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> {
let mut conn = self.conn.lock().unwrap();
let timer = self.latency.with_label_values(&[method]).start_timer();
Expand All @@ -438,6 +450,7 @@ impl Daemon {
Ok(result)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!(), method = %method))]
fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
let id = self.message_id.next();
let req = json!({"method": method, "params": params, "id": id});
Expand All @@ -460,10 +473,12 @@ impl Daemon {
}
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn request(&self, method: &str, params: Value) -> Result<Value> {
self.retry_request(method, &params)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn retry_reconnect(&self) -> Daemon {
// XXX add a max reconnection attempts limit?
loop {
Expand All @@ -478,12 +493,14 @@ impl Daemon {

// Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching),
// buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
self.requests_iter(method, params_list).collect()
}

// Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
// Errors are included in the iterator and do not terminate other pending requests.
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn requests_iter<'a>(
&'a self,
method: &'a str,
Expand All @@ -506,24 +523,29 @@ impl Daemon {

// bitcoind JSONRPC API:

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getblockchaininfo(&self) -> Result<BlockchainInfo> {
let info: Value = self.request("getblockchaininfo", json!([]))?;
Ok(from_value(info).chain_err(|| "invalid blockchain info")?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn getnetworkinfo(&self) -> Result<NetworkInfo> {
let info: Value = self.request("getnetworkinfo", json!([]))?;
Ok(from_value(info).chain_err(|| "invalid network info")?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getbestblockhash(&self) -> Result<BlockHash> {
parse_hash(&self.request("getbestblockhash", json!([]))?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getblockheader(&self, blockhash: &BlockHash) -> Result<BlockHeader> {
header_from_value(self.request("getblockheader", json!([blockhash, /*verbose=*/ false]))?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
let params_list: Vec<Value> = self
Expand All @@ -538,17 +560,20 @@ impl Daemon {
Ok(result)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getblock(&self, blockhash: &BlockHash) -> Result<Block> {
let block =
block_from_value(self.request("getblock", json!([blockhash, /*verbose=*/ false]))?)?;
assert_eq!(block.block_hash(), *blockhash);
Ok(block)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getblock_raw(&self, blockhash: &BlockHash, verbose: u32) -> Result<Value> {
self.request("getblock", json!([blockhash, verbose]))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getblocks(&self, blockhashes: &[BlockHash]) -> Result<Vec<Block>> {
let params_list: Vec<Value> = blockhashes
.iter()
Expand Down Expand Up @@ -585,6 +610,7 @@ impl Daemon {

/// Fetch the given transactions in parallel over multiple threads and RPC connections,
/// ignoring any missing ones and returning whatever is available.
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;

Expand All @@ -609,6 +635,7 @@ impl Daemon {
.collect()
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn gettransaction_raw(
&self,
txid: &Txid,
Expand All @@ -618,20 +645,24 @@ impl Daemon {
self.request("getrawtransaction", json!([txid, verbose, blockhash]))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getmempooltx(&self, txhash: &Txid) -> Result<Transaction> {
let value = self.request("getrawtransaction", json!([txhash, /*verbose=*/ false]))?;
tx_from_value(value)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn getmempooltxids(&self) -> Result<HashSet<Txid>> {
let res = self.request("getrawmempool", json!([/*verbose=*/ false]))?;
Ok(serde_json::from_value(res).chain_err(|| "invalid getrawmempool reply")?)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn broadcast(&self, tx: &Transaction) -> Result<Txid> {
self.broadcast_raw(&serialize_hex(tx))
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
let txid = self.request("sendrawtransaction", json!([txhex]))?;
Ok(
Expand All @@ -643,6 +674,7 @@ impl Daemon {
// Get estimated feerates for the provided confirmation targets using a batch RPC request
// Missing estimates are logged but do not cause a failure, whatever is available is returned
#[allow(clippy::float_cmp)]
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn estimatesmartfee_batch(&self, conf_targets: &[u16]) -> Result<HashMap<u16, f64>> {
let params_list: Vec<Value> = conf_targets
.iter()
Expand Down Expand Up @@ -677,6 +709,7 @@ impl Daemon {
.collect())
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
fn get_all_headers(&self, tip: &BlockHash) -> Result<Vec<BlockHeader>> {
let info: Value = self.request("getblockheader", json!([tip]))?;
let tip_height = info
Expand Down Expand Up @@ -704,6 +737,7 @@ impl Daemon {
}

// Returns a list of BlockHeaders in ascending height (i.e. the tip is last).
#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn get_new_headers(
&self,
indexed_headers: &HeaderList,
Expand Down Expand Up @@ -736,6 +770,7 @@ impl Daemon {
Ok(new_headers)
}

#[instrument(skip_all, fields(module = module_path!(), file = file!(), line = line!()))]
pub fn get_relayfee(&self) -> Result<f64> {
let relayfee = self.getnetworkinfo()?.relayfee;

Expand Down
44 changes: 27 additions & 17 deletions src/electrum/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,23 +548,33 @@ mod tests {
false,
None,
));
discovery.add_default_server(
"electrum.blockstream.info".into(),
vec![Service::Tcp(60001)],
).unwrap();
discovery.add_default_server("testnet.hsmiths.com".into(), vec![Service::Ssl(53012)]).unwrap();
discovery.add_default_server(
"tn.not.fyi".into(),
vec![Service::Tcp(55001), Service::Ssl(55002)],
).unwrap();
discovery.add_default_server(
"electrum.blockstream.info".into(),
vec![Service::Tcp(60001), Service::Ssl(60002)],
).unwrap();
discovery.add_default_server(
"explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion".into(),
vec![Service::Tcp(143)],
).unwrap();
discovery
.add_default_server(
"electrum.blockstream.info".into(),
vec![Service::Tcp(60001)],
)
.unwrap();
discovery
.add_default_server("testnet.hsmiths.com".into(), vec![Service::Ssl(53012)])
.unwrap();
discovery
.add_default_server(
"tn.not.fyi".into(),
vec![Service::Tcp(55001), Service::Ssl(55002)],
)
.unwrap();
discovery
.add_default_server(
"electrum.blockstream.info".into(),
vec![Service::Tcp(60001), Service::Ssl(60002)],
)
.unwrap();
discovery
.add_default_server(
"explorerzydxu5ecjrkwceayqybizmpjjznk5izmitf2modhcusuqlid.onion".into(),
vec![Service::Tcp(143)],
)
.unwrap();

debug!("{:#?}", discovery);

Expand Down
Loading
Loading