Skip to content

Commit

Permalink
feat: create and send dumb receipt with chunk request
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Jan 26, 2024
1 parent e780a34 commit c8f64bc
Showing 1 changed file with 60 additions and 36 deletions.
96 changes: 60 additions & 36 deletions file-exchange/src/download_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use alloy_primitives::{Address, U256};
use bytes::Bytes;

use ethers::providers::{Http, Middleware, Provider};

use rand::seq::SliceRandom;
use reqwest::{
header::{AUTHORIZATION, CONTENT_RANGE},
header::{HeaderName, AUTHORIZATION, CONTENT_RANGE},
Client,
};
use secp256k1::SecretKey;
Expand All @@ -17,13 +18,16 @@ use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
use tokio::sync::Mutex;

use crate::config::DownloaderArgs;
use crate::discover::{Finder, IndexerEndpoint};
use crate::errors::Error;
use crate::manifest::{
file_hasher::verify_chunk, ipfs::IpfsClient, manifest_fetcher::read_bundle, Bundle,
FileManifestMeta,
};
use crate::{config::DownloaderArgs, discover, graphql};
use crate::{
discover::{Finder, IndexerEndpoint},
download_client::signer::Access,
};

use crate::util::build_wallet;

Expand Down Expand Up @@ -190,13 +194,22 @@ impl Downloader {
let client = self.http_client.clone();
//TODO: can utilize operator address for on-chain checks
let request = self.download_range_request(&meta, i, file.clone())?;
//TODO: create specialized receipts with indexer allocation and cost models
let receipt = self
.receipt_signer
.create_receipt(
graphql::allocation_id(&request.receiver),
&discover::Finder::fees(),
)
.await;
let block_list = self.indexer_blocklist.clone();
let target_chunks = self.target_chunks.clone();
let target_chunks: Arc<StdMutex<HashMap<String, HashSet<u64>>>> =
self.target_chunks.clone();
let url = request.query_endpoint.clone();
// Spawn a new asynchronous task for each range request
let handle: tokio::task::JoinHandle<Result<Arc<Mutex<File>>, Error>> =
tokio::spawn(async move {
match download_chunk_and_write_to_file(&client, request).await {
match download_chunk_and_write_to_file(&client, request, receipt).await {
Ok(r) => {
// Update downloaded status
target_chunks
Expand Down Expand Up @@ -254,20 +267,21 @@ impl Downloader {
) -> Result<DownloadRangeRequest, Error> {
let mut rng = rand::thread_rng();
let query_endpoints = &self.indexer_urls.lock().unwrap();
let url = if let Some((operator, url)) = query_endpoints.choose(&mut rng).cloned() {
tracing::debug!(
operator,
url,
chunk = i,
file_manifest = meta.meta_info.hash,
"Querying operator"
);
url.clone()
} else {
let err_msg = "No operator serving the file, data unavailable".to_string();
tracing::warn!(err_msg);
return Err(Error::DataUnavilable(err_msg.to_string()));
};
let (operator, url) =
if let Some((operator, url)) = query_endpoints.choose(&mut rng).cloned() {
tracing::debug!(
operator,
url,
chunk = i,
file_manifest = meta.meta_info.hash,
"Querying operator"
);
(operator, url)
} else {
let err_msg = "No operator serving the file, data unavailable".to_string();
tracing::warn!(err_msg);
return Err(Error::DataUnavilable(err_msg.to_string()));
};
//TODO: do no add ipfs_hash here, construct query_endpoint after updating route 'bundles/id/:id'
let query_endpoint = url + "/bundles/id/" + &self.ipfs_hash;
let file_hash = meta.meta_info.hash.clone();
Expand All @@ -279,6 +293,7 @@ impl Downloader {
let chunk_hash = meta.file_manifest.chunk_hashes[i as usize].clone();

Ok(DownloadRangeRequest {
receiver: operator,
query_endpoint,
file_hash,
start,
Expand All @@ -287,7 +302,6 @@ impl Downloader {
file,
max_retry: self.chunk_max_retry,
auth_token: self.free_query_auth_token.clone(),
_receipt: None,
})
}

Expand Down Expand Up @@ -342,9 +356,9 @@ impl Downloader {

#[derive(Debug, Clone)]
pub struct DownloadRangeRequest {
receiver: String,
query_endpoint: String,
auth_token: Option<String>,
_receipt: Option<TapReceipt>,
file_hash: String,
start: u64,
end: u64,
Expand All @@ -357,6 +371,7 @@ pub struct DownloadRangeRequest {
async fn download_chunk_and_write_to_file(
http_client: &Client,
request: DownloadRangeRequest,
receipt: Option<TapReceipt>,
) -> Result<Arc<Mutex<File>>, Error> {
let mut attempts = 0;

Expand All @@ -370,6 +385,7 @@ async fn download_chunk_and_write_to_file(
http_client,
&request.query_endpoint,
request.auth_token.clone(),
receipt.clone(),
&request.file_hash,
request.start,
request.end,
Expand Down Expand Up @@ -415,31 +431,39 @@ async fn request_chunk(
http_client: &Client,
query_endpoint: &str,
auth_token: Option<String>,
receipt: Option<TapReceipt>,
file_hash: &str,
start: u64,
end: u64,
) -> Result<Bytes, Error> {
let range = format!("bytes={}-{}", start, end);
//TODO: implement payment flow
// if auth_token.is_none() {
// tracing::error!(
// "No auth token provided; require receipt implementation"
// );
// Err(anyhow!("No auth token"))
// };
if auth_token.is_none() && receipt.is_none() {
let e = "No auth token provided and no receipt constructed".to_string();
tracing::error!(e);
return Err(Error::InvalidConfig(e));
};

tracing::debug!(query_endpoint, range, "Make range request");
let response = http_client
let request = http_client
.get(query_endpoint)
.header("file_hash", file_hash)
.header(CONTENT_RANGE, range)
.header(
AUTHORIZATION,
auth_token.expect("No payment nor auth token"),
.header(CONTENT_RANGE, range);

let request = if let Some(auth) = auth_token {
request.header(AUTHORIZATION, auth)
} else {
request
};
let request = if let Some(receipt) = receipt {
request.header(
HeaderName::from_str("Scalar-Receipt").unwrap(),
&receipt.serialize(),
)
.send()
.await
.map_err(Error::Request)?;
} else {
request
};

let response = request.send().await.map_err(Error::Request)?;

// Check if the server supports range requests
if response.status().is_success() && response.headers().contains_key(CONTENT_RANGE) {
Expand Down

0 comments on commit c8f64bc

Please sign in to comment.