Skip to content

Commit

Permalink
feat: publish by prefixes/folders
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Apr 29, 2024
1 parent 588ab46 commit 7630280
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 129 deletions.
63 changes: 9 additions & 54 deletions docs/publisher_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Publisher must have read access to all files contained in the Bundle. The publis

### CLI usage

The publisher must provide a name for the bundle, filenames, file type, version, and path for read-access of the files.
The publisher must provide a path for read access to the files.

Publishing separate files stored in the local file system
```
Expand All @@ -28,6 +28,13 @@ $ file-exchange publisher \
local-files --main-dir ./example-file/
```

Publishing a set of files/objects grouped by folders/prefixes
```
$ file-exchange publisher \
--prefixes example-file \
local-files --main-dir ./
```

Publishing files/objects stored in a remote s3 bucket into a bundle, provide s3 and bundle configurations.
```
$ file-exchange publisher \
Expand All @@ -45,57 +52,5 @@ $ file-exchange publisher \

For more information
```
$ file-exchange --help
Publisher takes the files, generate bundle manifest,
and publish to IPFS
Usage: file-exchange publisher [OPTIONS] <COMMAND>
Commands:
local-files
object-storage
help Print this message or the help
of the given subcommand(s)
Options:
--yaml-store <YAML_STORE_DIR>
Path to the directory to store the generated
yaml file for bundle [env: YAML_STORE_DIR=]
[default: ./example-file/bundle.yaml]
--chunk-size <CHUNK_SIZE>
Chunk size in bytes to split files (Default:
1048576 bytes = 1MiB) [env: CHUNK_SIZE=]
[default: 1048576]
--filenames <FILE_NAMES>
Name for the files to publish [env:
FILE_NAMES=]
--bundle-name <BUNDLE_NAME>
Name for the bundle (later this can be
interactive) [env: BUNDLE_NAME=]
--file-type <FILE_TYPE>
Type of the file (e.g., sql_snapshot,
flatfiles) [env: FILE_TYPE=]
--bundle-version <FILE_VERSION>
Bundle versioning [env: FILE_VERSION=]
--identifier <IDENTIFIER>
Identifier of the file given its type
(chain-id for firehose flatfiles, subgraph
deployment hash for subgraph snapshots)
[env: IDENTIFIER=]
--start-block <START_BLOCK>
Start block for flatfiles [env:
START_BLOCK=]
--end-block <END_BLOCK>
End block for sql snapshot or flatfiles
[env: END_BLOCK=]
--description <DESCRIPTION>
Describe bundle content [env: DESCRIPTION=]
[default: ]
--chain-id <NETWORK>
Network represented in CCIP ID (Ethereum
mainnet: 1, goerli: 5, arbitrum-one: 42161,
sepolia: 58008 [env: NETWORK=] [default: 1]
-h, --help
Print help
$ file-exchange publisher --help
```
4 changes: 0 additions & 4 deletions file-exchange/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@ harness = false
name = "validate_local"
harness = false

[[bench]]
name = "new_file_manifest"
harness = false

[[bench]]
name = "hash_chunk"
harness = false
27 changes: 0 additions & 27 deletions file-exchange/benches/new_file_manifest.rs

This file was deleted.

7 changes: 7 additions & 0 deletions file-exchange/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@ pub struct PublisherArgs {
help = "Name for the files to publish"
)]
pub filenames: Vec<String>,
#[arg(
long,
value_name = "PREFIXES",
env = "PREFIXES",
help = "Publish all files in the folders/prefixes (publication takes union of folders and filenames)"
)]
pub prefixes: Vec<String>,
#[clap(flatten)]
pub bundle: Option<BundleArgs>,
}
Expand Down
50 changes: 41 additions & 9 deletions file-exchange/src/manifest/file_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,20 @@ mod tests {
}))
.unwrap();
// produce the same file manifest
let object_meta = store
.find_object(file_name1, None)
.await
.expect("find object");
let file_manifest1 = store
.file_manifest(file_name1, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
.await
.unwrap();
let object_meta2 = store
.find_object(file_name2, None)
.await
.expect("find object");
let file_manifest2 = store
.file_manifest(file_name2, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta2, Some(CHUNK_SIZE as usize))
.await
.unwrap();

Expand Down Expand Up @@ -109,12 +117,20 @@ mod tests {
main_dir: readdir1.to_string(),
}))
.unwrap();
let object_meta = store
.find_object(file_name1, None)
.await
.expect("find object");
let file_manifest1 = store
.file_manifest(file_name1, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
.await
.unwrap();
let object_meta2 = store
.find_object(file_name2, None)
.await
.expect("find object");
let file_manifest2 = store
.file_manifest(file_name2, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta2, Some(CHUNK_SIZE as usize))
.await
.unwrap();

Expand All @@ -139,12 +155,16 @@ mod tests {
main_dir: readdir.to_string(),
}))
.unwrap();
let object_meta = store
.find_object(file_name, None)
.await
.expect("find object");
let file_manifest1 = store
.file_manifest(file_name, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
.await
.unwrap();
let file_manifest2 = store
.file_manifest(file_name, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
.await
.unwrap();

Expand All @@ -167,8 +187,12 @@ mod tests {
main_dir: readdir1.to_string(),
}))
.unwrap();
let object_meta = store
.find_object(file_name1, None)
.await
.expect("find object");
let bytes_vec = store
.multipart_read(file_name1, None, Some(CHUNK_SIZE as usize))
.multipart_read(&object_meta, Some(CHUNK_SIZE as usize))
.await
.unwrap();
let chunks1: Vec<Vec<u8>> = bytes_vec.into_iter().map(|bytes| bytes.to_vec()).collect();
Expand All @@ -182,12 +206,20 @@ mod tests {
let file_name2 = path2.file_name().unwrap().to_str().unwrap();

// produce different file manifest
let object_meta = store
.find_object(file_name1, None)
.await
.expect("find object");
let file_manifest1 = store
.file_manifest(file_name1, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta, Some(CHUNK_SIZE as usize))
.await
.unwrap();
let object_meta2 = store
.find_object(file_name2, None)
.await
.expect("find object");
let file_manifest2 = store
.file_manifest(file_name2, None, Some(CHUNK_SIZE as usize))
.file_manifest(&object_meta2, Some(CHUNK_SIZE as usize))
.await
.unwrap();

Expand Down
53 changes: 45 additions & 8 deletions file-exchange/src/manifest/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Store {
Ok(result)
}

pub async fn multipart_read(
pub async fn multipart_read_by_name_and_prefix(
&self,
file_name: &str,
file_path: Option<&Path>,
Expand Down Expand Up @@ -195,6 +195,34 @@ impl Store {
Ok(result)
}

pub async fn multipart_read(
&self,
object_meta: &ObjectMeta,
chunk_size: Option<usize>,
) -> Result<Vec<Bytes>, Error> {
let step = chunk_size.unwrap_or({
let s = object_meta.size / self.read_concurrency;
if s > 0 {
s
} else {
object_meta.size
}
});
let ranges = (0..(object_meta.size / step + 1))
.map(|i| std::ops::Range::<usize> {
start: i * step,
end: ((i + 1) * step).min(object_meta.size),
})
.collect::<Vec<std::ops::Range<usize>>>();
let result = self
.store
.get_ranges(&object_meta.location, ranges.as_slice())
.await
.unwrap();

Ok(result)
}

/// Async write with concurrent uploads at a location path
pub async fn multipart_write(
&self,
Expand Down Expand Up @@ -243,19 +271,19 @@ impl Store {
.map_err(Error::ObjectStoreError)
}

/// Create file manifests from a prefix
pub async fn file_manifest(
&self,
file_name: &str,
prefix: Option<&Path>,
object_meta: &ObjectMeta,
chunk_size: Option<usize>,
) -> Result<FileManifest, Error> {
let parts = self.multipart_read(file_name, prefix, chunk_size).await?;
let parts = self.multipart_read(object_meta, chunk_size).await?;
let total_bytes = parts.iter().map(|b| b.len() as u64).sum();
let byte_size_used = parts
.first()
.ok_or(Error::ChunkInvalid(format!(
"No chunk produced from object store {} with prefix {:#?}, with chunk size config of {:#?}",
file_name, prefix, chunk_size
"No chunk produced from object store for object meta {:#?}, with chunk size config of {:#?}",
object_meta, chunk_size
)))?
.len();
let chunk_hashes = parts.iter().map(|c| hash_chunk(c)).collect();
Expand Down Expand Up @@ -401,9 +429,14 @@ mod tests {
.await;
assert!(write_res.is_ok());

let object_meta = object_store
.find_object(new_file_name, None)
.await
.expect("find object");

// Read with fixed concurrency
let read_res: Vec<Bytes> = object_store
.multipart_read(new_file_name, None, Some(CHUNK_SIZE.try_into().unwrap()))
.multipart_read(&object_meta, Some(CHUNK_SIZE.try_into().unwrap()))
.await
.unwrap();
let flattened_vec: Vec<u8> = read_res.into_iter().flat_map(|b| b.to_vec()).collect();
Expand All @@ -422,8 +455,12 @@ mod tests {
assert!(write_res.is_ok());

// Read with adjusted concurrency
let object_meta = object_store
.find_object(new_file_name, None)
.await
.expect("find object");
let read_res: Vec<Bytes> = object_store
.multipart_read(new_file_name, None, None)
.multipart_read(&object_meta, None)
.await
.unwrap();
let flattened_vec: Vec<u8> = read_res.into_iter().flat_map(|b| b.to_vec()).collect();
Expand Down
Loading

0 comments on commit 7630280

Please sign in to comment.