Skip to content

Commit

Permalink
Remove dag sync blocks (#4120)
Browse files Browse the repository at this point in the history
* add delete sync blocks

* fix fmt and clippy

* return error directly and prolong the duration for reconnection time in reconnect_after_disconnect
  • Loading branch information
jackzhhuang authored May 28, 2024
1 parent 3180971 commit 441a5e0
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 380 deletions.
4 changes: 2 additions & 2 deletions chain/service/src/chain_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,9 @@ impl ReadableChainService for ChainReaderServiceInner {
let head = self.main.current_header();
if self.main.check_dag_type(&head)? != DagHeaderType::Normal {
bail!(
"The chain is still not a dag and its dag fork number is {} and the current is {:?}.",
"The chain is still not a dag and its dag fork number is {:?} and the current block's header number is {:?}.",
self.main.dag_fork_height()?,
head.number(),
self.main.dag_fork_height()?
);
}
let (dag_genesis, state) = self.main.get_dag_state_by_block(&head)?;
Expand Down
58 changes: 0 additions & 58 deletions flexidag/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,42 +348,6 @@ fn test_dag_tips_store() {
);
}

// #[test]
// fn test_dag_multiple_commits() {
// // initialzie the dag firstly
// let dag = BlockDAG::create_for_testing().unwrap();

// let genesis = BlockHeader::dag_genesis_random()
// .as_builder()
// .with_difficulty(0.into())
// .build();
// dag.init_with_genesis(genesis.clone()).unwrap();

// // normally add the dag blocks
// let mut headers = vec![];
// let mut parents_hash = vec![genesis.id()];
// let mut parent_hash = genesis.id();
// for _ in 0..100 {
// let header_builder = BlockHeaderBuilder::random();
// let header = header_builder
// .with_parent_hash(parent_hash)
// .with_parents_hash(Some(parents_hash.clone()))
// .build();
// parents_hash = vec![header.id()];
// parent_hash = header.id();
// headers.push(header.clone());
// dag.commit(header.to_owned()).unwrap();
// let ghostdata = dag.ghostdata_by_hash(header.id()).unwrap().unwrap();
// }

// for _ in 0..10 {
// for header in &headers {
// let _ = dag.commit(header.clone());
// let _ = dag.ghostdata_by_hash(header.id()).unwrap().unwrap();
// }
// }
// }

#[test]
fn test_dag_multiple_commits() -> anyhow::Result<()> {
set_test_flexidag_fork_height(1);
Expand Down Expand Up @@ -746,33 +710,11 @@ fn test_reachability_algorighm() -> anyhow::Result<()> {
hashes.push(child8);
print_reachability_data(reachability_store.read().deref(), &hashes);

// for _i in 7..=31 {
// let s = Hash::random();
// inquirer::add_block(
// &mut reachability_store,
// s,
// child1,
// &mut vec![child1].into_iter(),
// )?;
// hashes.push(s);
// print_reachability_data(&reachability_store, &hashes);
// }

assert!(
dag.check_ancestor_of(origin, vec![child5])?,
"child 5 must be origin's child"
);

// let mut count = 6;
// loop {
// let child = Hash::random();
// inquirer::add_block(&mut reachability_store, child, origin, &mut vec![origin].into_iter())?;
// hashes.push(child);
// print!("{count:?}");
// print_reachability_data(&reachability_store, &hashes);
// count += 1;
// }

Ok(())
}

Expand Down
24 changes: 16 additions & 8 deletions network-p2p/src/protocol/generic_proto/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use crate::protocol::generic_proto::{GenericProto, GenericProtoOut};

use anyhow::{bail, format_err, Ok};
use futures::prelude::*;
use libp2p::core::{connection::ConnectionId, transport::MemoryTransport, upgrade};
use libp2p::swarm::behaviour::FromSwarm;
Expand Down Expand Up @@ -179,7 +180,7 @@ impl NetworkBehaviour for CustomProtoWithAddr {
}

#[test]
fn reconnect_after_disconnect() {
fn reconnect_after_disconnect() -> anyhow::Result<()> {
// We connect two nodes together, then force a disconnect (through the API of the `Service`),
// check that the disconnect worked, and finally check whether they successfully reconnect.

Expand Down Expand Up @@ -223,15 +224,15 @@ fn reconnect_after_disconnect() {
}
}
ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain,
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
ServiceState::FirstConnec | ServiceState::ConnectedAgain => bail!("unexpected"),
},
future::Either::Left(SwarmEvent::Behaviour(
GenericProtoOut::CustomProtocolClosed { .. },
)) => match service1_state {
ServiceState::FirstConnec => service1_state = ServiceState::Disconnected,
ServiceState::ConnectedAgain
| ServiceState::NotConnected
| ServiceState::Disconnected => panic!(),
| ServiceState::Disconnected => bail!("unexpected"),
},
future::Either::Right(SwarmEvent::Behaviour(
GenericProtoOut::CustomProtocolOpen { .. },
Expand All @@ -246,15 +247,15 @@ fn reconnect_after_disconnect() {
}
}
ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain,
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
ServiceState::FirstConnec | ServiceState::ConnectedAgain => bail!("unexpected"),
},
future::Either::Right(SwarmEvent::Behaviour(
GenericProtoOut::CustomProtocolClosed { .. },
)) => match service2_state {
ServiceState::FirstConnec => service2_state = ServiceState::Disconnected,
ServiceState::ConnectedAgain
| ServiceState::NotConnected
| ServiceState::Disconnected => panic!(),
| ServiceState::Disconnected => bail!("unexpected"),
},
_ => {}
}
Expand All @@ -268,7 +269,7 @@ fn reconnect_after_disconnect() {

// Now that the two services have disconnected and reconnected, wait for 3 seconds and
// check whether they're still connected.
let mut delay = futures_timer::Delay::new(Duration::from_secs(3));
let mut delay = futures_timer::Delay::new(Duration::from_secs(20));

loop {
// Grab next event from services.
Expand All @@ -285,9 +286,16 @@ fn reconnect_after_disconnect() {

match event {
SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolOpen { .. })
| SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolClosed { .. }) => panic!(),
| SwarmEvent::Behaviour(GenericProtoOut::CustomProtocolClosed { .. }) => {
bail!("unexpected event: {:?}", event)
}
_ => {}
}
}
});

