Skip to content

Commit

Permalink
refactor: update todo comments, small reading perf
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Mar 28, 2024
1 parent 5e67fd4 commit e2a76ca
Show file tree
Hide file tree
Showing 12 changed files with 18 additions and 64 deletions.
17 changes: 0 additions & 17 deletions file-exchange/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ pub struct DownloaderArgs {
}

/// Publisher takes the files, generate bundle manifest, and publish to IPFS
//TODO: a single command to publish a range of files
#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
#[group(required = false, multiple = true)]
pub struct PublisherArgs {
Expand Down Expand Up @@ -335,8 +334,6 @@ pub struct PublisherArgs {
value_name = "FILE_TYPE",
value_enum,
env = "FILE_TYPE",
//TODO: use enum
// value_parser = clap::value_parser!(FileType::from_str),
help = "Type of the file (e.g., sql_snapshot, flatfiles)"
)]
pub file_type: String,
Expand All @@ -345,8 +342,6 @@ pub struct PublisherArgs {
long,
value_name = "FILE_VERSION",
env = "FILE_VERSION",
//TODO: use enum
// value_parser = clap::value_parser!(FileType::from_str),
help = "Bundle versioning"
)]
pub bundle_version: String,
Expand Down Expand Up @@ -528,18 +523,6 @@ pub enum FileType {
Flatfiles,
}

// impl FromStr for FileType {
// type Err = &'static str;

// fn from_str(s: &str) -> Result<Self, Self::Err> {
// match s {
// "sql_snapshot" => Ok(FileType::SqlSnapshot),
// "flatfiles" => Ok(FileType::Flatfiles),
// _ => Err("Invalid file type"),
// }
// }
// }

