diff --git a/Cargo.lock b/Cargo.lock
index c44ae9d..f46d22f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4739,6 +4739,7 @@ dependencies = [
  "range-collections",
  "redb 2.1.2",
  "serde",
+ "serde_json",
  "serde_yaml",
  "thiserror",
  "tokio",
diff --git a/Cargo.toml b/Cargo.toml
index 6a205e3..7a0de0a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ percent-encoding = "2.3.1"
 rand = "0.9.0-alpha.1"
 redb = { version = "2.1.2", features = ["logging"] }
 serde = "1.0"
+serde_json = "1.0"
 serde_yaml = "0.9"
 thiserror = "1.0"
 tokio = { version = "1", features = ["full"] }
diff --git a/bin/main.rs b/bin/main.rs
index 8172862..daa0f8e 100644
--- a/bin/main.rs
+++ b/bin/main.rs
@@ -95,6 +95,13 @@ struct TablesForeignInsertRequest {
     to_key: String,
 }
 
+#[derive(Deserialize)]
+struct TablesHashInsertRequest {
+    hash: String,
+    to_table: String,
+    to_key: String,
+}
+
 #[derive(Clone)]
 struct AppState {
     iroh_node: Arc<RwLock<IrohNode>>,
@@ -152,8 +159,9 @@ async fn app() -> Result<(), Error> {
     let mut router = Router::new()
         .route("/blobs/:hash", get(blobs_get))
         .route("/tables/", get(tables_ls))
-        .route("/tables/foreign_insert/", post(table_foreign_insert))
         .route("/tables/:table/", post(tables_create))
+        .route("/tables/foreign_insert/", post(tables_foreign_insert))
+        .route("/tables/hash_insert/", post(tables_hash_insert))
         .route("/tables/:table/exists/", get(tables_exists))
         .route("/tables/:table/peers/", get(tables_peers))
         .route("/tables/:table/", delete(tables_drop))
@@ -179,7 +187,6 @@ async fn app() -> Result<(), Error> {
         .layer(CorsLayer::permissive())
         .with_state(state);
 
-    // run our app with hyper, listening globally on port 3000
     let listener = tokio::net::TcpListener::bind(&config.read().await.http.endpoint)
         .await
         .map_err(Error::io_error)?;
@@ -479,7 +486,7 @@ async fn table_insert(
     }
 }
 
-async fn table_foreign_insert(
+async fn tables_foreign_insert(
     State(state): State<AppState>,
     Query(query): Query<TablesForeignInsertRequest>,
 ) -> Response {
@@ -502,6 +509,28 @@
     }
 }
 
+async fn tables_hash_insert(
+    State(state): State<AppState>,
+    Query(query): Query<TablesHashInsertRequest>,
+) -> Response {
+    match state
+        .iroh_node
+        .read()
+        .await
+        .tables_hash_insert(
+            &query.hash,
+            &query.to_table,
+            &query.to_key,
+        )
+        .await
+    {
+        Ok(hash) => Response::builder()
+            .body(Body::from(hash.to_string()))
+            .expect("Can't build response"),
+        Err(e) => e.into_response(),
+    }
+}
+
 async fn table_root_get(
     State(state): State<AppState>,
     method: Method,
diff --git a/src/iroh_node.rs b/src/iroh_node.rs
index 85d3829..f5b1425 100644
--- a/src/iroh_node.rs
+++ b/src/iroh_node.rs
@@ -12,6 +12,7 @@ use iroh::net::relay::{RelayMap, RelayMode, RelayNode};
 use iroh::node::{GcPolicy, Node};
 use iroh_base::hash::Hash;
 use iroh_base::node_addr::NodeAddr;
+use serde_json::json;
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::net::{Ipv4Addr, SocketAddrV4};
@@ -375,6 +376,25 @@
         Ok(from_hash)
     }
 
+    pub async fn tables_hash_insert(
+        &self,
+        hash: &str,
+        to_table_name: &str,
+        to_key: &str,
+    ) -> Result<Hash> {
+        let Some(to_table) = self.tables.get(to_table_name) else {
+            return Err(Error::missing_table(to_table_name));
+        };
+        let hash = Hash::from_str(hash).map_err(Error::blobs)?;
+        match self.blobs_get(hash.clone()).await? {
+            Some((_, size)) => {
+                to_table.insert_hash(to_key, hash, size).await?;
+                Ok(hash)
+            }
+            None => Err(Error::missing_key(hash))
+        }
+    }
+
     pub async fn table_share(&self, table_name: &str, mode: ShareMode) -> Result {
         match self.tables.get(table_name) {
             Some(table) => Ok(table.share(mode).await?),
@@ -404,7 +424,15 @@
             |table| {
                 Some(stream! {
                     for await el in table.get_all() {
-                        yield Ok(format!("{}\n", std::str::from_utf8(el.expect("Can't extract document").key()).expect("Not utf8 symbol")))
+                        let entry = el.expect("Can't extract document");
+                        let key = std::str::from_utf8(entry.key()).expect("Not utf8 symbol");
+                        let hash_str = entry.content_hash().to_string();
+                        let data = json!({
+                            "key": key,
+                            "hash": hash_str,
+                            "size": entry.content_len(),
+                        });
+                        yield Ok(format!("{}\n", data.to_string()))
                     }
                 })
             },
@@ -415,12 +443,21 @@
         &self,
         hash: Hash,
     ) -> Result, u64)>> {
-        let blob_reader = self.node.blobs().read(hash).await.map_err(Error::blobs)?;
-        if !blob_reader.is_complete() {
-            return Ok(None);
+        match self.node.blobs().read(hash).await {
+            Ok(blob_reader) => {
+                if !blob_reader.is_complete() {
+                    return Ok(None);
+                }
+                let file_size = blob_reader.size();
+                Ok(Some((Box::new(blob_reader), file_size)))
+            }
+            Err(e) => {
+                if e.to_string() == "Blob not found" {
+                    return Ok(None);
+                }
+                Err(Error::blobs(e))
+            }
         }
-        let file_size = blob_reader.size();
-        Ok(Some((Box::new(blob_reader), file_size)))
     }
 
     pub async fn send_shutdown(&self) -> Result<()> {
diff --git a/src/table.rs b/src/table.rs
index 84315b8..cfc1fb9 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -227,18 +227,17 @@
                 entry = read_dir_stream.next_entry() => {
                     let entry = entry.map_err(Error::io_error)?;
                     if let Some(entry) = entry {
-                        let key = key_to_bytes(&entry.file_name().to_string_lossy());
-                        if all_keys.contains(&key) || key.starts_with(&[b'~']) {
+                        let key_bytes = key_to_bytes(&entry.file_name().to_string_lossy());
+                        if all_keys.contains(&key_bytes) || key_bytes.starts_with(&[b'~']) {
                             continue;
                         }
                         let import_threads_task_tracker0 = import_threads_task_tracker0.clone();
-                        let base_path = base_path.clone();
                         let iroh_doc = iroh_doc.clone();
                         pool.spawn(async move {
                             if import_threads_task_tracker0.is_closed() {
                                 return
                             }
-                            let join_handle = import_threads_task_tracker0.spawn(Table::import_existing_file(iroh_doc.clone(), table.author_id.clone(), key, entry.path()).instrument(info_span!(parent: None, "import_missing", shard = ?base_path)));
+                            let join_handle = import_threads_task_tracker0.spawn(Table::import_existing_file(iroh_doc.clone(), table.author_id.clone(), key_bytes, entry.path()).instrument(info_span!(parent: None, "import_missing")));
                             if let Err(error) = join_handle.await {
                                 error!(
                                     error = ?error,
@@ -276,19 +275,19 @@
         {
             Ok(import_progress) => import_progress,
             Err(error) => {
-                error!(error = ?error, path = ?path, "import_existing_file_progress_error");
+                warn!(error = ?error, path = ?path, "import_existing_file_progress_error");
                 return false;
             }
         };
 
         if let Err(error) = import_progress.finish().await.map_err(Error::storage) {
-            error!(
+            warn!(
                 error = ?error,
                 path = ?path,
                 "import_existing_file_progress_error"
             );
             return false;
         }
-        info!(action = "imported_existing_file");
+        info!(action = "imported_existing_file", path = ?path);
         return true;
     }
diff --git a/trident-py/pyproject.toml b/trident-py/pyproject.toml
index b29a7ab..e5dfcc6 100644
--- a/trident-py/pyproject.toml
+++ b/trident-py/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "izihawa-trident-client"
-version = "1.3.12"
+version = "1.4.1"
 description = ""
 authors = ["Pasha Podolsky "]
 readme = "README.md"
diff --git a/trident-py/trident/client.py b/trident-py/trident/client.py
index 49fc530..a452cf0 100644
--- a/trident-py/trident/client.py
+++ b/trident-py/trident/client.py
@@ -112,7 +112,7 @@ async def table_delete(self, table: str, key: str):
         await self.delete(url)
 
     async def table_foreign_insert(self, from_table: str, from_key: str, to_table: str, to_key: str) -> bytes:
-        url = f"/tables/foreign_insert/"
+        url = "/tables/foreign_insert/"
         response = await self.post(url, params={
             'from_table': from_table,
             'from_key': from_key,
@@ -121,6 +121,15 @@ async def table_foreign_insert(self, from_table: str, from_key: str, to_table: s
         })
         return await response.read()
 
+    async def table_hash_insert(self, hash: str, to_table: str, to_key: str) -> bytes:
+        url = "/tables/hash_insert/"
+        response = await self.post(url, params={
+            'hash': hash,
+            'to_table': to_table,
+            'to_key': to_key,
+        })
+        return await response.read()
+
     async def table_get(self, table: str, key: str, timeout: float = None) -> dict | None:
         url = f"/tables/{table}/{key}"
         response = await self.get(url, timeout=timeout)
@@ -139,10 +148,10 @@ async def table_get_chunks(self, table: str, key: str, timeout: float = None) ->
         async for data, _ in response.content.iter_chunks():
             yield data
 
-    async def table_ls(self, table) -> typing.AsyncGenerator[str, None]:
+    async def table_ls(self, table) -> typing.AsyncGenerator[dict, None]:
         response = await self.get(f"/tables/{table}/")
         async for line in response.content:
-            yield line.decode()[:-1]
+            yield json.loads(line.decode())
 
     async def table_exists(self, table: str, key: str) -> dict | None:
         url = f"/tables/{table}/{key}"