diff --git a/Cargo.lock b/Cargo.lock index 068544e..a820d3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff" dependencies = [ "cfg-if", "once_cell", @@ -92,9 +92,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2faccea4cc4ab4a667ce676a30e8ec13922a692c99bb8f5b11f1502c72e04220" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" [[package]] name = "anstyle-parse" @@ -234,9 +234,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b30c39ebe61f75d1b3785362b1586b41991873c9ab3e317a9181c246fb71d82" +checksum = "7af266887e24cd5f6d2ea7433cacd25dcd4773b7f70e488701968a7cdf51df57" dependencies = [ "aws-credential-types", "aws-runtime", @@ -264,9 +264,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33cc49dcdd31c8b6e79850a179af4c367669150c7ac0135f176c61bec81a70f7" +checksum = "2d56f287a9e65e4914bfedb5b22c056b65e4c232fca512d5509a9df36386759f" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -276,9 +276,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb031bff99877c26c28895766f7bb8484a05e24547e370768d6cc9db514662aa" +checksum = "2d6a29eca8ea8982028a4df81883e7001e250a21d323b86418884b5345950a4b" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -300,9 +300,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.14.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "951f7730f51a2155c711c85c79f337fbc02a577fa99d2a0a8059acfce5392113" +checksum = "c977e92277652aefb9a76a0fca652b26757d6845dce0d7bf4426da80f13d85b0" dependencies = [ "aws-credential-types", "aws-runtime", @@ -329,9 +329,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f486420a66caad72635bc2ce0ff6581646e0d32df02aa39dc983bfe794955a5b" +checksum = "e2d7f527c7b28af1a641f7d89f9e6a4863e8ec00f39d2b731b056fc5ec5ce829" dependencies = [ "aws-credential-types", "aws-runtime", @@ -351,9 +351,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39ddccf01d82fce9b4a15c8ae8608211ee7db8ed13a70b514bbfe41df3d24841" +checksum = "0d0be3224cd574ee8ab5fd7c32087876f25c134c27ac603fcb38669ed8d346b0" dependencies = [ "aws-credential-types", "aws-runtime", @@ -373,9 +373,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a591f8c7e6a621a501b2b5d2e88e1697fcb6274264523a6ad4d5959889a41ce" +checksum = "5b3167c60d82a13bbaef569da06041644ff41e85c6377e5dad53fa2526ccfe9d" dependencies = [ "aws-credential-types", "aws-runtime", @@ -396,9 +396,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c371c6b0ac54d4605eb6f016624fb5c7c2925d315fdf600ac1bf21b19d5f1742" +checksum = "54b1cbe0eee57a213039088dbdeca7be9352f24e0d72332d961e8a1cb388f82d" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -425,9 +425,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72ee2d09cce0ef3ae526679b522835d63e75fb427aca5413cd371e490d52dcc6" +checksum = "426a5bc369ca7c8d3686439e46edc727f397a47ab3696b13f3ae8c81b3b36132" dependencies = [ "futures-util", "pin-project-lite", @@ -436,9 +436,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be2acd1b9c6ae5859999250ed5a62423aedc5cf69045b844432de15fa2f31f2b" +checksum = "6ee554133eca2611b66d23548e48f9b44713befdb025ab76bc00185b878397a1" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -468,9 +468,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dab56aea3cd9e1101a0a999447fb346afb680ab1406cebc44b32346e25b4117d" +checksum = "85d6a0619f7b67183067fa3b558f94f90753da2df8c04aeb7336d673f804b0b8" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -489,18 +489,18 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd3898ca6518f9215f62678870064398f00031912390efd03f1f6ef56d83aa8e" +checksum = "a1c1b5186b6f5c579bf0de1bcca9dd3d946d6d51361ea1d18131f6a0b64e13ae" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-query" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda4b1dfc9810e35fba8a620e900522cd1bd4f9578c446e82f49d1ce41d2e9f9" +checksum = "1c0a2ce65882e788d2cf83ff28b9b16918de0460c47bf66c5da4f6c17b4c9694" dependencies = [ "aws-smithy-types", "urlencoding", @@ -508,9 +508,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fafdab38f40ad7816e7da5dec279400dd505160780083759f01441af1bbb10ea" +checksum = "b4cb6b3afa5fc9825a75675975dcc3e21764b5476bc91dbc63df4ea3d30a576e" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -533,9 +533,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c18276dd28852f34b3bf501f4f3719781f4999a51c7bff1a5c6dc8c4529adc29" +checksum = "23165433e80c04e8c09cee66d171292ae7234bae05fa9d5636e33095eae416b2" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -549,9 +549,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb3e134004170d3303718baa2a4eb4ca64ee0a1c0a7041dca31b38be0fb414f3" +checksum = "c94a5bec34850b92c9a054dad57b95c1d47f25125f55973e19f6ad788f0381ff" dependencies = [ "base64-simd", "bytes", @@ -572,18 +572,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8604a11b25e9ecaf32f9aa56b9fe253c5e2f606a3477f0071e96d3155a5ed218" +checksum = "d16f94c9673412b7a72e3c3efec8de89081c320bf59ea12eed34c417a62ad600" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.1.4" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "789bbe008e65636fe1b6dbbb374c40c8960d1232b96af5ff4aec349f9c4accf4" +checksum = "0ff7e122ee50ca962e9de91f5850cc37e2184b1219611eef6d44aa85929b54f6" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -845,9 +845,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.33" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", @@ -871,9 +871,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.18" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e578d6ec4194633722ccf9544794b71b1385c3c027efe0c55db226fc880865c" +checksum = "80c21025abd42669a92efc996ef13cfb2c5c627858421ea58d5c3b331a6c134f" dependencies = [ "clap_builder", "clap_derive", @@ -881,9 +881,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.18" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4df4df40ec50c46000231c914968278b1eb05098cf8f1b3a518a95030e71d1c7" +checksum = "458bf1f341769dfcf849846f65dffdf9146daa56bcd2a47cb4e1de9915567c99" dependencies = [ "anstream", "anstyle", @@ -893,9 +893,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.4.7" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" +checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" dependencies = [ "heck", "proc-macro2", @@ -905,9 +905,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "clipboard-win" @@ -1067,18 +1067,18 @@ checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" [[package]] name = "crc32c" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2" dependencies = [ "rustc_version", ] [[package]] name = "crc32fast" -version = "1.3.2" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa" dependencies = [ "cfg-if", ] @@ -1180,9 +1180,9 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.1" +version = "4.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" dependencies = [ "cfg-if", "cpufeatures", @@ -1483,9 +1483,9 @@ dependencies = [ [[package]] name = "ed25519-dalek" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f628eaec48bfd21b865dc2950cfa014450c01d2fa2b69a86c2fd5844ec523c0" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" dependencies = [ "curve25519-dalek", "ed25519", @@ -1510,9 +1510,9 @@ dependencies = [ [[package]] name = "either" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" [[package]] name = "elliptic-curve" @@ -1702,9 +1702,9 @@ dependencies = [ [[package]] name = "fiat-crypto" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" +checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382" [[package]] name = "flume" @@ -1960,7 +1960,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.11", - "indexmap 2.2.1", + "indexmap 2.2.3", "slab", "tokio", "tokio-util", @@ -1979,7 +1979,7 @@ dependencies = [ "futures-sink", "futures-util", "http 1.0.0", - "indexmap 2.2.1", + "indexmap 2.2.3", "slab", "tokio", "tokio-util", @@ -2019,9 +2019,9 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "bd5256b483761cd23699d0da46cc6fd2ee3be420bbe6d020ae4a091e70b7e9fd" [[package]] name = "hex" @@ -2228,12 +2228,11 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", - "futures-channel", "futures-util", "http 1.0.0", "http-body 1.0.0", @@ -2241,14 +2240,13 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tracing", ] [[package]] name = "iana-time-zone" -version = "0.1.59" +version = "0.1.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -2318,9 +2316,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.1" +version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433de089bd45971eecf4668ee0ee8f4cec17db4f8bd8f7bc3197a6ce37aa7d9b" +checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -2328,9 +2326,9 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.17.7" +version = "0.17.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb28741c9db9a713d93deb3bb9515c20788cef5815265bee4980e87bde7e0f25" +checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3" dependencies = [ "console", "instant", @@ -2388,7 +2386,7 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "iroh" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#4615915f9cb17ca95c39d51e720c74996ce63427" +source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" dependencies = [ "anyhow", "bao-tree", @@ -2437,7 +2435,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "toml 0.8.8", + "toml 0.8.10", "tracing", "tracing-subscriber", "url", @@ -2447,7 +2445,7 @@ dependencies = [ [[package]] name = "iroh-base" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#4615915f9cb17ca95c39d51e720c74996ce63427" +source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" dependencies = [ "anyhow", "bao-tree", @@ -2476,7 +2474,7 @@ dependencies = [ [[package]] name = "iroh-bytes" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#4615915f9cb17ca95c39d51e720c74996ce63427" +source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" dependencies = [ "anyhow", "bao-tree", @@ -2514,7 +2512,7 @@ dependencies = [ [[package]] name = "iroh-gossip" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#4615915f9cb17ca95c39d51e720c74996ce63427" +source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" dependencies = [ "anyhow", "bytes", @@ -2523,7 +2521,7 @@ dependencies = [ "ed25519-dalek", "futures", "genawaiter", - "indexmap 2.2.1", + "indexmap 2.2.3", "iroh-base", "iroh-blake3", "iroh-metrics", @@ -2568,7 +2566,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#4615915f9cb17ca95c39d51e720c74996ce63427" +source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" dependencies = [ "anyhow", "erased_set", @@ -2588,7 +2586,7 @@ dependencies = [ [[package]] name = "iroh-net" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#4615915f9cb17ca95c39d51e720c74996ce63427" +source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" dependencies = [ "aead", "anyhow", @@ -2663,7 +2661,7 @@ dependencies = [ [[package]] name = "iroh-sync" version = "0.12.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#4615915f9cb17ca95c39d51e720c74996ce63427" +source = "git+https://github.com/n0-computer/iroh?branch=main#983edcc0910d55035205759107fc3b318243480e" dependencies = [ "anyhow", "bytes", @@ -2714,9 +2712,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -2732,9 +2730,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libm" @@ -2863,6 +2861,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2871,9 +2879,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] @@ -3060,21 +3068,26 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" -version = "0.1.45" +version = "0.1.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" dependencies = [ - "autocfg", "num-traits", ] [[package]] name = "num-iter" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +checksum = "d869c01cc0c455284163fd0092f1f93835385ccab5a98a0dcc497b2f8bf055a9" dependencies = [ "autocfg", "num-integer", @@ -3083,9 +3096,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" dependencies = [ "autocfg", "libm", @@ -3344,9 +3357,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f200d8d83c44a45b21764d1916299752ca035d15ecd46faca3e9a2a2bf6ad06" +checksum = "219c0dcc30b6a27553f9cc242972b67f75b60eb0db71f0b5462f38b058c41546" dependencies = [ "memchr", "thiserror", @@ -3355,9 +3368,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcd6ab1236bbdb3a49027e920e693192ebfe8913f6d60e294de57463a493cfde" +checksum = "22e1288dbd7786462961e69bfd4df7848c1e37e8b74303dbdab82c3a9cdd2809" dependencies = [ "pest", "pest_generator", @@ -3365,9 +3378,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a31940305ffc96863a735bef7c7994a00b325a7138fdbc5bda0f1a0476d3275" +checksum = "1381c29a877c6d34b8c176e734f35d7f7f5b3adaefe940cb4d1bb7af94678e2e" dependencies = [ "pest", "pest_meta", @@ -3378,9 +3391,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.6" +version = "2.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7ff62f5259e53b78d1af898941cdcdccfae7385cf7d793a6e55de5d05bb4b7d" +checksum = "d0934d6907f148c22a3acbda520c7eed243ad7487a30f51f6ce52b58b7077a8a" dependencies = [ "once_cell", "pest", @@ -3610,7 +3623,7 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" dependencies = [ - "toml_edit", + "toml_edit 0.21.1", ] [[package]] @@ -3983,9 +3996,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64 0.21.7", "bytes", @@ -4001,6 +4014,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "once_cell", "percent-encoding", "pin-project-lite", @@ -4009,6 +4023,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-rustls", @@ -4164,9 +4179,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.30" +version = "0.38.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ "bitflags 2.4.2", "errno", @@ -4403,7 +4418,7 @@ version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ - "indexmap 2.2.1", + "indexmap 2.2.3", "itoa", "ryu", "serde", @@ -4446,7 +4461,7 @@ version = "0.9.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adf8a49373e98a4c5f0ceb5d05aa7c648d75f63774981ed95b7c7443bbd50c6e" dependencies = [ - "indexmap 2.2.1", + "indexmap 2.2.3", "itoa", "ryu", "serde", @@ -4673,9 +4688,9 @@ checksum = "9e08d8363704e6c71fc928674353e6b7c23dcea9d82d7012c8faf2a3a025f8d0" [[package]] name = "strsim" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" [[package]] name = "struct_iterable" @@ -4846,31 +4861,30 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", "rustix", "windows-sys 0.52.0", ] [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" dependencies = [ "proc-macro2", "quote", @@ -4889,12 +4903,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.31" +version = "0.3.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", "itoa", + "num-conv", "powerfmt", "serde", "time-core", @@ -4909,10 +4924,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ + "num-conv", "time-core", ] @@ -4933,9 +4949,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -5062,14 +5078,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a195ec8c9da26928f773888e0742ca3ca1040c6cd859c919c9f59c1954ab35" +checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit", + "toml_edit 0.22.5", ] [[package]] @@ -5083,15 +5099,26 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.21.0" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap 2.2.3", + "toml_datetime", + "winnow 0.5.40", +] + +[[package]] +name = "toml_edit" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" +checksum = "99e68c159e8f5ba8a28c4eb7b0c0c190d77bb479047ca713270048145a9ad28a" dependencies = [ - "indexmap 2.2.1", + "indexmap 2.2.3", "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.6.1", ] [[package]] @@ -5231,6 +5258,7 @@ dependencies = [ "percent-encoding", "quic-rpc", "quinn", + "reqwest", "serde", "serde_yaml", "thiserror", @@ -5325,6 +5353,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -5348,9 +5385,9 @@ dependencies = [ [[package]] name = "unicode-segmentation" -version = "1.10.1" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-width" @@ -5467,9 +5504,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -5477,9 +5514,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", @@ -5492,9 +5529,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ "cfg-if", "js-sys", @@ -5504,9 +5541,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5514,9 +5551,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2", "quote", @@ -5527,9 +5564,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "watchable" @@ -5545,9 +5582,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" +checksum = "96565907687f7aceb35bc5fc03770a8a0471d82e479f25832f54a0e3f4b28446" dependencies = [ "js-sys", "wasm-bindgen", @@ -5555,9 +5592,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.3" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "widestring" @@ -5807,9 +5844,18 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.35" +version = "0.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + +[[package]] +name = "winnow" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1931d78a9c73861da0134f453bb1f790ce49b2e30eba8410b4b79bac72b46a2d" +checksum = "d90f4e0f530c4c69f62b80d839e9ef3855edc9cba471a160c4d692deed62b401" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 76d04d4..349d54b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,3 +38,4 @@ tracing-subscriber = "0.3" futures = "0.3" lru = "0.12.1" tokio-task-pool = "0.1.5" +reqwest = { version = "0.11.24", default-features = false, features = ["rustls", "multipart"] } diff --git a/bin/main.rs b/bin/main.rs index a4ebb7f..4c69758 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -15,18 +15,23 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::runtime::Builder; +use tokio::signal; use tokio::sync::RwLock; use tokio_util::io::{ReaderStream, StreamReader}; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; use tower_http::trace::{self, TraceLayer}; -use tracing::Level; +use tracing::{info, Level}; use tracing_subscriber::EnvFilter; -use trident_storage::config::{ - load_config, save_config, Config, MirroringConfig, SinkConfig, TableConfig, -}; +use trident_storage::config::{load_config, save_config, Config, SinkConfig, TableConfig}; use trident_storage::error::Error; use trident_storage::iroh_node::IrohNode; +fn return_true() -> bool { + true +} + /// Simple program to greet a person #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -38,16 +43,22 @@ struct Args { #[derive(Deserialize)] struct TablesCreateRequest { - storage: Option, - mirroring: Option, + storage: String, + #[serde(default)] + sinks: Vec, + #[serde(default = "return_true")] + keep_blob: bool, } #[derive(Deserialize)] struct TablesImportRequest { ticket: String, download_policy: DownloadPolicy, - storage: Option, - mirroring: Option, + storage: String, + #[serde(default)] + sinks: Vec, + #[serde(default = "return_true")] + keep_blob: bool, } #[derive(Deserialize)] @@ -83,10 +94,16 @@ struct TablesLsResponse { pub tables: HashMap, } -async fn create_app(args: Args) -> Result { +async fn create_app( + args: Args, + cancellation_token: CancellationToken, + task_tracker: TaskTracker, +) -> Result { let config = Arc::new(RwLock::new(load_config(&args.config).await?)); let state = AppState { - iroh_node: Arc::new(RwLock::new(IrohNode::new(config.clone()).await?)), + iroh_node: Arc::new(RwLock::new( + IrohNode::new(config.clone(), cancellation_token, task_tracker).await?, + )), config: config.clone(), config_path: args.config.clone(), }; @@ -96,8 +113,10 @@ async fn create_app(args: Args) -> Result { async fn app() -> Result<(), Error> { let args = Args::parse(); + let cancellation_token = CancellationToken::new(); + let task_tracker = TaskTracker::new(); - let state = create_app(args).await?; + let state = create_app(args, cancellation_token.clone(), task_tracker.clone()).await?; let config = state.config.clone(); // build our application with a route @@ -127,10 +146,41 @@ async fn app() -> Result<(), Error> { .await .unwrap(); - axum::serve(listener, app).await.unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal(cancellation_token, task_tracker)) + .await + .unwrap(); Ok(()) } +async fn shutdown_signal(cancelation_token: CancellationToken, task_tracker: TaskTracker) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + info!("received_signal"); + cancelation_token.cancel(); + task_tracker.wait().await; + info!("stopped_tasks"); +} + fn main() -> Result<(), Error> { // initialize tracing tracing_subscriber::fmt() @@ -187,8 +237,9 @@ async fn tables_create( .await .tables_create( &table, - tables_create_request.storage.as_deref(), - tables_create_request.mirroring, + &tables_create_request.storage, + tables_create_request.sinks, + tables_create_request.keep_blob, ) .await { @@ -217,8 +268,9 @@ async fn tables_import( &table, &tables_import_request.ticket, tables_import_request.download_policy, - tables_import_request.storage.as_deref(), - tables_import_request.mirroring, + &tables_import_request.storage, + tables_import_request.sinks, + tables_import_request.keep_blob, ) .await { @@ -313,9 +365,13 @@ async fn table_get( Path((table, key)): Path<(String, String)>, ) -> Response { match state.iroh_node.read().await.table_get(&table, &key).await { - Ok(reader) => Response::builder() + Ok(Some(reader)) => Response::builder() .body(Body::from_stream(ReaderStream::new(reader))) .unwrap(), + Ok(None) => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::default()) + .unwrap(), Err(e) => e.into_response(), } } diff --git a/src/config.rs b/src/config.rs index 7d39ddf..af1dff2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,23 +7,20 @@ use std::path::PathBuf; fn return_false() -> bool { false } - -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub enum StorageEngineConfig { - FS(String), - Iroh, +fn return_true() -> bool { + true } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct FSStorageEngineConfig { +pub struct StorageEngineConfig { pub replicas: u8, - pub fs_shards: Vec, + pub fs_shards: Vec, #[serde(default = "return_false")] pub is_import_missing_enabled: bool, } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct FSShardConfig { +pub struct ShardConfig { pub name: String, pub path: PathBuf, pub weight: usize, @@ -44,17 +41,16 @@ pub struct S3Config { } #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum SinkConfig { - S3(S3Config), +pub struct IpfsConfig { + pub api_base_url: String, + pub in_place: bool, } #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct MirroringConfig { - #[serde(default = "Vec::new")] - pub sinks: Vec, - #[serde(default = "return_false")] - pub delete_after_mirroring: bool, +#[serde(rename_all = "snake_case")] +pub enum SinkConfig { + S3(S3Config), + Ipfs(IpfsConfig), } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -62,9 +58,11 @@ pub struct TableConfig { pub id: String, #[serde(default = "DownloadPolicy::default")] pub download_policy: DownloadPolicy, - #[serde(skip_serializing_if = "Option::is_none")] - pub mirroring: Option, - pub storage_engine: StorageEngineConfig, + #[serde(default = "Vec::new")] + pub sinks: Vec, + pub storage_name: String, + #[serde(default = "return_true")] + pub keep_blob: bool, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -85,7 +83,7 @@ pub struct IrohConfig { #[serde(default = "HashMap::new")] pub sinks: HashMap, #[serde(default = "HashMap::new")] - pub fs_storages: HashMap, + pub fs_storages: HashMap, pub gc_interval_secs: Option, } diff --git a/src/hash_ring.rs b/src/hash_ring.rs index 0726a85..0a8a7ac 100644 --- a/src/hash_ring.rs +++ b/src/hash_ring.rs @@ -1,10 +1,10 @@ -use crate::config::FSShardConfig; +use crate::config::ShardConfig; use bisection::bisect_right; use std::collections::HashMap; use std::collections::{BinaryHeap, HashSet}; use std::hash::{Hash, Hasher}; -impl Hash for FSShardConfig { +impl Hash for ShardConfig { fn hash(&self, state: &mut H) { self.path.hash(state) } @@ -13,12 +13,12 @@ impl Hash for FSShardConfig { #[derive(Clone)] pub struct HashRing { v_nodes: usize, - ring: HashMap, + ring: HashMap, sorted_keys: Vec, } impl HashRing { - pub fn with_hasher<'a>(nodes: impl Iterator) -> HashRing { + pub fn with_hasher<'a>(nodes: impl Iterator) -> HashRing { let mut new_hash_ring = HashRing { v_nodes: 160, ring: HashMap::new(), @@ -29,7 +29,7 @@ impl HashRing { } /// Adds a node to the hash ring - pub fn add_nodes<'a>(&mut self, nodes: impl Iterator) { + pub fn add_nodes<'a>(&mut self, nodes: impl Iterator) { for node in nodes { for i in 0..self.v_nodes * node.weight { let node_name = format!("{}-{}", &node.name, i); @@ -41,7 +41,7 @@ impl HashRing { self.sorted_keys = BinaryHeap::from(self.sorted_keys.clone()).into_sorted_vec(); } - pub fn range(&self, key: &str, size: usize) -> Vec<&FSShardConfig> { + pub fn range(&self, key: &str, size: usize) -> Vec<&ShardConfig> { let mut result = Vec::with_capacity(size); let mut visited = HashSet::new(); let position = if let Some(position) = self.get_pos(key) { diff --git a/src/iroh_node.rs b/src/iroh_node.rs index d2aa120..99b47e2 100644 --- a/src/iroh_node.rs +++ b/src/iroh_node.rs @@ -1,12 +1,7 @@ -use crate::config::{ - Config, FSStorageEngineConfig, MirroringConfig, SinkConfig, StorageEngineConfig, TableConfig, -}; +use crate::config::{Config, SinkConfig, StorageEngineConfig, TableConfig}; use crate::error::{Error, Result}; -use crate::sinks::{S3Sink, Sink}; -use crate::storages::fs_storage::FSStorageEngine; -use crate::storages::iroh_storage::IrohStorageEngine; -use crate::storages::mirroring::Mirroring; -use crate::storages::{Storage, StorageEngine}; +use crate::sinks::{IpfsSink, S3Sink, Sink}; +use crate::storage::Storage; use crate::utils::bytes_to_key; use crate::IrohClient; use async_stream::stream; @@ -29,6 +24,8 @@ use std::time::Duration; use tokio::io::AsyncRead; use tokio::sync::RwLock; use tokio_stream::Stream; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; use tracing::info; pub struct IrohNode { @@ -36,12 +33,18 @@ pub struct IrohNode { table_storages: HashMap, author_id: AuthorId, config: Arc>, - fs_storage_configs: HashMap, + fs_storage_configs: HashMap, sinks: HashMap>, + cancellation_token: CancellationToken, + task_tracker: TaskTracker, } impl IrohNode { - pub async fn new(config: Arc>) -> Result { + pub async fn new( + config: Arc>, + cancellation_token: CancellationToken, + task_tracker: TaskTracker, + ) -> Result { let mut config_lock = config.write().await; tokio::fs::create_dir_all(&config_lock.iroh.path) .await @@ -125,11 +128,15 @@ impl IrohNode { let sink = S3Sink::new(&name, &s3_config).await; sinks.insert(name, Arc::new(sink) as Arc) } + SinkConfig::Ipfs(ipfs_config) => { + let sink = IpfsSink::new(&name, &ipfs_config).await; + sinks.insert(name, Arc::new(sink) as Arc) + } }; } let mut table_storages = HashMap::new(); - let fs_storage_configs = config_lock.iroh.fs_storages.clone(); + let storage_configs = config_lock.iroh.fs_storages.clone(); for (table_name, table_config) in &mut config_lock.iroh.tables { let iroh_doc = sync_client .docs @@ -142,38 +149,24 @@ impl IrohNode { .await .map_err(Error::doc)?; iroh_doc.start_sync(vec![]).await.map_err(Error::doc)?; - let storage_engine = match &table_config.storage_engine { - StorageEngineConfig::Iroh => { - StorageEngine::Iroh(IrohStorageEngine::new(author_id, iroh_doc.clone())) - } - StorageEngineConfig::FS(storage_name) => StorageEngine::FS( - FSStorageEngine::new( - author_id, - iroh_doc.clone(), - fs_storage_configs[storage_name].clone(), - ) - .await?, - ), - }; - let mirroring = match &table_config.mirroring { - Some(mirroring_config) => Some( - Mirroring::new( - iroh_doc.clone(), - mirroring_config - .sinks - .clone() - .into_iter() - .map(|sink_name| sinks[&sink_name].clone()) - .collect(), - sync_client.clone(), - mirroring_config.delete_after_mirroring, - ) - .await?, - ), - None => None, - }; - let storage = Storage::new(storage_engine, mirroring); - table_storages.insert(table_name.clone(), storage); + let materialised_sinks = table_config + .sinks + .iter() + .map(|sink_name| sinks[sink_name].clone()) + .collect(); + let storage_engine = Storage::new( + table_name, + author_id, + iroh_doc.clone(), + sync_client.clone(), + storage_configs[&table_config.storage_name].clone(), + materialised_sinks, + table_config.keep_blob, + cancellation_token.clone(), + task_tracker.clone(), + ) + .await?; + table_storages.insert(table_name.clone(), storage_engine); } let fs_storage_configs = config_lock.iroh.fs_storages.clone(); @@ -187,6 +180,8 @@ impl IrohNode { config, fs_storage_configs, sinks, + cancellation_token: cancellation_token.clone(), + task_tracker: task_tracker.clone(), }; Ok(iroh_node) @@ -218,59 +213,39 @@ impl IrohNode { pub async fn tables_create( &mut self, table_name: &str, - storage_name: Option<&str>, - mirroring_config: Option, + storage_name: &str, + sinks: Vec, + keep_blob: bool, ) -> Result { match self.table_storages.entry(table_name.to_string()) { Entry::Occupied(_) => Err(Error::existing_table(table_name)), Entry::Vacant(entry) => { let iroh_doc = self.sync_client.docs.create().await.map_err(Error::table)?; - let (storage_engine, storage_engine_config) = match storage_name { - None => ( - StorageEngine::Iroh(IrohStorageEngine::new( - self.author_id, - iroh_doc.clone(), - )), - StorageEngineConfig::Iroh, - ), - Some(storage_name) => ( - StorageEngine::FS( - FSStorageEngine::new( - self.author_id, - iroh_doc.clone(), - self.fs_storage_configs[storage_name].clone(), - ) - .await?, - ), - StorageEngineConfig::FS(storage_name.to_string()), - ), - }; - let mirroring = match &mirroring_config { - Some(mirroring_config) => Some( - Mirroring::new( - iroh_doc.clone(), - mirroring_config - .sinks - .clone() - .into_iter() - .map(|sink_name| self.sinks[&sink_name].clone()) - .collect(), - self.sync_client.clone(), - mirroring_config.delete_after_mirroring, - ) - .await?, - ), - None => None, - }; - let storage = Storage::new(storage_engine, mirroring); - entry.insert(storage); + let materialised_sinks = sinks + .iter() + .map(|sink_name| self.sinks[sink_name].clone()) + .collect(); + let storage_engine = Storage::new( + table_name, + self.author_id, + iroh_doc.clone(), + self.sync_client.clone(), + self.fs_storage_configs[storage_name].clone(), + materialised_sinks, + keep_blob, + self.cancellation_token.clone(), + self.task_tracker.clone(), + ) + .await?; + entry.insert(storage_engine); self.config.write().await.iroh.tables.insert( table_name.to_string(), TableConfig { id: iroh_doc.id().to_string(), download_policy: DownloadPolicy::default(), - mirroring: mirroring_config, - storage_engine: storage_engine_config, + sinks, + storage_name: storage_name.to_string(), + keep_blob, }, ); @@ -284,8 +259,9 @@ impl IrohNode { table_name: &str, table_ticket: &str, download_policy: DownloadPolicy, - storage_name: Option<&str>, - mirroring_config: Option, + storage_name: &str, + sinks: Vec, + keep_blob: bool, ) -> Result { match self.table_storages.entry(table_name.to_string()) { Entry::Occupied(_) => Err(Error::existing_table(table_name)), @@ -300,53 +276,31 @@ impl IrohNode { .set_download_policy(download_policy.clone()) .await .map_err(Error::doc)?; - let (storage_engine, storage_engine_config) = match storage_name { - None => ( - StorageEngine::Iroh(IrohStorageEngine::new( - self.author_id, - iroh_doc.clone(), - )), - StorageEngineConfig::Iroh, - ), - Some(storage_name) => ( - StorageEngine::FS( - FSStorageEngine::new( - self.author_id, - iroh_doc.clone(), - self.fs_storage_configs[storage_name].clone(), - ) - .await?, - ), - StorageEngineConfig::FS(storage_name.to_string()), - ), - }; - let mirroring = match &mirroring_config { - Some(mirroring_config) => Some( - Mirroring::new( - iroh_doc.clone(), - mirroring_config - .sinks - .clone() - .into_iter() - .map(|sink_name| self.sinks[&sink_name].clone()) - .collect(), - self.sync_client.clone(), - mirroring_config.delete_after_mirroring, - ) - .await?, - ), - None => None, - }; - let storage = Storage::new(storage_engine, mirroring); - - entry.insert(storage); + let materialised_sinks = sinks + .iter() + .map(|sink_name| self.sinks[sink_name].clone()) + .collect(); + let storage_engine = Storage::new( + table_name, + self.author_id, + iroh_doc.clone(), + self.sync_client.clone(), + self.fs_storage_configs[storage_name].clone(), + materialised_sinks, + keep_blob, + self.cancellation_token.clone(), + self.task_tracker.clone(), + ) + .await?; + entry.insert(storage_engine); self.config.write().await.iroh.tables.insert( table_name.to_string(), TableConfig { id: iroh_doc.id().to_string(), - download_policy: download_policy, - mirroring: mirroring_config, - storage_engine: storage_engine_config, + download_policy, + sinks, + storage_name: storage_name.to_string(), + keep_blob, }, ); @@ -420,7 +374,7 @@ impl IrohNode { &self, table_name: &str, key: &str, - ) -> Result> { + ) -> Result>> { match self.table_storages.get(table_name) { Some(table_storage) => table_storage.get(key).await, None => Err(Error::missing_table(table_name)), @@ -436,7 +390,7 @@ impl IrohNode { pub async fn table_exists(&self, table_name: &str, key: &str) -> Result { match self.table_storages.get(table_name) { - Some(table_storage) => table_storage.exists(key).await, + Some(table_storage) => Ok(table_storage.exists(key).await?.is_some()), None => Err(Error::missing_table(table_name)), } } diff --git a/src/lib.rs b/src/lib.rs index cb506fe..9857a18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,7 @@ pub mod file_shard; mod hash_ring; pub mod iroh_node; mod sinks; -pub mod storages; +mod storage; mod utils; pub type IrohDoc = iroh::client::Doc>; diff --git a/src/sinks/ipfs_sink.rs b/src/sinks/ipfs_sink.rs new file mode 100644 index 0000000..5f75990 --- /dev/null +++ b/src/sinks/ipfs_sink.rs @@ -0,0 +1,68 @@ +use crate::config::IpfsConfig; +use crate::error::Error; +use crate::sinks::Sink; +use crate::utils::{bytes_to_key, FRAGMENT}; +use axum::async_trait; +use percent_encoding::utf8_percent_encode; +use reqwest::header::HeaderMap; +use reqwest::Client; +use std::path::Path; + +pub struct IpfsSink { + name: String, + config: IpfsConfig, + client: Client, +} + +impl IpfsSink { + pub async fn new(name: &str, config: &IpfsConfig) -> Self { + let mut config = config.clone(); + config.api_base_url = config.api_base_url.trim_end_matches('/').to_string(); + IpfsSink { + name: name.to_string(), + config, + client: Client::new(), + } + } +} + +#[async_trait] +impl Sink for IpfsSink { + fn name(&self) -> &str { + &self.name + } + + async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error> { + // ToDo: Remove allocating and return stream + // https://github.com/awslabs/aws-sdk-rust/discussions/361 + let encoded_key = + utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT) + .collect::() + .to_lowercase(); + + let mut headers = HeaderMap::new(); + + headers.insert("Abspath", path.to_string_lossy().parse().unwrap()); + + let file_part = reqwest::multipart::Part::bytes(tokio::fs::read(path).await.unwrap()) + .file_name(encoded_key) + .headers(headers) + .mime_str("application/octet-stream") + .unwrap(); + let form = reqwest::multipart::Form::new().part("file", file_part); + let res = self + .client + .post(format!( + "{}/api/v0/add?hash=blake3&chunker=size-1048576&nocopy=true&pin=true", + self.config.api_base_url, + )) + .multipart(form) + .send() + .await + .map_err(Error::sink)?; + if !res.status().is_success() { + return Err(Error::sink(res.text().await.unwrap())); + } + Ok(()) + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs new file mode 100644 index 0000000..8c7ec7e --- /dev/null +++ b/src/sinks/mod.rs @@ -0,0 +1,15 @@ +use crate::error::Error; +use axum::async_trait; +use std::path::Path; + +mod ipfs_sink; +mod s3_sink; + +pub use ipfs_sink::IpfsSink; +pub use s3_sink::S3Sink; + +#[async_trait] +pub trait Sink: Send + Sync { + fn name(&self) -> &str; + async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error>; +} diff --git a/src/sinks.rs b/src/sinks/s3_sink.rs similarity index 87% rename from src/sinks.rs rename to src/sinks/s3_sink.rs index 7528159..05f4e18 100644 --- a/src/sinks.rs +++ b/src/sinks/s3_sink.rs @@ -1,18 +1,13 @@ use crate::config::S3Config; use crate::error::Error; +use crate::sinks::Sink; use crate::utils::{bytes_to_key, FRAGMENT}; use aws_credential_types::Credentials; use aws_sdk_s3::config::{BehaviorVersion, Region}; use aws_sdk_s3::primitives::ByteStream; use axum::async_trait; use percent_encoding::utf8_percent_encode; -use tokio_util::bytes; - -#[async_trait] -pub trait Sink: Send + Sync { - fn name(&self) -> &str; - async fn send(&self, key: &[u8], value: bytes::Bytes) -> Result<(), Error>; -} +use std::path::Path; pub struct S3Sink { name: String, @@ -57,19 +52,21 @@ impl Sink for S3Sink { &self.name } - async fn send(&self, key: &[u8], value: bytes::Bytes) -> Result<(), Error> { + async fn send(&self, key: &[u8], path: &Path) -> Result<(), Error> { // ToDo: Remove allocating and return stream // https://github.com/awslabs/aws-sdk-rust/discussions/361 let encoded_key = utf8_percent_encode(std::str::from_utf8(bytes_to_key(key)).unwrap(), FRAGMENT) .collect::() .to_lowercase(); - + let body = ByteStream::from_path(Path::new(path)) + .await + .map_err(Error::sink)?; self.client .put_object() .bucket(&self.config.bucket_name) .key([self.config.prefix.as_str(), encoded_key.as_str()].join("/")) - .body(ByteStream::from(value)) + .body(body) .send() .await .map_err(Error::sink)?; diff --git a/src/storage.rs b/src/storage.rs new file mode 100644 index 0000000..5db73c8 --- /dev/null +++ b/src/storage.rs @@ -0,0 +1,373 @@ +use crate::config::StorageEngineConfig; +use crate::error::{Error, Result}; +use crate::file_shard::FileShard; +use crate::hash_ring::HashRing; +use crate::sinks::Sink; +use crate::utils::{bytes_to_key, key_to_bytes}; +use crate::{IrohClient, IrohDoc}; +use async_stream::stream; +use futures::{Stream, StreamExt}; +use iroh::bytes::Hash; +use iroh::client::{Entry, LiveEvent}; +use iroh::rpc_protocol::ShareMode; +use iroh::sync::store::Query; +use iroh::sync::{AuthorId, ContentStatus}; +use iroh::ticket::DocTicket; +use lru::LruCache; +use std::collections::{HashMap, HashSet}; +use std::num::NonZeroUsize; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::io::AsyncRead; +use tokio_task_pool::Pool; +use tokio_util::bytes; +use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; +use tracing::{error, info, info_span, warn, Instrument}; + +#[derive(Clone)] +pub struct Storage { + author_id: AuthorId, + iroh_doc: IrohDoc, + sync_client: IrohClient, + hash_ring: HashRing, + fs_shards: HashMap, + replicas: u8, + sinks: Vec>, + keep_blob: bool, +} + +impl Storage { + pub async fn new( + table_name: &str, + author_id: AuthorId, + iroh_doc: IrohDoc, + sync_client: IrohClient, + storage_config: StorageEngineConfig, + sinks: Vec>, + keep_blob: bool, + cancellation_token: CancellationToken, + task_tracker: TaskTracker, + ) -> Result { + let mut fs_shards = HashMap::new(); + for fs_shard in &storage_config.fs_shards { + fs_shards.insert( + fs_shard.name.clone(), + FileShard::new(&fs_shard.path.join(table_name)).await?, + ); + } + let storage = Storage { + author_id, + iroh_doc: iroh_doc.clone(), + sync_client, + hash_ring: HashRing::with_hasher(storage_config.fs_shards.iter()), + fs_shards, + replicas: storage_config.replicas, + sinks, + keep_blob, + }; + let storage_clone = storage.clone(); + + task_tracker.spawn({ + let cancellation_token = cancellation_token.clone(); + async move { + let mut stream = storage_clone.iroh_doc().subscribe().await.unwrap(); + let mut wait_list = LruCache::new(NonZeroUsize::new(4096).expect("not possible")); + info!("started"); + loop { + tokio::select! { + _ = cancellation_token.cancelled() => return Ok::<(), Error>(()), + event = stream.next() => { + if let Some(event) = event { + let event = event.unwrap(); + match &event { + LiveEvent::InsertRemote { + entry, + content_status, + .. + } => { + info!(event = ?event); + match content_status { + ContentStatus::Complete => { + storage_clone.process_remote_entry(entry).await?; + storage_clone.process_sinks(entry).await; + } + ContentStatus::Missing => { + if entry.content_len() > 0 { + wait_list.put(entry.content_hash(), entry.clone()); + } else { + storage_clone.process_remote_entry(entry).await?; + } + } + ContentStatus::Incomplete => { + wait_list.put(entry.content_hash(), entry.clone()); + } + }; + } + LiveEvent::InsertLocal { entry } => { + storage_clone.process_sinks(entry).await; + } + LiveEvent::ContentReady { hash } => { + info!(event = ?event); + let Some(entry) = &wait_list.pop(hash) else { + warn!(action = "skipped_absent_hash", hash = ?hash); + continue; + }; + storage_clone.process_remote_entry(entry).await?; + storage_clone.process_sinks(entry).await; + storage_clone.retain_blob_if_needed(entry).await; + } + _ => {} + }; + } else { + return Ok::<(), Error>(()) + } + } + } + } + } + .instrument(info_span!(parent: None, "fs_sync", table_id = iroh_doc.id().to_string())) + }); + + let fs_storage_clone = storage.clone(); + + if storage_config.is_import_missing_enabled { + task_tracker.spawn(async move { + let all_keys: Arc> = Arc::new( + fs_storage_clone + .iroh_doc() + .get_many(Query::all()) + .await + .map_err(Error::doc)? + .map(|x| bytes::Bytes::copy_from_slice(x.unwrap().key())) + .collect() + .await, + ); + let pool = Arc::new(Pool::bounded(16)); + + for fs_shard in fs_storage_clone.fs_shards.values() { + let fs_storage_clone = fs_storage_clone.clone(); + let fs_shard = fs_shard.clone(); + let all_keys = all_keys.clone(); + let pool = pool.clone(); + let cancellation_token = cancellation_token.clone(); + tokio::spawn(async move { + let base_path = fs_shard.path().to_path_buf(); + let mut read_dir_stream = tokio::fs::read_dir(&base_path) + .await + .map_err(Error::io_error)?; + + loop { + tokio::select! { + _ = cancellation_token.cancelled() => return Ok::<(), Error>(()), + entry = read_dir_stream.next_entry() => { + let entry = entry.map_err(Error::io_error)?; + if let Some(entry) = entry { + let key = key_to_bytes(&entry.file_name().to_string_lossy()); + if all_keys.contains(&key) || key.starts_with(&[b'~']) { + continue; + } + pool.spawn({ + let iroh_doc = fs_storage_clone.iroh_doc().clone(); + async move { + let import_progress = iroh_doc + .import_file( + fs_storage_clone.author_id, + key, + &entry.path(), + true, + ) + .await + .map_err(Error::doc) + .unwrap(); + if let Err(error) = import_progress.finish().await.map_err(Error::hash) { + error!(error = ?error, path = ?entry.path(), key = ?entry.file_name(), "import_progress_error"); + } + info!(action = "imported", key = ?entry.file_name()) + } + .instrument(info_span!(parent: None, "restore")) + }) + .await + .unwrap(); + } else { + return Ok::<(), Error>(()) + } + } + } + } + }); + } + Ok::<(), Error>(()) + }); + } + Ok(storage) + } + + fn get_path(&self, key: &str) -> Result { + if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { + let file_shard = &self.fs_shards[&file_shard_config.name]; + return Ok(file_shard.get_path_for(key)); + } + Err(Error::storage("missing file shards")) + } + + async fn process_sinks(&self, entry: &Entry) { + let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap(); + let file_shard_path = self.get_path(key).unwrap(); + for sink in &self.sinks { + if let Err(error) = sink.send(entry.key(), &file_shard_path).await { + warn!(error = ?error); + continue; + } + info!(action = "send", sink = sink.name(), key = ?key); + } + } + + async fn process_remote_entry(&self, entry: &Entry) -> Result> { + let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap(); + if entry.content_len() == 0 { + self.delete_from_fs(key).await?; + Ok(None) + } else { + let file_shard_path = self.get_path(key).unwrap(); + self.iroh_doc() + .export_file(entry.clone(), file_shard_path.clone()) + .await + .map_err(Error::storage)? + .finish() + .await + .map_err(Error::storage)?; + Ok(Some(file_shard_path)) + } + } + + async fn retain_blob_if_needed(&self, entry: &Entry) { + if !self.keep_blob { + if let Err(error) = self + .sync_client + .blobs + .delete_blob(entry.content_hash()) + .await + { + warn!(error = ?error); + } + } + } + + pub async fn delete_from_fs(&self, key: &str) -> Result<()> { + info!("delete_from_fs {:?} {:?}", self.iroh_doc().id(), key); + if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { + self.fs_shards[&file_shard_config.name] + .delete(key) + .await + .map_err(Error::io_error)?; + return Ok(()); + } + Err(Error::storage("no file shards")) + } + + pub async fn delete(&self, key: &str) -> Result { + info!("delete {:?} {:?}", self.iroh_doc().id(), key); + let removed_items = self + .iroh_doc + .del(self.author_id, key_to_bytes(key)) + .await + .map_err(Error::missing_key)?; + Ok(removed_items) + } + + pub async fn insert(&self, key: &str, value: S) -> Result { + if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { + let file_shard = &self.fs_shards[&file_shard_config.name]; + let data_path = file_shard + .insert(key, value) + .await + .map_err(Error::io_error)?; + let import_progress = self + .iroh_doc + .import_file(self.author_id, key_to_bytes(key), &data_path, true) + .await + .map_err(Error::doc)?; + return Ok(import_progress.finish().await.map_err(Error::hash)?.hash); + } + Err(Error::FileShard { + description: "missing file shards".to_string(), + }) + } + + pub async fn exists(&self, key: &str) -> Result> { + for file_shard_config in self.hash_ring.range(key, 1) { + let file_shard = &self.fs_shards[&file_shard_config.name]; + if file_shard + .exists(key) + .await + .map_err(Error::io_error)? + .is_some() + { + return Ok(Some(file_shard.get_path_for(key))); + } + } + Ok(None) + } + + pub fn iroh_doc(&self) -> &IrohDoc { + &self.iroh_doc + } + + pub async fn get(&self, key: &str) -> Result>> { + match self + .iroh_doc + .get_one(Query::key_exact(key_to_bytes(key))) + .await + .map_err(Error::doc)? + { + Some(entry) => { + return Ok(Some(Box::new( + entry + .content_reader(self.iroh_doc()) + .await + .map_err(Error::doc)?, + ))) + } + None => { + for file_shard_config in self.hash_ring.range(key, 1) { + let file_shard = &self.fs_shards[&file_shard_config.name]; + return match file_shard.open_store(key).await { + Ok(Some(file)) => Ok(Some(Box::new(file))), + Ok(None) => Ok(None), + Err(e) => Err(Error::io_error(e)), + }; + } + } + } + Err(Error::io_error("missing shard")) + } + + pub fn get_all(&self) -> impl Stream> { + let iroh_doc = self.iroh_doc.clone(); + stream! { + for await entry in iroh_doc.get_many(Query::all()).await.map_err(Error::table)? { + yield entry.map_err(Error::entry) + } + } + } + pub async fn share(&self, mode: ShareMode) -> Result { + self.iroh_doc().share(mode).await.map_err(Error::storage) + } + + pub async fn get_hash(&self, key: &str) -> Result> { + Ok(self + .iroh_doc() + .get_one(Query::key_exact(key_to_bytes(key))) + .await + .map_err(Error::missing_key)? + .map(|entry| (entry.content_hash(), entry.content_len()))) + } + + pub async fn insert_hash(&self, key: &str, hash: Hash, size: u64) -> Result<()> { + self.iroh_doc + .set_hash(self.author_id, key_to_bytes(key), hash, size) + .await + .map_err(Error::hash) + } +} diff --git a/src/storages/fs_storage.rs b/src/storages/fs_storage.rs deleted file mode 100644 index 6160cb1..0000000 --- a/src/storages/fs_storage.rs +++ /dev/null @@ -1,259 +0,0 @@ -use crate::config::FSStorageEngineConfig; -use crate::error::{Error, Result}; -use crate::file_shard::FileShard; -use crate::hash_ring::HashRing; -use crate::utils::{bytes_to_key, key_to_bytes}; -use crate::IrohDoc; -use futures::StreamExt; -use iroh::bytes::Hash; -use iroh::client::{Entry, LiveEvent}; -use iroh::sync::store::Query; -use iroh::sync::{AuthorId, ContentStatus}; -use lru::LruCache; -use std::collections::{HashMap, HashSet}; -use std::num::NonZeroUsize; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::io::AsyncRead; -use tokio_task_pool::Pool; -use tokio_util::bytes; -use tracing::{error, info, info_span, warn, Instrument}; - -#[derive(Clone)] -pub struct FSStorageEngine { - author_id: AuthorId, - iroh_doc: IrohDoc, - hash_ring: HashRing, - fs_shards: HashMap, - replicas: u8, -} - -impl FSStorageEngine { - pub async fn new( - author_id: AuthorId, - iroh_doc: IrohDoc, - fs_storage_config: FSStorageEngineConfig, - ) -> Result { - let mut fs_shards = HashMap::new(); - for fs_shard in &fs_storage_config.fs_shards { - fs_shards.insert(fs_shard.name.clone(), FileShard::new(&fs_shard.path).await?); - } - let fs_storage = FSStorageEngine { - author_id, - iroh_doc: iroh_doc.clone(), - hash_ring: HashRing::with_hasher(fs_storage_config.fs_shards.iter()), - fs_shards, - replicas: fs_storage_config.replicas, - }; - let fs_storage_clone = fs_storage.clone(); - tokio::spawn({ - async move { - let mut stream = fs_storage_clone.iroh_doc().subscribe().await.unwrap(); - let mut wait_list = LruCache::new(NonZeroUsize::new(1024).expect("not possible")); - info!("started"); - while let Some(event) = stream.next().await { - let event = event.unwrap(); - match &event { - LiveEvent::InsertRemote { - entry, - content_status, - .. - } => { - info!(event = ?event); - match content_status { - ContentStatus::Complete => { - fs_storage_clone.process_remote_entry(entry).await?; - } - ContentStatus::Missing => { - if entry.content_len() > 0 { - wait_list.put(entry.content_hash(), entry.clone()); - } else { - fs_storage_clone.process_remote_entry(entry).await?; - } - } - ContentStatus::Incomplete => { - wait_list.put(entry.content_hash(), entry.clone()); - } - }; - } - LiveEvent::ContentReady { hash } => { - info!(event = ?event); - let Some(hash) = &wait_list.pop(hash) else { - warn!(action = "skipped_absent_hash", hash = ?hash); - continue; - }; - fs_storage_clone.process_remote_entry(hash).await?; - } - _ => {} - }; - } - Ok::<(), Error>(()) - } - .instrument(info_span!(parent: None, "fs_sync", table_id = iroh_doc.id().to_string())) - }); - - let fs_storage_clone = fs_storage.clone(); - - if fs_storage_config.is_import_missing_enabled { - tokio::spawn(async move { - let all_keys: Arc> = Arc::new( - fs_storage_clone - .iroh_doc() - .get_many(Query::all()) - .await - .map_err(Error::doc)? - .map(|x| bytes::Bytes::copy_from_slice(x.unwrap().key())) - .collect() - .await, - ); - let pool = Arc::new(Pool::bounded(16)); - - for fs_shard in fs_storage_clone.fs_shards.values() { - let fs_storage_clone = fs_storage_clone.clone(); - let fs_shard = fs_shard.clone(); - let all_keys = all_keys.clone(); - let pool = pool.clone(); - tokio::spawn(async move { - let base_path = fs_shard.path().to_path_buf(); - let mut read_dir_stream = tokio::fs::read_dir(&base_path) - .await - .map_err(Error::io_error)?; - while let Some(entry) = read_dir_stream - .next_entry() - .await - .map_err(Error::io_error)? - { - let key = key_to_bytes(&entry.file_name().to_string_lossy()); - if all_keys.contains(&key) || key.starts_with(&[b'~']) { - continue; - } - pool.spawn({ - let iroh_doc = fs_storage_clone.iroh_doc().clone(); - async move { - let import_progress = iroh_doc - .import_file( - fs_storage_clone.author_id, - key, - &entry.path(), - true, - ) - .await - .map_err(Error::doc) - .unwrap(); - match import_progress.finish().await.map_err(Error::hash) { - Err(error) => error!(error = ?error, path = ?entry.path(), key = ?entry.file_name(), "import_progress_error"), - _ => {} - }; - info!(action = "imported", key = ?entry.file_name()) - } - .instrument(info_span!(parent: None, "restore")) - }) - .await - .unwrap(); - } - Ok::<(), Error>(()) - }); - } - Ok::<(), Error>(()) - }); - } - Ok(fs_storage) - } - - fn get_path(&self, key: &str) -> Result { - if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { - let file_shard = &self.fs_shards[&file_shard_config.name]; - return Ok(file_shard.get_path_for(key)); - } - Err(Error::storage("missing file shards")) - } - - async fn process_remote_entry(&self, entry: &Entry) -> Result<()> { - let key = std::str::from_utf8(bytes_to_key(entry.key())).unwrap(); - if entry.content_len() == 0 { - self.delete_from_fs(key).await?; - } else { - let file_shard_path = self.get_path(key).unwrap(); - self.iroh_doc() - .export_file(entry.clone(), file_shard_path) - .await - .map_err(Error::storage)? - .finish() - .await - .map_err(Error::storage)?; - } - Ok(()) - } - - pub async fn delete_from_fs(&self, key: &str) -> Result<()> { - info!("delete_from_fs {:?} {:?}", self.iroh_doc().id(), key); - if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { - self.fs_shards[&file_shard_config.name] - .delete(key) - .await - .map_err(Error::io_error)?; - return Ok(()); - } - Err(Error::storage("no file shards")) - } - - pub async fn delete(&self, key: &str) -> Result { - info!("delete {:?} {:?}", self.iroh_doc().id(), key); - let removed_items = self - .iroh_doc - .del(self.author_id, key_to_bytes(key)) - .await - .map_err(Error::missing_key)?; - Ok(removed_items) - } - - pub async fn insert(&self, key: &str, value: S) -> Result { - if let Some(file_shard_config) = self.hash_ring.range(key, 1).into_iter().next() { - let file_shard = &self.fs_shards[&file_shard_config.name]; - let data_path = file_shard - .insert(key, value) - .await - .map_err(Error::io_error)?; - let import_progress = self - .iroh_doc - .import_file(self.author_id, key_to_bytes(key), &data_path, true) - .await - .map_err(Error::doc)?; - return Ok(import_progress.finish().await.map_err(Error::hash)?.hash); - } - Err(Error::FileShard { - description: "missing file shards".to_string(), - }) - } - - pub async fn exists(&self, key: &str) -> Result> { - for file_shard_config in self.hash_ring.range(key, 1) { - let file_shard = &self.fs_shards[&file_shard_config.name]; - if file_shard - .exists(key) - .await - .map_err(Error::io_error)? - .is_some() - { - return Ok(Some(file_shard.get_path_for(key))); - } - } - Ok(None) - } - - pub fn iroh_doc(&self) -> &IrohDoc { - &self.iroh_doc - } - - pub async fn get(&self, key: &str) -> Result> { - for file_shard_config in self.hash_ring.range(key, 1) { - let file_shard = &self.fs_shards[&file_shard_config.name]; - match file_shard.open_store(key).await { - Ok(Some(file)) => return Ok(Box::new(file)), - Ok(None) => return Err(Error::io_error("missing file")), - Err(e) => return Err(Error::io_error(e)), - } - } - Err(Error::io_error("missing shard")) - } -} diff --git a/src/storages/iroh_storage.rs b/src/storages/iroh_storage.rs deleted file mode 100644 index 1e2ab65..0000000 --- a/src/storages/iroh_storage.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::error::{Error, Result}; -use crate::utils::key_to_bytes; -use crate::IrohDoc; -use iroh::bytes::Hash; -use iroh::sync::store::{Query, SortBy, SortDirection}; -use iroh::sync::AuthorId; -use tokio::io::{AsyncRead, AsyncReadExt}; - -#[derive(Clone)] -pub struct IrohStorageEngine { - author_id: AuthorId, - iroh_doc: IrohDoc, -} - -impl IrohStorageEngine { - pub fn new(author_id: AuthorId, iroh_doc: IrohDoc) -> Self { - IrohStorageEngine { - author_id, - iroh_doc, - } - } - - pub async fn delete(&self, key: &str) -> Result { - self.iroh_doc - .del(self.author_id, key_to_bytes(key)) - .await - .map_err(Error::missing_key) - } - - pub async fn insert(&self, key: &str, mut value: S) -> Result { - let mut buffer = vec![]; - value - .read_to_end(&mut buffer) - .await - .map_err(Error::io_error)?; - self.iroh_doc - .set_bytes(self.author_id, key_to_bytes(key), buffer) - .await - .map_err(Error::hash) - } - - pub async fn insert_hash(&self, key: &str, hash: Hash, size: u64) -> Result<()> { - self.iroh_doc - .set_hash(self.author_id, key_to_bytes(key), hash, size) - .await - .map_err(Error::hash) - } - - pub async fn exists(&self, key: &str) -> Result { - Ok(self - .iroh_doc - .get_one( - Query::key_exact(key_to_bytes(key)).sort_by(SortBy::KeyAuthor, SortDirection::Asc), - ) - .await - .map_err(Error::doc)? - .is_some()) - } - - pub const fn iroh_doc(&self) -> &IrohDoc { - &self.iroh_doc - } - - pub async fn get(&self, key: &str) -> Result> { - Ok(Box::new( - self.iroh_doc() - .get_one( - Query::key_exact(key_to_bytes(key)) - .sort_by(SortBy::KeyAuthor, SortDirection::Asc), - ) - .await - .map_err(Error::doc)? - .ok_or_else(|| Error::missing_key(key))? - .content_reader(self.iroh_doc()) - .await - .map_err(Error::doc)?, - )) - } -} diff --git a/src/storages/mirroring.rs b/src/storages/mirroring.rs deleted file mode 100644 index 4f9ea4b..0000000 --- a/src/storages/mirroring.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::error::{Error, Result}; -use crate::sinks::Sink; -use crate::{IrohClient, IrohDoc}; -use futures::StreamExt; -use iroh::client::LiveEvent; -use lru::LruCache; -use std::num::NonZeroUsize; -use std::sync::Arc; -use std::time::Duration; -use tokio::task::JoinHandle; -use tracing::{info, info_span, warn, Instrument}; - -#[derive(Clone)] -pub struct Mirroring { - thread: Arc>>, -} - -impl Mirroring { - pub async fn new( - iroh_doc: IrohDoc, - sinks: Vec>, - sync_client: IrohClient, - delete_after_mirroring: bool, - ) -> Result { - let table_id = iroh_doc.id().to_string(); - let mut stream = iroh_doc.subscribe().await.map_err(Error::doc)?; - let thread = tokio::spawn( - async move { - let mut wait_list = LruCache::new(NonZeroUsize::new(1024).expect("not_possible")); - info!("started"); - while let Some(event) = stream.next().await { - let event = match event { - Ok(event) => event, - Err(error) => { - warn!(error = ?error); - continue - } - }; - match &event { - LiveEvent::InsertLocal { entry } => { - info!(event = ?event); - match sync_client.blobs.read(entry.content_hash()).await { - Ok(mut reader) => { - let bytes = match reader.read_to_bytes().await { - Ok(bytes) => bytes, - Err(error) => { - warn!(error = ?error); - continue; - } - }; - for sink in &sinks { - if let Err(error) = - sink.send(entry.key(), bytes.clone()).await - { - warn!(error = ?error); - continue; - } - info!(action = "sent", sink = sink.name(), key = ?std::str::from_utf8(entry.key())); - } - } - Err(error) => warn!(error = ?error), - } - } - LiveEvent::InsertRemote { entry, .. } => { - info!(event = ?event); - wait_list.put(entry.content_hash(), entry.key().to_vec()); - } - LiveEvent::ContentReady { hash } => { - info!(event = ?event); - let Some(key) = wait_list.get(hash) else { - warn!(error = "missing_key_in_wait_list"); - continue; - }; - match sync_client.blobs.read(*hash).await { - Ok(mut reader) => { - let bytes = match reader.read_to_bytes().await { - Ok(bytes) => bytes, - Err(error) => { - warn!(error = ?error); - continue; - } - }; - for sink in &sinks { - if let Err(error) = sink.send(key, bytes.clone()).await { - warn!(error = ?error); - continue; - } - info!(action = "sent", sink = sink.name(), key = ?std::str::from_utf8(key)); - } - if delete_after_mirroring { - let sync_client = sync_client.clone(); - let hash = hash.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(120)).await; - if let Err(error) = - sync_client.blobs.delete_blob(hash).await - { - warn!(error = ?error); - } - }); - } - } - Err(error) => warn!(error = ?error), - } - } - _ => {} - } - } - warn!("stopped_mirroring"); - Ok(()) - } - .instrument(info_span!(parent: None, "mirroring", table_id = table_id)), - ); - Ok(Self { - thread: Arc::new(thread), - }) - } -} diff --git a/src/storages/mod.rs b/src/storages/mod.rs deleted file mode 100644 index 400baf6..0000000 --- a/src/storages/mod.rs +++ /dev/null @@ -1,103 +0,0 @@ -use crate::error::{Error, Result}; -use async_stream::stream; -use futures::Stream; -use iroh::bytes::Hash; -use iroh::client::Entry; -use iroh::rpc_protocol::ShareMode; -use iroh::sync::store::{Query, SortBy, SortDirection}; -use iroh::ticket::DocTicket; -use tokio::io::AsyncRead; - -pub mod fs_storage; -pub(crate) mod iroh_storage; -pub mod mirroring; - -use crate::storages::iroh_storage::IrohStorageEngine; -use crate::storages::mirroring::Mirroring; -use crate::utils::key_to_bytes; -use crate::IrohDoc; -use fs_storage::FSStorageEngine; - -#[derive(Clone)] -pub enum StorageEngine { - FS(FSStorageEngine), - Iroh(IrohStorageEngine), -} - -#[derive(Clone)] -pub struct Storage { - engine: StorageEngine, - mirroring: Option, -} - -impl Storage { - pub fn new(engine: StorageEngine, mirroring: Option) -> Self { - Storage { engine, mirroring } - } - - pub async fn get(&self, key: &str) -> Result> { - match &self.engine { - StorageEngine::FS(storage) => storage.get(key).await, - StorageEngine::Iroh(storage) => storage.get(key).await, - } - } - - pub async fn insert(&self, key: &str, value: S) -> Result { - match &self.engine { - StorageEngine::FS(storage) => storage.insert(key, value).await, - StorageEngine::Iroh(storage) => storage.insert(key, value).await, - } - } - - pub async fn delete(&self, key: &str) -> Result { - match &self.engine { - StorageEngine::FS(storage) => storage.delete(key).await, - StorageEngine::Iroh(storage) => storage.delete(key).await, - } - } - - pub async fn insert_hash(&self, key: &str, hash: Hash, size: u64) -> Result<()> { - match &self.engine { - StorageEngine::FS(_) => Err(Error::storage("unsupported_operation")), - StorageEngine::Iroh(storage) => storage.insert_hash(key, hash, size).await, - } - } - - pub async fn exists(&self, key: &str) -> Result { - match &self.engine { - StorageEngine::FS(storage) => Ok(storage.exists(key).await?.is_some()), - StorageEngine::Iroh(storage) => storage.exists(key).await, - } - } - - pub fn iroh_doc(&self) -> &IrohDoc { - match &self.engine { - StorageEngine::FS(storage) => storage.iroh_doc(), - StorageEngine::Iroh(storage) => storage.iroh_doc(), - } - } - - pub async fn get_hash(&self, key: &str) -> Result> { - Ok(self - .iroh_doc() - .get_one( - Query::key_exact(key_to_bytes(key)).sort_by(SortBy::KeyAuthor, SortDirection::Asc), - ) - .await - .map_err(Error::missing_key)? - .map(|entry| (entry.content_hash(), entry.content_len()))) - } - - pub async fn share(&self, mode: ShareMode) -> Result { - self.iroh_doc().share(mode).await.map_err(Error::storage) - } - - pub fn get_all(&self) -> impl Stream> { - let iroh_doc = self.iroh_doc().clone(); - stream! { - for await entry in iroh_doc.get_many(Query::all()).await.map_err(Error::table)? { - yield entry.map_err(Error::entry) - } - } - } -}