diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index ee92d7e0d..4d4eb1c17 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -82,14 +82,11 @@ fn run_server(config: Arc) -> Result<()> { &metrics, Arc::clone(&config), ))); - loop { - match Mempool::update(&mempool, &daemon) { - Ok(_) => break, - Err(e) => { - warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain()); - signal.wait(Duration::from_secs(5), false)?; - }, - } + + while !Mempool::update(&mempool, &daemon, &tip)? { + // Mempool syncing was aborted because the chain tip moved; + // Index the new block(s) and try again. + tip = indexer.update(&daemon)?; } #[cfg(feature = "liquid")] @@ -118,7 +115,6 @@ fn run_server(config: Arc) -> Result<()> { )); loop { - main_loop_count.inc(); if let Err(err) = signal.wait(Duration::from_secs(5), true) { @@ -131,14 +127,12 @@ fn run_server(config: Arc) -> Result<()> { // Index new blocks let current_tip = daemon.getbestblockhash()?; if current_tip != tip { - indexer.update(&daemon)?; - tip = current_tip; + tip = indexer.update(&daemon)?; }; // Update mempool - if let Err(e) = Mempool::update(&mempool, &daemon) { - // Log the error if the result is an Err - warn!("Error updating mempool, skipping mempool update: {}", e.display_chain()); + if !Mempool::update(&mempool, &daemon, &tip)? { + warn!("skipped failed mempool update, trying again in 5 seconds"); } // Update subscribed clients diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index b260c09af..8dae5bc62 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -11,7 +11,7 @@ use std::iter::FromIterator; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid}; +use crate::chain::{deserialize, BlockHash, Network, OutPoint, Transaction, TxOut, Txid}; use crate::config::Config; use crate::daemon::Daemon; use crate::errors::*; @@ -487,8 +487,13 @@ impl Mempool { .map_or_else(|| vec![], |entries| self._history(entries, limit)) } - /// Sync our local view of the mempool with the bitcoind RPC. - pub fn update(mempool: &Arc>, daemon: &Daemon) -> Result<()> { + /// Sync our local view of the mempool with the bitcoind Daemon RPC. If the chain tip moves before + /// the mempool is fetched in full, syncing is aborted and an Ok(false) is returned. + pub fn update( + mempool: &Arc>, + daemon: &Daemon, + tip: &BlockHash, + ) -> Result { let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer(); // Continuously attempt to fetch mempool transactions until we're able to get them in full @@ -515,6 +520,13 @@ impl Mempool { .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid)) .collect::>(); let new_txs = daemon.gettransactions_available(&new_txids)?; + + // Abort if the chain tip moved while fetching transactions + if daemon.getbestblockhash()? != *tip { + warn!("chain tip moved while updating mempool"); + return Ok(false); + } + let fetched_count = new_txs.len(); fetched_txs.extend(&mut new_txs.into_iter().map(|tx| (tx.txid(), tx))); @@ -546,7 +558,7 @@ impl Mempool { } } - Ok(()) + Ok(true) } } diff --git a/tests/common.rs b/tests/common.rs index 6aa4af19d..51751a5d2 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -149,7 +149,7 @@ impl TestRunner { }; let mut indexer = Indexer::open(Arc::clone(&store), fetch_from, &config, &metrics); - indexer.update(&daemon)?; + let tip = indexer.update(&daemon)?; indexer.fetch_from(FetchFrom::Bitcoind); let chain = Arc::new(ChainQuery::new( @@ -164,7 +164,7 @@ impl TestRunner { &metrics, Arc::clone(&config), ))); - Mempool::update(&mempool, &daemon)?; + assert!(Mempool::update(&mempool, &daemon, &tip)?); let query = Arc::new(Query::new( Arc::clone(&chain), @@ -195,8 +195,8 @@ impl TestRunner { } pub fn sync(&mut self) -> Result<()> { - self.indexer.update(&self.daemon)?; - Mempool::update(&self.mempool, &self.daemon)?; + let tip = self.indexer.update(&self.daemon)?; + assert!(Mempool::update(&self.mempool, &self.daemon, &tip)?); // force an update for the mempool stats, which are normally cached self.mempool.write().unwrap().update_backlog_stats(); Ok(())