diff --git a/docs/publisher_guide.md b/docs/publisher_guide.md
index e782e00..6680521 100644
--- a/docs/publisher_guide.md
+++ b/docs/publisher_guide.md
@@ -19,7 +19,7 @@ Publisher must have read access to all files contained in the Bundle. The publis
 
 ### CLI usage
 
-The publisher must provide a name for the bundle, filenames, file type, version, and path for read-access of the files.
+The publisher must provide a path with read access to the files.
 
 Publishing separate files stored in the local file system
 ```
@@ -28,6 +28,13 @@ $ file-exchange publisher \
   local-files --main-dir ./example-file/
 ```
 
+Publishing a set of files/objects by folder/prefix
+```
+$ file-exchange publisher \
+  --prefixes example-file \
+  local-files --main-dir ./
+```
+
 Publishing files/objects stored in a remote s3 bucket into a bundle, provide s3 and bundle configurations.
 ```
 $ file-exchange publisher \
@@ -45,57 +52,5 @@
 
 For more information
 ```
-$ file-exchange --help
-
-Publisher takes the files, generate bundle manifest,
-and publish to IPFS
-
-Usage: file-exchange publisher [OPTIONS]
-
-Commands:
-  local-files
-  object-storage
-  help            Print this message or the help
-                  of the given subcommand(s)
-
-Options:
-      --yaml-store
-          Path to the directory to store the generated
-          yaml file for bundle [env: YAML_STORE_DIR=]
-          [default: ./example-file/bundle.yaml]
-      --chunk-size
-          Chunk size in bytes to split files (Default:
-          1048576 bytes = 1MiB) [env: CHUNK_SIZE=]
-          [default: 1048576]
-      --filenames
-          Name for the files to publish [env:
-          FILE_NAMES=]
-      --bundle-name
-          Name for the bundle (later this can be
-          interactive) [env: BUNDLE_NAME=]
-      --file-type
-          Type of the file (e.g., sql_snapshot,
-          flatfiles) [env: FILE_TYPE=]
-      --bundle-version
-          Bundle versioning [env: FILE_VERSION=]
-      --identifier
-          Identifier of the file given its type
-          (chain-id for firehose flatfiles, subgraph
-          deployment hash for subgraph snapshots)
-          [env: IDENTIFIER=]
-      --start-block
-          Start block for flatfiles [env:
-          START_BLOCK=]
-      --end-block
-          End block for sql snapshot or flatfiles
-          [env: END_BLOCK=]
-      --description
-          Describe bundle content [env: DESCRIPTION=]
-          [default: ]
-      --chain-id
-          Network represented in CCIP ID (Ethereum
-          mainnet: 1, goerli: 5, arbitrum-one: 42161,
-          sepolia: 58008 [env: NETWORK=] [default: 1]
-  -h, --help
-          Print help
+$ file-exchange publisher --help
 ```
diff --git a/file-exchange/Cargo.toml b/file-exchange/Cargo.toml
index fbe3f3a..f986d12 100644
--- a/file-exchange/Cargo.toml
+++ b/file-exchange/Cargo.toml
@@ -96,10 +96,6 @@ harness = false
 name = "validate_local"
 harness = false
 
-[[bench]]
-name = "new_file_manifest"
-harness = false
-
 [[bench]]
 name = "hash_chunk"
 harness = false
diff --git a/file-exchange/benches/new_file_manifest.rs b/file-exchange/benches/new_file_manifest.rs
deleted file mode 100644
index 7092b8f..0000000
--- a/file-exchange/benches/new_file_manifest.rs
+++ /dev/null
@@ -1,27 +0,0 @@
-use criterion::async_executor::FuturesExecutor;
-use criterion::{black_box, criterion_group, criterion_main, Criterion};
-
-use file_exchange::{
-    config::{LocalDirectory, StorageMethod},
-    manifest::store::Store,
-    test_util::CHUNK_SIZE,
-};
-
-fn new_file_manifest_benchmark_object_store(c: &mut Criterion) {
-    let store = black_box(
-        Store::new(&StorageMethod::LocalFiles(LocalDirectory {
-            main_dir: "../example-file".to_string(),
-        }))
-        .unwrap(),
-    );
-    let file_name = black_box("0017234600.dbin.zst");
-    let file_size = black_box(Some(CHUNK_SIZE as usize));
-
-    c.bench_function("new_file_manifest_benchmark_object_store", |b| {
-        b.to_async(FuturesExecutor)
-            .iter(|| store.file_manifest(file_name, None, file_size))
-    });
-}
-
-criterion_group!(benches, new_file_manifest_benchmark_object_store);
-criterion_main!(benches);
diff --git a/file-exchange/src/config.rs b/file-exchange/src/config.rs
index 18a9e2c..aab6a7e 100644
--- a/file-exchange/src/config.rs
+++ b/file-exchange/src/config.rs
@@ -329,6 +329,13 @@ pub struct PublisherArgs {
         help = "Name for the files to publish"
     )]
     pub filenames: Vec<String>,
+    #[arg(
+        long,
+        value_name = "PREFIXES",
+        env = "PREFIXES",
+        help = "Publish all files in the folders/prefixes (publication takes union of folders and filenames)"
+    )]
+    pub prefixes: Vec<String>,
     #[clap(flatten)]
     pub bundle: Option<BundleArgs>,
 }
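For orientation, here is a rough sketch (not part of the patch) of how the new `prefixes` field can be combined with `filenames` when building `PublisherArgs`. The field and type names are taken from the diff above; the literal paths and the explicit 1 MiB `chunk_size` are illustrative assumptions only.

```rust
use file_exchange::config::{LocalDirectory, PublisherArgs, StorageMethod};

fn main() {
    // Hypothetical example: publish every object under the `example-file` prefix
    // plus one explicitly named file; the publisher de-duplicates the union of both.
    let args = PublisherArgs {
        filenames: vec!["example-create-17686085.dbin".to_string()],
        prefixes: vec!["example-file".to_string()],
        // Mirrors the documented default of 1048576 bytes (1 MiB) per chunk.
        chunk_size: 1_048_576,
        storage_method: StorageMethod::LocalFiles(LocalDirectory {
            main_dir: "./".to_string(),
        }),
        ..Default::default()
    };
    println!(
        "publishing {} prefix(es) and {} explicit file(s)",
        args.prefixes.len(),
        args.filenames.len()
    );
}
```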
diff --git a/file-exchange/src/manifest/file_hasher.rs b/file-exchange/src/manifest/file_hasher.rs
index 3f91034..47fa9c6 100644
--- a/file-exchange/src/manifest/file_hasher.rs
+++ b/file-exchange/src/manifest/file_hasher.rs
@@ -75,12 +75,20 @@ mod tests {
         }))
         .unwrap();
         // produce the same file manifest
+        let object_meta = store
+            .find_object(file_name1, None)
+            .await
+            .expect("find object");
         let file_manifest1 = store
-            .file_manifest(file_name1, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
+        let object_meta2 = store
+            .find_object(file_name2, None)
+            .await
+            .expect("find object");
         let file_manifest2 = store
-            .file_manifest(file_name2, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta2, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
 
@@ -109,12 +117,20 @@ mod tests {
             main_dir: readdir1.to_string(),
         }))
         .unwrap();
+        let object_meta = store
+            .find_object(file_name1, None)
+            .await
+            .expect("find object");
         let file_manifest1 = store
-            .file_manifest(file_name1, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
+        let object_meta2 = store
+            .find_object(file_name2, None)
+            .await
+            .expect("find object");
         let file_manifest2 = store
-            .file_manifest(file_name2, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta2, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
 
@@ -139,12 +155,16 @@ mod tests {
             main_dir: readdir.to_string(),
         }))
         .unwrap();
+        let object_meta = store
+            .find_object(file_name, None)
+            .await
+            .expect("find object");
         let file_manifest1 = store
-            .file_manifest(file_name, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
         let file_manifest2 = store
-            .file_manifest(file_name, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
 
@@ -167,8 +187,12 @@ mod tests {
             main_dir: readdir1.to_string(),
         }))
         .unwrap();
+        let object_meta = store
+            .find_object(file_name1, None)
+            .await
+            .expect("find object");
         let bytes_vec = store
-            .multipart_read(file_name1, None, Some(CHUNK_SIZE as usize))
+            .multipart_read(&object_meta, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
         let chunks1: Vec<Vec<u8>> = bytes_vec.into_iter().map(|bytes| bytes.to_vec()).collect();
@@ -182,12 +206,20 @@ mod tests {
         let file_name2 = path2.file_name().unwrap().to_str().unwrap();
 
         // produce different file manifest
+        let object_meta = store
+            .find_object(file_name1, None)
+            .await
+            .expect("find object");
         let file_manifest1 = store
-            .file_manifest(file_name1, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
+        let object_meta2 = store
+            .find_object(file_name2, None)
+            .await
+            .expect("find object");
         let file_manifest2 = store
-            .file_manifest(file_name2, None, Some(CHUNK_SIZE as usize))
+            .file_manifest(&object_meta2, Some(CHUNK_SIZE as usize))
             .await
             .unwrap();
 
diff --git a/file-exchange/src/manifest/store.rs b/file-exchange/src/manifest/store.rs
index a72355d..811bcda 100644
--- a/file-exchange/src/manifest/store.rs
+++ b/file-exchange/src/manifest/store.rs
@@ -154,7 +154,7 @@ impl Store {
         Ok(result)
     }
 
-    pub async fn multipart_read(
+    pub async fn multipart_read_by_name_and_prefix(
         &self,
         file_name: &str,
         file_path: Option<&Path>,
@@ -195,6 +195,34 @@ impl Store {
         Ok(result)
     }
 
+    pub async fn multipart_read(
+        &self,
+        object_meta: &ObjectMeta,
+        chunk_size: Option<usize>,
+    ) -> Result<Vec<Bytes>, Error> {
+        let step = chunk_size.unwrap_or({
+            let s = object_meta.size / self.read_concurrency;
+            if s > 0 {
+                s
+            } else {
+                object_meta.size
+            }
+        });
+        let ranges = (0..(object_meta.size / step + 1))
+            .map(|i| std::ops::Range::<usize> {
+                start: i * step,
+                end: ((i + 1) * step).min(object_meta.size),
+            })
+            .collect::<Vec<std::ops::Range<usize>>>();
+        let result = self
+            .store
+            .get_ranges(&object_meta.location, ranges.as_slice())
+            .await
+            .unwrap();
+
+        Ok(result)
+    }
+
     /// Async write with concurrent uploads at a location path
     pub async fn multipart_write(
         &self,
@@ -243,19 +271,19 @@ impl Store {
             .map_err(Error::ObjectStoreError)
     }
 
+    /// Create a file manifest from the metadata of a single object
     pub async fn file_manifest(
         &self,
-        file_name: &str,
-        prefix: Option<&Path>,
+        object_meta: &ObjectMeta,
         chunk_size: Option<usize>,
     ) -> Result<FileManifest, Error> {
-        let parts = self.multipart_read(file_name, prefix, chunk_size).await?;
+        let parts = self.multipart_read(object_meta, chunk_size).await?;
         let total_bytes = parts.iter().map(|b| b.len() as u64).sum();
         let byte_size_used = parts
             .first()
             .ok_or(Error::ChunkInvalid(format!(
-                "No chunk produced from object store {} with prefix {:#?}, with chunk size config of {:#?}",
-                file_name, prefix, chunk_size
+                "No chunk produced from object store for object meta {:#?}, with chunk size config of {:#?}",
+                object_meta, chunk_size
             )))?
             .len();
 
         let chunk_hashes = parts.iter().map(|c| hash_chunk(c)).collect();
@@ -401,9 +429,14 @@ mod tests {
             .await;
         assert!(write_res.is_ok());
 
+        let object_meta = object_store
+            .find_object(new_file_name, None)
+            .await
+            .expect("find object");
+
         // Read with fixed concurrency
         let read_res: Vec<Bytes> = object_store
-            .multipart_read(new_file_name, None, Some(CHUNK_SIZE.try_into().unwrap()))
+            .multipart_read(&object_meta, Some(CHUNK_SIZE.try_into().unwrap()))
             .await
             .unwrap();
         let flattened_vec: Vec<u8> = read_res.into_iter().flat_map(|b| b.to_vec()).collect();
@@ -422,8 +455,12 @@ mod tests {
         assert!(write_res.is_ok());
 
         // Read with adjusted concurrency
+        let object_meta = object_store
+            .find_object(new_file_name, None)
+            .await
+            .expect("find object");
         let read_res: Vec<Bytes> = object_store
-            .multipart_read(new_file_name, None, None)
+            .multipart_read(&object_meta, None)
             .await
             .unwrap();
         let flattened_vec: Vec<u8> = read_res.into_iter().flat_map(|b| b.to_vec()).collect();
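To make the new chunking behaviour concrete, the standalone sketch below (illustration only, not part of the patch) mirrors the range arithmetic that the new `Store::multipart_read` applies to an object's size; note that, as written in the patch, a size that is an exact multiple of the step also yields a trailing empty range.

```rust
use std::ops::Range;

/// Mirrors the range computation in the new `Store::multipart_read`:
/// split `size` bytes into `step`-sized ranges, clamping the last one to `size`.
fn split_ranges(size: usize, step: usize) -> Vec<Range<usize>> {
    (0..(size / step + 1))
        .map(|i| (i * step)..(((i + 1) * step).min(size)))
        .collect()
}

fn main() {
    // A 2.5 MB object with the default 1 MiB chunk size yields three ranges:
    // 0..1048576, 1048576..2097152, 2097152..2500000
    for range in split_ranges(2_500_000, 1_048_576) {
        println!("{range:?}");
    }
}
```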
diff --git a/file-exchange/src/publisher/mod.rs b/file-exchange/src/publisher/mod.rs
index fca31c4..34abe1b 100644
--- a/file-exchange/src/publisher/mod.rs
+++ b/file-exchange/src/publisher/mod.rs
@@ -6,6 +6,7 @@ use crate::manifest::{
     BlockRange, BundleManifest, FileMetaInfo,
 };
 use object_store::path::Path;
+use object_store::ObjectMeta;
 use serde_yaml::to_string;
 
 pub struct ManifestPublisher {
@@ -28,10 +29,9 @@ impl ManifestPublisher {
     /// Takes file_path, create file_manifest, build merkle tree, publish, write to output
     pub async fn hash_and_publish_file(
         &self,
-        file_name: &str,
-        file_prefix: Option<&Path>,
+        object_meta: &ObjectMeta,
     ) -> Result<AddResponse, Error> {
-        let yaml_str = self.write_file_manifest(file_name, file_prefix).await?;
+        let yaml_str = self.write_file_manifest(object_meta).await?;
 
         let added: AddResponse = self
             .ipfs_client
@@ -48,17 +48,43 @@ impl ManifestPublisher {
 
     pub async fn hash_and_publish_files(&self) -> Result<Vec<FileMetaInfo>, Error> {
         let mut root_hashes: Vec<FileMetaInfo> = Vec::new();
+        let mut to_publish: Vec<ObjectMeta> = Vec::new();
+        // collect every object under the configured prefixes
+        for prefix in &self.config.prefixes {
+            let files = self
+                .store
+                .list(Some(&Path::from(prefix.to_string())))
+                .await
+                .unwrap_or_default();
+            to_publish = union_obj_metas(to_publish, files);
+        }
+        for filename in &self.config.filenames {
+            let object_meta =
+                self.store
+                    .find_object(filename, None)
+                    .await
+                    .ok_or(Error::DataUnavailable(format!(
+                        "Did not find object {:?}",
+                        filename,
+                    )))?;
+            if !to_publish.contains(&object_meta) {
+                to_publish.push(object_meta);
+            }
+        }
 
-        let file_names = &self.config.filenames;
         tracing::trace!(
-            file_names = tracing::field::debug(&file_names),
-            "hash_and_publish_files",
+            to_publish = tracing::field::debug(&to_publish),
+            "Publish these files/objects",
         );
 
-        for file_name in file_names {
-            let ipfs_hash = self.hash_and_publish_file(file_name, None).await?.hash;
+        for object_meta in to_publish {
+            let ipfs_hash = self.hash_and_publish_file(&object_meta).await?.hash;
             root_hashes.push(FileMetaInfo {
-                name: file_name.to_string(),
+                name: object_meta
+                    .location
+                    .filename()
+                    .unwrap_or_default()
+                    .to_string(),
                 hash: ipfs_hash,
             });
         }
@@ -126,24 +152,17 @@ impl ManifestPublisher {
         }
     }
 
-    // pub async fn object_store_write_file_manifest(&self, file_name: &str) -> Result<String, Error> {
-    pub async fn write_file_manifest(
-        &self,
-        file_name: &str,
-        file_prefix: Option<&Path>,
-    ) -> Result<String, Error> {
+    /// Create and serialize the file manifest for a single object
+    pub async fn write_file_manifest(&self, object_meta: &ObjectMeta) -> Result<String, Error> {
         let file_manifest = self
             .store
-            .file_manifest(
-                file_name,
-                file_prefix,
-                Some(self.config.chunk_size as usize),
-            )
+            .file_manifest(object_meta, Some(self.config.chunk_size as usize))
             .await?;
 
         tracing::trace!(
             file = tracing::field::debug(&file_manifest),
-            "Created file manifest"
+            object_meta = tracing::field::debug(&object_meta),
+            "Created file manifest for the object"
         );
 
         let yaml = to_string(&file_manifest).map_err(Error::YamlError)?;
@@ -151,6 +170,25 @@
     }
 }
 
+fn union_obj_metas(vec1: Vec<ObjectMeta>, vec2: Vec<ObjectMeta>) -> Vec<ObjectMeta> {
+    let mut result = vec![];
+    fn contains(result: &[ObjectMeta], meta: &ObjectMeta) -> bool {
+        result.iter().any(|item| item == meta)
+    }
+
+    for item in vec1.into_iter() {
+        if !contains(&result, &item) {
+            result.push(item);
+        }
+    }
+    for item in vec2.into_iter() {
+        if !contains(&result, &item) {
+            result.push(item);
+        }
+    }
+    result
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -170,7 +208,12 @@ mod tests {
         let name = "example-create-17686085.dbin";
 
         // Hash and publish a single file
-        let file_manifest_yaml = publisher.write_file_manifest(name, None).await;
+        let object_meta = publisher
+            .store
+            .find_object(name, None)
+            .await
+            .expect("find object");
+        let file_manifest_yaml = publisher.write_file_manifest(&object_meta).await;
         assert!(file_manifest_yaml.is_ok());
     }
@@ -185,12 +228,17 @@ mod tests {
             }),
             ..Default::default()
         };
-        let builder = ManifestPublisher::new(client, args);
+        let publisher = ManifestPublisher::new(client, args);
         let name = "example-create-17686085.dbin";
+        let object_meta = publisher
+            .store
+            .find_object(name, None)
+            .await
+            .expect("find object");
 
         // Hash and publish a single file
-        let hash = builder
-            .hash_and_publish_file(name, None)
+        let hash = publisher
+            .hash_and_publish_file(&object_meta)
             .await
             .unwrap()
             .hash;
@@ -202,9 +250,9 @@ mod tests {
         }];
 
         if let Ok(manifest_yaml) =
-            builder.construct_bundle_manifest(&BundleArgs::default(), meta_info)
+            publisher.construct_bundle_manifest(&BundleArgs::default(), meta_info)
         {
-            if let Ok(ipfs_hash) = builder.publish_bundle_manifest(&manifest_yaml).await {
+            if let Ok(ipfs_hash) = publisher.publish_bundle_manifest(&manifest_yaml).await {
                 tracing::info!("Published bundle manifest to IPFS with hash: {}", ipfs_hash);
             }
         }
diff --git a/file-service/src/admin.rs b/file-service/src/admin.rs
index cba37db..d4125e8 100644
--- a/file-service/src/admin.rs
+++ b/file-service/src/admin.rs
@@ -129,6 +129,7 @@ impl StatusMutation {
         &self,
         ctx: &Context<'_>,
         filenames: Vec<String>,
+        prefixes: Vec<String>,
         chunk_size: Option<u64>,
         bundle_name: Option<String>,
         file_type: Option<String>,
@@ -158,6 +159,7 @@ impl StatusMutation {
             PublisherArgs {
                 chunk_size: chunk_size.unwrap_or(1048576),
                 filenames,
+                prefixes,
                 bundle: Some(BundleArgs {
                     bundle_name,
                     file_type,
@@ -516,6 +518,7 @@ impl StatusMutation {
         &self,
         ctx: &Context<'_>,
         filenames: Vec<String>,
+        prefixes: Vec<String>,
         chunk_size: Option<u64>,
     ) -> Result<Vec<String>, ServerError> {
         if ctx.data_opt::()
@@ -538,6 +541,7 @@ impl StatusMutation {
             PublisherArgs {
                 chunk_size: chunk_size.unwrap_or(1048576),
                 filenames: filenames.clone(),
+                prefixes: prefixes.clone(),
                 storage_method: ctx
                     .data_unchecked::()
                    .state
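Finally, a hedged sketch of how the reworked publisher flow might be driven from calling code. It assumes a `ManifestPublisher` was already constructed elsewhere via `ManifestPublisher::new(ipfs_client, args)` with a `PublisherArgs` like the one sketched earlier, and relies only on the `hash_and_publish_files` signature shown in the diff above.

```rust
use file_exchange::publisher::ManifestPublisher;

// Sketch only: `publisher` is assumed to be built with `ManifestPublisher::new(ipfs_client, args)`.
async fn publish_all(publisher: &ManifestPublisher) {
    // Lists every object under each configured prefix, unions the result with the
    // explicitly configured filenames, then publishes one file manifest per object.
    match publisher.hash_and_publish_files().await {
        Ok(file_metas) => {
            // Each returned entry pairs a file name with the IPFS hash of its manifest.
            println!("published {} file manifest(s)", file_metas.len());
        }
        Err(e) => eprintln!("publish failed: {e:?}"),
    }
}
```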