diff --git a/src/backend/fs.rs b/src/backend/fs.rs index 0757ece..db33f84 100644 --- a/src/backend/fs.rs +++ b/src/backend/fs.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::{anyhow, Error, Result}; -use axum::body::Bytes; +use axum::{body::Bytes, extract::BodyStream}; use bytes::BytesMut; use carbonado::{constants::Format, file::Header, structs::Encoded}; use chrono::{NaiveDateTime, TimeZone, Utc}; @@ -49,13 +49,24 @@ pub async fn write_file<'a>( &x_only_pk.serialize(), ))); + //let is_mime_type = Arc::new(Mutex::new("mim_type_is?")); + trace!("Iterate through file body stream"); let thread_file_hasher = file_hasher.clone(); - let segment_hashes: Vec = file_stream - .try_par_then(None, move |segment: Bytes| { + // let is_mime_type = is_mime_type.clone(); + + let segment_hashes: Vec<(BaoHash, String)> = file_stream + .try_par_then(None, move |(segment, (_, _, mime_type))| { + // If you need to pass additional information, create a new struct or tuple + //let additional_data = (body_stream.clone(), remainder.clone(), is_mime_type.clone()); + let segment = segment.clone(); + let mime_type = mime_type.clone(); + trace!("Process segment"); let thread_file_hasher = thread_file_hasher.clone(); + //let is_mime_type = is_mime_type.clone(); + async move { thread_file_hasher.lock().await.update(&segment); trace!("Encoding segment"); @@ -64,12 +75,30 @@ pub async fn write_file<'a>( let segment_hash = write_segment(&ss, &pk_bytes, &encoded_segment)?; trace!("Segment hash: {segment_hash}"); - Ok::(segment_hash) + // Extract the mime_type from the result here + // let mime_type = match result { + // Ok(Some((_, (_, _, mime_type)))) => mime_type, + // _ => "application/octet-stream".to_string(), + // }; + + Ok::<(BaoHash, String), Error>((segment_hash, mime_type)) } }) .try_collect() .await?; + // Extract mime_type values from segment_hashes + // let mime_types: Vec = segment_hashes + // .iter() + // .map(|(_, mime_type)| mime_type.clone()) + // .collect(); + + // Convert the reference to a slice of tuples into a reference to a slice of BaoHash + let seg_hashes: &[BaoHash] = &segment_hashes + .iter() + .map(|(bao_hash, _)| bao_hash) + .collect::>()[..]; + let file_hash: Blake3Hash = Blake3Hash(file_hasher.lock().await.finalize()); trace!("Check if catalog already exists"); @@ -82,7 +111,7 @@ pub async fn write_file<'a>( } trace!("Append each hash to its catalog"); - write_catalog(&write_pk_str, &file_hash, name, mime_type, &segment_hashes).await?; + write_catalog(&write_pk_str, &file_hash, name, "mime_type", seg_hashes).await?; debug!("Finished write_file"); Ok(file_hash) diff --git a/src/frontend/http.rs b/src/frontend/http.rs index 2a84c2a..70e5a76 100644 --- a/src/frontend/http.rs +++ b/src/frontend/http.rs @@ -24,16 +24,18 @@ use crate::{ prelude::*, }; -async fn write_file_handler(pk: &str, body: BodyStream, name: Option) -> Result { +pub async fn write_file_handler( + pk: &str, + body: BodyStream, + name: Option, +) -> Result { let pk = &Secp256k1PubKey::try_from(pk)?; - let is_mime_type = "default_mime_type".to_string(); - let file_stream: FileStream = stream::try_unfold( ( body, BytesMut::with_capacity(SEGMENT_SIZE * 2), - is_mime_type.clone(), + "default_mime_type".to_string(), ), |(mut body_stream, mut remainder, mut is_mime_type)| async move { while remainder.len() < SEGMENT_SIZE { @@ -43,17 +45,13 @@ async fn write_file_handler(pk: &str, body: BodyStream, name: Option) -> let mut buffer = Vec::new(); buffer.extend_from_slice(&bytes); - // is_mime_type = "test_mime_type".to_string(); - let kind = infer::get(&buffer); // Use the extended buffer here - match kind { - Some(mime_type) => { - is_mime_type = mime_type.mime_type().to_string(); - } - None => debug!("no mime_type found"), + let mime_type = match kind { + Some(mime_type) => mime_type.mime_type().to_string(), + None => "application/octet-stream".to_string(), }; - debug!(">>>>>>>>>>>> PASS TO TUPLE MIME_TYPE {}", &is_mime_type); + debug!(">>>>>>>>>>>> PASS TO TUPLE MIME_TYPE {}", &mime_type); remainder.extend(bytes); @@ -62,7 +60,7 @@ async fn write_file_handler(pk: &str, body: BodyStream, name: Option) -> trace!("Stream 1MB segment"); return Ok(Some(( segment.freeze(), - (body_stream, remainder, is_mime_type), + (body_stream, remainder, mime_type.clone()), // Pass mime_type here ))); } } else { @@ -76,7 +74,7 @@ async fn write_file_handler(pk: &str, body: BodyStream, name: Option) -> ) .boxed(); - let Blake3Hash(hash) = write_file(pk, file_stream, name, &is_mime_type).await?; + let Blake3Hash(hash) = write_file(pk, file_stream, name).await?; Ok(hash.to_hex().to_string()) } diff --git a/src/structs.rs b/src/structs.rs index dff14fa..adc172e 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -3,6 +3,8 @@ use std::{ str::FromStr, }; +use axum::extract::BodyStream; +use bytes::BytesMut; use serde::{Deserialize, Serialize}; use anyhow::{Error, Result}; @@ -18,6 +20,31 @@ impl fmt::Display for Blake3Hash { } } +// ub struct BaoHashExt { +// pub hash: bao::Hash, +// pub other_data: (BodyStream, BytesMut, String), // Tuple containing body, BytesMut, and mime_type +// } + +// impl BaoHashExt { +// // Extract the bao::Hash +// pub fn get_hash(&self) -> &bao::Hash { +// &self.hash +// } + +// // Extract the String field from the tuple +// pub fn get_mime_type(&self) -> &str { +// &self.other_data.2 +// } +// } + +// impl fmt::Display for BaoHashExt { +// fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { +// let Self(hash) = self; + +// f.write_str(&hash.to_string()) +// } +// }p + #[derive(Clone, Debug)] pub struct BaoHash(pub bao::Hash); diff --git a/tests/file.rs b/tests/file.rs index 3dc8cca..a7d0782 100644 --- a/tests/file.rs +++ b/tests/file.rs @@ -1,12 +1,11 @@ use std::fs; -use std::pin::Pin; //use tokio::stream::StreamExt; -use anyhow::{Error, Result}; +use anyhow::Result; use axum::body::Bytes; use bytes::BytesMut; use carbonado_node::{ - backend::fs::{read_catalog, read_file, write_file, FileStream}, + backend::fs::{read_file, write_file, FileStream}, config::node_shared_secret, prelude::SEGMENT_SIZE, structs::{Hash, Lookup, Secp256k1PubKey}, @@ -47,7 +46,7 @@ async fn write_read() -> Result<()> { .map(|chunk| Ok(Bytes::from(chunk))) .boxed(); - let blake3_hash = write_file(&Secp256k1PubKey(pk), file_stream, None, "mime_type_test").await?; + let blake3_hash = write_file(&Secp256k1PubKey(pk), file_stream, None).await?; info!("Reading file, {}", blake3_hash); @@ -98,7 +97,7 @@ async fn read_write_delete_file() -> Result<()> { let (sk, pk) = generate_keypair(&mut thread_rng()); - let file_did_write = write_file(&Secp256k1PubKey(pk), file_stream, None, "mime_type_test") + let file_did_write = write_file(&Secp256k1PubKey(pk), file_stream, None) .await .is_ok();