diff --git a/chain/service/src/chain_service.rs b/chain/service/src/chain_service.rs index 957e2c3646..3b67f25f8f 100644 --- a/chain/service/src/chain_service.rs +++ b/chain/service/src/chain_service.rs @@ -461,9 +461,9 @@ impl ReadableChainService for ChainReaderServiceInner { let head = self.main.current_header(); if self.main.check_dag_type(&head)? != DagHeaderType::Normal { bail!( - "The chain is still not a dag and its dag fork number is {} and the current is {:?}.", + "The chain is still not a dag and its dag fork number is {:?} and the current block's header number is {:?}.", + self.main.dag_fork_height()?, head.number(), - self.main.dag_fork_height()? ); } let (dag_genesis, state) = self.main.get_dag_state_by_block(&head)?; diff --git a/flexidag/tests/tests.rs b/flexidag/tests/tests.rs index 255f6c70e9..6bcf4c0a4e 100644 --- a/flexidag/tests/tests.rs +++ b/flexidag/tests/tests.rs @@ -348,42 +348,6 @@ fn test_dag_tips_store() { ); } -// #[test] -// fn test_dag_multiple_commits() { -// // initialzie the dag firstly -// let dag = BlockDAG::create_for_testing().unwrap(); - -// let genesis = BlockHeader::dag_genesis_random() -// .as_builder() -// .with_difficulty(0.into()) -// .build(); -// dag.init_with_genesis(genesis.clone()).unwrap(); - -// // normally add the dag blocks -// let mut headers = vec![]; -// let mut parents_hash = vec![genesis.id()]; -// let mut parent_hash = genesis.id(); -// for _ in 0..100 { -// let header_builder = BlockHeaderBuilder::random(); -// let header = header_builder -// .with_parent_hash(parent_hash) -// .with_parents_hash(Some(parents_hash.clone())) -// .build(); -// parents_hash = vec![header.id()]; -// parent_hash = header.id(); -// headers.push(header.clone()); -// dag.commit(header.to_owned()).unwrap(); -// let ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap(); -// } - -// for _ in 0..10 { -// for header in &headers { -// let _ = dag.commit(header.clone()); -// let _ = dag.ghostdata_by_hash(header.id()).unwrap().unwrap(); -// } -// } -// } - #[test] fn test_dag_multiple_commits() -> anyhow::Result<()> { set_test_flexidag_fork_height(1); @@ -746,33 +710,11 @@ fn test_reachability_algorighm() -> anyhow::Result<()> { hashes.push(child8); print_reachability_data(reachability_store.read().deref(), &hashes); - // for _i in 7..=31 { - // let s = Hash::random(); - // inquirer::add_block( - // &mut reachability_store, - // s, - // child1, - // &mut vec![child1].into_iter(), - // )?; - // hashes.push(s); - // print_reachability_data(&reachability_store, &hashes); - // } - assert!( dag.check_ancestor_of(origin, vec![child5])?, "child 5 must be origin's child" ); - // let mut count = 6; - // loop { - // let child = Hash::random(); - // inquirer::add_block(&mut reachability_store, child, origin, &mut vec![origin].into_iter())?; - // hashes.push(child); - // print!("{count:?}"); - // print_reachability_data(&reachability_store, &hashes); - // count += 1; - // } - Ok(()) } diff --git a/network-p2p/src/protocol/generic_proto/tests.rs b/network-p2p/src/protocol/generic_proto/tests.rs index 98d8da7460..9b431940a2 100644 --- a/network-p2p/src/protocol/generic_proto/tests.rs +++ b/network-p2p/src/protocol/generic_proto/tests.rs @@ -20,6 +20,7 @@ use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; +use anyhow::{bail, format_err, Ok}; use futures::prelude::*; use libp2p::core::{connection::ConnectionId, transport::MemoryTransport, upgrade}; use libp2p::swarm::behaviour::FromSwarm; @@ -179,7 +180,7 @@ impl NetworkBehaviour for CustomProtoWithAddr { } #[test] -fn reconnect_after_disconnect() { +fn reconnect_after_disconnect() -> anyhow::Result<()> { // We connect two nodes together, then force a disconnect (through the API of the `Service`), // check that the disconnect worked, and finally check whether they successfully reconnect. @@ -223,7 +224,7 @@ fn reconnect_after_disconnect() { } } ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain, - ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), + ServiceState::FirstConnec | ServiceState::ConnectedAgain => bail!("unexpected"), }, future::Either::Left(SwarmEvent::Behaviour( GenericProtoOut::CustomProtocolClosed { .. }, @@ -231,7 +232,7 @@ fn reconnect_after_disconnect() { ServiceState::FirstConnec => service1_state = ServiceState::Disconnected, ServiceState::ConnectedAgain | ServiceState::NotConnected - | ServiceState::Disconnected => panic!(), + | ServiceState::Disconnected => bail!("unexpected"), }, future::Either::Right(SwarmEvent::Behaviour( GenericProtoOut::CustomProtocolOpen { .. }, @@ -246,7 +247,7 @@ fn reconnect_after_disconnect() { } } ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain, - ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(), + ServiceState::FirstConnec | ServiceState::ConnectedAgain => bail!("unexpected"), }, future::Either::Right(SwarmEvent::Behaviour( GenericProtoOut::CustomProtocolClosed { .. }, @@ -254,7 +255,7 @@ fn reconnect_after_disconnect() { ServiceState::FirstConnec => service2_state = ServiceState::Disconnected, ServiceState::ConnectedAgain | ServiceState::NotConnected - | ServiceState::Disconnected => panic!(), + | ServiceState::Disconnected => bail!("unexpected"), }, _ => {} } @@ -268,7 +269,7 @@ fn reconnect_after_disconnect() { // Now that the two services have disconnected and reconnected, wait for 3 seconds and // check whether they're still connected. - let mut delay = futures_timer::Delay::new(Duration::from_secs(3)); + let mut delay = futures_timer::Delay::new(Duration::from_secs(20)); loop { // Grab next event from services. @@ -285,9 +286,16 @@ fn reconnect_after_disconnect() { match event { SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolOpen { .. }) - | SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolClosed { .. }) => panic!(), + | SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolClosed { .. }) => { + bail!("unexpected event: {:?}", event) + } _ => {} } } - }); + + anyhow::Result::Ok(()) + }) + .map_err(|e| format_err!("{:?}", e))?; + + Ok(()) } diff --git a/network/tests/network_service_test.rs b/network/tests/network_service_test.rs index 3c2fc405fa..45a553dd13 100644 --- a/network/tests/network_service_test.rs +++ b/network/tests/network_service_test.rs @@ -1,7 +1,7 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 -use anyhow::anyhow; +use anyhow::{anyhow, bail, format_err, Ok}; use futures::stream::StreamExt; use futures_timer::Delay; use network_api::messages::{ @@ -140,7 +140,7 @@ async fn test_connected_nodes() { } #[stest::test] -async fn test_event_notify_receive() { +async fn test_event_notify_receive() -> anyhow::Result<()> { let (network1, network2) = test_helper::build_network_pair().await.unwrap(); // transaction let msg_send = PeerMessage::new_transactions( @@ -149,7 +149,10 @@ async fn test_event_notify_receive() { ); let mut receiver = network2.message_handler.channel(); network1.service_ref.send_peer_message(msg_send.clone()); - let msg_receive = receiver.next().await.unwrap(); + let msg_receive = receiver + .next() + .await + .ok_or_else(|| format_err!("in network1, receive message timeout or return none"))?; assert_eq!(msg_send.notification, msg_receive.notification); //block @@ -162,12 +165,17 @@ async fn test_event_notify_receive() { ); let mut receiver = network2.message_handler.channel(); network1.service_ref.send_peer_message(msg_send.clone()); - let msg_receive = receiver.next().await.unwrap(); + let msg_receive = receiver + .next() + .await + .ok_or_else(|| format_err!("in network2, receive message timeout or return none"))?; assert_eq!(msg_send.notification, msg_receive.notification); + + Ok(()) } #[stest::test] -async fn test_event_notify_receive_repeat_block() { +async fn test_event_notify_receive_repeat_block() -> anyhow::Result<()> { let (network1, network2) = test_helper::build_network_pair().await.unwrap(); let block = Block::new(BlockHeader::random(), BlockBody::new_empty()); @@ -189,12 +197,16 @@ async fn test_event_notify_receive_repeat_block() { assert_eq!(msg_send1.notification, msg_receive1.notification); //repeat message is filter, so expect timeout error. - let msg_receive2 = async_std::future::timeout(Duration::from_secs(2), receiver.next()).await; - assert!(msg_receive2.is_err()); + let result = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await; + if result.is_err() { + Ok(()) + } else { + bail!("expect timeout error, but receive message.") + } } #[stest::test] -async fn test_event_notify_receive_repeat_transaction() { +async fn test_event_notify_receive_repeat_transaction() -> anyhow::Result<()> { let (network1, network2) = test_helper::build_network_pair().await.unwrap(); let txn1 = SignedUserTransaction::mock(); @@ -236,8 +248,12 @@ async fn test_event_notify_receive_repeat_transaction() { ); //msg3 is empty after filter, so expect timeout error. - let msg_receive3 = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await; - assert!(msg_receive3.is_err()); + let result = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await; + if result.is_err() { + Ok(()) + } else { + bail!("expect timeout error, but receive message.") + } } fn mock_block_info(total_difficulty: U256) -> BlockInfo { @@ -250,7 +266,7 @@ fn mock_block_info(total_difficulty: U256) -> BlockInfo { } #[stest::test] -async fn test_event_broadcast() { +async fn test_event_broadcast() -> anyhow::Result<()> { let mut nodes = test_helper::build_network_cluster(3).await.unwrap(); let node3 = nodes.pop().unwrap(); let node2 = nodes.pop().unwrap(); @@ -268,22 +284,34 @@ async fn test_event_broadcast() { ))); node1.service_ref.broadcast(notification.clone()); - let msg_receive2 = receiver2.next().await.unwrap(); + let msg_receive2 = receiver2 + .next() + .await + .ok_or_else(|| format_err!("in receive2, receive message timeout or return none"))?; assert_eq!(notification, msg_receive2.notification); - let msg_receive3 = receiver3.next().await.unwrap(); + let msg_receive3 = receiver3 + .next() + .await + .ok_or_else(|| format_err!("in receive3, receive message timeout or return none"))?; assert_eq!(notification, msg_receive3.notification); //repeat broadcast node2.service_ref.broadcast(notification.clone()); let msg_receive1 = async_std::future::timeout(Duration::from_secs(1), receiver1.next()).await; - assert!(msg_receive1.is_err()); + if msg_receive1.is_ok() { + bail!("expect timeout error, but receive message.") + } let msg_receive3 = async_std::future::timeout(Duration::from_secs(1), receiver3.next()).await; - assert!(msg_receive3.is_err()); + if msg_receive3.is_ok() { + bail!("expect timeout error, but receive message.") + } print!("{:?}", node1.config.metrics.registry().unwrap().gather()); + + Ok(()) } #[stest::test] diff --git a/storage/src/block/mod.rs b/storage/src/block/mod.rs index 43a8764b53..84d9deb00b 100644 --- a/storage/src/block/mod.rs +++ b/storage/src/block/mod.rs @@ -408,6 +408,10 @@ impl BlockStorage { self.dag_sync_block_storage.remove(block_id) } + pub fn delete_all_dag_sync_blocks(&self) -> Result<()> { + self.dag_sync_block_storage.remove_all() + } + pub fn get_dag_sync_block(&self, block_id: HashValue) -> Result> { self.dag_sync_block_storage.get(block_id) } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index bead417f0c..08c361f07f 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -292,6 +292,7 @@ pub trait BlockStore { fn save_dag_sync_block(&self, block: DagSyncBlock) -> Result<()>; fn delete_dag_sync_block(&self, block_id: HashValue) -> Result<()>; + fn delete_all_dag_sync_blocks(&self) -> Result<()>; fn get_dag_sync_block(&self, block_id: HashValue) -> Result>; } @@ -548,6 +549,10 @@ impl BlockStore for Storage { self.block_storage.delete_dag_sync_block(block_id) } + fn delete_all_dag_sync_blocks(&self) -> Result<()> { + self.block_storage.delete_all_dag_sync_blocks() + } + fn get_dag_sync_block(&self, block_id: HashValue) -> Result> { self.block_storage.get_dag_sync_block(block_id) } diff --git a/storage/src/storage.rs b/storage/src/storage.rs index ef38706854..fbd238e186 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -12,7 +12,7 @@ use byteorder::{BigEndian, ReadBytesExt}; use rocksdb::{DBPinnableSlice, WriteBatch as DBWriteBatch}; use starcoin_config::NodeConfig; use starcoin_crypto::HashValue; -use starcoin_logger::prelude::info; +use starcoin_logger::prelude::{debug, info}; use starcoin_vm_types::state_store::table::TableHandle; use std::{convert::TryInto, fmt::Debug, marker::PhantomData, sync::Arc}; @@ -517,6 +517,8 @@ where fn get_raw(&self, key: K) -> Result>>; fn iter(&self) -> Result>; + + fn remove_all(&self) -> Result<()>; } impl KeyCodec for u64 { @@ -660,4 +662,24 @@ where .ok_or_else(|| format_err!("Only support scan on db storage instance"))?; db.iter::(self.get_store().prefix_name) } + + fn remove_all(&self) -> Result<()> { + if let Some(db) = self.get_store().storage().db() { + let mut iter = db.iter::(self.get_store().prefix_name)?; + iter.seek_to_first(); + for result_item in iter { + match result_item { + Ok(item) => { + let (key, _) = item; + self.remove(key)?; + } + Err(e) => { + debug!("finish to remove all keys in db with an error: {:?}", e); + } + } + } + } + + Ok(()) + } } diff --git a/storage/src/tests/test_block.rs b/storage/src/tests/test_block.rs index 0024af03de..d481a57509 100644 --- a/storage/src/tests/test_block.rs +++ b/storage/src/tests/test_block.rs @@ -3,15 +3,16 @@ extern crate chrono; +use anyhow::Ok; use bcs_ext::BCSCodec; use chrono::prelude::*; use starcoin_crypto::HashValue; -use crate::block::{FailedBlock, OldFailedBlock}; +use crate::block::{DagSyncBlock, FailedBlock, OldFailedBlock}; use crate::cache_storage::CacheStorage; use crate::db_storage::DBStorage; use crate::storage::StorageInstance; -use crate::Storage; +use crate::{BlockStore, Storage}; use starcoin_config::RocksdbConfig; use starcoin_types::account_address::AccountAddress; use starcoin_types::block::{Block, BlockBody, BlockHeader, BlockHeaderExtra}; @@ -232,3 +233,84 @@ fn test_save_failed_block() { assert_eq!(result.0, block); assert_eq!(result.3, "1".to_string()); } + +fn new_dag_sync_block(children: Vec) -> anyhow::Result { + let dt = Local::now(); + let block_header = BlockHeader::new( + HashValue::random(), + dt.timestamp_nanos() as u64, + 3, + AccountAddress::random(), + HashValue::zero(), + HashValue::random(), + HashValue::zero(), + 0, + U256::zero(), + HashValue::random(), + ChainId::test(), + 0, + BlockHeaderExtra::new([0u8; 4]), + None, + ); + + let block_body = BlockBody::new(vec![SignedUserTransaction::mock()], None); + + let block = Block::new(block_header, block_body); + + anyhow::Ok(DagSyncBlock { block, children }) +} + +fn delete_disc_storage() -> anyhow::Result<()> { + let tmpdir = starcoin_config::temp_dir(); + let storage = Storage::new(StorageInstance::new_db_instance( + DBStorage::new(tmpdir.path(), RocksdbConfig::default(), None).unwrap(), + ))?; + + let mut last_block_id = vec![]; + for _i in 0..10 { + let block = new_dag_sync_block(last_block_id)?; + storage.save_dag_sync_block(block.clone())?; + last_block_id = vec![block.block.id()]; + } + + storage.delete_all_dag_sync_blocks() +} + +fn delete_cache_storage() -> anyhow::Result<()> { + let storage = Storage::new(StorageInstance::new_cache_instance())?; + + let mut last_block_id = vec![]; + for _i in 0..10 { + let block = new_dag_sync_block(last_block_id)?; + storage.save_dag_sync_block(block.clone())?; + last_block_id = vec![block.block.id()]; + } + + storage.delete_all_dag_sync_blocks() +} + +fn delete_disc_and_cache_storage() -> anyhow::Result<()> { + let tmpdir = starcoin_config::temp_dir(); + let storage = Storage::new(StorageInstance::new_cache_and_db_instance( + CacheStorage::new(None), + DBStorage::new(tmpdir.path(), RocksdbConfig::default(), None).unwrap(), + ))?; + + let mut last_block_id = vec![]; + for _i in 0..10 { + let block = new_dag_sync_block(last_block_id)?; + storage.save_dag_sync_block(block.clone())?; + last_block_id = vec![block.block.id()]; + } + + storage.delete_all_dag_sync_blocks() +} + +#[test] +fn test_delete_sync_blocks() -> anyhow::Result<()> { + delete_disc_storage()?; + delete_cache_storage()?; + delete_disc_and_cache_storage()?; + + Ok(()) +} diff --git a/sync/src/block_connector/test_write_dag_block_chain.rs b/sync/src/block_connector/test_write_dag_block_chain.rs index dc7cd4d9ba..4dd56cfc55 100644 --- a/sync/src/block_connector/test_write_dag_block_chain.rs +++ b/sync/src/block_connector/test_write_dag_block_chain.rs @@ -36,19 +36,6 @@ pub fn gen_dag_blocks( } else { bail!("times must > 0") } - - // match result { - // super::write_block_chain::ConnectOk::Duplicate(block) - // | super::write_block_chain::ConnectOk::ExeConnectMain(block) - // | super::write_block_chain::ConnectOk::ExeConnectBranch(block) - // | super::write_block_chain::ConnectOk::Connect(block) => Some(block.header().id()), - // super::write_block_chain::ConnectOk::DagConnected - // | super::write_block_chain::ConnectOk::MainDuplicate - // | super::write_block_chain::ConnectOk::DagPending - // | super::write_block_chain::ConnectOk::DagConnectMissingBlock => { - // unreachable!("should not reach here, result: {:?}", result); - // } - // } } pub fn new_dag_block( @@ -61,17 +48,6 @@ pub fn new_dag_block( None => AccountInfo::random(), }; let miner_address = *miner.address(); - // let parent_id = writeable_block_chain_service.get_main().current_header().id(); - // let mut block_chain = BlockChain::new( - // net.time_service().clone(), - // parent_id, - // writeable_block_chain_service.get_main().get_storage(), - // None, - // writeable_block_chain_service.get_dag().clone(), - // ) - // .unwrap(); - // let current_header = block_chain.current_header - let dag_fork_height = writeable_block_chain_service .get_main() .dag_fork_height()? diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index fed76209b8..d55c8ae5ca 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -265,7 +265,6 @@ where let executed_block = self.main.execute(verified_block)?; let enacted_blocks = vec![executed_block.block().clone()]; self.do_new_head(executed_block, 1, enacted_blocks, 0, vec![])?; - // bail!("failed to apply for tesing the connection later!"); Ok(()) } @@ -296,7 +295,6 @@ where self.update_startup_info(self.main.head_block().header())?; ctx.broadcast(NewHeadBlock { executed_block: Arc::new(self.main.head_block()), - // tips: self.main.status().tips_hash.clone(), }); Ok(()) } else { @@ -520,7 +518,6 @@ where if let Err(e) = self.bus.broadcast(NewHeadBlock { executed_block: Arc::new(block), - // tips: self.main.status().tips_hash.clone(), }) { error!("Broadcast NewHeadBlock error: {:?}", e); } diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index 5cbea1fde1..fe47639692 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -378,10 +378,6 @@ where } absent_blocks.push(parent) } - // if ancestors.contains(&parent) { - // continue; - // } - // ancestors.push(parent); } Ok(()) } @@ -389,7 +385,6 @@ where fn find_absent_parent_dag_blocks_for_blocks( &self, block_headers: Vec, - // ancestors: &mut Vec, absent_blocks: &mut Vec, ) -> Result<()> { for block_header in block_headers { @@ -402,16 +397,10 @@ where &self, mut block_headers: Vec, ) -> Result> { - // let mut ancestors = vec![]; - // let mut absent_block_headers = vec![]; let mut absent_blocks_map = HashMap::new(); loop { let mut absent_blocks = vec![]; - self.find_absent_parent_dag_blocks_for_blocks( - block_headers, - // &mut ancestors, - &mut absent_blocks, - )?; + self.find_absent_parent_dag_blocks_for_blocks(block_headers, &mut absent_blocks)?; if absent_blocks.is_empty() { return Ok(absent_blocks_map.into_values().collect()); } @@ -424,7 +413,6 @@ where remote_absent_blocks.into_iter().for_each(|(_id, block)| { absent_blocks_map.insert(block.id(), block); }); - // absent_block_headers.append(&mut remote_absent_block_headers.into_iter().map(|(_, header)| header.expect("block header should not be none!")).collect()); } } @@ -513,12 +501,10 @@ where process_dag_ancestors .insert(block.header().id(), block.header().clone()); - // execute the children if their parents are ready - // if all children are executed, delete it. self.execute_if_parent_ready(executed_block.block.id())?; - // todo: how to delete the block? - // self.local_store.delete_dag_sync_block(executed_block.block.id())?; + self.local_store + .delete_dag_sync_block(executed_block.block.id())?; self.notify_connected_block( executed_block.block, @@ -542,26 +528,6 @@ where } } - // dag_ancestors = std::mem::take(&mut process_dag_ancestors); - // // process_dag_ancestors = vec![]; - - // dag_ancestors = Self::remove_repeated( - // &self.fetch_dag_block_absent_children(dag_ancestors).await?, - // ); - // source_path.extend(&dag_ancestors); - - // if !dag_ancestors.is_empty() { - // for (id, op_header) in self.fetch_block_headers(dag_ancestors.clone()).await? { - // if let Some(header) = op_header { - // self.ensure_dag_parent_blocks_exist(header, source_path)?; - // } else { - // bail!("when finding the ancestor's children's parents, fetching block header failed, block id: {:?}", id); - // } - // } - // } - - // info!("next dag children blocks: {:?}", dag_ancestors); - Ok(()) }; async_std::task::block_on(fut) @@ -591,9 +557,6 @@ where })?; result.push((block.id(), block)); } - // result.extend(self.fetcher.fetch_blocks(chunk.to_vec()).await?.into_iter().map(|(block, _)| { - // (block.id(), block) - // })); } Ok(result) } @@ -653,6 +616,7 @@ where self.check_enough_by_info(executed_block.block_info)?, )?; self.execute_if_parent_ready(*child)?; + self.local_store.delete_dag_sync_block(*child)?; } } parent_block @@ -682,85 +646,12 @@ where })?; parent_block.children.push(block_header.id()); self.local_store.save_dag_sync_block(parent_block)?; - // return Ok(false); result = Ok(false); } } - // Ok(true) result } - // fn remove_repeated(repeated: &[HashValue]) -> Vec { - // let mut uniqued = vec![]; - // let mut remove_repeated = HashSet::new(); - // for d in repeated { - // if remove_repeated.insert(*d) { - // uniqued.push(*d); - // } - // } - // uniqued - // } - - // async fn fetch_dag_block_absent_children( - // &self, - // mut dag_ancestors: Vec, - // ) -> Result> { - // let mut absent_children = Vec::new(); - // while !dag_ancestors.is_empty() { - // let children = self - // .fetch_dag_block_children(std::mem::take(&mut dag_ancestors)) - // .await?; - // for child in children { - // if self.chain.has_dag_block(child)? { - // if !dag_ancestors.contains(&child) { - // dag_ancestors.push(child); - // } - // } else if !absent_children.contains(&child) { - // absent_children.push(child); - // } - // } - // } - // Ok(absent_children) - // } - - // async fn fetch_dag_block_children( - // &self, - // dag_ancestors: Vec, - // ) -> Result> { - // let mut result = vec![]; - // for chunk in dag_ancestors.chunks(usize::try_from(MAX_BLOCK_REQUEST_SIZE)?) { - // result.extend(self.fetch_dag_block_children_inner(chunk.to_vec()).await?); - // } - // Ok(result) - // } - - // async fn fetch_dag_block_children_inner( - // &self, - // dag_ancestors: Vec, - // ) -> Result> { - // let mut count: i32 = 20; - // while count > 0 { - // info!("fetch block chidlren retry count = {}", count); - // match self - // .fetcher - // .fetch_dag_block_children(dag_ancestors.clone()) - // .await - // { - // Ok(result) => { - // return Ok(result); - // } - // Err(e) => { - // count = count.saturating_sub(1); - // if count == 0 { - // bail!("failed to fetch dag block children due to: {:?}", e); - // } - // async_std::task::sleep(Duration::from_secs(1)).await; - // } - // } - // } - // bail!("failed to fetch dag block children"); - // } - pub fn check_enough_by_info(&self, block_info: BlockInfo) -> Result { if block_info.block_accumulator_info.num_leaves == self.target.block_info.block_accumulator_info.num_leaves @@ -871,6 +762,7 @@ where } fn finish(self) -> Result { + self.local_store.delete_all_dag_sync_blocks()?; Ok(self.chain) } } diff --git a/sync/src/tasks/test_tools.rs b/sync/src/tasks/test_tools.rs index b30ddee711..b9e2b07aa0 100644 --- a/sync/src/tasks/test_tools.rs +++ b/sync/src/tasks/test_tools.rs @@ -13,6 +13,7 @@ use starcoin_account_api::AccountInfo; use starcoin_chain_api::ChainReader; use starcoin_chain_service::ChainReaderService; use starcoin_config::{BuiltinNetworkID, ChainNetwork, NodeConfig, RocksdbConfig}; +use starcoin_dag::blockdag::DEFAULT_GHOSTDAG_K; use starcoin_dag::consensusdb::prelude::FlexiDagStorageConfig; use starcoin_genesis::Genesis; use starcoin_logger::prelude::*; @@ -20,8 +21,6 @@ use starcoin_service_registry::{RegistryAsyncService, RegistryService, ServiceRe use starcoin_storage::db_storage::DBStorage; use starcoin_storage::storage::StorageInstance; use starcoin_storage::Storage; -// use starcoin_txpool_mock_service::MockTxPoolService; -use starcoin_dag::blockdag::DEFAULT_GHOSTDAG_K; #[cfg(test)] use starcoin_txpool_mock_service::MockTxPoolService; use std::fs; @@ -42,9 +41,6 @@ impl SyncTestSystem { pub async fn initialize_sync_system() -> Result { let config = Arc::new(NodeConfig::random_for_test()); - // let (storage, chain_info, _, _) = StarcoinGenesis::init_storage_for_test(config.net()) - // .expect("init storage by genesis fail."); - let temp_path = PathBuf::from(starcoin_config::temp_dir().as_ref()); let storage_path = temp_path.join(Path::new("local/storage")); let dag_path = temp_path.join(Path::new("local/dag")); @@ -212,12 +208,3 @@ pub async fn full_sync_new_node() -> Result<()> { Ok(()) } - -// #[cfg(test)] -// pub async fn generate_red_dag_block() -> Result { -// let net = ChainNetwork::new_builtin(BuiltinNetworkID::Test); -// let mut node = SyncNodeMocker::new(net, 300, 0)?; -// node.produce_block(10)?; -// let block = node.produce_block(1)?; -// Ok(block) -// } diff --git a/sync/src/tasks/tests.rs b/sync/src/tasks/tests.rs index d6a24ed9cc..748b8a32a3 100644 --- a/sync/src/tasks/tests.rs +++ b/sync/src/tasks/tests.rs @@ -291,15 +291,14 @@ pub async fn test_full_sync_fork_from_genesis() -> Result<()> { #[stest::test(timeout = 120)] pub async fn test_full_sync_continue() -> Result<()> { - // let net1 = ChainNetwork::new_builtin(BuiltinNetworkID::Test); let test_system = SyncTestSystem::initialize_sync_system().await?; - let mut node1 = test_system.target_node; // SyncNodeMocker::new(net1, 10, 50)?; + let mut node1 = test_system.target_node; let dag = node1.chain().dag(); node1.produce_block(10)?; let arc_node1 = Arc::new(node1); let net2 = ChainNetwork::new_builtin(BuiltinNetworkID::Test); //fork from genesis - let mut node2 = test_system.local_node; // SyncNodeMocker::new(net2.clone(), 1, 50)?; + let mut node2 = test_system.local_node; node2.produce_block(7)?; // first set target to 5. @@ -1090,17 +1089,8 @@ async fn test_sync_block_in_async_connection() -> Result<()> { let test_system = SyncTestSystem::initialize_sync_system().await?; let mut target_node = Arc::new(test_system.target_node); - // let (storage, chain_info, _, _) = - // Genesis::init_storage_for_test(&net).expect("init storage by genesis fail."); - let local_node = Arc::new(test_system.local_node); - // let dag_storage = starcoin_dag::consensusdb::prelude::FlexiDagStorage::create_from_path( - // Path::new("."), - // FlexiDagStorageConfig::new(), - // )?; - // let dag = starcoin_dag::blockdag::BlockDAG::new(8, dag_storage); - target_node = sync_block_in_async_connection( target_node, local_node.clone(), @@ -1118,113 +1108,3 @@ async fn test_sync_block_in_async_connection() -> Result<()> { Ok(()) } - -// #[cfg(test)] -// async fn sync_dag_chain( -// mut target_node: Arc, -// local_node: Arc, -// registry: &ServiceRef, -// ) -> Result<()> { -// Arc::get_mut(&mut target_node) -// .unwrap() -// .produce_block_and_create_dag(21)?; -// Ok(()) - -// let flexidag_service = registry.service_ref::().await?; -// let local_dag_accumulator_info = flexidag_service.send(GetDagAccumulatorInfo).await??.ok_or(anyhow!("dag accumulator is none"))?; - -// let result = sync_dag_full_task( -// local_dag_accumulator_info, -// target_accumulator_info, -// target_node.clone(), -// accumulator_store, -// accumulator_snapshot, -// local_store, -// local_net.time_service(), -// None, -// connector_service, -// network, -// false, -// dag, -// block_chain_service, -// flexidag_service, -// local_net.id().clone(), -// )?; - -// Ok(result) -// } - -// #[cfg(test)] -// async fn sync_dag_block_from_single_chain( -// mut target_node: Arc, -// local_node: Arc, -// registry: &ServiceRef, -// block_count: u64, -// ) -> Result> { -// use starcoin_consensus::BlockDAG; - -// Arc::get_mut(&mut target_node) -// .unwrap() -// .produce_block(block_count)?; -// loop { -// let target = target_node.sync_target(); - -// let storage = local_node.chain().get_storage(); -// let startup_info = storage -// .get_startup_info()? -// .ok_or_else(|| format_err!("Startup info should exist."))?; -// let current_block_id = startup_info.main; - -// let local_net = local_node.chain_mocker.net(); -// let (local_ancestor_sender, _local_ancestor_receiver) = unbounded(); - -// let block_chain_service = async_std::task::block_on( -// registry.service_ref::>(), -// )?; - -// let (sync_task, _task_handle, task_event_counter) = if local_node.chain().head_block().block.header().number() -// > BlockDAG::dag_fork_height_with_net(local_net.id().clone()) { - -// } else { -// full_sync_task( -// current_block_id, -// target.clone(), -// false, -// local_net.time_service(), -// storage.clone(), -// block_chain_service, -// target_node.clone(), -// local_ancestor_sender, -// DummyNetworkService::default(), -// 15, -// ChainNetworkID::TEST, -// None, -// None, -// )? -// }; - -// let branch = sync_task.await?; -// info!("checking branch in sync service is the same as target's branch"); -// assert_eq!(branch.current_header().id(), target.target_id.id()); - -// let block_connector_service = registry -// .service_ref::>() -// .await? -// .clone(); -// let result = block_connector_service -// .send(CheckBlockConnectorHashValue { -// head_hash: target.target_id.id(), -// number: target.target_id.number(), -// }) -// .await?; -// if result.is_ok() { -// break; -// } -// let reports = task_event_counter.get_reports(); -// reports -// .iter() -// .for_each(|report| debug!("reports: {}", report)); -// } - -// Ok(target_node) -// } diff --git a/sync/src/tasks/tests_dag.rs b/sync/src/tasks/tests_dag.rs index ab13fc69f2..fe70ffc5b4 100644 --- a/sync/src/tasks/tests_dag.rs +++ b/sync/src/tasks/tests_dag.rs @@ -17,12 +17,11 @@ use starcoin_txpool_mock_service::MockTxPoolService; use test_helper::DummyNetworkService; #[stest::test(timeout = 120)] -pub async fn test_full_sync_new_node_dag() { +pub async fn test_full_sync_new_node_dag() -> Result<()> { starcoin_types::block::set_test_flexidag_fork_height(10); - full_sync_new_node() - .await - .expect("dag full sync should success"); + full_sync_new_node().await?; starcoin_types::block::reset_test_custom_fork_height(); + Ok(()) } async fn sync_block_process( @@ -182,8 +181,6 @@ async fn test_sync_red_blocks_dag() -> Result<()> { .expect("failed to produce block"); sync_block_process(target_node, local_node, &test_system.registry).await?; - // // genertate the red blocks - // Arc::get_mut(&mut target_node).unwrap().produce_block_by_header(dag_genesis_header, 5).expect("failed to produce block"); starcoin_types::block::reset_test_custom_fork_height(); Ok(()) diff --git a/sync/tests/common_test_sync_libs.rs b/sync/tests/common_test_sync_libs.rs index 1559fb952b..34e0d08a48 100644 --- a/sync/tests/common_test_sync_libs.rs +++ b/sync/tests/common_test_sync_libs.rs @@ -76,10 +76,6 @@ pub fn generate_block(handle: &NodeHandle, count: usize) -> Result<()> { #[allow(unused)] pub fn generate_dag_fork_number(handle: &NodeHandle) -> Result<()> { - // for _i in 0..G_TEST_DAG_FORK_HEIGHT - 3 { - // let (_block, _is_dag) = handle.generate_block()?; - // } - block_on(async move { let current_header = handle .registry() @@ -87,14 +83,7 @@ pub fn generate_dag_fork_number(handle: &NodeHandle) -> Result<()> { .await? .main_head_header() .await?; - // let block_info = handle.storage().get_block_info(current_header.id())?.expect("failed to get the block info"); - - // let accumulator = MerkleAccumulator::new_with_info(block_info.block_accumulator_info, handle.storage().get_accumulator_store(AccumulatorStoreType::Block)); - // let dag_genesis = accumulator.get_leaf(G_TEST_DAG_FORK_HEIGHT)?.expect("failed to get the dag genesis"); - // let dag_genesis_header = handle.storage().get_block(dag_genesis)?.expect("failed to get the dag genesis header"); let mut dag = handle.registry().get_shared::().await?; - // dag.init_with_genesis(dag_genesis_header.header().clone()).expect("failed to initialize dag"); - // Ok(()) dag.save_dag_state(*G_TEST_DAG_FORK_STATE_KEY, DagState { tips: vec![] }) }) } diff --git a/sync/tests/test_rpc_client.rs b/sync/tests/test_rpc_client.rs index d74d9f6aa2..a9f4db3b1a 100644 --- a/sync/tests/test_rpc_client.rs +++ b/sync/tests/test_rpc_client.rs @@ -13,7 +13,6 @@ fn test_verified_client_for_dag() { let (local_handle, target_handle, target_peer_id) = common_test_sync_libs::init_two_node() .expect("failed to initalize the local and target node"); - // common_test_sync_libs::execute_dag_poll_block(target_handle.registry().clone(), 20).expect("failed to execute the dag poll block"); common_test_sync_libs::generate_dag_fork_number(&target_handle) .expect("failed to execute the dag fork number");