Skip to content

Commit

Permalink
Don't use RPC batching with bitcoind
Browse files Browse the repository at this point in the history
This actually hurts performance because the batched response has to be
bueffered on the bitcoind side, as @TheBlueMatt explains at
romanz#373 (comment)

Instead, send multiple individual RPC requests in parallel using a
thread pool, with a separate RPC TCP connection for each thread.

Also see romanz#374
  • Loading branch information
shesek committed May 29, 2024
1 parent 672b785 commit 06526c7
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/bin/tx-fingerprint-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn main() {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal,
Expand Down
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Config {
pub daemon_dir: PathBuf,
pub blocks_dir: PathBuf,
pub daemon_rpc_addr: SocketAddr,
pub daemon_parallelism: usize,
pub cookie: Option<String>,
pub electrum_rpc_addr: SocketAddr,
pub http_addr: SocketAddr,
Expand Down Expand Up @@ -132,6 +133,12 @@ impl Config {
.help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)")
.takes_value(true),
)
.arg(
Arg::with_name("daemon_parallelism")
.long("daemon-parallelism")
.help("Number of JSONRPC requests to send in parallel")
.default_value("8")
)
.arg(
Arg::with_name("monitoring_addr")
.long("monitoring-addr")
Expand Down Expand Up @@ -386,6 +393,7 @@ impl Config {
daemon_dir,
blocks_dir,
daemon_rpc_addr,
daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize),
cookie,
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
electrum_rpc_addr,
Expand Down
82 changes: 50 additions & 32 deletions src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cell::OnceCell;
use std::collections::{HashMap, HashSet};
use std::env;
use std::io::{BufRead, BufReader, Lines, Write};
Expand All @@ -8,8 +9,9 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use base64::prelude::{Engine, BASE64_STANDARD};
use error_chain::ChainedError;
use hex::FromHex;
use itertools::Itertools;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use serde_json::{from_str, from_value, Value};

#[cfg(not(feature = "liquid"))]
Expand Down Expand Up @@ -281,6 +283,7 @@ impl Counter {

pub struct Daemon {
daemon_dir: PathBuf,
daemon_parallelism: usize,
blocks_dir: PathBuf,
network: Network,
conn: Mutex<Connection>,
Expand All @@ -297,13 +300,15 @@ impl Daemon {
daemon_dir: &PathBuf,
blocks_dir: &PathBuf,
daemon_rpc_addr: SocketAddr,
daemon_parallelism: usize,
cookie_getter: Arc<dyn CookieGetter>,
network: Network,
signal: Waiter,
metrics: &Metrics,
) -> Result<Daemon> {
let daemon = Daemon {
daemon_dir: daemon_dir.clone(),
daemon_parallelism,
blocks_dir: blocks_dir.clone(),
network,
conn: Mutex::new(Connection::new(
Expand Down Expand Up @@ -356,6 +361,7 @@ impl Daemon {
pub fn reconnect(&self) -> Result<Daemon> {
Ok(Daemon {
daemon_dir: self.daemon_dir.clone(),
daemon_parallelism: self.daemon_parallelism,
blocks_dir: self.blocks_dir.clone(),
network: self.network,
conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
Expand Down Expand Up @@ -398,31 +404,16 @@ impl Daemon {
Ok(result)
}

fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
let id = self.message_id.next();
let chunks = params_list
.iter()
.map(|params| json!({"method": method, "params": params, "id": id}))
.chunks(50_000); // Max Amount of batched requests
let mut results = vec![];
for chunk in &chunks {
let reqs = chunk.collect();
let mut replies = self.call_jsonrpc(method, &reqs)?;
if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec {
results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
}
} else {
bail!("non-array replies: {:?}", replies);
}
}

Ok(results)
let req = json!({"method": method, "params": params, "id": id});
let reply = self.call_jsonrpc(method, &req)?;
parse_jsonrpc_reply(reply, method, id)
}

fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
loop {
match self.handle_request_batch(method, params_list) {
match self.handle_request(method, &params) {
Err(Error(ErrorKind::Connection(msg), _)) => {
warn!("reconnecting to bitcoind: {}", msg);
self.signal.wait(Duration::from_secs(3), false)?;
Expand All @@ -436,13 +427,45 @@ impl Daemon {
}

fn request(&self, method: &str, params: Value) -> Result<Value> {
let mut values = self.retry_request_batch(method, &[params])?;
assert_eq!(values.len(), 1);
Ok(values.remove(0))
self.retry_request(method, &params)
}

fn retry_reconnect(&self) -> Daemon {
// XXX add a max reconnection attempts limit?
loop {
match self.reconnect() {
Ok(daemon) => break daemon,
Err(e) => {
warn!("failed connecting to RPC daemon: {}", e.display_chain());
}
}
}
}

// Send requests in parallel over multiple connections as individual JSON-RPC requests (with no JSON-RPC batching)
fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
self.retry_request_batch(method, params_list)
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(self.daemon_parallelism)
.thread_name(|i| format!("rpc-requests-{}", i))
.build()
.unwrap();

thread_pool.install(|| {
params_list
.par_iter()
.map(|params| {
// Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
// necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());

DAEMON_INSTANCE.with(|daemon| {
daemon
.get_or_init(|| self.retry_reconnect())
.retry_request(method, params)
})
})
.collect()
})
}

// bitcoind JSONRPC API:
Expand Down Expand Up @@ -510,12 +533,7 @@ impl Daemon {
.collect();

let values = self.requests("getrawtransaction", &params_list)?;
let mut txs = vec![];
for value in values {
txs.push(tx_from_value(value)?);
}
assert_eq!(txhashes.len(), txs.len());
Ok(txs)
values.into_iter().map(tx_from_value).collect()
}

pub fn gettransaction_raw(
Expand Down
2 changes: 2 additions & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl TestRunner {
network_type,
db_path: electrsdb.path().to_path_buf(),
daemon_dir: daemon_subdir.clone(),
daemon_parallelism: 3,
blocks_dir: daemon_subdir.join("blocks"),
daemon_rpc_addr: params.rpc_socket.into(),
cookie: None,
Expand Down Expand Up @@ -127,6 +128,7 @@ impl TestRunner {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal.clone(),
Expand Down

0 comments on commit 06526c7

Please sign in to comment.