Skip to content

Commit

Permalink
[fix] Logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Jan 19, 2024
1 parent b9a3cf8 commit b088ce4
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 77 deletions.
8 changes: 1 addition & 7 deletions bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,7 @@ async fn table_get(
State(state): State<AppState>,
Path((table, key)): Path<(String, String)>,
) -> Response {
match state
.iroh_node
.read()
.await
.table_get(&table, &key)
.await
{
match state.iroh_node.read().await.table_get(&table, &key).await {
Ok(reader) => Response::builder()
.body(Body::from_stream(ReaderStream::new(reader)))
.unwrap(),
Expand Down
8 changes: 6 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ pub struct FSShardConfig {
pub weight: usize,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct S3Config {
pub struct S3ConfigCredentials {
pub aws_access_key_id: String,
pub aws_secret_access_key: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct S3Config {
pub credentials: Option<S3ConfigCredentials>,
pub bucket_name: String,
pub prefix: String,
pub region_name: String,
Expand Down
30 changes: 20 additions & 10 deletions src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,26 @@ pub struct S3Sink {

impl S3Sink {
pub async fn new(name: &str, config: &S3Config) -> Self {
let credentials = Credentials::from_keys(
&config.aws_access_key_id,
&config.aws_secret_access_key,
None,
);
let sdk_config = aws_config::defaults(BehaviorVersion::latest())
.credentials_provider(credentials)
.region(Region::new(config.region_name.clone()))
.load()
.await;
let sdk_config = match &config.credentials {
Some(credentials) => {
let credentials = Credentials::from_keys(
&credentials.aws_access_key_id,
&credentials.aws_secret_access_key,
None,
);
aws_config::defaults(BehaviorVersion::latest())
.credentials_provider(credentials)
.region(Region::new(config.region_name.clone()))
.load()
.await
}
None => {
aws_config::defaults(BehaviorVersion::latest())
.region(Region::new(config.region_name.clone()))
.load()
.await
}
};
let client = aws_sdk_s3::Client::new(&sdk_config);
S3Sink {
name: name.to_string(),
Expand Down
6 changes: 3 additions & 3 deletions src/storages/fs_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::collections::HashMap;
use std::io;
use std::path::PathBuf;
use tokio::io::AsyncRead;
use tracing::{info, info_span};
use tracing::{info, info_span, Instrument};

#[derive(Clone)]
pub struct FSStorageEngine {
Expand Down Expand Up @@ -44,8 +44,6 @@ impl FSStorageEngine {
tokio::spawn({
let fs_storage = fs_storage.clone();
async move {
let span = info_span!("fs_sync", table_id = ?iroh_doc.id());
let _guard = span.enter();
let mut stream = fs_storage.iroh_doc().subscribe().await.unwrap();
let mut wait_list = HashMap::new();
info!("started");
Expand Down Expand Up @@ -85,6 +83,7 @@ impl FSStorageEngine {
}
Ok::<(), Error>(())
}
.instrument(info_span!("fs_sync", table_id = ?iroh_doc.id().to_string()))
});
for (_, fs_shard) in &fs_storage.fs_shards {
tokio::spawn({
Expand Down Expand Up @@ -118,6 +117,7 @@ impl FSStorageEngine {
}
Ok::<(), Error>(())
}
.instrument(info_span!("restore"))
});
}
Ok(fs_storage)
Expand Down
113 changes: 58 additions & 55 deletions src/storages/mirroring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;
use tracing::{info, info_span, warn};
use tracing::{info, info_span, warn, Instrument};

#[derive(Clone)]
pub struct Mirroring {
Expand All @@ -22,76 +22,79 @@ impl Mirroring {
sync_client: IrohClient,
delete_after_mirroring: bool,
) -> Self {
let thread = tokio::spawn(async move {
let span = info_span!("mirroring", table_id = ?iroh_doc.id());
let _guard = span.enter();
let mut stream = iroh_doc.subscribe().await.unwrap();
let mut wait_list = HashMap::new();
info!("started");
while let Some(event) = stream.next().await {
let event = event.unwrap();
match &event {
LiveEvent::InsertLocal { entry } => {
info!(event = ?event);
let mut reader = match entry.content_reader(&sync_client).await {
Ok(reader) => reader,
Err(error) => {
warn!(error = ?error);
continue;
}
};
let mut buffer = vec![];
if let Err(error) = reader.read_to_end(&mut buffer).await {
warn!(error = ?error);
continue;
}
for sink in &sinks {
if let Err(error) = sink.send(entry.key(), buffer.clone()).await {
let thread = tokio::spawn(
async move {
let mut stream = iroh_doc.subscribe().await.unwrap();
let mut wait_list = HashMap::new();
info!("started");
while let Some(event) = stream.next().await {
let event = event.unwrap();
match &event {
LiveEvent::InsertLocal { entry } => {
info!(event = ?event);
let mut reader = match entry.content_reader(&sync_client).await {
Ok(reader) => reader,
Err(error) => {
warn!(error = ?error);
continue;
}
};
let mut buffer = vec![];
if let Err(error) = reader.read_to_end(&mut buffer).await {
warn!(error = ?error);
continue;
}
}
}
LiveEvent::InsertRemote { entry, .. } => {
info!(event = ?event);
wait_list.insert(entry.content_hash(), entry.key().to_vec());
}
LiveEvent::ContentReady { hash } => {
info!(event = ?event);
let Some(key) = wait_list.remove(hash) else {
warn!(error = "missing_key_in_wait_list");
continue;
};
match sync_client.blobs.read(*hash).await {
Ok(mut reader) => {
let mut buffer = vec![];
if let Err(error) = reader.read_to_end(&mut buffer).await {
for sink in &sinks {
if let Err(error) = sink.send(entry.key(), buffer.clone()).await {
warn!(error = ?error);
continue;
}
for sink in &sinks {
info!(action = "send", sink = sink.name());
if let Err(error) = sink.send(&key, buffer.clone()).await {
}
}
LiveEvent::InsertRemote { entry, .. } => {
info!(event = ?event);
wait_list.insert(entry.content_hash(), entry.key().to_vec());
}
LiveEvent::ContentReady { hash } => {
info!(event = ?event);
let Some(key) = wait_list.remove(hash) else {
warn!(error = "missing_key_in_wait_list");
continue;
};
match sync_client.blobs.read(*hash).await {
Ok(mut reader) => {
let mut buffer = vec![];
if let Err(error) = reader.read_to_end(&mut buffer).await {
warn!(error = ?error);
continue;
}
}
if delete_after_mirroring {
if let Err(error) = sync_client.blobs.delete_blob(*hash).await {
warn!(error = ?error);
continue;
for sink in &sinks {
info!(action = "send", sink = sink.name());
if let Err(error) = sink.send(&key, buffer.clone()).await {
warn!(error = ?error);
continue;
}
}
if delete_after_mirroring {
if let Err(error) =
sync_client.blobs.delete_blob(*hash).await
{
warn!(error = ?error);
continue;
}
}
}
Err(error) => warn!(error = ?error),
}
Err(error) => warn!(error = ?error),
}
_ => {}
}
_ => {}
}
warn!("stopped_mirroring");
Ok(())
}
warn!("stopped_mirroring");
Ok(())
});
.instrument(info_span!("mirroring", table_id = ?iroh_doc.id().to_string())),
);
Self {
thread: Arc::new(thread),
}
Expand Down

0 comments on commit b088ce4

Please sign in to comment.