From c8f64bce1164754391737a8037f81a825aa6b706 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 26 Jan 2024 17:26:15 -0600 Subject: [PATCH] feat: create and send dumb receipt with chunk request --- file-exchange/src/download_client/mod.rs | 96 +++++++++++++++--------- 1 file changed, 60 insertions(+), 36 deletions(-) diff --git a/file-exchange/src/download_client/mod.rs b/file-exchange/src/download_client/mod.rs index b85f829..6a3c4be 100644 --- a/file-exchange/src/download_client/mod.rs +++ b/file-exchange/src/download_client/mod.rs @@ -2,9 +2,10 @@ use alloy_primitives::{Address, U256}; use bytes::Bytes; use ethers::providers::{Http, Middleware, Provider}; + use rand::seq::SliceRandom; use reqwest::{ - header::{AUTHORIZATION, CONTENT_RANGE}, + header::{HeaderName, AUTHORIZATION, CONTENT_RANGE}, Client, }; use secp256k1::SecretKey; @@ -17,13 +18,16 @@ use std::sync::{Arc, Mutex as StdMutex}; use std::time::Duration; use tokio::sync::Mutex; -use crate::config::DownloaderArgs; -use crate::discover::{Finder, IndexerEndpoint}; use crate::errors::Error; use crate::manifest::{ file_hasher::verify_chunk, ipfs::IpfsClient, manifest_fetcher::read_bundle, Bundle, FileManifestMeta, }; +use crate::{config::DownloaderArgs, discover, graphql}; +use crate::{ + discover::{Finder, IndexerEndpoint}, + download_client::signer::Access, +}; use crate::util::build_wallet; @@ -190,13 +194,22 @@ impl Downloader { let client = self.http_client.clone(); //TODO: can utilize operator address for on-chain checks let request = self.download_range_request(&meta, i, file.clone())?; + //TODO: create specialized receipts with indexer allocation and cost models + let receipt = self + .receipt_signer + .create_receipt( + graphql::allocation_id(&request.receiver), + &discover::Finder::fees(), + ) + .await; let block_list = self.indexer_blocklist.clone(); - let target_chunks = self.target_chunks.clone(); + let target_chunks: Arc>>> = + self.target_chunks.clone(); let url = request.query_endpoint.clone(); // Spawn a new asynchronous task for each range request let handle: tokio::task::JoinHandle>, Error>> = tokio::spawn(async move { - match download_chunk_and_write_to_file(&client, request).await { + match download_chunk_and_write_to_file(&client, request, receipt).await { Ok(r) => { // Update downloaded status target_chunks @@ -254,20 +267,21 @@ impl Downloader { ) -> Result { let mut rng = rand::thread_rng(); let query_endpoints = &self.indexer_urls.lock().unwrap(); - let url = if let Some((operator, url)) = query_endpoints.choose(&mut rng).cloned() { - tracing::debug!( - operator, - url, - chunk = i, - file_manifest = meta.meta_info.hash, - "Querying operator" - ); - url.clone() - } else { - let err_msg = "No operator serving the file, data unavailable".to_string(); - tracing::warn!(err_msg); - return Err(Error::DataUnavilable(err_msg.to_string())); - }; + let (operator, url) = + if let Some((operator, url)) = query_endpoints.choose(&mut rng).cloned() { + tracing::debug!( + operator, + url, + chunk = i, + file_manifest = meta.meta_info.hash, + "Querying operator" + ); + (operator, url) + } else { + let err_msg = "No operator serving the file, data unavailable".to_string(); + tracing::warn!(err_msg); + return Err(Error::DataUnavilable(err_msg.to_string())); + }; //TODO: do no add ipfs_hash here, construct query_endpoint after updating route 'bundles/id/:id' let query_endpoint = url + "/bundles/id/" + &self.ipfs_hash; let file_hash = meta.meta_info.hash.clone(); @@ -279,6 +293,7 @@ impl Downloader { let chunk_hash = meta.file_manifest.chunk_hashes[i as usize].clone(); Ok(DownloadRangeRequest { + receiver: operator, query_endpoint, file_hash, start, @@ -287,7 +302,6 @@ impl Downloader { file, max_retry: self.chunk_max_retry, auth_token: self.free_query_auth_token.clone(), - _receipt: None, }) } @@ -342,9 +356,9 @@ impl Downloader { #[derive(Debug, Clone)] pub struct DownloadRangeRequest { + receiver: String, query_endpoint: String, auth_token: Option, - _receipt: Option, file_hash: String, start: u64, end: u64, @@ -357,6 +371,7 @@ pub struct DownloadRangeRequest { async fn download_chunk_and_write_to_file( http_client: &Client, request: DownloadRangeRequest, + receipt: Option, ) -> Result>, Error> { let mut attempts = 0; @@ -370,6 +385,7 @@ async fn download_chunk_and_write_to_file( http_client, &request.query_endpoint, request.auth_token.clone(), + receipt.clone(), &request.file_hash, request.start, request.end, @@ -415,31 +431,39 @@ async fn request_chunk( http_client: &Client, query_endpoint: &str, auth_token: Option, + receipt: Option, file_hash: &str, start: u64, end: u64, ) -> Result { let range = format!("bytes={}-{}", start, end); - //TODO: implement payment flow - // if auth_token.is_none() { - // tracing::error!( - // "No auth token provided; require receipt implementation" - // ); - // Err(anyhow!("No auth token")) - // }; + if auth_token.is_none() && receipt.is_none() { + let e = "No auth token provided and no receipt constructed".to_string(); + tracing::error!(e); + return Err(Error::InvalidConfig(e)); + }; tracing::debug!(query_endpoint, range, "Make range request"); - let response = http_client + let request = http_client .get(query_endpoint) .header("file_hash", file_hash) - .header(CONTENT_RANGE, range) - .header( - AUTHORIZATION, - auth_token.expect("No payment nor auth token"), + .header(CONTENT_RANGE, range); + + let request = if let Some(auth) = auth_token { + request.header(AUTHORIZATION, auth) + } else { + request + }; + let request = if let Some(receipt) = receipt { + request.header( + HeaderName::from_str("Scalar-Receipt").unwrap(), + &receipt.serialize(), ) - .send() - .await - .map_err(Error::Request)?; + } else { + request + }; + + let response = request.send().await.map_err(Error::Request)?; // Check if the server supports range requests if response.status().is_success() && response.headers().contains_key(CONTENT_RANGE) {