Skip to content

Commit

Permalink
Merge branch 'new-index' into trace-otlp
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusz-reichert authored and m-reichert committed Oct 7, 2024
2 parents 6676626 + 004fc74 commit 9a22b04
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 71 deletions.
8 changes: 8 additions & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

if ! has nix_direnv_version || ! nix_direnv_version 2.2.1; then
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/2.2.1/direnvrc" "sha256-zelF0vLbEl5uaqrfIzbgNzJWGmLzCmYAkInj/LNxvKs="
fi

watch_file rust-toolchain.toml
use flake
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ target
*~
*.pyc
result
.direnv
26 changes: 17 additions & 9 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,37 @@
flake-utils.lib.eachDefaultSystem
(system:
let
overlays = [ (import rust-overlay) ];
overlays = [
(import rust-overlay)
(import ./rocksdb-overlay.nix)
];
pkgs = import nixpkgs {
inherit system overlays;
};
rustToolchain = pkgs.pkgsBuildHost.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;

craneLib = (crane.mkLib pkgs).overrideToolchain rustToolchain;

src = craneLib.cleanCargoSource ./.;
src = craneLib.cleanCargoSource ./.;

nativeBuildInputs = with pkgs; [ rustToolchain clang ]; # required only at build time
buildInputs = with pkgs; [ ]; # also required at runtime

commonArgs = {
inherit src buildInputs nativeBuildInputs;
envVars = {
LIBCLANG_PATH = "${pkgs.libclang.lib}/lib";
ELEMENTSD_SKIP_DOWNLOAD = true;
BITCOIND_SKIP_DOWNLOAD = true;
ELECTRUMD_SKIP_DOWNLOAD = true;

# link rocksdb dynamically
ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include";
ROCKSDB_LIB_DIR = "${pkgs.rocksdb}/lib";
};

commonArgs = {
inherit src buildInputs nativeBuildInputs;
} // envVars;

cargoArtifacts = craneLib.buildDepsOnly commonArgs;
bin = craneLib.buildPackage (commonArgs // {
inherit cargoArtifacts;
Expand All @@ -55,7 +66,6 @@
doCheck = false;
});


in
with pkgs;
{
Expand All @@ -76,11 +86,9 @@
program = "${bin}/bin/electrs";
};


devShells.default = mkShell {
devShells.default = mkShell (envVars // {
inputsFrom = [ bin ];
LIBCLANG_PATH = "${pkgs.libclang.lib}/lib"; # for rocksdb
};
});
}
);
}
13 changes: 13 additions & 0 deletions rocksdb-overlay.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
final: prev: {

rocksdb = prev.rocksdb.overrideAttrs (oldAttrs: rec {
version = "8.1.1";

src = final.fetchFromGitHub {
owner = "facebook";
repo = oldAttrs.pname;
rev = "v${version}";
hash = "sha256-79hRtc5QSWLLyjRGCmuYZSoIc9IcIsnl8UCinz2sVw4=";
};
});
}
8 changes: 8 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
[toolchain]
channel = "1.75.0"
components = [
"cargo",
"clippy",
"rust-src",
"rust-std",
"rustc",
"rustfmt"
]
4 changes: 2 additions & 2 deletions src/bin/tx-fingerprint-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ fn main() {
//info!("{:?},{:?}", txid, blockid);

let prevouts = chain.lookup_txos(
&tx.input
tx.input
.iter()
.filter(|txin| has_prevout(txin))
.map(|txin| txin.previous_output)
.collect(),
);
).unwrap();

let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum();
let small_out = tx
Expand Down
8 changes: 8 additions & 0 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ impl DB {
self.db.get(key).unwrap().map(|v| v.to_vec())
}

pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = K>,
{
self.db.multi_get(keys)
}

