diff --git a/relay/src/callback/mod.rs b/relay/src/callback/mod.rs index 85b3c97..38628e0 100644 --- a/relay/src/callback/mod.rs +++ b/relay/src/callback/mod.rs @@ -1,11 +1,11 @@ use std::sync::Arc; +use anagram_bonsol_channel_utils::deployment_address; use async_trait::async_trait; use dashmap::DashMap; use itertools::Itertools; use solana_sdk::{ - message::{v0, VersionedMessage}, - transaction::VersionedTransaction, + account::Account, message::{v0, VersionedMessage}, transaction::VersionedTransaction }; use solana_transaction_status::TransactionStatus; use tokio::task::JoinHandle; @@ -22,16 +22,15 @@ use { use { anyhow::Result, + metrics::gauge, solana_rpc_client::nonblocking::rpc_client::RpcClient, solana_sdk::{ instruction::{AccountMeta, Instruction}, pubkey::Pubkey, signature::Keypair, - signer::Signer, - transaction::Transaction, + signer::Signer }, tracing::{error, info}, - metrics::gauge, }; #[async_trait] pub trait TransactionSender { @@ -60,6 +59,7 @@ pub trait TransactionSender { async fn get_current_block(&self) -> Result; fn get_signature_status(&self, sig: &Signature) -> Option; fn clear_signature_status(&self, sig: &Signature); + async fn get_deployment_account(&self, image_id: &str) -> Result; } use crate::{observe::MetricEvents, types::ProgramExec}; @@ -297,4 +297,13 @@ impl TransactionSender for RpcTransactionSender { .await .map_err(|e| anyhow::anyhow!("{:?}", e)) } + + async fn get_deployment_account(&self, image_id: &str) -> Result { + let (deployment_account, _) = deployment_address(image_id); + self + .rpc_client + .get_account(&deployment_account) + .await + .map_err(|e| anyhow::anyhow!("Failed to get account: {:?}", e)) + } } diff --git a/relay/src/config.rs b/relay/src/config.rs index d762d44..114091a 100644 --- a/relay/src/config.rs +++ b/relay/src/config.rs @@ -30,6 +30,19 @@ pub enum SignerConfig { KeypairFile { path: String }, //--- below not implemented yet maybe hsm, signer server or some weird sig agg shiz } +#[derive(Debug, Deserialize, Clone)] +pub enum MissingImageStrategy { + DownloadAndClaim, + DownloadAndMiss, + Fail, +} + +impl Default for MissingImageStrategy { + fn default() -> Self { + MissingImageStrategy::DownloadAndClaim + } +} + #[derive(Debug, Deserialize, Clone)] pub struct ProverNodeConfig { pub env: Option, @@ -59,6 +72,7 @@ pub struct ProverNodeConfig { pub stark_compression_tools_path: String, #[serde(default = "default_metrics_config")] pub metrics_config: MetricsConfig, + pub missing_image_strategy: MissingImageStrategy, } #[derive(Debug, Deserialize, Clone)] @@ -142,6 +156,7 @@ impl Default for ProverNodeConfig { signer_config: default_signer_config(), stark_compression_tools_path: default_stark_compression_tools_path(), metrics_config: default_metrics_config(), + missing_image_strategy: Default::default(), } } } diff --git a/relay/src/prover/mod.rs b/relay/src/prover/mod.rs index bcd107f..6a3819b 100644 --- a/relay/src/prover/mod.rs +++ b/relay/src/prover/mod.rs @@ -1,7 +1,7 @@ mod image; mod utils; use self::image::Image; -use crate::observe::*; +use crate::{observe::*, MissingImageStrategy}; use crate::{ callback::{RpcTransactionSender, TransactionSender}, config::ProverNodeConfig, @@ -30,6 +30,7 @@ use { }, }; +use anagram_bonsol_schema::root_as_deploy_v1; use risc0_groth16::{ProofJson, Seal}; use risc0_zkvm::{sha::Digest, InnerReceipt, MaybePruned, ReceiptClaim}; use tempfile::tempdir; @@ -56,6 +57,8 @@ pub enum Risc0RunnerError { InvalidData, #[error("Img too large")] ImgTooLarge, + #[error("Img load error")] + ImgLoadError, #[error("Image download error")] ImageDownloadError(#[from] anyhow::Error), #[error("Invalid input type")] @@ -248,6 +251,7 @@ impl Risc0Runner { &config, &inflight_proofs, input_client.clone(), + img_client.clone(), &txn_sender, &loaded_images, &input_staging_area, @@ -447,6 +451,7 @@ async fn handle_execution_request<'a>( config: &ProverNodeConfig, in_flight_proofs: InflightProofRef<'a>, input_client: Arc, + img_client: Arc, transaction_sender: &RpcTransactionSender, loaded_images: LoadedImageMapRef<'a>, input_staging_area: InputStagingAreaRef<'a>, @@ -467,16 +472,35 @@ async fn handle_execution_request<'a>( .map(|d| d.to_string()) .ok_or(Risc0RunnerError::InvalidData)?; let expiry = exec.max_block_height(); - let image_compute_estimate = loaded_images.get(&image_id).map(|img| img.size); - let computable_by = if let Some(ice) = image_compute_estimate { + let img = loaded_images.get(&image_id); + let img = if img.is_none() { + match config.missing_image_strategy { + MissingImageStrategy::DownloadAndClaim => { + info!("Image not loaded, attempting to load and rejecting claim"); + load_image(config, transaction_sender, &img_client, &image_id, loaded_images).await?; + loaded_images.get(&image_id) + } + MissingImageStrategy::DownloadAndMiss => { + info!("Image not loaded, loading and rejecting claim"); + load_image(config, transaction_sender, &img_client, &image_id, loaded_images).await?; + None + } + MissingImageStrategy::Fail => { + info!("Image not loaded, rejecting claim"); + None + } + } + } else { + img + } + .ok_or(Risc0RunnerError::ImgLoadError)?; + // naive compute cost estimate which is YES WE CAN DO THIS in the default amount of time - emit_histogram!(MetricEvents::ImageComputeEstimate, ice as f64, image_id => image_id.clone()); + emit_histogram!(MetricEvents::ImageComputeEstimate, img.size as f64, image_id => image_id.clone()); //ensure compute can happen before expiry //execution_block + (image_compute_estimate % config.max_compute_per_block) + 1 some bogus calc - expiry / 2 - } else { - u64::MAX - }; + let computable_by = expiry / 2; + if computable_by < expiry { //the way this is done can cause race conditions where so many request come in a short time that we accept // them before we change the value of g so we optimistically change to inflight and we will decrement if we dont win the claim @@ -638,6 +662,23 @@ async fn dowload_public_input( }) } +async fn load_image<'a>( + config: &ProverNodeConfig, + transaction_sender: &RpcTransactionSender, + http_client: &reqwest::Client, + image_id: &str, + loaded_images: LoadedImageMapRef<'a> +) -> Result<()> { + let account = transaction_sender + .get_deployment_account(image_id) + .await + .map_err(|e| Risc0RunnerError::ImageDownloadError(e))?; + let deploy_data = root_as_deploy_v1(&account.data) + .map_err(|_| anyhow::anyhow!("Failed to parse account data"))?; + handle_image_deployment(config, http_client, deploy_data, loaded_images).await?; + Ok(()) +} + async fn handle_image_deployment<'a>( config: &ProverNodeConfig, http_client: &reqwest::Client,