From e4c6f89202cf2b7dd2fc4d8631928b86a5656bf0 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Mon, 25 Mar 2024 21:00:17 +0200 Subject: [PATCH 1/9] Optimize lookup_txos using rocksdb's MultiGet See https://github.com/facebook/rocksdb/wiki/MultiGet-Performance --- src/new_index/db.rs | 8 ++++++++ src/new_index/schema.rs | 33 +++++++++++++-------------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 8d895050d..f68da233c 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -196,6 +196,14 @@ impl DB { self.db.get(key).unwrap().map(|v| v.to_vec()) } + pub fn multi_get(&self, keys: I) -> Vec>, rocksdb::Error>> + where + K: AsRef<[u8]>, + I: IntoIterator, + { + self.db.multi_get(keys) + } + fn verify_compatibility(&self, config: &Config) { let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap(); diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index d5eba9a51..95346cdef 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -1038,26 +1038,19 @@ fn lookup_txos( outpoints: &BTreeSet, allow_missing: bool, ) -> HashMap { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(16) // we need to saturate SSD IOPS - .thread_name(|i| format!("lookup-txo-{}", i)) - .build() - .unwrap(); - pool.install(|| { - outpoints - .par_iter() - .filter_map(|outpoint| { - lookup_txo(&txstore_db, &outpoint) - .or_else(|| { - if !allow_missing { - panic!("missing txo {} in {:?}", outpoint, txstore_db); - } - None - }) - .map(|txo| (*outpoint, txo)) - }) - .collect() - }) + let mut remain_outpoints = outpoints.iter(); + txstore_db + .multi_get(outpoints.iter().map(TxOutRow::key)) + .into_iter() + .filter_map(|res| { + let outpoint = remain_outpoints.next().unwrap(); + match res.unwrap() { + Some(txo) => Some((*outpoint, deserialize(&txo).expect("failed to parse TxOut"))), + None if allow_missing => None, + None => panic!("missing txo {}", outpoint), + } + }) + .collect() } fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option { From 75e79f80323c4f5bfd112186e1f970c04ca415d7 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Mon, 25 Mar 2024 21:00:17 +0200 Subject: [PATCH 2/9] Optimize and refactor lookup_txos() - Avoid unnecessary copying of prev outpoints - When looking for both mempool and on-chain txos, accumulate the set of outpoints that remain to be looked up to avoid re-checking for them later again in the found set - Refactored lookup_txos() to use lookup_txo() internally rather than the other way around, which was less efficient - Lookup txos in mempool first, then on-chain - ChainQuery::lookup_txos() now returns a Result instead of panicking when outpoints are missing - Removed ChainQuery::lookup_avail_txos() and allow_missing, which are no longer neceesary --- src/bin/tx-fingerprint-stats.rs | 4 ++-- src/new_index/mempool.rs | 37 ++++++++++++++------------------- src/new_index/query.rs | 2 +- src/new_index/schema.rs | 29 +++++++++----------------- src/rest.rs | 2 +- 5 files changed, 30 insertions(+), 44 deletions(-) diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 8e4c35e30..afe980f8c 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -83,12 +83,12 @@ fn main() { //info!("{:?},{:?}", txid, blockid); let prevouts = chain.lookup_txos( - &tx.input + tx.input .iter() .filter(|txin| has_prevout(txin)) .map(|txin| txin.previous_output) .collect(), - ); + ).unwrap(); let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum(); let small_out = tx diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 179829fd2..94ba7a41d 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -1,5 +1,5 @@ use arraydeque::{ArrayDeque, Wrapping}; -use itertools::Itertools; +use itertools::{Either, Itertools}; #[cfg(not(feature = "liquid"))] use bitcoin::consensus::encode::serialize; @@ -310,7 +310,7 @@ impl Mempool { self.txstore.insert(txid, tx); } // Phase 2: index history and spend edges (can fail if some txos cannot be found) - let txos = match self.lookup_txos(&self.get_prevouts(&txids)) { + let txos = match self.lookup_txos(self.get_prevouts(&txids)) { Ok(txos) => txos, Err(err) => { warn!("lookup txouts failed: {}", err); @@ -397,34 +397,29 @@ impl Mempool { } } - pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result { - let mut outpoints = BTreeSet::new(); - outpoints.insert(*outpoint); - Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap()) + fn lookup_txo(&self, outpoint: &OutPoint) -> Option { + self.txstore + .get(&outpoint.txid) + .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> Result> { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self .latency .with_label_values(&["lookup_txos"]) .start_timer(); - let confirmed_txos = self.chain.lookup_avail_txos(outpoints); + // Get the txos available in the mempool, skipping over (and collecting) missing ones + let (mut txos, remain_outpoints): (HashMap<_, _>, _) = outpoints + .into_iter() + .partition_map(|outpoint| match self.lookup_txo(&outpoint) { + Some(txout) => Either::Left((outpoint, txout)), + None => Either::Right(outpoint), + }); - let mempool_txos = outpoints - .iter() - .filter(|outpoint| !confirmed_txos.contains_key(outpoint)) - .map(|outpoint| { - self.txstore - .get(&outpoint.txid) - .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) - .map(|txout| (*outpoint, txout)) - .chain_err(|| format!("missing outpoint {:?}", outpoint)) - }) - .collect::>>()?; + // Get the remaining txos from the chain (fails if any are missing) + txos.extend(self.chain.lookup_txos(remain_outpoints)?); - let mut txos = confirmed_txos; - txos.extend(mempool_txos); Ok(txos) } diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 1e621ac0d..60d7510ab 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -118,7 +118,7 @@ impl Query { .or_else(|| self.mempool().lookup_raw_txn(txid)) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> HashMap { // the mempool lookup_txos() internally looks up confirmed txos as well self.mempool() .lookup_txos(outpoints) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 95346cdef..d738050a1 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -327,7 +327,7 @@ impl Indexer { fn index(&self, blocks: &[BlockEntry]) { let previous_txos_map = { let _timer = self.start_timer("index_lookup"); - lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false) + lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap() }; let rows = { let _timer = self.start_timer("index_process"); @@ -859,14 +859,9 @@ impl ChainQuery { lookup_txo(&self.store.txstore_db, outpoint) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self.start_timer("lookup_txos"); - lookup_txos(&self.store.txstore_db, outpoints, false) - } - - pub fn lookup_avail_txos(&self, outpoints: &BTreeSet) -> HashMap { - let _timer = self.start_timer("lookup_available_txos"); - lookup_txos(&self.store.txstore_db, outpoints, true) + lookup_txos(&self.store.txstore_db, outpoints) } pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { @@ -1033,21 +1028,17 @@ fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet { .collect() } -fn lookup_txos( - txstore_db: &DB, - outpoints: &BTreeSet, - allow_missing: bool, -) -> HashMap { - let mut remain_outpoints = outpoints.iter(); +fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet) -> Result> { + let keys = outpoints.iter().map(TxOutRow::key).collect::>(); + let mut remain_outpoints = outpoints.into_iter(); txstore_db - .multi_get(outpoints.iter().map(TxOutRow::key)) + .multi_get(keys) .into_iter() - .filter_map(|res| { + .map(|res| { let outpoint = remain_outpoints.next().unwrap(); match res.unwrap() { - Some(txo) => Some((*outpoint, deserialize(&txo).expect("failed to parse TxOut"))), - None if allow_missing => None, - None => panic!("missing txo {}", outpoint), + Some(txo) => Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut"))), + None => Err(format!("missing txo {}", outpoint).into()), } }) .collect() diff --git a/src/rest.rs b/src/rest.rs index 336c43f4a..43eab5c60 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -468,7 +468,7 @@ fn prepare_txs( }) .collect(); - let prevouts = query.lookup_txos(&outpoints); + let prevouts = query.lookup_txos(outpoints); txs.into_iter() .map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config)) From fca4867b87b0131a657266b2082f044f3ac843d6 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Wed, 26 Jun 2024 19:01:28 +0300 Subject: [PATCH 3/9] Refactor lookup_txos() to use zip() Following https://github.com/Blockstream/electrs/pull/89#discussion_r1632706930 --- src/new_index/schema.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index d738050a1..bbb2e2946 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -1030,16 +1030,15 @@ fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet { fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet) -> Result> { let keys = outpoints.iter().map(TxOutRow::key).collect::>(); - let mut remain_outpoints = outpoints.into_iter(); txstore_db .multi_get(keys) .into_iter() - .map(|res| { - let outpoint = remain_outpoints.next().unwrap(); - match res.unwrap() { - Some(txo) => Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut"))), - None => Err(format!("missing txo {}", outpoint).into()), - } + .zip(outpoints) + .map(|(res, outpoint)| { + let txo = res + .unwrap() + .ok_or_else(|| format!("missing txo {}", outpoint))?; + Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut"))) }) .collect() } From 83853e4a656530c36226f1b912e0c5e45899e956 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 12 Aug 2024 13:33:02 +0200 Subject: [PATCH 4/9] nix: dinamycally include rocksdb drammatically improve first compilation by avoiding to recompile rocksdb. It may also improve performance. --- flake.nix | 17 +++++++++++++++-- rocksdb-overlay.nix | 13 +++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) create mode 100644 rocksdb-overlay.nix diff --git a/flake.nix b/flake.nix index 2199c7625..6a16330d2 100644 --- a/flake.nix +++ b/flake.nix @@ -20,7 +20,10 @@ flake-utils.lib.eachDefaultSystem (system: let - overlays = [ (import rust-overlay) ]; + overlays = [ + (import rust-overlay) + (import ./rocksdb-overlay.nix) + ]; pkgs = import nixpkgs { inherit system overlays; }; @@ -28,7 +31,7 @@ craneLib = (crane.mkLib pkgs).overrideToolchain rustToolchain; - src = craneLib.cleanCargoSource ./.; + src = craneLib.cleanCargoSource ./.; nativeBuildInputs = with pkgs; [ rustToolchain clang ]; # required only at build time buildInputs = with pkgs; [ ]; # also required at runtime @@ -39,6 +42,11 @@ ELEMENTSD_SKIP_DOWNLOAD = true; BITCOIND_SKIP_DOWNLOAD = true; ELECTRUMD_SKIP_DOWNLOAD = true; + + # link rocksdb dynamically + ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include"; + ROCKSDB_LIB_DIR = "${pkgs.rocksdb}/lib"; + }; cargoArtifacts = craneLib.buildDepsOnly commonArgs; bin = craneLib.buildPackage (commonArgs // { @@ -80,6 +88,11 @@ devShells.default = mkShell { inputsFrom = [ bin ]; LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; # for rocksdb + + # to link rocksdb dynamically + ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include"; + ROCKSDB_LIB_DIR = "${pkgs.rocksdb}/lib"; + }; } ); diff --git a/rocksdb-overlay.nix b/rocksdb-overlay.nix new file mode 100644 index 000000000..722ba7a7f --- /dev/null +++ b/rocksdb-overlay.nix @@ -0,0 +1,13 @@ +final: prev: { + + rocksdb = prev.rocksdb.overrideAttrs (oldAttrs: rec { + version = "8.1.1"; + + src = final.fetchFromGitHub { + owner = "facebook"; + repo = oldAttrs.pname; + rev = "v${version}"; + hash = "sha256-79hRtc5QSWLLyjRGCmuYZSoIc9IcIsnl8UCinz2sVw4="; + }; + }); +} From b7a891ac863119f9fdb80a161e675c310dfc926e Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 12 Aug 2024 13:44:33 +0200 Subject: [PATCH 5/9] nix: set up direnv users with direnv+nix have the env setup just by entering the dir --- .envrc | 8 ++++++++ .gitignore | 1 + 2 files changed, 9 insertions(+) create mode 100644 .envrc diff --git a/.envrc b/.envrc new file mode 100644 index 000000000..6b6394b2f --- /dev/null +++ b/.envrc @@ -0,0 +1,8 @@ +#!/bin/bash + +if ! has nix_direnv_version || ! nix_direnv_version 2.2.1; then + source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/2.2.1/direnvrc" "sha256-zelF0vLbEl5uaqrfIzbgNzJWGmLzCmYAkInj/LNxvKs=" +fi + +watch_file rust-toolchain.toml +use flake diff --git a/.gitignore b/.gitignore index 8ed8a0203..9e717b212 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ target *~ *.pyc result +.direnv From 4613d9c3afaa8f000b796f75a27dc30bc3a307e4 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 12 Aug 2024 13:53:35 +0200 Subject: [PATCH 6/9] nix: skip exec download in nix they don't work anyway why bother --- flake.nix | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flake.nix b/flake.nix index 6a16330d2..96c961d38 100644 --- a/flake.nix +++ b/flake.nix @@ -88,6 +88,9 @@ devShells.default = mkShell { inputsFrom = [ bin ]; LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; # for rocksdb + ELEMENTSD_SKIP_DOWNLOAD = true; + BITCOIND_SKIP_DOWNLOAD = true; + ELECTRUMD_SKIP_DOWNLOAD = true; # to link rocksdb dynamically ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include"; From 0775a42b2f3fdfc71429fb8a18c05b932a7d56b9 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 12 Aug 2024 13:59:26 +0200 Subject: [PATCH 7/9] define also needed components in rust-toolchain.toml --- rust-toolchain.toml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 7897a24d1..64aead883 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,10 @@ [toolchain] channel = "1.75.0" +components = [ + "cargo", + "clippy", + "rust-src", + "rust-std", + "rustc", + "rustfmt" +] From b7d827caf08f186d4190c25d532ac92df09ec507 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 12 Aug 2024 14:20:46 +0200 Subject: [PATCH 8/9] nix: avoid env vars duplication --- flake.nix | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/flake.nix b/flake.nix index 96c961d38..d23f3735f 100644 --- a/flake.nix +++ b/flake.nix @@ -36,8 +36,7 @@ nativeBuildInputs = with pkgs; [ rustToolchain clang ]; # required only at build time buildInputs = with pkgs; [ ]; # also required at runtime - commonArgs = { - inherit src buildInputs nativeBuildInputs; + envVars = { LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; ELEMENTSD_SKIP_DOWNLOAD = true; BITCOIND_SKIP_DOWNLOAD = true; @@ -46,8 +45,12 @@ # link rocksdb dynamically ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include"; ROCKSDB_LIB_DIR = "${pkgs.rocksdb}/lib"; - }; + + commonArgs = { + inherit src buildInputs nativeBuildInputs; + } // envVars; + cargoArtifacts = craneLib.buildDepsOnly commonArgs; bin = craneLib.buildPackage (commonArgs // { inherit cargoArtifacts; @@ -63,7 +66,6 @@ doCheck = false; }); - in with pkgs; { @@ -84,19 +86,9 @@ program = "${bin}/bin/electrs"; }; - - devShells.default = mkShell { + devShells.default = mkShell (envVars // { inputsFrom = [ bin ]; - LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; # for rocksdb - ELEMENTSD_SKIP_DOWNLOAD = true; - BITCOIND_SKIP_DOWNLOAD = true; - ELECTRUMD_SKIP_DOWNLOAD = true; - - # to link rocksdb dynamically - ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include"; - ROCKSDB_LIB_DIR = "${pkgs.rocksdb}/lib"; - - }; + }); } ); } From 004fc74160dc4a2b3c1f3cc36b20799b0e72b86d Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 12 Aug 2024 14:31:19 +0200 Subject: [PATCH 9/9] use exe_path instead of downloaded_exe_path exe_path continue to look at downloaded_exe_path unless a specific env var is specified --- tests/common.rs | 2 +- tests/electrum.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/common.rs b/tests/common.rs index 78932aa50..c85ebf7d9 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -57,7 +57,7 @@ impl TestRunner { } // Setup node - let node = NodeD::with_conf(noded::downloaded_exe_path().unwrap(), &node_conf).unwrap(); + let node = NodeD::with_conf(noded::exe_path().unwrap(), &node_conf).unwrap(); #[cfg(not(feature = "liquid"))] let (node_client, params) = (&node.client, &node.params); diff --git a/tests/electrum.rs b/tests/electrum.rs index d35d91eb3..504d16ca8 100644 --- a/tests/electrum.rs +++ b/tests/electrum.rs @@ -23,8 +23,7 @@ fn test_electrum() -> Result<()> { let server_arg = format!("{}:t", electrum_addr.to_string()); electrum_wallet_conf.args = vec!["-v", "--server", &server_arg]; electrum_wallet_conf.view_stdout = true; - let electrum_wallet = - ElectrumD::with_conf(electrumd::downloaded_exe_path()?, &electrum_wallet_conf)?; + let electrum_wallet = ElectrumD::with_conf(electrumd::exe_path()?, &electrum_wallet_conf)?; let notify_wallet = || { electrum_server.notify();