fn verify_compatibility(&self, config: &Config) {
let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap();

Expand Down
37 changes: 16 additions & 21 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arraydeque::{ArrayDeque, Wrapping};
use itertools::Itertools;
use itertools::{Either, Itertools};

#[cfg(not(feature = "liquid"))]
use bitcoin::consensus::encode::serialize;
Expand Down Expand Up @@ -327,7 +327,7 @@ impl Mempool {
self.txstore.insert(txid, tx);
}
// Phase 2: index history and spend edges (can fail if some txos cannot be found)
let txos = match self.lookup_txos(&self.get_prevouts(&txids)) {
let txos = match self.lookup_txos(self.get_prevouts(&txids)) {
Ok(txos) => txos,
Err(err) => {
warn!("lookup txouts failed: {}", err);
Expand Down Expand Up @@ -415,35 +415,30 @@ impl Mempool {
}

#[instrument(skip_all, name="Mempool::lookup_txo")]
pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result<TxOut> {
let mut outpoints = BTreeSet::new();
outpoints.insert(*outpoint);
Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap())
fn lookup_txo(&self, outpoint: &OutPoint) -> Option<TxOut> {
self.txstore
.get(&outpoint.txid)
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
}

#[instrument(skip_all, name="Mempool::lookup_txos")]
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
pub fn lookup_txos(&self, outpoints: BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
let _timer = self
.latency
.with_label_values(&["lookup_txos"])
.start_timer();

let confirmed_txos = self.chain.lookup_avail_txos(outpoints);
// Get the txos available in the mempool, skipping over (and collecting) missing ones
let (mut txos, remain_outpoints): (HashMap<_, _>, _) = outpoints
.into_iter()
.partition_map(|outpoint| match self.lookup_txo(&outpoint) {
Some(txout) => Either::Left((outpoint, txout)),
None => Either::Right(outpoint),
});

let mempool_txos = outpoints
.iter()
.filter(|outpoint| !confirmed_txos.contains_key(outpoint))
.map(|outpoint| {
self.txstore
.get(&outpoint.txid)
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
.map(|txout| (*outpoint, txout))
.chain_err(|| format!("missing outpoint {:?}", outpoint))
})
.collect::<Result<HashMap<OutPoint, TxOut>>>()?;
// Get the remaining txos from the chain (fails if any are missing)
txos.extend(self.chain.lookup_txos(remain_outpoints)?);

let mut txos = confirmed_txos;
txos.extend(mempool_txos);
Ok(txos)
}

Expand Down
2 changes: 1 addition & 1 deletion src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Query {
}

#[instrument(skip_all, name="query::Query::lookup_txos")]
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
pub fn lookup_txos(&self, outpoints: BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
// the mempool lookup_txos() internally looks up confirmed txos as well
self.mempool()
.lookup_txos(outpoints)
Expand Down
50 changes: 16 additions & 34 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl Indexer {
fn index(&self, blocks: &[BlockEntry]) {
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap()
};
let rows = {
let _timer = self.start_timer("index_process");
Expand Down Expand Up @@ -899,15 +899,9 @@ impl ChainQuery {
}

#[instrument(skip_all, name="schema::ChainQuery::lookup_txos")]
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
pub fn lookup_txos(&self, outpoints: BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
let _timer = self.start_timer("lookup_txos");
lookup_txos(&self.store.txstore_db, outpoints, false)
}

#[instrument(skip_all, name="schema::ChainQuery::lookup_avail_txos")]
pub fn lookup_avail_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
let _timer = self.start_timer("lookup_available_txos");
lookup_txos(&self.store.txstore_db, outpoints, true)
lookup_txos(&self.store.txstore_db, outpoints)
}

#[instrument(skip_all, name="schema::ChainQuery::lookup_spend")]
Expand Down Expand Up @@ -1087,31 +1081,19 @@ fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet<OutPoint> {
}

#[instrument(skip_all, name = "schema::lookup_txos")]
fn lookup_txos(
txstore_db: &DB,
outpoints: &BTreeSet<OutPoint>,
allow_missing: bool,
) -> HashMap<OutPoint, TxOut> {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(16) // we need to saturate SSD IOPS
.thread_name(|i| format!("lookup-txo-{}", i))
.build()
.unwrap();
pool.install(|| {
outpoints
.par_iter()
.filter_map(|outpoint| {
lookup_txo(&txstore_db, &outpoint)
.or_else(|| {
if !allow_missing {
panic!("missing txo {} in {:?}", outpoint, txstore_db);
}
None
})
.map(|txo| (*outpoint, txo))
})
.collect()
})
fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
let keys = outpoints.iter().map(TxOutRow::key).collect::<Vec<_>>();
txstore_db
.multi_get(keys)
.into_iter()
.zip(outpoints)
.map(|(res, outpoint)| {
let txo = res
.unwrap()
.ok_or_else(|| format!("missing txo {}", outpoint))?;
Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut")))
})
.collect()
}

fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option<TxOut> {
Expand Down
2 changes: 1 addition & 1 deletion src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ fn prepare_txs(
})
.collect();

let prevouts = query.lookup_txos(&outpoints);
let prevouts = query.lookup_txos(outpoints);

txs.into_iter()
.map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config))
Expand Down
2 changes: 1 addition & 1 deletion tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl TestRunner {
}

// Setup node
let node = NodeD::with_conf(noded::downloaded_exe_path().unwrap(), &node_conf).unwrap();
let node = NodeD::with_conf(noded::exe_path().unwrap(), &node_conf).unwrap();

#[cfg(not(feature = "liquid"))]
let (node_client, params) = (&node.client, &node.params);
Expand Down
3 changes: 1 addition & 2 deletions tests/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ fn test_electrum() -> Result<()> {
let server_arg = format!("{}:t", electrum_addr.to_string());
electrum_wallet_conf.args = vec!["-v", "--server", &server_arg];
electrum_wallet_conf.view_stdout = true;
let electrum_wallet =
ElectrumD::with_conf(electrumd::downloaded_exe_path()?, &electrum_wallet_conf)?;
let electrum_wallet = ElectrumD::with_conf(electrumd::exe_path()?, &electrum_wallet_conf)?;

let notify_wallet = || {
electrum_server.notify();
Expand Down

0 comments on commit 9a22b04

Please sign in to comment.