diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d69c604..0461988 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -100,4 +100,4 @@ jobs: uses: actions-rs/cargo@v1 with: command: clippy - args: --all-features -- -D warnings + args: --all-features -- -D warnings -A clippy::absurd_extreme_comparisons diff --git a/src/bin/zkbtc.rs b/src/bin/zkbtc.rs index 88bed6b..976afe6 100644 --- a/src/bin/zkbtc.rs +++ b/src/bin/zkbtc.rs @@ -229,7 +229,7 @@ async fn main() -> Result<()> { info!( "deploying circuit {} with {num_public_inputs} public inputs", - hex::encode(&vk_hash) + hex::encode(vk_hash) ); // sanity check for stateful zkapps @@ -421,7 +421,7 @@ async fn main() -> Result<()> { { let path = output_dir.join("publickey-package.json"); let file = - std::fs::File::create(&path).expect("couldn't create file given output dir"); + std::fs::File::create(path).expect("couldn't create file given output dir"); serde_json::to_writer_pretty(file, &pubkey_package).unwrap(); } @@ -445,7 +445,7 @@ async fn main() -> Result<()> { }; let path = output_dir.join("committee-cfg.json"); let file = - std::fs::File::create(&path).expect("couldn't create file given output dir"); + std::fs::File::create(path).expect("couldn't create file given output dir"); serde_json::to_writer_pretty(file, &committee_cfg).unwrap(); } } diff --git a/src/bob_request.rs b/src/bob_request.rs index 99d7120..bde8323 100644 --- a/src/bob_request.rs +++ b/src/bob_request.rs @@ -146,7 +146,7 @@ impl BobRequest { // extract new_state let new_state = public_inputs .0 - .get(0) + .first() .cloned() .context("the full public input does not contain a new state")?; @@ -196,13 +196,13 @@ impl BobRequest { let amount_in = string_to_amount( proof_inputs .get("amount_in") - .and_then(|x| x.get(0)) + .and_then(|x| x.first()) .context("amount_in in proof inputs must be of length 1")?, )?; let amount_out = string_to_amount( proof_inputs .get("amount_out") - .and_then(|x| x.get(0)) + .and_then(|x| x.first()) .context("amount_out in proof inputs must be of length 1")?, )?; let new_value = smart_contract.locked_value + amount_in - amount_out; @@ -284,7 +284,7 @@ impl BobRequest { let new_state = new_state.unwrap(); ensure!( public_inputs.0.len() - == 1 * 2 /* prev/new_state */ + 1 /* truncated txid */ + 1 /* amount_out */ + 1, /* amount_in */ + == 2 /* prev/new_state */ + 1 /* truncated txid */ + 1 /* amount_out */ + 1, /* amount_in */ "the number of public inputs is not correct" ); diff --git a/src/committee/node.rs b/src/committee/node.rs index 3360815..030ab40 100644 --- a/src/committee/node.rs +++ b/src/committee/node.rs @@ -212,7 +212,7 @@ async fn round_2_signing( } async fn is_alive(params: Params<'static>, _context: Arc) -> RpcResult { - Ok(params.parse::<[u64; 1]>()?[0].clone()) + Ok(params.parse::<[u64; 1]>()?[0]) } // diff --git a/src/committee/orchestrator.rs b/src/committee/orchestrator.rs index 41181ec..ed012ac 100644 --- a/src/committee/orchestrator.rs +++ b/src/committee/orchestrator.rs @@ -12,8 +12,7 @@ use bitcoin::{ key::{TapTweak, UntweakedPublicKey}, secp256k1, taproot, TapSighashType, Witness, }; -use frost_secp256k1_tr::Ciphersuite; -use frost_secp256k1_tr::Group; +use frost_secp256k1_tr::{Ciphersuite, Group, Identifier}; use futures::future::join_all; use itertools::Itertools; use jsonrpsee::{server::Server, RpcModule}; @@ -40,14 +39,12 @@ use super::node::{Round2Request, Round2Response}; // Orchestration logic // -type RpcOrchestratorContext = Arc<(Orchestrator, Arc>)>; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommitteeConfig { pub threshold: usize, // TODO: We could use a Vec instead of a HashMap for the members, since it would be more efficient. // We do not currently need hashmap functionality, but we might later, so left unchanged. - pub members: HashMap, + pub members: HashMap, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] @@ -61,8 +58,8 @@ pub enum MemberStatus { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StatusResponse { - pub online_members: Vec, - pub offline_members: Vec, + pub online_members: Vec, + pub offline_members: Vec, } // This will be the second part in the RpcModule context wrapped in a RwLock. @@ -70,8 +67,8 @@ pub struct StatusResponse { // to wait for a read lock every time an rpc handler needs to access the config. // This way, handlers will only wait for read locks when they need the status of the members. pub struct MemberStatusState { - pub key_to_addr: HashMap, - pub status: HashMap, + pub key_to_addr: HashMap, + pub status: HashMap, } impl MemberStatusState { @@ -81,16 +78,16 @@ impl MemberStatusState { let mut futures = Vec::with_capacity(config.members.len()); for (key, member) in config.members.iter() { - let _ = key_to_addr.insert(key.clone(), member.address.clone()); + let _ = key_to_addr.insert(*key, member.address.clone()); futures.push(( - key.clone(), + *key, Self::check_alive(member.address.clone()), member.address.clone(), )); } for (key, future, address) in futures.into_iter() { - let new_status = if future.await == true { + let new_status = if future.await { info!("{address} is online"); MemberStatus::Online } else { @@ -110,11 +107,11 @@ impl MemberStatusState { } } - pub fn get_member_status(&self, key: &frost_secp256k1_tr::Identifier) -> MemberStatus { + pub fn get_member_status(&self, key: &Identifier) -> MemberStatus { *self.status.get(key).unwrap() } - pub fn mark_as_disconnected(&mut self, key: &frost_secp256k1_tr::Identifier) { + pub fn mark_as_disconnected(&mut self, key: &Identifier) { let m_status = self.status.get_mut(key).unwrap(); *m_status = MemberStatus::Disconnected(( Self::get_current_time_secs() + Self::get_next_fibonacci_backoff_delay(0), @@ -122,26 +119,19 @@ impl MemberStatusState { )) } - pub fn mark_as_offline(&mut self, key: &frost_secp256k1_tr::Identifier) { + pub fn mark_as_offline(&mut self, key: &Identifier) { let m_status = self.status.get_mut(key).unwrap(); *m_status = MemberStatus::Offline; } - pub fn get_status( - &self, - ) -> ( - Vec, - Vec, - ) { + pub fn get_status(&self) -> (Vec, Vec) { let mut online = Vec::new(); let mut offline = Vec::new(); for (member, status) in self.status.iter() { match *status { - MemberStatus::Online => online.push(member.clone()), - MemberStatus::Offline | MemberStatus::Disconnected(_) => { - offline.push(member.clone()) - } + MemberStatus::Online => online.push(*member), + MemberStatus::Offline | MemberStatus::Disconnected(_) => offline.push(*member), }; } @@ -156,12 +146,12 @@ impl MemberStatusState { } fn fib(n: u64) -> u64 { - if n <= 0 { - return 0; + if n == 0 { + 0 } else if n == 1 { - return 1; + 1 } else { - return Self::fib(n - 1) + Self::fib(n - 2); + Self::fib(n - 1) + Self::fib(n - 2) } } @@ -206,22 +196,20 @@ impl MemberStatusState { // Get array of members which are not *permanently* offline let members_to_check = { + let current_time = Self::get_current_time_secs(); let r_lock = state.read().unwrap(); r_lock - .key_to_addr + .status .iter() - .map(|(key, addr)| { - let status = r_lock.status.get(key).unwrap().clone(); - (key.clone(), addr.clone(), status) + .filter(|(_, s)| { + matches!(s, MemberStatus::Online) + | matches!(s, MemberStatus::Disconnected((r, _)) if *r <= current_time) }) - .filter(|(_, _, status)| match *status { - MemberStatus::Online => true, - MemberStatus::Disconnected((retry_time, _)) => { - retry_time <= Self::get_current_time_secs() - } - _ => false, + .map(|(key, status)| { + let addr = r_lock.key_to_addr.get(key).unwrap(); + (*key, addr.clone(), *status) }) - .collect::>() + .collect_vec() }; // check alive for each one of them @@ -248,7 +236,8 @@ impl MemberStatusState { let member_status = state_w.status.get_mut(key).unwrap(); let last_retries = match old_status { MemberStatus::Disconnected((_, last_retry_number)) => *last_retry_number, - _ => 0, + MemberStatus::Online => 0, + MemberStatus::Offline => continue, }; if last_retries > KEEPALIVE_MAX_RETRIES { @@ -278,25 +267,24 @@ pub struct Member { pub struct Orchestrator { pub pubkey_package: frost_secp256k1_tr::keys::PublicKeyPackage, pub committee_cfg: CommitteeConfig, + pub member_status: Arc>, } impl Orchestrator { pub fn new( pubkey_package: frost_secp256k1_tr::keys::PublicKeyPackage, committee_cfg: CommitteeConfig, + member_status: Arc>, ) -> Self { Self { pubkey_package, committee_cfg, + member_status, } } /// Handles bob request from A to Z. - pub async fn handle_request( - &self, - bob_request: &BobRequest, - member_status: Arc>, - ) -> Result { + pub async fn handle_request(&self, bob_request: &BobRequest) -> Result { // Validate transaction before forwarding it, and get smart contract let smart_contract = bob_request.validate_request().await?; @@ -310,7 +298,7 @@ impl Orchestrator { let mut commitments_map = BTreeMap::new(); let mut available_members = { - let ms_r = member_status.read().unwrap(); + let ms_r = self.member_status.read().unwrap(); self.committee_cfg .members .iter() @@ -347,17 +335,25 @@ impl Orchestrator { Ok(x) => x, Err(rpc_error) => { warn!("Round 1 error with {}, marking as disconnected and retrying round 1: {rpc_error}", member.address); - let mut ms_w = member_status.write().unwrap(); + let mut ms_w = self.member_status.write().unwrap(); ms_w.mark_as_disconnected(member_id); continue 'retry; } }; - let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; - let resp: Round1Response = response.result()?; + if let Ok(response) = + serde_json::from_str::(&resp) + { + let resp: Round1Response = response.result()?; - // store the commitment - commitments_map.insert(*member_id, resp.commitments); + // store the commitment + commitments_map.insert(*member_id, resp.commitments); + } else { + warn!("Round 1 error with {}, marking as offline and retrying from round 1: deserialize error", member.address); + let mut ms_w = self.member_status.write().unwrap(); + ms_w.mark_as_disconnected(member_id); + continue 'retry; + } } // @@ -402,17 +398,25 @@ impl Orchestrator { Ok(x) => x, Err(rpc_error) => { warn!("Round 2 error with {}, marking as offline and retrying from round 1: {rpc_error}", member.address); - let mut ms_w = member_status.write().unwrap(); + let mut ms_w = self.member_status.write().unwrap(); ms_w.mark_as_offline(member_id); continue 'retry; } }; - let response: bitcoincore_rpc::jsonrpc::Response = serde_json::from_str(&resp)?; - let round2_response: Round2Response = response.result()?; + if let Ok(response) = + serde_json::from_str::(&resp) + { + let round2_response: Round2Response = response.result()?; - // store the commitment - signature_shares.insert(*member_id, round2_response.signature_share); + // store the commitment + signature_shares.insert(*member_id, round2_response.signature_share); + } else { + warn!("Round 2 error with {}, marking as offline and retrying from round 1: deserialize error", member.address); + let mut ms_w = self.member_status.write().unwrap(); + ms_w.mark_as_offline(member_id); + continue 'retry; + } } // @@ -529,34 +533,30 @@ impl Orchestrator { /// Bob's request to unlock funds from a smart contract. async fn unlock_funds( params: Params<'static>, - context: RpcOrchestratorContext, + context: Arc, ) -> RpcResult { // get bob request let bob_request: [BobRequest; 1] = params.parse()?; let bob_request = &bob_request[0]; info!("received request: {:?}", bob_request); - let bob_response = context - .0 - .handle_request(bob_request, context.1.clone()) - .await - .map_err(|e| { - ErrorObjectOwned::owned( - jsonrpsee_types::error::UNKNOWN_ERROR_CODE, - "error while unlocking funds", - Some(format!("the request didn't validate: {e}")), - ) - })?; + let bob_response = context.handle_request(bob_request).await.map_err(|e| { + ErrorObjectOwned::owned( + jsonrpsee_types::error::UNKNOWN_ERROR_CODE, + "error while unlocking funds", + Some(format!("the request didn't validate: {e}")), + ) + })?; RpcResult::Ok(bob_response) } async fn get_nodes_status( _params: Params<'static>, - context: RpcOrchestratorContext, + context: Arc, ) -> RpcResult { let (online, offline) = { - let mss_r = context.1.read().unwrap(); + let mss_r = context.member_status.read().unwrap(); mss_r.get_status() }; @@ -578,13 +578,11 @@ pub async fn run_server( let mss_thread_copy = member_status_state.clone(); tokio::spawn(async move { MemberStatusState::keepalive_thread(mss_thread_copy).await }); - let ctx = ( - Orchestrator { - pubkey_package, - committee_cfg, - }, - member_status_state.clone(), - ); + let ctx = Orchestrator { + pubkey_package, + committee_cfg, + member_status: member_status_state, + }; let server = Server::builder() .build(address.parse::()?) diff --git a/src/constants.rs b/src/constants.rs index d66468f..e00f626 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -36,7 +36,7 @@ pub const CIRCOM_ETH_PRIME_BYTELEN: usize = 32; pub const STATELESS_ZKAPP_PUBLIC_INPUT_LEN: usize = 1 /* truncated txid */; /// The expected number of public inputs for a stateful zkapp. -pub const STATEFUL_ZKAPP_PUBLIC_INPUT_LEN: usize = 1 * 2 /* new state + prev state */ + 1 /* truncated txid */ + 1 /* amount_out */ + 1 /* amount_in */; +pub const STATEFUL_ZKAPP_PUBLIC_INPUT_LEN: usize = 2 /* new state + prev state */ + 1 /* truncated txid */ + 1 /* amount_out */ + 1 /* amount_in */; /// The number of seconds to sleep between orchestrator-node keepalive requests pub const KEEPALIVE_WAIT_SECONDS: u64 = 5;