From 8bbfa17e6056670216dca3a29dcfc8906629575d Mon Sep 17 00:00:00 2001 From: Renan Santos Date: Fri, 7 Jun 2024 13:38:44 -0300 Subject: [PATCH] fix!: close epochs based on block numbers - Fixes the ClaimMismatch bug on v.1x. - BREAKING CHANGE: some environment variables have changed, check the CHANGELOG. --- CHANGELOG.md | 8 + build/compose-devnet.yaml | 3 +- build/compose-sepolia.yaml | 1 - cmd/gen-devnet/deployer.go | 21 +- cmd/gen-devnet/types.go | 1 - docs/config.md | 14 +- internal/node/config/config.go | 55 +- internal/node/config/generate/Config.toml | 13 +- internal/node/config/generated.go | 22 +- internal/node/services.go | 7 +- offchain/dispatcher/src/config.rs | 10 +- offchain/dispatcher/src/drivers/context.rs | 751 ++++++++++++------ offchain/dispatcher/src/drivers/machine.rs | 554 ++++++------- offchain/dispatcher/src/drivers/mock.rs | 77 +- offchain/dispatcher/src/machine/mod.rs | 1 - .../dispatcher/src/machine/rollups_broker.rs | 59 +- offchain/dispatcher/src/setup.rs | 16 +- offchain/types/src/blockchain_config.rs | 15 +- setup_env.sh | 3 +- test/config.go | 8 +- test/echo_test.go | 2 - 21 files changed, 916 insertions(+), 725 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 529a2e427..891342646 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- Changed the dispatcher to close epochs based on block numbers instead of block timestamps. +- **BREAKING**: changed the dispatcher to use the `CARTESI_CONTRACTS_INPUT_BOX_DEPLOYMENT_BLOCK_NUMBER` environment variable instead of `CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER`. +- **BREAKING**: replaced the environment variable `CARTESI_EPOCH_DURATION` with `CARTESI_EPOCH_LENGTH` to match the new epoch algorithm, and set its default value to 7200 (1 day worth of blocks, considering one block is mined every 12 seconds). +- **BREAKING**: replaced the internal environment variable `RD_EPOCH_DURATION` with `RD_EPOCH_LENGTH` to match the new epoch algorithm, and also set its default value to 7200. - Bumped Rust Version to 1.78.0 +### Removed + +- Removed `CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER` environment variable. + ## [1.4.0] 2024-04-09 ### Added diff --git a/build/compose-devnet.yaml b/build/compose-devnet.yaml index 24576ac30..1adbf9300 100644 --- a/build/compose-devnet.yaml +++ b/build/compose-devnet.yaml @@ -20,12 +20,11 @@ services: CARTESI_BLOCKCHAIN_IS_LEGACY: "false" CARTESI_BLOCKCHAIN_FINALITY_OFFSET: "1" CARTESI_CONTRACTS_APPLICATION_ADDRESS: "0x7C54E3f7A8070a54223469965A871fB8f6f88c22" - CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER: "20" CARTESI_CONTRACTS_HISTORY_ADDRESS: "0x325272217ae6815b494bF38cED004c5Eb8a7CdA7" CARTESI_CONTRACTS_AUTHORITY_ADDRESS: "0x58c93F83fb3304730C95aad2E360cdb88b782010" CARTESI_CONTRACTS_INPUT_BOX_ADDRESS: "0x59b22D57D4f067708AB0c00552767405926dc768" CARTESI_CONTRACTS_INPUT_BOX_DEPLOYMENT_BLOCK_NUMBER: "20" - CARTESI_EPOCH_DURATION: "120" + CARTESI_EPOCH_LENGTH: "120" CARTESI_FEATURE_DISABLE_MACHINE_HASH_CHECK: "true" CARTESI_AUTH_KIND: "mnemonic" CARTESI_AUTH_MNEMONIC: "test test test test test test test test test test test junk" diff --git a/build/compose-sepolia.yaml b/build/compose-sepolia.yaml index e340b8b94..8229efdcd 100644 --- a/build/compose-sepolia.yaml +++ b/build/compose-sepolia.yaml @@ -15,7 +15,6 @@ services: CARTESI_BLOCKCHAIN_IS_LEGACY: "false" CARTESI_BLOCKCHAIN_FINALITY_OFFSET: "1" CARTESI_CONTRACTS_APPLICATION_ADDRESS: "0x9f12D4365806FC000D6555ACB85c5371b464E506" - CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER: "4152308" CARTESI_CONTRACTS_HISTORY_ADDRESS: "0x76f4dCaC0920826541EE718214EEE4be07346cEE" CARTESI_CONTRACTS_AUTHORITY_ADDRESS: "0x5827Ec9365D3a9b27bF1dB982d258Ad234D37242" CARTESI_CONTRACTS_INPUT_BOX_ADDRESS: "0x59b22D57D4f067708AB0c00552767405926dc768" diff --git a/cmd/gen-devnet/deployer.go b/cmd/gen-devnet/deployer.go index 9fcc4a578..25ef0fc22 100644 --- a/cmd/gen-devnet/deployer.go +++ b/cmd/gen-devnet/deployer.go @@ -5,7 +5,6 @@ package main import ( "context" - "encoding/json" "fmt" "os/exec" "strings" @@ -44,7 +43,7 @@ func createApplication(ctx context.Context, hash string) (DeploymentInfo, error) var depInfo DeploymentInfo // Create the Authority/History pair - contractAddresses, _, err := createContracts(ctx, + contractAddresses, err := createContracts(ctx, common.Address.Hex(addresses.GetTestBook().AuthorityHistoryPairFactory), "newAuthorityHistoryPair(address,bytes32)(address,address)", CONTRACT_OWNER_ADDRESS, @@ -57,7 +56,7 @@ func createApplication(ctx context.Context, hash string) (DeploymentInfo, error) depInfo.HistoryAddress = contractAddresses[1] // Create the Application, passing the address of the newly created Authority - contractAddresses, blockNumber, err := createContracts(ctx, + contractAddresses, err = createContracts(ctx, common.Address.Hex(addresses.GetTestBook().CartesiDAppFactory), "newApplication(address,address,bytes32,bytes32)(address)", depInfo.AuthorityAddress, @@ -69,7 +68,6 @@ func createApplication(ctx context.Context, hash string) (DeploymentInfo, error) } depInfo.ApplicationAddress = contractAddresses[0] - depInfo.BlockNumber = blockNumber return depInfo, nil } @@ -80,7 +78,7 @@ func createApplication(ctx context.Context, hash string) (DeploymentInfo, error) // // Warning: a second call to a contract with the same arguments will fail. func createContracts(ctx context.Context, - args ...string) ([]string, string, error) { + args ...string) ([]string, error) { commonArgs := []string{"--rpc-url", RPC_URL} commonArgs = append(commonArgs, args...) @@ -94,7 +92,7 @@ func createContracts(ctx context.Context, castCall.Stdout = &outStrBuilder err := castCall.Run() if err != nil { - return contractAddresses, "", fmt.Errorf("command failed %v: %v", castCall.Args, err) + return contractAddresses, fmt.Errorf("command failed %v: %v", castCall.Args, err) } contractAddresses = strings.Fields(outStrBuilder.String()) @@ -110,20 +108,13 @@ func createContracts(ctx context.Context, castSend.Stdout = &outStrBuilder err = castSend.Run() if err != nil { - return contractAddresses, "", fmt.Errorf("command failed %v: %v", castSend.Args, err) + return contractAddresses, fmt.Errorf("command failed %v: %v", castSend.Args, err) } if VerboseLog { fmt.Printf("deployer: command: %s\n", castSend.Args) fmt.Printf("deployer: output: %s\n", outStrBuilder.String()) } - // Extract blockNumber from JSON output - jsonMap := make(map[string](any)) - err = json.Unmarshal([]byte([]byte(outStrBuilder.String())), &jsonMap) - if err != nil { - return contractAddresses, "", fmt.Errorf("failed to extract block number, %s", err.Error()) - } - blockNumber := jsonMap["blockNumber"].(string) - return contractAddresses, blockNumber, nil + return contractAddresses, nil } diff --git a/cmd/gen-devnet/types.go b/cmd/gen-devnet/types.go index c55181bb3..954a05882 100644 --- a/cmd/gen-devnet/types.go +++ b/cmd/gen-devnet/types.go @@ -7,5 +7,4 @@ type DeploymentInfo struct { AuthorityAddress string `json:"CARTESI_CONTRACTS_AUTHORITY_ADDRESS"` HistoryAddress string `json:"CARTESI_CONTRACTS_HISTORY_ADDRESS"` ApplicationAddress string `json:"CARTESI_CONTRACTS_APPLICATION_ADDRESS"` - BlockNumber string `json:"CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER"` } diff --git a/docs/config.md b/docs/config.md index ecb988b07..a48b45f4e 100644 --- a/docs/config.md +++ b/docs/config.md @@ -116,12 +116,6 @@ Address of the DApp's contract. * **Type:** `string` -## `CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER` - -Block in which the DApp's contract was deployed. - -* **Type:** `int64` - ## `CARTESI_CONTRACTS_AUTHORITY_ADDRESS` Address of the Authority contract. @@ -236,14 +230,14 @@ for more information. * **Type:** `string` * **Default:** `""` -## `CARTESI_EPOCH_DURATION` +## `CARTESI_EPOCH_LENGTH` -Duration of a rollups epoch in seconds. +Length of a rollups epoch in blocks. At the end of each epoch, the node will send claims to the blockchain. -* **Type:** `Duration` -* **Default:** `"86400"` +* **Type:** `uint64` +* **Default:** `"7200"` ## `CARTESI_SNAPSHOT_DIR` diff --git a/internal/node/config/config.go b/internal/node/config/config.go index ad48ef99c..6f9d6391d 100644 --- a/internal/node/config/config.go +++ b/internal/node/config/config.go @@ -13,32 +13,31 @@ import ( // NodeConfig contains all the Node variables. // See the corresponding environment variable for the variable documentation. type NodeConfig struct { - LogLevel LogLevel - LogPretty bool - RollupsEpochDuration Duration - BlockchainID uint64 - BlockchainHttpEndpoint Redacted[string] - BlockchainWsEndpoint Redacted[string] - BlockchainIsLegacy bool - BlockchainFinalityOffset int - BlockchainBlockTimeout int - ContractsApplicationAddress string - ContractsApplicationDeploymentBlockNumber int64 - ContractsHistoryAddress string - ContractsAuthorityAddress string - ContractsInputBoxAddress string - ContractsInputBoxDeploymentBlockNumber int64 - SnapshotDir string - PostgresEndpoint Redacted[string] - HttpAddress string - HttpPort int - FeatureHostMode bool - FeatureDisableClaimer bool - FeatureDisableMachineHashCheck bool - ExperimentalServerManagerBypassLog bool - ExperimentalSunodoValidatorEnabled bool - ExperimentalSunodoValidatorRedisEndpoint string - Auth Auth + LogLevel LogLevel + LogPretty bool + RollupsEpochLength uint64 + BlockchainID uint64 + BlockchainHttpEndpoint Redacted[string] + BlockchainWsEndpoint Redacted[string] + BlockchainIsLegacy bool + BlockchainFinalityOffset int + BlockchainBlockTimeout int + ContractsApplicationAddress string + ContractsHistoryAddress string + ContractsAuthorityAddress string + ContractsInputBoxAddress string + ContractsInputBoxDeploymentBlockNumber int64 + SnapshotDir string + PostgresEndpoint Redacted[string] + HttpAddress string + HttpPort int + FeatureHostMode bool + FeatureDisableClaimer bool + FeatureDisableMachineHashCheck bool + ExperimentalServerManagerBypassLog bool + ExperimentalSunodoValidatorEnabled bool + ExperimentalSunodoValidatorRedisEndpoint string + Auth Auth } // Auth is used to sign transactions. @@ -75,7 +74,7 @@ func FromEnv() NodeConfig { var config NodeConfig config.LogLevel = getLogLevel() config.LogPretty = getLogPretty() - config.RollupsEpochDuration = getEpochDuration() + config.RollupsEpochLength = getEpochLength() config.BlockchainID = getBlockchainId() config.BlockchainHttpEndpoint = Redacted[string]{getBlockchainHttpEndpoint()} config.BlockchainWsEndpoint = Redacted[string]{getBlockchainWsEndpoint()} @@ -83,8 +82,6 @@ func FromEnv() NodeConfig { config.BlockchainFinalityOffset = getBlockchainFinalityOffset() config.BlockchainBlockTimeout = getBlockchainBlockTimeout() config.ContractsApplicationAddress = getContractsApplicationAddress() - config.ContractsApplicationDeploymentBlockNumber = - getContractsApplicationDeploymentBlockNumber() config.ContractsHistoryAddress = getContractsHistoryAddress() config.ContractsAuthorityAddress = getContractsAuthorityAddress() config.ContractsInputBoxAddress = getContractsInputBoxAddress() diff --git a/internal/node/config/generate/Config.toml b/internal/node/config/generate/Config.toml index 175b2c754..bda385b6c 100644 --- a/internal/node/config/generate/Config.toml +++ b/internal/node/config/generate/Config.toml @@ -45,11 +45,11 @@ the snapshot matches the hash in the Application contract.""" # Rollups # -[rollups.CARTESI_EPOCH_DURATION] -default = "86400" # 1 day in seconds -go-type = "Duration" +[rollups.CARTESI_EPOCH_LENGTH] +default = "7200" # 1 day (average) in blocks (considering one block is mined every 12 seconds) +go-type = "uint64" description = """ -Duration of a rollups epoch in seconds. +Length of a rollups epoch in blocks. At the end of each epoch, the node will send claims to the blockchain.""" @@ -101,11 +101,6 @@ go-type = "string" description = """ Address of the DApp's contract.""" -[contracts.CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER] -go-type = "int64" -description = """ -Block in which the DApp's contract was deployed.""" - [contracts.CARTESI_CONTRACTS_HISTORY_ADDRESS] go-type = "string" description = """ diff --git a/internal/node/config/generated.go b/internal/node/config/generated.go index 3c9ba6138..bb60cf9d5 100644 --- a/internal/node/config/generated.go +++ b/internal/node/config/generated.go @@ -281,18 +281,6 @@ func getContractsApplicationAddress() string { return val } -func getContractsApplicationDeploymentBlockNumber() int64 { - s, ok := os.LookupEnv("CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER") - if !ok { - panic("missing env var CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER") - } - val, err := toInt64(s) - if err != nil { - panic(fmt.Sprintf("failed to parse CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER: %v", err)) - } - return val -} - func getContractsAuthorityAddress() string { s, ok := os.LookupEnv("CARTESI_CONTRACTS_AUTHORITY_ADDRESS") if !ok { @@ -473,14 +461,14 @@ func getPostgresEndpoint() string { return val } -func getEpochDuration() Duration { - s, ok := os.LookupEnv("CARTESI_EPOCH_DURATION") +func getEpochLength() uint64 { + s, ok := os.LookupEnv("CARTESI_EPOCH_LENGTH") if !ok { - s = "86400" + s = "7200" } - val, err := toDuration(s) + val, err := toUint64(s) if err != nil { - panic(fmt.Sprintf("failed to parse CARTESI_EPOCH_DURATION: %v", err)) + panic(fmt.Sprintf("failed to parse CARTESI_EPOCH_LENGTH: %v", err)) } return val } diff --git a/internal/node/services.go b/internal/node/services.go index 23de90986..892c3db16 100644 --- a/internal/node/services.go +++ b/internal/node/services.go @@ -155,13 +155,12 @@ func newDispatcher(c config.NodeConfig, workDir string) services.CommandService c.BlockchainFinalityOffset)) s.Env = append(s.Env, fmt.Sprintf("REDIS_ENDPOINT=%v", getRedisEndpoint(c))) s.Env = append(s.Env, fmt.Sprintf("DAPP_ADDRESS=%v", c.ContractsApplicationAddress)) - s.Env = append(s.Env, fmt.Sprintf("DAPP_DEPLOYMENT_BLOCK_NUMBER=%v", - c.ContractsApplicationDeploymentBlockNumber)) + s.Env = append(s.Env, fmt.Sprintf("INPUT_BOX_DEPLOYMENT_BLOCK_NUMBER=%v", + c.ContractsInputBoxDeploymentBlockNumber)) s.Env = append(s.Env, fmt.Sprintf("HISTORY_ADDRESS=%v", c.ContractsHistoryAddress)) s.Env = append(s.Env, fmt.Sprintf("AUTHORITY_ADDRESS=%v", c.ContractsAuthorityAddress)) s.Env = append(s.Env, fmt.Sprintf("INPUT_BOX_ADDRESS=%v", c.ContractsInputBoxAddress)) - s.Env = append(s.Env, fmt.Sprintf("RD_EPOCH_DURATION=%v", - int(c.RollupsEpochDuration.Seconds()))) + s.Env = append(s.Env, fmt.Sprintf("RD_EPOCH_LENGTH=%v", c.RollupsEpochLength)) s.Env = append(s.Env, fmt.Sprintf("CHAIN_ID=%v", c.BlockchainID)) s.Env = append(s.Env, fmt.Sprintf("DISPATCHER_HTTP_SERVER_PORT=%v", getPort(c, portOffsetDispatcher))) diff --git a/offchain/dispatcher/src/config.rs b/offchain/dispatcher/src/config.rs index e7ea10a09..eb1f16124 100644 --- a/offchain/dispatcher/src/config.rs +++ b/offchain/dispatcher/src/config.rs @@ -30,9 +30,9 @@ pub struct DispatcherEnvCLIConfig { #[command(flatten)] pub blockchain_config: BlockchainCLIConfig, - /// Duration of rollups epoch in seconds, for which dispatcher will make claims. - #[arg(long, env, default_value = "604800")] - pub rd_epoch_duration: u64, + /// Duration of rollups epoch in blocks, for which dispatcher will make claims. + #[arg(long, env, default_value = "7200")] + pub rd_epoch_length: u64, /// Chain ID #[arg(long, env)] @@ -46,7 +46,7 @@ pub struct DispatcherConfig { pub log_config: LogConfig, pub blockchain_config: BlockchainConfig, - pub epoch_duration: u64, + pub epoch_length: u64, pub chain_id: u64, } @@ -86,7 +86,7 @@ impl Config { broker_config, log_config, blockchain_config, - epoch_duration: dispatcher_config.rd_epoch_duration, + epoch_length: dispatcher_config.rd_epoch_length, chain_id: dispatcher_config.chain_id, }; diff --git a/offchain/dispatcher/src/drivers/context.rs b/offchain/dispatcher/src/drivers/context.rs index 5c788d5ac..a8063bbcf 100644 --- a/offchain/dispatcher/src/drivers/context.rs +++ b/offchain/dispatcher/src/drivers/context.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use crate::{ - machine::{rollups_broker::BrokerFacadeError, BrokerSend, RollupStatus}, + machine::{rollups_broker::BrokerFacadeError, BrokerSend}, metrics::DispatcherMetrics, }; @@ -11,48 +11,50 @@ use types::foldables::Input; #[derive(Debug)] pub struct Context { - inputs_sent_count: u64, - last_event_is_finish_epoch: bool, - last_timestamp: u64, + inputs_sent: u64, + last_input_epoch: Option, + last_finished_epoch: Option, // constants - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, + // metrics dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, } impl Context { pub fn new( - genesis_timestamp: u64, + genesis_block: u64, epoch_length: u64, dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, - status: RollupStatus, ) -> Self { + assert!(epoch_length > 0); Self { - inputs_sent_count: status.inputs_sent_count, - last_event_is_finish_epoch: status.last_event_is_finish_epoch, - last_timestamp: genesis_timestamp, - genesis_timestamp, + inputs_sent: 0, + last_input_epoch: None, + last_finished_epoch: None, + genesis_block, epoch_length, dapp_metadata, metrics, } } - pub fn inputs_sent_count(&self) -> u64 { - self.inputs_sent_count + pub fn inputs_sent(&self) -> u64 { + self.inputs_sent } pub async fn finish_epoch_if_needed( &mut self, - event_timestamp: u64, + block_number: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - if self.should_finish_epoch(event_timestamp) { - self.finish_epoch(event_timestamp, broker).await?; + let epoch = self.calculate_epoch(block_number); + if self.should_finish_epoch(epoch) { + self.finish_epoch(broker).await?; } Ok(()) } @@ -62,98 +64,147 @@ impl Context { input: &Input, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - broker.enqueue_input(self.inputs_sent_count, input).await?; + let input_block_number = input.block_added.number.as_u64(); + self.finish_epoch_if_needed(input_block_number, broker) + .await?; + + broker.enqueue_input(self.inputs_sent, input).await?; + self.metrics .advance_inputs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.inputs_sent_count += 1; - self.last_event_is_finish_epoch = false; + + self.inputs_sent += 1; + + let input_epoch = self.calculate_epoch(input_block_number); + self.last_finished_epoch.map(|last_finished_epoch| { + // Asserting that the calculated epoch comes after the last finished epoch. + // (If last_finished_epoch == None then we don't need the assertion.) + assert!(input_epoch > last_finished_epoch) + }); + self.last_input_epoch = Some(input_epoch); + Ok(()) } } impl Context { - fn calculate_epoch(&self, timestamp: u64) -> u64 { - assert!(timestamp >= self.genesis_timestamp); - (timestamp - self.genesis_timestamp) / self.epoch_length - } - - // This logic works because we call this function with `event_timestamp` being equal to the - // timestamp of each individual input, rather than just the latest from the blockchain. - fn should_finish_epoch(&self, event_timestamp: u64) -> bool { - if self.inputs_sent_count == 0 || self.last_event_is_finish_epoch { - false - } else { - let current_epoch = self.calculate_epoch(self.last_timestamp); - let event_epoch = self.calculate_epoch(event_timestamp); - event_epoch > current_epoch + fn calculate_epoch(&self, block_number: u64) -> u64 { + assert!(block_number >= self.genesis_block); + (block_number - self.genesis_block) / self.epoch_length + } + + fn should_finish_epoch(&self, epoch: u64) -> bool { + // Being (last_input_epoch >= last_finished_epoch) a structural invariant. + // Being the current epoch the epoch of the last input. + // + // If last_finished_epoch is None and last_input_epoch is None, + // then there are no inputs by definition and the current epoch is empty. + // + // If last_finished_epoch is Some(x) and last_input_epoch is Some(x), + // then an epoch was finished and the last enqueued input belongs to that epoch; + // meaning that any subsequent epochs (which includes the current one) are empty. + // + // If last_finished_epoch is Some(x) and last_input_epoch is Some(y), or + // If last_finished_epoch is None and last_input_epoch is Some(_), then + // the current epoch is not empty by definition and by the structural invariant. + // + // The state in which last_finished_epoch is Some(_) and last_input_epoch is None is + // impossible (by the structural invariant). + if self.last_finished_epoch == self.last_input_epoch { + return false; // if the current epoch is empty } + + if epoch == self.last_input_epoch.unwrap() { + return false; // if the current epoch is still not over + } + + epoch > self.last_finished_epoch.unwrap_or(0) } async fn finish_epoch( &mut self, - event_timestamp: u64, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - assert!(event_timestamp >= self.genesis_timestamp); - broker.finish_epoch(self.inputs_sent_count).await?; + // Asserting that there are inputs in the current epoch. + assert!( + match (self.last_input_epoch, self.last_finished_epoch) { + (Some(input_epoch), Some(finished_epoch)) => input_epoch > finished_epoch, + (Some(_), None) => true, // Consider input_epoch greater than None + (None, _) => false, // None is never greater than any value + }, + "Assertion failed: last_input_epoch should be greater than last_finished_epoch" + ); + + broker.finish_epoch(self.inputs_sent).await?; self.metrics .finish_epochs_sent .get_or_create(&self.dapp_metadata) .inc(); - self.last_timestamp = event_timestamp; - self.last_event_is_finish_epoch = true; + + self.last_finished_epoch = self.last_input_epoch; Ok(()) } } #[cfg(test)] -mod private_tests { - use crate::{drivers::mock, metrics::DispatcherMetrics}; +mod tests { + use std::collections::VecDeque; - use super::{Context, DAppMetadata}; + use crate::drivers::mock::Event; + use rollups_events::DAppMetadata; + use serial_test::serial; - // -------------------------------------------------------------------------------------------- - // calculate_epoch_for - // -------------------------------------------------------------------------------------------- + use crate::{drivers::mock, metrics::DispatcherMetrics}; - fn new_context_for_calculate_epoch_test( - genesis_timestamp: u64, - epoch_length: u64, - ) -> Context { - Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 0, - genesis_timestamp, - epoch_length, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), + use super::Context; + + impl Default for Context { + fn default() -> Self { + Context::new( + /* genesis_block */ 0, + /* epoch_length */ 10, + /* dapp_metadata */ DAppMetadata::default(), + /* metrics */ DispatcherMetrics::default(), + ) } } + // -------------------------------------------------------------------------------------------- + // calculate_epoch + // -------------------------------------------------------------------------------------------- + #[test] fn calculate_epoch_with_zero_genesis() { - let epoch_length = 3; - let context = new_context_for_calculate_epoch_test(0, epoch_length); - let n = 10; + let mut context = Context::default(); + context.genesis_block = 0; + context.epoch_length = 10; + + let number_of_epochs = 10; let mut tested = 0; - for epoch in 0..n { - let x = epoch * epoch_length; - let y = (epoch + 1) * epoch_length; - for i in x..y { - assert_eq!(context.calculate_epoch(i), epoch); + for current_epoch in 0..number_of_epochs { + let block_lower_bound = current_epoch * context.epoch_length; + let block_upper_bound = (current_epoch + 1) * context.epoch_length; + for i in block_lower_bound..block_upper_bound { + assert_eq!(context.calculate_epoch(i), current_epoch); tested += 1; } } - assert_eq!(tested, n * epoch_length); - assert_eq!(context.calculate_epoch(9), 3); + + assert_eq!(tested, number_of_epochs * context.epoch_length); + assert_eq!( + context.calculate_epoch(context.epoch_length * number_of_epochs), + context.epoch_length + ); } #[test] fn calculate_epoch_with_offset_genesis() { - let context = new_context_for_calculate_epoch_test(2, 2); + let mut context = Context::default(); + context.genesis_block = 2; + context.epoch_length = 2; + assert_eq!(context.calculate_epoch(2), 0); assert_eq!(context.calculate_epoch(3), 0); assert_eq!(context.calculate_epoch(4), 1); @@ -163,68 +214,119 @@ mod private_tests { #[test] #[should_panic] - fn calculate_epoch_invalid() { - new_context_for_calculate_epoch_test(4, 3).calculate_epoch(2); + fn calculate_epoch_should_panic_because_block_came_before_genesis() { + let mut context = Context::default(); + context.genesis_block = 4; + context.epoch_length = 4; + context.calculate_epoch(2); } // -------------------------------------------------------------------------------------------- - // should_finish_epoch + // should_finish_epoch -- first epoch // -------------------------------------------------------------------------------------------- #[test] - fn should_not_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_the_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_not_finish_epoch_because_of_zero_inputs() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(4)); + fn should_finish_the_first_epoch_after_several_blocks() { + let mut context = Context::default(); + context.inputs_sent = 110; + context.last_input_epoch = Some(9); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(100); + assert_eq!(context.should_finish_epoch(epoch), true); } #[test] - fn should_finish_epoch_because_of_time() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(context.should_finish_epoch(5)); + fn should_not_finish_an_empty_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(10); + assert_eq!(context.should_finish_epoch(epoch), false); } #[test] - fn should_finish_epoch_because_last_event_is_finish_epoch() { - let context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: true, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert!(!context.should_finish_epoch(5)); + fn should_not_finish_a_very_late_empty_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 0; + context.last_input_epoch = None; + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(2340); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_first_epoch() { + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let epoch = context.calculate_epoch(9); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + // -------------------------------------------------------------------------------------------- + // should_finish_epoch -- other epochs + // -------------------------------------------------------------------------------------------- + + #[test] + fn should_finish_epoch() { + let mut context = Context::default(); + context.inputs_sent = 42; + context.last_input_epoch = Some(4); + context.last_finished_epoch = Some(3); + let epoch = context.calculate_epoch(54); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_finish_epoch_by_a_lot() { + let mut context = Context::default(); + context.inputs_sent = 142; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(2); + let epoch = context.calculate_epoch(190); + assert_eq!(context.should_finish_epoch(epoch), true); + } + + #[test] + fn should_not_finish_an_empty_epoch() { + let mut context = Context::default(); + context.inputs_sent = 120; + context.last_input_epoch = Some(9); + context.last_finished_epoch = Some(9); + let epoch = context.calculate_epoch(105); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_very_late_empty_epoch() { + let mut context = Context::default(); + context.inputs_sent = 120; + context.last_input_epoch = Some(15); + context.last_finished_epoch = Some(15); + let epoch = context.calculate_epoch(1000); + assert_eq!(context.should_finish_epoch(epoch), false); + } + + #[test] + fn should_not_finish_a_timely_epoch() { + let mut context = Context::default(); + context.inputs_sent = 230; + context.last_input_epoch = Some(11); + context.last_finished_epoch = Some(10); + let epoch = context.calculate_epoch(110); + assert_eq!(context.should_finish_epoch(epoch), false); } // -------------------------------------------------------------------------------------------- @@ -233,72 +335,33 @@ mod private_tests { #[tokio::test] async fn finish_epoch_ok() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 3, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let broker = mock::Broker::new(vec![], vec![]); - let timestamp = 6; - let result = context.finish_epoch(timestamp, &broker).await; - assert!(result.is_ok()); - assert_eq!(context.last_timestamp, timestamp); - assert!(context.last_event_is_finish_epoch); - } + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; - #[tokio::test] - #[should_panic] - async fn finish_epoch_invalid() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 6, - genesis_timestamp: 5, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; let broker = mock::Broker::new(vec![], vec![]); - let _ = context.finish_epoch(0, &broker).await; + let result = context.finish_epoch(&broker).await; + assert!(result.is_ok()); + assert_eq!(context.inputs_sent, 1); + assert_eq!(context.last_input_epoch, Some(0)); + assert_eq!(context.last_finished_epoch, Some(0)); } #[tokio::test] async fn finish_epoch_broker_error() { - let last_timestamp = 3; - let last_event_is_finish_epoch = false; - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch, - last_timestamp, - genesis_timestamp: 0, - epoch_length: 5, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 1; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch(6, &broker).await; + let result = context.finish_epoch(&broker).await; assert!(result.is_err()); - assert_eq!(context.last_timestamp, last_timestamp); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch - ); + assert_eq!(context.inputs_sent, 1); + assert_eq!(context.last_input_epoch, Some(0)); + assert_eq!(context.last_finished_epoch, None); } -} - -#[cfg(test)] -mod public_tests { - use crate::{ - drivers::mock::{self, SendInteraction}, - machine::RollupStatus, - metrics::DispatcherMetrics, - }; - - use super::{Context, DAppMetadata}; // -------------------------------------------------------------------------------------------- // new @@ -306,26 +369,29 @@ mod public_tests { #[tokio::test] async fn new_ok() { - let genesis_timestamp = 42; + let genesis_block = 42; let epoch_length = 24; - let inputs_sent_count = 150; - let last_event_is_finish_epoch = true; - let rollup_status = RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch, - }; + let context = Context::new( - genesis_timestamp, + genesis_block, epoch_length, DAppMetadata::default(), DispatcherMetrics::default(), - rollup_status, ); - assert_eq!(context.genesis_timestamp, genesis_timestamp); - assert_eq!(context.inputs_sent_count, inputs_sent_count); - assert_eq!( - context.last_event_is_finish_epoch, - last_event_is_finish_epoch + + assert_eq!(context.genesis_block, genesis_block); + assert_eq!(context.epoch_length, epoch_length); + assert_eq!(context.dapp_metadata, DAppMetadata::default()); + } + + #[test] + #[should_panic] + fn new_should_panic_because_epoch_length_is_zero() { + Context::new( + 0, + 0, + DAppMetadata::default(), + DispatcherMetrics::default(), ); } @@ -335,17 +401,10 @@ mod public_tests { #[test] fn inputs_sent_count() { - let inputs_sent_count = 42; - let context = Context { - inputs_sent_count, - last_event_is_finish_epoch: false, // ignored - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - assert_eq!(context.inputs_sent_count(), inputs_sent_count); + let number_of_inputs_sent = 42; + let mut context = Context::default(); + context.inputs_sent = number_of_inputs_sent; + assert_eq!(context.inputs_sent(), number_of_inputs_sent); } // -------------------------------------------------------------------------------------------- @@ -354,52 +413,40 @@ mod public_tests { #[tokio::test] async fn finish_epoch_if_needed_true() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(12, &broker).await; assert!(result.is_ok()); - broker - .assert_send_interactions(vec![SendInteraction::FinishedEpoch(1)]); + broker.assert_state(vec![ + Event::FinishEpoch(0), // + ]); } #[tokio::test] async fn finish_epoch_if_needed_false() { - let mut context = Context { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 2, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; + let broker = mock::Broker::new(vec![], vec![]); - let result = context.finish_epoch_if_needed(3, &broker).await; + let result = context.finish_epoch_if_needed(9, &broker).await; assert!(result.is_ok()); - broker.assert_send_interactions(vec![]); + broker.assert_state(vec![]); } #[tokio::test] async fn finish_epoch_if_needed_broker_error() { - let mut context = Context { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - last_timestamp: 2, - genesis_timestamp: 0, - epoch_length: 4, - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); + context.inputs_sent = 9; + context.last_input_epoch = Some(0); + context.last_finished_epoch = None; let broker = mock::Broker::with_finish_epoch_error(); - let result = context.finish_epoch_if_needed(4, &broker).await; + let result = context.finish_epoch_if_needed(28, &broker).await; assert!(result.is_err()); } @@ -409,40 +456,234 @@ mod public_tests { #[tokio::test] async fn enqueue_input_ok() { - let inputs_sent_count = 42; - let mut context = Context { - inputs_sent_count, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; - let input = mock::new_input(2); + let number_of_inputs_sent = 42; + let last_input_epoch = Some(1); + let last_finished_epoch = None; + + let mut context = Context::default(); + context.inputs_sent = number_of_inputs_sent; + context.last_input_epoch = last_input_epoch; + context.last_finished_epoch = last_finished_epoch; + + let input = mock::new_input(22); let broker = mock::Broker::new(vec![], vec![]); let result = context.enqueue_input(&input, &broker).await; assert!(result.is_ok()); - assert_eq!(context.inputs_sent_count, inputs_sent_count + 1); - assert!(!context.last_event_is_finish_epoch); - broker.assert_send_interactions(vec![SendInteraction::EnqueuedInput( - inputs_sent_count, - )]); + + assert_eq!(context.inputs_sent, number_of_inputs_sent + 1); + assert_eq!(context.last_input_epoch, Some(2)); + assert_eq!(context.last_finished_epoch, Some(1)); + + broker.assert_state(vec![ + Event::FinishEpoch(0), + Event::Input(number_of_inputs_sent), + ]); } #[tokio::test] async fn enqueue_input_broker_error() { - let mut context = Context { - inputs_sent_count: 42, - last_event_is_finish_epoch: true, - last_timestamp: 0, // ignored - genesis_timestamp: 0, // ignored - epoch_length: 0, // ignored - dapp_metadata: DAppMetadata::default(), - metrics: DispatcherMetrics::default(), - }; + let mut context = Context::default(); let broker = mock::Broker::with_enqueue_input_error(); - let result = context.enqueue_input(&mock::new_input(2), &broker).await; + let result = context.enqueue_input(&mock::new_input(82), &broker).await; assert!(result.is_err()); } + + // -------------------------------------------------------------------------------------------- + // deterministic behavior + // -------------------------------------------------------------------------------------------- + + #[derive(Clone)] + struct Case { + input_blocks: Vec, + epoch_length: u64, + last_block: u64, + expected: Vec, + } + + #[tokio::test] + #[serial] + async fn deterministic_behavior() { + let cases: Vec = vec![ + Case { + input_blocks: vec![], + epoch_length: 2, + last_block: 100, + expected: vec![], + }, + Case { + input_blocks: vec![0, 1, 4, 5], + epoch_length: 2, + last_block: 10, + expected: vec![ + Event::Input(0), + Event::Input(1), + Event::FinishEpoch(0), + Event::Input(2), + Event::Input(3), + Event::FinishEpoch(1), + ], + }, + Case { + input_blocks: vec![0, 0, 0, 7, 7], + epoch_length: 2, + last_block: 10, + expected: vec![ + Event::Input(0), + Event::Input(1), + Event::Input(2), + Event::FinishEpoch(0), + Event::Input(3), + Event::Input(4), + Event::FinishEpoch(1), + ], + }, + Case { + input_blocks: vec![0, 2], + epoch_length: 2, + last_block: 4, + expected: vec![ + Event::Input(0), + Event::FinishEpoch(0), + Event::Input(1), + Event::FinishEpoch(1), + ], + }, + Case { + input_blocks: vec![1, 2, 4], + epoch_length: 2, + last_block: 6, + expected: vec![ + Event::Input(0), + Event::FinishEpoch(0), + Event::Input(1), + Event::FinishEpoch(1), + Event::Input(2), + Event::FinishEpoch(2), + ], + }, + Case { + input_blocks: vec![0, 1, 1, 2, 3, 4, 5, 5, 5, 6, 7], + epoch_length: 2, + last_block: 7, + expected: vec![ + Event::Input(0), + Event::Input(1), + Event::Input(2), + Event::FinishEpoch(0), + Event::Input(3), + Event::Input(4), + Event::FinishEpoch(1), + Event::Input(5), + Event::Input(6), + Event::Input(7), + Event::Input(8), + Event::FinishEpoch(2), + Event::Input(9), + Event::Input(10), + ], + }, + Case { + input_blocks: vec![0, 5, 9], + epoch_length: 2, + last_block: 10, + expected: vec![ + Event::Input(0), + Event::FinishEpoch(0), + Event::Input(1), + Event::FinishEpoch(1), + Event::Input(2), + Event::FinishEpoch(2), + ], + }, + ]; + for (i, case) in cases.iter().enumerate() { + println!("Testing case {}.", i); + test_deterministic_case(case.clone()).await; + } + } + + // -------------------------------------------------------------------------------------------- + // auxiliary + // -------------------------------------------------------------------------------------------- + + async fn test_deterministic_case(case: Case) { + let broker1 = create_state_as_inputs_are_being_received( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + let broker2 = create_state_by_receiving_all_inputs_at_once( + case.epoch_length, + case.input_blocks.clone(), + case.last_block, + ) + .await; + broker1.assert_state(case.expected.clone()); + broker2.assert_state(case.expected.clone()); + } + + async fn create_state_as_inputs_are_being_received( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("================================================"); + println!("one_block_at_a_time:"); + + let mut input_blocks: VecDeque<_> = input_blocks.into(); + let mut current_input_block = input_blocks.pop_front(); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in 0..=last_block { + while let Some(input_block) = current_input_block { + if block == input_block { + println!("\tenqueue_input(input_block: {})", block); + let input = mock::new_input(block); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + + current_input_block = input_blocks.pop_front(); + } else { + break; + } + } + + println!("\tfinish_epoch_if_needed(block: {})\n", block); + let result = context.finish_epoch_if_needed(block, &broker).await; + assert!(result.is_ok()); + } + + broker + } + + async fn create_state_by_receiving_all_inputs_at_once( + epoch_length: u64, + input_blocks: Vec, + last_block: u64, + ) -> mock::Broker { + println!("all_inputs_at_once:"); + + let mut context = Context::default(); + context.epoch_length = epoch_length; + let broker = mock::Broker::new(vec![], vec![]); + + for block in input_blocks { + println!("\tenqueue_input(input_block: {})\n", block); + let input = mock::new_input(block); + let result = context.enqueue_input(&input, &broker).await; + assert!(result.is_ok()); + } + + println!("\tfinish_epoch_if_needed(last_block: {})", last_block); + let result = context.finish_epoch_if_needed(last_block, &broker).await; + assert!(result.is_ok()); + + println!("================================================"); + + broker + } } diff --git a/offchain/dispatcher/src/drivers/machine.rs b/offchain/dispatcher/src/drivers/machine.rs index 3f22f97f6..f899aae51 100644 --- a/offchain/dispatcher/src/drivers/machine.rs +++ b/offchain/dispatcher/src/drivers/machine.rs @@ -6,9 +6,9 @@ use super::Context; use crate::machine::{rollups_broker::BrokerFacadeError, BrokerSend}; use eth_state_fold_types::{ethereum_types::Address, Block}; -use types::foldables::{DAppInputBox, Input, InputBox}; +use types::foldables::{DAppInputBox, InputBox}; -use tracing::{debug, instrument, trace}; +use tracing::{debug, instrument}; pub struct MachineDriver { dapp_address: Address, @@ -27,21 +27,17 @@ impl MachineDriver { input_box: &InputBox, broker: &impl BrokerSend, ) -> Result<(), BrokerFacadeError> { - let dapp_input_box = - match input_box.dapp_input_boxes.get(&self.dapp_address) { - None => { - debug!("No inputs for dapp {}", self.dapp_address); - return Ok(()); - } - - Some(d) => d, - }; - - self.process_inputs(context, dapp_input_box, broker).await?; + match input_box.dapp_input_boxes.get(&self.dapp_address) { + None => { + debug!("No inputs for dapp {}", self.dapp_address); + } + Some(dapp_input_box) => { + self.process_inputs(context, dapp_input_box, broker).await? + } + }; - context - .finish_epoch_if_needed(block.timestamp.as_u64(), broker) - .await?; + let block = block.number.as_u64(); + context.finish_epoch_if_needed(block, broker).await?; Ok(()) } @@ -57,163 +53,56 @@ impl MachineDriver { ) -> Result<(), BrokerFacadeError> { tracing::trace!( "Last input sent to machine manager `{}`, current input `{}`", - context.inputs_sent_count(), + context.inputs_sent(), dapp_input_box.inputs.len() ); - let input_slice = dapp_input_box - .inputs - .skip(context.inputs_sent_count() as usize); + let input_slice = + dapp_input_box.inputs.skip(context.inputs_sent() as usize); for input in input_slice { - self.process_input(context, &input, broker).await?; + context.enqueue_input(&input, broker).await?; } Ok(()) } - - #[instrument(level = "trace", skip_all)] - async fn process_input( - &self, - context: &mut Context, - input: &Input, - broker: &impl BrokerSend, - ) -> Result<(), BrokerFacadeError> { - let input_timestamp = input.block_added.timestamp.as_u64(); - trace!(?context, ?input_timestamp); - - context - .finish_epoch_if_needed(input_timestamp, broker) - .await?; - - context.enqueue_input(input, broker).await?; - - Ok(()) - } } #[cfg(test)] mod tests { - use eth_state_fold_types::{ethereum_types::H160, Block}; - use rollups_events::DAppMetadata; use std::sync::Arc; + use eth_state_fold_types::ethereum_types::H160; + use rollups_events::DAppMetadata; + use types::foldables::InputBox; + use crate::{ drivers::{ - mock::{self, SendInteraction}, + machine::MachineDriver, + mock::{self, Broker}, Context, }, machine::RollupStatus, metrics::DispatcherMetrics, }; - use super::MachineDriver; - - // -------------------------------------------------------------------------------------------- - // process_input - // -------------------------------------------------------------------------------------------- - - async fn test_process_input( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, + fn new_context(genesis_block: u64, epoch_length: u64) -> Context { + let context = Context::new( + genesis_block, + epoch_length, DAppMetadata::default(), DispatcherMetrics::default(), - rollup_status, ); - let machine_driver = MachineDriver::new(H160::random()); - for block_timestamp in input_timestamps { - let input = mock::new_input(block_timestamp); - let result = machine_driver - .process_input(&mut context, &input, &broker) - .await; - assert!(result.is_ok()); - } - - broker.assert_send_interactions(expected); - } - - #[tokio::test] - async fn process_input_right_before_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_at_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 1, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_last_event_is_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: true, - }; - let input_timestamps = vec![5]; - let send_interactions = vec![SendInteraction::EnqueuedInput(0)]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; + context } - #[tokio::test] - async fn process_input_after_finish_epoch() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![6, 7]; - let send_interactions = vec![ - SendInteraction::FinishedEpoch(3), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; - } - - #[tokio::test] - async fn process_input_crossing_two_epochs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![3, 4, 5, 6, 7, 9, 10, 11]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), - SendInteraction::EnqueuedInput(4), - SendInteraction::EnqueuedInput(5), - SendInteraction::FinishedEpoch(6), - SendInteraction::EnqueuedInput(6), - SendInteraction::EnqueuedInput(7), - ]; - test_process_input(rollup_status, input_timestamps, send_interactions) - .await; + fn new_broker(context: &Context) -> Broker { + mock::Broker::new( + vec![RollupStatus { + inputs_sent_count: context.inputs_sent(), + }], + Vec::new(), + ) } // -------------------------------------------------------------------------------------------- @@ -221,23 +110,16 @@ mod tests { // -------------------------------------------------------------------------------------------- async fn test_process_inputs( - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, + mut context: Context, + broker: Broker, + input_blocks: Vec, + expected: Vec, ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); let machine_driver = MachineDriver::new(H160::random()); let dapp_input_box = types::foldables::DAppInputBox { - inputs: input_timestamps + inputs: input_blocks .iter() - .map(|timestamp| Arc::new(mock::new_input(*timestamp))) + .map(|block| Arc::new(mock::new_input(*block))) .collect::>() .into(), }; @@ -246,48 +128,57 @@ mod tests { .await; assert!(result.is_ok()); - broker.assert_send_interactions(expected); + broker.assert_state(expected); } #[tokio::test] - async fn test_process_inputs_without_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::EnqueuedInput(2), - SendInteraction::EnqueuedInput(3), + async fn process_inputs_without_skipping_inputs() { + let context = new_context(0, 10); + let broker = new_broker(&context); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::Input(3), ]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + test_process_inputs(context, broker, input_blocks, expected).await; } #[tokio::test] - async fn process_inputs_with_some_skipping() { - let rollup_status = RollupStatus { - inputs_sent_count: 3, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![SendInteraction::EnqueuedInput(3)]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + async fn process_inputs_with_some_skipped_inputs() { + let mut context = new_context(0, 10); + let mut throwaway_broker = new_broker(&context); + for i in 0..=1 { + assert!(context + .enqueue_input(&mock::new_input(i), &mut throwaway_broker) + .await + .is_ok()); + } + assert_eq!(2, context.inputs_sent()); + + let broker = new_broker(&context); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![mock::Event::Input(2), mock::Event::Input(3)]; + test_process_inputs(context, broker, input_blocks, expected).await; } #[tokio::test] - async fn process_inputs_skipping_all() { - let rollup_status = RollupStatus { - inputs_sent_count: 4, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2, 3, 4]; - let send_interactions = vec![]; - test_process_inputs(rollup_status, input_timestamps, send_interactions) - .await; + async fn process_inputs_skipping_all_inputs() { + let mut context = new_context(0, 10); + let mut throwaway_broker = new_broker(&context); + for i in 0..=3 { + assert!(context + .enqueue_input(&mock::new_input(i), &mut throwaway_broker) + .await + .is_ok()); + } + assert_eq!(4, context.inputs_sent()); + + let broker = new_broker(&context); + let input_blocks = vec![0, 1, 2, 3]; + let expected = vec![]; + test_process_inputs(context, broker, input_blocks, expected).await; } // -------------------------------------------------------------------------------------------- @@ -295,123 +186,236 @@ mod tests { // -------------------------------------------------------------------------------------------- async fn test_react( - block: Block, - rollup_status: RollupStatus, - input_timestamps: Vec, - expected: Vec, - ) { - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - + block: u64, + mut context: Context, + broker: Option, + input_box: Option, + input_blocks: Vec, + expected: Vec, + ) -> (Context, Broker, InputBox) { + let rollup_status = RollupStatus { + inputs_sent_count: context.inputs_sent(), + }; + let broker = broker + .unwrap_or(mock::Broker::new(vec![rollup_status], Vec::new())); let dapp_address = H160::random(); let machine_driver = MachineDriver::new(dapp_address); - let input_box = mock::new_input_box(); + let input_box = input_box.unwrap_or(mock::new_input_box()); let input_box = - mock::update_input_box(input_box, dapp_address, input_timestamps); + mock::update_input_box(input_box, dapp_address, input_blocks); let result = machine_driver - .react(&mut context, &block, &input_box, &broker) + .react(&mut context, &mock::new_block(block), &input_box, &broker) .await; assert!(result.is_ok()); - broker.assert_send_interactions(expected); + broker.assert_state(expected); + + (context, broker, input_box) } #[tokio::test] async fn react_without_finish_epoch() { - let block = mock::new_block(3); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + let block = 3; + let context = new_context(0, 10); + let input_blocks = vec![1, 2]; + let expected = vec![mock::Event::Input(0), mock::Event::Input(1)]; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_with_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![1, 2]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::EnqueuedInput(1), - SendInteraction::FinishedEpoch(2), + let block = 10; + let context = new_context(0, 10); + let input_blocks = vec![1, 2]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::Input(1), + mock::Event::FinishEpoch(0), ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_with_internal_finish_epoch() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![4, 5]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), + let block = 14; + let context = new_context(0, 10); + let input_blocks = vec![9, 10]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::FinishEpoch(0), + mock::Event::Input(1), ]; - test_react(block, rollup_status, input_timestamps, send_interactions) - .await; + test_react(block, context, None, None, input_blocks, expected).await; } #[tokio::test] async fn react_without_inputs() { - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let broker = mock::Broker::new(vec![rollup_status], Vec::new()); - let mut context = Context::new( - 0, - 5, - DAppMetadata::default(), - DispatcherMetrics::default(), - rollup_status, - ); - let block = mock::new_block(5); - let input_box = mock::new_input_box(); - let machine_driver = MachineDriver::new(H160::random()); - let result = machine_driver - .react(&mut context, &block, &input_box, &broker) - .await; - assert!(result.is_ok()); - broker.assert_send_interactions(vec![]); + let block = 10; + let context = new_context(0, 10); + let input_blocks = vec![]; + let expected = vec![]; + test_react(block, context, None, None, input_blocks, expected).await; } + // NOTE: this test shows we DON'T close the epoch after the first input! #[tokio::test] async fn react_with_inputs_after_first_epoch_length() { - let block = mock::new_block(5); - let rollup_status = RollupStatus { - inputs_sent_count: 0, - last_event_is_finish_epoch: false, - }; - let input_timestamps = vec![7, 8]; - let send_interactions = vec![ - SendInteraction::EnqueuedInput(0), - SendInteraction::FinishedEpoch(1), - SendInteraction::EnqueuedInput(1), + let block = 20; + let context = new_context(0, 10); + let input_blocks = vec![14, 16, 18, 20]; + let expected = vec![ + mock::Event::Input(0), + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::FinishEpoch(0), + mock::Event::Input(3), + ]; + test_react(block, context, None, None, input_blocks, expected).await; + } + + #[tokio::test] + async fn react_is_deterministic() { + let final_expected = vec![ + mock::Event::Input(0), + mock::Event::FinishEpoch(0), + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::Input(3), + mock::Event::Input(4), + mock::Event::Input(5), + mock::Event::Input(6), + mock::Event::Input(7), + mock::Event::Input(8), + mock::Event::Input(9), + mock::Event::FinishEpoch(1), + mock::Event::Input(10), + mock::Event::Input(11), + mock::Event::Input(12), + mock::Event::Input(13), + mock::Event::Input(14), + mock::Event::Input(15), + mock::Event::FinishEpoch(2), + mock::Event::Input(16), + mock::Event::Input(17), + mock::Event::Input(18), ]; - test_react(block, rollup_status, input_timestamps, send_interactions) + + { + // original + let block1 = 3100; + let block2 = 6944; + + let context = new_context(0, 1000); + + let input_blocks1 = vec![ + 56, // + // + 1078, // + 1091, // + 1159, // + 1204, // + 1227, // + 1280, // + 1298, // + 1442, // + 1637, // + // + 2827, // + 2881, // + 2883, // + 2887, // + 2891, // + 2934, // + ]; + let mut input_blocks2 = input_blocks1.clone(); + input_blocks2.append(&mut vec![ + 6160, // + 6864, // + 6944, // + ]); + + let expected1 = vec![ + mock::Event::Input(0), + mock::Event::FinishEpoch(0), + mock::Event::Input(1), + mock::Event::Input(2), + mock::Event::Input(3), + mock::Event::Input(4), + mock::Event::Input(5), + mock::Event::Input(6), + mock::Event::Input(7), + mock::Event::Input(8), + mock::Event::Input(9), + mock::Event::FinishEpoch(1), + mock::Event::Input(10), + mock::Event::Input(11), + mock::Event::Input(12), + mock::Event::Input(13), + mock::Event::Input(14), + mock::Event::Input(15), + mock::Event::FinishEpoch(2), + ]; + + let (context, broker, input_box) = test_react( + block1, + context, + None, + None, + input_blocks1, + expected1, + ) + .await; + + test_react( + block2, + context, + Some(broker), + Some(input_box), + input_blocks2, + final_expected.clone(), + ) .await; + } + + { + // reconstruction + let block = 6944; + let context = new_context(0, 1000); + let input_blocks = vec![ + 56, // + // + 1078, // + 1091, // + 1159, // + 1204, // + 1227, // + 1280, // + 1298, // + 1442, // + 1637, // + // + 2827, // + 2881, // + 2883, // + 2887, // + 2891, // + 2934, // + // + 6160, // + 6864, // + 6944, // + ]; + test_react( + block, + context, + None, + None, + input_blocks, + final_expected, + ) + .await; + } } } diff --git a/offchain/dispatcher/src/drivers/mock.rs b/offchain/dispatcher/src/drivers/mock.rs index d56acd616..951eb743e 100644 --- a/offchain/dispatcher/src/drivers/mock.rs +++ b/offchain/dispatcher/src/drivers/mock.rs @@ -12,7 +12,10 @@ use snafu::whatever; use std::{ collections::VecDeque, ops::{Deref, DerefMut}, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicU16, Ordering}, + Arc, Mutex, + }, }; use types::foldables::{DAppInputBox, Input, InputBox}; @@ -24,21 +27,21 @@ use crate::machine::{ // auxiliary functions // ------------------------------------------------------------------------------------------------ -pub fn new_block(timestamp: u32) -> Block { +pub fn new_block(number: u64) -> Block { Block { hash: H256::random(), - number: 0.into(), + number: number.into(), parent_hash: H256::random(), - timestamp: timestamp.into(), + timestamp: 0.into(), logs_bloom: Bloom::default(), } } -pub fn new_input(timestamp: u32) -> Input { +pub fn new_input(block: u64) -> Input { Input { sender: Arc::new(H160::random()), payload: vec![], - block_added: Arc::new(new_block(timestamp)), + block_added: Arc::new(new_block(block)), dapp: Arc::new(H160::random()), tx_hash: Arc::new(H256::default()), } @@ -55,11 +58,11 @@ pub fn new_input_box() -> InputBox { pub fn update_input_box( input_box: InputBox, dapp_address: Address, - timestamps: Vec, + blocks: Vec, ) -> InputBox { - let inputs = timestamps + let inputs = blocks .iter() - .map(|timestamp| Arc::new(new_input(*timestamp))) + .map(|block| Arc::new(new_input(*block))) .collect::>(); let inputs = Vector::from(inputs); let dapp_input_boxes = input_box @@ -77,16 +80,18 @@ pub fn update_input_box( // ------------------------------------------------------------------------------------------------ #[derive(Debug, Clone, Copy, PartialEq)] -pub enum SendInteraction { - EnqueuedInput(u64), - FinishedEpoch(u64), +pub enum Event { + Input(u64), // input index + FinishEpoch(u64), // index of the "external" epoch } #[derive(Debug)] pub struct Broker { pub rollup_statuses: Mutex>, pub next_claims: Mutex>, - pub send_interactions: Mutex>, + pub events: Mutex>, + + finish_epochs: AtomicU16, status_error: bool, enqueue_input_error: bool, finish_epoch_error: bool, @@ -97,7 +102,8 @@ impl Broker { Self { rollup_statuses: Mutex::new(VecDeque::new()), next_claims: Mutex::new(VecDeque::new()), - send_interactions: Mutex::new(Vec::new()), + events: Mutex::new(Vec::new()), + finish_epochs: AtomicU16::new(0), status_error: false, enqueue_input_error: false, finish_epoch_error: false, @@ -126,28 +132,29 @@ impl Broker { broker } - fn send_interactions_len(&self) -> usize { - let mutex_guard = self.send_interactions.lock().unwrap(); + fn events_len(&self) -> usize { + let mutex_guard = self.events.lock().unwrap(); mutex_guard.deref().len() } - fn get_send_interaction(&self, i: usize) -> SendInteraction { - let mutex_guard = self.send_interactions.lock().unwrap(); + fn get_event(&self, i: usize) -> Event { + let mutex_guard = self.events.lock().unwrap(); mutex_guard.deref().get(i).unwrap().clone() } - pub fn assert_send_interactions(&self, expected: Vec) { + pub fn assert_state(&self, expected: Vec) { assert_eq!( - self.send_interactions_len(), + self.events_len(), expected.len(), - "{:?}", - self.send_interactions + "\n{:?}\n{:?}", + self.events.lock().unwrap().deref(), + expected ); - println!("Send interactions:"); + println!("Events:"); for (i, expected) in expected.iter().enumerate() { - let send_interaction = self.get_send_interaction(i); - println!("{:?} - {:?}", send_interaction, expected); - assert_eq!(send_interaction, *expected); + let event = self.get_event(i); + println!("index: {:?} => {:?} - {:?}", i, event, expected); + assert_eq!(event, *expected); } } } @@ -174,25 +181,23 @@ impl BrokerSend for Broker { if self.enqueue_input_error { whatever!("enqueue_input error") } else { - let mut mutex_guard = self.send_interactions.lock().unwrap(); - mutex_guard - .deref_mut() - .push(SendInteraction::EnqueuedInput(input_index)); + let mut mutex_guard = self.events.lock().unwrap(); + mutex_guard.deref_mut().push(Event::Input(input_index)); Ok(()) } } - async fn finish_epoch( - &self, - inputs_sent_count: u64, - ) -> Result<(), BrokerFacadeError> { + async fn finish_epoch(&self, _: u64) -> Result<(), BrokerFacadeError> { if self.finish_epoch_error { whatever!("finish_epoch error") } else { - let mut mutex_guard = self.send_interactions.lock().unwrap(); + let mut mutex_guard = self.events.lock().unwrap(); + let current_epoch = self.finish_epochs.load(Ordering::SeqCst); mutex_guard .deref_mut() - .push(SendInteraction::FinishedEpoch(inputs_sent_count)); + .push(Event::FinishEpoch(current_epoch.into())); + self.finish_epochs + .store(current_epoch + 1, Ordering::SeqCst); Ok(()) } } diff --git a/offchain/dispatcher/src/machine/mod.rs b/offchain/dispatcher/src/machine/mod.rs index 735fdff23..99a4f2623 100644 --- a/offchain/dispatcher/src/machine/mod.rs +++ b/offchain/dispatcher/src/machine/mod.rs @@ -12,7 +12,6 @@ use self::rollups_broker::BrokerFacadeError; #[derive(Debug, Clone, Copy, Default)] pub struct RollupStatus { pub inputs_sent_count: u64, - pub last_event_is_finish_epoch: bool, } #[async_trait] diff --git a/offchain/dispatcher/src/machine/rollups_broker.rs b/offchain/dispatcher/src/machine/rollups_broker.rs index e430e4fe7..a2f729766 100644 --- a/offchain/dispatcher/src/machine/rollups_broker.rs +++ b/offchain/dispatcher/src/machine/rollups_broker.rs @@ -69,7 +69,24 @@ impl BrokerFacade { broker: &mut sync::MutexGuard<'_, Broker>, ) -> Result { let event = self.peek(broker).await?; - Ok(event.into()) + + let old_epoch_index = event + .clone() + .map(|event| event.payload.epoch_index) + .unwrap_or(0); + + // The epoch gets incremented inside this ".into()"! + // Check From> for BrokerStreamStatus + let status: BrokerStreamStatus = event.into(); + + // Asserting that the epoch_index is continuous. + let new_epoch_index = status.epoch_number; + assert!( + new_epoch_index == old_epoch_index + || new_epoch_index == old_epoch_index + 1 + ); + + Ok(status) } #[tracing::instrument(level = "trace", skip_all)] @@ -188,18 +205,8 @@ impl BrokerSend for BrokerFacade { impl From for RollupStatus { fn from(payload: RollupsInput) -> Self { - let inputs_sent_count = payload.inputs_sent_count; - - match payload.data { - RollupsData::AdvanceStateInput { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: false, - }, - - RollupsData::FinishEpoch { .. } => RollupStatus { - inputs_sent_count, - last_event_is_finish_epoch: true, - }, + RollupStatus { + inputs_sent_count: payload.inputs_sent_count, } } } @@ -219,7 +226,7 @@ impl From> for BrokerStreamStatus { RollupsData::FinishEpoch { .. } => Self { id, - epoch_number: epoch_index + 1, + epoch_number: epoch_index + 1, // Epoch number being incremented! status: payload.into(), }, } @@ -330,7 +337,6 @@ mod broker_facade_tests { let (_fixture, broker) = setup(&docker).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 0); - assert!(!status.last_event_is_finish_epoch); } #[tokio::test] @@ -340,7 +346,6 @@ mod broker_facade_tests { produce_advance_state_inputs(&fixture, 1).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 1); - assert!(!status.last_event_is_finish_epoch); } #[tokio::test] @@ -350,28 +355,6 @@ mod broker_facade_tests { produce_advance_state_inputs(&fixture, 10).await; let status = broker.status().await.expect("'status' function failed"); assert_eq!(status.inputs_sent_count, 10); - assert!(!status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_is_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 0); - assert!(status.last_event_is_finish_epoch); - } - - #[tokio::test] - async fn status_inputs_with_finish_epoch() { - let docker = Cli::default(); - let (fixture, broker) = setup(&docker).await; - produce_advance_state_inputs(&fixture, 5).await; - produce_finish_epoch_input(&fixture).await; - let status = broker.status().await.expect("'status' function failed"); - assert_eq!(status.inputs_sent_count, 5); - assert!(status.last_event_is_finish_epoch); } // -------------------------------------------------------------------------------------------- diff --git a/offchain/dispatcher/src/setup.rs b/offchain/dispatcher/src/setup.rs index 97f632168..c95d02bc0 100644 --- a/offchain/dispatcher/src/setup.rs +++ b/offchain/dispatcher/src/setup.rs @@ -80,15 +80,14 @@ pub async fn create_context( dapp_metadata: DAppMetadata, metrics: DispatcherMetrics, ) -> Result { - let dapp_deployment_block_number = - U64::from(config.blockchain_config.dapp_deployment_block_number); - let genesis_timestamp: u64 = block_server - .query_block(dapp_deployment_block_number) + let input_box_deployment_block_number = + U64::from(config.blockchain_config.input_box_deployment_block_number); + let genesis_block = block_server + .query_block(input_box_deployment_block_number) .await .context(StateServerSnafu)? - .timestamp + .number .as_u64(); - let epoch_length = config.epoch_duration; let status = broker.status().await.context(BrokerSnafu)?; @@ -97,11 +96,10 @@ pub async fn create_context( ensure!(status.inputs_sent_count == 0, DirtyBrokerSnafu); let context = Context::new( - genesis_timestamp, - epoch_length, + genesis_block, + config.epoch_length, dapp_metadata, metrics, - status, ); Ok(context) diff --git a/offchain/types/src/blockchain_config.rs b/offchain/types/src/blockchain_config.rs index c38c02cd1..e6250a0a3 100644 --- a/offchain/types/src/blockchain_config.rs +++ b/offchain/types/src/blockchain_config.rs @@ -35,9 +35,9 @@ pub struct BlockchainCLIConfig { #[arg(long, env)] pub dapp_address: Option, - /// DApp deployment block number + /// Input box deployment block number #[arg(long, env)] - pub dapp_deployment_block_number: Option, + pub input_box_deployment_block_number: Option, /// History contract address #[arg(long, env)] @@ -63,7 +63,7 @@ pub struct BlockchainCLIConfig { #[derive(Clone, Debug)] pub struct BlockchainConfig { pub dapp_address: Address, - pub dapp_deployment_block_number: u64, + pub input_box_deployment_block_number: u64, pub history_address: Address, pub authority_address: Address, pub input_box_address: Address, @@ -96,7 +96,8 @@ impl TryFrom for BlockchainConfig { // try to get the values from the environment values let mut dapp_address = cli.dapp_address.map(deserialize::
).transpose()?; - let mut dapp_deployment_block_number = cli.dapp_deployment_block_number; + let input_box_deployment_block_number = + cli.input_box_deployment_block_number; let mut history_address = cli .history_address .map(deserialize::
) @@ -115,8 +116,6 @@ impl TryFrom for BlockchainConfig { cli.dapp_deployment_file.map(read::).transpose()? { dapp_address = dapp_address.or(file.address); - dapp_deployment_block_number = - dapp_deployment_block_number.or(file.block_number); } if let Some(file) = cli .rollups_deployment_file @@ -142,8 +141,8 @@ impl TryFrom for BlockchainConfig { Ok(BlockchainConfig { dapp_address: check_missing!(dapp_address), - dapp_deployment_block_number: check_missing!( - dapp_deployment_block_number + input_box_deployment_block_number: check_missing!( + input_box_deployment_block_number ), history_address: check_missing!(history_address), authority_address: check_missing!(authority_address), diff --git a/setup_env.sh b/setup_env.sh index 7b65dab06..e4b1cc720 100644 --- a/setup_env.sh +++ b/setup_env.sh @@ -3,7 +3,7 @@ export CARTESI_LOG_PRETTY="true" export CARTESI_FEATURE_HOST_MODE="false" export CARTESI_FEATURE_DISABLE_CLAIMER="false" export CARTESI_FEATURE_DISABLE_MACHINE_HASH_CHECK="false" -export CARTESI_EPOCH_DURATION="120" +export CARTESI_EPOCH_LENGTH="10" export CARTESI_BLOCKCHAIN_ID="31337" export CARTESI_BLOCKCHAIN_HTTP_ENDPOINT="http://localhost:8545" export CARTESI_BLOCKCHAIN_WS_ENDPOINT="ws://localhost:8545" @@ -11,7 +11,6 @@ export CARTESI_BLOCKCHAIN_IS_LEGACY="false" export CARTESI_BLOCKCHAIN_FINALITY_OFFSET="1" export CARTESI_BLOCKCHAIN_BLOCK_TIMEOUT="60" export CARTESI_CONTRACTS_APPLICATION_ADDRESS="0x7C54E3f7A8070a54223469965A871fB8f6f88c22" -export CARTESI_CONTRACTS_APPLICATION_DEPLOYMENT_BLOCK_NUMBER="20" export CARTESI_CONTRACTS_HISTORY_ADDRESS="0x325272217ae6815b494bF38cED004c5Eb8a7CdA7" export CARTESI_CONTRACTS_AUTHORITY_ADDRESS="0x58c93F83fb3304730C95aad2E360cdb88b782010" export CARTESI_CONTRACTS_INPUT_BOX_ADDRESS="0x59b22D57D4f067708AB0c00552767405926dc768" diff --git a/test/config.go b/test/config.go index b491fd8cf..9c5f742ab 100644 --- a/test/config.go +++ b/test/config.go @@ -5,9 +5,7 @@ package endtoendtests import ( - "fmt" "log/slog" - "time" "github.com/cartesi/rollups-node/internal/node/config" "github.com/cartesi/rollups-node/pkg/addresses" @@ -22,7 +20,7 @@ const ( LocalHttpPort = 10000 LocalBlockTimeout = 120 LocalFinalityOffset = 1 - LocalEpochDurationInSeconds = 240 + LocalEpochLength = 5 ) func NewLocalNodeConfig(localPostgresEnpoint string, localBlockchainHttpEndpoint string, @@ -41,8 +39,7 @@ func NewLocalNodeConfig(localPostgresEnpoint string, localBlockchainHttpEndpoint config.Redacted[string]{Value: localPostgresEnpoint} //Epoch - nodeConfig.RollupsEpochDuration, _ = - time.ParseDuration(fmt.Sprintf("%ds", LocalEpochDurationInSeconds)) + nodeConfig.RollupsEpochLength = LocalEpochLength //Blochain nodeConfig.BlockchainID = LocalBlockchainID @@ -58,7 +55,6 @@ func NewLocalNodeConfig(localPostgresEnpoint string, localBlockchainHttpEndpoint nodeConfig.ContractsHistoryAddress = book.HistoryAddress.Hex() nodeConfig.ContractsAuthorityAddress = book.AuthorityAddress.Hex() nodeConfig.ContractsApplicationAddress = book.CartesiDApp.Hex() - nodeConfig.ContractsApplicationDeploymentBlockNumber = LocalApplicationDeploymentBlockNumber nodeConfig.ContractsInputBoxAddress = book.InputBox.Hex() nodeConfig.ContractsInputBoxDeploymentBlockNumber = LocalInputBoxDeploymentBlockNumber diff --git a/test/echo_test.go b/test/echo_test.go index ab645d67c..dfaea7cd9 100644 --- a/test/echo_test.go +++ b/test/echo_test.go @@ -153,8 +153,6 @@ func (s *EchoInputTestSuite) TestSendInput() { // Check input was correctly added to the blockchain s.Require().Equal(0, inputIndex) - s.Require().Nil(ethutil.AdvanceDevnetTime(s.ctx, s.blockchainHttpEndpoint, devNetAdvanceTimeInSeconds)) - // Get Input with vouchers and proofs graphQlClient := graphql.NewClient(graphqlEndpoint, nil) var input *readerclient.Input