/// Sets up tracing, allows log level to be set from the environment variables
pub fn init_tracing(format: &str) -> Result<(), SetGlobalDefaultError> {
let filter = EnvFilter::from_default_env();
Expand Down
3 changes: 0 additions & 3 deletions file-exchange/src/discover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use crate::manifest::{
};
use crate::util::{UDecimal18, GRT};

// Pair indexer operator address and indexer service endpoint (operator, indexer_url)
// persumeably this should not be handled by clients themselves
//TODO: smarter type for tracking available endpoints
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ServiceEndpoint {
pub operator: String,
Expand Down Expand Up @@ -242,7 +240,6 @@ pub async fn unavailable_files(file_map: &FileAvailbilityMap) -> Vec<String> {
missing_file
}

//TODO: directly access the field instead
#[derive(Debug, Serialize, Deserialize)]
pub struct Operator {
#[serde(alias = "publicKey")]
Expand Down
4 changes: 1 addition & 3 deletions file-exchange/src/download_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ impl Downloader {
}

/// Read bundle manifiest and download the individual file manifests
//TODO: update once there is payment
pub async fn download_bundle(&self) -> Result<(), Error> {
self.init_target_chunks(&self.bundle);
tracing::trace!(
Expand Down Expand Up @@ -235,7 +234,6 @@ impl Downloader {
}

/// Download a file by reading its chunk manifest
//TODO: update once there is payment
pub async fn download_file_manifest(&self, meta: FileManifestMeta) -> Result<(), Error> {
tracing::debug!(
file_spec = tracing::field::debug(&meta),
Expand Down Expand Up @@ -403,7 +401,7 @@ impl Downloader {
tracing::warn!(err_msg);
return Err(Error::DataUnavailable(err_msg.to_string()));
};
//TODO: do no add ipfs_hash here, construct query_endpoint after updating route 'files/id/:id'
//TODO: do not add ipfs_hash here, construct query_endpoint after updating route 'files/id/:id'
let query_endpoint =
service.service_endpoint.clone() + "/files/id/" + &self.config.ipfs_hash;
let file_hash = meta.meta_info.hash.clone();
Expand Down
1 change: 0 additions & 1 deletion file-exchange/src/download_client/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl Access for TapReceipt {
}
}

//TODO: would be different with TAPv1/2: add record_receipt, update_allocations, get_receipts
impl ReceiptSigner {
pub async fn new(signer: SecretKey, chain_id: U256, verifier: Address) -> Self {
Self {
Expand Down
1 change: 0 additions & 1 deletion file-exchange/src/manifest/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::path::Path;

use crate::errors::Error;

// TODO: REFACTOR; read chunk can be further refactors, check for valid path, and use in serve_file_range
// Read a chunk from the file at the file_path from specified start and end bytes
pub fn read_chunk(file_path: &Path, (start, end): (u64, u64)) -> Result<Bytes, Error> {
let mut file = File::open(file_path).map_err(Error::FileIOError)?;
Expand Down
7 changes: 0 additions & 7 deletions file-exchange/src/manifest/ipfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ pub fn create_ipfs_client(uri: &str) -> IpfsClient {

tracing::info!(ipfs_address, "Connect to IPFS node");

//TODO: Test IPFS client

match IpfsClient::new(&ipfs_address) {
Ok(ipfs_client) => ipfs_client,
Err(e) => {
Expand All @@ -139,14 +137,9 @@ mod tests {
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

// fn test_client() -> IpfsClient {
// IpfsClient::new("https://ipfs.network.thegraph.com")
// }

#[tokio::test]
async fn fetch_random_subgraph_yaml() {
let ipfs_hash = "Qmc1mmagMJqopw2zb1iUTPRMhahMvEAKpQGS3KvuL9cpaX";
// https://ipfs.network.thegraph.com/api/v0/cat?arg=Qmc1mmagMJqopw2zb1iUTPRMhahMvEAKpQGS3KvuL9cpaX
let client = create_ipfs_client("https://ipfs.network.thegraph.com");

let retry_strategy = ExponentialBackoff::from_millis(10)
Expand Down
1 change: 0 additions & 1 deletion file-exchange/src/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pub struct FileManifestMeta {
}

/* Bundle - packaging of file manifests mapped into local files */
//TODO: Add GraphQL derivation
#[derive(Clone, Debug, Serialize, Deserialize, SimpleObject)]
pub struct Bundle {
pub ipfs_hash: String,
Expand Down
44 changes: 17 additions & 27 deletions file-exchange/src/manifest/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ impl Store {
);

// Read all files in bundle to verify locally. This may cause a long initialization time
//TODO: allow for concurrent validation of files
for file_meta in &local.bundle.file_manifests {
self.read_and_validate_file(file_meta, &local.local_path)
.await?;
Expand Down Expand Up @@ -274,37 +273,28 @@ impl Store {
);

// loop through file manifest byte range
//multipart read/ vectorized read
for i in 0..(file_manifest.total_bytes / file_manifest.chunk_size + 1) {
// read range
let start = i * file_manifest.chunk_size;
let end: usize =
(u64::min(start + file_manifest.chunk_size, file_manifest.total_bytes) - 1)
.try_into()
.unwrap();
tracing::trace!(
i,
start_byte = tracing::field::debug(&start),
end_byte = tracing::field::debug(&end),
"Verify chunk index"
);
let chunk_hash = file_manifest.chunk_hashes[i as usize].clone();

// read chunk
// let chunk_data = read_chunk(&file_path, (start, end))?;
let start: usize = start.try_into().unwrap();
// let length: usize = end - start + 1;
let range = std::ops::Range {
start,
end: end + 1,
};
let file_name = meta_info.name.clone();
let chunk_ops: Vec<_> = (0..(file_manifest.total_bytes / file_manifest.chunk_size + 1))
.map(|i| {
let start = i * file_manifest.chunk_size;
let end = (u64::min(start + file_manifest.chunk_size, file_manifest.total_bytes)
- 1) as usize;
let chunk_hash = file_manifest.chunk_hashes[i as usize].clone();

let range = std::ops::Range {
start: start as usize,
end: end + 1,
};
(range, chunk_hash)
})
.collect();

let file_name = meta_info.name.clone();
for (range, chunk_hash) in chunk_ops {
let chunk_data = self.range_read(&file_name, &range).await?;
// verify chunk
if !verify_chunk(&chunk_data, &chunk_hash) {
tracing::error!(
file = tracing::field::debug(&file_name),
chunk_index = tracing::field::debug(&i),
chunk_hash = tracing::field::debug(&chunk_hash),
"Cannot locally verify the serving file"
);
Expand Down
1 change: 0 additions & 1 deletion file-exchange/src/transaction_manager/staking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ impl TransactionManager {
tokens: U256,
epoch: u64,
) -> Result<(H160, Option<TransactionReceipt>), Error> {
//TODO: Start with hardcoding, later add field indexer address to TX manager, tokens to fn params
let existing_ids: Vec<H160> = vec![];
let metadata: [u8; 32] = [0; 32];

Expand Down
1 change: 0 additions & 1 deletion file-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub struct ServerArgs {
default_value = "pretty"
)]
pub log_format: LogFormat,
//TODO: More complex price management
#[arg(
long,
value_name = "default-price-per-byte",
Expand Down
1 change: 0 additions & 1 deletion file-service/src/file_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ pub async fn file_service(
match req.get("content-range") {
Some(r) => {
let range = parse_range_header(r)?;
//TODO: validate receipt
serve_file_range(
context.state.store.clone(),
&file_manifest.meta_info.name,
Expand Down
1 change: 0 additions & 1 deletion file-service/src/file_server/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ impl From<BundleManifest> for GraphQlBundleManifest {
#[derive(Clone, Debug, SimpleObject)]
pub struct GraphQlBundle {
pub ipfs_hash: String,
//TODO: make local path available for admin only
pub manifest: GraphQlBundleManifest,
pub file_manifests: Vec<GraphQlFileManifestMeta>,
}
Expand Down

0 comments on commit e2a76ca

Please sign in to comment.