From 94451bc8edc40a83a05526763be111cc1bcea5e3 Mon Sep 17 00:00:00 2001 From: hammadb Date: Thu, 14 Dec 2023 15:54:22 -0800 Subject: [PATCH 1/3] [ENH] add rust pulsar and topic management --- Cargo.lock | 1649 ++++++++++++++++- rust/worker/Cargo.toml | 6 + rust/worker/chroma_config.yaml | 21 + .../src/assignment/assignment_policy.rs | 58 +- rust/worker/src/assignment/mod.rs | 2 +- rust/worker/src/bin/worker.rs | 6 + rust/worker/src/config.rs | 22 +- rust/worker/src/errors.rs | 2 +- rust/worker/src/ingest/config.rs | 6 + rust/worker/src/ingest/ingest.rs | 307 +++ rust/worker/src/ingest/mod.rs | 5 + rust/worker/src/lib.rs | 38 + .../src/memberlist/memberlist_provider.rs | 45 +- rust/worker/src/memberlist/mod.rs | 3 + rust/worker/src/system/system.rs | 4 +- rust/worker/src/system/types.rs | 18 +- 16 files changed, 2108 insertions(+), 84 deletions(-) create mode 100644 rust/worker/chroma_config.yaml create mode 100644 rust/worker/src/bin/worker.rs create mode 100644 rust/worker/src/ingest/config.rs create mode 100644 rust/worker/src/ingest/ingest.rs create mode 100644 rust/worker/src/ingest/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 12fbf9c8dec..8fde2eda6aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,203 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" +dependencies = [ + "concurrent-queue", + "event-listener 4.0.0", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +dependencies = [ + "async-lock 3.2.0", + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite 2.1.0", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.1.1", + "async-executor", + "async-io 2.2.2", + "async-lock 3.2.0", + "blocking", + "futures-lite 2.1.0", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6afaa937395a620e33dc6a742c593c01aced20aa376ffb0f628121198578ccc7" +dependencies = [ + "async-lock 3.2.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.1.0", + "parking", + "polling 3.3.1", + "rustix 0.38.28", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c" +dependencies = [ + "event-listener 4.0.0", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-native-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9343dc5acf07e79ff82d0c37899f079db3534d99f189a1837c8e549c99405bec" +dependencies = [ + "futures-util", + "native-tls", + "thiserror", + "url", +] + +[[package]] +name = "async-process" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" +dependencies = [ + "async-io 1.13.0", + "async-lock 2.8.0", + "async-signal", + "blocking", + "cfg-if", + "event-listener 3.1.0", + "futures-lite 1.13.0", + "rustix 0.38.28", + "windows-sys 0.48.0", +] + +[[package]] +name = "async-signal" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e47d90f65a225c4527103a8d747001fc56e375203592b25ad103e1ca13124c5" +dependencies = [ + "async-io 2.2.2", + "async-lock 2.8.0", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix 0.38.28", + "signal-hook-registry", + "slab", + "windows-sys 0.48.0", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-attributes", + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 2.8.0", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -88,6 +285,12 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "async-task" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1" + [[package]] name = "async-trait" version = "0.1.74" @@ -99,6 +302,19 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "asynchronous-codec" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4057f2c32adbb2fc158e22fb38433c8e9bbf76b75a4732c7c0cbaf695fb65568" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "atomic" version = "0.6.0" @@ -108,6 +324,12 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.1.0" @@ -185,12 +407,36 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base16ct" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" + +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + +[[package]] +name = "bit-vec" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" + [[package]] name = "bitflags" version = "1.3.2" @@ -203,6 +449,31 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "blocking" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +dependencies = [ + "async-channel 2.1.1", + "async-lock 3.2.0", + "async-task", + "fastrand 2.0.1", + "futures-io", + "futures-lite 2.1.0", + "piper", + "tracing", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -215,6 +486,12 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "374d28ec25809ee0e23827c2ab573d729e293f281dfe393500e7ad618baa61c6" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -227,6 +504,7 @@ version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ + "jobserver", "libc", ] @@ -244,11 +522,28 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.48.5", ] +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "const-oid" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" + [[package]] name = "core-foundation" version = "0.9.4" @@ -265,6 +560,39 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "cpufeatures" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" +dependencies = [ + "libc", +] + +[[package]] +name = "crc" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -298,6 +626,56 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "generic-array", + "rand_core", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "curve25519-dalek" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "platforms", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.40", +] + [[package]] name = "darling" version = "0.20.3" @@ -333,6 +711,33 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "data-url" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c297a1c74b71ae29df00c3e22dd9534821d60eb9af5a0192823fa2acea70c2a" + +[[package]] +name = "der" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + +[[package]] +name = "deranged" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" +dependencies = [ + "powerfmt", + "serde", +] + [[package]] name = "derivative" version = "2.2.0" @@ -344,18 +749,98 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "const-oid", + "crypto-common", + "subtle", +] + [[package]] name = "dyn-clone" version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "545b22097d44f8a9581187cdf93de7a71e4722bf51200cfaba810865b49a495d" +[[package]] +name = "ecdsa" +version = "0.16.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" +dependencies = [ + "der", + "digest", + "elliptic-curve", + "rfc6979", + "signature", + "spki", +] + +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8", + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f628eaec48bfd21b865dc2950cfa014450c01d2fa2b69a86c2fd5844ec523c0" +dependencies = [ + "curve25519-dalek", + "ed25519", + "serde", + "sha2", + "subtle", + "zeroize", +] + [[package]] name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "elliptic-curve" +version = "0.13.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47" +dependencies = [ + "base16ct", + "crypto-bigint", + "digest", + "ff", + "generic-array", + "group", + "hkdf", + "pem-rfc7468", + "pkcs8", + "rand_core", + "sec1", + "subtle", + "zeroize", +] + +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -372,12 +857,75 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "770d968249b5d99410d61f5bf89057f3199a077a04d087092f58e7d10692baae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.0", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "ff" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449" +dependencies = [ + "rand_core", + "subtle", +] + +[[package]] +name = "fiat-crypto" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" + [[package]] name = "figment" version = "0.10.12" @@ -400,12 +948,37 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flate2" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -463,6 +1036,34 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeee267a1883f7ebef3700f262d2d54de95dfaf38189015a74fdc4e0c7ad8143" +dependencies = [ + "fastrand 2.0.1", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.29" @@ -486,6 +1087,12 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.29" @@ -504,6 +1111,17 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", + "zeroize", +] + [[package]] name = "getrandom" version = "0.2.11" @@ -511,8 +1129,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -521,6 +1141,29 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "group" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" +dependencies = [ + "ff", + "rand_core", + "subtle", +] + [[package]] name = "h2" version = "0.3.22" @@ -568,6 +1211,30 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "home" version = "0.5.5" @@ -698,6 +1365,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -706,6 +1383,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -716,6 +1394,7 @@ checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.3", + "serde", ] [[package]] @@ -733,6 +1412,32 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "ipnet" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.11.0" @@ -748,6 +1453,15 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "jobserver" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.66" @@ -786,7 +1500,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edc3606fd16aca7989db2f84bb25684d0270c6d6fa1dbcd0025af7b4130523a6" dependencies = [ - "base64", + "base64 0.21.5", "bytes", "chrono", "serde", @@ -813,7 +1527,7 @@ version = "0.87.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7266548b9269d9fa19022620d706697e64f312fb2ba31b93e6986453fcc82c92" dependencies = [ - "base64", + "base64 0.21.5", "bytes", "chrono", "either", @@ -900,12 +1614,42 @@ dependencies = [ "tracing", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +dependencies = [ + "spin 0.5.2", +] + [[package]] name = "libc" version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -927,6 +1671,29 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] + +[[package]] +name = "lz4" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" +dependencies = [ + "libc", + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] [[package]] name = "matchit" @@ -955,6 +1722,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -979,13 +1752,41 @@ dependencies = [ name = "multimap" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + +[[package]] +name = "murmur3" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" + +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] [[package]] -name = "murmur3" -version = "0.5.2" +name = "nom" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] [[package]] name = "num-bigint" @@ -998,6 +1799,23 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand", + "smallvec", + "zeroize", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -1008,6 +1826,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -1015,6 +1844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1027,6 +1857,26 @@ dependencies = [ "libc", ] +[[package]] +name = "oauth2" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c38841cdd844847e3e7c8d29cef9dcfed8877f8f56f9071f77843ecf3baf937f" +dependencies = [ + "base64 0.13.1", + "chrono", + "getrandom", + "http", + "rand", + "reqwest", + "serde", + "serde_json", + "serde_path_to_error", + "sha2", + "thiserror", + "url", +] + [[package]] name = "object" version = "0.32.1" @@ -1042,12 +1892,82 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openidconnect" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62d6050f6a84b81f23c569f5607ad883293e57491036e318fafe6fc4895fadb1" +dependencies = [ + "base64 0.13.1", + "chrono", + "dyn-clone", + "ed25519-dalek", + "hmac", + "http", + "itertools 0.10.5", + "log", + "oauth2", + "p256", + "p384", + "rand", + "rsa", + "serde", + "serde-value", + "serde_derive", + "serde_json", + "serde_path_to_error", + "serde_plain", + "serde_with", + "sha2", + "subtle", + "thiserror", + "url", +] + +[[package]] +name = "openssl" +version = "0.10.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b8419dc8cc6d866deb801274bba2e6f8f6108c1bb7fcc10ee5ab864931dbb45" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.40", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -1057,6 +1977,36 @@ dependencies = [ "num-traits", ] +[[package]] +name = "p256" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" +dependencies = [ + "ecdsa", + "elliptic-curve", + "primeorder", + "sha2", +] + +[[package]] +name = "p384" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70786f51bcc69f6a4c0360e063a4cac5419ef7c5cd5b3c99ad70f3be5ba79209" +dependencies = [ + "ecdsa", + "elliptic-curve", + "primeorder", + "sha2", +] + +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1109,10 +2059,19 @@ version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" dependencies = [ - "base64", + "base64 0.21.5", "serde", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1161,12 +2120,102 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + +[[package]] +name = "pkg-config" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" + +[[package]] +name = "platforms" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" + +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf63fa624ab313c11656b4cda960bfc46c410187ad493c41f6ba2d8c1e991c9e" +dependencies = [ + "cfg-if", + "concurrent-queue", + "pin-project-lite", + "rustix 0.38.28", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "prettyplease" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +dependencies = [ + "proc-macro2", + "syn 1.0.109", +] + [[package]] name = "prettyplease" version = "0.2.15" @@ -1177,6 +2226,15 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "primeorder" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" +dependencies = [ + "elliptic-curve", +] + [[package]] name = "proc-macro2" version = "1.0.70" @@ -1199,6 +2257,16 @@ dependencies = [ "yansi", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive 0.11.9", +] + [[package]] name = "prost" version = "0.12.3" @@ -1206,7 +2274,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c289cda302b98a28d40c8b3b90498d6e526dd24ac2ecea73e4e491685b94a" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.12.3", +] + +[[package]] +name = "prost-build" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +dependencies = [ + "bytes", + "heck", + "itertools 0.10.5", + "lazy_static", + "log", + "multimap", + "petgraph", + "prettyplease 0.1.25", + "prost 0.11.9", + "prost-types 0.11.9", + "regex", + "syn 1.0.109", + "tempfile", + "which", ] [[package]] @@ -1217,20 +2307,33 @@ checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", "heck", - "itertools", + "itertools 0.11.0", "log", "multimap", "once_cell", "petgraph", - "prettyplease", - "prost", - "prost-types", + "prettyplease 0.2.15", + "prost 0.12.3", + "prost-types 0.12.3", "regex", "syn 2.0.40", "tempfile", "which", ] +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools 0.10.5", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "prost-derive" version = "0.12.3" @@ -1238,19 +2341,70 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efb6c9a1dd1def8e2124d17e83a20af56f1570d6c2d2bd9e266ccb768df3840e" dependencies = [ "anyhow", - "itertools", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.40", ] +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost 0.11.9", +] + [[package]] name = "prost-types" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "193898f59edcf43c26227dcd4c8427f00d99d61e95dcde58dabd49fa291d470e" dependencies = [ - "prost", + "prost 0.12.3", +] + +[[package]] +name = "pulsar" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d21c6a837986cf25d22ac5b951c267d95808f3c830ff009c2879fff259a0268" +dependencies = [ + "async-native-tls", + "async-std", + "async-trait", + "asynchronous-codec", + "bit-vec", + "bytes", + "chrono", + "crc", + "data-url", + "flate2", + "futures", + "futures-io", + "futures-timer", + "log", + "lz4", + "native-tls", + "nom", + "oauth2", + "openidconnect", + "pem", + "prost 0.11.9", + "prost-build 0.11.9", + "prost-derive 0.11.9", + "rand", + "regex", + "serde", + "serde_json", + "snap", + "tokio", + "tokio-native-tls", + "tokio-util", + "url", + "uuid", + "zstd", ] [[package]] @@ -1334,22 +2488,72 @@ dependencies = [ ] [[package]] -name = "regex-automata" -version = "0.4.3" +name = "regex-automata" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + +[[package]] +name = "reqwest" +version = "0.11.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +dependencies = [ + "base64 0.21.5", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "system-configuration", + "tokio", + "tokio-rustls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + +[[package]] +name = "rfc6979" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "f8dd2a808d456c4a54e300a23e9f5a67e122c3024119acbfd73e3bf664491cb2" dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", + "hmac", + "subtle", ] -[[package]] -name = "regex-syntax" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" - [[package]] name = "ring" version = "0.17.7" @@ -1359,17 +2563,60 @@ dependencies = [ "cc", "getrandom", "libc", - "spin", + "spin 0.9.8", "untrusted", "windows-sys 0.48.0", ] +[[package]] +name = "rsa" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core", + "signature", + "spki", + "subtle", + "zeroize", +] + [[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.28" @@ -1379,7 +2626,7 @@ dependencies = [ "bitflags 2.4.1", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.12", "windows-sys 0.52.0", ] @@ -1413,7 +2660,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.5", ] [[package]] @@ -1487,6 +2734,20 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sec1" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" +dependencies = [ + "base16ct", + "der", + "generic-array", + "pkcs8", + "subtle", + "zeroize", +] + [[package]] name = "secrecy" version = "0.8.0" @@ -1520,6 +2781,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" + [[package]] name = "serde" version = "1.0.193" @@ -1573,6 +2840,66 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + +[[package]] +name = "serde_plain" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1fc6db65a611022b23a0dec6975d63fb80a302cb3388835ff02c097258d50" +dependencies = [ + "serde", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "serde_with" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23" +dependencies = [ + "base64 0.21.5", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.1.0", + "serde", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.40", +] + [[package]] name = "serde_yaml" version = "0.9.27" @@ -1586,6 +2913,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1595,6 +2933,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "slab" version = "0.4.9" @@ -1610,6 +2958,12 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.4.10" @@ -1630,18 +2984,40 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "subtle" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" + [[package]] name = "syn" version = "1.0.109" @@ -1670,6 +3046,27 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.8.1" @@ -1677,9 +3074,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.1", "redox_syscall", - "rustix", + "rustix 0.38.28", "windows-sys 0.48.0", ] @@ -1703,6 +3100,50 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "time" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" +dependencies = [ + "deranged", + "itoa", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +dependencies = [ + "time-core", +] + +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.35.0" @@ -1742,6 +3183,16 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -1787,7 +3238,7 @@ dependencies = [ "async-stream", "async-trait", "axum", - "base64", + "base64 0.21.5", "bytes", "h2", "http", @@ -1796,7 +3247,7 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project", - "prost", + "prost 0.12.3", "tokio", "tokio-stream", "tower", @@ -1811,9 +3262,9 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ - "prettyplease", + "prettyplease 0.2.15", "proc-macro2", - "prost-build", + "prost-build 0.12.3", "quote", "syn 2.0.40", ] @@ -1844,7 +3295,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "base64", + "base64 0.21.5", "bitflags 2.4.1", "bytes", "futures-core", @@ -1918,6 +3369,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + [[package]] name = "uncased" version = "0.9.9" @@ -1927,12 +3384,27 @@ dependencies = [ "version_check", ] +[[package]] +name = "unicode-bidi" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + [[package]] name = "unsafe-libyaml" version = "0.2.9" @@ -1945,6 +3417,18 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "url" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + [[package]] name = "uuid" version = "1.6.1" @@ -1967,12 +3451,30 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "value-bag" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a72e1902dde2bd6441347de2b70b7f5d59bf157c6c62f0c44572607a1d55bbe" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "want" version = "0.3.1" @@ -2013,6 +3515,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.89" @@ -2042,6 +3556,22 @@ version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" +[[package]] +name = "web-sys" +version = "0.3.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-roots" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" + [[package]] name = "which" version = "4.4.2" @@ -2051,7 +3581,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix", + "rustix 0.38.28", ] [[package]] @@ -2217,11 +3747,22 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "worker" version = "0.1.0" dependencies = [ "async-trait", + "bytes", "cc", "figment", "futures", @@ -2230,8 +3771,9 @@ dependencies = [ "murmur3", "num-bigint", "num_cpus", - "prost", - "prost-types", + "prost 0.12.3", + "prost-types 0.12.3", + "pulsar", "rand", "rayon", "schemars", @@ -2277,3 +3819,32 @@ name = "zeroize" version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" + +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.9+zstd.1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 968aff99619..d84d8cf80e7 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -3,6 +3,10 @@ name = "worker" version = "0.1.0" edition = "2021" +[[bin]] +name = "worker" +path = "src/bin/worker.rs" + [dependencies] tonic = "0.10" prost = "0.12" @@ -18,6 +22,7 @@ serde = { version = "1.0.193", features = ["derive"] } serde_json = "1.0.108" futures = "0.3" num_cpus = "1.16.0" +pulsar = "6.1.0" murmur3 = "0.5.2" thiserror = "1.0.50" num-bigint = "0.4.4" @@ -25,6 +30,7 @@ tempfile = "3.8.1" schemars = "0.8.16" kube = { version = "0.87.1", features = ["runtime", "derive"] } k8s-openapi = { version = "0.20.0", features = ["latest"] } +bytes = "1.5.0" [build-dependencies] tonic-build = "0.10" diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml new file mode 100644 index 00000000000..569a5bec9f0 --- /dev/null +++ b/rust/worker/chroma_config.yaml @@ -0,0 +1,21 @@ +# Default configuration for Chroma worker +# In the long term, every service should have an entry in this file +# and this can become the global configuration file for Chroma +# for now we nest it in the worker directory + +worker: + my_ip: "10.244.0.85" + num_indexing_threads: 4 + pulsar_url: "pulsar://127.0.0.1:6650" + pulsar_tenant: "public" + pulsar_namespace: "default" + kube_namespace: "chroma" + assignment_policy: + RendezvousHashing: + hasher: Murmur3 + memberlist_provider: + CustomResource: + memberlist_name: "worker-memberlist" + queue_size: 100 + ingest: + queue_size: 100 diff --git a/rust/worker/src/assignment/assignment_policy.rs b/rust/worker/src/assignment/assignment_policy.rs index dbb5c32df1d..bde70b26625 100644 --- a/rust/worker/src/assignment/assignment_policy.rs +++ b/rust/worker/src/assignment/assignment_policy.rs @@ -8,7 +8,6 @@ use super::{ rendezvous_hash::{assign, AssignmentError, Murmur3Hasher}, }; use async_trait::async_trait; -use uuid::Uuid; /* =========================================== @@ -16,20 +15,22 @@ Interfaces =========================================== */ -/// AssignmentPolicy is a trait that defines how to assign a collection to a topic. +/// AssignmentPolicy is a trait that defines how to assign a key to a set of members. /// # Notes /// This trait mirrors the go and python versions of the assignment policy /// interface. /// # Methods -/// - assign: Assign a collection to a topic. -/// - get_topics: Get the topics that can be assigned to. +/// - assign: Assign a key to a topic. +/// - get_members: Get the members that can be assigned to. +/// - set_members: Set the members that can be assigned to. /// # Notes /// An assignment policy is not responsible for creating the topics it assigns to. /// It is the responsibility of the caller to ensure that the topics exist. /// An assignment policy must be Send. pub(crate) trait AssignmentPolicy: Send { - fn assign(&self, collection_id: Uuid) -> Result; - fn get_topics(&self) -> Vec; + fn assign(&self, key: &str) -> Result; + fn get_members(&self) -> Vec; + fn set_members(&mut self, members: Vec); } /* @@ -39,13 +40,8 @@ Implementation */ pub(crate) struct RendezvousHashingAssignmentPolicy { - // The pulsar tenant and namespace being in this implementation of the assignment policy - // is purely a temporary measure while the topic propagation is being worked on. - // TODO: Remove pulsar_tenant and pulsar_namespace from this struct once topic propagation - // is implemented. - pulsar_tenant: String, - pulsar_namespace: String, hasher: Murmur3Hasher, + members: Vec, } impl RendezvousHashingAssignmentPolicy { @@ -55,16 +51,19 @@ impl RendezvousHashingAssignmentPolicy { // take ownership of them and put the responsibility on the caller to clone them if they // need to. This is the general pattern we should follow in rust - put the burden of cloning // on the caller, and if they don't need to clone, they can pass ownership. - pub fn new( + pub(crate) fn new( pulsar_tenant: String, pulsar_namespace: String, ) -> RendezvousHashingAssignmentPolicy { return RendezvousHashingAssignmentPolicy { - pulsar_tenant: pulsar_tenant, - pulsar_namespace: pulsar_namespace, hasher: Murmur3Hasher {}, + members: vec![], }; } + + pub(crate) fn set_members(&mut self, members: Vec) { + self.members = members; + } } #[async_trait] @@ -77,31 +76,26 @@ impl Configurable for RendezvousHashingAssignmentPolicy { HasherType::Murmur3 => Murmur3Hasher {}, }; return Ok(RendezvousHashingAssignmentPolicy { - pulsar_tenant: worker_config.pulsar_tenant.clone(), - pulsar_namespace: worker_config.pulsar_namespace.clone(), hasher: hasher, + members: vec![], }); } } impl AssignmentPolicy for RendezvousHashingAssignmentPolicy { - fn assign(&self, collection_id: Uuid) -> Result { - let collection_id = collection_id.to_string(); - let topics = self.get_topics(); - let topic = assign(&collection_id, topics, &self.hasher); + fn assign(&self, key: &str) -> Result { + let topics = self.get_members(); + let topic = assign(key, topics, &self.hasher); return topic; } - fn get_topics(&self) -> Vec { - // This mirrors the current python and go code, which assumes a fixed set of topics - let mut topics = Vec::with_capacity(16); - for i in 0..16 { - let topic = format!( - "persistent://{}/{}/chroma_log_{}", - self.pulsar_tenant, self.pulsar_namespace, i - ); - topics.push(topic); - } - return topics; + fn get_members(&self) -> Vec { + // This is not designed to be used frequently for now, nor is the number of members + // expected to be large, so we can just clone the members + return self.members.clone(); + } + + fn set_members(&mut self, members: Vec) { + self.members = members; } } diff --git a/rust/worker/src/assignment/mod.rs b/rust/worker/src/assignment/mod.rs index 77f1e5fd180..7ed1525f0bc 100644 --- a/rust/worker/src/assignment/mod.rs +++ b/rust/worker/src/assignment/mod.rs @@ -1,3 +1,3 @@ -mod assignment_policy; +pub(crate) mod assignment_policy; pub(crate) mod config; mod rendezvous_hash; diff --git a/rust/worker/src/bin/worker.rs b/rust/worker/src/bin/worker.rs new file mode 100644 index 00000000000..16428d244ff --- /dev/null +++ b/rust/worker/src/bin/worker.rs @@ -0,0 +1,6 @@ +use worker::worker_entrypoint; + +#[tokio::main] +async fn main() { + worker_entrypoint().await; +} diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 12e98be9f36..8b6923ec237 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -16,10 +16,10 @@ const ENV_PREFIX: &str = "CHROMA_"; /// variables take precedence over values in the YAML file. /// By default, it is read from the current working directory, /// with the filename chroma_config.yaml. -struct RootConfig { +pub(crate) struct RootConfig { // The root config object wraps the worker config object so that // we can share the same config file between multiple services. - worker: WorkerConfig, + pub worker: WorkerConfig, } impl RootConfig { @@ -37,7 +37,7 @@ impl RootConfig { /// The default location is the current working directory, with the filename chroma_config.yaml. /// The environment variables are prefixed with CHROMA_ and are uppercase. /// Values in the envionment variables take precedence over values in the YAML file. - pub fn load() -> Self { + pub(crate) fn load() -> Self { return Self::load_from_path(DEFAULT_CONFIG_PATH); } @@ -56,7 +56,7 @@ impl RootConfig { /// # Notes /// The environment variables are prefixed with CHROMA_ and are uppercase. /// Values in the envionment variables take precedence over values in the YAML file. - pub fn load_from_path(path: &str) -> Self { + pub(crate) fn load_from_path(path: &str) -> Self { // Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them. // Excluding our own environment variables, which are prefixed with CHROMA_. let mut f = figment::Figment::from(Env::prefixed("CHROMA_").map(|k| match k { @@ -100,9 +100,11 @@ pub(crate) struct WorkerConfig { pub(crate) num_indexing_threads: u32, pub(crate) pulsar_tenant: String, pub(crate) pulsar_namespace: String, + pub(crate) pulsar_url: String, pub(crate) kube_namespace: String, pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig, + pub(crate) ingest: crate::ingest::config::IngestConfig, } /// # Description @@ -133,6 +135,7 @@ mod tests { num_indexing_threads: 4 pulsar_tenant: "public" pulsar_namespace: "default" + pulsar_url: "pulsar://localhost:6650" kube_namespace: "chroma" assignment_policy: RendezvousHashing: @@ -141,6 +144,8 @@ mod tests { CustomResource: memberlist_name: "worker-memberlist" queue_size: 100 + ingest: + queue_size: 100 "#, ); let config = RootConfig::load(); @@ -164,6 +169,7 @@ mod tests { num_indexing_threads: 4 pulsar_tenant: "public" pulsar_namespace: "default" + pulsar_url: "pulsar://localhost:6650" kube_namespace: "chroma" assignment_policy: RendezvousHashing: @@ -172,6 +178,8 @@ mod tests { CustomResource: memberlist_name: "worker-memberlist" queue_size: 100 + ingest: + queue_size: 100 "#, ); @@ -212,6 +220,7 @@ mod tests { pulsar_tenant: "public" pulsar_namespace: "default" kube_namespace: "chroma" + pulsar_url: "pulsar://localhost:6650" assignment_policy: RendezvousHashing: hasher: Murmur3 @@ -219,6 +228,8 @@ mod tests { CustomResource: memberlist_name: "worker-memberlist" queue_size: 100 + ingest: + queue_size: 100 "#, ); @@ -236,6 +247,7 @@ mod tests { let _ = jail.set_env("CHROMA_WORKER__PULSAR_TENANT", "A"); let _ = jail.set_env("CHROMA_WORKER__PULSAR_NAMESPACE", "B"); let _ = jail.set_env("CHROMA_WORKER__KUBE_NAMESPACE", "C"); + let _ = jail.set_env("CHROMA_WORKER__PULSAR_URL", "pulsar://localhost:6650"); let _ = jail.create_file( "chroma_config.yaml", r#" @@ -247,6 +259,8 @@ mod tests { CustomResource: memberlist_name: "worker-memberlist" queue_size: 100 + ingest: + queue_size: 100 "#, ); let config = RootConfig::load(); diff --git a/rust/worker/src/errors.rs b/rust/worker/src/errors.rs index c28d39ba9b7..515b460389c 100644 --- a/rust/worker/src/errors.rs +++ b/rust/worker/src/errors.rs @@ -41,6 +41,6 @@ pub(crate) enum ErrorCodes { DataLoss = 15, } -pub(crate) trait ChromaError: Error { +pub trait ChromaError: Error { fn code(&self) -> ErrorCodes; } diff --git a/rust/worker/src/ingest/config.rs b/rust/worker/src/ingest/config.rs new file mode 100644 index 00000000000..b7647cfe30e --- /dev/null +++ b/rust/worker/src/ingest/config.rs @@ -0,0 +1,6 @@ +use serde::Deserialize; + +#[derive(Deserialize)] +pub(crate) struct IngestConfig { + pub(crate) queue_size: usize, +} diff --git a/rust/worker/src/ingest/ingest.rs b/rust/worker/src/ingest/ingest.rs new file mode 100644 index 00000000000..cd013f11bb5 --- /dev/null +++ b/rust/worker/src/ingest/ingest.rs @@ -0,0 +1,307 @@ +use async_trait::async_trait; +use bytes::Bytes; +use futures::{StreamExt, TryStreamExt}; +use prost::Message; +use std::{ + collections::{HashMap, HashSet}, + sync::RwLock, +}; + +use crate::{ + assignment::{ + self, + assignment_policy::{self, AssignmentPolicy}, + }, + chroma_proto, + config::{Configurable, WorkerConfig}, + errors::{ChromaError, ErrorCodes}, + memberlist::{CustomResourceMemberlistProvider, Memberlist}, + system::{Component, ComponentContext, ComponentHandle, Handler, StreamHandler}, +}; + +use pulsar::{ + consumer::topic, Consumer, DeserializeMessage, Payload, Pulsar, SubType, TokioExecutor, +}; +use thiserror::Error; + +/// An ingest component is responsible for ingesting data into the system from the log +/// stream. +/// # Notes +/// The only current implementation of the ingest is the Pulsar ingest. +pub(crate) struct Ingest { + assignment_policy: RwLock>, + assigned_topics: RwLock>, + topic_to_handle: RwLock>, + queue_size: usize, + my_ip: String, + pulsar_tenant: String, + pulsar_namespace: String, + pulsar: Pulsar, +} + +impl Component for Ingest { + fn queue_size(&self) -> usize { + self.queue_size + } +} + +#[derive(Error, Debug)] +pub(crate) enum IngestConfigurationError { + #[error("Cannot assign empty key")] + PulsarError(#[from] pulsar::Error), +} + +impl ChromaError for IngestConfigurationError { + fn code(&self) -> ErrorCodes { + match self { + IngestConfigurationError::PulsarError(e) => ErrorCodes::Internal, + } + } +} + +// TODO: Nest the ingest assignment policy inside the ingest component config so its +// specific to the ingest component and can be used here +#[async_trait] +impl Configurable for Ingest { + async fn try_from_config(worker_config: &WorkerConfig) -> Result> { + let assignment_policy = assignment_policy::RendezvousHashingAssignmentPolicy::new( + worker_config.pulsar_tenant.clone(), + worker_config.pulsar_namespace.clone(), + ); + let pulsar = match Pulsar::builder(worker_config.pulsar_url.clone(), TokioExecutor) + .build() + .await + { + Ok(pulsar) => pulsar, + Err(e) => { + return Err(Box::new(IngestConfigurationError::PulsarError(e))); + } + }; + let ingest = Ingest { + assignment_policy: RwLock::new(Box::new(assignment_policy)), + assigned_topics: RwLock::new(vec![]), + topic_to_handle: RwLock::new(HashMap::new()), + queue_size: worker_config.ingest.queue_size, + my_ip: worker_config.my_ip.clone(), + pulsar: pulsar, + pulsar_tenant: worker_config.pulsar_tenant.clone(), + pulsar_namespace: worker_config.pulsar_namespace.clone(), + }; + Ok(ingest) + } +} + +impl Ingest { + fn get_topics(&self) -> Vec { + // This mirrors the current python and go code, which assumes a fixed set of topics + let mut topics = Vec::with_capacity(16); + for i in 0..16 { + let topic = format!( + "persistent://{}/{}/chroma_log_{}", + self.pulsar_tenant, self.pulsar_namespace, i + ); + topics.push(topic); + } + return topics; + } +} + +#[async_trait] +impl Handler for Ingest { + async fn handle(&self, msg: Memberlist, ctx: &ComponentContext) { + println!("Memberlist message: {:?}", msg); + let mut new_assignments = HashSet::new(); + let candidate_topics: Vec = self.get_topics(); + + // Scope for assigner write lock to be released so we don't hold it over await + { + let mut assigner = match self.assignment_policy.write() { + Ok(assigner) => assigner, + Err(err) => { + println!("Failed to read assignment policy: {:?}", err); + return; + } + }; + + // Use the assignment policy to assign topics to this worker + assigner.set_members(msg); + for topic in candidate_topics.iter() { + let assignment = assigner.assign(topic); + let assignment = match assignment { + Ok(assignment) => assignment, + Err(err) => { + // TODO: Log error + println!("Failed to assign topic: {:?}", err); + continue; + } + }; + if assignment == self.my_ip { + println!("I am assigned to topic: {}", topic); + new_assignments.insert(topic); + } + } + } + + // Compute the topics we need to add/remove + let mut to_remove = Vec::new(); + let mut to_add = Vec::new(); + + // Scope for assigned topics read lock to be released so we don't hold it over await + { + let assigned_topics_handle = self.assigned_topics.read(); + match assigned_topics_handle { + Ok(assigned_topics) => { + // Compute the diff between the current assignments and the new assignments + for topic in assigned_topics.iter() { + if !new_assignments.contains(topic) { + to_remove.push(topic.clone()); + } + } + for topic in new_assignments.iter() { + if !assigned_topics.contains(*topic) { + to_add.push(topic.clone()); + } + } + } + Err(err) => { + println!("Failed to write assigned topics: {:?}", err); + } + } + } + + // Unsubscribe from topics we no longer need to listen to + for topic in to_remove.iter() { + println!("Removing topic: {}", topic); + match self.topic_to_handle.write() { + Ok(mut topic_to_handle) => { + let handle = topic_to_handle.remove(topic); + match handle { + Some(mut handle) => { + handle.stop(); + } + None => { + // TODO: This should log an error + println!("No handle found for topic: {}", topic); + } + } + } + Err(err) => { + println!("Failed to write topic to handle: {:?}", err); + } + } + } + + // Subscribe to new topics + for topic in to_add.iter() { + println!("Adding topic: {}", topic); + // Do the subscription and register the stream to this ingest component + let consumer: Consumer = self + .pulsar + .consumer() + .with_topic(topic.to_string()) + .with_subscription_type(SubType::Exclusive) + .build() + .await + .unwrap(); + + let ingest_topic_component = PulsarIngestTopic::new(consumer); + + let (handle, _) = ctx.system.clone().start_component(ingest_topic_component); + + // Bookkeep the handle so we can shut the stream down later + match self.topic_to_handle.write() { + Ok(mut topic_to_handle) => { + topic_to_handle.insert("test".to_string(), handle); + } + Err(err) => { + println!("Failed to write topic to handle: {:?}", err); + } + } + } + } +} + +impl DeserializeMessage for chroma_proto::SubmitEmbeddingRecord { + type Output = Self; + + fn deserialize_message(payload: &Payload) -> chroma_proto::SubmitEmbeddingRecord { + // Its a bit strange to unwrap here, but the pulsar api doesn't give us a way to + // return an error, so we have to panic if we can't decode the message + // also we are forced to clone since the api doesn't give us a way to borrow the bytes + // TODO: can we not clone? + // TODO: I think just typing this to Result<> would allow errors to propagate + let record = + chroma_proto::SubmitEmbeddingRecord::decode(Bytes::from(payload.data.clone())).unwrap(); + return record; + } +} + +struct PulsarIngestTopic { + consumer: RwLock>>, +} + +impl PulsarIngestTopic { + fn new(consumer: Consumer) -> Self { + PulsarIngestTopic { + consumer: RwLock::new(Some(consumer)), + } + } +} + +impl Component for PulsarIngestTopic { + fn queue_size(&self) -> usize { + 1000 + } +} + +#[async_trait] +impl Handler> for PulsarIngestTopic { + async fn handle( + &self, + _message: Option, + _ctx: &ComponentContext, PulsarIngestTopic>, + ) -> () { + // No-op + } + + fn on_start( + &self, + ctx: &ComponentContext, Self>, + ) -> () { + let stream = match self.consumer.write() { + Ok(mut consumer_handle) => consumer_handle.take(), + Err(err) => None, + }; + let stream = match stream { + Some(stream) => stream, + None => { + return; + } + }; + let stream = stream.then(|result| async { + match result { + Ok(msg) => { + let msg = msg.deserialize(); + return Some(msg); + } + Err(err) => { + // TODO: Log an error + // Put this on a dead letter queue + None + } + } + }); + self.register_stream(stream, ctx); + } +} + +#[async_trait] +impl StreamHandler> for PulsarIngestTopic { + async fn handle( + &self, + message: Option, + _ctx: &ComponentContext, PulsarIngestTopic>, + ) -> () { + println!("Received stream message: {:?}", message); + } +} diff --git a/rust/worker/src/ingest/mod.rs b/rust/worker/src/ingest/mod.rs new file mode 100644 index 00000000000..954234e98cb --- /dev/null +++ b/rust/worker/src/ingest/mod.rs @@ -0,0 +1,5 @@ +pub(crate) mod config; +mod ingest; + +// Re-export the ingest provider for use in the worker +pub(crate) use ingest::*; diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index d333a39738e..936ebec50bb 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -2,10 +2,48 @@ mod assignment; mod config; mod errors; mod index; +mod ingest; mod memberlist; mod system; mod types; +use config::Configurable; +use memberlist::MemberlistProvider; + mod chroma_proto { tonic::include_proto!("chroma"); } + +pub async fn worker_entrypoint() { + let config = config::RootConfig::load(); + // Create all the core components and start them + // TODO: This should be handled by an Application struct and we can push the config into it + // for now we expose the config to pub and inject it into the components + + // The two root components are ingest, and the gRPC server + + let ingest = match ingest::Ingest::try_from_config(&config.worker).await { + Ok(ingest) => ingest, + Err(err) => { + println!("Failed to create ingest component: {:?}", err); + return; + } + }; + + let memberlist = + match memberlist::CustomResourceMemberlistProvider::try_from_config(&config.worker).await { + Ok(memberlist) => memberlist, + Err(err) => { + println!("Failed to create memberlist component: {:?}", err); + return; + } + }; + + // Boot the system + let mut system = system::System::new(); + let (mut ingest_handle, ingest_sender) = system.start_component(ingest); + memberlist.subscribe(ingest_sender); + let (mut memberlist_handle, _) = system.start_component(memberlist); + // Join on all handles + let _ = tokio::join!(ingest_handle.join(), memberlist_handle.join()); +} diff --git a/rust/worker/src/memberlist/memberlist_provider.rs b/rust/worker/src/memberlist/memberlist_provider.rs index 677c1d1d61d..6d4cf80bbba 100644 --- a/rust/worker/src/memberlist/memberlist_provider.rs +++ b/rust/worker/src/memberlist/memberlist_provider.rs @@ -1,4 +1,4 @@ -use std::{mem, sync::RwLock}; +use std::sync::RwLock; use super::config::{CustomResourceMemberlistProviderConfig, MemberlistProviderConfig}; use crate::{ @@ -23,11 +23,12 @@ use tokio_util::sync::CancellationToken; /* =========== Basic Types ============== */ -pub type Memberlist = Vec; +pub(crate) type Memberlist = Vec; #[async_trait] pub(crate) trait MemberlistProvider: Component + Configurable { async fn get_memberlist(&self) -> Memberlist; + fn subscribe(&self, sender: Sender) -> (); } /* =========== CRD ============== */ @@ -59,6 +60,7 @@ pub(crate) struct CustomResourceMemberlistProvider { memberlist_cr_client: Api, queue_size: usize, current_memberlist: RwLock, + subscribers: RwLock>>, } #[derive(Error, Debug)] @@ -103,6 +105,7 @@ impl Configurable for CustomResourceMemberlistProvider { memberlist_cr_client: memberlist_cr_client, queue_size: my_config.queue_size, current_memberlist: RwLock::new(vec![]), + subscribers: RwLock::new(vec![]), }; Ok(c) } @@ -124,6 +127,7 @@ impl CustomResourceMemberlistProvider { memberlist_cr_client: memberlist_cr_client, queue_size: queue_size, current_memberlist: RwLock::new(vec![]), + subscribers: RwLock::new(vec![]), } } @@ -141,7 +145,6 @@ impl CustomResourceMemberlistProvider { match event { Ok(event) => { let event = event; - println!("Got event: {:?}", event); Some(event) } Err(err) => { @@ -152,6 +155,28 @@ impl CustomResourceMemberlistProvider { }); self.register_stream(stream, ctx); } + + fn notify_subscribers(&self) -> () { + let subscribers = match self.subscribers.read() { + Ok(subscribers) => subscribers, + Err(err) => { + // TODO: Log error and attempt recovery + return; + } + }; + + let curr_memberlist = match self.current_memberlist.read() { + Ok(curr_memberlist) => curr_memberlist, + Err(err) => { + // TODO: Log error and attempt recovery + return; + } + }; + + for subscriber in subscribers.iter() { + let _ = subscriber.send(curr_memberlist.clone()); + } + } } impl Component for CustomResourceMemberlistProvider { @@ -195,6 +220,8 @@ impl StreamHandler> for CustomResourceMemberlistP } } } + // Inform subscribers + self.notify_subscribers(); } None => { // Stream closed or error @@ -233,6 +260,18 @@ impl MemberlistProvider for CustomResourceMemberlistProvider { } } } + + fn subscribe(&self, sender: Sender) -> () { + let subscribers_handle = self.subscribers.write(); + match subscribers_handle { + Ok(mut subscribers) => { + subscribers.push(sender); + } + Err(err) => { + // TODO: log and handle lock poisoning + } + } + } } #[cfg(test)] diff --git a/rust/worker/src/memberlist/mod.rs b/rust/worker/src/memberlist/mod.rs index a3a852b6b28..14512b02023 100644 --- a/rust/worker/src/memberlist/mod.rs +++ b/rust/worker/src/memberlist/mod.rs @@ -1,2 +1,5 @@ pub(crate) mod config; mod memberlist_provider; + +// Re-export the memberlist provider for use in the worker +pub(crate) use memberlist_provider::*; diff --git a/rust/worker/src/system/system.rs b/rust/worker/src/system/system.rs index 0c59f1babd2..c79de20e6cb 100644 --- a/rust/worker/src/system/system.rs +++ b/rust/worker/src/system/system.rs @@ -61,8 +61,8 @@ impl System { component, self.clone(), ); - tokio::spawn(async move { executor.run(rx).await }); - return (ComponentHandle::new(cancel_token), tx); + let join_handle = tokio::spawn(async move { executor.run(rx).await }); + return (ComponentHandle::new(cancel_token, join_handle), tx); } pub(super) fn register_stream(&self, stream: S, ctx: &ComponentContext) diff --git a/rust/worker/src/system/types.rs b/rust/worker/src/system/types.rs index 91786ead1b7..491e618699f 100644 --- a/rust/worker/src/system/types.rs +++ b/rust/worker/src/system/types.rs @@ -70,13 +70,18 @@ where pub(crate) struct ComponentHandle { cancellation_token: tokio_util::sync::CancellationToken, state: ComponentState, + join_handle: Option>, } impl ComponentHandle { - pub(super) fn new(cancellation_token: tokio_util::sync::CancellationToken) -> Self { + pub(super) fn new( + cancellation_token: tokio_util::sync::CancellationToken, + join_handle: tokio::task::JoinHandle<()>, + ) -> Self { ComponentHandle { cancellation_token: cancellation_token, state: ComponentState::Running, + join_handle: Some(join_handle), } } @@ -85,6 +90,15 @@ impl ComponentHandle { self.state = ComponentState::Stopped; } + pub(crate) async fn join(&mut self) { + match self.join_handle.take() { + Some(handle) => { + handle.await; + } + None => return, + }; + } + pub(crate) fn state(&self) -> &ComponentState { return &self.state; } @@ -95,7 +109,7 @@ pub(crate) struct ComponentContext where C: Component + Send + Sync + 'static, { - pub(super) system: System, + pub(crate) system: System, pub(super) sender: tokio::sync::broadcast::Sender, pub(super) cancellation_token: tokio_util::sync::CancellationToken, pub(super) system_component: Arc, // A reference to the component that is running in the system From 4a510032f3dee9e0f37f7eb0afcec40e5d47b1dc Mon Sep 17 00:00:00 2001 From: hammadb Date: Thu, 14 Dec 2023 16:10:05 -0800 Subject: [PATCH 2/3] Ingest TODOs --- rust/worker/src/ingest/ingest.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/rust/worker/src/ingest/ingest.rs b/rust/worker/src/ingest/ingest.rs index cd013f11bb5..c22e6cc27fe 100644 --- a/rust/worker/src/ingest/ingest.rs +++ b/rust/worker/src/ingest/ingest.rs @@ -47,14 +47,14 @@ impl Component for Ingest { #[derive(Error, Debug)] pub(crate) enum IngestConfigurationError { - #[error("Cannot assign empty key")] + #[error(transparent)] PulsarError(#[from] pulsar::Error), } impl ChromaError for IngestConfigurationError { fn code(&self) -> ErrorCodes { match self { - IngestConfigurationError::PulsarError(e) => ErrorCodes::Internal, + IngestConfigurationError::PulsarError(_e) => ErrorCodes::Internal, } } } @@ -109,7 +109,6 @@ impl Ingest { #[async_trait] impl Handler for Ingest { async fn handle(&self, msg: Memberlist, ctx: &ComponentContext) { - println!("Memberlist message: {:?}", msg); let mut new_assignments = HashSet::new(); let candidate_topics: Vec = self.get_topics(); @@ -131,12 +130,10 @@ impl Handler for Ingest { Ok(assignment) => assignment, Err(err) => { // TODO: Log error - println!("Failed to assign topic: {:?}", err); continue; } }; if assignment == self.my_ip { - println!("I am assigned to topic: {}", topic); new_assignments.insert(topic); } } @@ -164,14 +161,13 @@ impl Handler for Ingest { } } Err(err) => { - println!("Failed to write assigned topics: {:?}", err); + // TODO: Log error and handle lock poisoning } } } // Unsubscribe from topics we no longer need to listen to for topic in to_remove.iter() { - println!("Removing topic: {}", topic); match self.topic_to_handle.write() { Ok(mut topic_to_handle) => { let handle = topic_to_handle.remove(topic); @@ -186,7 +182,7 @@ impl Handler for Ingest { } } Err(err) => { - println!("Failed to write topic to handle: {:?}", err); + // TODO: Log an error and handle lock poisoning } } } @@ -214,6 +210,7 @@ impl Handler for Ingest { topic_to_handle.insert("test".to_string(), handle); } Err(err) => { + // TODO: log error and handle lock poisoning println!("Failed to write topic to handle: {:?}", err); } } @@ -286,7 +283,8 @@ impl Handler> for PulsarIngestTopic } Err(err) => { // TODO: Log an error - // Put this on a dead letter queue + // Put this on a dead letter queue, this concept does not exist in our + // system yet None } } @@ -303,5 +301,6 @@ impl StreamHandler> for PulsarIngest _ctx: &ComponentContext, PulsarIngestTopic>, ) -> () { println!("Received stream message: {:?}", message); + // This will be where we filter the message and add it to the corresponding tenant queue } } From b57e0fc899de417f8c2841663c7718b1566207fe Mon Sep 17 00:00:00 2001 From: hammadb Date: Thu, 14 Dec 2023 16:14:51 -0800 Subject: [PATCH 3/3] cleanup --- rust/worker/src/errors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/worker/src/errors.rs b/rust/worker/src/errors.rs index 515b460389c..c28d39ba9b7 100644 --- a/rust/worker/src/errors.rs +++ b/rust/worker/src/errors.rs @@ -41,6 +41,6 @@ pub(crate) enum ErrorCodes { DataLoss = 15, } -pub trait ChromaError: Error { +pub(crate) trait ChromaError: Error { fn code(&self) -> ErrorCodes; }