diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 021eb2e7..df122c19 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -5,6 +5,9 @@ concurrency:
   cancel-in-progress: true
 
 on:
+  pull_request:
+    branches:
+      - 'master'
   push:
     branches:
       - 'master'
@@ -38,13 +41,11 @@ jobs:
           echo "GEYSER_PLUGIN_NAME=$plugin_name" | tee -a $GITHUB_ENV
           echo "GEYSER_PLUGIN_LIB=lib${plugin_lib_name}" | tee -a $GITHUB_ENV
 
-      - uses: actions-rs/toolchain@v1
+      - uses: dtolnay/rust-toolchain@master
         with:
          toolchain: ${{ env.RUST_STABLE }}
-          override: true
-          profile: minimal
 
-      - uses: actions/cache@v3
+      - uses: actions/cache@v4
         with:
           path: |
             ~/.cargo/bin/
@@ -74,13 +75,18 @@ jobs:
       - name: Build release tarball
         run: ./ci/create-tarball.sh
 
+      - name: Rename binaries with os suffix
+        run: |
+          mv target/release/client target/release/client-${{ matrix.os }}
+          mv target/release/config-check target/release/config-check-${{ matrix.os }}
+          mv target/release/grpc-google-pubsub target/release/grpc-google-pubsub-${{ matrix.os }}
+          mv target/release/grpc-kafka target/release/grpc-kafka-${{ matrix.os }}
+          mv target/release/grpc-scylladb target/release/grpc-scylladb-${{ matrix.os }}
+
       - name: Rename binaries for ubuntu22 release
         if: matrix.os == 'ubuntu-22.04'
         run: |
-          mv target/release/client target/release/client-22
-          mv target/release/config-check target/release/config-check-22
-          mv target/release/grpc-google-pubsub target/release/grpc-google-pubsub-22
-          mv target/release/grpc-kafka target/release/grpc-kafka-22
           mv ${{ env.GEYSER_PLUGIN_NAME }}-release-x86_64-unknown-linux-gnu.tar.bz2 ${{ env.GEYSER_PLUGIN_NAME }}-release22-x86_64-unknown-linux-gnu.tar.bz2
           mv ${{ env.GEYSER_PLUGIN_NAME }}-release-x86_64-unknown-linux-gnu.yml ${{ env.GEYSER_PLUGIN_NAME }}-release22-x86_64-unknown-linux-gnu.yml
@@ -90,11 +96,16 @@ jobs:
             target/release/client.d \
             target/release/config-check.d \
             target/release/grpc-google-pubsub.d \
-            target/release/grpc-kafka.d
+            target/release/grpc-kafka.d \
+            target/release/grpc-scylladb.d
+
+      - name: test new vars
+        run: |
+          echo ${{ github.ref_type }}
 
       - name: Release
-        if: startsWith(github.ref, 'refs/tags/')
-        uses: softprops/action-gh-release@v1
+        if: github.ref_type == 'tag'
+        uses: softprops/action-gh-release@master
         with:
           tag_name: ${{ env.BUILD_NAME }}
           body: |
@@ -108,3 +119,30 @@ jobs:
           target/release/config-check*
           target/release/grpc-google-pubsub*
           target/release/grpc-kafka*
+          target/release/grpc-scylladb*
+
+      - uses: actions/upload-artifact@v4
+        if: matrix.os == 'ubuntu-22.04'
+        with:
+          name: yellowstone-grpc-${{ github.sha }}-22
+          path: |
+            ${{ env.GEYSER_PLUGIN_NAME }}-release*
+            yellowstone-grpc-proto/proto/*.proto
+            target/release/client-${{ matrix.os }}
+            target/release/config-check-${{ matrix.os }}
+            target/release/grpc-google-pubsub-${{ matrix.os }}
+            target/release/grpc-kafka-${{ matrix.os }}
+            target/release/grpc-scylladb-${{ matrix.os }}
+
+      - uses: actions/upload-artifact@v4
+        if: matrix.os == 'ubuntu-20.04'
+        with:
+          name: yellowstone-grpc-${{ github.sha }}-20
+          path: |
+            ${{ env.GEYSER_PLUGIN_NAME }}-release*
+            yellowstone-grpc-proto/proto/*.proto
+            target/release/client-${{ matrix.os }}
+            target/release/config-check-${{ matrix.os }}
+            target/release/grpc-google-pubsub-${{ matrix.os }}
+            target/release/grpc-kafka-${{ matrix.os }}
+            target/release/grpc-scylladb-${{ matrix.os }}
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index bb1b67c3..9723dab9 100644
--- a/.github/workflows/test.yml
+++
b/.github/workflows/test.yml @@ -36,21 +36,17 @@ jobs: echo "GEYSER_PLUGIN_NAME=$plugin_name" | tee -a $GITHUB_ENV echo "GEYSER_PLUGIN_LIB=lib${plugin_lib_name}" | tee -a $GITHUB_ENV - - uses: actions-rs/toolchain@v1 + - uses: dtolnay/rust-toolchain@master with: toolchain: nightly - override: true - profile: minimal components: rustfmt - - uses: actions-rs/toolchain@v1 + - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ env.RUST_STABLE }} - override: true - profile: minimal components: clippy, rustfmt - - uses: actions/cache@v3 + - uses: actions/cache@v4 with: path: | ~/.cargo/bin/ diff --git a/Cargo.lock b/Cargo.lock index 7a5ce9be..1a08c685 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,6 +179,12 @@ version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "ark-bn254" version = "0.4.0" @@ -837,7 +843,7 @@ version = "4.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.52", @@ -1068,6 +1074,19 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deranged" version = "0.3.11" @@ -1075,6 +1094,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1627,6 +1647,15 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.1" @@ -1654,6 +1683,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "histogram" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" + [[package]] name = "hmac" version = "0.8.1" @@ -1870,6 +1905,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -1880,6 +1916,7 @@ checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" dependencies = [ "equivalent", "hashbrown 0.14.3", + "serde", ] [[package]] @@ -2097,6 +2134,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lz4_flex" +version = "0.11.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +dependencies = [ + "twox-hash", +] + [[package]] name = "maplit" version = "1.0.2" @@ -2832,7 +2878,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools 0.11.0", "log", "multimap", @@ -3006,6 +3052,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_pcg" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59cad018caf63deb318e5a4586d99a24424a364f40f1e5778c29aca23f4fc73e" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "rand_xorshift" version = "0.3.0" @@ -3361,6 +3416,68 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "scylla" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03d2db76aa23f55d2ece5354e1a3778633098a3d1ea76153f494d71e92cd02d8" +dependencies = [ + "arc-swap", + "async-trait", + "byteorder", + "bytes", + "chrono", + "dashmap", + "futures", + "histogram", + "itertools 0.11.0", + "lz4_flex", + "num_enum 0.6.1", + "rand 0.8.5", + "rand_pcg", + "scylla-cql", + "scylla-macros", + "smallvec", + "snap", + "socket2", + "strum", + "strum_macros", + "thiserror", + "tokio", + "tracing", + "uuid", +] + +[[package]] +name = "scylla-cql" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "345626c0dd5d9624c413daaba854685bba6a65cff4eb5ea0fb0366df16901f67" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "lz4_flex", + "num_enum 0.6.1", + "scylla-macros", + "snap", + "thiserror", + "tokio", + "uuid", +] + +[[package]] +name = "scylla-macros" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb6085ff9c3fd7e5163826901d39164ab86f11bdca16b2f766a00c528ff9cef9" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -3461,7 +3578,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe" dependencies = [ "serde", - "serde_with_macros", + "serde_with_macros 2.3.3", +] + +[[package]] +name = "serde_with" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" +dependencies = [ + "base64 0.21.7", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.2.5", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros 3.7.0", + "time", ] [[package]] @@ -3476,6 +3611,18 @@ dependencies = [ "syn 2.0.52", ] +[[package]] +name = "serde_with_macros" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "serde_yaml" version = "0.9.32" @@ -3612,6 +3759,12 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "snap" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" + [[package]] name = "socket2" version = "0.5.6" @@ -3870,7 +4023,7 @@ dependencies = [ "serde_bytes", "serde_derive", "serde_json", - "serde_with", + "serde_with 2.3.3", "sha2 0.10.8", "sha3 0.10.8", "siphasher", @@ -4247,12 +4400,37 @@ dependencies = [ "spl-program-error", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" + +[[package]] +name = "strum_macros" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" +dependencies = [ + "heck 0.3.3", + "proc-macro2", + "quote", + "rustversion", + "syn 1.0.109", +] + [[package]] name = "subtle" version = "2.4.1" @@ -4779,6 +4957,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4818,6 +5006,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + [[package]] name = "universal-hash" version = "0.4.1" @@ -4888,6 +5082,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +dependencies = [ + "getrandom 0.2.12", +] + [[package]] name = "valuable" version = "0.1.0" @@ -5342,8 +5545,10 @@ dependencies = [ "lazy_static", "prometheus", "rdkafka", + "scylla", "serde", "serde_json", + "serde_with 3.7.0", "serde_yaml", "sha2 0.10.8", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 5280e247..95dc423a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,8 @@ tracing-subscriber = "0.3.17" vergen = "8.2.1" yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.0+solana.1.18.9" } yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.0+solana.1.18.9" } +scylla = "0.12.0" +serde_with = "3.7.0" [profile.release] debug = true diff --git a/ci/cargo-install-all.sh b/ci/cargo-install-all.sh index 1742141c..9f35ede9 100755 --- a/ci/cargo-install-all.sh +++ b/ci/cargo-install-all.sh @@ -69,7 +69,7 @@ mkdir -p "$installDir/lib" ( set -x # shellcheck disable=SC2086 # Don't want to double quote $rust_version - cargo $maybeRustVersion build $maybeReleaseFlag + cargo $maybeRustVersion build $maybeReleaseFlag 
--features scylladb ) cp -fv "target/$buildVariant/${GEYSER_PLUGIN_LIB}.$libExt" "$installDir"/lib/ diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index 1fb6e601..60370ec5 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -18,6 +18,10 @@ required-features = ["google-pubsub"] name = "grpc-kafka" required-features = ["kafka"] +[[bin]] +name = "grpc-scylladb" +required-features = ["scylladb"] + [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } @@ -35,7 +39,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } sha2 = { workspace = true, optional = true } -tokio = { workspace = true, features = ["signal"] } +tokio = { workspace = true, features = ["signal", "time"] } tokio-stream = { workspace = true } tonic = { workspace = true, features = ["gzip"] } tonic-health = { workspace = true } @@ -43,6 +47,11 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } +scylla = { workspace = true, optional = true } +serde_with = { workspace = true, optional = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["test-util"] } [target.'cfg(not(all(target_os = "macos", target_arch = "aarch64")))'.dependencies] rdkafka = { workspace = true, features = ["sasl", "ssl"], optional = true } @@ -60,3 +69,4 @@ vergen = { workspace = true, features = ["build", "rustc"] } default = ["google-pubsub", "kafka"] google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"] kafka = ["const-hex", "rdkafka", "sha2"] +scylladb = ["scylla", "serde_with"] diff --git a/yellowstone-grpc-tools/config-scylladb.json b/yellowstone-grpc-tools/config-scylladb.json new file mode 100644 index 00000000..bd6be724 --- /dev/null +++ b/yellowstone-grpc-tools/config-scylladb.json @@ -0,0 +1,23 @@ +{ + "prometheus": "127.0.0.1:8873", + "scylladb": { + "hostname": "localhost:9042", + "username": "cassandra", + "password": "cassandra" + }, + "grpc2scylladb": { + "endpoint": "https://index.rpcpool.com:10000", + "x_token": "", + "request": { + "accounts": { + "my_filter": { + "owner": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"] + } + } + }, + "batch_size_limit": 1000, + "linger": 10, + "keyspace": "solana", + "max_inflight_batch_delivery": 80 + } +} diff --git a/yellowstone-grpc-tools/solana.cql b/yellowstone-grpc-tools/solana.cql new file mode 100644 index 00000000..24674d43 --- /dev/null +++ b/yellowstone-grpc-tools/solana.cql @@ -0,0 +1,85 @@ +-- message SubscribeUpdateAccount { +-- SubscribeUpdateAccountInfo account = 1; +-- uint64 slot = 2; +-- bool is_startup = 3; +-- } + +-- message SubscribeUpdateAccountInfo { +-- bytes pubkey = 1; +-- uint64 lamports = 2; +-- bytes owner = 3; +-- bool executable = 4; +-- uint64 rent_epoch = 5; +-- bytes data = 6; +-- uint64 write_version = 7; +-- optional bytes txn_signature = 8; +-- } + + +-- ## transaction + +-- message SubscribeUpdateTransaction { +-- SubscribeUpdateTransactionInfo transaction = 1; +-- uint64 slot = 2; +-- } + +-- message Transaction { +-- repeated bytes signatures = 1; +-- Message message = 2; +-- } + +-- message Message { +-- MessageHeader header = 1; +-- repeated bytes account_keys = 2; +-- bytes recent_blockhash = 3; +-- repeated CompiledInstruction instructions = 4; +-- bool versioned = 5; +-- repeated MessageAddressTableLookup address_table_lookups = 6; +-- } + +-- 
+-- message MessageHeader {
+--   uint32 num_required_signatures = 1;
+--   uint32 num_readonly_signed_accounts = 2;
+--   uint32 num_readonly_unsigned_accounts = 3;
+-- }
+
+-- message MessageAddressTableLookup {
+--   bytes account_key = 1;
+--   bytes writable_indexes = 2;
+--   bytes readonly_indexes = 3;
+-- }
+
+-- message SubscribeUpdateTransactionInfo {
+--   bytes signature = 1;
+--   bool is_vote = 2;
+--   solana.storage.ConfirmedBlock.Transaction transaction = 3;
+--   solana.storage.ConfirmedBlock.TransactionStatusMeta meta = 4;
+--   uint64 index = 5;
+-- }
+
+CREATE KEYSPACE solana WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'} AND durable_writes = true;
+
+create table if not exists solana.account_update_log (
+    slot bigint,
+    pubkey blob,
+    lamports bigint,
+    owner blob,
+    executable boolean,
+    rent_epoch bigint,
+    write_version bigint,
+    data blob,
+    txn_signature blob,
+    primary key ((slot, pubkey), write_version)
+) WITH CLUSTERING ORDER BY (write_version DESC);
+
+-- Draft of a future transaction log table, intentionally commented out:
+-- create table if not exists solana.transaction_log (
+--     signatures list<blob>,
+--     pubkey blob,
+--     lamports bigint,
+--     owner blob,
+--     executable boolean,
+--     rent_epoch bigint,
+--     write_version bigint,
+--     data blob,
+--     txn_signature blob
+-- );
\ No newline at end of file
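Reviewer note: with `PRIMARY KEY ((slot, pubkey), write_version)` and `CLUSTERING ORDER BY (write_version DESC)`, the newest write of a partition is always the first row. A minimal read-back sketch with the scylla driver — table and keyspace come from the schema above, the function itself is hypothetical:

```rust
use scylla::Session;

/// Fetch the newest write for one (slot, pubkey) partition.
/// Assumes a session connected to a cluster with solana.cql applied.
async fn latest_account_write(
    session: &Session,
    slot: i64,
    pubkey: Vec<u8>,
) -> anyhow::Result<Option<(i64, Vec<u8>)>> {
    let result = session
        .query(
            // DESC clustering means LIMIT 1 returns the highest write_version
            "SELECT write_version, data FROM solana.account_update_log \
             WHERE slot = ? AND pubkey = ? LIMIT 1",
            (slot, pubkey),
        )
        .await?;
    Ok(result.maybe_first_row_typed::<(i64, Vec<u8>)>()?)
}
```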
diff --git a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
new file mode 100644
index 00000000..2efbcc0d
--- /dev/null
+++ b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
@@ -0,0 +1,156 @@
+use {
+    anyhow::Ok,
+    clap::{Parser, Subcommand},
+    futures::{future::BoxFuture, stream::StreamExt},
+    std::{net::SocketAddr, time::Duration},
+    tracing::{info, trace},
+    yellowstone_grpc_client::GeyserGrpcClient,
+    yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof,
+    yellowstone_grpc_tools::{
+        config::{load as config_load, GrpcRequestToProto},
+        create_shutdown,
+        prom::run_server as prometheus_run_server,
+        scylladb::{
+            config::{Config, ConfigGrpc2ScyllaDB, ScyllaDbConnectionInfo},
+            sink::ScyllaSink,
+        },
+        setup_tracing,
+    },
+};
+
+// 512MB
+const MAX_DECODING_MESSAGE_SIZE_BYTES: usize = 512_000_000;
+
+#[derive(Debug, Clone, Parser)]
+#[clap(author, version, about = "Yellowstone gRPC ScyllaDB Tool")]
+struct Args {
+    /// Path to config file
+    #[clap(short, long)]
+    config: String,
+
+    /// Prometheus listen address
+    #[clap(long)]
+    prometheus: Option<SocketAddr>,
+
+    #[command(subcommand)]
+    action: ArgsAction,
+}
+
+#[derive(Debug, Clone, Subcommand)]
+enum ArgsAction {
+    /// Receive data from gRPC and send it to ScyllaDB
+    #[command(name = "grpc2scylla")]
+    Grpc2Scylla,
+    /// Receive data from ScyllaDB and send it over gRPC (not implemented yet)
+    #[command(name = "scylla2grpc")]
+    Scylla2Grpc,
+}
+
+impl ArgsAction {
+    async fn run(self, config: Config) -> anyhow::Result<()> {
+        let shutdown = create_shutdown()?;
+        match self {
+            ArgsAction::Grpc2Scylla => {
+                let config2 = config.grpc2scylladb.ok_or_else(|| {
+                    anyhow::anyhow!("`grpc2scylladb` section in config should be defined")
+                })?;
+                Self::grpc2scylladb(config2, config.scylladb, shutdown).await
+            }
+            ArgsAction::Scylla2Grpc => {
+                anyhow::bail!("the scylla2grpc direction is not implemented yet")
+            }
+        }
+    }
+
+    async fn grpc2scylladb(
+        config: ConfigGrpc2ScyllaDB,
+        scylladb_conn_config: ScyllaDbConnectionInfo,
+        mut shutdown: BoxFuture<'static, ()>,
+    ) -> anyhow::Result<()> {
+        let sink_config = config.get_scylladb_sink_config();
+        info!("sink configuration {:?}", sink_config);
+
+        // Create gRPC client & subscribe
+        let mut client = GeyserGrpcClient::build_from_shared(config.endpoint)?
+            .x_token(config.x_token)?
+            .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE_BYTES)
+            .connect_timeout(Duration::from_secs(10))
+            .timeout(Duration::from_secs(5))
+            .connect()
+            .await?;
+
+        let mut geyser = client.subscribe_once(config.request.to_proto()).await?;
+        info!("gRPC subscription is successful.");
+
+        let mut sink = ScyllaSink::new(
+            sink_config,
+            scylladb_conn_config.hostname,
+            scylladb_conn_config.username,
+            scylladb_conn_config.password,
+        )
+        .await;
+
+        info!("ScyllaSink is ready.");
+        // Receive-send loop
+        loop {
+            let message = tokio::select! {
+                _ = &mut shutdown => break,
+                message = geyser.next() => message,
+            }
+            .transpose()?;
+
+            match message {
+                Some(message) => {
+                    let message = match message.update_oneof {
+                        Some(value) => value,
+                        None => unreachable!("Expect valid message"),
+                    };
+
+                    match message {
+                        UpdateOneof::Account(msg) => {
+                            let pubkey_opt: &Option<&[u8]> =
+                                &msg.account.as_ref().map(|acc| acc.pubkey.as_ref());
+                            trace!(
+                                "Received an account update slot={:?}, pubkey={:?}",
+                                msg.slot,
+                                pubkey_opt
+                            );
+                            let acc_update = msg.try_into();
+                            if acc_update.is_err() {
+                                // Drop the message if invalid
+                                continue;
+                            }
+                            // If the sink is closed, let it crash...
+                            sink.log_account_update(acc_update.unwrap()).await.unwrap();
+                        }
+                        _ => continue,
+                    };
+                }
+                _ => (),
+            }
+        }
+        Ok(())
+    }
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    setup_tracing()?;
+
+    // Parse args
+    let args = Args::parse();
+    let config = config_load::<Config>(&args.config).await?;
+
+    // Run prometheus server
+    if let Some(address) = args.prometheus.or(config.prometheus) {
+        prometheus_run_server(address)?;
+    }
+
+    args.action.run(config).await
+}
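Reviewer note: `create_shutdown` is imported from the existing tools crate and is not part of this diff. For readers, a hypothetical sketch of what such a helper can look like (name and shape assumed, not the crate's actual implementation):

```rust
use futures::future::BoxFuture;
use tokio::signal::unix::{signal, SignalKind};

// Resolves once SIGINT or SIGTERM is received; pairs with the
// `_ = &mut shutdown => break` branch of the receive-send loop above.
fn create_shutdown_sketch() -> anyhow::Result<BoxFuture<'static, ()>> {
    let mut sigint = signal(SignalKind::interrupt())?;
    let mut sigterm = signal(SignalKind::terminate())?;
    Ok(Box::pin(async move {
        tokio::select! {
            _ = sigint.recv() => {}
            _ = sigterm.recv() => {}
        }
    }))
}
```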
diff --git a/yellowstone-grpc-tools/src/lib.rs b/yellowstone-grpc-tools/src/lib.rs
index 8dc40a08..dbc0e95f 100644
--- a/yellowstone-grpc-tools/src/lib.rs
+++ b/yellowstone-grpc-tools/src/lib.rs
@@ -8,6 +8,8 @@ pub mod google_pubsub;
 #[cfg(feature = "kafka")]
 pub mod kafka;
 pub mod prom;
+#[cfg(feature = "scylladb")]
+pub mod scylladb;
 pub mod version;
 
 use {
diff --git a/yellowstone-grpc-tools/src/prom.rs b/yellowstone-grpc-tools/src/prom.rs
index 36d316cf..2e27422c 100644
--- a/yellowstone-grpc-tools/src/prom.rs
+++ b/yellowstone-grpc-tools/src/prom.rs
@@ -6,6 +6,11 @@ use crate::google_pubsub::prom::{
 };
 #[cfg(feature = "kafka")]
 use crate::kafka::prom::{KAFKA_DEDUP_TOTAL, KAFKA_RECV_TOTAL, KAFKA_SENT_TOTAL, KAFKA_STATS};
+#[cfg(feature = "scylladb")]
+use crate::scylladb::prom::{
+    SCYLLADB_BATCHITEM_DELIVERED, SCYLLADB_BATCH_DELIVERED, SCYLLADB_BATCH_QUEUE,
+    SCYLLADB_BATCH_REQUEST_LAG, SCYLLADB_BATCH_SIZE, SCYLLADB_PEAK_BATCH_LINGER_SECONDS,
+};
 use {
     crate::version::VERSION as VERSION_INFO,
     hyper::{
@@ -38,6 +43,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
             .expect("collector can't be registered");
         };
     }
+    register!(VERSION);
 
     #[cfg(feature = "google-pubsub")]
     {
@@ -56,6 +62,16 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
         register!(KAFKA_SENT_TOTAL);
     }
 
+    #[cfg(feature = "scylladb")]
+    {
+        register!(SCYLLADB_PEAK_BATCH_LINGER_SECONDS);
+        register!(SCYLLADB_BATCH_DELIVERED);
+        register!(SCYLLADB_BATCHITEM_DELIVERED);
+        register!(SCYLLADB_BATCH_SIZE);
+        register!(SCYLLADB_BATCH_QUEUE);
+        register!(SCYLLADB_BATCH_REQUEST_LAG);
+    }
+
     VERSION
         .with_label_values(&[
             VERSION_INFO.buildts,
diff --git a/yellowstone-grpc-tools/src/scylladb/config.rs b/yellowstone-grpc-tools/src/scylladb/config.rs
new file mode 100644
index 00000000..6ce2456a
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/config.rs
@@ -0,0 +1,85 @@
+use {
+    super::sink::ScyllaSinkConfig,
+    crate::config::ConfigGrpcRequest,
+    serde::Deserialize,
+    serde_with::{serde_as, DurationMilliSeconds},
+    std::{net::SocketAddr, time::Duration},
+};
+
+const fn default_batch_size_limit() -> usize {
+    10
+}
+
+const fn default_linger() -> Duration {
+    Duration::from_millis(10)
+}
+
+fn default_scylla_username() -> String {
+    "cassandra".into()
+}
+
+fn default_scylla_password() -> String {
+    "cassandra".into()
+}
+
+fn default_keyspace() -> String {
+    "default".into()
+}
+
+const fn default_max_inflight_batch_delivery() -> usize {
+    100
+}
+
+fn default_hostname() -> String {
+    // ScyllaDB's default CQL port, matching config-scylladb.json
+    String::from("127.0.0.1:9042")
+}
+
+#[derive(Debug, Default, Deserialize)]
+#[serde(default)]
+pub struct Config {
+    pub prometheus: Option<SocketAddr>,
+    pub scylladb: ScyllaDbConnectionInfo,
+    pub grpc2scylladb: Option<ConfigGrpc2ScyllaDB>,
+}
+
+#[derive(Debug, Default, Deserialize)]
+#[serde(default)]
+pub struct ScyllaDbConnectionInfo {
+    #[serde(default = "default_hostname")]
+    pub hostname: String,
+    #[serde(default = "default_scylla_username")]
+    pub username: String,
+    #[serde(default = "default_scylla_password")]
+    pub password: String,
+}
+
+#[serde_as]
+#[derive(Debug, Deserialize)]
+pub struct ConfigGrpc2ScyllaDB {
+    pub endpoint: String,
+    pub x_token: Option<String>,
+    pub request: ConfigGrpcRequest,
+    #[serde(default = "default_batch_size_limit")]
+    pub batch_size_limit: usize,
+
+    #[serde(default = "default_linger")]
+    #[serde_as(as = "DurationMilliSeconds")]
+    pub linger: Duration,
+
+    #[serde(default = "default_keyspace")]
+    pub keyspace: String,
+
+    #[serde(default = "default_max_inflight_batch_delivery")]
+    pub max_inflight_batch_delivery: usize,
+}
+
+impl ConfigGrpc2ScyllaDB {
+    pub fn get_scylladb_sink_config(&self) -> ScyllaSinkConfig {
+        ScyllaSinkConfig {
+            batch_size_limit: self.batch_size_limit,
+            linger: self.linger,
+            keyspace: self.keyspace.clone(),
+            max_inflight_batch_delivery: self.max_inflight_batch_delivery,
+        }
+    }
+}
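Reviewer note: `linger` is a `Duration` in Rust but a plain integer in config-scylladb.json; `serde_with::DurationMilliSeconds` bridges the two. A self-contained sketch of the same pattern (struct name hypothetical):

```rust
use serde::Deserialize;
use serde_with::{serde_as, DurationMilliSeconds};
use std::time::Duration;

#[serde_as]
#[derive(Debug, Deserialize)]
struct LingerDemo {
    // `"linger": 10` in JSON becomes Duration::from_millis(10)
    #[serde_as(as = "DurationMilliSeconds")]
    linger: Duration,
}

fn main() {
    let demo: LingerDemo = serde_json::from_str(r#"{"linger": 10}"#).unwrap();
    assert_eq!(demo.linger, Duration::from_millis(10));
}
```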
diff --git a/yellowstone-grpc-tools/src/scylladb/mod.rs b/yellowstone-grpc-tools/src/scylladb/mod.rs
new file mode 100644
index 00000000..536709bd
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/mod.rs
@@ -0,0 +1,4 @@
+pub mod config;
+pub mod prom;
+pub mod sink;
+pub mod types;
diff --git a/yellowstone-grpc-tools/src/scylladb/prom.rs b/yellowstone-grpc-tools/src/scylladb/prom.rs
new file mode 100644
index 00000000..08677768
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/prom.rs
@@ -0,0 +1,64 @@
+use {
+    prometheus::{Histogram, HistogramOpts, IntCounter, IntGauge},
+    std::time::Duration,
+};
+
+lazy_static::lazy_static! {
+    pub(crate) static ref SCYLLADB_BATCH_DELIVERED: IntCounter = IntCounter::new(
+        "scylladb_batch_sent_total", "Total number of batches delivered"
+    ).unwrap();
+
+    pub(crate) static ref SCYLLADB_BATCH_SIZE: Histogram = Histogram::with_opts(
+        HistogramOpts::new("scylladb_batch_size", "The size of batches sent to ScyllaDB"),
+    ).unwrap();
+
+    pub(crate) static ref SCYLLADB_BATCH_REQUEST_LAG: IntGauge = IntGauge::new(
+        "scylladb_batch_request_lag", "The number of batch requests not yet handled by the batching task"
+    ).unwrap();
+
+    pub(crate) static ref SCYLLADB_BATCHITEM_DELIVERED: IntCounter = IntCounter::new(
+        "scylladb_batchitem_sent_total", "Total number of batch items delivered"
+    ).unwrap();
+
+    pub(crate) static ref SCYLLADB_PEAK_BATCH_LINGER_SECONDS: Histogram = Histogram::with_opts(
+        HistogramOpts::new("scylladb_peak_batch_linger_seconds", "The actual linger time of the next batch to be sent"),
+    ).unwrap();
+
+    pub(crate) static ref SCYLLADB_BATCH_QUEUE: IntGauge = IntGauge::new(
+        "scylladb_batch_queue_size", "The number of batches currently lingering"
+    ).unwrap();
+}
+
+pub fn scylladb_batch_sent_inc() {
+    SCYLLADB_BATCH_DELIVERED.inc()
+}
+
+pub fn scylladb_batchitem_sent_inc_by(amount: u64) {
+    SCYLLADB_BATCHITEM_DELIVERED.inc_by(amount)
+}
+
+pub fn scylladb_batch_size_observe(batch_size: usize) {
+    SCYLLADB_BATCH_SIZE.observe(batch_size as f64)
+}
+
+pub fn scylladb_peak_batch_linger_observe(batch_linger: Duration) {
+    SCYLLADB_PEAK_BATCH_LINGER_SECONDS.observe(batch_linger.as_secs_f64())
+}
+
+pub fn scylladb_batch_queue_inc() {
+    SCYLLADB_BATCH_QUEUE.inc()
+}
+
+pub fn scylladb_batch_queue_dec() {
+    SCYLLADB_BATCH_QUEUE.dec()
+}
+
+pub fn scylladb_batch_request_lag_inc() {
+    SCYLLADB_BATCH_REQUEST_LAG.inc()
+}
+
+pub fn scylladb_batch_request_lag_sub(amount: i64) {
+    SCYLLADB_BATCH_REQUEST_LAG.sub(amount)
+}
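Reviewer note: `SCYLLADB_BATCH_REQUEST_LAG` is incremented once per accepted request (`BatcherHandle::send` in sink.rs below) and decremented in bulk after a batch lands, so it approximates requests accepted but not yet written. A standalone sketch of that accounting pattern with the prometheus crate:

```rust
use prometheus::IntGauge;

fn main() {
    let lag = IntGauge::new("demo_lag", "requests accepted but not yet delivered").unwrap();
    // Producer side: one increment per accepted request.
    for _ in 0..3 {
        lag.inc();
    }
    // Delivery side: one bulk subtraction per delivered batch.
    lag.sub(3);
    assert_eq!(lag.get(), 0);
}
```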
+"###; + +#[derive(Clone, PartialEq, Debug)] +pub struct ScyllaSinkConfig { + pub batch_size_limit: usize, + pub linger: Duration, + pub keyspace: String, + pub max_inflight_batch_delivery: usize, +} + +impl Default for ScyllaSinkConfig { + fn default() -> Self { + Self { + batch_size_limit: 3000, + linger: Duration::from_millis(10), + keyspace: String::from("solana"), + max_inflight_batch_delivery: 100, + } + } +} + +enum BatchItem { + // Add other action if necessary... + Account(AccountUpdate), +} + +pub struct BatchRequest { + stmt: BatchStatement, + item: BatchItem, +} + +impl Into for AccountUpdate { + fn into(self) -> SerializedValues { + let mut row = SerializedValues::new(); + row.add_value(&self.slot, &ColumnType::BigInt).unwrap(); + row.add_value(&self.pubkey, &ColumnType::Blob).unwrap(); + row.add_value(&self.lamports, &ColumnType::BigInt).unwrap(); + row.add_value(&self.owner, &ColumnType::Blob).unwrap(); + row.add_value(&self.executable, &ColumnType::Boolean) + .unwrap(); + row.add_value(&self.rent_epoch, &ColumnType::BigInt) + .unwrap(); + row.add_value(&self.write_version, &ColumnType::BigInt) + .unwrap(); + row.add_value(&self.data, &ColumnType::Blob).unwrap(); + row.add_value(&self.txn_signature, &ColumnType::Blob) + .unwrap(); + row + } +} + +impl BatchItem { + fn get_partition_key(&self) -> SerializedValues { + match self { + BatchItem::Account(acc_update) => { + let slot = acc_update.slot; + let pubkey = acc_update.pubkey; + let mut pk_ser = SerializedValues::new(); + pk_ser.add_value(&slot, &ColumnType::BigInt).unwrap(); + pk_ser.add_value(&pubkey, &ColumnType::Blob).unwrap(); + pk_ser + } + } + } + + fn resolve_token(&self, tt: &impl TokenTopology) -> Token { + let pk = self.get_partition_key(); + + let table = match self { + BatchItem::Account(_) => SCYLLADB_ACCOUNT_UPDATE_LOG_TABLE_NAME, + }; + tt.compute_token(table, &pk) + } + + fn serialize(self) -> SerializedValues { + match self { + BatchItem::Account(acc_update) => acc_update.into(), + } + } +} + +#[async_trait] +pub trait ScyllaBatcher { + async fn batch(&self, br: BatchRequest); +} + +type NodeUuid = u128; + +/// This batcher is aware of the current partitioning scheme of scylla and build different +/// batch for different endpoint +pub struct TokenAwareBatcher { + config: ScyllaSinkConfig, + batch_sender: B, + token_topology: TT, + inner: UnboundedReceiver, + batch_map: HashMap)>, + batch_schedule: BinaryHeap>, + inflight_deliveries: JoinSet>, + batch_last_consumed_map: HashMap, +} + +struct PreSerializedBatchValuesIterator<'a> { + inner: &'a [SerializedValues], + i: usize, +} + +struct PreSerializedBatchValues(Vec); + +impl BatchValues for PreSerializedBatchValues { + type BatchValuesIter<'r> = PreSerializedBatchValuesIterator<'r>; + + fn batch_values_iter(&self) -> Self::BatchValuesIter<'_> { + PreSerializedBatchValuesIterator { + inner: &self.0, + i: 0, + } + } +} + +impl<'bv> BatchValuesIterator<'bv> for PreSerializedBatchValuesIterator<'bv> { + fn serialize_next( + &mut self, + _ctx: &RowSerializationContext<'_>, + writer: &mut scylla::serialize::RowWriter, + ) -> Option> { + writer.append_serialize_row(self.inner.get(self.i).unwrap()); + self.i += 1; + Some(Ok(())) + } + + fn is_empty_next(&mut self) -> Option { + self.inner.get(self.i).map(|_| true) + } + + fn skip_next(&mut self) -> Option<()> { + self.inner.get(self.i).map(|_| { + self.i += 1; + }) + } +} + +pub trait TokenTopology { + fn get_node_uuid_for_token(&self, token: Token) -> NodeUuid; + + fn is_node_uuid_exists(&self, node_uuid: 
+pub trait TokenTopology {
+    fn get_node_uuid_for_token(&self, token: Token) -> NodeUuid;
+
+    fn is_node_uuid_exists(&self, node_uuid: NodeUuid) -> bool;
+
+    fn compute_token(&self, table: &str, serialized_values: &SerializedValues) -> Token;
+}
+
+#[derive(Clone)]
+struct LiveTokenTopology(Arc<Session>);
+
+impl TokenTopology for LiveTokenTopology {
+    fn get_node_uuid_for_token(&self, token: Token) -> NodeUuid {
+        self.0
+            .get_cluster_data()
+            .replica_locator()
+            .ring()
+            .get_elem_for_token(token)
+            .map(|node| node.host_id.as_u128())
+            .unwrap() // If it is None it means we have no nodes left -> we need to crash asap!
+    }
+
+    fn is_node_uuid_exists(&self, node_uuid: NodeUuid) -> bool {
+        self.0
+            .get_cluster_data()
+            .get_nodes_info()
+            .iter()
+            .any(|node| node.host_id.as_u128() == node_uuid)
+    }
+
+    fn compute_token(&self, table: &str, partition_key: &SerializedValues) -> Token {
+        let current_keyspace = self.0.get_keyspace().unwrap();
+        self.0
+            .get_cluster_data()
+            .compute_token(&current_keyspace, table, partition_key)
+            .unwrap()
+    }
+}
+
+#[async_trait]
+pub trait BatchSender: Send + Sync + Clone {
+    async fn send_batch(
+        self,
+        batch: Batch,
+        serialized_rows: Vec<SerializedValues>,
+    ) -> Result<(), QueryError>;
+}
+
+// Arc<Session> is already Send + Sync, so LiveBatchSender is too; no unsafe
+// impls are required.
+#[derive(Clone)]
+struct LiveBatchSender(Arc<Session>);
+
+#[async_trait]
+impl BatchSender for LiveBatchSender {
+    async fn send_batch(
+        self,
+        batch: Batch,
+        serialized_rows: Vec<SerializedValues>,
+    ) -> Result<(), QueryError> {
+        let before = Instant::now();
+        scylladb_batch_size_observe(batch.statements.len());
+
+        let result = self
+            .0
+            .batch(&batch, PreSerializedBatchValues(serialized_rows))
+            .await
+            .map(|_| ());
+
+        info!(
+            "Batch sent: size={:?}, latency={:?}",
+            batch.statements.len(),
+            before.elapsed()
+        );
+        if result.is_ok() {
+            scylladb_batch_sent_inc();
+            scylladb_batchitem_sent_inc_by(batch.statements.len() as u64);
+            scylladb_batch_request_lag_sub(batch.statements.len() as i64)
+        }
+        result
+    }
+}
+
+#[derive(Debug, PartialEq)]
+enum TickError {
+    DeliveryError,
+    Timeout,
+}
+
+struct BatcherHandle {
+    inner: JoinHandle<()>,
+    sender: UnboundedSender<BatchRequest>,
+}
+
+impl BatcherHandle {
+    fn send(
+        &self,
+        br: BatchRequest,
+    ) -> Result<(), tokio::sync::mpsc::error::SendError<BatchRequest>> {
+        let res = self.sender.send(br);
+        if res.is_ok() {
+            scylladb_batch_request_lag_inc();
+        }
+        res
+    }
+
+    fn abort(&self) {
+        self.inner.abort()
+    }
+
+    async fn join(self) -> Result<(), JoinError> {
+        self.inner.await
+    }
+}
+impl<TT: TokenTopology, B: BatchSender + 'static> TokenAwareBatcher<TT, B> {
+    /// Tick allows the batcher to step forward in its state.
+    /// Each tick concurrently handles one of: a batch request, a batch delivery, or throttling.
+    async fn tick(&mut self, now: Instant) -> Result<(), TickError> {
+        let tick_timeout = now + self.config.linger;
+        let deadline: Instant = self
+            .batch_schedule
+            .peek()
+            .map(|rev| rev.0 .0)
+            .unwrap_or(tick_timeout);
+        tokio::select! {
+            _ = time::sleep_until(deadline), if !self.need_throttling() => {
+                let old_value = self.consume_next_in_schedule(now);
+                if let Some((batch, ser_values)) = old_value {
+                    if !batch.statements.is_empty() {
+                        let bs = self.batch_sender.clone();
+                        let fut = bs.send_batch(batch, ser_values);
+                        self.inflight_deliveries.spawn(fut);
+                    }
+                }
+            },
+            // recv is cancel safe, so no data is lost if the other branch finishes
+            // first or if this branch's precondition is false
+            Some(BatchRequest { stmt, item }) = self.inner.recv(), if !self.need_throttling() && deadline > now => {
+                let token = item.resolve_token(&self.token_topology);
+                let (node_uuid, current_batch_size) = {
+                    let (node_uuid, (batch, ser_values)) = self.get_batch_for_token(token);
+                    let serialized_row = item.serialize();
+                    batch.append_statement(stmt);
+                    ser_values.push(serialized_row);
+                    (node_uuid, ser_values.len())
+                };
+
+                if current_batch_size >= self.config.batch_size_limit {
+                    self.batch_schedule.push(Reverse((now, node_uuid)));
+                }
+            },
+            Some(Err(_join_err)) = self.inflight_deliveries.join_next() => return Err(TickError::DeliveryError),
+
+            _ = time::sleep_until(tick_timeout) => return Err(TickError::Timeout)
+        }
+        Ok(())
+    }
+
+    fn need_throttling(&self) -> bool {
+        self.inflight_deliveries.len() >= self.config.max_inflight_batch_delivery
+    }
+
+    fn get_batch_for_token(
+        &mut self,
+        token: Token,
+    ) -> (NodeUuid, &mut (Batch, Vec<SerializedValues>)) {
+        let node_uuid = self.token_topology.get_node_uuid_for_token(token);
+        if !self.batch_map.contains_key(&node_uuid) {
+            self.refresh_schedule_for_node(Instant::now(), node_uuid);
+            scylladb_batch_queue_inc();
+        }
+        (node_uuid, self.batch_map.entry(node_uuid).or_default())
+    }
+
+    fn refresh_schedule_for_node(&mut self, from: Instant, node_uuid: NodeUuid) {
+        let next_instant = from.checked_add(self.config.linger).unwrap();
+        self.batch_schedule.push(Reverse((next_instant, node_uuid)));
+    }
+
+    fn consume_next_in_schedule(
+        &mut self,
+        consumed_at: Instant,
+    ) -> Option<(Batch, Vec<SerializedValues>)> {
+        if let Some(Reverse((instant, node_uuid))) = self.batch_schedule.pop() {
+            // This block avoids sending the same batch twice within the same linger period
+            if let Some(last_consumed_instant) = self.batch_last_consumed_map.get(&node_uuid) {
+                let time_delta = consumed_at.duration_since(*last_consumed_instant);
+                if time_delta < self.config.linger {
+                    let additional_wait_time = self.config.linger - time_delta;
+                    // Reschedule
+                    self.batch_schedule
+                        .push(Reverse((instant + additional_wait_time, node_uuid)));
+                    return None;
+                }
+            }
+
+            scylladb_peak_batch_linger_observe(instant.elapsed());
+            self.batch_last_consumed_map.insert(node_uuid, consumed_at);
+            let ret = self.batch_map.remove(&node_uuid);
+            if ret.is_some() {
+                scylladb_batch_queue_dec();
+            }
+            ret
+        } else {
+            None
+        }
+    }
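Reviewer note: `batch_schedule` relies on `Reverse` to turn `BinaryHeap`'s max-heap into a min-heap, so `peek`/`pop` always yield the earliest `(Instant, NodeUuid)` deadline. A self-contained illustration:

```rust
use std::{
    cmp::Reverse,
    collections::BinaryHeap,
    time::{Duration, Instant},
};

fn main() {
    let now = Instant::now();
    let mut schedule: BinaryHeap<Reverse<(Instant, u128)>> = BinaryHeap::new();
    schedule.push(Reverse((now + Duration::from_millis(30), 2)));
    schedule.push(Reverse((now + Duration::from_millis(10), 1)));
    // The earliest deadline (node 1) pops first even though it was pushed last.
    let Reverse((deadline, node_uuid)) = schedule.pop().unwrap();
    assert_eq!(node_uuid, 1);
    assert_eq!(deadline, now + Duration::from_millis(10));
}
```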
+    fn new<TT2: TokenTopology, B2: BatchSender>(
+        config: ScyllaSinkConfig,
+        batch_sender: B2,
+        token_topology: TT2,
+    ) -> (TokenAwareBatcher<TT2, B2>, UnboundedSender<BatchRequest>) {
+        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
+        let batcher = TokenAwareBatcher {
+            config,
+            batch_sender,
+            token_topology,
+            inner: receiver,
+            batch_map: HashMap::default(),
+            batch_schedule: BinaryHeap::default(),
+            inflight_deliveries: JoinSet::new(),
+            batch_last_consumed_map: HashMap::default(),
+        };
+        (batcher, sender)
+    }
+
+    /// Runs a TokenAwareBatcher in a background task and returns a handle that
+    /// carries a channel for batch requests and the underlying task handle.
+    fn spawn<TT2, B2>(
+        config: ScyllaSinkConfig,
+        batch_sender: B2,
+        token_topology: TT2,
+    ) -> BatcherHandle
+    where
+        TT2: TokenTopology + Send + 'static,
+        B2: BatchSender + 'static,
+    {
+        let (batcher, sender) =
+            TokenAwareBatcher::<TT2, B2>::new(config, batch_sender, token_topology);
+
+        // Background task driving the batcher
+        let h = tokio::spawn(async move {
+            let mut my_batcher = batcher;
+            loop {
+                let now = Instant::now();
+                if let Err(TickError::DeliveryError) = my_batcher.tick(now).await {
+                    break;
+                }
+            }
+        });
+        BatcherHandle { inner: h, sender }
+    }
+}
+
+pub struct ScyllaSink {
+    insert_account_update_ps: PreparedStatement,
+    batcher_handle: BatcherHandle,
+}
+
+#[derive(Debug)]
+pub enum ScyllaSinkError {
+    SinkClose,
+}
+
+impl ScyllaSink {
+    pub async fn new(
+        config: ScyllaSinkConfig,
+        hostname: impl AsRef<str>,
+        username: impl Into<String>,
+        password: impl Into<String>,
+    ) -> Self {
+        let session: Session = SessionBuilder::new()
+            .known_node(hostname)
+            .user(username, password)
+            .use_keyspace(config.keyspace.clone(), false)
+            .build()
+            .await
+            .unwrap();
+
+        let session = Arc::new(session);
+        let insert_account_update_ps = session
+            .prepare(SCYLLADB_INSERT_ACCOUNT_UPDATE)
+            .await
+            .unwrap();
+
+        let batch_sender = LiveBatchSender(Arc::clone(&session));
+        let token_topology = LiveTokenTopology(Arc::clone(&session));
+        let batcher_handle = TokenAwareBatcher::<LiveTokenTopology, LiveBatchSender>::spawn(
+            config.clone(),
+            batch_sender,
+            token_topology,
+        );
+
+        ScyllaSink {
+            insert_account_update_ps,
+            batcher_handle,
+        }
+    }
+
+    pub async fn log_account_update(
+        &mut self,
+        update: AccountUpdate,
+    ) -> Result<(), ScyllaSinkError> {
+        let br = BatchRequest {
+            stmt: BatchStatement::PreparedStatement(self.insert_account_update_ps.clone()),
+            item: BatchItem::Account(update),
+        };
+        self.batcher_handle
+            .send(br)
+            .map_err(|_e| ScyllaSinkError::SinkClose)
+    }
+}
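Reviewer note: end-to-end usage of the sink, assuming the `scylladb` feature is enabled, a reachable cluster with solana.cql applied, and default credentials (all values hypothetical):

```rust
use yellowstone_grpc_tools::scylladb::{
    sink::{ScyllaSink, ScyllaSinkConfig},
    types::AccountUpdate,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Default config: keyspace "solana", 3000-item batches, 10 ms linger.
    let mut sink = ScyllaSink::new(
        ScyllaSinkConfig::default(),
        "127.0.0.1:9042",
        "cassandra",
        "cassandra",
    )
    .await;
    sink.log_account_update(AccountUpdate::zero_account())
        .await
        .map_err(|e| anyhow::anyhow!("sink closed: {e:?}"))
}
```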
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        futures::future::pending,
+        scylla::query::Query,
+        std::{cell::RefCell, iter::repeat},
+        tokio::sync::mpsc::error::TryRecvError,
+    };
+
+    type BatchSenderCallback = UnboundedReceiver<(Batch, Vec<SerializedValues>)>;
+
+    #[derive(Clone)]
+    struct NullBatchSender {
+        callback: UnboundedSender<(Batch, Vec<SerializedValues>)>,
+    }
+
+    // A sender that never completes, to simulate stuck deliveries.
+    #[derive(Clone)]
+    struct StuckBatchSender;
+
+    #[async_trait]
+    impl BatchSender for StuckBatchSender {
+        async fn send_batch(
+            self,
+            _batch: Batch,
+            _serialized_rows: Vec<SerializedValues>,
+        ) -> Result<(), QueryError> {
+            pending().await
+        }
+    }
+
+    #[derive(Clone)]
+    struct ConstTokenTopology {
+        node_uuid: RefCell<NodeUuid>,
+        token: RefCell<Token>,
+        is_node_exists: RefCell<bool>,
+    }
+
+    impl Default for ConstTokenTopology {
+        fn default() -> Self {
+            Self {
+                node_uuid: Default::default(),
+                token: RefCell::new(Token {
+                    value: Default::default(),
+                }),
+                is_node_exists: RefCell::new(true),
+            }
+        }
+    }
+
+    #[async_trait]
+    impl BatchSender for NullBatchSender {
+        async fn send_batch(
+            self,
+            batch: Batch,
+            serialized_rows: Vec<SerializedValues>,
+        ) -> Result<(), QueryError> {
+            self.callback.send((batch, serialized_rows)).unwrap();
+            Ok(())
+        }
+    }
+
+    impl TokenTopology for ConstTokenTopology {
+        fn get_node_uuid_for_token(&self, _token: Token) -> NodeUuid {
+            self.node_uuid.borrow().clone()
+        }
+
+        fn is_node_uuid_exists(&self, _node_uuid: NodeUuid) -> bool {
+            self.is_node_exists.borrow().clone()
+        }
+
+        fn compute_token(&self, _table: &str, _serialized_values: &SerializedValues) -> Token {
+            self.token.borrow().clone()
+        }
+    }
+
+    fn stuck_batcher(
+        config: ScyllaSinkConfig,
+    ) -> (
+        TokenAwareBatcher<ConstTokenTopology, StuckBatchSender>,
+        UnboundedSender<BatchRequest>,
+    ) {
+        TokenAwareBatcher::<ConstTokenTopology, StuckBatchSender>::new(
+            config,
+            StuckBatchSender,
+            ConstTokenTopology::default(),
+        )
+    }
+
+    fn manual_test_batcher(
+        config: ScyllaSinkConfig,
+    ) -> (
+        TokenAwareBatcher<ConstTokenTopology, NullBatchSender>,
+        UnboundedSender<BatchRequest>,
+        BatchSenderCallback,
+    ) {
+        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
+        let (batcher, sender2) = TokenAwareBatcher::<ConstTokenTopology, NullBatchSender>::new(
+            config,
+            NullBatchSender { callback: sender },
+            ConstTokenTopology::default(),
+        );
+        (batcher, sender2, receiver)
+    }
+
+    fn spawn_test_batcher(config: ScyllaSinkConfig) -> (BatcherHandle, BatchSenderCallback) {
+        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
+        (
+            TokenAwareBatcher::<ConstTokenTopology, NullBatchSender>::spawn(
+                config,
+                NullBatchSender { callback: sender },
+                ConstTokenTopology::default(),
+            ),
+            receiver,
+        )
+    }
+
+    #[tokio::test(start_paused = true)]
+    async fn it_should_send_one_batch() {
+        let (bh, mut cb) = spawn_test_batcher(Default::default());
+        bh.send(BatchRequest {
+            stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+            item: BatchItem::Account(AccountUpdate::zero_account()),
+        })
+        .unwrap();
+
+        let res = cb.recv().await;
+        assert!(res.is_some());
+    }
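Reviewer note: `start_paused = true` freezes tokio's clock and auto-advances it whenever every task is idle, which is why `cb.recv().await` above returns without really waiting out the linger (this is what the new `test-util` dev-dependency enables). A minimal illustration:

```rust
#[tokio::test(start_paused = true)]
async fn paused_time_auto_advances() {
    let wall = std::time::Instant::now();
    // Virtually ten seconds, but only microseconds of real time elapse.
    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
    assert!(wall.elapsed() < std::time::Duration::from_secs(1));
}
```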
+    #[tokio::test]
+    async fn it_should_batch_related_item_within_the_lingering_limit() {
+        let (mut batcher, sender, mut cb) = manual_test_batcher(Default::default());
+
+        let now = Instant::now();
+
+        // 1st item to batch
+        sender
+            .send(BatchRequest {
+                stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+                item: BatchItem::Account(AccountUpdate::zero_account()),
+            })
+            .unwrap();
+
+        batcher.tick(now).await.unwrap();
+
+        // 2nd item to batch
+        sender
+            .send(BatchRequest {
+                stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+                item: BatchItem::Account(AccountUpdate::zero_account()),
+            })
+            .unwrap();
+
+        batcher.tick(now).await.unwrap();
+
+        // It should send the batch since we reached the lingering limit
+        let now = now + batcher.config.linger;
+        batcher.tick(now).await.unwrap();
+
+        let res = cb.recv().await.unwrap();
+
+        assert_eq!(res.0.statements.len(), 2);
+        assert_eq!(res.1.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn it_should_throttle_if_we_reach_the_maximum_inflight() {
+        let config = ScyllaSinkConfig {
+            batch_size_limit: 1,
+            linger: Duration::from_millis(10),
+            keyspace: String::from("solana"),
+            // We put a very limited amount of inflight batch deliveries
+            max_inflight_batch_delivery: 1,
+        };
+        let (mut batcher, sender) = stuck_batcher(config.clone());
+
+        let now = Instant::now();
+
+        sender
+            .send(BatchRequest {
+                stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+                item: BatchItem::Account(AccountUpdate::zero_account()),
+            })
+            .unwrap();
+
+        // 1st tick: start the batch
+        batcher.tick(now).await.unwrap();
+
+        let now = now + config.linger;
+
+        // 2nd tick: send it to the BatchSender
+        batcher.tick(now).await.unwrap();
+
+        // 3rd tick: should cause a tick timeout
+        let now = now + config.linger;
+        let tick_result = batcher.tick(now).await;
+
+        assert_eq!(tick_result, Err(TickError::Timeout));
+
+        // Any subsequent send should still not be batched: the delivery is
+        // stuck and the maximum is one inflight delivery
+        sender
+            .send(BatchRequest {
+                stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+                item: BatchItem::Account(AccountUpdate::zero_account()),
+            })
+            .unwrap();
+        let now = now + config.linger;
+        let tick_result2 = batcher.tick(now).await;
+        assert_eq!(tick_result2, Err(TickError::Timeout));
+    }
+
+    #[tokio::test]
+    async fn it_should_handle_orphan_batch_when_token_topology_changes() {
+        let (mut batcher, sender, mut cb) = manual_test_batcher(Default::default());
+
+        let now = Instant::now();
+
+        sender
+            .send(BatchRequest {
+                stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+                item: BatchItem::Account(AccountUpdate::zero_account()),
+            })
+            .unwrap();
+
+        // 1st tick = create a new batch for a given node_uuid
+        batcher.tick(now).await.unwrap();
+
+        // Simulate a topology change where a token range is now assigned to
+        // another node. This should create an orphan batch in the batcher.
+        batcher.token_topology.node_uuid.replace(100);
+
+        // 2nd item to batch
+        sender
+            .send(BatchRequest {
+                stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+                item: BatchItem::Account(AccountUpdate::zero_account()),
+            })
+            .unwrap();
+
+        // 2nd tick = create a new batch since we changed the topology
+        batcher.tick(now).await.unwrap();
+
+        let now = now + batcher.config.linger;
+        // consume 1st orphan batch
+        batcher.tick(now).await.unwrap();
+        let now = now + batcher.config.linger;
+        // consume 2nd batch
+        batcher.tick(now).await.unwrap();
+
+        let res1 = cb.recv().await.unwrap();
+        let res2 = cb.recv().await.unwrap();
+        assert_eq!(res1.0.statements.len(), 1);
+        assert_eq!(res1.1.len(), 1);
+        assert_eq!(res2.0.statements.len(), 1);
+        assert_eq!(res2.1.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn it_should_send_batch_when_batch_limit_size_is_reached() {
+        let sink_config = ScyllaSinkConfig {
+            batch_size_limit: 10,
+            linger: Duration::from_millis(10),
+            keyspace: String::from("default"),
+            max_inflight_batch_delivery: 10,
+        };
+
+        let (mut batcher, sender, mut cb) = manual_test_batcher(sink_config.clone());
+
+        let mut now = Instant::now();
+
+        for _ in repeat(()).take(19) {
+            sender
+                .send(BatchRequest {
+                    stmt: BatchStatement::Query(Query::new(SCYLLADB_INSERT_ACCOUNT_UPDATE)),
+                    item: BatchItem::Account(AccountUpdate::zero_account()),
+                })
+                .unwrap();
+        }
+
+        // Batch 10 requests (size limit)
+        for _ in repeat(()).take(10) {
+            batcher.tick(now).await.unwrap();
+        }
+        now = now + Duration::from_millis(1);
+        // Should trigger batch send
+        batcher.tick(now).await.unwrap();
+
+        let res1 = cb.recv().await.unwrap();
+        assert_eq!(res1.0.statements.len(), 10);
+        assert_eq!(res1.1.len(), 10);
+
+        for _ in repeat(()).take(9) {
+            batcher.tick(now).await.unwrap();
+        }
+        // Should trigger batch send
+        now = now + sink_config.linger;
+        batcher.tick(now).await.unwrap();
+        let res2 = cb.recv().await.unwrap();
+        assert_eq!(res2.0.statements.len(), 9);
+        assert_eq!(res2.1.len(), 9);
+
+        let try_recv_result = cb.try_recv();
+        assert_eq!(try_recv_result.err().unwrap(), TryRecvError::Empty);
+    }
+}
diff --git a/yellowstone-grpc-tools/src/scylladb/types.rs b/yellowstone-grpc-tools/src/scylladb/types.rs
new file mode 100644
index 00000000..848e0a72
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/types.rs
@@ -0,0 +1,105 @@
+use yellowstone_grpc_proto::geyser::SubscribeUpdateAccount;
+
+type Pubkey = [u8; 32];
+
+pub struct AccountUpdate {
+    pub slot: i64,
+    pub pubkey: Pubkey,
+    pub lamports: i64,
+    pub owner: Pubkey,
+    pub executable: bool,
+    pub rent_epoch: i64,
+    pub write_version: i64,
+    pub data: Vec<u8>,
+    pub txn_signature: Option<Vec<u8>>,
+}
+
+impl From<AccountUpdate>
+    for (
+        i64,
+        Pubkey,
+        i64,
+        Pubkey,
+        bool,
+        i64,
+        i64,
+        Vec<u8>,
+        Option<Vec<u8>>,
+    )
+{
+    fn from(acc: AccountUpdate) -> Self {
+        (
+            acc.slot,
+            acc.pubkey,
+            acc.lamports,
+            acc.owner,
+            acc.executable,
+            acc.rent_epoch,
+            acc.write_version,
+            acc.data,
+            acc.txn_signature,
+        )
+    }
+}
+
+impl AccountUpdate {
+    #[allow(clippy::type_complexity)]
+    pub fn as_row(
+        self,
+    ) -> (
+        i64,
+        Pubkey,
+        i64,
+        Pubkey,
+        bool,
+        i64,
+        i64,
+        Vec<u8>,
+        Option<Vec<u8>>,
+    ) {
+        self.into()
+    }
+
+    pub fn zero_account() -> Self {
+        let bytes_arr: [u8; 32] = [0; 32];
+        AccountUpdate {
+            slot: 0,
+            pubkey: bytes_arr,
+            lamports: 0,
+            owner: bytes_arr,
+            executable: false,
+            rent_epoch: 0,
+            write_version: 0,
+            data: vec![],
+            txn_signature: None,
+        }
+    }
+}
+
+impl TryFrom<SubscribeUpdateAccount> for AccountUpdate {
+    type Error = ();
+    fn try_from(value: SubscribeUpdateAccount) -> Result<Self, Self::Error> {
+        let slot = value.slot;
+        if value.account.is_none() {
+            Err(())
+        } else {
+            let acc: yellowstone_grpc_proto::prelude::SubscribeUpdateAccountInfo =
+                value.account.unwrap();
+            let pubkey: Pubkey = acc.pubkey.try_into().map_err(|_| ())?;
+            let owner: Pubkey = acc.owner.try_into().map_err(|_| ())?;
+            let ret = AccountUpdate {
+                slot: slot as i64,
+                pubkey,
+                lamports: acc.lamports as i64,
+                owner,
+                executable: acc.executable,
+                rent_epoch: acc.rent_epoch as i64,
+                write_version: acc.write_version as i64,
+                data: acc.data,
+                txn_signature: acc.txn_signature,
+            };
+            Ok(ret)
+        }
+    }
+}
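Reviewer note: the conversion is fallible because `account` may be absent or the key fields may have the wrong length. A usage sketch with a hand-built update (field values hypothetical):

```rust
use yellowstone_grpc_proto::geyser::{SubscribeUpdateAccount, SubscribeUpdateAccountInfo};
use yellowstone_grpc_tools::scylladb::types::AccountUpdate;

fn main() {
    let msg = SubscribeUpdateAccount {
        slot: 42,
        is_startup: false,
        account: Some(SubscribeUpdateAccountInfo {
            pubkey: vec![0u8; 32],
            lamports: 1,
            owner: vec![0u8; 32],
            executable: false,
            rent_epoch: 0,
            write_version: 7,
            data: vec![],
            txn_signature: None,
        }),
    };
    // A pubkey or owner of the wrong length would yield Err(()).
    let update: Result<AccountUpdate, ()> = msg.try_into();
    assert!(update.is_ok());
}
```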