Skip to content

Commit

Permalink
feat(snos_job): Added http_client
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed Jul 22, 2024
1 parent 2eab368 commit 9ee833e
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 36 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ futures = "0.3.30"
indexmap = "2.1.0"
mongodb = { version = "2.8.1" }
omniqueue = { version = "0.2.0" }
reqwest = { version = "0.11.24" }
reqwest = { version = "0.12", features = ["json"] }
rstest = "0.18.2"
serde = { version = "1.0.197" }
serde_json = "1.0.114"
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ num-bigint = { workspace = true }
num-traits = { workspace = true }
omniqueue = { workspace = true, optional = true }
prover-client-interface = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
settlement-client-interface = { workspace = true }
Expand Down
13 changes: 11 additions & 2 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct Config {
prover_client: Box<dyn ProverClient>,
/// Settlement client
settlement_client: Box<dyn SettlementClient>,
/// An HTTP client used for RPC requests
http_client: Box<reqwest::Client>,
/// The database client
database: Box<dyn Database>,
queue: Box<dyn QueueProvider>,
Expand All @@ -53,13 +55,14 @@ pub async fn init_config() -> Config {
// init the queue
let queue = Box::new(SqsQueue {});

let http_client = Box::new(reqwest::Client::new());
let da_client = build_da_client().await;

let settings_provider = DefaultSettingsProvider {};
let settlement_client = build_settlement_client(&settings_provider).await;
let prover_client = build_prover_service(&settings_provider);

Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue)
Config::new(Arc::new(provider), da_client, prover_client, settlement_client, http_client, database, queue)
}

