[development] Add new method for inserting hashes
ppodolsky committed Sep 10, 2024
1 parent 9e5a302 commit 884f62e
Showing 7 changed files with 96 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -24,6 +24,7 @@ percent-encoding = "2.3.1"
rand = "0.9.0-alpha.1"
redb = { version = "2.1.2", features = ["logging"] }
serde = "1.0"
serde_json = "1.0"
serde_yaml = "0.9"
thiserror = "1.0"
tokio = { version = "1", features = ["full"] }
35 changes: 32 additions & 3 deletions bin/main.rs
@@ -95,6 +95,13 @@ struct TablesForeignInsertRequest {
to_key: String,
}

#[derive(Deserialize)]
struct TablesHashInsertRequest {
hash: String,
to_table: String,
to_key: String,
}

#[derive(Clone)]
struct AppState {
iroh_node: Arc<RwLock<IrohNode>>,
@@ -152,8 +159,9 @@ async fn app() -> Result<(), Error> {
let mut router = Router::new()
.route("/blobs/:hash", get(blobs_get))
.route("/tables/", get(tables_ls))
.route("/tables/foreign_insert/", post(table_foreign_insert))
.route("/tables/:table/", post(tables_create))
.route("/tables/foreign_insert/", post(tables_foreign_insert))
.route("/tables/hash_insert/", post(tables_hash_insert))
.route("/tables/:table/exists/", get(tables_exists))
.route("/tables/:table/peers/", get(tables_peers))
.route("/tables/:table/", delete(tables_drop))
@@ -179,7 +187,6 @@ async fn app() -> Result<(), Error> {
.layer(CorsLayer::permissive())
.with_state(state);

// run our app with hyper, listening globally on port 3000
let listener = tokio::net::TcpListener::bind(&config.read().await.http.endpoint)
.await
.map_err(Error::io_error)?;
@@ -479,7 +486,7 @@ async fn table_insert(
}
}

async fn table_foreign_insert(
async fn tables_foreign_insert(
State(state): State<AppState>,
Query(query): Query<TablesForeignInsertRequest>,
) -> Response {
@@ -502,6 +509,28 @@ async fn table_foreign_insert(
}
}

async fn tables_hash_insert(
State(state): State<AppState>,
Query(query): Query<TablesHashInsertRequest>,
) -> Response {
match state
.iroh_node
.read()
.await
.tables_hash_insert(
&query.hash,
&query.to_table,
&query.to_key,
)
.await
{
Ok(hash) => Response::builder()
.body(Body::from(hash.to_string()))
.expect("Can't build response"),
Err(e) => e.into_response(),
}
}

async fn table_root_get(
State(state): State<AppState>,
method: Method,
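For context, a minimal sketch of exercising the new route over plain HTTP. The base URL and the hash/table/key values below are assumptions for illustration; in practice the address comes from the http.endpoint setting.

import asyncio

import aiohttp

BASE_URL = "http://127.0.0.1:3000"  # assumed; the real address is configured via http.endpoint

async def hash_insert(hash_: str, to_table: str, to_key: str) -> str:
    # The handler reads hash, to_table and to_key from the query string
    # and answers with the hash as plain text.
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{BASE_URL}/tables/hash_insert/",
            params={"hash": hash_, "to_table": to_table, "to_key": to_key},
        ) as response:
            response.raise_for_status()
            return await response.text()

if __name__ == "__main__":
    print(asyncio.run(hash_insert("bafkr4i...example", "documents", "report.pdf")))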
49 changes: 43 additions & 6 deletions src/iroh_node.rs
@@ -12,6 +12,7 @@ use iroh::net::relay::{RelayMap, RelayMode, RelayNode};
use iroh::node::{GcPolicy, Node};
use iroh_base::hash::Hash;
use iroh_base::node_addr::NodeAddr;
use serde_json::json;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::{Ipv4Addr, SocketAddrV4};
@@ -375,6 +376,25 @@ impl IrohNode {
Ok(from_hash)
}

pub async fn tables_hash_insert(
&self,
hash: &str,
to_table_name: &str,
to_key: &str,
) -> Result<Hash> {
let Some(to_table) = self.tables.get(to_table_name) else {
return Err(Error::missing_table(to_table_name));
};
let hash = Hash::from_str(hash).map_err(Error::blobs)?;
match self.blobs_get(hash.clone()).await? {
Some((_, size)) => {
to_table.insert_hash(to_key, hash, size).await?;
Ok(hash)
}
None => Err(Error::missing_key(hash))
}
}

pub async fn table_share(&self, table_name: &str, mode: ShareMode) -> Result<DocTicket> {
match self.tables.get(table_name) {
Some(table) => Ok(table.share(mode).await?),
@@ -404,7 +424,15 @@ impl IrohNode {
|table| {
Some(stream! {
for await el in table.get_all() {
yield Ok(format!("{}\n", std::str::from_utf8(el.expect("Can't extract document").key()).expect("Not utf8 symbol")))
let entry = el.expect("Can't extract document");
let key = std::str::from_utf8(entry.key()).expect("Not utf8 symbol");
let hash_str = entry.content_hash().to_string();
let data = json!({
"key": key,
"hash": hash_str,
"size": entry.content_len(),
});
yield Ok(format!("{}\n", data.to_string()))
}
})
},
@@ -415,12 +443,21 @@
&self,
hash: Hash,
) -> Result<Option<(Box<dyn AsyncRead + Unpin + Send>, u64)>> {
let blob_reader = self.node.blobs().read(hash).await.map_err(Error::blobs)?;
if !blob_reader.is_complete() {
return Ok(None);
match self.node.blobs().read(hash).await {
Ok(blob_reader) => {
if !blob_reader.is_complete() {
return Ok(None);
}
let file_size = blob_reader.size();
Ok(Some((Box::new(blob_reader), file_size)))
}
Err(e) => {
if e.to_string() == "Blob not found" {
return Ok(None);
}
Err(Error::blobs(e))
}
}
let file_size = blob_reader.size();
Ok(Some((Box::new(blob_reader), file_size)))
}

pub async fn send_shutdown(&self) -> Result<()> {
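With this change, each line streamed by the table listing is a standalone JSON object carrying the entry's key, content hash and size, rather than the bare key. An illustrative line (the hash and size are made up, and field order may differ):

{"hash":"bafkr4i...example","key":"report.pdf","size":12345}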
13 changes: 6 additions & 7 deletions src/table.rs
@@ -227,18 +227,17 @@ impl Table {
entry = read_dir_stream.next_entry() => {
let entry = entry.map_err(Error::io_error)?;
if let Some(entry) = entry {
let key = key_to_bytes(&entry.file_name().to_string_lossy());
if all_keys.contains(&key) || key.starts_with(&[b'~']) {
let key_bytes = key_to_bytes(&entry.file_name().to_string_lossy());
if all_keys.contains(&key_bytes) || key_bytes.starts_with(&[b'~']) {
continue;
}
let import_threads_task_tracker0 = import_threads_task_tracker0.clone();
let base_path = base_path.clone();
let iroh_doc = iroh_doc.clone();
pool.spawn(async move {
if import_threads_task_tracker0.is_closed() {
return
}
let join_handle = import_threads_task_tracker0.spawn(Table::import_existing_file(iroh_doc.clone(), table.author_id.clone(), key, entry.path()).instrument(info_span!(parent: None, "import_missing", shard = ?base_path)));
let join_handle = import_threads_task_tracker0.spawn(Table::import_existing_file(iroh_doc.clone(), table.author_id.clone(), key_bytes, entry.path()).instrument(info_span!(parent: None, "import_missing")));
if let Err(error) = join_handle.await {
error!(
error = ?error,
@@ -276,19 +275,19 @@ impl Table {
{
Ok(import_progress) => import_progress,
Err(error) => {
error!(error = ?error, path = ?path, "import_existing_file_progress_error");
warn!(error = ?error, path = ?path, "import_existing_file_progress_error");
return false;
}
};
if let Err(error) = import_progress.finish().await.map_err(Error::storage) {
error!(
warn!(
error = ?error,
path = ?path,
"import_existing_file_progress_error"
);
return false;
}
info!(action = "imported_existing_file");
info!(action = "imported_existing_file", path = ?path);
return true;
}

2 changes: 1 addition & 1 deletion trident-py/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "izihawa-trident-client"
version = "1.3.12"
version = "1.4.1"
description = ""
authors = ["Pasha Podolsky <[email protected]>"]
readme = "README.md"
15 changes: 12 additions & 3 deletions trident-py/trident/client.py
@@ -112,7 +112,7 @@ async def table_delete(self, table: str, key: str):
await self.delete(url)

async def table_foreign_insert(self, from_table: str, from_key: str, to_table: str, to_key: str) -> bytes:
url = f"/tables/foreign_insert/"
url = "/tables/foreign_insert/"
response = await self.post(url, params={
'from_table': from_table,
'from_key': from_key,
@@ -121,6 +121,15 @@ async def table_foreign_insert(self, from_table: str, from_key: str, to_table: s
})
return await response.read()

async def table_hash_insert(self, hash: str, to_table: str, to_key: str) -> bytes:
url = "/tables/hash_insert/"
response = await self.post(url, params={
'hash': hash,
'to_table': to_table,
'to_key': to_key,
})
return await response.read()

async def table_get(self, table: str, key: str, timeout: float = None) -> dict | None:
url = f"/tables/{table}/{key}"
response = await self.get(url, timeout=timeout)
@@ -139,10 +148,10 @@ async def table_get_chunks(self, table: str, key: str, timeout: float = None) ->
async for data, _ in response.content.iter_chunks():
yield data

async def table_ls(self, table) -> typing.AsyncGenerator[str, None]:
async def table_ls(self, table) -> typing.AsyncGenerator[dict, None]:
response = await self.get(f"/tables/{table}/")
async for line in response.content:
yield line.decode()[:-1]
yield json.loads(line.decode())

async def table_exists(self, table: str, key: str) -> dict | None:
url = f"/tables/{table}/{key}"
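A hedged usage sketch of the updated Python client. The class name (TridentClient) and its constructor are assumptions not shown in this diff; only the method names and return shapes come from the changes above.

import asyncio

from trident.client import TridentClient  # class name assumed, module path from trident-py/trident/client.py

async def main():
    client = TridentClient("http://127.0.0.1:3000")  # assumed constructor and address
    # Attach an already-stored blob to a table under a new key;
    # the server replies with the hash as raw bytes.
    inserted_hash = await client.table_hash_insert(
        hash="bafkr4i...example",
        to_table="documents",
        to_key="report.pdf",
    )
    print(inserted_hash)

    # table_ls now yields parsed dicts with "key", "hash" and "size".
    async for entry in client.table_ls("documents"):
        print(entry["key"], entry["hash"], entry["size"])

asyncio.run(main())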
