Skip to content

Commit

Permalink
feat: Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Feb 18, 2024
1 parent d4b30ba commit f61a850
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 117 deletions.
5 changes: 2 additions & 3 deletions examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ iroh:
max_rpc_connections: 32
max_rpc_streams: 1024
sinks: {}
fs_storages:
storages:
default:
replicas: 1
fs_shards:
shards:
- name: shard1
path: /tmp/trident/data/shard1
weight: 1
Expand Down
7 changes: 3 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ fn return_true() -> bool {

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct StorageEngineConfig {
pub replicas: u8,
pub fs_shards: Vec<ShardConfig>,
pub shards: Vec<ShardConfig>,
#[serde(default = "return_false")]
pub is_import_missing_enabled: bool,
}
Expand Down Expand Up @@ -83,7 +82,7 @@ pub struct IrohConfig {
#[serde(default = "HashMap::new")]
pub sinks: HashMap<String, SinkConfig>,
#[serde(default = "HashMap::new")]
pub fs_storages: HashMap<String, StorageEngineConfig>,
pub storages: HashMap<String, StorageEngineConfig>,
pub gc_interval_secs: Option<u64>,
}

Expand All @@ -103,7 +102,7 @@ pub async fn load_config(config_path: &str) -> Result<Config, Error> {
pub async fn save_config(config_path: &str, config: &Config) -> Result<(), Error> {
tokio::fs::write(
config_path,
serde_yaml::to_string(config).unwrap().as_bytes(),
serde_yaml::to_string(config).expect("unreachable").as_bytes(),
)
.await
.map_err(Error::io_error)
Expand Down
16 changes: 8 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ pub enum Error {
TableTicket { description: String },
#[error("blobs: {description}")]
Blobs { description: String },
#[error("hash_error: {description}")]
Hash { description: String },
#[error("entry_error: {description}")]
Entry { description: String },
#[error("io_error: {description}")]
Expand All @@ -39,6 +37,8 @@ pub enum Error {
ExistingTable { description: String },
#[error("storage: {description}")]
Storage { description: String },
#[error("key: {description}")]
IncorrectKey { description: String },
}

impl Error {
Expand Down Expand Up @@ -78,12 +78,6 @@ impl Error {
}
}

pub fn hash(error: impl Display) -> Self {
Error::Hash {
description: error.to_string(),
}
}

pub fn entry(error: impl Display) -> Self {
Error::Entry {
description: error.to_string(),
Expand Down Expand Up @@ -136,6 +130,12 @@ impl Error {
description: error.to_string(),
}
}

pub fn incorrect_key(error: impl Display) -> Self {
Error::IncorrectKey {
description: error.to_string(),
}
}
}

impl IntoResponse for Error {
Expand Down
26 changes: 15 additions & 11 deletions src/iroh_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::config::{Config, SinkConfig, StorageEngineConfig, TableConfig};
use crate::error::{Error, Result};
use crate::sinks::{IpfsSink, S3Sink, Sink};
use crate::storage::Storage;
use crate::utils::bytes_to_key;
use crate::IrohClient;
use async_stream::stream;
use futures::StreamExt;
Expand Down Expand Up @@ -113,7 +112,7 @@ impl IrohNode {
}

let author_id = match &config_lock.iroh.author {
Some(author) => AuthorId::from_str(author).unwrap(),
Some(author) => AuthorId::from_str(author).map_err(Error::author)?,
None => {
let author_id = sync_client.authors.create().await.map_err(Error::author)?;
config_lock.iroh.author = Some(author_id.to_string());
Expand All @@ -136,14 +135,14 @@ impl IrohNode {
}

let mut table_storages = HashMap::new();
let storage_configs = config_lock.iroh.fs_storages.clone();
let storage_configs = config_lock.iroh.storages.clone();
for (table_name, table_config) in &mut config_lock.iroh.tables {
let iroh_doc = sync_client
.docs
.open(NamespaceId::from_str(&table_config.id).unwrap())
.open(NamespaceId::from_str(&table_config.id).map_err(Error::storage)?)
.await
.map_err(Error::table)?
.unwrap();
.ok_or_else(|| Error::table(format!("{} does not exist", table_config.id)))?;
iroh_doc
.set_download_policy(table_config.download_policy.clone())
.await
Expand All @@ -169,7 +168,7 @@ impl IrohNode {
table_storages.insert(table_name.clone(), storage_engine);
}

let fs_storage_configs = config_lock.iroh.fs_storages.clone();
let fs_storage_configs = config_lock.iroh.storages.clone();

drop(config_lock);

Expand Down Expand Up @@ -396,10 +395,15 @@ impl IrohNode {
}

pub fn table_keys(&self, table_name: &str) -> Option<impl Stream<Item = Result<String>>> {
self.table_storages.get(table_name).cloned().map_or_else(|| None, |table_storage| Some(stream! {
for await el in table_storage.get_all() {
yield Ok(format!("{}\n", std::str::from_utf8(bytes_to_key(el.unwrap().key())).unwrap()))
}
}))
self.table_storages.get(table_name).cloned().map_or_else(
|| None,
|table_storage| {
Some(stream! {
for await el in table_storage.get_all() {
yield Ok(format!("{}\n", std::str::from_utf8(el.unwrap().key()).unwrap()))
}
})
},
)
}
}
16 changes: 8 additions & 8 deletions src/sinks/ipfs_sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::IpfsConfig;
use crate::error::Error;
use crate::sinks::Sink;
use crate::utils::{bytes_to_key, FRAGMENT};
use crate::utils::FRAGMENT;
use axum::async_trait;
use percent_encoding::utf8_percent_encode;
use reqwest::header::HeaderMap;
Expand Down Expand Up @@ -32,19 +32,18 @@ impl Sink for IpfsSink {
&self.name
}

async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error> {
async fn send(&self, key: &str, path: &Path) -> Result<(), Error> {
// ToDo: Remove allocating and return stream
// https://github.com/awslabs/aws-sdk-rust/discussions/361
let encoded_key =
utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT)
.collect::<String>()
.to_lowercase();
let encoded_key = utf8_percent_encode(key, FRAGMENT)
.collect::<String>()
.to_lowercase();

let mut headers = HeaderMap::new();

headers.insert("Abspath", path.to_string_lossy().parse().unwrap());

let file_part = reqwest::multipart::Part::bytes(tokio::fs::read(path).await.unwrap())
let file_part = reqwest::multipart::Part::bytes(tokio::fs::read(path).await.map_err(Error::io_error)?)
.file_name(encoded_key)
.headers(headers)
.mime_str("application/octet-stream")
Expand All @@ -61,7 +60,8 @@ impl Sink for IpfsSink {
.await
.map_err(Error::sink)?;
if !res.status().is_success() {
return Err(Error::sink(res.text().await.unwrap()));
let res_text = res.text().await.map_err(Error::sink)?;
return Err(Error::sink(res_text));
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ pub use s3_sink::S3Sink;
#[async_trait]
pub trait Sink: Send + Sync {
fn name(&self) -> &str;
async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error>;
async fn send(&self, key: &str, path: &Path) -> Result<(), Error>;
}
11 changes: 5 additions & 6 deletions src/sinks/s3_sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::S3Config;
use crate::error::Error;
use crate::sinks::Sink;
use crate::utils::{bytes_to_key, FRAGMENT};
use crate::utils::FRAGMENT;
use aws_credential_types::Credentials;
use aws_sdk_s3::config::{BehaviorVersion, Region};
use aws_sdk_s3::primitives::ByteStream;
Expand Down Expand Up @@ -52,13 +52,12 @@ impl Sink for S3Sink {
&self.name
}

async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error> {
async fn send(&self, key: &str, path: &Path) -> Result<(), Error> {
// ToDo: Remove allocating and return stream
// https://github.com/awslabs/aws-sdk-rust/discussions/361
let encoded_key =
utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT)
.collect::<String>()
.to_lowercase();
let encoded_key = utf8_percent_encode(key, FRAGMENT)
.collect::<String>()
.to_lowercase();
let body = ByteStream::from_path(Path::new(path))
.await
.map_err(Error::sink)?;
Expand Down
Loading

0 comments on commit f61a850

Please sign in to comment.