impl Config {
Expand All @@ -69,10 +72,11 @@ impl Config {
da_client: Box<dyn DaClient>,
prover_client: Box<dyn ProverClient>,
settlement_client: Box<dyn SettlementClient>,
http_client: Box<reqwest::Client>,
database: Box<dyn Database>,
queue: Box<dyn QueueProvider>,
) -> Self {
Self { starknet_client, da_client, prover_client, settlement_client, database, queue }
Self { starknet_client, da_client, prover_client, settlement_client, http_client, database, queue }
}

/// Returns the starknet client
Expand All @@ -95,6 +99,11 @@ impl Config {
self.settlement_client.as_ref()
}

/// Returns the http client
pub fn http_client(&self) -> &reqwest::Client {
self.http_client.as_ref()
}

/// Returns the database client
pub fn database(&self) -> &dyn Database {
self.database.as_ref()
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ mod tests {
let server = MockServer::start();

let config =
init_config(Some(format!("http://localhost:{}", server.port())), None, None, None, None, None).await;
init_config(Some(format!("http://localhost:{}", server.port())), None, None, None, None, None, None).await;

get_nonce_attached(&server, nonce_file_path);

Expand Down
11 changes: 5 additions & 6 deletions crates/orchestrator/src/jobs/snos_job/dummy_state.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
//! A Dummy state that does nothing.
//! It just implements the State and StateReader trait provided by Blockifier.
//! It just implements the State and StateReader traits provided by Blockifier.
//!
//! This module needs to be deleted as soon as we can import the structure from
//! Madara code.
//! This module needs to be deleted as soon as we can import the structure
//! [BlockifierStateAdapter] from Madara code.
//! Currently, we have version conflicts between snos <=> deoxys <=> cairo-vm.
//! This is an issue that needs to be tackled on its own.
use std::collections::HashSet;

use blockifier::execution::contract_class::ContractClass;
use blockifier::execution::contract_class::{ContractClass, ContractClassV0};
use blockifier::state::cached_state::CommitmentStateDiff;
use blockifier::state::errors::StateError;
use blockifier::state::state_api::{State, StateReader, StateResult};
use indexmap::IndexMap;
use starknet_api::core::{ClassHash, CompiledClassHash, ContractAddress, Nonce};
Expand All @@ -33,7 +32,7 @@ impl StateReader for DummyState {
}

fn get_compiled_contract_class(&mut self, _class_hash: ClassHash) -> StateResult<ContractClass> {
Err(StateError::OutOfRangeContractAddress)
Ok(ContractClass::V0(ContractClassV0::default()))
}

fn get_compiled_class_hash(&mut self, _class_hash: ClassHash) -> StateResult<CompiledClassHash> {
Expand Down
130 changes: 117 additions & 13 deletions crates/orchestrator/src/jobs/snos_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use cairo_vm::Felt252;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use num::FromPrimitive;
use serde::{Deserialize, Serialize};
use serde_json::json;
use snos::execution::helper::ExecutionHelperWrapper;
use snos::io::input::StarknetOsInput;
use snos::run_os;
use starknet_api::block::{BlockHash, BlockNumber, BlockTimestamp};
use starknet_api::hash::StarkFelt;
use starknet_core::types::FieldElement;
use utils::env_utils::get_env_var_or_panic;
use uuid::Uuid;

use utils::time::get_current_timestamp_in_secs;
Expand All @@ -30,6 +32,52 @@ use crate::jobs::Job;

use super::constants::JOB_METADATA_SNOS_BLOCK;

// TODO: Those two structure should live somewhere else.

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct RpcResponse<T> {
pub jsonrpc: String,
pub id: u64,
pub result: T,
}

#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeHistory {
/// An array of block base fees per gas.
/// This includes the next block after the newest of the returned range,
/// because this value can be derived from the newest block. Zeroes are
/// returned for pre-EIP-1559 blocks.
///
/// # Note
///
/// Empty list is skipped only for compatibility with Erigon and Geth.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub base_fee_per_gas: Vec<String>,
/// An array of block gas used ratios. These are calculated as the ratio
/// of `gasUsed` and `gasLimit`.
///
/// # Note
///
/// The `Option` is only for compatibility with Erigon and Geth.
pub gas_used_ratio: Vec<f64>,
/// An array of block base fees per blob gas. This includes the next block after the newest
/// of the returned range, because this value can be derived from the newest block. Zeroes
/// are returned for pre-EIP-4844 blocks.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub base_fee_per_blob_gas: Vec<String>,
/// An array of block blob gas used ratios. These are calculated as the ratio of gasUsed and
/// gasLimit.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub blob_gas_used_ratio: Vec<f64>,
/// Lowest number block of the returned range.
pub oldest_block: String,
/// An (optional) array of effective priority fee per gas data points from a single
/// block. All zeroes are returned if the block is empty.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reward: Option<Vec<Vec<u128>>>,
}

pub struct SnosJob;

#[async_trait]
Expand All @@ -56,10 +104,12 @@ impl Job for SnosJob {
let block_number = self.get_block_number_from_metadata(job)?;

// 1. Fetch SNOS input data from Madara
let snos_input: StarknetOsInput = self.get_snos_input_from_madara(config, &block_number)?;
let snos_input: StarknetOsInput = self.get_snos_input_from_madara(config, &block_number).await?;

// 2. Build the required inputs for snos::run_os
// TODO: import BlockifierStateAdapter from Madara RPC and use it here
// TODO: import BlockifierStateAdapter from Madara RPC and use it here.
// Currently not possible because of dependencies versions conflicts between
// SNOS, cairo-vm and madara.
let mut state = DummyState {};

let block_number_and_hash = BlockNumberHashPair {
Expand All @@ -75,21 +125,15 @@ impl Job for SnosJob {
// TODO: Assert that we really want current_timestamp?
block_timestamp: BlockTimestamp(get_current_timestamp_in_secs()),
sequencer_address: snos_input.general_config.sequencer_address,
// TODO: retrieve prices from Madara?
gas_prices: GasPrices {
eth_l1_gas_price: NonZeroU128::new(0).unwrap(),
eth_l1_data_gas_price: NonZeroU128::new(0).unwrap(),
strk_l1_gas_price: NonZeroU128::new(0).unwrap(),
strk_l1_data_gas_price: NonZeroU128::new(0).unwrap(),
},
gas_prices: self.get_gas_prices_from_l1(config).await?,
use_kzg_da: snos_input.general_config.use_kzg_da,
};

let chain_info = ChainInfo {
chain_id: snos_input.general_config.starknet_os_config.chain_id.clone(),
fee_token_addresses: FeeTokenAddresses {
eth_fee_token_address: snos_input.general_config.starknet_os_config.fee_token_address,
// TODO: assert that the STRK fee token address is deprecated_fee_token_address
// TODO: assert that the STRK fee token address is [deprecated_fee_token_address]
strk_fee_token_address: snos_input.general_config.starknet_os_config.deprecated_fee_token_address,
},
};
Expand Down Expand Up @@ -165,15 +209,75 @@ impl SnosJob {
}

/// Retrieves the [StarknetOsInput] for the provided block number from Madara.
fn get_snos_input_from_madara(&self, _config: &Config, block_number: &BlockNumber) -> Result<StarknetOsInput> {
let _rpc_request = json!(
async fn get_snos_input_from_madara(&self, config: &Config, block_number: &BlockNumber) -> Result<StarknetOsInput> {
let rpc_request = json!(
{
"id": 1,
"jsonrpc": "2.0",
"method": "madara_getSnosInput",
"params": [{"block_number": block_number}]
}
);
unimplemented!("Handler for madara_getSnosInput has not been implemented")
let client = config.http_client();
let response: RpcResponse<StarknetOsInput> =
client.post(get_env_var_or_panic("MADARA_RPC_URL")).json(&rpc_request).send().await?.json().await?;
Ok(response.result)
}

/// Retrieves the ETH & STRK gas prices and returns them in a [GasPrices].
/// TODO: We only retrieve the ETH gas price for now. For STRK, we need to implement
/// a logic to fetch the price of ETH <=> STRK from an Oracle.
async fn get_gas_prices_from_l1(&self, config: &Config) -> Result<GasPrices> {
let rpc_request = json!(
{
"id": 83,
"jsonrpc": "2.0",
"method": "eth_feeHistory",
"params": [300, "latest", []],
}
);

let client = config.http_client();
let fee_history: RpcResponse<FeeHistory> =
client.post(get_env_var_or_panic("ETHEREUM_RPC_URL")).json(&rpc_request).send().await?.json().await?;

let (eth_l1_gas_price, eth_l1_data_gas_price) = self.compute_eth_gas_prices_from_history(fee_history.result)?;

let gas_prices = GasPrices {
eth_l1_gas_price,
eth_l1_data_gas_price,
// TODO: Logic for fetching from an Oracle
strk_l1_gas_price: NonZeroU128::new(0).unwrap(),
strk_l1_data_gas_price: NonZeroU128::new(0).unwrap(),
};
Ok(gas_prices)
}

/// Compute the l1_gas_price and l1_data_gas_price from the [FeeHistory].
/// Source: https://github.com/keep-starknet-strange/madara/blob/7b405924b5859fdfa24ec33f152e56a97a047e31/crates/client/l1-gas-price/src/worker.rs#L31
fn compute_eth_gas_prices_from_history(&self, fee_history: FeeHistory) -> Result<(NonZeroU128, NonZeroU128)> {
// The RPC responds with 301 elements for some reason - probably because of the parameter "300" in the
// request.
// It's also just safer to manually take the last 300. We choose 300 to get average gas caprice for
// last one hour (300 * 12 sec block time).
let (_, blob_fee_history_one_hour) =
fee_history.base_fee_per_blob_gas.split_at(fee_history.base_fee_per_blob_gas.len().max(300) - 300);

let avg_blob_base_fee = blob_fee_history_one_hour
.iter()
.map(|hex_str| u128::from_str_radix(&hex_str[2..], 16).unwrap())
.sum::<u128>()
/ blob_fee_history_one_hour.len() as u128;

let eth_gas_price = u128::from_str_radix(
fee_history
.base_fee_per_gas
.last()
.ok_or(eyre!("Failed to get last element of `base_fee_per_gas`"))?
.trim_start_matches("0x"),
16,
)?;
// TODO: unsafe unwraps - handle errors
Ok((NonZeroU128::new(eth_gas_price).unwrap(), NonZeroU128::new(avg_blob_base_fee).unwrap()))
}
}
3 changes: 3 additions & 0 deletions crates/orchestrator/src/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub async fn init_config(
da_client: Option<MockDaClient>,
prover_client: Option<MockProverClient>,
settlement_client: Option<MockSettlementClient>,
http_client: Option<reqwest::Client>,
) -> Config {
let _ = tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).with_target(false).try_init();

Expand All @@ -36,6 +37,7 @@ pub async fn init_config(
let da_client = da_client.unwrap_or_default();
let prover_client = prover_client.unwrap_or_default();
let settlement_client = settlement_client.unwrap_or_default();
let http_client = http_client.unwrap_or_default();

// init starknet client
let provider = JsonRpcClient::new(HttpTransport::new(Url::parse(rpc_url.as_str()).expect("Failed to parse URL")));
Expand All @@ -45,6 +47,7 @@ pub async fn init_config(
Box::new(da_client),
Box::new(prover_client),
Box::new(settlement_client),
Box::new(http_client),
Box::new(database),
Box::new(queue),
)
Expand Down
7 changes: 4 additions & 3 deletions crates/orchestrator/src/tests/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::jobs::Job;
#[rstest]
#[tokio::test]
async fn test_create_job() {
let config = init_config(None, None, None, None, None, None).await;
let config = init_config(None, None, None, None, None, None, None).await;
let job = DaJob.create_job(&config, String::from("0"), HashMap::new()).await;
assert!(job.is_ok());

Expand All @@ -36,7 +36,7 @@ async fn test_verify_job(#[from(default_job_item)] mut job_item: JobItem) {
let mut da_client = MockDaClient::new();
da_client.expect_verify_inclusion().times(1).returning(|_| Ok(DaVerificationStatus::Verified));

let config = init_config(None, None, None, Some(da_client), None, None).await;
let config = init_config(None, None, None, Some(da_client), None, None, None).await;
assert!(DaJob.verify_job(&config, &mut job_item).await.is_ok());
}

Expand All @@ -53,7 +53,8 @@ async fn test_process_job() {
da_client.expect_max_blob_per_txn().times(1).returning(move || ETHEREUM_MAX_BLOB_PER_TXN);

let config =
init_config(Some(format!("http://localhost:{}", server.port())), None, None, Some(da_client), None, None).await;
init_config(Some(format!("http://localhost:{}", server.port())), None, None, Some(da_client), None, None, None)
.await;
let state_update = MaybePendingStateUpdate::Update(StateUpdate {
block_hash: FieldElement::default(),
new_root: FieldElement::default(),
Expand Down
Loading

0 comments on commit 9ee833e

Please sign in to comment.