From 98a3140e9a092ce571edc4d5ae2755ddbfe6a3d1 Mon Sep 17 00:00:00 2001
From: lvboudre
Date: Fri, 10 May 2024 16:53:55 -0400
Subject: [PATCH] tools: scylladb consumer and refactor scylla sink

---
 Cargo.lock                                    |  76 +-
 Cargo.toml                                    |   3 +-
 yellowstone-grpc-proto/build.rs               |   2 +-
 .../proto/yellowstone-log.proto               |  43 +
 yellowstone-grpc-proto/src/lib.rs             |   6 +
 yellowstone-grpc-tools/Cargo.toml             |   5 +-
 yellowstone-grpc-tools/solana.cql             | 236 +++--
 .../src/bin/grpc-scylladb.rs                  | 147 ++-
 yellowstone-grpc-tools/src/scylladb/agent.rs  | 327 ------
 .../src/scylladb/consumer/common.rs           |  23 +
 .../src/scylladb/consumer/grpc.rs             | 804 +++++++++++++++
 .../src/scylladb/consumer/mod.rs              |   3 +
 .../src/scylladb/consumer/shard_iterator.rs   | 476 +++++++++
 yellowstone-grpc-tools/src/scylladb/mod.rs    |   2 +-
 yellowstone-grpc-tools/src/scylladb/sink.rs   | 927 ++++++------
 yellowstone-grpc-tools/src/scylladb/types.rs  | 546 ++++++++---
 16 files changed, 2369 insertions(+), 1257 deletions(-)
 create mode 100644 yellowstone-grpc-proto/proto/yellowstone-log.proto
 delete mode 100644 yellowstone-grpc-tools/src/scylladb/agent.rs
 create mode 100644 yellowstone-grpc-tools/src/scylladb/consumer/common.rs
 create mode 100644 yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
 create mode 100644 yellowstone-grpc-tools/src/scylladb/consumer/mod.rs
 create mode 100644 yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs

diff --git a/Cargo.lock b/Cargo.lock
index 16380f96..38431ba5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -76,9 +76,9 @@ dependencies = [

 [[package]]
 name = "ahash"
-version = "0.8.5"
+version = "0.8.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d"
+checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
 dependencies = [
  "cfg-if",
  "once_cell",
@@ -110,6 +110,12 @@ dependencies = [
  "alloc-no-stdlib",
 ]

+[[package]]
+name = "allocator-api2"
+version = "0.2.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f"
+
 [[package]]
 name = "android-tzdata"
 version = "0.1.1"
@@ -843,7 +849,7 @@ version = "4.4.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442"
 dependencies = [
- "heck 0.4.1",
+ "heck",
 "proc-macro2",
 "quote",
 "syn 2.0.60",
@@ -1658,7 +1664,7 @@ version = "0.13.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
 dependencies = [
- "ahash 0.8.5",
+ "ahash 0.8.11",
 ]

 [[package]]
 name = "hashbrown"
 version = "0.14.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
-
-[[package]]
-name = "heck"
-version = "0.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
 dependencies = [
- "unicode-segmentation",
+ "ahash 0.8.11",
+ "allocator-api2",
 ]

@@ -2840,6 +2841,12 @@ dependencies = [
 "unicode-ident",
 ]

+[[package]]
+name = "project-root"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8bccbff07d5ed689c4087d20d7307a52ab6141edeedf487c3876a55b86cf63df"
+
 [[package]]
 name = "prometheus"
 version = "0.13.3"
@@ -2898,7 +2905,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", - "heck 0.4.1", + "heck", "itertools 0.11.0", "log", "multimap", @@ -3438,9 +3445,9 @@ dependencies = [ [[package]] name = "scylla" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03d2db76aa23f55d2ece5354e1a3778633098a3d1ea76153f494d71e92cd02d8" +checksum = "9439d92eea9f86c07175c819c3a129ca28b02477b47df26db354a1f4ea7ee276" dependencies = [ "arc-swap", "async-trait", @@ -3449,10 +3456,11 @@ dependencies = [ "chrono", "dashmap", "futures", + "hashbrown 0.14.3", "histogram", "itertools 0.11.0", + "lazy_static", "lz4_flex", - "num_enum 0.6.1", "rand 0.8.5", "rand_pcg", "scylla-cql", @@ -3460,8 +3468,6 @@ dependencies = [ "smallvec", "snap", "socket2", - "strum", - "strum_macros", "thiserror", "tokio", "tracing", @@ -3470,15 +3476,14 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345626c0dd5d9624c413daaba854685bba6a65cff4eb5ea0fb0366df16901f67" +checksum = "64037fb9d9c59ae15137fff9a56c4d528908dfd38d09e75b5f8e56e3894966dd" dependencies = [ "async-trait", "byteorder", "bytes", "lz4_flex", - "num_enum 0.6.1", "scylla-macros", "snap", "thiserror", @@ -3488,9 +3493,9 @@ dependencies = [ [[package]] name = "scylla-macros" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb6085ff9c3fd7e5163826901d39164ab86f11bdca16b2f766a00c528ff9cef9" +checksum = "7e5fe1d389adebe6a1a27bce18b81a65ff18c25d58a795de490e18b0e7a27b9f" dependencies = [ "darling", "proc-macro2", @@ -4432,25 +4437,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" -[[package]] -name = "strum" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" - -[[package]] -name = "strum_macros" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "rustversion", - "syn 1.0.109", -] - [[package]] name = "subtle" version = "2.4.1" @@ -5026,12 +5012,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-segmentation" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" - [[package]] name = "universal-hash" version = "0.4.1" @@ -5564,6 +5544,7 @@ dependencies = [ "hyper", "json5", "lazy_static", + "project-root", "prometheus", "rdkafka", "scylla", @@ -5578,6 +5559,7 @@ dependencies = [ "tonic-health", "tracing", "tracing-subscriber", + "uuid", "vergen", "yellowstone-grpc-client", "yellowstone-grpc-proto", diff --git a/Cargo.toml b/Cargo.toml index 7e128263..d059a8c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ prometheus = "0.13.2" prost = "0.12.1" protobuf-src = "1.1.0" rdkafka = "0.34.0" -scylla = "0.12.0" +scylla = "0.13.0" serde = "1.0.145" serde_json = "1.0.86" serde_with = "3.7.0" @@ -68,6 +68,7 @@ tonic-build = "0.10.2" tonic-health = "0.10.2" tracing = "0.1.37" tracing-subscriber = "0.3.17" +uuid = "1.8.0" vergen 
= "8.2.1" yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.0+solana.1.18.12" } yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.0+solana.1.18.12" } diff --git a/yellowstone-grpc-proto/build.rs b/yellowstone-grpc-proto/build.rs index 66db00ba..a2a02f48 100644 --- a/yellowstone-grpc-proto/build.rs +++ b/yellowstone-grpc-proto/build.rs @@ -1,5 +1,5 @@ fn main() -> anyhow::Result<()> { std::env::set_var("PROTOC", protobuf_src::protoc()); - tonic_build::compile_protos("proto/geyser.proto")?; + tonic_build::compile_protos("proto/yellowstone-log.proto")?; Ok(()) } diff --git a/yellowstone-grpc-proto/proto/yellowstone-log.proto b/yellowstone-grpc-proto/proto/yellowstone-log.proto new file mode 100644 index 00000000..bec67e39 --- /dev/null +++ b/yellowstone-grpc-proto/proto/yellowstone-log.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +import public "geyser.proto"; + + +option go_package = "github.com/rpcpool/solana-geyser-grpc/golang/proto"; + +package yellowstone.log; + +service YellowstoneLog { + rpc Consume(ConsumeRequest) returns (stream geyser.SubscribeUpdate) {} +} + + +enum InitialOffsetPolicy { + EARLIEST = 0; + LATEST = 1; + SLOT = 2; +} + +enum EventSubscriptionPolicy { + ACCOUNT_UPDATE_ONLY = 0; + TRANSACTION_ONLY = 1; + BOTH = 2; +} + +message ConsumeRequest { + optional string consumer_id = 1; + InitialOffsetPolicy initial_offset_policy = 2; + optional int64 at_slot = 3; + EventSubscriptionPolicy event_subscription_policy = 4; + optional AccountUpdateEventFilter account_update_event_filter = 5; + optional TransactionEventFilter tx_event_filter = 6; +} + +message AccountUpdateEventFilter { + repeated bytes pubkeys = 1; + repeated bytes owners = 2; +} + +message TransactionEventFilter { + repeated bytes account_keys = 1; +} \ No newline at end of file diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs index 8760c7cd..50a77277 100644 --- a/yellowstone-grpc-proto/src/lib.rs +++ b/yellowstone-grpc-proto/src/lib.rs @@ -1,5 +1,11 @@ #![allow(clippy::large_enum_variant)] +pub mod yellowstone { + pub mod log { + tonic::include_proto!("yellowstone.log"); + } +} + pub mod geyser { tonic::include_proto!("geyser"); } diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index b6101615..380cf862 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -48,9 +48,11 @@ tonic = { workspace = true, features = ["gzip"] } tonic-health = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } +uuid = { workspace = true, optional = true } yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } + [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } @@ -61,6 +63,7 @@ rdkafka = { workspace = true, features = ["sasl", "ssl"], optional = true } rdkafka = { workspace = true, features = ["sasl", "ssl-vendored"], optional = true } [build-dependencies] +project-root = "0.2.2" anyhow = { workspace = true } cargo-lock = { workspace = true } git-version = { workspace = true } @@ -70,4 +73,4 @@ vergen = { workspace = true, features = ["build", "rustc"] } default = ["google-pubsub", "kafka"] google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"] kafka = ["const-hex", "rdkafka", "sha2"] -scylladb = ["scylla", "serde_with", "deepsize"] +scylladb = ["scylla", "serde_with", "deepsize", "uuid"] diff --git a/yellowstone-grpc-tools/solana.cql 
b/yellowstone-grpc-tools/solana.cql
index d3204f3b..4dfa623d 100644
--- a/yellowstone-grpc-tools/solana.cql
+++ b/yellowstone-grpc-tools/solana.cql
@@ -1,5 +1,94 @@
 CREATE KEYSPACE solana WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'} AND durable_writes = true;

+drop materialized view if exists solana.producer_consumer_mapping_mv;
+drop materialized view if exists solana.shard_max_offset_mv;
+drop materialized view if exists solana.slot_map_mv;
+
+drop table if exists solana.shard_statistics;
+drop table if exists solana.producer_info;
+drop table if exists solana.consumer_info;
+drop table if exists solana.consumer_producer_mapping;
+drop table if exists solana.log;
+drop type if exists solana.transaction_meta;
+drop type if exists solana.message_addr_table_lookup;
+drop type if exists solana.compiled_instr;
+drop type if exists solana.tx_token_balance;
+drop type if exists solana.reward;
+drop type if exists solana.inner_instrs;
+drop type if exists solana.inner_instr;
+drop type if exists solana.return_data;
+
+
+create table if not exists solana.consumer_info (
+    consumer_id text,
+    producer_id blob,
+    shard_id smallint,
+    event_type smallint,
+    offset bigint,
+    created_at timestamp,
+    updated_at timestamp,
+    PRIMARY KEY ((consumer_id, producer_id), shard_id, event_type)
+);
+
+-- insert into solana.consumer_info (
+--     consumer_id,
+--     producer_id,
+--     shard_id,
+--     offset,
+--     created_at,
+--     updated_at
+-- )
+-- values
+-- ('test_lv', 0x01, [0], currentTimestamp(), currentTimestamp());
+
+-- update solana.consumer_info
+-- set shard_offset_arrmap = [0,0]
+-- where consumer_id = 'test_lv' and producer_id = 0x01;
+
+-- update solana.consumer_info
+-- set shard_offset_arrmap[0] = 1, updated_at = currentTimestamp()
+-- where consumer_id = 'test_lv' and producer_id = 0x01
+-- if shard_offset_arrmap[0] = 0;
+
+
+
+create table if not exists solana.consumer_producer_mapping (
+    consumer_id text,
+    producer_id blob,
+    created_at timestamp,
+    updated_at timestamp,
+    PRIMARY KEY (consumer_id)
+);
+
+create materialized view if not exists solana.producer_consumer_mapping_mv
+as
+select
+    producer_id,
+    consumer_id
+from solana.consumer_producer_mapping
+where
+    consumer_id is not null
+    and producer_id is not null
+primary key (producer_id, consumer_id);
+
+create table if not exists solana.producer_info (
+    producer_id blob,
+    num_shards smallint,
+    is_active boolean,
+    created_at timestamp,
+    updated_at timestamp,
+    PRIMARY KEY (producer_id)
+);
+
+-- # example
+insert into solana.producer_info (
+    producer_id,
+    num_shards,
+    is_active,
+    created_at,
+    updated_at
+)
+values (0x00, 256, true, currentTimestamp(), currentTimestamp());

 create table if not exists solana.producer_period_commit_log (
@@ -23,58 +112,11 @@ create table if not exists solana.shard_statistics (
 ) with CLUSTERING order by (period desc, offset desc);


--- ScyllaDB table can hold different kind of entities at the same time.
--- There is not performance advantage to have separate tables since ScyllaDB is wide-column family database.
--- ScyllaDB is built to have sparse columns (alot of unused columns)
--- On each query, the storage engine only retrieves what matters to the query.
-create table if not exists solana.log (
-
-    -- commun columns
-    shard_id smallint,
-    period bigint,
-    producer_id blob,
-    offset bigint,
-    slot bigint,
-    entry_type smallint,
-    -- 0 = account update
-    -- 1 = new transaction
-
-    -- account columns
-    pubkey blob,
-    lamports bigint,
-    owner blob,
-    executable boolean,
-    rent_epoch bigint,
-    write_version bigint,
-    data blob,
-    txn_signature blob,
-
-
-    -- transaction columns
-    signature blob,
-    signatures frozen<list<blob>>,
-    num_required_signatures int,
-    num_readonly_signed_accounts int,
-    num_readonly_unsigned_accounts int,
-    account_keys frozen<list<blob>>,
-    recent_blockhash blob,
-    instructions frozen<list<solana.compiled_instr>>,
-    versioned boolean,
-    address_table_lookups frozen<list<solana.message_addr_table_lookup>>,
-    meta solana.transaction_meta,
-
-
-    -- meta data field for debugging purposes
-    created_at timestamp,
-
-    primary key ((shard_id, period), producer_id, offset)
-);
-
-create materialized view solana.shard_max_offset_mv
+create materialized view if not exists solana.shard_max_offset_mv
 as
 select
-    shard_id,
     producer_id,
+    shard_id,
     offset,
     period
 from solana.log
@@ -83,18 +125,27 @@ where
     and offset is not null
     and period is not null
     and producer_id is not null
-primary key(shard_id, producer_id, offset, period);
+primary key((producer_id, shard_id), offset, period)
+with clustering order by (offset desc, period desc);

+create materialized view if not exists solana.slot_map_mv
+as
+select
+    slot,
+    producer_id,
+    shard_id,
+    period,
+    offset
+from solana.log
+where
+    slot is not null
+    and producer_id is not null
+    and shard_id is not null
+    and period is not null
+    and offset is not null
+primary key (slot, producer_id, shard_id, period, offset);

-drop table if exists solana.transaction_log;
-drop type if exists solana.transaction_meta;
-drop type if exists solana.message_addr_table_lookup;
-drop type if exists solana.compiled_instr;
-drop type if exists solana.tx_token_balance;
-drop type if exists solana.reward;
-drop type if exists solana.inner_instrs;
-drop type if exists solana.inner_instr;

 create type if not exists solana.message_addr_table_lookup (
     account_key blob,
     writable_indexes blob,
     readonly_indexes blob
 );

 create type if not exists solana.compiled_instr (
     program_id_index bigint,
     accounts blob,
     data blob
 );

@@ -132,6 +183,7 @@ create type if not exists solana.tx_token_balance (
     mint text, --varchar(44)
     ui_token_amount frozen,
     owner text, --varchar(44)
+    program_id text
 );

 create type if not exists solana.reward (
@@ -142,6 +194,11 @@
     commission text
 );

+create type if not exists solana.return_data (
+    program_id blob,
+    data blob
+);
+
 create type if not exists solana.transaction_meta (
     error blob,
     fee bigint,
     pre_balances frozen<list<bigint>>,
     post_balances frozen<list<bigint>>,
     inner_instructions frozen<list<solana.inner_instrs>>,
     log_messages frozen<list<text>>,
     pre_token_balances frozen<list<solana.tx_token_balance>>,
     post_token_balances frozen<list<solana.tx_token_balance>>,
-    rewards frozen<list<solana.reward>>
+    rewards frozen<list<solana.reward>>,
+    loaded_writable_addresses frozen<list<blob>>,
+    loaded_readonly_addresses frozen<list<blob>>,
+    return_data frozen<solana.return_data>,
+    compute_units_consumed bigint
 );


+-- ScyllaDB table can hold different kinds of entities at the same time.
+-- There is no performance advantage to having separate tables since ScyllaDB is a wide-column family database.
+-- ScyllaDB is built to have sparse columns (a lot of unused columns)
+-- On each query, the storage engine only retrieves what matters to the query.
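+--
+-- For illustration, a consumer draining one shard reads micro-batches with a query of roughly
+-- this shape (a sketch; the real prepared statements live in
+-- yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs):
+--
+--   SELECT * FROM solana.log
+--   WHERE producer_id = ? AND shard_id = ? AND offset > ? AND period = ?
+--   ORDER BY offset ASC;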
+create table if not exists solana.log (
+
+    -- common columns
+    shard_id smallint,
+    period bigint,
+    producer_id blob,
+    offset bigint,
     slot bigint,
+    event_type smallint,
+    -- 0 = account update
+    -- 1 = new transaction
+
+    -- account columns
+    pubkey blob,
+    lamports bigint,
+    owner blob,
+    executable boolean,
+    rent_epoch bigint,
+    write_version bigint,
+    data blob,
+    txn_signature blob,
+
+
+    -- transaction columns
     signature blob,
     signatures frozen<list<blob>>,
     num_required_signatures int,
     num_readonly_signed_accounts int,
     num_readonly_unsigned_accounts int,
     account_keys frozen<list<blob>>,
     recent_blockhash blob,
     instructions frozen<list<solana.compiled_instr>>,
     versioned boolean,
     address_table_lookups frozen<list<solana.message_addr_table_lookup>>,
     meta solana.transaction_meta,
-    primary key ((slot, signature), recent_blockhash)
-);
+    is_vote boolean,
+    tx_index bigint,

-create table if not exists solana.account_update_log (
-    slot bigint,
-    pubkey blob,
-    lamports bigint,
-    owner blob,
-    executable boolean,
-    rent_epoch bigint,
-    write_version bigint,
-    data blob,
-    txn_signature blob,
-    primary key ((slot, subslot), pubkey, write_version)
-) WITH CLUSTERING ORDER BY (pubkey ASC, write_version ASC);
+    -- meta data field for debugging purposes
+    created_at timestamp,

-create table if not exists solana.test (
-    id bigint,
-    bucket int,
-    x text,
-    data int,
-    primary key ((id, bucket), x)
+    primary key ((shard_id, period, producer_id), offset)
 );
-
-insert into solana.test (id, bucket, x, data)
-values (1, 1, 'a', 1);
\ No newline at end of file
diff --git a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
index b395ffca..0a2c74a7 100644
--- a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
+++ b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
@@ -2,16 +2,24 @@ use {
     anyhow::Ok,
     clap::{Parser, Subcommand},
     futures::{future::BoxFuture, stream::StreamExt},
-    std::{net::SocketAddr, time::Duration},
+    scylla::{frame::Compression, Session, SessionBuilder},
+    std::{net::SocketAddr, sync::Arc, time::Duration},
+    tokio::time::Instant,
     tracing::{info, warn},
     yellowstone_grpc_client::GeyserGrpcClient,
-    yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof,
+    yellowstone_grpc_proto::{
+        prelude::subscribe_update::UpdateOneof, yellowstone::log::EventSubscriptionPolicy,
+    },
     yellowstone_grpc_tools::{
         config::{load as config_load, GrpcRequestToProto},
         create_shutdown,
         prom::run_server as prometheus_run_server,
         scylladb::{
             config::{Config, ConfigGrpc2ScyllaDB, ScyllaDbConnectionInfo},
+            consumer::{
+                common::InitialOffsetPolicy,
+                grpc::{get_or_register_consumer, spawn_grpc_consumer, SpawnGrpcConsumerReq},
+            },
             sink::ScyllaSink,
             types::Transaction,
         },
@@ -63,7 +71,75 @@ impl ArgsAction {
                 unimplemented!();
             }
             ArgsAction::Test => {
-                unimplemented!();
+                let config2 = config.grpc2scylladb.ok_or_else(|| {
+                    anyhow::anyhow!("`grpc2scylladb` section in config should be defined")
+                })?;
+                Self::test(config2, config.scylladb, shutdown).await
+            }
+        }
+    }
+
+    async fn test(
+        config: ConfigGrpc2ScyllaDB,
+        scylladb_conn_config: ScyllaDbConnectionInfo,
+        mut shutdown: BoxFuture<'static, ()>,
+    ) -> anyhow::Result<()> {
+        let session: Session = SessionBuilder::new()
+            .known_node(scylladb_conn_config.hostname)
+            .user(scylladb_conn_config.username, scylladb_conn_config.password)
+            .compression(Some(Compression::Lz4))
+            .use_keyspace(config.keyspace.clone(), false)
+            .build()
+            .await?;
+        let session = Arc::new(session);
+        let ci = get_or_register_consumer(
+            Arc::clone(&session),
+            "test",
+            InitialOffsetPolicy::Earliest,
+            EventSubscriptionPolicy::TransactionOnly,
+        )
+        .await?;
+
+        // let hexstr = "16daf15e85d893b89d83a8ca7d7f86416f134905d1d79e4f62e3da70a3a20a7d";
+        // let _pubkey = (0..hexstr.len())
+        //     .step_by(2)
+        //     .map(|i| u8::from_str_radix(&hexstr[i..i + 2], 16))
+        //     .collect::<Result<Vec<u8>, _>>()?;
+        let req = SpawnGrpcConsumerReq {
+            session: Arc::clone(&session),
+            consumer_info: ci,
+            account_update_event_filter: None,
+            tx_event_filter: None,
+            buffer_capacity: None,
+            offset_commit_interval: None,
+        };
+        let mut rx = spawn_grpc_consumer(req).await?;
+
+        let mut print_tx_secs = Instant::now() + Duration::from_secs(1);
+        let mut num_events = 0;
+        loop {
+            if print_tx_secs.elapsed() > Duration::ZERO {
+                println!("events/second {}", num_events);
+                num_events = 0;
+                print_tx_secs = Instant::now() + Duration::from_secs(1);
+            }
+            tokio::select! {
+                _ = &mut shutdown => return Ok(()),
+                Some(result) = rx.recv() => {
+                    if result.is_err() {
+                        anyhow::bail!("consumer stream returned an error")
+                    }
+                    let _x = result?.update_oneof.expect("got none");
+                    // match x {
+                    //     UpdateOneof::Account(acc) => println!("acc, slot {:?}", acc.slot),
+                    //     UpdateOneof::Transaction(tx) => panic!("got tx"),
+                    //     _ => unimplemented!()
+                    // }
+                    num_events += 1;
+                },
+                _ = tokio::time::sleep_until(Instant::now() + Duration::from_secs(1)) => {
+                    warn!("received no event")
+                }
             }
         }
    }
@@ -105,45 +181,36 @@ impl ArgsAction {
             }
             .transpose()?;

-            match message {
-                Some(message) => {
-                    let message = match message.update_oneof {
-                        Some(value) => value,
-                        None => unreachable!("Expect valid message"),
-                    };
-
-                    match message {
-                        UpdateOneof::Account(msg) => {
-                            // let pubkey_opt: &Option<&[u8]> =
-                            //     &msg.account.as_ref().map(|acc| acc.pubkey.as_ref());
-                            // trace!(
-                            //     "Received an account update slot={:?}, pubkey={:?}",
-                            //     msg.slot, pubkey_opt
-                            // );
-                            let acc_update = msg.clone().try_into();
-                            if acc_update.is_err() {
-                                // Drop the message if invalid
-                                warn!(
-                                    "failed to parse account update: {:?}",
-                                    acc_update.err().unwrap()
-                                );
-                                continue;
-                            }
-                            // If the sink is close, let it crash...
-                            sink.log_account_update(acc_update.unwrap()).await.unwrap();
+            if let Some(message) = message {
+                let message = match message.update_oneof {
+                    Some(value) => value,
+                    None => unreachable!("Expected valid message"),
+                };
+
+                match message {
+                    UpdateOneof::Account(msg) => {
+                        let acc_update = msg.clone().try_into();
+                        if acc_update.is_err() {
+                            // Drop the message if invalid
+                            warn!(
+                                "failed to parse account update: {:?}",
+                                acc_update.err().unwrap()
+                            );
+                            continue;
                         }
+                        // If the sink is closed, let it crash...
+ sink.log_account_update(acc_update.unwrap()).await.unwrap(); + } + UpdateOneof::Transaction(msg) => { + let tx: Result = msg.try_into(); + if tx.is_err() { + warn!("failed to convert update tx: {:?}", tx.err().unwrap()); + continue; } - _ => continue, - }; - } - _ => (), + sink.log_transaction(tx.unwrap()).await.unwrap(); + } + _ => continue, + }; } } Ok(()) diff --git a/yellowstone-grpc-tools/src/scylladb/agent.rs b/yellowstone-grpc-tools/src/scylladb/agent.rs deleted file mode 100644 index f607e31f..00000000 --- a/yellowstone-grpc-tools/src/scylladb/agent.rs +++ /dev/null @@ -1,327 +0,0 @@ -use { - anyhow::anyhow, - futures::{Future, FutureExt, TryFutureExt}, - std::{future::pending, pin::Pin, sync::Arc, time::Duration}, - tokio::{ - sync::{ - self, - mpsc::{channel, error::TrySendError, Permit}, - oneshot, - }, - task::{AbortHandle, JoinHandle, JoinSet}, - time::Instant, - }, - tonic::async_trait, - tracing::{error, warn}, -}; - -pub type Nothing = (); - -pub type Callback = oneshot::Receiver; - -pub type CallbackSender = oneshot::Sender; - -const LOOP_DELAY_WARN_THRESHOLD: std::time::Duration = Duration::from_millis(500); - -/// Watch to see when a message, previously sent, has been consumed and processed by an agent. - -#[async_trait] -pub trait Ticker { - type Input: Send + 'static; - - /// - /// Optional timeout future that is pulled at the same time as the next message in the message loop. - /// - /// Implement this function if you need to flush your Ticker every N unit of time, such as batching. - /// - /// If the timeout finish before the next message is pull, then [`Ticker::on_timeout`] is invoked. - /// - fn timeout(&self) -> Pin + Send + 'static>> { - pending().boxed() - } - - fn timeout2(&self, _now: Instant) -> bool { - false - } - - async fn init(&mut self) -> anyhow::Result { - Ok(()) - } - - /// - /// Called if [`Ticker::timeout`] promise returned before the next message pull. - /// - async fn on_timeout(&mut self, _now: Instant) -> anyhow::Result { - Ok(()) - } - - fn is_pull_ready(&self) -> bool { - true - } - - /// Called on each new message received by the message loop - async fn tick(&mut self, now: Instant, msg: Self::Input) -> anyhow::Result { - let (sender, _receiver) = oneshot::channel(); - self.tick_with_callback_sender(now, msg, vec![sender]).await - } - - async fn tick_with_callback_sender( - &mut self, - now: Instant, - msg: Self::Input, - callback_senders: Vec, - ) -> anyhow::Result { - let result = self.tick(now, msg).await; - - for cb_sender in callback_senders { - if cb_sender.send(()).is_err() { - warn!("Failed to notified because endpoint already closed"); - } - } - - result - } - - /// This is called if the agent handler must gracefully kill you. 
- async fn terminate(&mut self, _now: Instant) -> Result { - warn!("Agent terminated"); - Ok(()) - } -} - -#[derive(Debug)] -pub enum AgentHandlerError { - Closed, - AgentError, -} - -struct Message { - data: T, - callbacks: Vec, - //FireAndForget(T), - //WithCallbacks(T, Vec>), -} - -#[derive(Clone)] -pub struct AgentHandler { - name: String, - sender: sync::mpsc::Sender>, - #[allow(dead_code)] - abort_handle: Arc, - deadletter_queue: Option>>, -} - -struct DeadLetterQueueHandler { - sender: sync::mpsc::Sender, - #[allow(dead_code)] - handle: Arc>, -} - -impl DeadLetterQueueHandler { - async fn send(&self, msg: T) { - self.sender.send(msg).await.unwrap(); - } -} - -pub struct Slot<'a, T> { - inner: Permit<'a, Message>, -} - -impl<'a, T> Slot<'a, T> { - fn new(permit: Permit<'a, Message>) -> Self { - Slot { inner: permit } - } - - pub fn send(self, msg: T) { - self.send_with_callback_senders(msg, Vec::new()); - } - - pub fn send_with_callback_senders(self, msg: T, callback_senders: IT) - where - IT: IntoIterator, - { - self.inner.send(Message { - data: msg, - callbacks: callback_senders.into_iter().collect(), - }) - } - - pub fn send_and_subscribe(self, msg: T) -> oneshot::Receiver<()> { - let (sender, receiver) = oneshot::channel(); - self.send_with_callback_senders(msg, [sender]); - receiver - } -} - -impl AgentHandler { - pub async fn send(&self, msg: T) -> anyhow::Result { - self.send_with_callback_senders(msg, Vec::new()).await - } - - async fn handle_failed_transmission(&self, msg: T) -> anyhow::Error { - if let Some(dlq) = self.deadletter_queue.clone() { - let emsg = format!( - "({:?}) Failed to send message, will reroute to deadletter queue", - self.name - ); - error!(emsg); - dlq.send(msg).await; - anyhow::anyhow!(emsg) - } else { - let emsg = format!("({:?}) Failed to send message, message will be dropped (no deadletter queue detected)", self.name); - error!(emsg); - anyhow::anyhow!(emsg) - } - } - - pub async fn reserve(&self) -> anyhow::Result> { - self.sender - .reserve() - .map_err(anyhow::Error::new) - .await - .map(Slot::new) - } - - pub fn try_reserve(&self) -> Result, TrySendError<()>> { - self.sender.try_reserve().map(Slot::new) - } - - pub async fn send_with_callback_senders( - &self, - msg: T, - callback_senders: IT, - ) -> anyhow::Result - where - IT: IntoIterator, - { - let now = Instant::now(); - let envelope = Message { - data: msg, - callbacks: callback_senders.into_iter().collect(), - }; - let result = self.sender.send(envelope).await; - - if now.elapsed() > Duration::from_millis(500) { - warn!( - "AgentHandler::send slow function detected: {:?}", - now.elapsed() - ); - } - - if let Err(e) = result { - error!("error in send_with_watch"); - Err(self.handle_failed_transmission(e.0.data).await) - } else { - Ok(()) - } - } - - pub async fn send_and_subscribe(&self, msg: T) -> anyhow::Result> { - let (sender, receiver) = oneshot::channel(); - self.send_with_callback_senders(msg, vec![sender]).await?; - Ok(receiver) - } - - pub fn kill(self) { - self.abort_handle.abort(); - } -} - -pub struct AgentSystem { - pub default_agent_buffer_capacity: usize, - handlers: JoinSet>, -} - -impl AgentSystem { - pub fn new(default_agent_buffer_capacity: usize) -> Self { - AgentSystem { - default_agent_buffer_capacity, - handlers: JoinSet::new(), - } - } - - pub fn spawn>(&mut self, name: N, ticker: T) -> AgentHandler - where - T: Ticker + Send + 'static, - { - self.spawn_with_capacity(name, ticker, self.default_agent_buffer_capacity) - } - - pub fn spawn_with_capacity>( - &mut self, 
- name: N, - mut ticker: T, - buffer: usize, - ) -> AgentHandler - where - T: Ticker + Send + 'static, - { - let (sender, mut receiver) = channel::>(buffer); - let agent_name: String = name.into(); - let inner_agent_name = agent_name.clone(); - let abort_handle = self.handlers.spawn(async move { - let name = inner_agent_name; - - let init_result = ticker.init().await; - if init_result.is_err() { - error!("{:?} error during init: {:?}", name, init_result); - return init_result; - } - - loop { - let before = Instant::now(); - let result = tokio::select! { - _ = ticker.timeout() => { - ticker.on_timeout(Instant::now()).await - } - opt_msg = receiver.recv(), if ticker.is_pull_ready() => { - match opt_msg { - Some(msg) => { - let now = Instant::now(); - if msg.callbacks.is_empty() { - ticker.tick(now, msg.data).await - } else { - ticker.tick_with_callback_sender(now, msg.data, msg.callbacks).await - } - }, - None => { - let now = Instant::now(); - return Err( - ticker.terminate(now) - .await - .err() - .unwrap_or(anyhow!("Agent is closed")) - ) - }, - } - } - }; - - if let Err(e) = result { - let emsg = format!("message loop: {:?}, {:?}", name, e); - anyhow::bail!(emsg) - } - - let iteration_duration = before.elapsed(); - if iteration_duration >= LOOP_DELAY_WARN_THRESHOLD { - warn!("{:?} loop iteration took: {:?}", name, iteration_duration); - } - } - }); - - AgentHandler { - name: agent_name, - sender, - abort_handle: Arc::new(abort_handle), - deadletter_queue: None, - } - } - - pub async fn until_one_agent_dies(&mut self) -> anyhow::Result { - self.handlers - .join_next() - .map(|inner| inner.unwrap_or(Ok(Ok(())))) - .map(|join_result| join_result?) - .await - } -} diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/common.rs b/yellowstone-grpc-tools/src/scylladb/consumer/common.rs new file mode 100644 index 00000000..4dffef59 --- /dev/null +++ b/yellowstone-grpc-tools/src/scylladb/consumer/common.rs @@ -0,0 +1,23 @@ +use crate::scylladb::types::{BlockchainEventType, ProducerId, ShardId, ShardOffset}; + +pub type OldShardOffset = ShardOffset; + +pub type ConsumerId = String; + +/// +/// Initial position in the log when creating a new consumer. 
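+/// `Earliest` seeks to the very beginning of the log, `Latest` (the default) to its tip, and
+/// `SlotApprox(slot)` to the first log event of the given slot; e.g.
+/// `InitialOffsetPolicy::SlotApprox(1234)` replays from that slot onward (the slot number here
+/// is illustrative).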
+///
+#[derive(Default, Debug, Clone, Copy)]
+pub enum InitialOffsetPolicy {
+    Earliest,
+    #[default]
+    Latest,
+    SlotApprox(i64),
+}
+
+pub struct ConsumerInfo {
+    pub consumer_id: ConsumerId,
+    pub producer_id: ProducerId,
+    pub shard_offsets: Vec<(ShardId, BlockchainEventType, ShardOffset)>,
+    pub subscribed_blockchain_event_types: Vec<BlockchainEventType>,
+}
diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
new file mode 100644
index 00000000..857646f5
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
@@ -0,0 +1,804 @@
+use {
+    super::{
+        common::{self, ConsumerId, ConsumerInfo, InitialOffsetPolicy},
+        shard_iterator::{ShardFilter, ShardIterator},
+    },
+    crate::scylladb::types::{
+        BlockchainEventType, ProducerId, ProducerInfo, ShardId, ShardOffset, MAX_PRODUCER,
+        MIN_PROCUDER,
+    },
+    futures::Stream,
+    scylla::{
+        batch::{Batch, BatchType},
+        cql_to_rust::FromCqlVal,
+        prepared_statement::PreparedStatement,
+        query::Query,
+        transport::query_result::SingleRowTypedError,
+        Session,
+    },
+    std::{iter::repeat, pin::Pin, sync::Arc, time::Duration},
+    tokio::{sync::mpsc, task::JoinSet, time::Instant},
+    tokio_stream::wrappers::ReceiverStream,
+    tonic::Response,
+    tracing::{error, info},
+    uuid::Uuid,
+    yellowstone_grpc_proto::{
+        geyser::{subscribe_update::UpdateOneof, SubscribeUpdate},
+        yellowstone::log::{
+            yellowstone_log_server::YellowstoneLog, ConsumeRequest, EventSubscriptionPolicy,
+        },
+    },
+};
+
+const DEFAULT_OFFSET_COMMIT_INTERVAL: Duration = Duration::from_secs(10);
+
+const DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY: usize = 100;
+
+const UPDATE_CONSUMER_SHARD_OFFSET: &str = r###"
+    UPDATE consumer_info
+    SET offset = ?, updated_at = currentTimestamp()
+    WHERE
+        consumer_id = ?
+        AND producer_id = ?
+        AND shard_id = ?
+        AND event_type = ?
+    IF offset = ?
+"###;
+
+pub const GET_MIN_OFFSET_FOR_SLOT: &str = r###"
+    SELECT
+        shard_id,
+        min(offset)
+    FROM solana.slot_map_mv
+    WHERE slot = ? and producer_id = ?
+    GROUP BY shard_id
+    ORDER BY shard_id;
+"###;
+
+pub const INSERT_CONSUMER_OFFSET: &str = r###"
+    INSERT INTO consumer_info (
+        consumer_id,
+        producer_id,
+        shard_id,
+        event_type,
+        offset,
+        created_at,
+        updated_at
+    )
+    VALUES
+    (?,?,?,?,?,currentTimestamp(), currentTimestamp())
+"###;
+
+pub const GET_CONSUMER_PRODUCER_MAPPING: &str = r###"
+    SELECT
+        producer_id
+    FROM consumer_producer_mapping
+    where consumer_id = ?
+"###;
+
+pub const GET_SHARD_OFFSETS_FOR_CONSUMER_ID: &str = r###"
+    SELECT
+        shard_id,
+        event_type,
+        offset
+    FROM consumer_info
+    WHERE
+        consumer_id = ?
+        AND producer_id = ?
+    ORDER BY shard_id ASC
+"###;
+
+pub const GET_PRODUCERS_CONSUMER_COUNT: &str = r###"
+    SELECT
+        producer_id,
+        count(1)
+    FROM producer_consumer_mapping_mv
+    GROUP BY producer_id
+"###;
+
+pub const INSERT_CONSUMER_PRODUCER_MAPPING: &str = r###"
+    INSERT INTO consumer_producer_mapping (
+        consumer_id,
+        producer_id,
+        created_at,
+        updated_at
+    )
+    VALUES (?, ?, currentTimestamp(), currentTimestamp())
+"###;
+
+///
+/// CQL does not support OR conditions,
+/// which is why we use >=/<= bounds to emulate `producer_id = ?` when an id is given,
+/// or match any producer when it is not.
+pub const GET_PRODUCER_INFO_BY_ID_OR_ANY: &str = r###"
+    SELECT
+        producer_id,
+        num_shards,
+        is_active
+    FROM producer_info
+    WHERE producer_id >= ? and producer_id <= ?
+    LIMIT 1
+    ALLOW FILTERING
+"###;
+
+///
+/// Returns the latest offset per shard for a consumer id
+///
+pub async fn get_shard_offsets_info_for_consumer_id(
+    session: Arc<Session>,
+    consumer_id: impl AsRef<str>,
+    producer_id: ProducerId,
+) -> anyhow::Result<Vec<(ShardId, BlockchainEventType, ShardOffset)>> {
+    let qr = session
+        .query(
+            GET_SHARD_OFFSETS_FOR_CONSUMER_ID,
+            (consumer_id.as_ref(), producer_id),
+        )
+        .await?;
+
+    let mut ret = Vec::new();
+    for result in qr.rows_typed_or_empty::<(ShardId, BlockchainEventType, ShardOffset)>() {
+        let typed_row = result?;
+        ret.push(typed_row);
+    }
+
+    Ok(ret)
+}
+
+///
+/// Returns the producer id assigned to a specific consumer, if any.
+///
+pub async fn get_producer_id_for_consumer(
+    session: Arc<Session>,
+    consumer_id: impl AsRef<str>,
+) -> anyhow::Result<Option<ProducerId>> {
+    session
+        .query(GET_CONSUMER_PRODUCER_MAPPING, (consumer_id.as_ref(),))
+        .await?
+        .maybe_first_row_typed::<(ProducerId,)>()
+        .map(|opt| opt.map(|row| row.0))
+        .map_err(anyhow::Error::new)
+}
+
+///
+/// Returns the producer id with the fewest consumers assigned.
+///
+pub async fn get_producer_id_with_least_assigned_consumer(
+    session: Arc<Session>,
+) -> anyhow::Result<Option<ProducerId>> {
+    let res = session
+        .query(GET_PRODUCERS_CONSUMER_COUNT, &[])
+        .await?
+        .rows_typed_or_empty::<(ProducerId, i32)>()
+        .map(|result| result.unwrap())
+        .min_by_key(|r| r.1)
+        .map(|r| r.0);
+
+    Ok(res)
+}
+
+///
+/// Returns a specific producer's information by id, or any producer's information if
+/// `producer_id` is None.
+pub async fn get_producer_info_by_id_or_any(
+    session: Arc<Session>,
+    producer_id: Option<ProducerId>,
+) -> anyhow::Result<Option<ProducerInfo>> {
+    let qr = session
+        .query(
+            GET_PRODUCER_INFO_BY_ID_OR_ANY,
+            (
+                producer_id.unwrap_or(MIN_PROCUDER),
+                producer_id.unwrap_or(MAX_PRODUCER),
+            ),
+        )
+        .await?;
+
+    match qr.single_row_typed::<ProducerInfo>() {
+        Ok(row) => Ok(Some(row)),
+        Err(SingleRowTypedError::BadNumberOfRows(_)) => Ok(None),
+        Err(e) => Err(anyhow::Error::new(e)),
+    }
+}
+
+fn get_blockchain_event_types(
+    event_sub_policy: EventSubscriptionPolicy,
+) -> Vec<BlockchainEventType> {
+    match event_sub_policy {
+        EventSubscriptionPolicy::AccountUpdateOnly => vec![BlockchainEventType::AccountUpdate],
+        EventSubscriptionPolicy::TransactionOnly => vec![BlockchainEventType::NewTransaction],
+        EventSubscriptionPolicy::Both => vec![
+            BlockchainEventType::AccountUpdate,
+            BlockchainEventType::NewTransaction,
+        ],
+    }
+}
+
+///
+/// Gets an existing consumer with id = `consumer_id` if it exists, otherwise creates a new consumer.
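+///
+/// A typical call site, adapted from the `test` subcommand in src/bin/grpc-scylladb.rs
+/// (the consumer id below is illustrative):
+///
+/// ```ignore
+/// let consumer_info = get_or_register_consumer(
+///     Arc::clone(&session),
+///     "my-consumer",
+///     InitialOffsetPolicy::Earliest,
+///     EventSubscriptionPolicy::TransactionOnly,
+/// )
+/// .await?;
+/// ```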
+///
+pub async fn get_or_register_consumer(
+    session: Arc<Session>,
+    consumer_id: impl AsRef<str>,
+    initial_offset_policy: InitialOffsetPolicy,
+    event_sub_policy: EventSubscriptionPolicy,
+) -> anyhow::Result<ConsumerInfo> {
+    let maybe_producer_id =
+        get_producer_id_for_consumer(Arc::clone(&session), consumer_id.as_ref()).await?;
+    let producer_id = if let Some(producer_id) = maybe_producer_id {
+        info!(
+            "consumer {:?} exists with producer {:?} assigned to it",
+            consumer_id.as_ref(),
+            producer_id
+        );
+        producer_id
+    } else {
+        let maybe = get_producer_id_with_least_assigned_consumer(Arc::clone(&session)).await?;
+
+        let producer_id = if let Some(producer_id) = maybe {
+            producer_id
+        } else {
+            let producer = get_producer_info_by_id_or_any(Arc::clone(&session), None).await?;
+            producer.expect("No producer registered").producer_id
+        };
+        info!(
+            "consumer {:?} does not exist, will try to assign producer {:?}",
+            consumer_id.as_ref(),
+            producer_id
+        );
+
+        session
+            .query(
+                INSERT_CONSUMER_PRODUCER_MAPPING,
+                (consumer_id.as_ref(), producer_id),
+            )
+            .await?;
+        info!(
+            "consumer {:?} successfully assigned producer {:?}",
+            consumer_id.as_ref(),
+            producer_id
+        );
+        producer_id
+    };
+
+    let ev_types = get_blockchain_event_types(event_sub_policy);
+
+    let shard_offsets = get_shard_offsets_info_for_consumer_id(
+        Arc::clone(&session),
+        consumer_id.as_ref(),
+        producer_id,
+    )
+    .await?;
+    let shard_offsets = if !shard_offsets.is_empty() {
+        shard_offsets
+    } else {
+        info!(
+            "new consumer {:?} initial offset policy {:?}",
+            consumer_id.as_ref(),
+            initial_offset_policy
+        );
+        set_initial_consumer_shard_offsets(
+            Arc::clone(&session),
+            consumer_id.as_ref(),
+            producer_id,
+            initial_offset_policy,
+            event_sub_policy,
+        )
+        .await?
+    };
+    let cs = ConsumerInfo {
+        consumer_id: String::from(consumer_id.as_ref()),
+        producer_id,
+        shard_offsets,
+        subscribed_blockchain_event_types: ev_types,
+    };
+    Ok(cs)
+}
+
+fn build_offset_per_shard_query(num_shards: ShardId, ordering: &str) -> impl Into<Query> {
+    let shard_bind_markers = (0..num_shards)
+        .map(|x| format!("{}", x))
+        .collect::<Vec<_>>()
+        .join(", ");
+
+    format!(
+        r###"
+        SELECT
+            shard_id,
+            offset
+        FROM shard_max_offset_mv
+        WHERE
+            producer_id = ?
+            AND shard_id IN ({shard_bind_markers})
+        ORDER BY offset {ordering}, period {ordering}
+        PER PARTITION LIMIT 1
+        "###,
+        shard_bind_markers = shard_bind_markers,
+        ordering = ordering
+    )
+}
+
+fn get_max_offset_per_shard_query(num_shards: ShardId) -> impl Into<Query> {
+    build_offset_per_shard_query(num_shards, "DESC")
+}
+
+fn get_min_offset_per_shard_query(num_shards: ShardId) -> impl Into<Query> {
+    build_offset_per_shard_query(num_shards, "ASC")
+}
+
+/// Sets the initial shard offsets for a newly created consumer based on [`InitialOffsetPolicy`].
+///
+/// Similar to seeking in a file, we can seek right at the beginning of the log, completely at
+/// the end, or at the first log event containing a specific slot number.
+async fn set_initial_consumer_shard_offsets(
+    session: Arc<Session>,
+    new_consumer_id: impl AsRef<str>,
+    producer_id: ProducerId,
+    initial_offset_policy: InitialOffsetPolicy,
+    event_sub_policy: EventSubscriptionPolicy,
+) -> anyhow::Result<Vec<(ShardId, BlockchainEventType, ShardOffset)>> {
+    // Create an offset counter for each shard
+    let producer_info = get_producer_info_by_id_or_any(Arc::clone(&session), Some(producer_id))
+        .await?
+        .unwrap_or_else(|| panic!("Producer Info `{:?}` must exist", producer_id));
+
+    let num_shards = producer_info.num_shards;
+
+    let shard_offsets_query_result = match initial_offset_policy {
+        InitialOffsetPolicy::Latest => {
+            session
+                .query(get_max_offset_per_shard_query(num_shards), (producer_id,))
+                .await?
+        }
+        InitialOffsetPolicy::Earliest => {
+            session
+                .query(get_min_offset_per_shard_query(num_shards), (producer_id,))
+                .await?
+        }
+        InitialOffsetPolicy::SlotApprox(slot) => {
+            session
+                .query(GET_MIN_OFFSET_FOR_SLOT, (slot, producer_id))
+                .await?
+        }
+    };
+
+    let adjustment = match initial_offset_policy {
+        InitialOffsetPolicy::Earliest | InitialOffsetPolicy::SlotApprox(_) => -1,
+        InitialOffsetPolicy::Latest => 0,
+    };
+
+    let insert_consumer_offset_ps: PreparedStatement =
+        session.prepare(INSERT_CONSUMER_OFFSET).await?;
+
+    let rows = shard_offsets_query_result
+        .rows_typed_or_empty::<(ShardId, ShardOffset)>()
+        .collect::<Result<Vec<_>, _>>()?;
+
+    let mut batch = Batch::new(BatchType::Unlogged);
+    let mut buffer = Vec::with_capacity(rows.len());
+
+    let ev_types = get_blockchain_event_types(event_sub_policy);
+
+    ev_types
+        .into_iter()
+        .flat_map(|ev_type| {
+            rows.iter()
+                .cloned()
+                .map(move |(shard_id, offset)| (ev_type, shard_id, offset))
+        })
+        .for_each(|(ev_type, shard_id, offset)| {
+            let offset = offset + adjustment;
+            batch.append_statement(insert_consumer_offset_ps.clone());
+            buffer.push((
+                new_consumer_id.as_ref(),
+                producer_id,
+                shard_id,
+                ev_type,
+                offset,
+            ));
+        });
+
+    session.batch(&batch, &buffer).await?;
+
+    let shard_offsets = buffer
+        .drain(..)
+        .map(|(_, _, shard_id, ev_type, offset)| (shard_id, ev_type, offset))
+        .collect::<Vec<_>>();
+
+    Ok(shard_offsets)
+}
+
+pub struct ScyllaYsLog {
+    session: Arc<Session>,
+}
+
+pub type LogStream = Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, tonic::Status>> + Send>>;
+
+#[tonic::async_trait]
+impl YellowstoneLog for ScyllaYsLog {
+    #[doc = r" Server streaming response type for the consume method."]
+    type ConsumeStream = LogStream;
+
+    async fn consume(
+        &self,
+        request: tonic::Request<ConsumeRequest>,
+    ) -> Result<tonic::Response<Self::ConsumeStream>, tonic::Status> {
+        let cr = request.into_inner();
+
+        let consumer_id = cr.consumer_id.clone().unwrap_or(Uuid::new_v4().to_string());
+        let initial_offset_policy = match cr.initial_offset_policy() {
+            yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Earliest => {
+                InitialOffsetPolicy::Earliest
+            }
+            yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Latest => {
+                InitialOffsetPolicy::Latest
+            }
+            yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Slot => {
+                let slot = cr.at_slot.ok_or(tonic::Status::invalid_argument(
+                    "Expected at_slot when initial_offset_policy is set to `Slot`",
+                ))?;
+                InitialOffsetPolicy::SlotApprox(slot)
+            }
+        };
+
+        let event_subscription_policy = cr.event_subscription_policy();
+        let account_update_event_filter = cr.account_update_event_filter;
+        let tx_event_filter = cr.tx_event_filter;
+
+        let session = Arc::clone(&self.session);
+        let consumer_info = get_or_register_consumer(
+            session,
+            consumer_id.as_str(),
+            initial_offset_policy,
+            event_subscription_policy,
+        )
+        .await
+        .map_err(|e| {
+            error!("{:?}", e);
+            tonic::Status::new(
+                tonic::Code::Internal,
+                format!("failed to get or create consumer {:?}", consumer_id),
+            )
+        })?;
+
+        let req = SpawnGrpcConsumerReq {
+            session: Arc::clone(&self.session),
+            consumer_info,
+            account_update_event_filter,
+            tx_event_filter,
+            buffer_capacity: None,
+            offset_commit_interval: None,
+        };
+
+        let rx = spawn_grpc_consumer(req)
+            .await
+            .map_err(|_e| tonic::Status::internal("failed to spawn consumer"))?;
+
+        let ret = ReceiverStream::new(rx);
+
+        let res = Response::new(Box::pin(ret) as Self::ConsumeStream);
+        Ok(res)
+    }
+}
+
+struct GrpcConsumerSession {
+    session: Arc<Session>,
+    consumer_info: ConsumerInfo,
+    sender: mpsc::Sender<Result<SubscribeUpdate, tonic::Status>>,
+    // The interval at which we want to commit our Offset progression to Scylla
+    offset_commit_interval: Duration,
+    shard_iterators: Vec<ShardIterator>,
+}
+
+pub struct SpawnGrpcConsumerReq {
+    pub session: Arc<Session>,
+    pub consumer_info: common::ConsumerInfo,
+    pub account_update_event_filter:
+        Option<yellowstone_grpc_proto::yellowstone::log::AccountUpdateEventFilter>,
+    pub tx_event_filter: Option<yellowstone_grpc_proto::yellowstone::log::TransactionEventFilter>,
+    pub buffer_capacity: Option<usize>,
+    pub offset_commit_interval: Option<Duration>,
+}
+
+pub async fn spawn_grpc_consumer(
+    spawnreq: SpawnGrpcConsumerReq,
+) -> anyhow::Result<mpsc::Receiver<Result<SubscribeUpdate, tonic::Status>>> {
+    let session = spawnreq.session;
+    let buffer_capacity = spawnreq
+        .buffer_capacity
+        .unwrap_or(DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY);
+    let (sender, receiver) = mpsc::channel(buffer_capacity);
+    //let last_committed_offsets = state.shard_offsets.clone();
+    let consumer_session = Arc::clone(&session);
+
+    let shard_filter = ShardFilter {
+        tx_account_keys: spawnreq
+            .tx_event_filter
+            .map(|f| f.account_keys)
+            .unwrap_or_default(),
+        account_pubkyes: spawnreq
+            .account_update_event_filter
+            .as_ref()
+            .map(|f| f.pubkeys.to_owned())
+            .unwrap_or_default(),
+        account_owners: spawnreq
+            .account_update_event_filter
+            .as_ref()
+            .map(|f| f.owners.to_owned())
+            .unwrap_or_default(),
+    };
+
+    // We pre-warm all the shard iterators before streaming any events
+    let mut prewarm_set: JoinSet<anyhow::Result<ShardIterator>> = JoinSet::new();
+
+    // If the subscription policy requires listening to both transactions and account updates,
+    // we have to double the number of shard iterators.
+    let mut account_update_shard_iterators = Vec::new();
+    let mut new_tx_shard_iterators = Vec::new();
+
+    for (shard_id, ev_type, shard_offset) in spawnreq.consumer_info.shard_offsets.iter().cloned() {
+        if !spawnreq
+            .consumer_info
+            .subscribed_blockchain_event_types
+            .contains(&ev_type)
+        {
+            continue;
+        }
+        let session = Arc::clone(&session);
+        let producer_id = spawnreq.consumer_info.producer_id;
+        let shard_filter = shard_filter.clone();
+        prewarm_set.spawn(async move {
+            let mut shard_iterator = ShardIterator::new(
+                session,
+                producer_id,
+                shard_id,
+                shard_offset,
+                // The ev_type dictates whether the shard iterator streams account updates or transactions.
+                ev_type,
+                Some(shard_filter),
+            )
+            .await?;
+            shard_iterator.warm().await?;
+            Ok(shard_iterator)
+        });
+    }
+
+    info!(
+        "Prewarming shard iterators for consumer {}...",
+        spawnreq.consumer_info.consumer_id
+    );
+    // Wait for each shard iterator to finish warming up
+    while let Some(result) = prewarm_set.join_next().await {
+        let shard_iterator = result??;
+        match shard_iterator.event_type {
+            BlockchainEventType::AccountUpdate => {
+                account_update_shard_iterators.push(shard_iterator)
+            }
+            BlockchainEventType::NewTransaction => new_tx_shard_iterators.push(shard_iterator),
+        }
+    }
+
+    let shard_iterators = interleave(account_update_shard_iterators, new_tx_shard_iterators);
+
+    let consumer = GrpcConsumerSession::new(
+        consumer_session,
+        spawnreq.consumer_info,
+        sender,
+        spawnreq
+            .offset_commit_interval
+            .unwrap_or(DEFAULT_OFFSET_COMMIT_INTERVAL),
+        shard_iterators,
+    )
+    .await?;
+
+    tokio::spawn(async move {
+        consumer
+            .into_daemon()
+            .await
+            .expect("consumer terminated abruptly");
+    });
+    Ok(receiver)
+}
+
+struct UpdateShardOffsetClosure {
+    session: Arc<Session>,
+    consumer_id: ConsumerId,
+    producer_id: ProducerId,
+    update_prepared_stmt: PreparedStatement,
+}
+
+impl UpdateShardOffsetClosure {
+    async fn new(
+        session: Arc<Session>,
+        consumer_id: ConsumerId,
+        producer_id: ProducerId,
+    ) -> anyhow::Result<Self> {
+        let ps = session.prepare(UPDATE_CONSUMER_SHARD_OFFSET).await?;
+        Ok(UpdateShardOffsetClosure {
+            session,
+            consumer_id,
+            producer_id,
+            update_prepared_stmt: ps,
+        })
+    }
+
+    async fn execute(
+        &self,
+        old_offsets: &[(ShardId, BlockchainEventType, ShardOffset)],
+        new_offsets: &[(ShardId, BlockchainEventType, ShardOffset)],
+    ) -> anyhow::Result<Result<(), ShardOffset>> {
+        // Since the commit offset is partitioned by consumer_id/producer_id
+        // and we are using LWT, the entire batch will be atomic.
+        //
+        // LOGGED batch mode is for a batch that spans multiple partitions and needs atomicity.
+        // In our case, we can disable batch logging since we are batching single-partition data.
+        // Apparently, this is done by default by Scylla, but we make it explicit here since the driver is not quite mature.
+        let mut atomic_batch = Batch::new(BatchType::Unlogged);
+
+        let buffer = old_offsets
+            .iter()
+            .zip(new_offsets.iter())
+            .filter(|((_, _, old_offset), (_, _, new_offset))| old_offset < new_offset)
+            .map(
+                |((shard_id, event_type, old_offset), (shard_id2, event_type2, new_offset))| {
+                    if shard_id != shard_id2 {
+                        panic!("Misaligned consumer offset update");
+                    }
+                    if event_type != event_type2 {
+                        panic!("Misaligned event type during offset update");
+                    }
+                    (
+                        new_offset,
+                        self.consumer_id.clone(),
+                        self.producer_id,
+                        shard_id,
+                        event_type,
+                        old_offset,
+                    )
+                },
+            )
+            .collect::<Vec<_>>();
+
+        if buffer.is_empty() {
+            return Ok(Ok(()));
+        }
+
+        repeat(())
+            .take(buffer.len())
+            .for_each(|_| atomic_batch.append_statement(self.update_prepared_stmt.clone()));
+
+        let query_result = self.session.batch(&atomic_batch, &buffer).await?;
+
+        let row = query_result.first_row().map_err(anyhow::Error::new)?;
+
+        let success = row
+            .columns
+            .first() // first column of LWT is always "success" field
+            .and_then(|opt| opt.to_owned())
+            .map(bool::from_cql)
+            .transpose()?
+            .unwrap_or(false);
+
+        let actual_offset = row
+            .columns
+            .get(5) // offset column
+            .and_then(|opt| opt.to_owned())
+            .map(ShardOffset::from_cql)
+            .transpose()?;
+
+        if success {
+            Ok(Ok(()))
+        } else {
+            Ok(Err(actual_offset.expect("missing actual offset from LWT")))
+        }
+    }
+}
+
+fn interleave<IT>(it1: IT, it2: IT) -> Vec<<IT as IntoIterator>::Item>
+where
+    IT: IntoIterator,
+{
+    let mut ret = vec![];
+    let mut iter1 = it1.into_iter();
+    let mut iter2 = it2.into_iter();
+    loop {
+        match (iter1.next(), iter2.next()) {
+            (Some(x), Some(y)) => {
+                ret.push(x);
+                ret.push(y);
+            }
+            (Some(x), None) => ret.push(x),
+            (None, Some(y)) => ret.push(y),
+            (None, None) => break,
+        }
+    }
+
+    ret
+}
+
+impl GrpcConsumerSession {
+    async fn new(
+        session: Arc<Session>,
+        consumer_info: ConsumerInfo,
+        sender: mpsc::Sender<Result<SubscribeUpdate, tonic::Status>>,
+        offset_commit_interval: Duration,
+        shard_iterators: Vec<ShardIterator>,
+    ) -> anyhow::Result<Self> {
+        Ok(GrpcConsumerSession {
+            session,
+            consumer_info,
+            sender,
+            offset_commit_interval,
+            shard_iterators,
+        })
+    }
+
+    async fn into_daemon(mut self) -> anyhow::Result<()> {
+        let consumer_id = self.consumer_info.consumer_id;
+        let producer_id = self.consumer_info.producer_id;
+        let mut commit_offset_deadline = Instant::now() + self.offset_commit_interval;
+        let update_shard_offset_fn = UpdateShardOffsetClosure::new(
+            Arc::clone(&self.session),
+            consumer_id.clone(),
+            producer_id,
+        )
+        .await?;
+
+        info!("Serving consumer: {:?}", consumer_id);
+
+        let mut last_committed_offsets = self.consumer_info.shard_offsets.clone();
+        last_committed_offsets.sort_by_key(|tuple| (tuple.0, tuple.1));
+        self.shard_iterators
+            .sort_by_key(|it| (it.shard_id, it.event_type));
+
+        loop {
+            for shard_it in self.shard_iterators.iter_mut() {
+                let maybe = shard_it.try_next().await?;
+                if let Some(block_chain_event) = maybe {
+                    let geyser_event = match block_chain_event.event_type {
+                        BlockchainEventType::AccountUpdate => {
+                            UpdateOneof::Account(block_chain_event.try_into()?)
+                        }
+                        BlockchainEventType::NewTransaction => {
+                            UpdateOneof::Transaction(block_chain_event.try_into()?)
+                        }
+                    };
+                    let subscribe_update = SubscribeUpdate {
+                        filters: Default::default(),
+                        update_oneof: Some(geyser_event),
+                    };
+                    self.sender.send(Ok(subscribe_update)).await.map_err(|_| {
+                        anyhow::anyhow!("Failed to deliver message to consumer {}", consumer_id)
+                    })?;
+                }
+            }
+
+            // Every now and then, we commit where the consumer is located in the log.
+            if commit_offset_deadline.elapsed() > Duration::ZERO {
+                let mut new_offsets_to_commit = self
+                    .shard_iterators
+                    .iter()
+                    .map(|shard_it| {
+                        (
+                            shard_it.shard_id,
+                            shard_it.event_type,
+                            shard_it.last_offset(),
+                        )
+                    })
+                    .collect::<Vec<_>>();
+
+                let result = update_shard_offset_fn
+                    .execute(&last_committed_offsets, &new_offsets_to_commit)
+                    .await?;
+
+                if let Err(_actual_offset_in_scylla) = result {
+                    anyhow::bail!("two concurrent connections are using the same consumer instance")
+                }
+                info!(
+                    "Successfully committed offsets for consumer {:?}",
+                    consumer_id
+                );
+                std::mem::swap(&mut new_offsets_to_commit, &mut last_committed_offsets);
+                commit_offset_deadline = Instant::now() + self.offset_commit_interval;
+            }
+        }
+    }
+}
diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/mod.rs b/yellowstone-grpc-tools/src/scylladb/consumer/mod.rs
new file mode 100644
index 00000000..6d3b120c
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/consumer/mod.rs
@@ -0,0 +1,3 @@
+pub mod common;
+pub mod grpc;
+mod shard_iterator;
diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs b/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs
new file mode 100644
index 00000000..94666c1b
--- /dev/null
+++ b/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs
@@ -0,0 +1,476 @@
+use {
+    crate::scylladb::types::{
+        BlockchainEvent, BlockchainEventType, ProducerId, ShardId, ShardOffset, ShardPeriod,
+        SHARD_OFFSET_MODULO,
+    },
+    core::fmt,
+    scylla::{prepared_statement::PreparedStatement, Session},
+    std::{collections::VecDeque, sync::Arc},
+    tokio::sync::oneshot::{self, error::TryRecvError},
+};
+
+const MICRO_BATCH_SIZE: usize = 40;
+
+pub const GET_NEW_TRANSACTION_EVENT: &str = r###"
+    SELECT
+        shard_id,
+        period,
+        producer_id,
+        offset,
+        slot,
+        event_type,
+
+        pubkey,
+        lamports,
+        owner,
+        executable,
+        rent_epoch,
+        write_version,
+        data,
+        txn_signature,
+
+        signature,
+        signatures,
+        num_required_signatures,
+        num_readonly_signed_accounts,
+        num_readonly_unsigned_accounts,
+        account_keys,
+        recent_blockhash,
+        instructions,
+        versioned,
+        address_table_lookups,
+        meta,
+        is_vote,
+        tx_index
+    FROM log
+    WHERE producer_id = ? and shard_id = ? and offset > ? and period = ?
+    and event_type = 1
+    ORDER BY offset ASC
+    ALLOW FILTERING
+"###;
+
+const PRODUCER_SHARD_PERIOD_COMMIT_EXISTS: &str = r###"
+    SELECT
+        producer_id
+    FROM producer_period_commit_log
+    WHERE
+        producer_id = ?
+        AND shard_id = ?
+        AND period = ?
+"###;
+
+/// Represents the state of a shard iterator, which is used to manage the iteration
+/// and retrieval of blockchain events from a shard.
+///
+/// The `ShardIteratorState` enum encapsulates different states that the iterator
+/// can be in during its lifecycle.
+enum ShardIteratorState {
+    /// The iterator is initialized and empty.
+    Empty(ShardOffset),
+
+    /// The iterator is in the process of loading blockchain events from the shard.
+    Loading(ShardOffset, oneshot::Receiver<VecDeque<BlockchainEvent>>),
+
+    /// The iterator has loaded blockchain events and is ready for retrieval.
+    Loaded(ShardOffset, VecDeque<BlockchainEvent>),
+
+    /// The iterator is confirming the end of a period in the shard.
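+    ///
+    /// (Confirmation consults the producer_period_commit_log to learn whether the producer
+    /// sealed the period, i.e. whether it is safe to jump to the next period or the iterator
+    /// should keep polling the current one.)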
+    ConfirmingPeriod(ShardOffset, oneshot::Receiver<bool>),
+
+    /// The iterator is actively streaming blockchain events.
+    AvailableData(ShardOffset, VecDeque<BlockchainEvent>),
+
+    /// The iterator is waiting for the end of a period in the shard.
+    WaitingEndOfPeriod(ShardOffset, oneshot::Receiver<bool>),
+}
+
+impl fmt::Debug for ShardIteratorState {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Empty(arg0) => f.debug_tuple("Empty").field(arg0).finish(),
+            Self::Loading(arg0, _) => f.debug_tuple("Loading").field(arg0).finish(),
+            Self::Loaded(arg0, micro_batch) => f
+                .debug_tuple("Loaded")
+                .field(arg0)
+                .field(&format!("micro_batch({})", micro_batch.len()))
+                .finish(),
+            Self::ConfirmingPeriod(arg0, _) => {
+                f.debug_tuple("ConfirmingPeriod").field(arg0).finish()
+            }
+            Self::AvailableData(arg0, micro_batch) => f
+                .debug_tuple("Available")
+                .field(arg0)
+                .field(&format!("micro_batch({})", micro_batch.len()))
+                .finish(),
+            Self::WaitingEndOfPeriod(arg0, _) => f.debug_tuple("EndOfPeriod").field(arg0).finish(),
+        }
+    }
+}
+
+impl ShardIteratorState {
+    const fn last_offset(&self) -> ShardOffset {
+        match self {
+            Self::Empty(offset) => *offset,
+            Self::Loading(offset, _) => *offset,
+            Self::Loaded(offset, _) => *offset,
+            Self::ConfirmingPeriod(offset, _) => *offset,
+            Self::AvailableData(offset, _) => *offset,
+            Self::WaitingEndOfPeriod(offset, _) => *offset,
+        }
+    }
+
+    const fn is_empty(&self) -> bool {
+        matches!(self, ShardIteratorState::Empty(_))
+    }
+}
+
+#[derive(Clone, Default)]
+pub(crate) struct ShardFilter {
+    pub(crate) tx_account_keys: Vec<Vec<u8>>,
+    pub(crate) account_owners: Vec<Vec<u8>>,
+    pub(crate) account_pubkyes: Vec<Vec<u8>>,
+}
+
+pub(crate) struct ShardIterator {
+    session: Arc<Session>,
+    pub(crate) producer_id: ProducerId,
+    pub(crate) shard_id: ShardId,
+    inner: ShardIteratorState,
+    pub(crate) event_type: BlockchainEventType,
+    get_events_prepared_stmt: PreparedStatement,
+    period_commit_exists_prepared_stmt: PreparedStatement,
+    last_period_confirmed: ShardPeriod,
+    filter: ShardFilter,
+}
+
+/// Represents an iterator for fetching and processing blockchain events from a specific shard.
+/// The iterator fetches one "micro batch" at a time.
+impl ShardIterator {
+    pub(crate) async fn new(
+        session: Arc<Session>,
+        producer_id: ProducerId,
+        shard_id: ShardId,
+        offset: ShardOffset,
+        event_type: BlockchainEventType,
+        filter: Option<ShardFilter>,
+    ) -> anyhow::Result<Self> {
+        let get_events_ps = if event_type == BlockchainEventType::AccountUpdate {
+            let query_str = forge_account_upadate_event_query(filter.clone().unwrap_or_default());
+            session.prepare(query_str).await?
+        } else {
+            session.prepare(GET_NEW_TRANSACTION_EVENT).await?
+        };
+
+        let period_commit_exists_ps = session.prepare(PRODUCER_SHARD_PERIOD_COMMIT_EXISTS).await?;
+
+        Ok(ShardIterator {
+            session,
+            producer_id,
+            shard_id,
+            inner: ShardIteratorState::Empty(offset),
+            event_type,
+            get_events_prepared_stmt: get_events_ps,
+            period_commit_exists_prepared_stmt: period_commit_exists_ps,
+            last_period_confirmed: (offset / SHARD_OFFSET_MODULO) - 1,
+            filter: filter.unwrap_or_default(),
+        })
+    }
+
+    pub(crate) const fn last_offset(&self) -> ShardOffset {
+        self.inner.last_offset()
+    }
+
+    /// Warms up the shard iterator by loading the initial micro batch if in the `Empty` state.
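+    ///
+    /// `spawn_grpc_consumer` calls this once per iterator before any event is streamed to the
+    /// client.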
+ pub(crate) async fn warm(&mut self) -> anyhow::Result<()> {
+ if !self.inner.is_empty() {
+ return Ok(());
+ }
+ let last_offset = self.inner.last_offset();
+
+ let micro_batch = self.fetch_micro_batch(last_offset).await?;
+ let new_state = ShardIteratorState::AvailableData(last_offset, micro_batch);
+ self.inner = new_state;
+ Ok(())
+ }
+
+ /// Checks if a period is committed based on the given last offset.
+ fn is_period_committed(&self, last_offset: ShardOffset) -> oneshot::Receiver<bool> {
+ let session = Arc::clone(&self.session);
+ let producer_id = self.producer_id;
+ let ps = self.period_commit_exists_prepared_stmt.clone();
+ let shard_id = self.shard_id;
+ let period = last_offset / SHARD_OFFSET_MODULO;
+ let (sender, receiver) = oneshot::channel();
+ tokio::spawn(async move {
+ let result = session
+ .execute(&ps, (producer_id, shard_id, period))
+ .await
+ .expect("failed to query period commit state")
+ .maybe_first_row()
+ .expect("query not eligible to return rows")
+ .map(|_row| true)
+ .unwrap_or(false);
+ sender.send(result).map_err(|_| ()).unwrap_or_else(|_| {
+ panic!(
+ "failed to send back period commit status to shard iterator {}",
+ shard_id
+ )
+ });
+ });
+ receiver
+ }
+
+ /// Fetches a micro batch of blockchain events starting from the given last offset.
+ fn fetch_micro_batch(
+ &self,
+ last_offset: ShardOffset,
+ ) -> oneshot::Receiver<VecDeque<BlockchainEvent>> {
+ let period = (last_offset + 1) / SHARD_OFFSET_MODULO;
+ let producer_id = self.producer_id;
+ let ps = self.get_events_prepared_stmt.clone();
+ let shard_id = self.shard_id;
+ let session = Arc::clone(&self.session);
+ let (sender, receiver) = oneshot::channel();
+ tokio::spawn(async move {
+ let micro_batch = session
+ .execute(&ps, (producer_id, shard_id, last_offset, period))
+ .await
+ .expect("failed to fetch micro batch from scylladb")
+ .rows_typed_or_empty::<BlockchainEvent>()
+ .collect::<Result<VecDeque<_>, _>>()
+ .expect("failed to deserialize scylladb rows");
+ sender
+ .send(micro_batch)
+ .map_err(|_| ())
+ .unwrap_or_else(|_| {
+ panic!("Failed to send micro batch to shard iterator {}", shard_id)
+ });
+ });
+ receiver
+ }
+
+ /// Applies any filter that cannot be pushed down to the database.
+ fn filter_row(&self, row: BlockchainEvent) -> Option<BlockchainEvent> {
+ if row.event_type == BlockchainEventType::NewTransaction {
+ // Apply transaction filter here
+ let eligible_acc_keys = &self.filter.tx_account_keys;
+ if !eligible_acc_keys.is_empty() {
+ let is_row_eligible = row
+ .account_keys
+ .as_ref()
+ .filter(|actual_keys| {
+ actual_keys
+ .iter()
+ .any(|account_key| eligible_acc_keys.contains(account_key))
+ })
+ .map(|_| true)
+ .unwrap_or(false);
+ if !is_row_eligible {
+ return None;
+ }
+ }
+ }
+
+ Some(row)
+ }
+
+ /// Attempts to retrieve the next blockchain event from the shard iterator.
+ ///
+ /// This method asynchronously advances the iterator's state and fetches the next blockchain event
+ /// based on its current state.
+ ///
+ /// It handles the different states of the iterator, performing actions such as
+ /// loading, streaming, and period confirmation as appropriate.
+ ///
+ /// Returns `Ok(None)` if no event is available or the iterator is waiting for period confirmation.
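+ ///
+ /// State transitions implemented below:
+ /// `Empty -> Loading -> Loaded -> AvailableData -> Empty`, with
+ /// `WaitingEndOfPeriod`/`ConfirmingPeriod` interposed at period boundaries
+ /// until the producer's period commit is observed.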
+ pub(crate) async fn try_next(&mut self) -> anyhow::Result<Option<BlockchainEvent>> {
+ let last_offset = self.inner.last_offset();
+ let current_state =
+ std::mem::replace(&mut self.inner, ShardIteratorState::Empty(last_offset));
+
+ let (next_state, maybe_to_return) = match current_state {
+ ShardIteratorState::Empty(last_offset) => {
+ let receiver = self.fetch_micro_batch(last_offset);
+ (ShardIteratorState::Loading(last_offset, receiver), None)
+ }
+ ShardIteratorState::Loading(last_offset, mut receiver) => {
+ let result = receiver.try_recv();
+ match result {
+ Err(TryRecvError::Empty) => {
+ (ShardIteratorState::Loading(last_offset, receiver), None)
+ }
+ Err(TryRecvError::Closed) => anyhow::bail!("failed to receive micro batch"),
+ Ok(micro_batch) => (ShardIteratorState::Loaded(last_offset, micro_batch), None),
+ }
+ }
+ ShardIteratorState::Loaded(last_offset, mut micro_batch) => {
+ let maybe_row = micro_batch.pop_front();
+ if let Some(row) = maybe_row {
+ (
+ ShardIteratorState::AvailableData(row.offset, micro_batch),
+ Some(row),
+ )
+ } else {
+ let curr_period = last_offset / SHARD_OFFSET_MODULO;
+ if curr_period <= self.last_period_confirmed {
+ let last_period_offset = ((curr_period + 1) * SHARD_OFFSET_MODULO) - 1;
+ (ShardIteratorState::Empty(last_period_offset), None)
+ } else {
+ // If a newly loaded row stream is already empty, we must figure out whether
+ // the period truly holds no more data, or we are simply consuming faster than
+ // the producer writes and should try again later.
+ let receiver = self.is_period_committed(last_offset);
+ (
+ ShardIteratorState::ConfirmingPeriod(last_offset, receiver),
+ None,
+ )
+ }
+ }
+ }
+ ShardIteratorState::ConfirmingPeriod(last_offset, mut rx) => match rx.try_recv() {
+ Err(TryRecvError::Empty) => {
+ (ShardIteratorState::ConfirmingPeriod(last_offset, rx), None)
+ }
+ Err(TryRecvError::Closed) => anyhow::bail!("failed to receive period commit status"),
+ Ok(period_committed) => {
+ if period_committed {
+ self.last_period_confirmed = last_offset / SHARD_OFFSET_MODULO;
+ }
+ (ShardIteratorState::Empty(last_offset), None)
+ }
+ },
+ ShardIteratorState::AvailableData(last_offset, mut micro_batch) => {
+ let maybe_row = micro_batch.pop_front();
+ if let Some(row) = maybe_row {
+ (
+ ShardIteratorState::AvailableData(row.offset, micro_batch),
+ Some(row),
+ )
+ } else if (last_offset + 1) % SHARD_OFFSET_MODULO == 0 {
+ let receiver = self.is_period_committed(last_offset);
+ (
+ ShardIteratorState::WaitingEndOfPeriod(last_offset, receiver),
+ None,
+ )
+ } else {
+ (ShardIteratorState::Empty(last_offset), None)
+ }
+ }
+ ShardIteratorState::WaitingEndOfPeriod(last_offset, mut rx) => {
+ match rx.try_recv() {
+ Err(TryRecvError::Empty) => (
+ ShardIteratorState::WaitingEndOfPeriod(last_offset, rx),
+ None,
+ ),
+ Err(TryRecvError::Closed) => anyhow::bail!("failed to receive period commit status"),
+ Ok(period_committed) => {
+ if period_committed {
+ self.last_period_confirmed = last_offset / SHARD_OFFSET_MODULO;
+ (ShardIteratorState::Empty(last_offset), None)
+ } else {
+ // Renew the background task
+ let rx2 = self.is_period_committed(last_offset);
+ (
+ ShardIteratorState::WaitingEndOfPeriod(last_offset, rx2),
+ None,
+ )
+ }
+ }
+ }
+ }
+ };
+ let _ = std::mem::replace(&mut self.inner, next_state);
+ Ok(maybe_to_return.and_then(|row| self.filter_row(row)))
+ }
+}
+
+const LOG_PRIMARY_KEY_CONDITION: &str = r###"
+ producer_id = ? and shard_id = ? and offset > ? and period = ?
+"###; + +const LOG_PROJECTION: &str = r###" + shard_id, + period, + producer_id, + offset, + slot, + event_type, + pubkey, + lamports, + owner, + executable, + rent_epoch, + write_version, + data, + txn_signature, + signature, + signatures, + num_required_signatures, + num_readonly_signed_accounts, + num_readonly_unsigned_accounts, + account_keys, + recent_blockhash, + instructions, + versioned, + address_table_lookups, + meta, + is_vote, + tx_index +"###; + +fn format_as_scylla_hexstring(bytes: &[u8]) -> String { + if bytes.is_empty() { + panic!("byte slice is empty") + } + let hex = bytes + .iter() + .map(|b| format!("{:02x}", b)) + .collect::>() + .join(""); + format!("0x{}", hex) +} + +fn forge_account_upadate_event_query(filter: ShardFilter) -> String { + let mut conds = vec![]; + + let pubkeys = filter + .account_pubkyes + .iter() + .map(|pubkey| format_as_scylla_hexstring(pubkey.as_slice())) + .collect::>(); + + let owners = filter + .account_owners + .iter() + .map(|owner| format_as_scylla_hexstring(owner.as_slice())) + .collect::>(); + + if !pubkeys.is_empty() { + let cond = format!("AND pubkey IN ({})", pubkeys.join(", ")); + conds.push(cond); + } + if !owners.is_empty() { + let cond = format!("AND owner IN ({})", owners.join(", ")); + conds.push(cond) + } + let conds_string = conds.join(" "); + + format!( + r###" + SELECT + {projection} + FROM log + WHERE {primary_key_cond} + AND event_type = 0 + {other_conds} + ORDER BY offset ASC + LIMIT {batch_size} + ALLOW FILTERING + "###, + projection = LOG_PROJECTION, + primary_key_cond = LOG_PRIMARY_KEY_CONDITION, + other_conds = conds_string, + batch_size = MICRO_BATCH_SIZE, + ) +} diff --git a/yellowstone-grpc-tools/src/scylladb/mod.rs b/yellowstone-grpc-tools/src/scylladb/mod.rs index d5722b1c..3ff178a6 100644 --- a/yellowstone-grpc-tools/src/scylladb/mod.rs +++ b/yellowstone-grpc-tools/src/scylladb/mod.rs @@ -1,5 +1,5 @@ -pub mod agent; pub mod config; +pub mod consumer; pub mod prom; pub mod sink; pub mod types; diff --git a/yellowstone-grpc-tools/src/scylladb/sink.rs b/yellowstone-grpc-tools/src/scylladb/sink.rs index 327bd036..b84509cd 100644 --- a/yellowstone-grpc-tools/src/scylladb/sink.rs +++ b/yellowstone-grpc-tools/src/scylladb/sink.rs @@ -1,45 +1,34 @@ use { super::{ - agent::{AgentHandler, AgentSystem, Callback, CallbackSender, Nothing, Ticker}, prom::{ scylladb_batch_request_lag_inc, scylladb_batch_request_lag_sub, scylladb_batch_sent_inc, scylladb_batch_size_observe, scylladb_batchitem_sent_inc_by, }, types::{ - AccountUpdate, ProducerId, ShardId, ShardOffset, ShardPeriod, ShardStatistics, - ShardedAccountUpdate, ShardedTransaction, Transaction, SHARD_OFFSET_MODULO, + AccountUpdate, ProducerId, ShardId, ShardOffset, ShardedAccountUpdate, + ShardedTransaction, Transaction, SHARD_OFFSET_MODULO, }, }, deepsize::DeepSizeOf, - futures::{future::ready, Future, FutureExt}, scylla::{ - batch::{Batch, BatchStatement}, - frame::{response::result::ColumnType, Compression}, - routing::Token, + batch::{Batch, BatchType}, + frame::Compression, serialize::{ - row::{RowSerializationContext, SerializeRow, SerializedValues}, + row::{RowSerializationContext, SerializeRow}, RowWriter, }, Session, SessionBuilder, }, - std::{ - borrow::BorrowMut, - collections::{HashMap, HashSet}, - pin::Pin, - sync::Arc, - time::Duration, - }, - tokio::{ - task::{JoinHandle, JoinSet}, - time::{self, Instant, Sleep}, - }, - tonic::async_trait, + std::{sync::Arc, time::Duration}, + tokio::{task::JoinHandle, time::Instant}, tracing::{info, warn}, }; -const 
SHARD_COUNT: usize = 256; +const WARNING_SCYLLADB_LATENCY_THRESHOLD: Duration = Duration::from_millis(50); -const SCYLLADB_SOLANA_LOG_TABLE_NAME: &str = "log"; +const DEFAULT_SHARD_MAX_BUFFER_CAPACITY: usize = 15; + +const SHARD_COUNT: usize = 64; const SCYLLADB_COMMIT_PRODUCER_PERIOD: &str = r###" INSERT INTO producer_period_commit_log ( @@ -51,32 +40,6 @@ const SCYLLADB_COMMIT_PRODUCER_PERIOD: &str = r###" VALUES (?,?,?,currentTimestamp()) "###; -const SCYLLADB_GET_PRODUCER_MAX_OFFSET_FOR_SHARD_MV: &str = r###" - SELECT - offset - FROM shard_max_offset_mv - WHERE - shard_id = ? - AND producer_id = ? - - ORDER BY offset DESC - PER PARTITION LIMIT 1 -"###; - -const SCYLLADB_INSERT_SHARD_STATISTICS: &str = r###" - INSERT INTO shard_statistics ( - shard_id, - period, - producer_id, - offset, - min_slot, - max_slot, - total_events, - slot_event_counter - ) - VALUES (?,?,?,?,?,?,?,?) -"###; - const SCYLLADB_INSERT_ACCOUNT_UPDATE: &str = r###" INSERT INTO log ( shard_id, @@ -84,8 +47,7 @@ const SCYLLADB_INSERT_ACCOUNT_UPDATE: &str = r###" producer_id, offset, slot, - entry_type, - + event_type, pubkey, lamports, owner, @@ -94,7 +56,6 @@ const SCYLLADB_INSERT_ACCOUNT_UPDATE: &str = r###" write_version, data, txn_signature, - created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,currentTimestamp()) @@ -107,8 +68,7 @@ const SCYLLADB_INSERT_TRANSACTION: &str = r###" producer_id, offset, slot, - entry_type, - + event_type, signature, signatures, num_readonly_signed_accounts, @@ -119,11 +79,13 @@ const SCYLLADB_INSERT_TRANSACTION: &str = r###" instructions, versioned, address_table_lookups, - meta, + meta, + is_vote, + tx_index, created_at ) - VALUES (?,?,?,?, ?,?,?,?, ?,?,?,?, ?,?,?,?,?, currentTimestamp()) + VALUES (?,?,?,?,?,?, ?,?,?,?,?,?, ?,?,?,?,?, ?,?, currentTimestamp()) "###; #[derive(Clone, PartialEq, Debug)] @@ -143,15 +105,6 @@ enum ClientCommand { InsertTransaction(Transaction), } -impl ClientCommand { - pub fn slot(&self) -> i64 { - match self { - Self::InsertAccountUpdate(x) => x.slot, - Self::InsertTransaction(x) => x.slot, - } - } -} - #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, DeepSizeOf)] struct ShardedClientCommand { @@ -174,7 +127,6 @@ impl SerializeRow for ShardedClientCommand { .clone() .as_blockchain_event(self.shard_id, self.producer_id, self.offset) .into(); - //let serval = SerializedValues::from_serializable(&ctx, &val); val.serialize(ctx, writer) } ClientCommand::InsertTransaction(val) => { @@ -182,7 +134,6 @@ impl SerializeRow for ShardedClientCommand { .clone() .as_blockchain_event(self.shard_id, self.producer_id, self.offset) .into(); - //let serval = SerializedValues::from_serializable(&ctx, &val); val.serialize(ctx, writer) } } @@ -194,7 +145,7 @@ impl SerializeRow for ShardedClientCommand { } impl ClientCommand { - fn with_shard_info( + const fn with_shard_info( self, shard_id: ShardId, producer_id: ProducerId, @@ -209,291 +160,44 @@ impl ClientCommand { } } -type NodeUuid = u128; - -fn get_node_uuid_for_token(session: Arc, token: Token) -> NodeUuid { - session - .get_cluster_data() - .replica_locator() - .ring() - .get_elem_for_token(token) - .map(|node| node.host_id.as_u128()) - .unwrap() // If it is None it means we have no more -> we need to crash asap! 
-} - -fn get_node_uuids(session: Arc) -> Vec { - session - .get_cluster_data() - .get_nodes_info() - .iter() - .map(|node| node.host_id.as_u128()) - .collect() -} - -fn compute_token(session: Arc, table: &str, partition_key: &SerializedValues) -> Token { - let current_keysapce = session.get_keyspace().unwrap(); - session - .get_cluster_data() - .compute_token(¤t_keysapce, table, partition_key) - .unwrap() -} - -struct Buffer { - // TODO implement bitarray - shard_id_presents: HashSet, - scylla_stmt_batch: Batch, - rows: Vec, - curr_batch_byte_size: usize, -} - -impl Buffer { - fn with_capacity(capacity: usize) -> Buffer { - Buffer { - shard_id_presents: HashSet::new(), - scylla_stmt_batch: Batch::default(), - rows: Vec::with_capacity(capacity), - curr_batch_byte_size: 0, - } - } - - fn len(&self) -> usize { - self.rows.len() - } - - fn push(&mut self, stmt: BatchStatement, row: ShardedClientCommand) { - let row_byte_size = row.deep_size_of(); - self.shard_id_presents.insert(row.shard_id); - self.rows.push(row); - self.scylla_stmt_batch.append_statement(stmt); - self.curr_batch_byte_size += row_byte_size; - } - - fn is_empty(&self) -> bool { - self.len() == 0 - } - - fn total_byte_size(&self) -> usize { - self.curr_batch_byte_size - } - - fn clear(&mut self) { - self.rows.clear(); - self.scylla_stmt_batch.statements.clear(); - self.curr_batch_byte_size = 0; - self.shard_id_presents.clear(); - } - - fn drain_into(&mut self, target: &mut Buffer) { - self.rows - .drain(..) - .zip(self.scylla_stmt_batch.statements.drain(..)) - .for_each(|(row, bs)| target.push(bs, row)); - - self.clear(); - } -} - -struct Flusher { - session: Arc, -} - -impl Flusher { - fn new(session: Arc) -> Self { - Flusher { session } - } -} - -#[async_trait] -impl Ticker for Flusher { - type Input = Buffer; - - async fn tick(&mut self, _now: Instant, msg: Self::Input) -> anyhow::Result { - let mut buffer = msg; - let batch_len = buffer.len(); - scylladb_batch_size_observe(buffer.len()); - if batch_len > 0 { - //info!("Sending batch of length: {:?}, curr_batch_size: {:?}", batch_len, self.curr_batch_byte_size); - let prepared_batch = self - .session - .prepare_batch(&buffer.scylla_stmt_batch) - .await - .map_err(anyhow::Error::new)?; - - let rows = &buffer.rows; - - self.session - .batch(&prepared_batch, rows) - .await - .map(|_| ()) - .map_err(anyhow::Error::new)?; - scylladb_batch_sent_inc(); - } - - buffer.clear(); - scylladb_batchitem_sent_inc_by(batch_len as u64); - scylladb_batch_request_lag_sub(batch_len as i64); - Ok(()) - } -} - -struct Batcher { +/// Represents a shard responsible for processing and batching `ClientCommand` messages +/// before committing them to the database in a background daemon. +/// +/// This struct encapsulates the state and behavior required to manage message buffering, +/// batching, and period-based commitment for a specific shard within a distributed system. +struct Shard { + /// Arc-wrapped database session for executing queries. 
session: Arc, - timer: Timer, - callback_senders: Vec, - insert_tx_query: BatchStatement, - insert_acccount_update_query: BatchStatement, - max_batch_capacity: usize, - max_batch_byte_size: usize, - buffer: Buffer, - flusher: Arc>, -} - -impl Batcher { - fn new( - session: Arc, - linger: Duration, - max_batch_len: usize, - max_batch_size_kb: usize, - flusher_handle: Arc>, - ) -> Self { - Batcher { - session, - timer: Timer::new(linger), - callback_senders: Vec::with_capacity(10), - insert_tx_query: SCYLLADB_INSERT_TRANSACTION.into(), - insert_acccount_update_query: SCYLLADB_INSERT_ACCOUNT_UPDATE.into(), - max_batch_capacity: max_batch_len, - max_batch_byte_size: max_batch_size_kb * 1000, - buffer: Buffer::with_capacity(max_batch_len), - flusher: flusher_handle, - } - } - - async fn flush(&mut self) -> anyhow::Result { - self.timer.restart(); - if self.buffer.is_empty() { - // The callback senders should be empty, but clear it incase of so if anyone is waiting on the signal gets unblock. - self.callback_senders.clear(); - return Ok(()); - } - let mut new_buffer = Buffer::with_capacity(self.buffer.len()); - self.buffer.drain_into(&mut new_buffer); - - let reserve_result = self.flusher.reserve().await?; - reserve_result.send_with_callback_senders(new_buffer, self.callback_senders.drain(..)); - - Ok(()) - } -} - -#[async_trait] -impl Ticker for Batcher { - type Input = ShardedClientCommand; - - fn timeout(&self) -> Pin + Send + 'static>> { - if self.timer.deadline.elapsed() > Duration::from_millis(0) { - // If deadline has already pass.. - ready(()).boxed() - } else { - self.timer.sleep().boxed() - } - } - - async fn init(&mut self) -> anyhow::Result { - let batch_stmts = [ - self.insert_acccount_update_query.borrow_mut(), - self.insert_tx_query.borrow_mut(), - ]; - - for stmt in batch_stmts { - if let BatchStatement::Query(query) = stmt { - let ps = self.session.prepare(query.clone()).await?; - *stmt = BatchStatement::PreparedStatement(ps); - }; - } - - Ok(()) - } - - async fn on_timeout(&mut self, _now: Instant) -> anyhow::Result { - self.flush().await - } - - async fn tick( - &mut self, - _now: Instant, - msg: ShardedClientCommand, - ) -> Result { - let msg_size = msg.deep_size_of(); - - let beginning_batch_len = self.buffer.len(); - - // TODO: make the capacity parameterized - let need_flush = beginning_batch_len >= self.max_batch_capacity - || (self.buffer.total_byte_size() + msg_size) >= self.max_batch_byte_size; + /// Unique identifier for the shard. + shard_id: ShardId, - if need_flush { - self.flush().await?; - if !self.buffer.is_empty() { - panic!("Corrupted flush buffer"); - } - } + /// Unique identifier for the producer associated with this shard. + producer_id: ProducerId, - let batch_stmt = match msg.client_command { - ClientCommand::InsertAccountUpdate(_) => self.insert_acccount_update_query.clone(), - ClientCommand::InsertTransaction(_) => self.insert_tx_query.clone(), - }; + /// The next offset to be assigned for incoming client commands. + next_offset: ShardOffset, - //self.scylla_batch.append_statement(batch_stmt); - self.buffer.push(batch_stmt, msg); - Ok(()) - } + /// Buffer to store sharded client commands before batching. + buffer: Vec, - async fn tick_with_callback_sender( - &mut self, - now: Instant, - msg: Self::Input, - mut callback_senders: Vec, - ) -> anyhow::Result { - self.tick(now, msg).await?; - self.callback_senders.append(&mut callback_senders); - Ok(()) - } -} + /// Maximum capacity of the buffer (number of commands it can hold). 
+ max_buffer_capacity: usize, -struct Timer { - deadline: Instant, - linger: Duration, -} + /// Maximum byte size of the buffer (sum of sizes of commands it can hold). + max_buffer_byte_size: usize, -impl Timer { - fn new(linger: Duration) -> Self { - Timer { - deadline: Instant::now(), - linger, - } - } + /// Batch for executing database statements in bulk. + scylla_batch: Batch, - fn restart(&mut self) { - self.deadline = Instant::now() + self.linger; - } + /// Current byte size of the batch being constructed. + curr_batch_byte_size: usize, - fn sleep(&self) -> Sleep { - time::sleep_until(self.deadline) - } -} + /// Duration to linger before flushing the buffer. + buffer_linger: Duration, -struct Shard { - session: Arc, - shard_id: ShardId, - producer_id: ProducerId, - next_offset: ShardOffset, - batchers: Arc<[AgentHandler]>, - current_batcher: Option, - slot_event_counter: HashMap, - shard_stats_checkpoint_timer: Timer, - curr_bg_period_commit: Option>>, + // This variable will hold any background (bg) period commit task + bg_period_commit_task: Option>>, } impl Shard { @@ -502,247 +206,139 @@ impl Shard { shard_id: ShardId, producer_id: ProducerId, next_offset: ShardOffset, - batchers: Arc<[AgentHandler]>, + max_buffer_capacity: usize, + max_buffer_byte_size: usize, + buffer_linger: Duration, ) -> Self { Shard { session, shard_id, producer_id, next_offset, - batchers, - current_batcher: None, - slot_event_counter: HashMap::new(), - shard_stats_checkpoint_timer: Timer::new(Duration::from_secs(60)), - curr_bg_period_commit: None, - } - } - - fn get_batcher_idx_for_token(&self, token: Token) -> usize { - let node_uuid = get_node_uuid_for_token(Arc::clone(&self.session), token); - let mut node_uuids = get_node_uuids(Arc::clone(&self.session)); - node_uuids.sort(); - - // this always hold true: node_uuids.len() << batchers.len() - let batch_partition_size: usize = self.batchers.len() / node_uuids.len(); - - if let Ok(i) = node_uuids.binary_search(&node_uuid) { - let batch_partition = self.batchers.chunks(batch_partition_size).nth(i).unwrap(); - - let batch_partition_offset = (self.shard_id as usize) % batch_partition.len(); - let global_offset = (batch_partition_size * i) + batch_partition_offset; - if global_offset > self.batchers.len() { - panic!("batcher idx fell out of batchers list index bound") - } - global_offset - } else { - warn!( - "Token topology didn't know about {:?} at the time of batcher assignment.", - node_uuid - ); - 0 - } - } - - async fn do_shard_stats_checkpoint(&mut self) -> anyhow::Result { - self.session - .query( - SCYLLADB_INSERT_SHARD_STATISTICS, - ShardStatistics::from_slot_event_counter( - self.shard_id, - self.period(), - self.producer_id, - self.next_offset, - &self.slot_event_counter, - ), - ) - .await - .map_err(anyhow::Error::new)?; - - self.slot_event_counter.clear(); - self.shard_stats_checkpoint_timer.restart(); - Ok(()) - } - - fn period(&self) -> i64 { - self.next_offset / SHARD_OFFSET_MODULO - } - - async fn bg_period_commit( - &mut self, - period: ShardPeriod, - callback: Callback, - ) -> anyhow::Result { - let shard_id = self.shard_id; - let producer_id = self.producer_id; - - if let Some(bg_commit) = self.curr_bg_period_commit.take() { - // If there is already a background commit job, wait for it to finish first. 
- bg_commit.await.map_err(anyhow::Error::new)??; + buffer: Vec::with_capacity(max_buffer_capacity), + max_buffer_capacity, + max_buffer_byte_size, + // Since each shard will only batch into a single partition at a time, we can safely disable batch logging + // without losing atomicity guarantee provided by scylla. + scylla_batch: Batch::new(BatchType::Unlogged), + buffer_linger, + bg_period_commit_task: Default::default(), + curr_batch_byte_size: 0, } - let session = Arc::clone(&self.session); - let fut = tokio::spawn(async move { - callback.await.map_err(anyhow::Error::new)?; - session - .query( - SCYLLADB_COMMIT_PRODUCER_PERIOD, - (producer_id, shard_id, period), - ) - .await - .map(|_qr| ()) - .map_err(anyhow::Error::new) - }); - - self.curr_bg_period_commit.replace(fut); - - Ok(()) } - fn pick_batcher_if_nonset(&mut self) { - let shard_id = self.shard_id; - if self.current_batcher.is_some() { - return; - } - - info!("shard({:?}) will pick a new batcher.", shard_id); - // Resolve the partition key - let mut partition_key = SerializedValues::new(); - let period = self.period(); - partition_key - .add_value(&shard_id, &ColumnType::SmallInt) - .unwrap(); - partition_key - .add_value(&period, &ColumnType::BigInt) - .unwrap(); - let token = compute_token( - Arc::clone(&self.session), - SCYLLADB_SOLANA_LOG_TABLE_NAME, - &partition_key, - ); - let idx = self.get_batcher_idx_for_token(token); - let old = self.current_batcher.replace(idx); - if old.is_some() { - panic!("Sharder is trying to get a new batcher while he's holding one already"); - } + fn clear_buffer(&mut self) { + self.buffer.clear(); + self.curr_batch_byte_size = 0; + self.scylla_batch.statements.clear(); } -} - -#[async_trait] -impl Ticker for Shard { - type Input = ClientCommand; - fn timeout(&self) -> Pin + Send + 'static>> { - if self.shard_stats_checkpoint_timer.deadline.elapsed() > Duration::from_millis(0) { - // If deadline has already pass.. - ready(()).boxed() - } else { - self.shard_stats_checkpoint_timer.sleep().boxed() + async fn flush(&mut self) -> anyhow::Result<()> { + let buffer_len = self.buffer.len(); + if buffer_len > 0 { + let before = Instant::now(); + // We must wait for the batch success to guarantee monotonicity in the shard's timeline. 
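+ // Since each shard only ever batches rows destined for a single partition, the
+ // unlogged batch below is still applied atomically by Scylla (see the note in `Shard::new`).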
+ self.session.batch(&self.scylla_batch, &self.buffer).await?; + scylladb_batch_request_lag_sub(buffer_len as i64); + scylladb_batch_sent_inc(); + scylladb_batch_size_observe(buffer_len); + scylladb_batchitem_sent_inc_by(buffer_len as u64); + if before.elapsed() >= WARNING_SCYLLADB_LATENCY_THRESHOLD { + warn!("sent {} elements in {:?}", buffer_len, before.elapsed()); + } } - } - - async fn on_timeout(&mut self, now: Instant) -> anyhow::Result { - self.do_shard_stats_checkpoint().await?; - info!("shard({:?}) checkpoint at {:?}", self.shard_id, now); + self.clear_buffer(); Ok(()) } - async fn tick(&mut self, _now: Instant, msg: Self::Input) -> anyhow::Result { - let shard_id = self.shard_id; - let producer_id = self.producer_id; - let offset = self.next_offset; - let curr_period = self.period(); - self.next_offset += 1; - if offset % SHARD_OFFSET_MODULO == 0 { - let _ = self.current_batcher.take(); - } - - if self.current_batcher.is_none() { - self.pick_batcher_if_nonset(); - } - - let batcher_idx = self.current_batcher.unwrap(); - - let is_end_of_period = (offset + 1) % SHARD_OFFSET_MODULO == 0; - - let batcher = &self.batchers[batcher_idx]; - let slot = msg.slot(); - let sharded = msg.with_shard_info(shard_id, producer_id, offset); - - // Handle the end of a period - let result = if is_end_of_period { - // Remove the current_batcher so next round we decide on a new batcher - let _ = self.current_batcher.take(); - - // With watch allows us to block until the period is either completly committed or abandonned - // before sharding again. - let callback = batcher.send_and_subscribe(sharded).await?; - - self.bg_period_commit(curr_period, callback).await - } else { - batcher.send(sharded).await - }; - - if result.is_ok() { - *self.slot_event_counter.entry(slot).or_default() += 1; - - if is_end_of_period { - // Flush important statistics - self.do_shard_stats_checkpoint().await?; + /// Converts the current `Shard` instance into a background daemon for processing and batching `ClientCommand` messages. + /// + /// This method spawns an asynchronous task (`tokio::spawn`) to continuously receive messages from a channel (`receiver`), + /// batch process them, and commit periods to the database. It handles message buffering + /// and period commitment based on the configured buffer settings and period boundaries. + /// + /// # Returns + /// Returns a `Sender` channel (`tokio::sync::mpsc::Sender`) that can be used to send `ClientCommand` messages + /// to the background daemon for processing and batching. 
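+ ///
+ /// A minimal sketch of the intended call pattern (hypothetical values):
+ ///
+ /// ```ignore
+ /// let shard = Shard::new(session, shard_id, producer_id, next_offset,
+ ///     DEFAULT_SHARD_MAX_BUFFER_CAPACITY, max_buffer_byte_size, buffer_linger);
+ /// let mailbox = shard.into_daemon();
+ /// mailbox.send(ClientCommand::InsertTransaction(tx)).await?;
+ /// ```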
+ fn into_daemon(mut self) -> tokio::sync::mpsc::Sender<ClientCommand> {
+ let (sender, mut receiver) = tokio::sync::mpsc::channel::<ClientCommand>(16);
+
+ let _handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
+ let insert_account_ps = self.session.prepare(SCYLLADB_INSERT_ACCOUNT_UPDATE).await?;
+ let insert_tx_ps = self.session.prepare(SCYLLADB_INSERT_TRANSACTION).await?;
+
+ let mut buffering_timeout = Instant::now() + self.buffer_linger;
+
+ loop {
+ let shard_id = self.shard_id;
+ let producer_id = self.producer_id;
+ let offset = self.next_offset;
+ let curr_period = offset / SHARD_OFFSET_MODULO;
+ self.next_offset += 1;
+
+ let is_end_of_period = (offset + 1) % SHARD_OFFSET_MODULO == 0;
+
+ let msg = receiver
+ .recv()
+ .await
+ .ok_or(anyhow::anyhow!("Shard mailbox closed"))?;
+ let sharded_msg = msg.with_shard_info(shard_id, producer_id, offset);
+ let msg_byte_size = sharded_msg.deep_size_of();
+
+ let need_flush = self.buffer.len() >= self.max_buffer_capacity
+ || self.curr_batch_byte_size + msg_byte_size >= self.max_buffer_byte_size
+ || buffering_timeout.elapsed() > Duration::ZERO;
+
+ if need_flush {
+ self.flush().await?;
+ buffering_timeout = Instant::now() + self.buffer_linger;
+ }
+
+ let batch_stmt = match &sharded_msg.client_command {
+ ClientCommand::InsertAccountUpdate(_) => insert_account_ps.clone(),
+ ClientCommand::InsertTransaction(_) => insert_tx_ps.clone(),
+ };
+
+ self.buffer.push(sharded_msg);
+ self.scylla_batch.append_statement(batch_stmt);
+ self.curr_batch_byte_size += msg_byte_size;
+
+ // Handle the end of a period
+ if is_end_of_period {
+ if let Some(task) = self.bg_period_commit_task.take() {
+ task.await??;
+ }
+
+ let session = Arc::clone(&self.session);
+
+ let handle = tokio::spawn(async move {
+ let result = session
+ .query(
+ SCYLLADB_COMMIT_PRODUCER_PERIOD,
+ (producer_id, shard_id, curr_period),
+ )
+ .await
+ .map(|_qr| ())
+ .map_err(anyhow::Error::new);
+ if result.is_ok() {
+ info!(
+ "shard={},producer_id={:?} committed period: {}",
+ shard_id, producer_id, curr_period
+ );
+ }
+ result
+ });
+ // We run the period commit in the background so we don't block the next period.
+ // However, we cannot commit the next period until the previous one has been committed.
+ // By the time we finish the next period, the previous period's commit should have already happened.
+ self.bg_period_commit_task.replace(handle); + } } - } - - return result; - } -} - -struct RoundRobinRouter { - destinations: Vec>, - idx: usize, -} - -impl RoundRobinRouter { - pub fn new(batchers: Vec>) -> Self { - RoundRobinRouter { - destinations: batchers, - idx: 0, - } - } -} - -#[async_trait] -impl Ticker for RoundRobinRouter { - type Input = T; - - async fn tick(&mut self, now: Instant, msg: Self::Input) -> Result { - let begin = self.idx; - let maybe_permit = self - .destinations - .iter() - .enumerate() - // Cycle forever until you find a destination - .cycle() - .skip(begin) - .take(self.destinations.len()) - .find_map(|(i, dest)| dest.try_reserve().ok().map(|slot| (i, slot))); - - if let Some((i, permit)) = maybe_permit { - self.idx = (i + 1) % self.destinations.len(); - scylladb_batch_request_lag_inc(); - permit.send(msg); - return Ok(()); - } else { - warn!("failed to find a sharder without waiting "); - let result = self.destinations[self.idx].send(msg).await; - scylladb_batch_request_lag_inc(); - warn!("find a sharder after: {:?}", now.elapsed()); - self.idx = (self.idx + 1) % self.destinations.len(); - result - } + }); + sender } } pub struct ScyllaSink { - batch_router_handle: AgentHandler, - system: AgentSystem, + router_handle: tokio::sync::mpsc::Sender, } #[derive(Debug)] @@ -750,47 +346,115 @@ pub enum ScyllaSinkError { SinkClose, } -async fn get_max_offset_for_shard_and_producer( +/// Retrieves the latest shard offsets for a specific producer from the `shard_max_offset_mv` materialized view. +/// +/// This asynchronous function queries the database session to fetch the latest shard offsets associated with +/// a given `producer_id` from the `shard_max_offset_mv` materialized view. It constructs and executes a SELECT +/// query to retrieve the shard IDs and corresponding offsets ordered by offset and period. +/// +/// # Parameters +/// - `session`: An Arc-wrapped database session (`Arc`) for executing database queries. +/// - `producer_id`: The unique identifier (`ProducerId`) of the producer whose shard offsets are being retrieved. +/// +/// # Returns +/// - `Ok(None)`: If no shard offsets are found for the specified producer. +/// - `Ok(Some(rows))`: If shard offsets are found, returns a vector of tuples containing shard IDs and offsets. +/// Each tuple represents a shard's latest offset for the producer. +/// - `Err`: If an error occurs during database query execution or result parsing, returns an `anyhow::Result`. +async fn get_shard_offsets_for_producer( session: Arc, - shard_id: i16, producer_id: ProducerId, -) -> anyhow::Result> { - let query_result = session - .query( - SCYLLADB_GET_PRODUCER_MAX_OFFSET_FOR_SHARD_MV, - (shard_id, producer_id), - ) - .await?; - - query_result - .single_row() - .ok() - .map(|row| row.into_typed::<(ShardOffset,)>()) - .transpose() - .map(|maybe| maybe.map(|typed_row| typed_row.0)) - .map_err(anyhow::Error::new) +) -> anyhow::Result>> { + let shard_bind_markers = (0..SHARD_COUNT) + .map(|x| format!("{}", x)) + .collect::>() + .join(", "); + + let query = format!( + r###" + SELECT + shard_id, + offset + FROM shard_max_offset_mv + WHERE + producer_id = ? 
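+            -- presumably the MV partitions by (producer_id, shard_id), so
+            -- PER PARTITION LIMIT 1 below keeps only the freshest offset per shard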
+ AND shard_id IN ({shard_bind_markers}) + ORDER BY offset DESC, period DESC + PER PARTITION LIMIT 1 + "###, + shard_bind_markers = shard_bind_markers + ); + + let query_result = session.query(query, (producer_id,)).await?; + + let rows = query_result + .rows_typed_or_empty::<(ShardId, ShardOffset)>() + .map(|result| result.map(|typed_row| typed_row.1)) + .collect::, _>>() + .map_err(anyhow::Error::new)?; + + if rows.is_empty() { + info!("producer {:?} offsets don't exists", producer_id); + Ok(None) + } else { + info!( + "producer {:?} offsets already exists: {:?}", + producer_id, rows + ); + Ok(Some(rows)) + } } -type BatcherArray = Arc<[AgentHandler]>; +/// Spawns a round-robin dispatcher for sending `ClientCommand` messages to a list of shard mailboxes. +/// +/// This function takes a vector of shard mailboxes (`tokio::sync::mpsc::Sender`) and returns +/// a new `Sender` that can be used to dispatch messages in a round-robin fashion to the provided shard mailboxes. +/// +/// The dispatcher cycles through the shard mailboxes indefinitely, ensuring each message is sent to the next +/// available shard without waiting, or falling back to the original shard if all are busy. It increments the +/// ScyllaDB batch request lag for monitoring purposes. +/// +/// # Parameters +/// - `shard_mailboxes`: A vector of `Sender` channels representing shard mailboxes to dispatch messages to. +/// +/// # Returns +/// A `Sender` channel that can be used to send `ClientCommand` messages to the shard mailboxes in a round-robin manner. +fn spawn_round_robin( + shard_mailboxes: Vec>, +) -> tokio::sync::mpsc::Sender { + let (sender, mut receiver) = tokio::sync::mpsc::channel(DEFAULT_SHARD_MAX_BUFFER_CAPACITY); + let _h: JoinHandle> = tokio::spawn(async move { + let mut i: usize = 0; + let total_shards = shard_mailboxes.len(); + loop { + let msg = receiver + .recv() + .await + .ok_or(anyhow::anyhow!("round robin received end is closed"))?; + let begin = i; + let maybe_permit = shard_mailboxes + .iter() + .enumerate() + // Cycle forever until you find a destination + .cycle() + .skip(begin) + .take(total_shards) + .find_map(|(i, dest)| dest.try_reserve().ok().map(|slot| (i, slot))); + + let shard_idx = if let Some((j, permit)) = maybe_permit { + permit.send(msg); + j + } else { + warn!("failed to find a shard without waiting"); + shard_mailboxes[i].send(msg).await?; + i + }; -async fn shard_factory( - session: Arc, - shard_id: ShardId, - producer_id: ProducerId, - batchers: BatcherArray, -) -> anyhow::Result { - let before: Instant = Instant::now(); - let max_offset = - get_max_offset_for_shard_and_producer(Arc::clone(&session), shard_id, producer_id).await?; - let next_offset = max_offset.unwrap_or(0) + 1; - let shard = Shard::new(session, shard_id, producer_id, next_offset, batchers); - info!( - "sharder {:?} next_offset: {:?}, stats collected in: {:?}", - shard_id, - next_offset, - before.elapsed() - ); - Ok(shard) + scylladb_batch_request_lag_inc(); + i = (shard_idx + 1) % total_shards; + } + }); + sender } impl ScyllaSink { @@ -808,68 +472,45 @@ impl ScyllaSink { .compression(Some(Compression::Lz4)) .use_keyspace(config.keyspace.clone(), false) .build() - .await - .unwrap(); + .await?; let session = Arc::new(session); - let mut system = AgentSystem::new(16); let shard_count = SHARD_COUNT; - let num_batcher = SHARD_COUNT / 2; - - let mut batchers = Vec::with_capacity(num_batcher as usize); - for i in 0..num_batcher { - let flusher = Flusher::new(Arc::clone(&session)); - - let flusher_handle = 
system.spawn(format!("flusher({:?})", i), flusher); - - let lbs = Batcher::new( - Arc::clone(&session), - config.linger, - config.batch_len_limit, - config.batch_size_kb_limit, - Arc::new(flusher_handle), - ); - let lbs_handler = system.spawn_with_capacity(format!("batcher({:?})", i), lbs, 100); - batchers.push(lbs_handler); - } - - let batchers: Arc<[AgentHandler]> = - Arc::from(batchers.into_boxed_slice()); let mut sharders = vec![]; - let mut js: JoinSet> = JoinSet::new(); info!("Will create {:?} shards", shard_count); - for shard_id in 0..shard_count { + let maybe_shard_offsets = + get_shard_offsets_for_producer(Arc::clone(&session), producer_id).await?; + let shard_offsets = maybe_shard_offsets.unwrap_or(vec![1; shard_count]); + for (shard_id, next_offset) in shard_offsets.iter().enumerate() { let session = Arc::clone(&session); - let batchers = Arc::clone(&batchers); - js.spawn(async move { - shard_factory(session, shard_id as i16, producer_id, batchers).await - }); - } - - while let Some(join_result) = js.join_next().await { - let shard = join_result??; - sharders.push(system.spawn(format!("shard({:?})", shard.shard_id), shard)); + let shard = Shard::new( + session, + shard_id as i16, + producer_id, + *next_offset, + DEFAULT_SHARD_MAX_BUFFER_CAPACITY, + config.batch_size_kb_limit * 1024, + config.linger, + ); + let shard_mailbox = shard.into_daemon(); + sharders.push(shard_mailbox); } - let router = RoundRobinRouter::new(sharders); - - let router_handle = system.spawn("router", router); - info!("Shard router has started."); + let sender = spawn_round_robin(sharders); Ok(ScyllaSink { - batch_router_handle: router_handle, - system, + router_handle: sender, }) } async fn inner_log(&mut self, cmd: ClientCommand) -> anyhow::Result<()> { - tokio::select! 
{ - _ = self.batch_router_handle.send(cmd) => Ok(()), - Err(e) = self.system.until_one_agent_dies() => Err(e) - } + self.router_handle + .send(cmd) + .await + .map_err(|_e| anyhow::anyhow!("failed to route")) } pub async fn log_account_update(&mut self, update: AccountUpdate) -> anyhow::Result<()> { diff --git a/yellowstone-grpc-tools/src/scylladb/types.rs b/yellowstone-grpc-tools/src/scylladb/types.rs index d746dbbc..2a07c908 100644 --- a/yellowstone-grpc-tools/src/scylladb/types.rs +++ b/yellowstone-grpc-tools/src/scylladb/types.rs @@ -1,5 +1,5 @@ use { - anyhow::anyhow, + anyhow::{anyhow, Ok}, deepsize::DeepSizeOf, scylla::{ cql_to_rust::{FromCqlVal, FromCqlValError}, @@ -7,64 +7,29 @@ use { serialize::value::SerializeCql, FromRow, FromUserType, SerializeCql, SerializeRow, }, - std::{collections::HashMap, iter::repeat}, + std::iter::repeat, yellowstone_grpc_proto::{ - geyser::{SubscribeUpdateAccount, SubscribeUpdateTransaction}, - solana::storage::confirmed_block, + geyser::{ + SubscribeUpdateAccount, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + }, + solana::storage::confirmed_block::{self, CompiledInstruction}, }, }; +pub const SHARD_COUNT: i16 = 256; pub const SHARD_OFFSET_MODULO: i64 = 10000; +pub type ProgramId = [u8; 32]; + pub type ShardId = i16; pub type ShardPeriod = i64; pub type ShardOffset = i64; - pub type ProducerId = [u8; 1]; // one byte is enough to assign an id to a machine -#[derive(SerializeRow, Clone, Debug, FromRow)] -pub(crate) struct ShardStatistics { - pub(crate) shard_id: ShardId, - pub(crate) period: ShardPeriod, - pub(crate) producer_id: ProducerId, - pub(crate) offset: ShardOffset, - pub(crate) min_slot: i64, - pub(crate) max_slot: i64, - pub(crate) total_events: i64, - pub(crate) slot_event_counter: HashMap, -} - -#[derive(SerializeRow, Clone, Debug, FromRow)] -pub(crate) struct ProducerInfo { - pub(crate) producer_id: ProducerId, - pub(crate) min_offset_per_shard: HashMap, -} - -impl ShardStatistics { - pub(crate) fn from_slot_event_counter( - shard_id: ShardId, - period: ShardPeriod, - producer_id: ProducerId, - offset: ShardOffset, - counter_map: &HashMap, - ) -> Self { - let min_slot = counter_map.keys().min().copied().unwrap_or(-1); - let max_slot = counter_map.keys().max().copied().unwrap_or(-1); - let total_events: i64 = counter_map.values().map(|cnt| *cnt as i64).sum(); - ShardStatistics { - shard_id, - period, - producer_id, - offset, - min_slot, - max_slot, - total_events, - slot_event_counter: counter_map.clone(), - } - } -} +pub const MIN_PROCUDER: ProducerId = [0x00]; +pub const MAX_PRODUCER: ProducerId = [0xFF]; -#[derive(Clone, Debug, PartialEq, PartialOrd, Copy, DeepSizeOf)] +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Copy, DeepSizeOf)] pub enum BlockchainEventType { AccountUpdate = 0, NewTransaction = 1, @@ -122,30 +87,32 @@ pub struct BlockchainEvent { pub producer_id: ProducerId, pub offset: ShardOffset, pub slot: i64, - pub entry_type: BlockchainEventType, + pub event_type: BlockchainEventType, // AccountUpdate - pub pubkey: Pubkey, - pub lamports: i64, - pub owner: Pubkey, - pub executable: bool, - pub rent_epoch: i64, - pub write_version: i64, - pub data: Vec, + pub pubkey: Option, + pub lamports: Option, + pub owner: Option, + pub executable: Option, + pub rent_epoch: Option, + pub write_version: Option, + pub data: Option>, pub txn_signature: Option>, // Transaction - pub signature: Vec, - pub signatures: Vec>, - pub num_required_signatures: i32, - pub num_readonly_signed_accounts: i32, - pub 
num_readonly_unsigned_accounts: i32, - pub account_keys: Vec>, - pub recent_blockhash: Vec, - pub instructions: Vec, - pub versioned: bool, - pub address_table_lookups: Vec, - pub meta: TransactionMeta, + pub signature: Option>, + pub signatures: Option>>, + pub num_required_signatures: Option, + pub num_readonly_signed_accounts: Option, + pub num_readonly_unsigned_accounts: Option, + pub account_keys: Option>>, + pub recent_blockhash: Option>, + pub instructions: Option>, + pub versioned: Option, + pub address_table_lookups: Option>, + pub meta: Option, + pub is_vote: Option, + pub tx_index: Option, } type Pubkey = [u8; 32]; @@ -194,6 +161,17 @@ impl From for MessageAddrTableLookup } } +impl From for confirmed_block::MessageAddressTableLookup { + fn from(msg: MessageAddrTableLookup) -> Self { + // Create a new instance of AddressLookup + confirmed_block::MessageAddressTableLookup { + account_key: msg.account_key, + writable_indexes: msg.writable_indexes, + readonly_indexes: msg.readonly_indexes, + } + } +} + #[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] #[scylla(flavor = "match_by_name")] pub struct CompiledInstr { @@ -220,6 +198,18 @@ impl From for CompiledInstr { } } +impl TryFrom for confirmed_block::CompiledInstruction { + type Error = anyhow::Error; + + fn try_from(value: CompiledInstr) -> Result { + Ok(CompiledInstruction { + program_id_index: value.program_id_index.try_into()?, + accounts: value.accounts, + data: value.data, + }) + } +} + #[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] #[scylla(flavor = "match_by_name")] pub struct InnerInstr { @@ -240,6 +230,19 @@ impl From for InnerInstr { } } +impl TryFrom for confirmed_block::InnerInstruction { + type Error = anyhow::Error; + + fn try_from(value: InnerInstr) -> Result { + Ok(confirmed_block::InnerInstruction { + program_id_index: value.program_id_index.try_into()?, + accounts: value.accounts, + data: value.data, + stack_height: value.stack_height.map(|x| x.try_into()).transpose()?, + }) + } +} + #[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] #[scylla(flavor = "match_by_name")] pub struct InnerInstrs { @@ -261,6 +264,17 @@ impl TryFrom for InnerInstrs { } } +impl TryFrom for confirmed_block::InnerInstructions { + type Error = anyhow::Error; + + fn try_from(value: InnerInstrs) -> Result { + Ok(confirmed_block::InnerInstructions { + index: value.index.try_into()?, + instructions: try_collect(value.instructions)?, + }) + } +} + #[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] #[scylla(flavor = "match_by_name")] pub struct UiTokenAmount { @@ -281,6 +295,19 @@ impl From for UiTokenAmount { } } +impl TryFrom for confirmed_block::UiTokenAmount { + type Error = anyhow::Error; + + fn try_from(value: UiTokenAmount) -> Result { + Ok(confirmed_block::UiTokenAmount { + ui_amount: value.ui_amount, + decimals: value.decimals.try_into()?, + amount: value.amount, + ui_amount_string: value.ui_amount_string, + }) + } +} + #[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] #[scylla(flavor = "match_by_name")] pub struct TxTokenBalance { @@ -288,6 +315,7 @@ pub struct TxTokenBalance { pub mint: String, pub ui_token_amount: Option, pub owner: String, + pub program_id: String, } impl From for TxTokenBalance { @@ -295,12 +323,27 @@ impl From for TxTokenBalance { TxTokenBalance { account_index: value.account_index.into(), mint: value.mint, - ui_token_amount: value.ui_token_amount.map(|x| x.into()), + ui_token_amount: 
value.ui_token_amount.map(Into::into), owner: value.owner, + program_id: value.program_id, } } } +impl TryFrom for confirmed_block::TokenBalance { + type Error = anyhow::Error; + + fn try_from(value: TxTokenBalance) -> Result { + Ok(confirmed_block::TokenBalance { + account_index: value.account_index.try_into()?, + mint: value.mint, + ui_token_amount: value.ui_token_amount.map(TryInto::try_into).transpose()?, + owner: value.owner, + program_id: value.program_id, + }) + } +} + #[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] #[scylla(flavor = "match_by_name")] pub struct Reward { @@ -324,6 +367,49 @@ impl TryFrom for Reward { } } +impl TryFrom for confirmed_block::Reward { + type Error = anyhow::Error; + + fn try_from(value: Reward) -> Result { + Ok(confirmed_block::Reward { + pubkey: value.pubkey, + lamports: value.lamports, + post_balance: value.post_balance.try_into()?, + reward_type: value.reward_type, + commission: value.commission, + }) + } +} + +#[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] +#[scylla(flavor = "match_by_name")] +pub struct ReturnData { + pub program_id: ProgramId, + pub data: Vec, +} + +impl TryFrom for ReturnData { + type Error = anyhow::Error; + fn try_from(value: confirmed_block::ReturnData) -> Result { + Ok(ReturnData { + program_id: value + .program_id + .try_into() + .map_err(|e| anyhow::anyhow!("Inavlid readonly address, got: {:?}", e))?, + data: value.data, + }) + } +} + +impl From for confirmed_block::ReturnData { + fn from(value: ReturnData) -> Self { + confirmed_block::ReturnData { + program_id: value.program_id.into(), + data: value.data, + } + } +} + #[derive(Debug, SerializeCql, Clone, DeepSizeOf, FromUserType, Default)] #[scylla(flavor = "match_by_name")] pub struct TransactionMeta { @@ -331,11 +417,15 @@ pub struct TransactionMeta { pub fee: i64, pub pre_balances: Vec, pub post_balances: Vec, - pub inner_instructions: Vec, - pub log_messages: Vec, + pub inner_instructions: Option>, + pub log_messages: Option>, pub pre_token_balances: Vec, pub post_token_balances: Vec, pub rewards: Vec, + pub loaded_writable_addresses: Vec, + pub loaded_readonly_addresses: Vec, + pub return_data: Option, + pub compute_units_consumed: Option, } impl TryFrom for TransactionMeta { @@ -363,17 +453,44 @@ impl TryFrom for TransactionMeta { let rewards: Vec = try_collect(status_meta.rewards)?; + let loaded_readonly_addresses: Vec = + try_collect(status_meta.loaded_readonly_addresses) + .map_err(|e| anyhow::anyhow!("Inavlid readonly address, got: {:?}", e))?; + let loaded_writable_addresses = try_collect(status_meta.loaded_writable_addresses) + .map_err(|e| anyhow::anyhow!("Inavlid readonly address, got: {:?}", e))?; + + let return_data = status_meta + .return_data + .map(|rd| rd.try_into()) + .transpose()?; + let compute_units_consumed = status_meta + .compute_units_consumed + .map(|cu| cu.try_into()) + .transpose()?; + // Create a new TransactionMeta instance let transaction_meta = TransactionMeta { error, fee, pre_balances, post_balances, - inner_instructions, - log_messages, + inner_instructions: if status_meta.inner_instructions_none { + Some(inner_instructions) + } else { + None + }, + log_messages: if status_meta.log_messages_none { + Some(log_messages) + } else { + None + }, pre_token_balances, post_token_balances, rewards, + loaded_readonly_addresses, + loaded_writable_addresses, + return_data, + compute_units_consumed, }; // Return the new TransactionMeta instance @@ -381,6 +498,47 @@ impl TryFrom for TransactionMeta 
{ } } +impl TryFrom for confirmed_block::TransactionStatusMeta { + type Error = anyhow::Error; + + fn try_from(value: TransactionMeta) -> Result { + let inner_instructions_none = value.inner_instructions.is_none(); + let log_messages_none = value.log_messages.is_none(); + let return_data_none = value.return_data.is_none(); + Ok(confirmed_block::TransactionStatusMeta { + err: value + .error + .map(|bindata| confirmed_block::TransactionError { err: bindata }), + fee: value.fee.try_into()?, + pre_balances: try_collect(value.pre_balances)?, + post_balances: try_collect(value.post_balances)?, + inner_instructions: value + .inner_instructions + .map(try_collect) + .transpose()? + .unwrap_or(Vec::new()), + inner_instructions_none, + log_messages: value + .log_messages + .map(try_collect) + .transpose()? + .unwrap_or(Vec::new()), + log_messages_none, + pre_token_balances: try_collect(value.pre_token_balances)?, + post_token_balances: try_collect(value.post_token_balances)?, + rewards: try_collect(value.rewards)?, + loaded_writable_addresses: try_collect(value.loaded_writable_addresses)?, + loaded_readonly_addresses: try_collect(value.loaded_readonly_addresses)?, + return_data: value.return_data.map(Into::into), + return_data_none, + compute_units_consumed: value + .compute_units_consumed + .map(TryInto::try_into) + .transpose()?, + }) + } +} + #[derive(Debug, SerializeRow, Clone, DeepSizeOf)] pub struct Transaction { pub slot: i64, @@ -395,6 +553,8 @@ pub struct Transaction { pub versioned: bool, pub address_table_lookups: Vec, pub meta: TransactionMeta, + pub is_vote: bool, + pub tx_index: i64, } impl TryFrom for Transaction { @@ -440,12 +600,50 @@ impl TryFrom for Transaction { .map(|atl| atl.into()) .collect(), meta: meta.try_into()?, + is_vote: val_tx.is_vote, + tx_index: val_tx.index as i64, }; Ok(res) } } +impl TryFrom for SubscribeUpdateTransaction { + type Error = anyhow::Error; + + fn try_from(value: Transaction) -> Result { + let ret = SubscribeUpdateTransaction { + transaction: Some(SubscribeUpdateTransactionInfo { + signature: value.signature, + is_vote: value.is_vote, + transaction: Some(confirmed_block::Transaction { + signatures: value.signatures, + message: Some(confirmed_block::Message { + header: Some(confirmed_block::MessageHeader { + num_required_signatures: value.num_required_signatures.try_into()?, + num_readonly_signed_accounts: value + .num_readonly_signed_accounts + .try_into()?, + num_readonly_unsigned_accounts: value + .num_readonly_unsigned_accounts + .try_into()?, + }), + account_keys: value.account_keys, + recent_blockhash: value.recent_blockhash, + instructions: try_collect(value.instructions)?, + versioned: value.versioned, + address_table_lookups: try_collect(value.address_table_lookups)?, + }), + }), + meta: Some(value.meta.try_into()).transpose()?, + index: value.tx_index.try_into()?, + }), + slot: value.slot.try_into()?, + }; + Ok(ret) + } +} + impl From for ( i64, @@ -503,14 +701,14 @@ impl AccountUpdate { producer_id, offset, slot: self.slot, - entry_type: BlockchainEventType::AccountUpdate, - pubkey: self.pubkey, - lamports: self.lamports, - owner: self.owner, - executable: self.executable, - rent_epoch: self.rent_epoch, - write_version: self.write_version, - data: self.data, + event_type: BlockchainEventType::AccountUpdate, + pubkey: Some(self.pubkey), + lamports: Some(self.lamports), + owner: Some(self.owner), + executable: Some(self.executable), + rent_epoch: Some(self.rent_epoch), + write_version: Some(self.write_version), + data: Some(self.data), 
txn_signature: self.txn_signature, signature: Default::default(), signatures: Default::default(), @@ -523,6 +721,8 @@ impl AccountUpdate { versioned: Default::default(), address_table_lookups: Default::default(), meta: Default::default(), + is_vote: Default::default(), + tx_index: Default::default(), } } } @@ -574,7 +774,7 @@ impl Transaction { producer_id, offset, slot: self.slot, - entry_type: BlockchainEventType::NewTransaction, + event_type: BlockchainEventType::NewTransaction, pubkey: Default::default(), lamports: Default::default(), @@ -585,17 +785,19 @@ impl Transaction { data: Default::default(), txn_signature: Default::default(), - signature: self.signature, - signatures: self.signatures, - num_required_signatures: self.num_required_signatures, - num_readonly_signed_accounts: self.num_readonly_signed_accounts, - num_readonly_unsigned_accounts: self.num_readonly_unsigned_accounts, - account_keys: self.account_keys, - recent_blockhash: self.recent_blockhash, - instructions: self.instructions, - versioned: self.versioned, - address_table_lookups: self.address_table_lookups, - meta: self.meta, + signature: Some(self.signature), + signatures: Some(self.signatures), + num_required_signatures: Some(self.num_required_signatures), + num_readonly_signed_accounts: Some(self.num_readonly_signed_accounts), + num_readonly_unsigned_accounts: Some(self.num_readonly_unsigned_accounts), + account_keys: Some(self.account_keys), + recent_blockhash: Some(self.recent_blockhash), + instructions: Some(self.instructions), + versioned: Some(self.versioned), + address_table_lookups: Some(self.address_table_lookups), + meta: Some(self.meta), + is_vote: Some(self.is_vote), + tx_index: Some(self.tx_index), } } } @@ -608,7 +810,7 @@ pub struct ShardedAccountUpdate { pub producer_id: ProducerId, pub offset: ShardOffset, pub slot: i64, - pub entry_type: BlockchainEventType, + pub event_type: BlockchainEventType, // AccountUpdate pub pubkey: Pubkey, @@ -629,7 +831,7 @@ pub struct ShardedTransaction { pub producer_id: ProducerId, pub offset: ShardOffset, pub slot: i64, - pub entry_type: BlockchainEventType, + pub event_type: BlockchainEventType, // Transaction pub signature: Vec, @@ -643,6 +845,8 @@ pub struct ShardedTransaction { pub versioned: bool, pub address_table_lookups: Vec, pub meta: TransactionMeta, + pub is_vote: bool, + pub tx_index: i64, } // Implement Into for BlockchainEvent @@ -653,15 +857,15 @@ impl From for ShardedAccountUpdate { period: val.period, producer_id: val.producer_id, offset: val.offset, + event_type: val.event_type, slot: val.slot, - entry_type: val.entry_type, - pubkey: val.pubkey, - lamports: val.lamports, - owner: val.owner, - executable: val.executable, - rent_epoch: val.rent_epoch, - write_version: val.write_version, - data: val.data, + pubkey: val.pubkey.expect("pubkey is none"), + lamports: val.lamports.expect("lamports is none"), + owner: val.owner.expect("owner is none"), + executable: val.executable.expect("executable is none"), + rent_epoch: val.rent_epoch.expect("rent_epch is none"), + write_version: val.write_version.expect("write_version is none"), + data: val.data.expect("data is none"), txn_signature: val.txn_signature, } } @@ -675,19 +879,135 @@ impl From for ShardedTransaction { period: val.period, producer_id: val.producer_id, offset: val.offset, + event_type: val.event_type, + slot: val.slot, + signature: val.signature.expect("signature is none"), + signatures: val.signatures.expect("signatures is none"), + num_required_signatures: val + .num_required_signatures + 
.expect("num_required_signature is none"), + num_readonly_signed_accounts: val + .num_readonly_signed_accounts + .expect("num_readonly_signed_accounts is none"), + num_readonly_unsigned_accounts: val + .num_readonly_unsigned_accounts + .expect("num_readonly_unsigned_accounts is none"), + account_keys: val.account_keys.expect("account_keys is none"), + recent_blockhash: val.recent_blockhash.expect("recent_blockhash is none"), + instructions: val.instructions.expect("instructions is none"), + versioned: val.versioned.expect("versioned is none"), + address_table_lookups: val + .address_table_lookups + .expect("address_table_lookups is none"), + meta: val.meta.expect("meta is none"), + is_vote: val.is_vote.expect("is_vote is none"), + tx_index: val.tx_index.expect("tx_index is none"), + } + } +} + +impl From for Transaction { + fn from(val: BlockchainEvent) -> Self { + Transaction { slot: val.slot, - entry_type: val.entry_type, - signature: val.signature, - signatures: val.signatures, - num_required_signatures: val.num_required_signatures, - num_readonly_signed_accounts: val.num_readonly_signed_accounts, - num_readonly_unsigned_accounts: val.num_readonly_unsigned_accounts, - account_keys: val.account_keys, - recent_blockhash: val.recent_blockhash, - instructions: val.instructions, - versioned: val.versioned, - address_table_lookups: val.address_table_lookups, - meta: val.meta, + signature: val.signature.expect("signature is none"), + signatures: val.signatures.expect("signatures is none"), + num_required_signatures: val + .num_required_signatures + .expect("num_required_signature is none"), + num_readonly_signed_accounts: val + .num_readonly_signed_accounts + .expect("num_readonly_signed_accounts is none"), + num_readonly_unsigned_accounts: val + .num_readonly_unsigned_accounts + .expect("num_readonly_unsigned_accounts is none"), + account_keys: val.account_keys.expect("account_keys is none"), + recent_blockhash: val.recent_blockhash.expect("recent_blockhash is none"), + instructions: val.instructions.expect("instructions is none"), + versioned: val.versioned.expect("versioned is none"), + address_table_lookups: val + .address_table_lookups + .expect("address_table_lookups is none"), + meta: val.meta.expect("meta is none"), + is_vote: val.is_vote.expect("is_vote is none"), + tx_index: val.tx_index.expect("tx_index is none"), + } + } +} + +impl From for AccountUpdate { + fn from(val: BlockchainEvent) -> Self { + AccountUpdate { + slot: val.slot, + pubkey: val.pubkey.expect("pubkey is none"), + lamports: val.lamports.expect("lamports is none"), + owner: val.owner.expect("owner is none"), + executable: val.executable.expect("executable is none"), + rent_epoch: val.rent_epoch.expect("rent_epch is none"), + write_version: val.write_version.expect("write_version is none"), + data: val.data.expect("data is none"), + txn_signature: val.txn_signature, } } } + +#[derive(FromRow, Debug, Clone)] +pub struct ProducerInfo { + pub producer_id: ProducerId, + #[allow(dead_code)] + pub num_shards: ShardId, + #[allow(dead_code)] + pub is_active: bool, +} + +impl TryFrom for SubscribeUpdateAccount { + type Error = anyhow::Error; + + fn try_from(acc_update: AccountUpdate) -> anyhow::Result { + let _pubkey_bytes: [u8; 32] = acc_update.pubkey; + let _owner_bytes: [u8; 32] = acc_update.owner; + + // Create the SubscribeUpdateAccount instance + let subscribe_update_account = SubscribeUpdateAccount { + slot: acc_update.slot as u64, + account: Some( + yellowstone_grpc_proto::prelude::SubscribeUpdateAccountInfo { + 
pubkey: Vec::from(acc_update.pubkey),
+ lamports: acc_update.lamports as u64,
+ owner: Vec::from(acc_update.owner),
+ executable: acc_update.executable,
+ rent_epoch: acc_update.rent_epoch as u64,
+ write_version: acc_update.write_version as u64,
+ data: acc_update.data,
+ txn_signature: acc_update.txn_signature,
+ },
+ ),
+ is_startup: false,
+ };
+
+ Ok(subscribe_update_account)
+ }
+}
+
+impl TryFrom<BlockchainEvent> for SubscribeUpdateAccount {
+ type Error = anyhow::Error;
+ fn try_from(value: BlockchainEvent) -> Result<Self, Self::Error> {
+ if value.event_type != BlockchainEventType::AccountUpdate {
+ anyhow::bail!("BlockchainEvent is not an AccountUpdate");
+ }
+ let ret: AccountUpdate = value.into();
+ ret.try_into()
+ }
+}
+
+impl TryFrom<BlockchainEvent> for SubscribeUpdateTransaction {
+ type Error = anyhow::Error;
+ fn try_from(value: BlockchainEvent) -> Result<Self, Self::Error> {
+ anyhow::ensure!(
+ value.event_type == BlockchainEventType::NewTransaction,
+ "BlockchainEvent is not a Transaction"
+ );
+ let ret: Transaction = value.into();
+ ret.try_into()
+ }
+}
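+
+// A minimal round-trip sketch (hypothetical usage; assumes `event` was read from
+// the log with `event_type == BlockchainEventType::AccountUpdate`):
+//
+//     let update: AccountUpdate = event.clone().into();
+//     let subscribe_update: SubscribeUpdateAccount = event.try_into()?;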