Commit 2a8732e
fix: store multi-rw range, hash with publisher
hopeyen committed Dec 20, 2023
1 parent 62703c5 commit 2a8732e
Showing 4 changed files with 86 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion subfile-exchange/Cargo.toml
@@ -59,7 +59,7 @@ tracing-subscriber = { version = "0.3", features = [
 ] }
 
 [dev-dependencies]
-criterion = "0.5"
+criterion = { version = "0.5", features = ["async_futures"] }
 
 [dev-dependencies.cargo-husky]
 version = "1"
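The new "async_futures" feature unlocks criterion's futures-based executor so async functions can be benchmarked. A minimal sketch of how a bench target might use it (the benchmark name and the function being measured are illustrative, not part of this commit):

    use criterion::async_executor::FuturesExecutor;
    use criterion::{criterion_group, criterion_main, Criterion};

    // Hypothetical async workload standing in for the async chunking
    // code this crate benchmarks.
    async fn chunk_some_bytes() -> usize {
        vec![0u8; 1024].len()
    }

    fn bench_chunking(c: &mut Criterion) {
        c.bench_function("chunk_some_bytes", |b| {
            // `to_async(FuturesExecutor)` is what the "async_futures"
            // feature flag enables.
            b.to_async(FuturesExecutor).iter(|| chunk_some_bytes());
        });
    }

    criterion_group!(benches, bench_chunking);
    criterion_main!(benches);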
34 changes: 34 additions & 0 deletions subfile-exchange/src/publisher/mod.rs
@@ -2,6 +2,7 @@ use serde_yaml::to_string;
 
 use crate::config::PublisherArgs;
 use crate::errors::Error;
+use crate::subfile::local_file_system::Store;
 use crate::subfile::{
     ipfs::{AddResponse, IpfsClient},
     BlockRange, ChunkFile, FileMetaInfo, SubfileManifest,
@@ -124,12 +125,45 @@ impl SubfilePublisher {
 
         Ok(yaml)
     }
+
+    pub async fn object_store_write_chunk_file(&self, file_name: &str) -> Result<String, Error> {
+        let store = Store::new(&self.config.read_dir)?;
+        let chunk_file = store
+            .chunk_file(file_name, Some(self.config.chunk_size as usize))
+            .await?;
+
+        tracing::trace!(
+            file = tracing::field::debug(&chunk_file),
+            "Created chunk file"
+        );
+
+        let yaml = to_string(&chunk_file).map_err(Error::YamlError)?;
+        Ok(yaml)
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
+    #[tokio::test]
+    async fn test_write_chunk_file() {
+        let client = IpfsClient::localhost();
+        let args = PublisherArgs {
+            read_dir: String::from("../example-file"),
+            chunk_size: 1048576,
+            ..Default::default()
+        };
+        let publisher = SubfilePublisher::new(client, args);
+        let name = "example-create-17686085.dbin";
+
+        // Hash and publish a single file
+        let chunk_file_yaml = publisher.write_chunk_file(name).unwrap();
+        let chunk_file_yaml2 = publisher.object_store_write_chunk_file(name).await.unwrap();
+
+        assert_eq!(chunk_file_yaml, chunk_file_yaml2);
+    }
+
     #[tokio::test]
     #[ignore] // Run when there is a localhost IPFS node
     async fn test_publish() {
60 changes: 50 additions & 10 deletions subfile-exchange/src/subfile/local_file_system.rs
@@ -14,6 +14,9 @@ use std::sync::Arc;
 
 use crate::subfile::Error;
 
+use super::file_hasher::hash_chunk;
+use super::ChunkFile;
+
 pub struct Store {
     local_file_system: Arc<LocalFileSystem>,
     read_concurrency: usize,
@@ -74,27 +77,34 @@ impl Store {
 
     pub async fn multipart_read(
         &self,
-        file_name: &str,
+        location: &str,
         chunk_size: Option<usize>,
     ) -> Result<Vec<Bytes>, Error> {
         let object_meta = self
-            .find_object(file_name, None)
+            .find_object(location, None)
             .await
             .ok_or(Error::DataUnavilable(format!(
                 "Did not find file {}",
-                file_name
+                location
             )))?;
-        let step = chunk_size.unwrap_or(object_meta.size / self.read_concurrency);
-        let ranges = (0..(object_meta.size / step))
+        let step = chunk_size.unwrap_or({
+            let s = object_meta.size / self.read_concurrency;
+            if s > 0 {
+                s
+            } else {
+                object_meta.size
+            }
+        });
+        let ranges = (0..(object_meta.size / step + 1))
             .map(|i| std::ops::Range::<usize> {
                 start: i * step,
-                end: (i + 1) * step,
+                end: ((i + 1) * step).min(object_meta.size),
             })
             .collect::<Vec<std::ops::Range<usize>>>();
 
         let result = self
             .local_file_system
-            .get_ranges(&Path::from(file_name), ranges.as_slice())
+            .get_ranges(&Path::from(location), ranges.as_slice())
             .await
             .unwrap();
 
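The reworked read path fixes two edge cases: integer division no longer yields a zero step for objects smaller than the read concurrency, and the final partial range is no longer dropped when the size is not a multiple of the step. A standalone sketch of the new range arithmetic, with made-up sizes:

    // Mirrors the `ranges` computation above, outside the Store type.
    fn ranges(size: usize, step: usize) -> Vec<std::ops::Range<usize>> {
        (0..(size / step + 1))
            .map(|i| (i * step)..((i + 1) * step).min(size))
            .collect()
    }

    fn main() {
        // The old code iterated 0..(10 / 4) and produced [0..4, 4..8],
        // silently dropping the 8..10 tail; the new code keeps it.
        assert_eq!(ranges(10, 4), vec![0..4, 4..8, 8..10]);
        // When size divides evenly, the `+ 1` adds a trailing empty
        // range (8..8), which reads zero bytes.
        assert_eq!(ranges(8, 4), vec![0..4, 4..8, 8..8]);
    }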
@@ -114,10 +124,17 @@ impl Store {
             .await
             .unwrap();
         let size = bytes.len();
+        let step = chunk_size.unwrap_or({
+            let s = size / self.write_concurrency;
+            if s > 0 {
+                s
+            } else {
+                size
+            }
+        });
 
-        let step = chunk_size.unwrap_or(size / self.write_concurrency);
-        for i in 0..(size / step) {
-            let buf = &bytes[i * step..(i + 1) * step];
+        for i in 0..(size / step + 1) {
+            let buf = &bytes[i * step..((i + 1) * step).min(size)];
             write.write_all(buf).await.unwrap();
         }
         write.flush().await.unwrap();
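The write path gains the same guard against a zero step: for a payload smaller than the concurrency level, `size / self.write_concurrency` rounds down to 0 and the later division by `step` would panic, so the step falls back to the full payload size. The clamp in isolation (values made up):

    fn clamped_step(size: usize, concurrency: usize) -> usize {
        let s = size / concurrency;
        if s > 0 { s } else { size }
    }

    fn main() {
        assert_eq!(clamped_step(3, 8), 3); // small payload: one full-size chunk
        assert_eq!(clamped_step(64, 8), 8); // usual case: size / concurrency
    }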
@@ -141,6 +158,29 @@ impl Store {
             .await
             .map_err(Error::ObjectStoreError)
     }
+
+    pub async fn chunk_file(
+        &self,
+        location: &str,
+        chunk_size: Option<usize>,
+    ) -> Result<ChunkFile, Error> {
+        let parts = self.multipart_read(location, chunk_size).await?;
+        let total_bytes = parts.iter().map(|b| b.len() as u64).sum();
+        let byte_size_used = parts
+            .first()
+            .ok_or(Error::ChunkInvalid(format!(
+                "No chunk produced from object store {}, with chunk size config of {:#?}",
+                location, chunk_size
+            )))?
+            .len();
+        let chunk_hashes = parts.iter().map(|c| hash_chunk(c)).collect();
+
+        Ok(ChunkFile {
+            total_bytes,
+            chunk_size: byte_size_used as u64,
+            chunk_hashes,
+        })
+    }
 }
 
 #[cfg(test)]
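A minimal usage sketch of the new `Store::chunk_file` helper, assuming it runs inside an async function with the repository's `example-file` directory on disk (the path, file name, and chunk size mirror the test values; error handling is elided):

    let store = Store::new("../example-file")?;
    let chunk_file = store
        .chunk_file("example-create-17686085.dbin", Some(1_048_576))
        .await?;
    println!(
        "{} bytes hashed into {} chunks of {} bytes",
        chunk_file.total_bytes,
        chunk_file.chunk_hashes.len(),
        chunk_file.chunk_size
    );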
