diff --git a/.envrc b/.envrc new file mode 100644 index 000000000..6b6394b2f --- /dev/null +++ b/.envrc @@ -0,0 +1,8 @@ +#!/bin/bash + +if ! has nix_direnv_version || ! nix_direnv_version 2.2.1; then + source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/2.2.1/direnvrc" "sha256-zelF0vLbEl5uaqrfIzbgNzJWGmLzCmYAkInj/LNxvKs=" +fi + +watch_file rust-toolchain.toml +use flake diff --git a/.gitignore b/.gitignore index 8ed8a0203..9e717b212 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ target *~ *.pyc result +.direnv diff --git a/flake.nix b/flake.nix index 2199c7625..d23f3735f 100644 --- a/flake.nix +++ b/flake.nix @@ -20,7 +20,10 @@ flake-utils.lib.eachDefaultSystem (system: let - overlays = [ (import rust-overlay) ]; + overlays = [ + (import rust-overlay) + (import ./rocksdb-overlay.nix) + ]; pkgs = import nixpkgs { inherit system overlays; }; @@ -28,18 +31,26 @@ craneLib = (crane.mkLib pkgs).overrideToolchain rustToolchain; - src = craneLib.cleanCargoSource ./.; + src = craneLib.cleanCargoSource ./.; nativeBuildInputs = with pkgs; [ rustToolchain clang ]; # required only at build time buildInputs = with pkgs; [ ]; # also required at runtime - commonArgs = { - inherit src buildInputs nativeBuildInputs; + envVars = { LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; ELEMENTSD_SKIP_DOWNLOAD = true; BITCOIND_SKIP_DOWNLOAD = true; ELECTRUMD_SKIP_DOWNLOAD = true; + + # link rocksdb dynamically + ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include"; + ROCKSDB_LIB_DIR = "${pkgs.rocksdb}/lib"; }; + + commonArgs = { + inherit src buildInputs nativeBuildInputs; + } // envVars; + cargoArtifacts = craneLib.buildDepsOnly commonArgs; bin = craneLib.buildPackage (commonArgs // { inherit cargoArtifacts; @@ -55,7 +66,6 @@ doCheck = false; }); - in with pkgs; { @@ -76,11 +86,9 @@ program = "${bin}/bin/electrs"; }; - - devShells.default = mkShell { + devShells.default = mkShell (envVars // { inputsFrom = [ bin ]; - LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; # for rocksdb - }; + }); } ); } diff --git a/rocksdb-overlay.nix b/rocksdb-overlay.nix new file mode 100644 index 000000000..722ba7a7f --- /dev/null +++ b/rocksdb-overlay.nix @@ -0,0 +1,13 @@ +final: prev: { + + rocksdb = prev.rocksdb.overrideAttrs (oldAttrs: rec { + version = "8.1.1"; + + src = final.fetchFromGitHub { + owner = "facebook"; + repo = oldAttrs.pname; + rev = "v${version}"; + hash = "sha256-79hRtc5QSWLLyjRGCmuYZSoIc9IcIsnl8UCinz2sVw4="; + }; + }); +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 7897a24d1..64aead883 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,10 @@ [toolchain] channel = "1.75.0" +components = [ + "cargo", + "clippy", + "rust-src", + "rust-std", + "rustc", + "rustfmt" +] diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 8e4c35e30..afe980f8c 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -83,12 +83,12 @@ fn main() { //info!("{:?},{:?}", txid, blockid); let prevouts = chain.lookup_txos( - &tx.input + tx.input .iter() .filter(|txin| has_prevout(txin)) .map(|txin| txin.previous_output) .collect(), - ); + ).unwrap(); let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum(); let small_out = tx diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 30ae70a6e..0b67e7a56 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -202,6 +202,14 @@ impl DB { self.db.get(key).unwrap().map(|v| v.to_vec()) } + pub fn multi_get(&self, keys: I) -> Vec>, rocksdb::Error>> + where + K: AsRef<[u8]>, + I: IntoIterator, + { + self.db.multi_get(keys) + } + fn verify_compatibility(&self, config: &Config) { let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap(); diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 91ea6431d..059e17846 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -1,5 +1,5 @@ use arraydeque::{ArrayDeque, Wrapping}; -use itertools::Itertools; +use itertools::{Either, Itertools}; #[cfg(not(feature = "liquid"))] use bitcoin::consensus::encode::serialize; @@ -327,7 +327,7 @@ impl Mempool { self.txstore.insert(txid, tx); } // Phase 2: index history and spend edges (can fail if some txos cannot be found) - let txos = match self.lookup_txos(&self.get_prevouts(&txids)) { + let txos = match self.lookup_txos(self.get_prevouts(&txids)) { Ok(txos) => txos, Err(err) => { warn!("lookup txouts failed: {}", err); @@ -415,35 +415,30 @@ impl Mempool { } #[instrument(skip_all, name="Mempool::lookup_txo")] - pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result { - let mut outpoints = BTreeSet::new(); - outpoints.insert(*outpoint); - Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap()) + fn lookup_txo(&self, outpoint: &OutPoint) -> Option { + self.txstore + .get(&outpoint.txid) + .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) } #[instrument(skip_all, name="Mempool::lookup_txos")] - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> Result> { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self .latency .with_label_values(&["lookup_txos"]) .start_timer(); - let confirmed_txos = self.chain.lookup_avail_txos(outpoints); + // Get the txos available in the mempool, skipping over (and collecting) missing ones + let (mut txos, remain_outpoints): (HashMap<_, _>, _) = outpoints + .into_iter() + .partition_map(|outpoint| match self.lookup_txo(&outpoint) { + Some(txout) => Either::Left((outpoint, txout)), + None => Either::Right(outpoint), + }); - let mempool_txos = outpoints - .iter() - .filter(|outpoint| !confirmed_txos.contains_key(outpoint)) - .map(|outpoint| { - self.txstore - .get(&outpoint.txid) - .and_then(|tx| tx.output.get(outpoint.vout as usize).cloned()) - .map(|txout| (*outpoint, txout)) - .chain_err(|| format!("missing outpoint {:?}", outpoint)) - }) - .collect::>>()?; + // Get the remaining txos from the chain (fails if any are missing) + txos.extend(self.chain.lookup_txos(remain_outpoints)?); - let mut txos = confirmed_txos; - txos.extend(mempool_txos); Ok(txos) } diff --git a/src/new_index/query.rs b/src/new_index/query.rs index ecaf413c2..c188bc1d5 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -128,7 +128,7 @@ impl Query { } #[instrument(skip_all, name="query::Query::lookup_txos")] - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> HashMap { // the mempool lookup_txos() internally looks up confirmed txos as well self.mempool() .lookup_txos(outpoints) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 86d7c46bf..306a48a8f 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -336,7 +336,7 @@ impl Indexer { fn index(&self, blocks: &[BlockEntry]) { let previous_txos_map = { let _timer = self.start_timer("index_lookup"); - lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false) + lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap() }; let rows = { let _timer = self.start_timer("index_process"); @@ -899,15 +899,9 @@ impl ChainQuery { } #[instrument(skip_all, name="schema::ChainQuery::lookup_txos")] - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: BTreeSet) -> Result> { let _timer = self.start_timer("lookup_txos"); - lookup_txos(&self.store.txstore_db, outpoints, false) - } - - #[instrument(skip_all, name="schema::ChainQuery::lookup_avail_txos")] - pub fn lookup_avail_txos(&self, outpoints: &BTreeSet) -> HashMap { - let _timer = self.start_timer("lookup_available_txos"); - lookup_txos(&self.store.txstore_db, outpoints, true) + lookup_txos(&self.store.txstore_db, outpoints) } #[instrument(skip_all, name="schema::ChainQuery::lookup_spend")] @@ -1087,31 +1081,19 @@ fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet { } #[instrument(skip_all, name = "schema::lookup_txos")] -fn lookup_txos( - txstore_db: &DB, - outpoints: &BTreeSet, - allow_missing: bool, -) -> HashMap { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(16) // we need to saturate SSD IOPS - .thread_name(|i| format!("lookup-txo-{}", i)) - .build() - .unwrap(); - pool.install(|| { - outpoints - .par_iter() - .filter_map(|outpoint| { - lookup_txo(&txstore_db, &outpoint) - .or_else(|| { - if !allow_missing { - panic!("missing txo {} in {:?}", outpoint, txstore_db); - } - None - }) - .map(|txo| (*outpoint, txo)) - }) - .collect() - }) +fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet) -> Result> { + let keys = outpoints.iter().map(TxOutRow::key).collect::>(); + txstore_db + .multi_get(keys) + .into_iter() + .zip(outpoints) + .map(|(res, outpoint)| { + let txo = res + .unwrap() + .ok_or_else(|| format!("missing txo {}", outpoint))?; + Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut"))) + }) + .collect() } fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option { diff --git a/src/rest.rs b/src/rest.rs index f5cc76d69..7567bff81 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -471,7 +471,7 @@ fn prepare_txs( }) .collect(); - let prevouts = query.lookup_txos(&outpoints); + let prevouts = query.lookup_txos(outpoints); txs.into_iter() .map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config)) diff --git a/tests/common.rs b/tests/common.rs index 78932aa50..c85ebf7d9 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -57,7 +57,7 @@ impl TestRunner { } // Setup node - let node = NodeD::with_conf(noded::downloaded_exe_path().unwrap(), &node_conf).unwrap(); + let node = NodeD::with_conf(noded::exe_path().unwrap(), &node_conf).unwrap(); #[cfg(not(feature = "liquid"))] let (node_client, params) = (&node.client, &node.params); diff --git a/tests/electrum.rs b/tests/electrum.rs index d35d91eb3..504d16ca8 100644 --- a/tests/electrum.rs +++ b/tests/electrum.rs @@ -23,8 +23,7 @@ fn test_electrum() -> Result<()> { let server_arg = format!("{}:t", electrum_addr.to_string()); electrum_wallet_conf.args = vec!["-v", "--server", &server_arg]; electrum_wallet_conf.view_stdout = true; - let electrum_wallet = - ElectrumD::with_conf(electrumd::downloaded_exe_path()?, &electrum_wallet_conf)?; + let electrum_wallet = ElectrumD::with_conf(electrumd::exe_path()?, &electrum_wallet_conf)?; let notify_wallet = || { electrum_server.notify();