From 884a25459ff09830fc4bf221320e8972a4eccaf5 Mon Sep 17 00:00:00 2001 From: Avi Dessauer Date: Mon, 29 Jul 2024 09:48:36 -0700 Subject: [PATCH] Fix submit batch json --- Cargo.lock | 2 + kairos-server/Cargo.toml | 2 + kairos-server/src/state.rs | 13 +++--- kairos-server/src/state/submit_batch.rs | 59 ++++++++++++++++++++++++- 4 files changed, 69 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3238c5f..eaef1a3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2768,6 +2768,7 @@ dependencies = [ "axum", "axum-extra", "axum-test", + "backoff", "casper-client", "casper-event-toolkit", "casper-types 3.0.0", @@ -2784,6 +2785,7 @@ dependencies = [ "proptest", "rand 0.8.5", "reqwest 0.12.4", + "risc0-zkvm", "serde", "serde_json", "sha2 0.10.8", diff --git a/kairos-server/Cargo.toml b/kairos-server/Cargo.toml index 895adb8b..23587920 100644 --- a/kairos-server/Cargo.toml +++ b/kairos-server/Cargo.toml @@ -47,6 +47,8 @@ reqwest = { version = "0.12", features = ["json"] } casper-event-toolkit = { git = "https://github.com/koxu1996/casper-event-toolkit.git", version = "0.1.3" } thiserror = "1.0" chrono = "0.4.38" +risc0-zkvm = { version="=1.0.3", default-features=false } +backoff = { version = "0.4", features = ["tokio", "futures"]} [dev-dependencies] proptest = "1" diff --git a/kairos-server/src/state.rs b/kairos-server/src/state.rs index 5b32cf56..13ef5eb4 100644 --- a/kairos-server/src/state.rs +++ b/kairos-server/src/state.rs @@ -5,6 +5,8 @@ mod trie; use std::collections::HashSet; use std::{sync::Arc, thread}; +use kairos_circuit_logic::ProofOutputs; +use risc0_zkvm::Receipt; use tokio::{ sync::{mpsc, RwLock}, task, @@ -94,17 +96,18 @@ impl BatchStateManager { if res.status().is_success() { tracing::info!("Proving server returned success"); - let proof_serialized = res.bytes().await.unwrap_or_else(|e| { - tracing::error!("Could not read response from proving server: {}", e); - panic!("Could not read response from proving server: {}", e); - }); + let (_proof_outputs, receipt): (ProofOutputs, Receipt) = + res.json().await.unwrap_or_else(|e| { + tracing::error!("Could not parse response from proving server: {}", e); + panic!("Could not parse response from proving server: {}", e); + }); if let Some(secret_key) = secret_key.as_ref() { submit_proof_to_contract( secret_key, contract_hash, casper_rpc.clone(), - proof_serialized.to_vec(), + serde_json::to_vec(&receipt).expect("Could not serialize receipt"), ) .await } else { diff --git a/kairos-server/src/state/submit_batch.rs b/kairos-server/src/state/submit_batch.rs index 6563ceba..1b76b2d8 100644 --- a/kairos-server/src/state/submit_batch.rs +++ b/kairos-server/src/state/submit_batch.rs @@ -1,5 +1,14 @@ -use casper_client::types::{DeployBuilder, ExecutableDeployItem, TimeDiff, Timestamp}; -use casper_client_types::{bytesrepr::Bytes, runtime_args, ContractHash, RuntimeArgs, SecretKey}; +use std::time::Instant; + +use anyhow::anyhow; +use backoff::{future::retry, ExponentialBackoff}; +use casper_client::{ + types::{DeployBuilder, ExecutableDeployItem, TimeDiff, Timestamp}, + Error, JsonRpcId, +}; +use casper_client_types::{ + bytesrepr::Bytes, runtime_args, ContractHash, ExecutionResult, RuntimeArgs, SecretKey, +}; use rand::random; use reqwest::Url; @@ -27,6 +36,8 @@ pub async fn submit_proof_to_contract( .build() .expect("could not build deploy"); + let deploy_hash = *deploy.id(); + let r = casper_client::put_deploy( casper_client::JsonRpcId::Number(random()), casper_rpc.as_str(), @@ -36,5 +47,49 @@ pub async fn submit_proof_to_contract( .await .expect("could not put deploy"); + let start = Instant::now(); + let timed_out = start.elapsed().as_secs() > 60; + + retry(ExponentialBackoff::default(), || async { + let response = casper_client::get_deploy( + JsonRpcId::Number(1), + casper_rpc.as_str(), + casper_client::Verbosity::Low, + deploy_hash, + false, + ) + .await + .map_err(|err| { + let elapsed = start.elapsed().as_secs(); + tracing::info!("Running for {elapsed}s, Error: {err:?}"); + err + }) + .map_err(|err| match &err { + e if timed_out => backoff::Error::permanent(anyhow!("Timeout on error: {e:?}")), + Error::ResponseIsHttpError { .. } | Error::FailedToGetResponse { .. } => { + backoff::Error::transient(anyhow!(err)) + } + _ => backoff::Error::permanent(anyhow!(err)), + }) + .expect("could not get deploy"); + + match response.result.execution_results.first() { + Some(result) => match &result.result { + ExecutionResult::Failure { error_message, .. } => { + Err(backoff::Error::permanent(anyhow!(error_message.clone()))) + } + ExecutionResult::Success { .. } => Ok(()), + }, + None if timed_out => Err(backoff::Error::permanent(anyhow!( + "Timeout on error: No execution results" + ))), + None => Err(backoff::Error::transient(anyhow!( + "No execution results there yet" + ))), + } + }) + .await + .expect("could not get deploy"); + tracing::info!("Deploy successful: {:?}", r); }