diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b8458b04..1b8bcc7d 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -5,9 +5,7 @@ concurrency: cancel-in-progress: true on: - workflow_dispatch: - push: tags: - 'v*' @@ -18,15 +16,11 @@ env: CARGO_TERM_COLOR: always jobs: - release: - strategy: matrix: os: [ubuntu-20.04, ubuntu-22.04] - runs-on: ["${{ matrix.os }}"] - steps: - uses: actions/checkout@v3 @@ -58,6 +52,11 @@ jobs: target key: ${{ matrix.os }}-cargo-${{ hashFiles('rust-toolchain.toml') }}-${{ hashFiles('**/Cargo.lock') }}-0001 + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y libsasl2-dev + - name: Check Solana version run: | echo "CI_TAG=$(ci/getTag.sh)" >> "$GITHUB_ENV" @@ -78,6 +77,7 @@ jobs: run: | mv target/release/client target/release/client-22 mv target/release/config-check target/release/config-check-22 + mv target/release/grpc-kafka target/release/grpc-kafka-22 mv ${{ env.GEYSER_PLUGIN_NAME }}-release-x86_64-unknown-linux-gnu.tar.bz2 ${{ env.GEYSER_PLUGIN_NAME }}-release22-x86_64-unknown-linux-gnu.tar.bz2 mv ${{ env.GEYSER_PLUGIN_NAME }}-release-x86_64-unknown-linux-gnu.yml ${{ env.GEYSER_PLUGIN_NAME }}-release22-x86_64-unknown-linux-gnu.yml @@ -99,3 +99,4 @@ jobs: yellowstone-grpc-proto/proto/*.proto target/release/client* target/release/config-check* + target/release/grpc-kafka* diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7285e807..cbcdd628 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,15 +12,11 @@ env: CARGO_TERM_COLOR: always jobs: - test: - strategy: matrix: os: [ubuntu-20.04, ubuntu-22.04] - runs-on: ["${{ matrix.os }}"] - steps: - uses: actions/checkout@v3 @@ -52,6 +48,11 @@ jobs: target key: ${{ matrix.os }}-cargo-${{ hashFiles('rust-toolchain.toml') }}-${{ hashFiles('**/Cargo.lock') }}-0001 + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get 
install -y libsasl2-dev + - name: cargo tree run: | cargo tree diff --git a/CHANGELOG.md b/CHANGELOG.md index ee2e6415..a8af473b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi ### Breaking +## 2023-08-28 + +- yellowstone-grpc-kafka-1.0.0-rc.0+solana.1.16.1 + +### Features + +kafka: init ([#170](https://github.com/rpcpool/yellowstone-grpc/pull/170)). + ## 2023-08-21 - yellowstone-grpc-geyser-1.7.1+solana.1.16.1 diff --git a/Cargo.lock b/Cargo.lock index 4c775530..447ee864 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -351,18 +351,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] name = "async-trait" -version = "0.1.68" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -675,7 +675,7 @@ checksum = "fdde5c9cd29ebd706ce1b35600920a33550e402fc998a2e53ad3b42c3c47a192" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -776,7 +776,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -824,6 +824,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "const-hex" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca268df6cd88e646b564e6aff1a016834e5f42077c736ef6b6789c31ef9ec5dc" +dependencies = [ + "cfg-if", + "cpufeatures", + "hex", + "serde", +] + [[package]] name = "constant_time_eq" version = "0.2.6" @@ -977,7 +989,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -988,7 +1000,7 @@ checksum = 
"29a358ff9f12ec09c3e61fef9b5a9902623a695a46a917b07f269bff1445611a" dependencies = [ "darling_core", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -1028,6 +1040,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "duct" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ae3fc31835f74c2a7ceda3aeede378b0ae2e74c8f1c36559fcc9ae2a4e7d3e" +dependencies = [ + "libc", + "once_cell", + "os_pipe", + "shared_child", +] + [[package]] name = "eager" version = "0.1.0" @@ -1101,7 +1125,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -1253,7 +1277,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -1526,9 +1550,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.26" +version = "0.14.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" +checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" dependencies = [ "bytes", "futures-channel", @@ -1778,6 +1802,18 @@ dependencies = [ "libsecp256k1-core", ] +[[package]] +name = "libz-sys" +version = "1.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.3.8" @@ -1806,6 +1842,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "matchers" +version = "0.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.7.0" @@ -1880,6 +1925,16 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.2.1" @@ -2028,7 +2083,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -2049,6 +2104,34 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "866b5f16f90776b9bb8dc1e1802ac6f0513de3a7a7465867bfbc563dc737faac" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "os_pipe" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.1" @@ -2138,7 +2221,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -2220,9 +2303,9 @@ 
checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.60" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -2322,9 +2405,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.28" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -2431,6 +2514,38 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdkafka" +version = "0.33.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da18026aad1c24033da3da726200de7e911e75c2e2cc2f77ffb9b4502720faae" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.5.0+1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bb0676c2112342ac7165decdedbc4e7086c0af384479ccce534546b10687a5d" +dependencies = [ + "libc", + "libz-sys", + "num_enum 0.5.11", + "openssl-sys", + "pkg-config", + "sasl2-sys", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -2448,9 +2563,24 @@ checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] 
+[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.7.2" @@ -2603,6 +2733,18 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +[[package]] +name = "sasl2-sys" +version = "0.1.20+2.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e645bd98535fc8fd251c43ba7c7c1f9be1e0369c99b6a5ea719052a773e655c" +dependencies = [ + "cc", + "duct", + "libc", + "pkg-config", +] + [[package]] name = "schannel" version = "0.1.21" @@ -2706,7 +2848,7 @@ checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -2760,7 +2902,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -2809,6 +2951,34 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "shared_child" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0d94659ad3c2137fef23ae75b03d5241d633f8acded53d672decfa0e6e0caef" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "1.6.4" @@ -2951,7 +3121,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.18", + "syn 
2.0.29", ] [[package]] @@ -3146,7 +3316,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -3265,7 +3435,7 @@ checksum = "b4fa8f409b5c5e0ac571df17c981ae1424b204743daa4428430627d38717caf5" dependencies = [ "quote", "spl-discriminator-syn", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -3277,7 +3447,7 @@ dependencies = [ "proc-macro2", "quote", "solana-program", - "syn 2.0.18", + "syn 2.0.29", "thiserror", ] @@ -3311,7 +3481,7 @@ checksum = "173f3cc506847882189b3a5b67299f617fed2f9730f122dd197b82e1e213dee5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -3449,9 +3619,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.18" +version = "2.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" dependencies = [ "proc-macro2", "quote", @@ -3504,7 +3674,17 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", +] + +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", ] [[package]] @@ -3591,6 +3771,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", @@ -3614,7 +3795,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ -3806,7 +3987,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] @@ 
-3816,6 +3997,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3903,6 +4114,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vergen" version = "8.2.1" @@ -3975,7 +4198,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", "wasm-bindgen-shared", ] @@ -4009,7 +4232,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4306,6 +4529,34 @@ dependencies = [ "yellowstone-grpc-proto", ] +[[package]] +name = "yellowstone-grpc-kafka" +version = 
"1.0.0-rc.0+solana.1.16.1" +dependencies = [ + "anyhow", + "async-trait", + "atty", + "cargo-lock", + "clap", + "const-hex", + "futures", + "git-version", + "hyper", + "lazy_static", + "prometheus", + "rdkafka", + "serde", + "serde_json", + "sha2 0.10.7", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-subscriber", + "vergen", + "yellowstone-grpc-proto", +] + [[package]] name = "yellowstone-grpc-proto" version = "1.9.0+solana.1.16.1" @@ -4334,7 +4585,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.18", + "syn 2.0.29", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4a42843c..8d3cc3c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "examples/rust", # 1.9.0+solana.1.16.1 "yellowstone-grpc-client", # 1.9.0+solana.1.16.1 "yellowstone-grpc-geyser", # 1.7.1+solana.1.16.1 + "yellowstone-grpc-kafka", # 1.0.0+solana.1.16.1 "yellowstone-grpc-proto", # 1.9.0+solana.1.16.1 ] diff --git a/yellowstone-grpc-kafka/Cargo.toml b/yellowstone-grpc-kafka/Cargo.toml new file mode 100644 index 00000000..8b6e1b0c --- /dev/null +++ b/yellowstone-grpc-kafka/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "yellowstone-grpc-kafka" +version = "1.0.0-rc.0+solana.1.16.1" +authors = ["Triton One"] +edition = "2021" +description = "Yellowstone gRPC Kafka Producer/Dedup/Consumer" +publish = false + +[dependencies] +anyhow = "1.0.62" +async-trait = "0.1.73" +atty = "0.2.14" +clap = { version = "4.3.0", features = ["cargo", "derive"] } +const-hex = "1.6.2" +futures = "0.3.24" +hyper = { version = "0.14.27", features = ["server"] } +lazy_static = "1.4.0" +prometheus = "0.13.2" +rdkafka = { version = "0.33.2", features = ["ssl", "sasl"] } +serde = { version = "1.0.145", features = ["derive"] } +serde_json = "1.0.86" +sha2 = "0.10.7" +tokio = { version = "1.21.2", features = ["rt-multi-thread", "macros", "time", "fs", "signal"] } +tokio-stream = "0.1.11" +tonic = { version 
= "0.9.2", features = ["gzip", "tls", "tls-roots"] } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } +yellowstone-grpc-proto = { path = "../yellowstone-grpc-proto" } + +[build-dependencies] +anyhow = "1.0.62" +cargo-lock = "9.0.0" +git-version = "0.3.5" +vergen = { version = "8.2.1", features = ["build", "rustc"] } diff --git a/yellowstone-grpc-kafka/build.rs b/yellowstone-grpc-kafka/build.rs new file mode 100644 index 00000000..92e1f4c7 --- /dev/null +++ b/yellowstone-grpc-kafka/build.rs @@ -0,0 +1,38 @@ +use {cargo_lock::Lockfile, std::collections::HashSet}; + +fn main() -> anyhow::Result<()> { + let mut envs = vergen::EmitBuilder::builder(); + envs.all_build().all_rustc(); + envs.emit()?; + + // vergen git version does not look cool + println!( + "cargo:rustc-env=GIT_VERSION={}", + git_version::git_version!() + ); + + // Extract package versions + let lockfile = Lockfile::load("../Cargo.lock")?; + println!( + "cargo:rustc-env=SOLANA_SDK_VERSION={}", + get_pkg_version(&lockfile, "solana-sdk") + ); + println!( + "cargo:rustc-env=YELLOWSTONE_GRPC_PROTO_VERSION={}", + get_pkg_version(&lockfile, "yellowstone-grpc-proto") + ); + + Ok(()) +} + +fn get_pkg_version(lockfile: &Lockfile, pkg_name: &str) -> String { + lockfile + .packages + .iter() + .filter(|pkg| pkg.name.as_str() == pkg_name) + .map(|pkg| pkg.version.to_string()) + .collect::>() + .into_iter() + .collect::>() + .join(",") +} diff --git a/yellowstone-grpc-kafka/config.json b/yellowstone-grpc-kafka/config.json new file mode 100644 index 00000000..2de53aae --- /dev/null +++ b/yellowstone-grpc-kafka/config.json @@ -0,0 +1,44 @@ +{ + "kafka": { + "bootstrap.servers": "localhost:29092" + }, + "dedup": { + "kafka": { + "group.id": "dedup", + "group.instance.id": "dedup" + }, + "kafka_input": "grpc1", + "kafka_output": "grpc2", + "kafka_queue_size": 10000, + "backend": { + "type": "memory" + } + }, + "grpc2kafka": { + "endpoint": "http://127.0.0.1:10000", + "x_token": 
"", + "request": { + "slots": ["client"], + "blocks": { + "client": { + "account_include": [], + "include_transactions": false, + "include_accounts": false, + "include_entries": false + } + } + }, + "kafka": {}, + "kafka_topic": "grpc1", + "kafka_queue_size": 10000 + }, + "kafka2grpc": { + "kafka": { + "group.id": "kafka2grpc", + "group.instance.id": "kafka2grpc" + }, + "kafka_topic": "grpc2", + "listen": "127.0.0.1:10001", + "channel_capacity": 250000 + } +} diff --git a/yellowstone-grpc-kafka/docker-kafka.yml b/yellowstone-grpc-kafka/docker-kafka.yml new file mode 100644 index 00000000..cdcb8068 --- /dev/null +++ b/yellowstone-grpc-kafka/docker-kafka.yml @@ -0,0 +1,23 @@ +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 22181:2181 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 diff --git a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs new file mode 100644 index 00000000..546d1d4d --- /dev/null +++ b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs @@ -0,0 +1,400 @@ +use { + anyhow::Context, + clap::{Parser, Subcommand}, + futures::{ + channel::mpsc, + future::{BoxFuture, FutureExt}, + sink::SinkExt, + stream::StreamExt, + }, + rdkafka::{ + config::ClientConfig, + consumer::{Consumer, StreamConsumer}, + message::Message, + producer::{FutureProducer, FutureRecord}, + }, + sha2::{Digest, Sha256}, + std::{net::SocketAddr, sync::Arc}, + tokio::{ + signal::unix::{signal, SignalKind}, + task::JoinSet, + }, + tonic::{ + 
codec::Streaming, + metadata::AsciiMetadataValue, + transport::{Channel, ClientTlsConfig}, + Request, Response, + }, + tracing::{debug, trace, warn}, + tracing_subscriber::{ + filter::{EnvFilter, LevelFilter}, + layer::SubscriberExt, + util::SubscriberInitExt, + }, + yellowstone_grpc_kafka::{ + config::{Config, ConfigDedup, ConfigGrpc2Kafka, ConfigKafka2Grpc, GrpcRequestToProto}, + dedup::KafkaDedup, + grpc::GrpcService, + prom, + }, + yellowstone_grpc_proto::{ + prelude::{geyser_client::GeyserClient, subscribe_update::UpdateOneof, SubscribeUpdate}, + prost::Message as _, + }, +}; + +#[derive(Debug, Clone, Parser)] +#[clap(author, version, about)] +struct Args { + /// Path to config file + #[clap(short, long)] + config: String, + + /// Prometheus listen address + #[clap(long)] + prometheus: Option, + + #[command(subcommand)] + action: ArgsAction, +} + +#[derive(Debug, Clone, Subcommand)] +enum ArgsAction { + /// Receive data from Kafka, deduplicate and send them back to Kafka + Dedup, + /// Receive data from gRPC and send them to the Kafka + #[command(name = "grpc2kafka")] + Grpc2Kafka, + /// Receive data from Kafka and send them over gRPC + #[command(name = "kafka2grpc")] + Kafka2Grpc, +} + +impl ArgsAction { + async fn run( + self, + config: Config, + kafka_config: ClientConfig, + shutdown: BoxFuture<'static, ()>, + ) -> anyhow::Result<()> { + match self { + ArgsAction::Dedup => { + let config = config.dedup.ok_or_else(|| { + anyhow::anyhow!("`dedup` section in config should be defined") + })?; + Self::dedup(kafka_config, config, shutdown).await + } + ArgsAction::Grpc2Kafka => { + let config = config.grpc2kafka.ok_or_else(|| { + anyhow::anyhow!("`grpc2kafka` section in config should be defined") + })?; + Self::grpc2kafka(kafka_config, config, shutdown).await + } + ArgsAction::Kafka2Grpc => { + let config = config.kafka2grpc.ok_or_else(|| { + anyhow::anyhow!("`kafka2grpc` section in config should be defined") + })?; + Self::kafka2grpc(kafka_config, config, 
shutdown).await + } + } + } + + async fn dedup( + mut kafka_config: ClientConfig, + config: ConfigDedup, + mut shutdown: BoxFuture<'static, ()>, + ) -> anyhow::Result<()> { + for (key, value) in config.kafka.into_iter() { + kafka_config.set(key, value); + } + + // input + let consumer: StreamConsumer = kafka_config.create()?; + consumer.subscribe(&[&config.kafka_input])?; + + // output + let kafka: FutureProducer = kafka_config + .create() + .context("failed to create kafka producer")?; + + // dedup + let dedup = config.backend.create().await?; + + // input -> output loop + let kafka_output = Arc::new(config.kafka_output); + let mut send_tasks = JoinSet::new(); + loop { + let message = tokio::select! { + _ = &mut shutdown => break, + maybe_result = send_tasks.join_next() => match maybe_result { + Some(result) => { + result??; + continue; + } + None => tokio::select! { + _ = &mut shutdown => break, + message = consumer.recv() => message, + } + }, + message = consumer.recv() => message, + }?; + trace!( + "received message with key: {:?}", + message.key().and_then(|k| std::str::from_utf8(k).ok()) + ); + + let (key, payload) = match ( + message + .key() + .and_then(|k| String::from_utf8(k.to_vec()).ok()), + message.payload(), + ) { + (Some(key), Some(payload)) => (key, payload.to_vec()), + _ => continue, + }; + let (slot, hash, bytes) = match key + .split_once('_') + .and_then(|(slot, hash)| slot.parse::().ok().map(|slot| (slot, hash))) + .and_then(|(slot, hash)| { + let mut bytes: [u8; 32] = [0u8; 32]; + const_hex::decode_to_slice(hash, &mut bytes) + .ok() + .map(|()| (slot, hash, bytes)) + }) { + Some((slot, hash, bytes)) => (slot, hash, bytes), + _ => continue, + }; + debug!("received message slot #{slot} with hash {hash}"); + + let kafka = kafka.clone(); + let dedup = dedup.clone(); + let kafka_output = Arc::clone(&kafka_output); + send_tasks.spawn(async move { + if dedup.allowed(slot, bytes).await { + let record = 
FutureRecord::to(&kafka_output).key(&key).payload(&payload); + match kafka.send_result(record) { + Ok(future) => { + let result = future.await; + debug!("kafka send message with key: {key}, result: {result:?}"); + + result?.map_err(|(error, _message)| error)?; + Ok::<(), anyhow::Error>(()) + } + Err(error) => Err(error.0.into()), + } + } else { + Ok(()) + } + }); + if send_tasks.len() >= config.kafka_queue_size { + tokio::select! { + _ = &mut shutdown => break, + result = send_tasks.join_next() => { + if let Some(result) = result { + result??; + } + } + } + } + } + warn!("shutdown received..."); + while let Some(result) = send_tasks.join_next().await { + result??; + } + Ok(()) + } + + async fn grpc2kafka( + mut kafka_config: ClientConfig, + config: ConfigGrpc2Kafka, + mut shutdown: BoxFuture<'static, ()>, + ) -> anyhow::Result<()> { + for (key, value) in config.kafka.into_iter() { + kafka_config.set(key, value); + } + + // Connect to kafka + let kafka: FutureProducer = kafka_config + .create() + .context("failed to create kafka producer")?; + + // Create gRPC client + let mut endpoint = Channel::from_shared(config.endpoint)?; + if endpoint.uri().scheme_str() == Some("https") { + endpoint = endpoint.tls_config(ClientTlsConfig::new())?; + } + let channel = endpoint.connect().await?; + let x_token: Option = match config.x_token { + Some(x_token) => Some(x_token.try_into()?), + None => None, + }; + let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| { + if let Some(x_token) = x_token.clone() { + req.metadata_mut().insert("x-token", x_token); + } + Ok(req) + }); + + // Subscribe on Geyser events + let (mut subscribe_tx, subscribe_rx) = mpsc::unbounded(); + subscribe_tx.send(config.request.to_proto()).await?; + let response: Response> = client.subscribe(subscribe_rx).await?; + let mut geyser = response.into_inner().boxed(); + + // Receive-send loop + let mut send_tasks = JoinSet::new(); + loop { + let message = tokio::select! 
{ + _ = &mut shutdown => break, + maybe_result = send_tasks.join_next() => match maybe_result { + Some(result) => { + let _ = result??; + continue; + } + None => tokio::select! { + _ = &mut shutdown => break, + message = geyser.next() => message, + } + }, + message = geyser.next() => message, + } + .transpose()?; + + match message { + Some(message) => { + if matches!(message.update_oneof, Some(UpdateOneof::Ping(_))) { + continue; + } + + let slot = match &message.update_oneof { + Some(UpdateOneof::Account(msg)) => msg.slot, + Some(UpdateOneof::Slot(msg)) => msg.slot, + Some(UpdateOneof::Transaction(msg)) => msg.slot, + Some(UpdateOneof::Block(msg)) => msg.slot, + Some(UpdateOneof::Ping(_)) => unreachable!("Ping message not expected"), + Some(UpdateOneof::BlockMeta(msg)) => msg.slot, + Some(UpdateOneof::Entry(msg)) => msg.slot, + None => unreachable!("Expect valid message"), + }; + let payload = message.encode_to_vec(); + let hash = Sha256::digest(&payload); + let key = format!("{slot}_{}", const_hex::encode(hash)); + + let record = FutureRecord::to(&config.kafka_topic) + .key(&key) + .payload(&payload); + + match kafka.send_result(record) { + Ok(future) => { + let _ = send_tasks.spawn(async move { + let result = future.await; + debug!("kafka send message with key: {key}, result: {result:?}"); + + Ok::<(i32, i64), anyhow::Error>( + result?.map_err(|(error, _message)| error)?, + ) + }); + if send_tasks.len() >= config.kafka_queue_size { + tokio::select! 
{ + _ = &mut shutdown => break, + result = send_tasks.join_next() => { + if let Some(result) = result { + result??; + } + } + } + } + } + Err(error) => return Err(error.0.into()), + } + } + None => break, + } + } + warn!("shutdown received..."); + while let Some(result) = send_tasks.join_next().await { + let _ = result??; + } + Ok(()) + } + + async fn kafka2grpc( + mut kafka_config: ClientConfig, + config: ConfigKafka2Grpc, + mut shutdown: BoxFuture<'static, ()>, + ) -> anyhow::Result<()> { + for (key, value) in config.kafka.into_iter() { + kafka_config.set(key, value); + } + + let (grpc_tx, grpc_shutdown) = GrpcService::run(config.listen, config.channel_capacity)?; + + let consumer: StreamConsumer = kafka_config.create()?; + consumer.subscribe(&[&config.kafka_topic])?; + + loop { + let message = tokio::select! { + _ = &mut shutdown => break, + message = consumer.recv() => message?, + }; + debug!( + "received message with key: {:?}", + message.key().and_then(|k| std::str::from_utf8(k).ok()) + ); + + if let Some(payload) = message.payload() { + match SubscribeUpdate::decode(payload) { + Ok(message) => { + let _ = grpc_tx.send(message); + } + Err(error) => { + warn!("failed to decode message: {error}"); + } + } + } + } + + warn!("shutdown received..."); + Ok(grpc_shutdown.await??) 
+ } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Setup tracing + let is_atty = atty::is(atty::Stream::Stdout) && atty::is(atty::Stream::Stderr); + let io_layer = tracing_subscriber::fmt::layer().with_ansi(is_atty); + let level_layer = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + tracing_subscriber::registry() + .with(io_layer) + .with(level_layer) + .try_init()?; + + // Parse args + let args = Args::parse(); + let config = Config::load(&args.config).await?; + + // Run prometheus server + prom::run_server(args.prometheus)?; + + // Create kafka config + let mut kafka_config = ClientConfig::new(); + for (key, value) in config.kafka.iter() { + kafka_config.set(key, value); + } + + // Create shutdown signal + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + let shutdown = async move { + tokio::select! { + _ = sigint.recv() => {}, + _ = sigterm.recv() => {} + }; + } + .boxed(); + + args.action.run(config, kafka_config, shutdown).await +} diff --git a/yellowstone-grpc-kafka/src/config.rs b/yellowstone-grpc-kafka/src/config.rs new file mode 100644 index 00000000..4a2abcf3 --- /dev/null +++ b/yellowstone-grpc-kafka/src/config.rs @@ -0,0 +1,302 @@ +use { + crate::dedup::{KafkaDedup, KafkaDedupMemory}, + anyhow::Context, + serde::{Deserialize, Serialize}, + std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + path::Path, + }, + tokio::fs, + yellowstone_grpc_proto::prelude::{ + subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + CommitmentLevel, SubscribeRequest, SubscribeRequestAccountsDataSlice, + SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, + SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks, + SubscribeRequestFilterTransactions, + }, +}; + +pub trait GrpcRequestToProto { + fn 
to_proto(self) -> T; +} + +#[derive(Debug, Default, Deserialize)] +#[serde(default)] +pub struct Config { + pub kafka: HashMap, + pub dedup: Option, + pub grpc2kafka: Option, + pub kafka2grpc: Option, +} + +impl Config { + pub async fn load(path: impl AsRef) -> anyhow::Result { + let text = fs::read_to_string(path) + .await + .context("failed to read config from file")?; + + serde_json::from_str(&text).context("failed to parse config from file") + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigDedup { + #[serde(default)] + pub kafka: HashMap, + pub kafka_input: String, + pub kafka_output: String, + pub kafka_queue_size: usize, + pub backend: ConfigDedupBackend, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum ConfigDedupBackend { + Memory, +} + +impl ConfigDedupBackend { + pub async fn create(&self) -> anyhow::Result> { + Ok(match self { + Self::Memory => Box::::default(), + }) + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigGrpc2Kafka { + pub endpoint: String, + pub x_token: Option, + pub request: ConfigGrpc2KafkaRequest, + #[serde(default)] + pub kafka: HashMap, + pub kafka_topic: String, + pub kafka_queue_size: usize, +} + +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpc2KafkaRequest { + pub slots: HashSet, + pub accounts: HashMap, + pub transactions: HashMap, + pub entries: HashSet, + pub blocks: HashMap, + pub blocks_meta: HashSet, + pub commitment: Option, + pub accounts_data_slice: Vec, +} + +impl ConfigGrpc2KafkaRequest { + fn map_to_proto(map: HashMap>) -> HashMap { + map.into_iter().map(|(k, v)| (k, v.to_proto())).collect() + } + + fn set_to_proto(set: HashSet) -> HashMap { + set.into_iter().map(|v| (v, T::default())).collect() + } + + fn vec_to_proto(vec: Vec>) -> Vec { + vec.into_iter().map(|v| v.to_proto()).collect() + } +} + +impl GrpcRequestToProto for ConfigGrpc2KafkaRequest { + fn to_proto(self) -> SubscribeRequest { + SubscribeRequest { + 
slots: ConfigGrpc2KafkaRequest::set_to_proto(self.slots), + accounts: ConfigGrpc2KafkaRequest::map_to_proto(self.accounts), + transactions: ConfigGrpc2KafkaRequest::map_to_proto(self.transactions), + entry: ConfigGrpc2KafkaRequest::set_to_proto(self.entries), + blocks: ConfigGrpc2KafkaRequest::map_to_proto(self.blocks), + blocks_meta: ConfigGrpc2KafkaRequest::set_to_proto(self.blocks_meta), + commitment: self.commitment.map(|v| v.to_proto() as i32), + accounts_data_slice: ConfigGrpc2KafkaRequest::vec_to_proto(self.accounts_data_slice), + } + } +} + +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpc2KafkaRequestAccounts { + account: Vec, + owner: Vec, + filters: Vec, +} + +impl GrpcRequestToProto for ConfigGrpc2KafkaRequestAccounts { + fn to_proto(self) -> SubscribeRequestFilterAccounts { + SubscribeRequestFilterAccounts { + account: self.account, + owner: self.owner, + filters: self.filters.into_iter().map(|f| f.to_proto()).collect(), + } + } +} + +#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] +pub enum ConfigGrpc2KafkaRequestAccountsFilter { + Memcmp { offset: u64, base58: String }, + DataSize(u64), + TokenAccountState, +} + +impl GrpcRequestToProto + for ConfigGrpc2KafkaRequestAccountsFilter +{ + fn to_proto(self) -> SubscribeRequestFilterAccountsFilter { + SubscribeRequestFilterAccountsFilter { + filter: Some(match self { + ConfigGrpc2KafkaRequestAccountsFilter::Memcmp { offset, base58 } => { + AccountsFilterDataOneof::Memcmp(SubscribeRequestFilterAccountsFilterMemcmp { + offset, + data: Some(AccountsFilterMemcmpOneof::Base58(base58)), + }) + } + ConfigGrpc2KafkaRequestAccountsFilter::DataSize(size) => { + AccountsFilterDataOneof::Datasize(size) + } + ConfigGrpc2KafkaRequestAccountsFilter::TokenAccountState => { + AccountsFilterDataOneof::TokenAccountState(true) + } + }), + } + } +} + +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpc2KafkaRequestTransactions { + pub 
vote: Option, + pub failed: Option, + pub signature: Option, + pub account_include: Vec, + pub account_exclude: Vec, + pub account_required: Vec, +} + +impl GrpcRequestToProto + for ConfigGrpc2KafkaRequestTransactions +{ + fn to_proto(self) -> SubscribeRequestFilterTransactions { + SubscribeRequestFilterTransactions { + vote: self.vote, + failed: self.failed, + signature: self.signature, + account_include: self.account_include, + account_exclude: self.account_exclude, + account_required: self.account_required, + } + } +} + +#[derive(Debug, Default, Deserialize, Serialize)] +#[serde(default)] +pub struct ConfigGrpc2KafkaRequestBlocks { + pub account_include: Vec, + pub include_transactions: Option, + pub include_accounts: Option, + pub include_entries: Option, +} + +impl GrpcRequestToProto for ConfigGrpc2KafkaRequestBlocks { + fn to_proto(self) -> SubscribeRequestFilterBlocks { + SubscribeRequestFilterBlocks { + account_include: self.account_include, + include_transactions: self.include_transactions, + include_accounts: self.include_accounts, + include_entries: self.include_entries, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum ConfigGrpc2KafkaRequestCommitment { + Processed, + Confirmed, + Finalized, +} + +impl GrpcRequestToProto for ConfigGrpc2KafkaRequestCommitment { + fn to_proto(self) -> CommitmentLevel { + match self { + Self::Processed => CommitmentLevel::Processed, + Self::Confirmed => CommitmentLevel::Confirmed, + Self::Finalized => CommitmentLevel::Finalized, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ConfigGrpc2KafkaRequestAccountsDataSlice { + pub offset: u64, + pub length: u64, +} + +impl GrpcRequestToProto + for ConfigGrpc2KafkaRequestAccountsDataSlice +{ + fn to_proto(self) -> SubscribeRequestAccountsDataSlice { + SubscribeRequestAccountsDataSlice { + offset: self.offset, + length: self.length, + } + } +} + +#[derive(Debug, Deserialize)] +pub struct ConfigKafka2Grpc { + 
#[serde(default)] + pub kafka: HashMap, + pub kafka_topic: String, + pub listen: SocketAddr, + #[serde(default = "ConfigKafka2Grpc::channel_capacity_default")] + pub channel_capacity: usize, +} + +impl ConfigKafka2Grpc { + const fn channel_capacity_default() -> usize { + 250_000 + } +} + +#[cfg(test)] +mod tests { + use super::ConfigGrpc2KafkaRequestAccountsFilter; + + #[test] + fn grpc_config_accounts_filter_memcmp() { + let filter = ConfigGrpc2KafkaRequestAccountsFilter::Memcmp { + offset: 42, + base58: "123".to_owned(), + }; + let text = serde_json::to_string(&filter).unwrap(); + assert_eq!( + serde_json::from_str::(&text).unwrap(), + filter + ); + } + + #[test] + fn grpc_config_accounts_filter_datasize() { + let filter = ConfigGrpc2KafkaRequestAccountsFilter::DataSize(42); + let text = serde_json::to_string(&filter).unwrap(); + assert_eq!( + serde_json::from_str::(&text).unwrap(), + filter + ); + } + + #[test] + fn grpc_config_accounts_filter_token() { + let filter = ConfigGrpc2KafkaRequestAccountsFilter::TokenAccountState; + let text = serde_json::to_string(&filter).unwrap(); + assert_eq!( + serde_json::from_str::(&text).unwrap(), + filter + ); + } +} diff --git a/yellowstone-grpc-kafka/src/dedup.rs b/yellowstone-grpc-kafka/src/dedup.rs new file mode 100644 index 00000000..b4e78841 --- /dev/null +++ b/yellowstone-grpc-kafka/src/dedup.rs @@ -0,0 +1,48 @@ +use { + std::{ + collections::{btree_map::Entry, BTreeMap, HashSet}, + sync::Arc, + }, + tokio::sync::Mutex, +}; + +#[async_trait::async_trait] +pub trait KafkaDedup: Clone { + async fn allowed(&self, slot: u64, hash: [u8; 32]) -> bool; +} + +#[derive(Debug, Default, Clone)] +pub struct KafkaDedupMemory { + inner: Arc>>>, +} + +#[async_trait::async_trait] +impl KafkaDedup for KafkaDedupMemory { + async fn allowed(&self, slot: u64, hash: [u8; 32]) -> bool { + let mut map = self.inner.lock().await; + + if let Some(key_slot) = map.keys().next().cloned() { + if slot < key_slot { + return false; + } + } + + match 
map.entry(slot) { + Entry::Vacant(entry) => { + entry.insert(HashSet::new()).insert(hash); + + // remove old sets, keep ~30sec log + while let Some(key_slot) = map.keys().next().cloned() { + if key_slot < slot - 75 { + map.remove(&key_slot); + } else { + break; + } + } + + true + } + Entry::Occupied(entry) => entry.into_mut().insert(hash), + } + } +} diff --git a/yellowstone-grpc-kafka/src/grpc.rs b/yellowstone-grpc-kafka/src/grpc.rs new file mode 100644 index 00000000..c1a455ff --- /dev/null +++ b/yellowstone-grpc-kafka/src/grpc.rs @@ -0,0 +1,242 @@ +use { + crate::version::VERSION, + futures::future::{BoxFuture, FutureExt}, + std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + }, + tokio::{ + sync::{broadcast, mpsc, Notify}, + task::JoinError, + time::{sleep, Duration}, + }, + tokio_stream::wrappers::ReceiverStream, + tonic::{ + codec::{CompressionEncoding, Streaming}, + transport::{ + server::{Server, TcpIncoming}, + Error as TransportError, + }, + Request, Response, Result as TonicResult, Status, + }, + tracing::{error, info}, + yellowstone_grpc_proto::prelude::{ + geyser_server::{Geyser, GeyserServer}, + subscribe_update::UpdateOneof, + GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, + GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, + GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, + PongResponse, SubscribeRequest, SubscribeUpdate, SubscribeUpdatePing, + }, +}; + +#[derive(Debug)] +pub struct GrpcService { + subscribe_id: AtomicUsize, + channel_capacity: usize, + broadcast_tx: broadcast::Sender, +} + +impl GrpcService { + #[allow(clippy::type_complexity)] + pub fn run( + listen: SocketAddr, + channel_capacity: usize, + ) -> anyhow::Result<( + broadcast::Sender, + BoxFuture<'static, Result, JoinError>>, + )> { + // Bind service address + let incoming = TcpIncoming::new( + listen, + true, // tcp_nodelay + Some(Duration::from_secs(20)), 
// tcp_keepalive + ) + .map_err(|error| anyhow::anyhow!(format!("{error:?}")))?; + + // Messages to clients combined by commitment + let (broadcast_tx, _) = broadcast::channel(channel_capacity); + + // Run Server + let service = GeyserServer::new(Self { + subscribe_id: AtomicUsize::new(0), + channel_capacity, + broadcast_tx: broadcast_tx.clone(), + }) + .accept_compressed(CompressionEncoding::Gzip) + .send_compressed(CompressionEncoding::Gzip); + + let shutdown = Arc::new(Notify::new()); + let shutdown_grpc = Arc::clone(&shutdown); + + let server = tokio::spawn(async move { + Server::builder() + .http2_keepalive_interval(Some(Duration::from_secs(5))) + .add_service(service) + .serve_with_incoming_shutdown(incoming, shutdown_grpc.notified()) + .await + }); + let shutdown = async move { + shutdown.notify_one(); + server.await + } + .boxed(); + + Ok((broadcast_tx, shutdown)) + } +} + +#[tonic::async_trait] +impl Geyser for GrpcService { + type SubscribeStream = ReceiverStream>; + + async fn subscribe( + &self, + mut request: Request>, + ) -> TonicResult> { + let id = self.subscribe_id.fetch_add(1, Ordering::SeqCst); + let (stream_tx, stream_rx) = mpsc::channel(self.channel_capacity); + let notify_client = Arc::new(Notify::new()); + let notify_exit1 = Arc::new(Notify::new()); + let notify_exit2 = Arc::new(Notify::new()); + + let ping_stream_tx = stream_tx.clone(); + let ping_client = Arc::clone(&notify_client); + let ping_exit = Arc::clone(&notify_exit1); + tokio::spawn(async move { + let exit = ping_exit.notified(); + tokio::pin!(exit); + + let ping_msg = SubscribeUpdate { + filters: vec![], + update_oneof: Some(UpdateOneof::Ping(SubscribeUpdatePing {})), + }; + + loop { + tokio::select! 
{ + _ = &mut exit => break, + _ = sleep(Duration::from_secs(10)) => { + match ping_stream_tx.try_send(Ok(ping_msg.clone())) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => {} + Err(mpsc::error::TrySendError::Closed(_)) => { + ping_client.notify_one(); + break; + } + } + } + } + } + }); + + let incoming_client = Arc::clone(&notify_client); + let incoming_exit = Arc::clone(&notify_exit2); + tokio::spawn(async move { + let exit = incoming_exit.notified(); + tokio::pin!(exit); + + loop { + tokio::select! { + _ = &mut exit => break, + message = request.get_mut().message() => match message { + Ok(Some(_request)) => {} + Ok(None) => break, + Err(_error) => { + let _ = incoming_client.notify_one(); + break; + } + } + } + } + }); + + let mut messages_rx = self.broadcast_tx.subscribe(); + tokio::spawn(async move { + info!("client #{id}: new"); + loop { + tokio::select! { + _ = notify_client.notified() => break, + message = messages_rx.recv() => { + match message { + Ok(message) => { + match stream_tx.try_send(Ok(message)) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + error!("client #{id}: lagged to send update"); + tokio::spawn(async move { + let _ = stream_tx.send(Err(Status::internal("lagged"))).await; + }); + break; + } + Err(mpsc::error::TrySendError::Closed(_)) => { + error!("client #{id}: stream closed"); + break; + } + } + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => { + info!("client #{id}: lagged to receive geyser messages"); + tokio::spawn(async move { + let _ = stream_tx.send(Err(Status::internal("lagged"))).await; + }); + break; + } + } + } + } + } + info!("client #{id}: removed"); + notify_exit1.notify_one(); + notify_exit2.notify_one(); + }); + + Ok(Response::new(ReceiverStream::new(stream_rx))) + } + + async fn ping(&self, request: Request) -> Result, Status> { + let count = request.get_ref().count; + let response = PongResponse { count }; + Ok(Response::new(response)) + } + 
+ async fn get_latest_blockhash( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in kafka reader")) + } + + async fn get_block_height( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in kafka reader")) + } + + async fn get_slot( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in kafka reader")) + } + + async fn is_blockhash_valid( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("not implemented in kafka reader")) + } + + async fn get_version( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(GetVersionResponse { + version: serde_json::to_string(&VERSION).unwrap(), + })) + } +} diff --git a/yellowstone-grpc-kafka/src/lib.rs b/yellowstone-grpc-kafka/src/lib.rs new file mode 100644 index 00000000..fd288ea1 --- /dev/null +++ b/yellowstone-grpc-kafka/src/lib.rs @@ -0,0 +1,9 @@ +#![deny(clippy::clone_on_ref_ptr)] +#![deny(clippy::missing_const_for_fn)] +#![deny(clippy::trivially_copy_pass_by_ref)] + +pub mod config; +pub mod dedup; +pub mod grpc; +pub mod prom; +pub mod version; diff --git a/yellowstone-grpc-kafka/src/prom.rs b/yellowstone-grpc-kafka/src/prom.rs new file mode 100644 index 00000000..5b0513e8 --- /dev/null +++ b/yellowstone-grpc-kafka/src/prom.rs @@ -0,0 +1,84 @@ +use { + crate::version::VERSION as VERSION_INFO, + hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, StatusCode, + }, + prometheus::{IntCounterVec, Opts, Registry, TextEncoder}, + std::{net::SocketAddr, sync::Once}, + tracing::{error, info}, +}; + +lazy_static::lazy_static! 
{ + pub static ref REGISTRY: Registry = Registry::new(); + + static ref VERSION: IntCounterVec = IntCounterVec::new( + Opts::new("version", "Plugin version info"), + &["buildts", "git", "package", "proto", "rustc", "solana", "version"] + ).unwrap(); +} + +pub fn run_server(address: Option) -> anyhow::Result<()> { + static REGISTER: Once = Once::new(); + REGISTER.call_once(|| { + macro_rules! register { + ($collector:ident) => { + REGISTRY + .register(Box::new($collector.clone())) + .expect("collector can't be registered"); + }; + } + register!(VERSION); + + VERSION + .with_label_values(&[ + VERSION_INFO.buildts, + VERSION_INFO.git, + VERSION_INFO.package, + VERSION_INFO.proto, + VERSION_INFO.rustc, + VERSION_INFO.solana, + VERSION_INFO.version, + ]) + .inc(); + }); + + if let Some(address) = address { + let make_service = make_service_fn(move |_: &AddrStream| async move { + Ok::<_, hyper::Error>(service_fn(move |req: Request| async move { + let response = match req.uri().path() { + "/metrics" => metrics_handler(), + _ => not_found_handler(), + }; + Ok::<_, hyper::Error>(response) + })) + }); + let server = Server::try_bind(&address)?.serve(make_service); + info!("prometheus server started: {address:?}"); + tokio::spawn(async move { + if let Err(error) = server.await { + error!("prometheus server failed: {error:?}"); + } + }); + } + + Ok(()) +} + +fn metrics_handler() -> Response { + let metrics = TextEncoder::new() + .encode_to_string(&REGISTRY.gather()) + .unwrap_or_else(|error| { + error!("could not encode custom metrics: {}", error); + String::new() + }); + Response::builder().body(Body::from(metrics)).unwrap() +} + +fn not_found_handler() -> Response { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()) + .unwrap() +} diff --git a/yellowstone-grpc-kafka/src/version.rs b/yellowstone-grpc-kafka/src/version.rs new file mode 100644 index 00000000..b9da6284 --- /dev/null +++ b/yellowstone-grpc-kafka/src/version.rs @@ -0,0 +1,22 @@ +use 
{serde::Serialize, std::env}; + +#[derive(Debug, Serialize)] +pub struct Version { + pub package: &'static str, + pub version: &'static str, + pub proto: &'static str, + pub solana: &'static str, + pub git: &'static str, + pub rustc: &'static str, + pub buildts: &'static str, +} + +pub const VERSION: Version = Version { + package: env!("CARGO_PKG_NAME"), + version: env!("CARGO_PKG_VERSION"), + proto: env!("YELLOWSTONE_GRPC_PROTO_VERSION"), + solana: env!("SOLANA_SDK_VERSION"), + git: env!("GIT_VERSION"), + rustc: env!("VERGEN_RUSTC_SEMVER"), + buildts: env!("VERGEN_BUILD_TIMESTAMP"), +};