diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3f3d3bca..2f1ce2ab 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -85,7 +85,7 @@ jobs: - name: check features in `proto` run: cargo check -p yellowstone-grpc-proto --all-targets --tests - name: check features in `tools` - run: cargo check -p yellowstone-grpc-tools --all-targets --tests --all-features + run: cargo check -p yellowstone-grpc-tools --all-targets --tests - name: Build run: ./ci/cargo-build-test.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 3474bdb1..b0681387 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,19 +14,9 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features -### Breaking - -## 2024-05-15 - -- yellowstone-grpc-client-1.15.0+solana.1.18.13 -- yellowstone-grpc-geyser-1.14.2+solana.1.18.13 -- yellowstone-grpc-proto-1.14.0+solana.1.18.13 -- yellowstone-grpc-tools-1.0.0-rc.11+solana.1.18.13 - -### Features - - geyser: use runtime instaed of unconstrained ([#332](https://github.com/rpcpool/yellowstone-grpc/pull/332)) -- solana: update to 1.18.13 ([#339](https://github.com/rpcpool/yellowstone-grpc/pull/339)) + +### Breaking ## 2024-04-30 diff --git a/Cargo.lock b/Cargo.lock index c1e240ee..c56626fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3840,9 +3840,9 @@ dependencies = [ [[package]] name = "solana-account-decoder" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f31c3dc9c7ebfaff452f063b406bbf64d326d71120996f4d3fdeee7ae7f1b6e" +checksum = "142161f13c328e7807fe98fb8f6eaaa5045a8eaf4492414aa81254870c4fc8a0" dependencies = [ "Inflector", "base64 0.21.7", @@ -3865,9 +3865,9 @@ dependencies = [ [[package]] name = "solana-config-program" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d12f4c7ca44f55afb012dfadd21a352cb818a225f4e6d7fe3db5c3fcb1e28ca1" +checksum = "970d28779e92a11e32a89ee453edc7d89394d3a68d8c4b75ef0ffb833944c588" dependencies = [ "bincode", "chrono", @@ -3879,9 +3879,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9843fe4a4e4d541bd056465257704d8d53b50ed59328dcb5f37821ae0f843676" +checksum = "35a0b24cc4d0ebd5fd45d6bd47bed3790f8a75ade67af8ff24a3d719a8bc93bc" dependencies = [ "block-buffer 0.10.4", "bs58", @@ -3904,9 +3904,9 @@ dependencies = [ [[package]] name = "solana-frozen-abi-macro" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f24edb8172842544ace0ccb9547353cc55fe4a6d3b2786e209939d3a8bf271d" +checksum = "51600f4066d3663ab2981fd24e77a8c2e65f5d20ea71b550b853ca9ae40eee7f" dependencies = [ "proc-macro2", "quote", @@ -3916,9 +3916,9 @@ dependencies = [ [[package]] name = "solana-geyser-plugin-interface" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bac9c1d761318b992ea6514d2e32853e285af07e6158879dc299500f7fee9033" +checksum = "ff310104f06b0be5fc637a0085f35435e09e0a8ef058a9d1b9ccdbb24d89f1dd" dependencies = [ "log", "solana-sdk", @@ -3928,9 +3928,9 @@ dependencies = [ [[package]] name = "solana-logger" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a9c97300d5fd98fd490819186debfda9d47b1a5c82b5ffdb76e2ea6bad055c4" +checksum = "dd79ef26804612173c95be8da84df3128d648173cf1f746de8f183ec8dbedd92" dependencies = [ "env_logger 0.9.3", "lazy_static", @@ -3939,9 +3939,9 @@ dependencies = [ [[package]] name = "solana-measure" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9bf69dbc3d69406b67d3d263c8a5aa0d8501051d75aa842f47502652060596d" +checksum = "300f716a5f1c2f4b562fb008a0cc7d7c0d889cff802a7f8177fdf28772ae1ed9" dependencies = [ "log", "solana-sdk", @@ -3949,9 +3949,9 @@ dependencies = [ [[package]] name = "solana-metrics" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35a2112662341adaf1b8fbd4a8d819bc24ae5d1d59655e0561161c5c816894b9" +checksum = "abf1705d52e4f123856725e1b3842cd4928b954ff62391a95af142a5adc58ac6" dependencies = [ "crossbeam-channel", "gethostname", @@ -3964,9 +3964,9 @@ dependencies = [ [[package]] name = "solana-program" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9de9a1634b9d30ca0e5c2d53806c030a5d9c07dfcc4505ebeb218206514d17b8" +checksum = "2a5513a02d622ba89e76baf4b49d25ae20c2c2c623fced12b0d6dd7b8f23e006" dependencies = [ "ark-bn254", "ark-ec", @@ -4019,9 +4019,9 @@ dependencies = [ [[package]] name = "solana-program-runtime" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "078fbc30339aff91d84ef5fc49ad75818419fedc543da22617d2f36a93d56bff" +checksum = "64dc9f666a8e4f93166ce58eea9dfbf275e5cad461b2f1bbfa06538718dc3212" dependencies = [ "base64 0.21.7", "bincode", @@ -4047,9 +4047,9 @@ dependencies = [ [[package]] name = "solana-sdk" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "323d21f0cb307e28ccfbcb3a24a5ae230abc8176bfb82492df6773deb79b62de" +checksum = "8f50cac89269a01235f6b421bc580132191f4df388f4265513e78fd00cf864dd" dependencies = [ "assert_matches", "base64 0.21.7", @@ -4102,9 +4102,9 @@ dependencies = [ [[package]] name = "solana-sdk-macro" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff6d088aff04f5ad17f6f4a1a84a7a6aef633d48e8ed6c12154fcbb5dfde07bd" +checksum = "5cb099b2f9c0a65a6f23ced791325141cd68c27b04d11c04fef838a00f613861" dependencies = [ "bs58", "proc-macro2", @@ -4121,9 +4121,9 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" [[package]] name = "solana-transaction-status" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d08bc13fa4f5ddf945253ac957b8b924c6181f9a80283e47e9922c07e73a845c" +checksum = "3efa0d30f78dbc74e795638b053dd6ec7230739301e7f0e06b586f7731fd25c8" dependencies = [ "Inflector", "base64 0.21.7", @@ -4146,9 +4146,9 @@ dependencies = [ [[package]] name = "solana-zk-token-sdk" -version = "1.18.13" +version = "1.18.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7ea6cfb74066a35ea9ad53b1108bb26f35752569bcfb3d9203f58a7bf57fac5" +checksum = "630dc0b5f6250cf6a4c8b2bd3895283738915e83eba5453db20bb02b2527f302" dependencies = [ "aes-gcm-siv", "base64 0.21.7", @@ -5482,7 +5482,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client" -version = "1.15.0+solana.1.18.13" +version = "1.15.0+solana.1.18.12" dependencies = [ "bytes", "futures", @@ -5495,7 +5495,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-client-simple" -version = "1.13.0+solana.1.18.13" +version = "1.13.0+solana.1.18.12" dependencies = [ "anyhow", "backoff", @@ -5518,7 +5518,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-geyser" -version = "1.14.2+solana.1.18.13" +version = "1.14.1+solana.1.18.12" dependencies = [ "anyhow", "base64 0.21.7", @@ -5551,7 +5551,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-proto" -version = "1.14.0+solana.1.18.13" +version = "1.14.0+solana.1.18.12" dependencies = [ "anyhow", "bincode", @@ -5566,7 +5566,7 @@ dependencies = [ [[package]] name = "yellowstone-grpc-tools" -version = "1.0.0-rc.11+solana.1.18.13" +version = "1.0.0-rc.11+solana.1.18.12" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 8cb25d79..fdec63de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [workspace] resolver = "2" members = [ - "examples/rust", # 1.13.0+solana.1.18.13 - "yellowstone-grpc-client", # 1.15.0+solana.1.18.13 - "yellowstone-grpc-geyser", # 1.14.2+solana.1.18.13 - "yellowstone-grpc-proto", # 1.14.0+solana.1.18.13 - "yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.13 + "examples/rust", # 1.13.0+solana.1.18.12 + "yellowstone-grpc-client", # 1.15.0+solana.1.18.12 + "yellowstone-grpc-geyser", # 1.14.1+solana.1.18.12 + "yellowstone-grpc-proto", # 1.14.0+solana.1.18.12 + "yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.12 ] [workspace.package] @@ -55,11 +55,11 @@ serde_json = "1.0.86" serde_with = "3.7.0" serde_yaml = "0.9.25" sha2 = "0.10.7" -solana-account-decoder = "=1.18.13" -solana-geyser-plugin-interface = "=1.18.13" -solana-logger = "=1.18.13" -solana-sdk = "=1.18.13" -solana-transaction-status = "=1.18.13" +solana-account-decoder = "=1.18.12" +solana-geyser-plugin-interface = "=1.18.12" +solana-logger = "=1.18.12" +solana-sdk = "=1.18.12" +solana-transaction-status = "=1.18.12" spl-token-2022 = "0.9.0" thiserror = "1.0" tokio = "1.21.2" @@ -71,8 +71,8 @@ tracing = "0.1.37" tracing-subscriber = "0.3.17" uuid = "1.8.0" vergen = "8.2.1" -yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.0+solana.1.18.13" } -yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.0+solana.1.18.13" } +yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.0+solana.1.18.12" } +yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.0+solana.1.18.12" } [profile.release] debug = true diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index d55d2e09..281d6d7c 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client-simple" -version = "1.13.0+solana.1.18.13" +version = "1.13.0+solana.1.18.12" authors = { workspace = true } edition = { workspace = true } homepage = { workspace = true } diff --git a/yellowstone-grpc-client/Cargo.toml b/yellowstone-grpc-client/Cargo.toml index 50f42e6c..17075fd2 100644 --- a/yellowstone-grpc-client/Cargo.toml +++ b/yellowstone-grpc-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-client" -version = "1.15.0+solana.1.18.13" +version = "1.15.0+solana.1.18.12" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Simple Client" diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 41af8d80..1417b4d7 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-geyser" -version = "1.14.2+solana.1.18.13" +version = "1.14.1+solana.1.18.12" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Plugin" diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 959c1f75..3abbaaaa 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-proto" -version = "1.14.0+solana.1.18.13" +version = "1.14.0+solana.1.18.12" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Geyser Protobuf Definitions" diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index 6a4c6f1c..846ce40f 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yellowstone-grpc-tools" -version = "1.0.0-rc.11+solana.1.18.13" +version = "1.0.0-rc.11+solana.1.18.12" authors = { workspace = true } edition = { workspace = true } description = "Yellowstone gRPC Tools" @@ -54,6 +54,7 @@ uuid = { workspace = true, optional = true } yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } + [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } diff --git a/yellowstone-grpc-tools/config-ys-log-server.json b/yellowstone-grpc-tools/config-ys-log-server.json new file mode 100644 index 00000000..a64004c4 --- /dev/null +++ b/yellowstone-grpc-tools/config-ys-log-server.json @@ -0,0 +1,12 @@ +{ + "prometheus": "127.0.0.1:8873", + "scylladb": { + "hostname": "localhost:9042", + "username": "cassandra", + "password": "cassandra" + }, + "yellowstone_log_server": { + "listen": "localhost:10001", + "keyspace": "solana" + } +} diff --git a/yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh b/yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh new file mode 100644 index 00000000..0ffca32b --- /dev/null +++ b/yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh @@ -0,0 +1,6 @@ +#!/usr/bin/bash + + +grpcurl -plaintext -import-path . -proto yellowstone-log.proto \ + -d '{"initial_offset_policy": 0, "event_subscription_policy": 0 }' \ + '127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume \ No newline at end of file diff --git a/yellowstone-grpc-tools/solana.cql b/yellowstone-grpc-tools/solana.cql index 3eecea4d..3f270835 100644 --- a/yellowstone-grpc-tools/solana.cql +++ b/yellowstone-grpc-tools/solana.cql @@ -3,7 +3,7 @@ CREATE KEYSPACE solana WITH replication = {'class': 'NetworkTopologyStrategy', ' drop materialized view if exists solana.producer_consumer_mapping_mv; drop materialized view if exists solana.slot_map; -drop table if exists solana.producer_slot_seen; +drop table if exists solana.producer_max_approx_slot; drop table if exists solana.shard_statistics; drop table if exists solana.producer_info; drop table if exists solana.consumer_info; diff --git a/yellowstone-grpc-tools/solana_keyspace_setup.sh b/yellowstone-grpc-tools/solana_keyspace_setup.sh new file mode 100644 index 00000000..f2c1a554 --- /dev/null +++ b/yellowstone-grpc-tools/solana_keyspace_setup.sh @@ -0,0 +1,3 @@ +#!/usr/bin/bash + +nodetool disableautocompaction solana log producer_period_commit_log producer_slot_seen \ No newline at end of file diff --git a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs index e112ef7f..835ae570 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs @@ -1,24 +1,31 @@ use { anyhow::Ok, clap::{Parser, Subcommand}, - futures::{future::BoxFuture, stream::StreamExt}, + futures::{future::BoxFuture, stream::StreamExt, TryFutureExt}, scylla::{frame::Compression, Session, SessionBuilder}, std::{net::SocketAddr, sync::Arc, time::Duration}, tokio::time::Instant, + tonic::transport::Server, tracing::{error, info, warn}, yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_proto::{ - prelude::subscribe_update::UpdateOneof, yellowstone::log::EventSubscriptionPolicy, + prelude::subscribe_update::UpdateOneof, + yellowstone::log::{ + yellowstone_log_server::{self, YellowstoneLog, YellowstoneLogServer}, + EventSubscriptionPolicy, + }, }, yellowstone_grpc_tools::{ config::{load as config_load, GrpcRequestToProto}, create_shutdown, prom::run_server as prometheus_run_server, scylladb::{ - config::{Config, ConfigGrpc2ScyllaDB, ScyllaDbConnectionInfo}, + config::{ + Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo, + }, consumer::{ common::InitialOffsetPolicy, - grpc::{spawn_grpc_consumer, SpawnGrpcConsumerReq}, + grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq}, }, sink::ScyllaSink, types::Transaction, @@ -50,9 +57,11 @@ enum ArgsAction { /// Receive data from gRPC and send them to the Kafka #[command(name = "grpc2scylla")] Grpc2Scylla, + /// Receive data from Kafka and send them over gRPC - #[command(name = "scylla2grpc")] - Scylla2Grpc, + #[command(name = "yellowstone-log-server")] + YellowstoneLogServer, + #[command(name = "test")] Test, } @@ -67,8 +76,11 @@ impl ArgsAction { })?; Self::grpc2scylladb(config2, config.scylladb, shutdown).await } - ArgsAction::Scylla2Grpc => { - unimplemented!(); + ArgsAction::YellowstoneLogServer => { + let config2 = config.yellowstone_log_server.ok_or_else(|| { + anyhow::anyhow!("`grpc2scylladb` section in config should be defined") + })?; + Self::yellowstone_log_server(config2, config.scylladb, shutdown).await } ArgsAction::Test => { let config2 = config.grpc2scylladb.ok_or_else(|| { @@ -79,6 +91,39 @@ impl ArgsAction { } } + async fn yellowstone_log_server( + config: ConfigYellowstoneLogServer, + scylladb_conn_config: ScyllaDbConnectionInfo, + mut shutdown: BoxFuture<'static, ()>, + ) -> anyhow::Result<()> { + let addr = config.listen.parse().unwrap(); + + let session: Session = SessionBuilder::new() + .known_node(scylladb_conn_config.hostname) + .user(scylladb_conn_config.username, scylladb_conn_config.password) + .compression(Some(Compression::Lz4)) + .use_keyspace(config.keyspace.clone(), false) + .build() + .await?; + + let session = Arc::new(session); + let scylla_ys_log = ScyllaYsLog::new(session); + let ys_log_server = YellowstoneLogServer::new(scylla_ys_log); + + println!("YellowstoneLogServer listening on {}", addr); + + let server_fut = Server::builder() + // GrpcWeb is over http1 so we must enable it. + .add_service(ys_log_server) + .serve(addr) + .map_err(anyhow::Error::new); + + tokio::select! { + _ = &mut shutdown => Ok(()), + result = server_fut => result, + } + } + async fn test( config: ConfigGrpc2ScyllaDB, scylladb_conn_config: ScyllaDbConnectionInfo, @@ -168,12 +213,14 @@ impl ArgsAction { } .transpose(); - if let Err(error) = &message { - error!("geyser plugin disconnected: {error:?}"); + if message.is_err() { + error!("geyser plugin disconnected: {:?}", message); break; } - if let Some(message) = message? { + let message = message?; + + if let Some(message) = message { let message = match message.update_oneof { Some(value) => value, None => unreachable!("Expect valid message"), diff --git a/yellowstone-grpc-tools/src/scylladb/config.rs b/yellowstone-grpc-tools/src/scylladb/config.rs index 59c3da90..94b2cccd 100644 --- a/yellowstone-grpc-tools/src/scylladb/config.rs +++ b/yellowstone-grpc-tools/src/scylladb/config.rs @@ -40,6 +40,7 @@ pub struct Config { pub prometheus: Option, pub scylladb: ScyllaDbConnectionInfo, pub grpc2scylladb: Option, + pub yellowstone_log_server: Option, } #[derive(Debug, Default, Deserialize)] @@ -53,6 +54,14 @@ pub struct ScyllaDbConnectionInfo { pub password: String, } +#[serde_as] +#[derive(Debug, Deserialize)] +pub struct ConfigYellowstoneLogServer { + pub listen: String, + #[serde(default = "default_keyspace")] + pub keyspace: String, +} + #[serde_as] #[derive(Debug, Deserialize)] pub struct ConfigGrpc2ScyllaDB { diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs index 5aefe66d..c06e107b 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs +++ b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs @@ -29,7 +29,7 @@ use { tokio::{sync::mpsc, time::Instant}, tokio_stream::wrappers::ReceiverStream, tonic::Response, - tracing::{error, info}, + tracing::{error, info, warn}, uuid::Uuid, yellowstone_grpc_proto::{ geyser::{subscribe_update::UpdateOneof, SubscribeUpdate}, @@ -39,6 +39,10 @@ use { }, }; +const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250); + +const FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD: Duration = Duration::from_millis(500); + const DEFAULT_LAST_HEARTBEAT_TIME_DELTA: Duration = Duration::from_secs(10); const DEFAULT_OFFSET_COMMIT_INTERVAL: Duration = Duration::from_secs(10); @@ -235,19 +239,27 @@ async fn get_producer_id_with_least_assigned_consumer( session: Arc, ) -> anyhow::Result { let locked_producers = list_producers_with_lock_held(Arc::clone(&session)).await?; + + info!("{} producer lock(s) detected", locked_producers.len()); let recently_active_producers = BTreeSet::from_iter( list_producers_heartbeat(Arc::clone(&session), DEFAULT_LAST_HEARTBEAT_TIME_DELTA).await?, ); + info!( + "{} living producer(s) detected", + recently_active_producers.len() + ); + let elligible_producers = locked_producers .into_iter() .filter(|producer_id| recently_active_producers.contains(producer_id)) .collect::>(); + info!("{} elligible producer(s)", recently_active_producers.len()); let mut producer_count_pairs = session .query(GET_PRODUCERS_CONSUMER_COUNT, &[]) .await? - .rows_typed::<(ProducerId, i32)>()? + .rows_typed::<(ProducerId, i64)>()? .collect::, _>>()?; elligible_producers.iter().for_each(|producer_id| { @@ -308,9 +320,10 @@ async fn register_new_consumer( ) -> anyhow::Result { let producer_id = get_producer_id_with_least_assigned_consumer(Arc::clone(&session)).await?; + let insert_consumer_mapping_ps = session.prepare(INSERT_CONSUMER_PRODUCER_MAPPING).await?; session - .query( - INSERT_CONSUMER_PRODUCER_MAPPING, + .execute( + &insert_consumer_mapping_ps, (consumer_id.as_ref(), producer_id), ) .await?; @@ -376,6 +389,8 @@ async fn get_or_register_consumer( }; Ok(cs) } else { + let cid = consumer_id.as_ref(); + info!("Bootstrapping consumer {cid}"); register_new_consumer( session, consumer_id, @@ -472,6 +487,12 @@ pub struct ScyllaYsLog { session: Arc, } +impl ScyllaYsLog { + pub fn new(session: Arc) -> Self { + ScyllaYsLog { session } + } +} + pub type LogStream = Pin> + Send>>; #[tonic::async_trait] @@ -505,6 +526,12 @@ impl YellowstoneLog for ScyllaYsLog { let account_update_event_filter = cr.account_update_event_filter; let tx_event_filter = cr.tx_event_filter; + info!( + consumer_id = consumer_id, + initital_offset_policy = ?initial_offset_policy, + event_subscription_policy = ?event_subscription_policy, + ); + let req = SpawnGrpcConsumerReq { consumer_id, account_update_event_filter, @@ -762,10 +789,25 @@ impl GrpcConsumerSource { self.shard_iterators .sort_by_key(|it| (it.shard_id, it.event_type)); + let mut max_seen_slot = -1; + let mut num_event_between_two_slots = 0; + + let mut t = Instant::now(); loop { for shard_it in self.shard_iterators.iter_mut() { let maybe = shard_it.try_next().await?; if let Some(block_chain_event) = maybe { + if t.elapsed() >= FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD { + warn!( + "consumer {consumer_id} micro batch took {:?} to fetch.", + t.elapsed() + ); + } + if max_seen_slot < block_chain_event.slot { + info!("Consumer {consumer_id} reach slot {max_seen_slot} after {num_event_between_two_slots} blockchain event(s)"); + max_seen_slot = block_chain_event.slot; + num_event_between_two_slots = 0; + } let geyser_event = match block_chain_event.event_type { BlockchainEventType::AccountUpdate => { UpdateOneof::Account(block_chain_event.try_into()?) @@ -778,9 +820,18 @@ impl GrpcConsumerSource { filters: Default::default(), update_oneof: Some(geyser_event), }; - self.sender.send(Ok(subscribe_update)).await.map_err(|_| { - anyhow::anyhow!("Failed to deliver message to consumer {}", consumer_id) - })?; + let t_send = Instant::now(); + + if self.sender.send(Ok(subscribe_update)).await.is_err() { + warn!("Consumer {consumer_id} closed its streaming half"); + return Ok(()); + } + let send_latency = t_send.elapsed(); + if send_latency >= CLIENT_LAG_WARN_THRESHOLD { + warn!("Slow read from consumer {consumer_id}, recorded latency: {send_latency:?}") + } + num_event_between_two_slots += 1; + t = Instant::now(); } } @@ -805,10 +856,7 @@ impl GrpcConsumerSource { if let Err(_actual_offset_in_scylla) = result { anyhow::bail!("two concurrent connections are using the same consumer instance") } - info!( - "Successfully committed offsets for consumer {:?}", - consumer_id - ); + info!("Successfully committed offsets for consumer {consumer_id}"); std::mem::swap(&mut new_offsets_to_commit, &mut last_committed_offsets); commit_offset_deadline = Instant::now() + self.offset_commit_interval; } diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs b/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs index 3024c483..68201471 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs +++ b/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs @@ -7,6 +7,7 @@ use { scylla::{prepared_statement::PreparedStatement, Session}, std::{collections::VecDeque, sync::Arc}, tokio::sync::oneshot::{self, error::TryRecvError}, + tracing::warn, }; const MICRO_BATCH_SIZE: usize = 40; @@ -240,12 +241,9 @@ impl ShardIterator { .rows_typed_or_empty::() .collect::, _>>() .expect("failed to typed scylladb rows"); - sender - .send(micro_batch) - .map_err(|_| ()) - .unwrap_or_else(|_| { - panic!("Failed to send micro batch to shard iterator {}", shard_id) - }); + if sender.send(micro_batch).is_err() { + warn!("Shard iterator {shard_id} was fetching micro batch, but client closed its stream half.") + } }); receiver } diff --git a/yellowstone-grpc-tools/src/scylladb/sink.rs b/yellowstone-grpc-tools/src/scylladb/sink.rs index a3c0abde..ff79c200 100644 --- a/yellowstone-grpc-tools/src/scylladb/sink.rs +++ b/yellowstone-grpc-tools/src/scylladb/sink.rs @@ -261,10 +261,11 @@ impl Shard { .execute(&commit_period_ps, (producer_id, shard_id, curr_period - 1)) .await?; info!( - shard = shard_id, - producer_id = ?self.producer_id, - committed_period = curr_period, - time_to_commit = ?t.elapsed() + "shard={},producer_id={:?} committed period: {}: time to commit: {:?}", + shard_id, + self.producer_id, + curr_period, + t.elapsed() ); }