Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize txo lookup #76

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/bin/tx-fingerprint-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ fn main() {
//info!("{:?},{:?}", txid, blockid);

let prevouts = chain.lookup_txos(
&tx.input
tx.input
.iter()
.filter(|txin| has_prevout(txin))
.map(|txin| txin.previous_output)
.collect(),
);
).unwrap();

let total_out: u64 = tx.output.iter().map(|out| out.value.to_sat()).sum();
let small_out = tx
Expand Down
8 changes: 8 additions & 0 deletions src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ impl DB {
self.db.get(key).unwrap().map(|v| v.to_vec())
}

pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, rocksdb::Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = K>,
{
self.db.multi_get(keys)
}

fn verify_compatibility(&self, config: &Config) {
let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap();

Expand Down
37 changes: 16 additions & 21 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arraydeque::{ArrayDeque, Wrapping};
use itertools::Itertools;
use itertools::{Either, Itertools};

#[cfg(not(feature = "liquid"))]
use bitcoin::consensus::encode::serialize;
Expand Down Expand Up @@ -310,7 +310,7 @@ impl Mempool {
self.txstore.insert(txid, tx);
}
// Phase 2: index history and spend edges (can fail if some txos cannot be found)
let txos = match self.lookup_txos(&self.get_prevouts(&txids)) {
let txos = match self.lookup_txos(self.get_prevouts(&txids)) {
Ok(txos) => txos,
Err(err) => {
warn!("lookup txouts failed: {}", err);
Expand Down Expand Up @@ -397,34 +397,29 @@ impl Mempool {
}
}

pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result<TxOut> {
let mut outpoints = BTreeSet::new();
outpoints.insert(*outpoint);
Ok(self.lookup_txos(&outpoints)?.remove(outpoint).unwrap())
fn lookup_txo(&self, outpoint: &OutPoint) -> Option<TxOut> {
self.txstore
.get(&outpoint.txid)
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
pub fn lookup_txos(&self, outpoints: BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
RCasatta marked this conversation as resolved.
Show resolved Hide resolved
let _timer = self
.latency
.with_label_values(&["lookup_txos"])
.start_timer();

let confirmed_txos = self.chain.lookup_avail_txos(outpoints);
// Get the txos available in the mempool, skipping over (and collecting) missing ones
let (mut txos, remain_outpoints): (HashMap<_, _>, _) = outpoints
.into_iter()
.partition_map(|outpoint| match self.lookup_txo(&outpoint) {
Some(txout) => Either::Left((outpoint, txout)),
None => Either::Right(outpoint),
});

let mempool_txos = outpoints
.iter()
.filter(|outpoint| !confirmed_txos.contains_key(outpoint))
.map(|outpoint| {
self.txstore
.get(&outpoint.txid)
.and_then(|tx| tx.output.get(outpoint.vout as usize).cloned())
.map(|txout| (*outpoint, txout))
.chain_err(|| format!("missing outpoint {:?}", outpoint))
})
.collect::<Result<HashMap<OutPoint, TxOut>>>()?;
// Get the remaining txos from the chain (fails if any are missing)
txos.extend(self.chain.lookup_txos(remain_outpoints)?);

let mut txos = confirmed_txos;
txos.extend(mempool_txos);
Ok(txos)
}

Expand Down
2 changes: 1 addition & 1 deletion src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Query {
.or_else(|| self.mempool().lookup_raw_txn(txid))
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
pub fn lookup_txos(&self, outpoints: BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
// the mempool lookup_txos() internally looks up confirmed txos as well
self.mempool()
.lookup_txos(outpoints)
Expand Down
49 changes: 16 additions & 33 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ impl Indexer {
fn index(&self, blocks: &[BlockEntry]) {
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap()
};
let rows = {
let _timer = self.start_timer("index_process");
Expand Down Expand Up @@ -859,14 +859,9 @@ impl ChainQuery {
lookup_txo(&self.store.txstore_db, outpoint)
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
pub fn lookup_txos(&self, outpoints: BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
let _timer = self.start_timer("lookup_txos");
lookup_txos(&self.store.txstore_db, outpoints, false)
}

pub fn lookup_avail_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
let _timer = self.start_timer("lookup_available_txos");
lookup_txos(&self.store.txstore_db, outpoints, true)
lookup_txos(&self.store.txstore_db, outpoints)
}

pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option<SpendingInput> {
Expand Down Expand Up @@ -1033,31 +1028,19 @@ fn get_previous_txos(block_entries: &[BlockEntry]) -> BTreeSet<OutPoint> {
.collect()
}

fn lookup_txos(
txstore_db: &DB,
outpoints: &BTreeSet<OutPoint>,
allow_missing: bool,
) -> HashMap<OutPoint, TxOut> {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(16) // we need to saturate SSD IOPS
.thread_name(|i| format!("lookup-txo-{}", i))
.build()
.unwrap();
pool.install(|| {
outpoints
.par_iter()
.filter_map(|outpoint| {
lookup_txo(&txstore_db, &outpoint)
.or_else(|| {
if !allow_missing {
panic!("missing txo {} in {:?}", outpoint, txstore_db);
}
None
})
.map(|txo| (*outpoint, txo))
})
.collect()
})
fn lookup_txos(txstore_db: &DB, outpoints: BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
let keys = outpoints.iter().map(TxOutRow::key).collect::<Vec<_>>();
txstore_db
.multi_get(keys)
.into_iter()
.zip(outpoints)
.map(|(res, outpoint)| {
let txo = res
.unwrap()
.ok_or_else(|| format!("missing txo {}", outpoint))?;
Ok((outpoint, deserialize(&txo).expect("failed to parse TxOut")))
})
.collect()
}

fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option<TxOut> {
Expand Down
2 changes: 1 addition & 1 deletion src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ fn prepare_txs(
})
.collect();

let prevouts = query.lookup_txos(&outpoints);
let prevouts = query.lookup_txos(outpoints);

txs.into_iter()
.map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config))
Expand Down
Loading