diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index fb25e68a8..ee92d7e0d 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -49,6 +49,7 @@ fn run_server(config: Arc) -> Result<()> { &config.daemon_dir, &config.blocks_dir, config.daemon_rpc_addr, + config.daemon_parallelism, config.cookie_getter(), config.network_type, signal.clone(), diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 8e4c35e30..3dd9de999 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -33,6 +33,7 @@ fn main() { &config.daemon_dir, &config.blocks_dir, config.daemon_rpc_addr, + config.daemon_parallelism, config.cookie_getter(), config.network_type, signal, diff --git a/src/config.rs b/src/config.rs index 8696ecf8f..4f23248fd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,6 +25,7 @@ pub struct Config { pub daemon_dir: PathBuf, pub blocks_dir: PathBuf, pub daemon_rpc_addr: SocketAddr, + pub daemon_parallelism: usize, pub cookie: Option, pub electrum_rpc_addr: SocketAddr, pub http_addr: SocketAddr, @@ -132,6 +133,12 @@ impl Config { .help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)") .takes_value(true), ) + .arg( + Arg::with_name("daemon_parallelism") + .long("daemon-parallelism") + .help("Number of JSONRPC requests to send in parallel") + .default_value("10") + ) .arg( Arg::with_name("monitoring_addr") .long("monitoring-addr") @@ -386,6 +393,7 @@ impl Config { daemon_dir, blocks_dir, daemon_rpc_addr, + daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize), cookie, utxos_limit: value_t_or_exit!(m, "utxos_limit", usize), electrum_rpc_addr, diff --git a/src/daemon.rs b/src/daemon.rs index 81e2ce783..efd990887 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -260,6 +260,7 @@ impl Counter { pub struct Daemon { daemon_dir: PathBuf, + daemon_parallelism: usize, blocks_dir: PathBuf, network: Network, conn: Mutex, @@ -276,6 +277,7 @@ impl Daemon { daemon_dir: &PathBuf, blocks_dir: &PathBuf, daemon_rpc_addr: SocketAddr, + daemon_parallelism: usize, cookie_getter: Arc, network: Network, signal: Waiter, @@ -283,6 +285,7 @@ impl Daemon { ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), + daemon_parallelism, blocks_dir: blocks_dir.clone(), network, conn: Mutex::new(Connection::new( @@ -335,6 +338,7 @@ impl Daemon { pub fn reconnect(&self) -> Result { Ok(Daemon { daemon_dir: self.daemon_dir.clone(), + daemon_parallelism: self.daemon_parallelism, blocks_dir: self.blocks_dir.clone(), network: self.network, conn: Mutex::new(self.conn.lock().unwrap().reconnect()?), @@ -406,10 +410,17 @@ impl Daemon { fn requests(&self, method: &str, params_list: &[Value]) -> Result> { // Send in parallel as individual JSONRPC requests, with no batching. // See https://github.com/Blockstream/electrs/pull/33 - params_list - .par_iter() - .map(|params| self.retry_request(method, params)) - .collect() + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(self.daemon_parallelism) + .thread_name(|i| format!("rpc-requests-{}", i)) + .build() + .unwrap(); + pool.install(|| { + params_list + .par_iter() + .map(|params| self.retry_request(method, params)) + .collect() + }) } // bitcoind JSONRPC API: diff --git a/tests/common.rs b/tests/common.rs index 78932aa50..6aa4af19d 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -89,6 +89,7 @@ impl TestRunner { network_type, db_path: electrsdb.path().to_path_buf(), daemon_dir: daemon_subdir.clone(), + daemon_parallelism: 3, blocks_dir: daemon_subdir.join("blocks"), daemon_rpc_addr: params.rpc_socket.into(), cookie: None, @@ -127,6 +128,7 @@ impl TestRunner { &config.daemon_dir, &config.blocks_dir, config.daemon_rpc_addr, + config.daemon_parallelism, config.cookie_getter(), config.network_type, signal.clone(),