diff --git a/components/chainhook-cli/src/config/file.rs b/components/chainhook-cli/src/config/file.rs index 6ef2fa2fa..25e4abedb 100644 --- a/components/chainhook-cli/src/config/file.rs +++ b/components/chainhook-cli/src/config/file.rs @@ -5,6 +5,7 @@ pub struct ConfigFile { pub storage: StorageConfigFile, pub pox_config: Option, pub http_api: Option, + pub predicates: Option, pub event_source: Option>, pub limits: LimitsConfigFile, pub network: NetworkConfigFile, @@ -64,6 +65,11 @@ pub struct NetworkConfigFile { pub stacks_events_ingestion_port: Option, } +#[derive(Deserialize, Debug, Clone)] +pub struct PredicatesConfigFile { + pub payload_http_request_timeout_ms: Option, +} + #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "snake_case")] pub enum NetworkConfigMode { diff --git a/components/chainhook-cli/src/config/mod.rs b/components/chainhook-cli/src/config/mod.rs index fb2b6b2a3..cba31806e 100644 --- a/components/chainhook-cli/src/config/mod.rs +++ b/components/chainhook-cli/src/config/mod.rs @@ -3,7 +3,7 @@ pub mod generator; use chainhook_sdk::chainhooks::types::{ChainhookStore, PoxConfig}; pub use chainhook_sdk::indexer::IndexerConfig; -use chainhook_sdk::observer::EventObserverConfig; +use chainhook_sdk::observer::{EventObserverConfig, PredicatesConfig}; use chainhook_sdk::types::{ BitcoinBlockSignaling, BitcoinNetwork, StacksNetwork, StacksNodeConfig, }; @@ -30,6 +30,7 @@ pub struct Config { pub storage: StorageConfig, pub pox_config: PoxConfig, pub http_api: PredicatesApi, + pub predicates: PredicatesConfig, pub event_sources: Vec, pub limits: LimitsConfig, pub network: IndexerConfig, @@ -117,6 +118,9 @@ impl Config { EventObserverConfig { bitcoin_rpc_proxy_enabled: true, registered_chainhooks: ChainhookStore::new(), + predicates_config: PredicatesConfig { + payload_http_request_timeout_ms: self.predicates.payload_http_request_timeout_ms, + }, bitcoind_rpc_username: self.network.bitcoind_rpc_username.clone(), bitcoind_rpc_password: self.network.bitcoind_rpc_password.clone(), bitcoind_rpc_url: self.network.bitcoind_rpc_url.clone(), @@ -193,6 +197,14 @@ impl Config { }), }, }, + predicates: match config_file.predicates { + None => PredicatesConfig { + payload_http_request_timeout_ms: None, + }, + Some(predicates) => PredicatesConfig { + payload_http_request_timeout_ms: predicates.payload_http_request_timeout_ms, + }, + }, event_sources, limits: LimitsConfig { max_number_of_stacks_predicates: config_file @@ -357,6 +369,9 @@ impl Config { }, pox_config: PoxConfig::devnet_default(), http_api: PredicatesApi::Off, + predicates: PredicatesConfig { + payload_http_request_timeout_ms: None, + }, event_sources: vec![], limits: LimitsConfig { max_number_of_bitcoin_predicates: BITCOIN_MAX_PREDICATE_REGISTRATION, @@ -390,6 +405,9 @@ impl Config { }, pox_config: PoxConfig::testnet_default(), http_api: PredicatesApi::Off, + predicates: PredicatesConfig { + payload_http_request_timeout_ms: None, + }, event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig { file_url: DEFAULT_TESTNET_STACKS_TSV_ARCHIVE.into(), })], @@ -425,6 +443,9 @@ impl Config { }, pox_config: PoxConfig::mainnet_default(), http_api: PredicatesApi::Off, + predicates: PredicatesConfig { + payload_http_request_timeout_ms: None, + }, event_sources: vec![EventSourceConfig::StacksTsvUrl(UrlConfig { file_url: DEFAULT_MAINNET_STACKS_TSV_ARCHIVE.into(), })], diff --git a/components/chainhook-cli/src/scan/bitcoin.rs b/components/chainhook-cli/src/scan/bitcoin.rs index e57d2ddfc..3427ef60b 100644 --- a/components/chainhook-cli/src/scan/bitcoin.rs +++ b/components/chainhook-cli/src/scan/bitcoin.rs @@ -290,7 +290,7 @@ pub async fn execute_predicates_action<'a>( gather_proofs(&trigger, &mut proofs, config, ctx); } let predicate_uuid = &trigger.chainhook.uuid; - match handle_bitcoin_hook_action(trigger, &proofs) { + match handle_bitcoin_hook_action(trigger, &proofs, &config) { Err(e) => { warn!( ctx.expect_logger(), diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index 9ba580399..1e3b100e5 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -97,9 +97,11 @@ pub async fn get_canonical_fork_from_tsv( for result in reader_builder.deserialize() { line += 1; let record: Record = result.unwrap(); - if let RecordKind::StacksBlockReceived = &record.kind { if let Err(_e) = record_tx.send(Some((record, line))) { - break; - } }; + if let RecordKind::StacksBlockReceived = &record.kind { + if let Err(_e) = record_tx.send(Some((record, line))) { + break; + } + }; } let _ = record_tx.send(None); }) @@ -338,7 +340,12 @@ pub async fn scan_stacks_chainstate_via_rocksdb_using_predicate( apply: hits_per_blocks, rollback: vec![], }; - let res = match handle_stacks_hook_action(trigger, &proofs, ctx) { + let res = match handle_stacks_hook_action( + trigger, + &proofs, + &config.get_event_observer_config(), + ctx, + ) { Err(e) => { warn!( ctx.expect_logger(), @@ -487,7 +494,9 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( let mut tsv_line = String::new(); while tsv_current_line < tsv_line_number { tsv_line.clear(); - let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?; + let bytes_read = tsv_reader + .read_line(&mut tsv_line) + .map_err(|e| e.to_string())?; if bytes_read == 0 { return Err("Unexpected EOF when reading TSV".to_string()); } @@ -525,7 +534,8 @@ pub async fn scan_stacks_chainstate_via_csv_using_predicate( apply: hits_per_blocks, rollback: vec![], }; - match handle_stacks_hook_action(trigger, &proofs, ctx) { + match handle_stacks_hook_action(trigger, &proofs, &config.get_event_observer_config(), ctx) + { Err(e) => { error!(ctx.expect_logger(), "unable to handle action {}", e); } @@ -604,7 +614,9 @@ pub async fn consolidate_local_stacks_chainstate_using_csv( let mut tsv_line = String::new(); while tsv_current_line < tsv_line_number { tsv_line.clear(); - let bytes_read = tsv_reader.read_line(&mut tsv_line).map_err(|e| e.to_string())?; + let bytes_read = tsv_reader + .read_line(&mut tsv_line) + .map_err(|e| e.to_string())?; if bytes_read == 0 { return Err("Unexpected EOF when reading TSV".to_string()); } diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs index b9c821429..ae1196def 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs @@ -8,6 +8,7 @@ use crate::service::{ PredicateStatus, Service, }; use chainhook_sdk::chainhooks::types::PoxConfig; +use chainhook_sdk::observer::PredicatesConfig; use chainhook_sdk::{ chainhooks::stacks::StacksChainhookSpecificationNetworkMap, chainhooks::types::{ChainhookInstance, ChainhookSpecificationNetworkMap}, @@ -82,12 +83,12 @@ pub async fn filter_predicate_status_from_all_predicates( match matching_predicate { Some(predicate) => match predicate.get("status") { Some(status) => { - return serde_json::from_value(status.clone()).map_err(|e| { - format!("failed to parse status {}", e) - }); + return serde_json::from_value(status.clone()) + .map_err(|e| format!("failed to parse status {}", e)); } None => { - return Err("no status field on matching get predicates result".to_string()) + return Err("no status field on matching get predicates result" + .to_string()) } }, None => { @@ -98,7 +99,9 @@ pub async fn filter_predicate_status_from_all_predicates( } } None => { - return Err("failed to parse get predicate response's result field".to_string()) + return Err( + "failed to parse get predicate response's result field".to_string() + ) } }, None => { @@ -267,10 +270,7 @@ pub fn flush_redis(port: u16) { let client = redis::Client::open(format!("redis://localhost:{port}/")) .expect("unable to connect to redis"); let mut predicate_db_conn = client.get_connection().expect("unable to connect to redis"); - let db_keys: Vec = predicate_db_conn - .scan_match("*") - .unwrap() - .collect(); + let db_keys: Vec = predicate_db_conn.scan_match("*").unwrap().collect(); for k in db_keys { predicate_db_conn.del::<_, ()>(&k).unwrap(); } @@ -293,6 +293,7 @@ pub fn get_chainhook_config( }; Config { http_api: PredicatesApi::On(api_config), + predicates: PredicatesConfig::default(), pox_config: PoxConfig::devnet_default(), storage: StorageConfig { working_dir: working_dir.into(), @@ -343,12 +344,7 @@ pub async fn start_chainhook_service( ); let _ = hiro_system_kit::nestable_block_on(future); }) - .map_err(|e| { - format!( - "failed to start chainhook service thread, {}", - e - ) - })?; + .map_err(|e| format!("failed to start chainhook service thread, {}", e))?; // Loop to check if the server is ready let mut attempts = 0; diff --git a/components/chainhook-cli/src/service/tests/observer_tests.rs b/components/chainhook-cli/src/service/tests/observer_tests.rs index f3ebd2009..bfb512c03 100644 --- a/components/chainhook-cli/src/service/tests/observer_tests.rs +++ b/components/chainhook-cli/src/service/tests/observer_tests.rs @@ -2,7 +2,7 @@ use std::{sync::mpsc::channel, thread::sleep, time::Duration}; use chainhook_sdk::{ chainhooks::types::ChainhookStore, - observer::{start_event_observer, EventObserverConfig}, + observer::{start_event_observer, EventObserverConfig, PredicatesConfig}, types::{BitcoinNetwork, StacksNodeConfig}, utils::Context, }; @@ -190,6 +190,7 @@ async fn it_responds_200_for_unimplemented_endpoints( }); let config = EventObserverConfig { registered_chainhooks: ChainhookStore::new(), + predicates_config: PredicatesConfig::default(), bitcoin_rpc_proxy_enabled: false, bitcoind_rpc_username: String::new(), bitcoind_rpc_password: String::new(), diff --git a/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs b/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs index 00253c766..e239521c2 100644 --- a/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/bitcoin/mod.rs @@ -2,7 +2,7 @@ use super::types::{ append_error_context, validate_txid, ChainhookInstance, ExactMatchingRule, HookAction, MatchingRule, PoxConfig, TxinPredicate, }; -use crate::utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES}; +use crate::{observer::EventObserverConfig, utils::{Context, MAX_BLOCK_HEIGHTS_ENTRIES}}; use bitcoincore_rpc_json::bitcoin::{address::Payload, Address}; use chainhook_types::{ @@ -21,7 +21,7 @@ use serde::{de, Deserialize, Deserializer}; use serde_json::Value as JsonValue; use std::{ collections::{BTreeMap, HashMap, HashSet}, - str::FromStr, + str::FromStr, time::Duration, }; use reqwest::RequestBuilder; @@ -760,10 +760,15 @@ pub fn serialize_bitcoin_transactions_to_json( pub fn handle_bitcoin_hook_action<'a>( trigger: BitcoinTriggerChainhook<'a>, proofs: &HashMap<&'a TransactionIdentifier, String>, + config: &EventObserverConfig, ) -> Result { match &trigger.chainhook.action { HookAction::HttpPost(http) => { - let client = Client::builder() + let mut client_builder = Client::builder(); + if let Some(timeout) = config.predicates_config.payload_http_request_timeout_ms { + client_builder = client_builder.timeout(Duration::from_millis(timeout)); + } + let client = client_builder .build() .map_err(|e| format!("unable to build http client: {}", e))?; let host = http.url.to_string(); diff --git a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs index 755efdfe7..1b8e5bfa9 100644 --- a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs @@ -1,3 +1,4 @@ +use crate::observer::EventObserverConfig; use crate::utils::{AbstractStacksBlock, Context, MAX_BLOCK_HEIGHTS_ENTRIES}; use super::types::{ @@ -22,6 +23,7 @@ use schemars::JsonSchema; use serde_json::Value as JsonValue; use std::collections::{BTreeMap, HashMap}; use std::io::Cursor; +use std::time::Duration; use reqwest::RequestBuilder; @@ -1325,11 +1327,16 @@ pub fn serialize_stacks_payload_to_json<'a>( pub fn handle_stacks_hook_action<'a>( trigger: StacksTriggerChainhook<'a>, proofs: &HashMap<&'a TransactionIdentifier, String>, + config: &EventObserverConfig, ctx: &Context, ) -> Result { match &trigger.chainhook.action { HookAction::HttpPost(http) => { - let client = Client::builder() + let mut client_builder = Client::builder(); + if let Some(timeout) = config.predicates_config.payload_http_request_timeout_ms { + client_builder = client_builder.timeout(Duration::from_millis(timeout)); + } + let client = client_builder .build() .map_err(|e| format!("unable to build http client: {}", e))?; let host = http.url.to_string(); diff --git a/components/chainhook-sdk/src/chainhooks/tests/mod.rs b/components/chainhook-sdk/src/chainhooks/tests/mod.rs index 00fa5d6bf..9c5ee7625 100644 --- a/components/chainhook-sdk/src/chainhooks/tests/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/tests/mod.rs @@ -12,7 +12,11 @@ use super::{ }, types::{ExactMatchingRule, FileHook}, }; -use crate::{chainhooks::stacks::serialize_stacks_payload_to_json, utils::Context}; +use crate::{ + chainhooks::stacks::serialize_stacks_payload_to_json, + observer::EventObserverConfig, + utils::Context, +}; use crate::{ chainhooks::{ tests::fixtures::{get_expected_occurrence, get_test_event_payload_by_type}, @@ -735,7 +739,8 @@ fn test_stacks_hook_action_noop() { logger: None, tracer: false, }; - let occurrence = handle_stacks_hook_action(trigger, &proofs, &ctx).unwrap(); + let occurrence = + handle_stacks_hook_action(trigger, &proofs, &EventObserverConfig::default(), &ctx).unwrap(); if let StacksChainhookOccurrence::Data(data) = occurrence { assert_eq!(data.apply.len(), 1); assert_eq!( @@ -812,7 +817,8 @@ fn test_stacks_hook_action_file_append() { logger: None, tracer: false, }; - let occurrence = handle_stacks_hook_action(trigger, &proofs, &ctx).unwrap(); + let occurrence = + handle_stacks_hook_action(trigger, &proofs, &EventObserverConfig::default(), &ctx).unwrap(); if let StacksChainhookOccurrence::File(path, bytes) = occurrence { assert_eq!(path, "./".to_string()); let json: JsonValue = serde_json::from_slice(&bytes).unwrap(); diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index d762ec294..776e2e781 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -66,9 +66,29 @@ pub enum DataHandlerEvent { Terminate, } +#[derive(Clone, Debug, PartialEq)] +pub struct PredicatesConfig { + pub payload_http_request_timeout_ms: Option, +} + +impl PredicatesConfig { + pub fn new() -> Self { + PredicatesConfig { + payload_http_request_timeout_ms: None, + } + } +} + +impl Default for PredicatesConfig { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug, Clone)] pub struct EventObserverConfig { pub registered_chainhooks: ChainhookStore, + pub predicates_config: PredicatesConfig, pub bitcoin_rpc_proxy_enabled: bool, pub bitcoind_rpc_username: String, pub bitcoind_rpc_password: String, @@ -290,6 +310,9 @@ impl BitcoinEventObserverConfigBuilder { }; Ok(EventObserverConfig { registered_chainhooks: ChainhookStore::new(), + predicates_config: PredicatesConfig { + payload_http_request_timeout_ms: None, + }, bitcoin_rpc_proxy_enabled: false, bitcoind_rpc_username: self .bitcoind_rpc_username @@ -320,6 +343,9 @@ impl EventObserverConfig { pub fn default() -> Self { EventObserverConfig { registered_chainhooks: ChainhookStore::new(), + predicates_config: PredicatesConfig { + payload_http_request_timeout_ms: None, + }, bitcoin_rpc_proxy_enabled: false, bitcoind_rpc_username: "devnet".into(), bitcoind_rpc_password: "devnet".into(), @@ -363,7 +389,6 @@ impl EventObserverConfig { } pub fn get_bitcoin_config(&self) -> BitcoinConfig { - BitcoinConfig { username: self.bitcoind_rpc_username.clone(), password: self.bitcoind_rpc_password.clone(), @@ -403,6 +428,9 @@ impl EventObserverConfig { let config = EventObserverConfig { bitcoin_rpc_proxy_enabled: false, registered_chainhooks: ChainhookStore::new(), + predicates_config: PredicatesConfig { + payload_http_request_timeout_ms: None, + }, bitcoind_rpc_username: overrides .and_then(|c| c.bitcoind_rpc_username.clone()) .unwrap_or_else(|| "devnet".to_string()), @@ -1049,8 +1077,7 @@ pub fn get_bitcoin_proof( Ok(proof) => Ok(format!("0x{}", hex::encode(&proof))), Err(e) => Err(format!( "failed collecting proof for transaction {}: {}", - transaction_identifier.hash, - e + transaction_identifier.hash, e )), } } @@ -1361,7 +1388,8 @@ pub async fn start_observer_commands_handler( if let Some(highest_tip_block) = blocks_to_apply .iter() - .max_by_key(|b| b.block_identifier.index) { + .max_by_key(|b| b.block_identifier.index) + { prometheus_monitoring.btc_metrics_set_reorg( highest_tip_block.timestamp.into(), blocks_to_apply.len() as u64, @@ -1479,7 +1507,7 @@ pub async fn start_observer_commands_handler( } for chainhook_to_trigger in chainhooks_to_trigger.into_iter() { let predicate_uuid = &chainhook_to_trigger.chainhook.uuid; - match handle_bitcoin_hook_action(chainhook_to_trigger, &proofs) { + match handle_bitcoin_hook_action(chainhook_to_trigger, &proofs, &config) { Err(e) => { // todo: we may want to set predicates that reach this branch as interrupted, // but for now we will error to see if this problem occurs. @@ -1668,7 +1696,7 @@ pub async fn start_observer_commands_handler( let proofs = HashMap::new(); for chainhook_to_trigger in chainhooks_to_trigger.into_iter() { let predicate_uuid = &chainhook_to_trigger.chainhook.uuid; - match handle_stacks_hook_action(chainhook_to_trigger, &proofs, &ctx) { + match handle_stacks_hook_action(chainhook_to_trigger, &proofs, &config, &ctx) { Err(e) => { ctx.try_log(|logger| { // todo: we may want to set predicates that reach this branch as interrupted, diff --git a/components/chainhook-sdk/src/observer/tests/mod.rs b/components/chainhook-sdk/src/observer/tests/mod.rs index 32b19893c..41bcd25d9 100644 --- a/components/chainhook-sdk/src/observer/tests/mod.rs +++ b/components/chainhook-sdk/src/observer/tests/mod.rs @@ -35,11 +35,13 @@ use hiro_system_kit; use std::collections::BTreeMap; use std::sync::mpsc::{channel, Sender}; +use super::PredicatesConfig; use super::{ObserverEvent, DEFAULT_INGESTION_PORT}; fn generate_test_config() -> (EventObserverConfig, ChainhookStore) { let config: EventObserverConfig = EventObserverConfig { registered_chainhooks: ChainhookStore::new(), + predicates_config: PredicatesConfig::default(), bitcoin_rpc_proxy_enabled: false, bitcoind_rpc_username: "user".into(), bitcoind_rpc_password: "user".into(),