refactor(l2): restart processes with the proposed API #1234

Open · wants to merge 15 commits into base: process-refactor
Changes from 2 commits
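This PR moves each L2 process toward a common shape: a public `run` loop that never returns, delegating each iteration to a private, fallible `main_logic`, logging any error, and retrying after a short back-off so a failure restarts the process instead of panicking the node. Below is a minimal sketch of that shape as it appears in the l1_watcher.rs and mod.rs diffs; `ExampleProcess` and `ExampleError` are hypothetical stand-ins, not types from this diff.

```rust
// Sketch of the proposed restart API (hypothetical names): an infallible `run`
// loop that logs errors from a fallible `main_logic` and retries after a pause.
use std::time::Duration;

use tokio::time::sleep;
use tracing::error;

// Stand-in for the per-process error types (L1WatcherError, ProposerError, ...).
#[derive(Debug)]
struct ExampleError(String);

impl std::fmt::Display for ExampleError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "example process failed: {}", self.0)
    }
}

impl std::error::Error for ExampleError {}

struct ExampleProcess {
    check_interval: Duration,
}

impl ExampleProcess {
    // Never returns: an error from `main_logic` is logged and the inner loop
    // is restarted after a short back-off instead of crashing the whole node.
    pub async fn run(&mut self) {
        loop {
            if let Err(err) = self.main_logic().await {
                error!("Example process error: {}", err);
            }
            sleep(Duration::from_millis(200)).await;
        }
    }

    // The fallible worker loop: one unit of work per check interval, with
    // errors propagated to `run` via `?` rather than `expect`.
    async fn main_logic(&mut self) -> Result<(), ExampleError> {
        loop {
            sleep(self.check_interval).await;
            // ... fetch logs / commit batches / produce blocks here ...
        }
    }
}

#[tokio::main]
async fn main() {
    let mut process = ExampleProcess {
        check_interval: Duration::from_millis(1_000),
    };
    process.run().await;
}
```

In the diff itself, `L1Watcher::run` follows this shape directly, while `Proposer::run` re-reads the head block hash from the `Store` on every restart before calling its `main_logic`.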
2 changes: 1 addition & 1 deletion crates/l2/proposer/l1_committer.rs
@@ -69,7 +69,7 @@ impl Committer {
}
}

pub async fn main_logic(&self) -> Result<(), CommitterError> {
async fn main_logic(&self) -> Result<(), CommitterError> {
let last_committed_block =
EthClient::get_last_committed_block(&self.eth_client, self.on_chain_proposer_address)
.await?;
71 changes: 43 additions & 28 deletions crates/l2/proposer/l1_watcher.rs
@@ -16,39 +16,13 @@ use keccak_hash::keccak;
use secp256k1::SecretKey;
use std::{cmp::min, ops::Mul, time::Duration};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};

pub async fn start_l1_watcher(store: Store) {
let eth_config = EthConfig::from_env().expect("EthConfig::from_env()");
let watcher_config = L1WatcherConfig::from_env().expect("L1WatcherConfig::from_env()");
let sleep_duration = Duration::from_millis(watcher_config.check_interval_ms);
let mut l1_watcher = L1Watcher::new_from_config(watcher_config, eth_config);
loop {
sleep(sleep_duration).await;

let logs = match l1_watcher.get_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting logs from L1: {}", error);
continue;
}
};
if logs.is_empty() {
continue;
}

let pending_deposits_logs = match l1_watcher.get_pending_deposit_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting L1 pending deposit logs: {}", error);
continue;
}
};
let _deposit_txs = l1_watcher
.process_logs(logs, &pending_deposits_logs, &store)
.await
.expect("l1_watcher.process_logs()");
}
l1_watcher.run(&store).await;
}

pub struct L1Watcher {
@@ -58,6 +32,7 @@ pub struct L1Watcher {
max_block_step: U256,
last_block_fetched: U256,
l2_proposer_pk: SecretKey,
check_interval: Duration,
}

impl L1Watcher {
@@ -69,6 +44,46 @@ impl L1Watcher {
max_block_step: watcher_config.max_block_step,
last_block_fetched: U256::zero(),
l2_proposer_pk: watcher_config.l2_proposer_private_key,
check_interval: Duration::from_millis(watcher_config.check_interval_ms),
}
}

pub async fn run(&mut self, store: &Store) {
loop {
if let Err(err) = self.main_logic(store.clone()).await {
error!("L1 Watcher Error: {}", err);
}

sleep(Duration::from_millis(200)).await;
}
}

async fn main_logic(&mut self, store: Store) -> Result<(), L1WatcherError> {
loop {
sleep(self.check_interval).await;

let logs = match self.get_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting logs from L1: {}", error);
continue;
}
};
if logs.is_empty() {
continue;
}

let pending_deposits_logs = match self.get_pending_deposit_logs().await {
Ok(logs) => logs,
Err(error) => {
warn!("Error when getting L1 pending deposit logs: {}", error);
continue;
}
};
let _deposit_txs = self
.process_logs(logs, &pending_deposits_logs, &store)
.await
.expect("l1_watcher.process_logs()");
}
}

44 changes: 28 additions & 16 deletions crates/l2/proposer/mod.rs
@@ -1,9 +1,12 @@
use std::time::Duration;

use crate::utils::config::{proposer::ProposerConfig, read_env_file};
use errors::ProposerError;
use ethereum_types::{Address, H256};
use ethrex_dev::utils::engine_client::{config::EngineApiConfig, errors::EngineClientError};
use ethrex_storage::Store;
use tracing::{info, warn};
use tokio::time::sleep;
use tracing::{error, info, warn};

pub mod l1_committer;
pub mod l1_watcher;
@@ -33,20 +36,8 @@ pub async fn start_proposer(store: Store) {
let engine_config = EngineApiConfig::from_env().expect("EngineApiConfig::from_env");
let proposer = Proposer::new_from_config(&proposer_config, engine_config)
.expect("Proposer::new_from_config");
let head_block_hash = {
let current_block_number = store
.get_latest_block_number()
.expect("store.get_latest_block_number")
.expect("store.get_latest_block_number returned None");
store
.get_canonical_block_hash(current_block_number)
.expect("store.get_canonical_block_hash")
.expect("store.get_canonical_block_hash returned None")
};
proposer
.start(head_block_hash)
.await
.expect("Proposer::start");

proposer.run(store.clone()).await;
});
tokio::try_join!(l1_watcher, l1_committer, prover_server, proposer).expect("tokio::try_join");
}
@@ -63,7 +54,28 @@ impl Proposer {
})
}

pub async fn start(&self, head_block_hash: H256) -> Result<(), ProposerError> {
pub async fn run(&self, store: Store) {
loop {
let head_block_hash = {
let current_block_number = store
.get_latest_block_number()
.expect("store.get_latest_block_number")
.expect("store.get_latest_block_number returned None");
store
.get_canonical_block_hash(current_block_number)
.expect("store.get_canonical_block_hash")
.expect("store.get_canonical_block_hash returned None")
};

if let Err(err) = self.main_logic(head_block_hash).await {
error!("Block Producer Error: {}", err);
}

sleep(Duration::from_millis(200)).await;
}
}

pub async fn main_logic(&self, head_block_hash: H256) -> Result<(), ProposerError> {
ethrex_dev::block_producer::start_block_producer(
self.engine_config.rpc_url.clone(),
std::fs::read(&self.engine_config.jwt_path).unwrap().into(),