diff --git a/bin/main.rs b/bin/main.rs index fd6dbf5..35094e9 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -300,13 +300,7 @@ async fn table_get( State(state): State, Path((table, key)): Path<(String, String)>, ) -> Response { - match state - .iroh_node - .read() - .await - .table_get(&table, &key) - .await - { + match state.iroh_node.read().await.table_get(&table, &key).await { Ok(reader) => Response::builder() .body(Body::from_stream(ReaderStream::new(reader))) .unwrap(), diff --git a/src/config.rs b/src/config.rs index 2c6ba27..6246132 100644 --- a/src/config.rs +++ b/src/config.rs @@ -26,10 +26,14 @@ pub struct FSShardConfig { pub weight: usize, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct S3Config { +pub struct S3ConfigCredentials { pub aws_access_key_id: String, pub aws_secret_access_key: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct S3Config { + pub credentials: Option, pub bucket_name: String, pub prefix: String, pub region_name: String, diff --git a/src/sinks.rs b/src/sinks.rs index f0826f2..7d44a50 100644 --- a/src/sinks.rs +++ b/src/sinks.rs @@ -21,16 +21,26 @@ pub struct S3Sink { impl S3Sink { pub async fn new(name: &str, config: &S3Config) -> Self { - let credentials = Credentials::from_keys( - &config.aws_access_key_id, - &config.aws_secret_access_key, - None, - ); - let sdk_config = aws_config::defaults(BehaviorVersion::latest()) - .credentials_provider(credentials) - .region(Region::new(config.region_name.clone())) - .load() - .await; + let sdk_config = match &config.credentials { + Some(credentials) => { + let credentials = Credentials::from_keys( + &credentials.aws_access_key_id, + &credentials.aws_secret_access_key, + None, + ); + aws_config::defaults(BehaviorVersion::latest()) + .credentials_provider(credentials) + .region(Region::new(config.region_name.clone())) + .load() + .await + } + None => { + aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new(config.region_name.clone())) + .load() + .await + } + }; let client = aws_sdk_s3::Client::new(&sdk_config); S3Sink { name: name.to_string(), diff --git a/src/storages/fs_storage.rs b/src/storages/fs_storage.rs index 288b94e..9e10804 100644 --- a/src/storages/fs_storage.rs +++ b/src/storages/fs_storage.rs @@ -13,7 +13,7 @@ use std::collections::HashMap; use std::io; use std::path::PathBuf; use tokio::io::AsyncRead; -use tracing::{info, info_span}; +use tracing::{info, info_span, Instrument}; #[derive(Clone)] pub struct FSStorageEngine { @@ -44,8 +44,6 @@ impl FSStorageEngine { tokio::spawn({ let fs_storage = fs_storage.clone(); async move { - let span = info_span!("fs_sync", table_id = ?iroh_doc.id()); - let _guard = span.enter(); let mut stream = fs_storage.iroh_doc().subscribe().await.unwrap(); let mut wait_list = HashMap::new(); info!("started"); @@ -85,6 +83,7 @@ impl FSStorageEngine { } Ok::<(), Error>(()) } + .instrument(info_span!("fs_sync", table_id = ?iroh_doc.id().to_string())) }); for (_, fs_shard) in &fs_storage.fs_shards { tokio::spawn({ @@ -118,6 +117,7 @@ impl FSStorageEngine { } Ok::<(), Error>(()) } + .instrument(info_span!("restore")) }); } Ok(fs_storage) diff --git a/src/storages/mirroring.rs b/src/storages/mirroring.rs index e19d40b..ccbb351 100644 --- a/src/storages/mirroring.rs +++ b/src/storages/mirroring.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio::task::JoinHandle; -use tracing::{info, info_span, warn}; +use tracing::{info, info_span, warn, Instrument}; #[derive(Clone)] pub struct Mirroring { @@ -22,76 +22,79 @@ impl Mirroring { sync_client: IrohClient, delete_after_mirroring: bool, ) -> Self { - let thread = tokio::spawn(async move { - let span = info_span!("mirroring", table_id = ?iroh_doc.id()); - let _guard = span.enter(); - let mut stream = iroh_doc.subscribe().await.unwrap(); - let mut wait_list = HashMap::new(); - info!("started"); - while let Some(event) = stream.next().await { - let event = event.unwrap(); - match &event { - LiveEvent::InsertLocal { entry } => { - info!(event = ?event); - let mut reader = match entry.content_reader(&sync_client).await { - Ok(reader) => reader, - Err(error) => { - warn!(error = ?error); - continue; - } - }; - let mut buffer = vec![]; - if let Err(error) = reader.read_to_end(&mut buffer).await { - warn!(error = ?error); - continue; - } - for sink in &sinks { - if let Err(error) = sink.send(entry.key(), buffer.clone()).await { + let thread = tokio::spawn( + async move { + let mut stream = iroh_doc.subscribe().await.unwrap(); + let mut wait_list = HashMap::new(); + info!("started"); + while let Some(event) = stream.next().await { + let event = event.unwrap(); + match &event { + LiveEvent::InsertLocal { entry } => { + info!(event = ?event); + let mut reader = match entry.content_reader(&sync_client).await { + Ok(reader) => reader, + Err(error) => { + warn!(error = ?error); + continue; + } + }; + let mut buffer = vec![]; + if let Err(error) = reader.read_to_end(&mut buffer).await { warn!(error = ?error); continue; } - } - } - LiveEvent::InsertRemote { entry, .. } => { - info!(event = ?event); - wait_list.insert(entry.content_hash(), entry.key().to_vec()); - } - LiveEvent::ContentReady { hash } => { - info!(event = ?event); - let Some(key) = wait_list.remove(hash) else { - warn!(error = "missing_key_in_wait_list"); - continue; - }; - match sync_client.blobs.read(*hash).await { - Ok(mut reader) => { - let mut buffer = vec![]; - if let Err(error) = reader.read_to_end(&mut buffer).await { + for sink in &sinks { + if let Err(error) = sink.send(entry.key(), buffer.clone()).await { warn!(error = ?error); continue; } - for sink in &sinks { - info!(action = "send", sink = sink.name()); - if let Err(error) = sink.send(&key, buffer.clone()).await { + } + } + LiveEvent::InsertRemote { entry, .. } => { + info!(event = ?event); + wait_list.insert(entry.content_hash(), entry.key().to_vec()); + } + LiveEvent::ContentReady { hash } => { + info!(event = ?event); + let Some(key) = wait_list.remove(hash) else { + warn!(error = "missing_key_in_wait_list"); + continue; + }; + match sync_client.blobs.read(*hash).await { + Ok(mut reader) => { + let mut buffer = vec![]; + if let Err(error) = reader.read_to_end(&mut buffer).await { warn!(error = ?error); continue; } - } - if delete_after_mirroring { - if let Err(error) = sync_client.blobs.delete_blob(*hash).await { - warn!(error = ?error); - continue; + for sink in &sinks { + info!(action = "send", sink = sink.name()); + if let Err(error) = sink.send(&key, buffer.clone()).await { + warn!(error = ?error); + continue; + } + } + if delete_after_mirroring { + if let Err(error) = + sync_client.blobs.delete_blob(*hash).await + { + warn!(error = ?error); + continue; + } } } + Err(error) => warn!(error = ?error), } - Err(error) => warn!(error = ?error), } + _ => {} } - _ => {} } + warn!("stopped_mirroring"); + Ok(()) } - warn!("stopped_mirroring"); - Ok(()) - }); + .instrument(info_span!("mirroring", table_id = ?iroh_doc.id().to_string())), + ); Self { thread: Arc::new(thread), }