Skip to content

Commit

Permalink
Initial dedicated intergrating testing app
Browse files Browse the repository at this point in the history
Added tests over passes of fixed length, including turning off receiver and transmitter. Added Terminate API for shutting down listener

Iterating on fixing multi-pass scenario

More fixing attempts
  • Loading branch information
plauche committed May 26, 2023
1 parent 3427eac commit a877707
Show file tree
Hide file tree
Showing 13 changed files with 937 additions and 10 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"local-storage",
"messages",
"myceli",
"testing/testing-scenarios",
"transports"
]

Expand Down Expand Up @@ -57,4 +58,5 @@ tracing-subscriber = { version = "0.3.14", default-features = false, features =
ipfs-unixfs = { path = "ipfs-unixfs" }
local-storage = { path = "local-storage" }
messages = { path = "messages" }
myceli = { path = "myceli" }
transports = { path = "transports" }
41 changes: 39 additions & 2 deletions local-storage/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ impl StorageProvider for SqliteStorageProvider {
WITH RECURSIVE cids(x) AS (
VALUES(?1)
UNION
SELECT block_cid FROM links JOIN cids ON root_cid=x
SELECT block_cid FROM links JOIN cids ON root_cid=x WHERE block_id IS NOT null
)
SELECT x FROM cids;
SELECT cid FROM blocks WHERE cid in cids
",
)?
.query_map([cid], |row| {
Expand Down Expand Up @@ -540,4 +540,41 @@ pub mod tests {
0
);
}

#[test]
pub fn test_verify_get_all_cids() {
let harness = TestHarness::new();

let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00"));
let cid_str = cid.to_string();
let block_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11"));
let child_cid_str = block_cid.to_string();

let other_child_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11"));

let block = StoredBlock {
cid: cid_str.to_string(),
data: vec![],
links: vec![block_cid.to_string(), other_child_cid.to_string()],
};

let child_block = StoredBlock {
cid: block_cid.to_string(),
data: b"101293910101".to_vec(),
links: vec![],
};

harness.provider.import_block(&block).unwrap();

let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap();
assert_eq!(dag_cids, vec![cid_str.to_string()]);

harness.provider.import_block(&child_block).unwrap();

let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap();
assert_eq!(
dag_cids,
vec![cid_str.to_string(), child_cid_str.to_string()]
);
}
}
96 changes: 96 additions & 0 deletions local-storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl Storage {
Ok(blocks)
});
let blocks = blocks?;
info!("FileBuilder found {} blocks in {path:?}", blocks.len());
let mut root_cid: Option<String> = None;

