From d7fb4bca79c6363efe0aee71304ab96498696e9d Mon Sep 17 00:00:00 2001 From: cezary-stroczynski Date: Fri, 4 Oct 2024 15:08:15 +0200 Subject: [PATCH 1/2] Basic DDEX msg digestion flow --- contracts/contracts/DdexSequencer.sol | 13 ++- contracts/scripts/execute/deployLocal.ts | 47 +++++++++++ contracts/scripts/fixture/fixture.deploy.ts | 18 ++++ contracts/test/DdexSequencer.test.ts | 17 ++-- ow_data_provider_cli/src/constants.rs | 2 +- ow_data_provider_cli/src/ddex_sequencer.rs | 4 + ow_data_provider_cli/src/lib.rs | 1 + ow_validator_node/.env.template | 1 + ow_validator_node/Cargo.lock | 39 ++++++++- ow_validator_node/Cargo.toml | 2 + ow_validator_node/src/beacon_chain.rs | 33 +++++++- ow_validator_node/src/circuit_mock.rs | 67 +++++++++++++++ ow_validator_node/src/constants.rs | 5 +- ow_validator_node/src/ddex_sequencer.rs | 77 +++++++++++++++-- ow_validator_node/src/errors.rs | 12 +++ ow_validator_node/src/lib.rs | 92 +++++++++++++++++++-- 16 files changed, 397 insertions(+), 33 deletions(-) create mode 100644 contracts/scripts/execute/deployLocal.ts create mode 100644 ow_validator_node/src/circuit_mock.rs diff --git a/contracts/contracts/DdexSequencer.sol b/contracts/contracts/DdexSequencer.sol index a85f69a8..f0cf54f9 100644 --- a/contracts/contracts/DdexSequencer.sol +++ b/contracts/contracts/DdexSequencer.sol @@ -6,7 +6,7 @@ pragma solidity ^0.8.24; contract DdexSequencer is WhitelistConsumer { event NewBlobSubmitted(bytes commitment); - + event MessageDigested(DdexMessageData data); struct Blob { bytes32 nextBlob; @@ -14,6 +14,11 @@ contract DdexSequencer is WhitelistConsumer { address proposer; } + struct DdexMessageData { + string isrc; + string releaseId; + } + bytes1 public constant DATA_PROVIDERS_WHITELIST = 0x01; bytes1 public constant VALIDATORS_WHITELIST = 0x02; @@ -59,7 +64,8 @@ contract DdexSequencer is WhitelistConsumer { } function submitProofOfProcessing( - bool proof + bool proof, + DdexMessageData[] memory messagesData ) external isWhitelistedOn(VALIDATORS_WHITELIST) { require(blobQueueHead != bytes32(0), "Queue is empty"); bool isValid = proof; // TODO: implement actual logic of checking the proof for the blobQueueHead @@ -67,6 +73,9 @@ contract DdexSequencer is WhitelistConsumer { require(isValid, "Invalid proof"); _moveQueue(); + for (uint i = 0; i < messagesData.length; i++) { + emit MessageDigested(messagesData[i]); + } } function submitProofForFraudulentBlob( diff --git a/contracts/scripts/execute/deployLocal.ts b/contracts/scripts/execute/deployLocal.ts new file mode 100644 index 00000000..af337871 --- /dev/null +++ b/contracts/scripts/execute/deployLocal.ts @@ -0,0 +1,47 @@ +import { deployDdexSequencer } from "../actions/contract-deployment/DdexSequencer/DdexSequencer.deploy"; +import { deployOwnToken } from "../actions/contract-deployment/OwnToken/OwnToken.deploy"; +import { deployStakeVault } from "../actions/contract-deployment/StakeVault/StakeVault.deploy"; +import { deployWhitelist } from "../actions/contract-deployment/Whitelist/Whitelist.deploy"; +import { ethers } from "hardhat"; +import { getKurtosisEthersWallets } from "../fixture/fixture.deploy"; + +const SLASH_RATE = 1000; + +async function main() { + const [signer, validator, validator2, dataProvider, dataProvider2] = + getKurtosisEthersWallets(); + + const dataProvidersWhitelist = await deployWhitelist(signer, [ + dataProvider.address, + dataProvider2.address, + ]); + const validatorsWhitelist = await deployWhitelist(signer, [ + validator.address, + validator2.address, + ]); + const ownToken = await deployOwnToken(); + const stakeVault = await deployStakeVault({ + stakeTokenAddress: await ownToken.getAddress(), + _slashRate: SLASH_RATE, + }); + const ddexSequencer = await deployDdexSequencer({ + dataProvidersWhitelist: await dataProvidersWhitelist.getAddress(), + validatorsWhitelist: await validatorsWhitelist.getAddress(), + stakeVaultAddress: await stakeVault.getAddress(), + }); + + console.log({ + token: await ownToken.getAddress(), + deployer: await signer.getAddress(), + validator: validator.address, + validator2: validator2.address, + dataProvider: dataProvider.address, + dataProvider2: dataProvider2.address, + ddexSequencer: await ddexSequencer.getAddress(), + ownToken: await ownToken.getAddress(), + dataProvidersWhitelist: await dataProvidersWhitelist.getAddress(), + validatorsWhitelist: await validatorsWhitelist.getAddress(), + }); +} + +main(); diff --git a/contracts/scripts/fixture/fixture.deploy.ts b/contracts/scripts/fixture/fixture.deploy.ts index c873c8d1..8a2c87ff 100644 --- a/contracts/scripts/fixture/fixture.deploy.ts +++ b/contracts/scripts/fixture/fixture.deploy.ts @@ -59,3 +59,21 @@ export async function getEthersWalletWithFunds( await tx.wait(); return wallet; } + +// it's necessary to use ethers.Wallet instead of hardhatEthers.Wallet +// as only the first one currently supports type 3 EIP4844 transaction +// This function works only with Kurtosis testnet setup!!!! +export function getKurtosisEthersWallets(): HDNodeWallet[] { + const phrase = `${process.env.KURTOSIS_MNEMONIC}`; + const mnemonic = ethers.Mnemonic.fromPhrase(phrase); + const wallets: HDNodeWallet[] = []; + for (let i = 0; i < 20; i++) { + wallets.push( + ethers.HDNodeWallet.fromMnemonic(mnemonic, `m/44'/60'/0'/0/${i}`).connect( + hardhatEthers.provider + ) + ); + } + + return wallets; +} diff --git a/contracts/test/DdexSequencer.test.ts b/contracts/test/DdexSequencer.test.ts index 19f98c39..0b4048e9 100644 --- a/contracts/test/DdexSequencer.test.ts +++ b/contracts/test/DdexSequencer.test.ts @@ -137,7 +137,7 @@ describe("DdexSequencer", () => { expect(await ddexSequencer.blobQueueTail()).equal(blobhash); const blobDetailsBefore = await ddexSequencer.blobs(blobhash); - await ddexSequencer.connect(validator).submitProofOfProcessing(true); + await ddexSequencer.connect(validator).submitProofOfProcessing(true, []); const blobDetailsAfter = await ddexSequencer.blobs(blobhash); expect(await ddexSequencer.blobQueueHead()).equal(ZERO_BYTES32); @@ -176,7 +176,7 @@ describe("DdexSequencer", () => { expect(await ddexSequencer.blobQueueTail()).equal(blobhash2); const blob1DetailsBefore = await ddexSequencer.blobs(blobhash1); - await ddexSequencer.connect(validator).submitProofOfProcessing(true); + await ddexSequencer.connect(validator).submitProofOfProcessing(true, []); const blob1DetailsAfter = await ddexSequencer.blobs(blobhash1); expect(blob1DetailsBefore.nextBlob).equal(blobhash2); @@ -191,7 +191,7 @@ describe("DdexSequencer", () => { expect(await ddexSequencer.blobQueueHead()).equal(blobhash2); expect(await ddexSequencer.blobQueueTail()).equal(blobhash2); const blob2DetailsBefore = await ddexSequencer.blobs(blobhash2); - await ddexSequencer.connect(validator).submitProofOfProcessing(true); + await ddexSequencer.connect(validator).submitProofOfProcessing(true, []); const blob2DetailsAfter = await ddexSequencer.blobs(blobhash2); expect(blob2DetailsBefore.nextBlob).equal(ZERO_BYTES32); @@ -237,7 +237,7 @@ describe("DdexSequencer", () => { expect(await ddexSequencer.blobQueueTail()).equal(blobhash3); const blob1DetailsBefore = await ddexSequencer.blobs(blobhash1); - await ddexSequencer.connect(validator).submitProofOfProcessing(true); + await ddexSequencer.connect(validator).submitProofOfProcessing(true, []); const blob1DetailsAfter = await ddexSequencer.blobs(blobhash1); expect(blob1DetailsBefore.nextBlob).equal(blobhash2); @@ -252,7 +252,7 @@ describe("DdexSequencer", () => { expect(await ddexSequencer.blobQueueHead()).equal(blobhash2); expect(await ddexSequencer.blobQueueTail()).equal(blobhash3); const blob2DetailsBefore = await ddexSequencer.blobs(blobhash2); - await ddexSequencer.connect(validator).submitProofOfProcessing(true); + await ddexSequencer.connect(validator).submitProofOfProcessing(true, []); const blob2DetailsAfter = await ddexSequencer.blobs(blobhash2); expect(blob2DetailsBefore.nextBlob).equal(blobhash3); @@ -267,7 +267,7 @@ describe("DdexSequencer", () => { expect(await ddexSequencer.blobQueueHead()).equal(blobhash3); expect(await ddexSequencer.blobQueueTail()).equal(blobhash3); const blob3DetailsBefore = await ddexSequencer.blobs(blobhash3); - await ddexSequencer.connect(validator).submitProofOfProcessing(true); + await ddexSequencer.connect(validator).submitProofOfProcessing(true, []); const blob3DetailsAfter = await ddexSequencer.blobs(blobhash3); expect(blob3DetailsBefore.nextBlob).equal(ZERO_BYTES32); @@ -289,7 +289,8 @@ describe("DdexSequencer", () => { validators: [validator], } = fixture; expect(await ddexSequencer.blobQueueHead()).equal(ZERO_BYTES32); - await expect(ddexSequencer.connect(validator).submitProofOfProcessing(true)) - .to.rejected; + await expect( + ddexSequencer.connect(validator).submitProofOfProcessing(true, []) + ).to.rejected; }); }); diff --git a/ow_data_provider_cli/src/constants.rs b/ow_data_provider_cli/src/constants.rs index 6e59c372..47d57352 100644 --- a/ow_data_provider_cli/src/constants.rs +++ b/ow_data_provider_cli/src/constants.rs @@ -1,3 +1,3 @@ use alloy::primitives::{address, Address}; -pub const DDEX_SEQUENCER_ADDRESS: Address = address!("00c042C4D5D913277CE16611a2ce6e9003554aD5"); +pub const DDEX_SEQUENCER_ADDRESS: Address = address!("B965D10739e19a9158e7f713720B0145D996E370"); diff --git a/ow_data_provider_cli/src/ddex_sequencer.rs b/ow_data_provider_cli/src/ddex_sequencer.rs index abdaab2f..0e1880f2 100644 --- a/ow_data_provider_cli/src/ddex_sequencer.rs +++ b/ow_data_provider_cli/src/ddex_sequencer.rs @@ -74,6 +74,10 @@ impl DdexSequencerContext<'_> { .contract .submitNewBlob(Bytes::from(transaction_data.kzg_commitment.to_vec())) .sidecar(transaction_data.blob_sidecar) + // TODO make gas settings optional CLI/setting file parameters + .max_fee_per_blob_gas(10000000) + .max_fee_per_gas(100000000) + .max_priority_fee_per_gas(100000000) .send() .await .unwrap() diff --git a/ow_data_provider_cli/src/lib.rs b/ow_data_provider_cli/src/lib.rs index 480ad901..ad29f7a6 100644 --- a/ow_data_provider_cli/src/lib.rs +++ b/ow_data_provider_cli/src/lib.rs @@ -67,6 +67,7 @@ pub async fn run(config: Config) -> Result<(), Box> { let ddex_sequencer_context = DdexSequencerContext::build(&provider).await?; let blob_transaction_data = BlobTransactionData::build(&config).unwrap(); + println!("sending tx..."); ddex_sequencer_context .send_blob(blob_transaction_data) .await?; diff --git a/ow_validator_node/.env.template b/ow_validator_node/.env.template index 725c11ed..dd92c121 100644 --- a/ow_validator_node/.env.template +++ b/ow_validator_node/.env.template @@ -1,3 +1,4 @@ +PRIVATE_KEY=0000000000000000000000000000000000000000000000000000000000000000 RPC_URL=https://placeholder WS_URL=wss://placeholder BEACON_RPC_URL=placeholder diff --git a/ow_validator_node/Cargo.lock b/ow_validator_node/Cargo.lock index 15a453cc..44406d22 100644 --- a/ow_validator_node/Cargo.lock +++ b/ow_validator_node/Cargo.lock @@ -17,6 +17,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "ahash" version = "0.8.11" @@ -808,7 +814,7 @@ dependencies = [ "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", ] @@ -1789,6 +1795,15 @@ dependencies = [ "adler", ] +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + [[package]] name = "mio" version = "1.0.2" @@ -1936,6 +1951,17 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ow_blob_codec" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf657a0ea98eb5b5b5361de98c5367975b96a207c20c27adda44e99c995d8f42" +dependencies = [ + "c-kzg", + "miniz_oxide 0.8.0", + "quick-xml", +] + [[package]] name = "ow_validator_node" version = "0.1.0" @@ -1943,6 +1969,8 @@ dependencies = [ "alloy", "dotenvy", "futures-util", + "ow_blob_codec", + "quick-xml", "reqwest", "serde", "sha2", @@ -2167,6 +2195,15 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-xml" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" +dependencies = [ + "memchr", +] + [[package]] name = "quote" version = "1.0.37" diff --git a/ow_validator_node/Cargo.toml b/ow_validator_node/Cargo.toml index fd7a3683..743d6177 100644 --- a/ow_validator_node/Cargo.toml +++ b/ow_validator_node/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" alloy = { version = "0.3.0", features = ["full"] } dotenvy = "0.15.7" futures-util = "0.3.30" +ow_blob_codec = "0.1.1" +quick-xml = "0.36.2" reqwest = "0.12.7" serde = { version = "1.0.210", features = ["derive"] } sha2 = "0.10.8" diff --git a/ow_validator_node/src/beacon_chain.rs b/ow_validator_node/src/beacon_chain.rs index af409935..ffd4f640 100644 --- a/ow_validator_node/src/beacon_chain.rs +++ b/ow_validator_node/src/beacon_chain.rs @@ -1,5 +1,8 @@ use crate::{constants, errors::OwValidatorNodeError}; -use alloy::primitives::{Bytes, FixedBytes}; +use alloy::{ + eips::eip4844::BYTES_PER_BLOB, + primitives::{Bytes, FixedBytes}, +}; use reqwest; use serde::Deserialize; use std::error::Error; @@ -81,11 +84,32 @@ async fn find_commitment_in_sidecars( } } +fn blob_vec_from_string(prefixed_blob: String) -> Result<[u8; BYTES_PER_BLOB], Box> { + if prefixed_blob.len() != BYTES_PER_BLOB * 2 + 2 { + return Err(Box::new(OwValidatorNodeError::InvalidBlobLength( + prefixed_blob.len(), + ))); + } + let mut byte_array = [0u8; BYTES_PER_BLOB]; + + let blob = &prefixed_blob[2..]; + + for (i, byte) in byte_array.iter_mut().enumerate() { + let hex_byte = &blob[i * 2..i * 2 + 2]; + *byte = u8::from_str_radix(hex_byte, 16).map_err(|_| { + Box::new(OwValidatorNodeError::InvalidHexStringValue( + hex_byte.to_string(), + )) + })?; + } + Ok(byte_array) +} + pub async fn find_blob( beacon_rpc_url: &String, commitment: Bytes, parent_beacon_block_root: FixedBytes<32>, -) -> Result> { +) -> Result<[u8; BYTES_PER_BLOB], Box> { let mut slot = get_parent_beacon_block_slot(beacon_rpc_url, parent_beacon_block_root).await?; let mut blob_sidecar_data: Option = find_commitment_in_sidecars(beacon_rpc_url, slot, &commitment).await; @@ -102,5 +126,8 @@ pub async fn find_blob( )); })? .blob; - Ok(blob) + + let blob_array = blob_vec_from_string(blob)?; + + Ok(blob_array) } diff --git a/ow_validator_node/src/circuit_mock.rs b/ow_validator_node/src/circuit_mock.rs new file mode 100644 index 00000000..1c2cb887 --- /dev/null +++ b/ow_validator_node/src/circuit_mock.rs @@ -0,0 +1,67 @@ +use quick_xml::{ + events::{BytesEnd, BytesStart, Event}, + Reader, +}; + +use crate::{ddex_sequencer::DdexSequencer::DdexMessageData, errors::OwValidatorNodeError}; +use std::{error::Error, io::Cursor}; + +pub fn extract_message_data( + decoded_blob: &Vec>, +) -> Result, Box> { + let mut result = Vec::::new(); + + let mut reader: Reader>>; + let mut buffer = Vec::new(); + + let mut inside_isrc_tag = false; + let mut inside_release_id_tag = false; + + let mut isrc_tag_value: String = String::new(); + let mut release_id_tag_value: String = String::new(); + + for ddex_message in decoded_blob { + reader = Reader::from_reader(Cursor::new(&ddex_message)); + + loop { + match reader.read_event_into(&mut buffer) { + Ok(Event::Eof) => break, + Ok(Event::Start(e)) if e == BytesStart::new("ISRC") => { + inside_isrc_tag = true; + } + Ok(Event::Start(e)) if e == BytesStart::new("GRid") => { + inside_release_id_tag = true; + } + Ok(Event::Text(e)) if inside_isrc_tag => { + isrc_tag_value = String::from_utf8(e.to_vec())?; + } + Ok(Event::Text(e)) if inside_release_id_tag => { + release_id_tag_value = String::from_utf8(e.to_vec())?; + } + Ok(Event::End(e)) if e == BytesEnd::new("ISRC") => { + inside_isrc_tag = false; + } + Ok(Event::End(e)) if e == BytesEnd::new("GRid") => { + inside_release_id_tag = false; + } + Ok(_) => (), + Err(_) => { + return Err(Box::new(OwValidatorNodeError::InvalidHexStringValue( + "duer".to_string(), + ))) + } + } + buffer.clear(); + } + + result.push(DdexMessageData { + isrc: isrc_tag_value.clone(), + releaseId: release_id_tag_value.clone(), + }); + + isrc_tag_value = String::new(); + release_id_tag_value = String::new(); + } + + Ok(result) +} diff --git a/ow_validator_node/src/constants.rs b/ow_validator_node/src/constants.rs index 6199fb9d..ab8a0e5e 100644 --- a/ow_validator_node/src/constants.rs +++ b/ow_validator_node/src/constants.rs @@ -1,5 +1,6 @@ -use alloy::primitives::{address, Address}; +use alloy::primitives::{address, Address, FixedBytes}; -pub const DDEX_SEQUENCER_ADDRESS: Address = address!("00c042C4D5D913277CE16611a2ce6e9003554aD5"); +pub const DDEX_SEQUENCER_ADDRESS: Address = address!("B965D10739e19a9158e7f713720B0145D996E370"); pub const GET_BEACON_BLOCK_API_PATH: &str = "/eth/v2/beacon/blocks/"; pub const GET_SIDECARS_API_PATH: &str = "/eth/v1/beacon/blob_sidecars/"; +pub const EMPTY_QUEUE_HEAD: FixedBytes<32> = FixedBytes::repeat_byte(0); diff --git a/ow_validator_node/src/ddex_sequencer.rs b/ow_validator_node/src/ddex_sequencer.rs index 048bd9b2..ebbe842a 100644 --- a/ow_validator_node/src/ddex_sequencer.rs +++ b/ow_validator_node/src/ddex_sequencer.rs @@ -1,5 +1,10 @@ use crate::{constants, errors::OwValidatorNodeError, Config}; +use alloy::network::{Ethereum, EthereumWallet}; use alloy::primitives::{Bytes, FixedBytes}; +use alloy::providers::fillers::{ + ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, WalletFiller, +}; +use alloy::providers::{ProviderBuilder, WsConnect}; use alloy::{ eips::BlockNumberOrTag, providers::{Provider, RootProvider}, @@ -7,6 +12,7 @@ use alloy::{ sol, sol_types::SolEvent, }; +use futures_util::StreamExt; use sha2::{Digest, Sha256}; use std::error::Error; @@ -20,7 +26,18 @@ sol!( pub struct DdexSequencerContext<'a> { pub contract: DdexSequencer::DdexSequencerInstance< alloy::transports::http::Http, - &'a RootProvider>, + &'a FillProvider< + JoinFill< + JoinFill< + JoinFill, NonceFiller>, + ChainIdFiller, + >, + WalletFiller, + >, + RootProvider>, + alloy::transports::http::Http, + Ethereum, + >, >, } @@ -31,7 +48,18 @@ pub struct QueueHeadData { impl DdexSequencerContext<'_> { pub async fn build( - provider: &RootProvider>, + provider: &FillProvider< + JoinFill< + JoinFill< + JoinFill, NonceFiller>, + ChainIdFiller, + >, + WalletFiller, + >, + RootProvider>, + alloy::transports::http::Http, + Ethereum, + >, ) -> Result> { let contract = DdexSequencer::new(constants::DDEX_SEQUENCER_ADDRESS, provider); let result = DdexSequencerContext { contract }; @@ -67,18 +95,52 @@ impl DdexSequencerContext<'_> { Ok(parent_beacon_block_root) } - pub async fn get_queue_head_data( + pub async fn subscribe_to_queue( &self, config: &Config, ) -> Result> { - let blobhead = self.contract.blobQueueHead().call().await?._0; + println!("Subscribed to queue, waiting for new blobs..."); + let ws_url = WsConnect::new(&config.ws_url); + let ws_provider = ProviderBuilder::new().on_ws(ws_url).await?; + + let filter = Filter::new() + .address(constants::DDEX_SEQUENCER_ADDRESS) + .event(DdexSequencer::NewBlobSubmitted::SIGNATURE); + + let subscription = ws_provider.subscribe_logs(&filter).await?; + let mut stream = subscription.into_stream(); + + let mut queue_head_commitment = Bytes::new(); + let mut parent_beacon_block_root = FixedBytes::<32>::new([0u8; 32]); + + while let Some(log) = stream.next().await { + println!("New blob detected!"); + let DdexSequencer::NewBlobSubmitted { commitment } = log.log_decode()?.inner.data; + let block_number = log.block_number.ok_or_else(|| { + return Box::new(OwValidatorNodeError::BlockNotFoundInLog()); + })?; + parent_beacon_block_root = self.get_parent_beacon_block_root(block_number).await?; + queue_head_commitment = commitment; + *config.start_block.borrow_mut() = block_number; + break; + } + Ok(QueueHeadData { + parent_beacon_block_root, + commitment: queue_head_commitment, + }) + } + pub async fn get_queue_head_data( + &self, + config: &Config, + queue_head: FixedBytes<32>, + ) -> Result> { let filter = Filter::new() .address(constants::DDEX_SEQUENCER_ADDRESS) .event(DdexSequencer::NewBlobSubmitted::SIGNATURE) - .from_block(config.start_block); + .from_block(*config.start_block.borrow()); - let logs = self.contract.provider().get_logs(&filter).await?; + let logs = config.provider.get_logs(&filter).await?; let mut queue_head_commitment = Bytes::new(); let mut parent_beacon_block_root = FixedBytes::<32>::new([0u8; 32]); @@ -89,13 +151,14 @@ impl DdexSequencerContext<'_> { let DdexSequencer::NewBlobSubmitted { commitment } = log.log_decode()?.inner.data; let current_blobhash = Self::commitment_to_blobhash(&commitment)?; - if blobhead == current_blobhash { + if queue_head == current_blobhash { let block_number = log.block_number.ok_or_else(|| { return Box::new(OwValidatorNodeError::BlockNotFoundInLog()); })?; parent_beacon_block_root = self.get_parent_beacon_block_root(block_number).await?; queue_head_commitment = commitment; + *config.start_block.borrow_mut() = block_number; break; } } diff --git a/ow_validator_node/src/errors.rs b/ow_validator_node/src/errors.rs index 70c537c4..0da37eb0 100644 --- a/ow_validator_node/src/errors.rs +++ b/ow_validator_node/src/errors.rs @@ -7,6 +7,8 @@ pub enum OwValidatorNodeError { BlockNotFoundInLog(), QueueHeadNotFound(), FailedToFindBlobSidecar(String), + InvalidBlobLength(usize), + InvalidHexStringValue(String), } impl fmt::Display for OwValidatorNodeError { @@ -32,6 +34,16 @@ impl fmt::Display for OwValidatorNodeError { commitment ) } + Self::InvalidBlobLength(blob_len) => { + write!( + f, + "Invalid blob length: expected 262146 ( = 131072 * 2 + 2), got: {}", + blob_len + ) + } + Self::InvalidHexStringValue(hex_byte) => { + write!(f, "Invalid hex value in blob String: {}", hex_byte) + } } } } diff --git a/ow_validator_node/src/lib.rs b/ow_validator_node/src/lib.rs index 948227d9..a9fef9dd 100644 --- a/ow_validator_node/src/lib.rs +++ b/ow_validator_node/src/lib.rs @@ -1,16 +1,41 @@ mod beacon_chain; +mod circuit_mock; mod constants; + mod ddex_sequencer; mod errors; -use alloy::providers::ProviderBuilder; +use alloy::network::{Ethereum, EthereumWallet}; +use alloy::primitives::{Bytes, FixedBytes}; +use alloy::providers::fillers::{ + ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, WalletFiller, +}; +use alloy::providers::{ProviderBuilder, RootProvider}; +use alloy::signers::local::PrivateKeySigner; +use constants::EMPTY_QUEUE_HEAD; +use ddex_sequencer::{DdexSequencerContext, QueueHeadData}; +use std::cell::RefCell; use std::env; use std::error::Error; + pub struct Config { pub rpc_url: String, pub beacon_rpc_url: String, pub ws_url: String, - pub start_block: u64, + pub start_block: RefCell, + pub private_key: String, + pub provider: FillProvider< + JoinFill< + JoinFill< + JoinFill, NonceFiller>, + ChainIdFiller, + >, + WalletFiller, + >, + RootProvider>, + alloy::transports::http::Http, + Ethereum, + >, } impl Config { @@ -21,27 +46,55 @@ impl Config { } pub fn build() -> Result> { + let private_key = Config::get_env_var("PRIVATE_KEY")?; let rpc_url = Config::get_env_var("RPC_URL")?; let beacon_rpc_url = Config::get_env_var("BEACON_RPC_URL")?; let ws_url = Config::get_env_var("WS_URL")?; - let start_block = Config::get_env_var("START_BLOCK")?.parse::()?; + let start_block = RefCell::new(Config::get_env_var("START_BLOCK")?.parse::()?); + + let private_key_signer: PrivateKeySigner = + private_key.parse().expect("Failed to parse PRIVATE_KEY:"); + let wallet = EthereumWallet::from(private_key_signer); + + let provider = ProviderBuilder::new() + .with_recommended_fillers() + .wallet(wallet) + .on_http(rpc_url.parse().expect("RPC_URL parsing error:")); Ok(Config { rpc_url, beacon_rpc_url, ws_url, start_block, + private_key, + provider, }) } } -pub async fn run(config: Config) -> Result<(), Box> { - let provider = - ProviderBuilder::new().on_http(config.rpc_url.parse().expect("RPC_URL parsing error:")); - let ddex_sequencer_context = ddex_sequencer::DdexSequencerContext::build(&provider).await?; +async fn validate_blobs( + config: &Config, + ddex_sequencer_context: &DdexSequencerContext<'_>, +) -> Result<(), Box> { + let queue_head = ddex_sequencer_context + .contract + .blobQueueHead() + .call() + .await? + ._0; - let queue_head_data = ddex_sequencer_context.get_queue_head_data(&config).await?; + let mut queue_head_data: QueueHeadData = QueueHeadData { + commitment: Bytes::new(), + parent_beacon_block_root: FixedBytes::<32>::new([0u8; 32]), + }; + if queue_head == EMPTY_QUEUE_HEAD { + queue_head_data = ddex_sequencer_context.subscribe_to_queue(&config).await?; + } else { + queue_head_data = ddex_sequencer_context + .get_queue_head_data(&config, queue_head) + .await?; + } let blob = beacon_chain::find_blob( &config.beacon_rpc_url, queue_head_data.commitment, @@ -49,6 +102,27 @@ pub async fn run(config: Config) -> Result<(), Box> { ) .await?; - println!("Blob found for the queue head in the DdexSequencer contract: {blob}"); + let decoded = ow_blob_codec::decoder::blob_to_vecs(blob).unwrap(); + + let ddex_messages_data = circuit_mock::extract_message_data(&decoded)?; + println!("sending tx..."); + let receipt = ddex_sequencer_context + .contract + .submitProofOfProcessing(true, ddex_messages_data) + .send() + .await? + .get_receipt() + .await?; + + println!("Receipt tx hash: {}", receipt.transaction_hash); Ok(()) } + +pub async fn run(config: Config) -> Result<(), Box> { + let ddex_sequencer_context = + ddex_sequencer::DdexSequencerContext::build(&config.provider).await?; + + loop { + validate_blobs(&config, &ddex_sequencer_context).await?; + } +} From 9d3e382c6dc1a9baefff0a555e3c6c4cfb51f2e0 Mon Sep 17 00:00:00 2001 From: cezary-stroczynski Date: Fri, 4 Oct 2024 15:15:33 +0200 Subject: [PATCH 2/2] Update README.md file for ow_validator_node --- ow_validator_node/README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/ow_validator_node/README.md b/ow_validator_node/README.md index 8891c00d..7d9152ca 100644 --- a/ow_validator_node/README.md +++ b/ow_validator_node/README.md @@ -2,16 +2,20 @@ This is still very much pre-alfa early stage sketch-draft prototype!!! -This package will simply read the BLOB assigned to the message head in the DdexSequencer queue. +This package currently do: + +1. Read the BLOB associated with queue head of the `DdexSequencer` +2. Decode the BLOB and use `circuit_mock` to extract `isrc` and `GRid` +3. Send `isrc` and `GRid` as events to `DdexSequencer` ### TODO: -- Observe DDEX MESSAGE SEQUENCER contract and listen to the events +- ~~Observe DDEX MESSAGE SEQUENCER contract and listen to the events~~ - check if there is an access on IPFS to the added CID of graphic files inside DDEX message - BLOB processing: - create ZK proof for either successful BLOB processing or for incompatible BLOB - extract key data from DDEX messages packed into the BLOB - pin BLOB to IPFS - pin each DDEX message to IPFS -- prepare and send transaction to DDEX MESSAGE SEQUENCER +- ~~prepare and send transaction to DDEX MESSAGE SEQUENCER~~ - tests!