anyhow::Result::Ok(())
})
.map_err(|e| format_err!("{:?}", e))?;

Ok(())
}
58 changes: 43 additions & 15 deletions network/tests/network_service_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The Starcoin Core Contributors
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use anyhow::{anyhow, bail, format_err, Ok};
use futures::stream::StreamExt;
use futures_timer::Delay;
use network_api::messages::{
Expand Down Expand Up @@ -140,7 +140,7 @@ async fn test_connected_nodes() {
}

#[stest::test]
async fn test_event_notify_receive() {
async fn test_event_notify_receive() -> anyhow::Result<()> {
let (network1, network2) = test_helper::build_network_pair().await.unwrap();
// transaction
let msg_send = PeerMessage::new_transactions(
Expand All @@ -149,7 +149,10 @@ async fn test_event_notify_receive() {
);
let mut receiver = network2.message_handler.channel();
network1.service_ref.send_peer_message(msg_send.clone());
let msg_receive = receiver.next().await.unwrap();
let msg_receive = receiver
.next()
.await
.ok_or_else(|| format_err!("in network1, receive message timeout or return none"))?;
assert_eq!(msg_send.notification, msg_receive.notification);

//block
Expand All @@ -162,12 +165,17 @@ async fn test_event_notify_receive() {
);
let mut receiver = network2.message_handler.channel();
network1.service_ref.send_peer_message(msg_send.clone());
let msg_receive = receiver.next().await.unwrap();
let msg_receive = receiver
.next()
.await
.ok_or_else(|| format_err!("in network2, receive message timeout or return none"))?;
assert_eq!(msg_send.notification, msg_receive.notification);

Ok(())
}

#[stest::test]
async fn test_event_notify_receive_repeat_block() {
async fn test_event_notify_receive_repeat_block() -> anyhow::Result<()> {
let (network1, network2) = test_helper::build_network_pair().await.unwrap();

let block = Block::new(BlockHeader::random(), BlockBody::new_empty());
Expand All @@ -189,12 +197,16 @@ async fn test_event_notify_receive_repeat_block() {
assert_eq!(msg_send1.notification, msg_receive1.notification);

//repeat message is filter, so expect timeout error.
let msg_receive2 = async_std::future::timeout(Duration::from_secs(2), receiver.next()).await;
assert!(msg_receive2.is_err());
let result = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await;
if result.is_err() {
Ok(())
} else {
bail!("expect timeout error, but receive message.")
}
}

#[stest::test]
async fn test_event_notify_receive_repeat_transaction() {
async fn test_event_notify_receive_repeat_transaction() -> anyhow::Result<()> {
let (network1, network2) = test_helper::build_network_pair().await.unwrap();

let txn1 = SignedUserTransaction::mock();
Expand Down Expand Up @@ -236,8 +248,12 @@ async fn test_event_notify_receive_repeat_transaction() {
);

//msg3 is empty after filter, so expect timeout error.
let msg_receive3 = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await;
assert!(msg_receive3.is_err());
let result = async_std::future::timeout(Duration::from_secs(1), receiver.next()).await;
if result.is_err() {
Ok(())
} else {
bail!("expect timeout error, but receive message.")
}
}

fn mock_block_info(total_difficulty: U256) -> BlockInfo {
Expand All @@ -250,7 +266,7 @@ fn mock_block_info(total_difficulty: U256) -> BlockInfo {
}

#[stest::test]
async fn test_event_broadcast() {
async fn test_event_broadcast() -> anyhow::Result<()> {
let mut nodes = test_helper::build_network_cluster(3).await.unwrap();
let node3 = nodes.pop().unwrap();
let node2 = nodes.pop().unwrap();
Expand All @@ -268,22 +284,34 @@ async fn test_event_broadcast() {
)));
node1.service_ref.broadcast(notification.clone());

let msg_receive2 = receiver2.next().await.unwrap();
let msg_receive2 = receiver2
.next()
.await
.ok_or_else(|| format_err!("in receive2, receive message timeout or return none"))?;
assert_eq!(notification, msg_receive2.notification);

let msg_receive3 = receiver3.next().await.unwrap();
let msg_receive3 = receiver3
.next()
.await
.ok_or_else(|| format_err!("in receive3, receive message timeout or return none"))?;
assert_eq!(notification, msg_receive3.notification);

//repeat broadcast
node2.service_ref.broadcast(notification.clone());

let msg_receive1 = async_std::future::timeout(Duration::from_secs(1), receiver1.next()).await;
assert!(msg_receive1.is_err());
if msg_receive1.is_ok() {
bail!("expect timeout error, but receive message.")
}

let msg_receive3 = async_std::future::timeout(Duration::from_secs(1), receiver3.next()).await;
assert!(msg_receive3.is_err());
if msg_receive3.is_ok() {
bail!("expect timeout error, but receive message.")
}

print!("{:?}", node1.config.metrics.registry().unwrap().gather());

Ok(())
}

#[stest::test]
Expand Down
4 changes: 4 additions & 0 deletions storage/src/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ impl BlockStorage {
self.dag_sync_block_storage.remove(block_id)
}

pub fn delete_all_dag_sync_blocks(&self) -> Result<()> {
self.dag_sync_block_storage.remove_all()
}

pub fn get_dag_sync_block(&self, block_id: HashValue) -> Result<Option<DagSyncBlock>> {
self.dag_sync_block_storage.get(block_id)
}
Expand Down
5 changes: 5 additions & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ pub trait BlockStore {

fn save_dag_sync_block(&self, block: DagSyncBlock) -> Result<()>;
fn delete_dag_sync_block(&self, block_id: HashValue) -> Result<()>;
fn delete_all_dag_sync_blocks(&self) -> Result<()>;
fn get_dag_sync_block(&self, block_id: HashValue) -> Result<Option<DagSyncBlock>>;
}

Expand Down Expand Up @@ -548,6 +549,10 @@ impl BlockStore for Storage {
self.block_storage.delete_dag_sync_block(block_id)
}

fn delete_all_dag_sync_blocks(&self) -> Result<()> {
self.block_storage.delete_all_dag_sync_blocks()
}

fn get_dag_sync_block(&self, block_id: HashValue) -> Result<Option<DagSyncBlock>> {
self.block_storage.get_dag_sync_block(block_id)
}
Expand Down
24 changes: 23 additions & 1 deletion storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use byteorder::{BigEndian, ReadBytesExt};
use rocksdb::{DBPinnableSlice, WriteBatch as DBWriteBatch};
use starcoin_config::NodeConfig;
use starcoin_crypto::HashValue;
use starcoin_logger::prelude::info;
use starcoin_logger::prelude::{debug, info};
use starcoin_vm_types::state_store::table::TableHandle;
use std::{convert::TryInto, fmt::Debug, marker::PhantomData, sync::Arc};

Expand Down Expand Up @@ -517,6 +517,8 @@ where
fn get_raw(&self, key: K) -> Result<Option<Vec<u8>>>;

fn iter(&self) -> Result<SchemaIterator<K, V>>;

fn remove_all(&self) -> Result<()>;
}

impl KeyCodec for u64 {
Expand Down Expand Up @@ -660,4 +662,24 @@ where
.ok_or_else(|| format_err!("Only support scan on db storage instance"))?;
db.iter::<K, V>(self.get_store().prefix_name)
}

fn remove_all(&self) -> Result<()> {
if let Some(db) = self.get_store().storage().db() {
let mut iter = db.iter::<K, V>(self.get_store().prefix_name)?;
iter.seek_to_first();
for result_item in iter {
match result_item {
Ok(item) => {
let (key, _) = item;
self.remove(key)?;
}
Err(e) => {
debug!("finish to remove all keys in db with an error: {:?}", e);
}
}
}
}

Ok(())
}
}
Loading

0 comments on commit 441a5e0

Please sign in to comment.