blocks.iter().for_each(|b| {
Expand Down Expand Up @@ -65,6 +66,7 @@ impl Storage {
if blocks.len() == 1 {
if let Some(first) = blocks.first() {
root_cid = Some(first.cid().to_string());
info!("set final root {root_cid:?}");
}
}
if let Some(root_cid) = root_cid {
Expand Down Expand Up @@ -139,6 +141,29 @@ impl Storage {
self.provider
.get_dag_blocks_by_window(cid, offset, window_size)
}

pub fn get_last_dag_cid(&self, cid: &str) -> Result<String> {
let dag_cids = self.get_all_dag_cids(cid)?;
match dag_cids.last() {
Some(cid) => Ok(cid.to_owned()),
None => bail!("No last cid found for dag {cid}"),
}
}

// Given a root CID, a number of CIDs, approximate the window we should be in
// pub fn find_dag_window(&self, root: &str, cid_count: u32, window_size: u32) -> Result<u32> {

// let all_cids = self.get_all_dag_cids(root)?;
// let chunks = all_cids.chunks(window_size as usize);
// let mut window_num = 0;
// for c in chunks {
// if c.contains(&child.to_string()) {
// return Ok(window_num);
// }
// window_num += 1;
// }
// bail!("Failed to find child cid {child} in dag {root}");
// }
}

#[cfg(test)]
Expand Down Expand Up @@ -169,6 +194,45 @@ pub mod tests {
}
}

fn generate_stored_blocks(num_blocks: u16) -> Result<Vec<StoredBlock>> {
const CHUNK_SIZE: u16 = 20;
let data_size = CHUNK_SIZE * num_blocks;
let mut data = Vec::<u8>::new();
data.resize(data_size.into(), 1);
thread_rng().fill_bytes(&mut data);

let rt = tokio::runtime::Runtime::new().unwrap();
let blocks = rt.block_on(async {
let file: File = FileBuilder::new()
.content_bytes(data)
.name("testfile")
.fixed_chunker(CHUNK_SIZE.into())
.build()
.await
.unwrap();
let blocks: Vec<_> = file.encode().await.unwrap().try_collect().await.unwrap();
blocks
});
let mut stored_blocks = vec![];

blocks.iter().for_each(|b| {
let links = b
.links()
.iter()
.map(|c| c.to_string())
.collect::<Vec<String>>();
let stored = StoredBlock {
cid: b.cid().to_string(),
data: b.data().to_vec(),
links,
};

stored_blocks.push(stored);
});

Ok(stored_blocks)
}

#[test]
pub fn test_import_path_to_storage() {
let harness = TestHarness::new();
Expand Down Expand Up @@ -289,6 +353,38 @@ pub mod tests {
assert_eq!(blocks.len(), cids.len());
}

#[test]
pub fn test_get_all_dag_cids() {
let harness = TestHarness::new();

let mut dag_blocks = generate_stored_blocks(50).unwrap();
let total_block_count = dag_blocks.len();

let root = dag_blocks.pop().unwrap();

harness.storage.import_block(&root).unwrap();

let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
assert_eq!(dag_cids.len(), 1);

for _ in (1..10) {
harness
.storage
.import_block(&dag_blocks.pop().unwrap())
.unwrap();
}

let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
assert_eq!(dag_cids.len(), 10);

while let Some(block) = dag_blocks.pop() {
harness.storage.import_block(&block).unwrap()
}

let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
assert_eq!(dag_cids.len(), total_block_count);
}

// TODO: duplicated data is not being handled correctly right now, need to fix this
// #[test]
// pub fn export_from_storage_various_file_sizes_duplicated_data() {
Expand Down
14 changes: 14 additions & 0 deletions messages/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,22 @@ pub enum ApplicationAPI {
},
// Resumes the transmission of all dags which may be paused
ResumeTransmitAllDags,
// Resumes the transmission of a dag from a prior session, given the last received CID
// for determining where to restart the transmission
ResumePriorDagTransmit {
cid: String,
num_received_cids: u32,
retries: u8,
},
/// Listens on address for data and writes out files received
Receive {
listen_addr: String,
},
/// Commands a node to request another node at target_addr to resume dag transfer
RequestResumeDagTransfer {
cid: String,
target_addr: String,
},
/// Request Available Blocks
RequestAvailableBlocks,
/// Advertise all available blocks by CID
Expand Down Expand Up @@ -103,6 +115,8 @@ pub enum ApplicationAPI {
Version {
version: String,
},
/// Asks IPFS instance to terminate
Terminate,
// TODO: Implement later
// Information about the next pass used for calculating
// data transfer parameters
Expand Down
10 changes: 10 additions & 0 deletions messages/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ pub enum DataProtocol {
target_addr: String,
retries: u8,
},
// Resumes the transmission of a dag which isn't currently tracked in sessions
// This accounts for resuming after restarting of transmitter
ResumePriorDagTransmit {
cid: String,
num_received_cids: u32,
target_addr: String,
retries: u8,
},
// Resumes the transmission of a dag which may have run out of retries or
// paused due to connectivity lost
ResumeTransmitDag {
Expand All @@ -64,4 +72,6 @@ pub enum DataProtocol {
cid: String,
blocks: Vec<String>,
},
// Used by listener to terminate shipper on program exit
Terminate,
}
7 changes: 6 additions & 1 deletion myceli/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{bail, Result};
use local_storage::storage::Storage;
use messages::{ApplicationAPI, DataProtocol, Message};
use std::path::PathBuf;
Expand Down Expand Up @@ -63,6 +63,11 @@ pub fn get_missing_dag_blocks_window_protocol(
}))
}

pub fn get_last_dag_cid(cid: &str, storage: Rc<Storage>) -> Result<String> {
let last_dag_cid = storage.get_last_dag_cid(cid)?;
Ok(last_dag_cid)
}

#[cfg(test)]
pub mod tests {
use super::*;
Expand Down
Loading

0 comments on commit a877707

Please sign in to comment.