diff --git a/Cargo.lock b/Cargo.lock index ba6b0cb8..fd300d89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5593,6 +5593,7 @@ dependencies = [ "serde_with 3.7.0", "serde_yaml", "sha2 0.10.8", + "thiserror", "tokio", "tokio-stream", "tonic 0.10.2", diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index 0f754d57..4b8d34ed 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -44,6 +44,7 @@ serde_json = { workspace = true } serde_with = { workspace = true, optional = true } serde_yaml = { workspace = true } sha2 = { workspace = true, optional = true } +thiserror = { workspace = true, optional = true } tokio = { workspace = true, features = ["signal", "time"] } tokio-stream = { workspace = true } tonic = { workspace = true, features = ["gzip"] } @@ -74,4 +75,4 @@ vergen = { workspace = true, features = ["build", "rustc"] } default = ["google-pubsub", "kafka"] google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"] kafka = ["const-hex", "rdkafka", "sha2"] -scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono"] +scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror"] diff --git a/yellowstone-grpc-tools/solana.cql b/yellowstone-grpc-tools/solana.cql index 3eecea4d..176782ca 100644 --- a/yellowstone-grpc-tools/solana.cql +++ b/yellowstone-grpc-tools/solana.cql @@ -1,56 +1,60 @@ CREATE KEYSPACE solana WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'} AND durable_writes = true; +CREATE KEYSPACE solana2 WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'} AND durable_writes = true; -drop materialized view if exists solana.producer_consumer_mapping_mv; -drop materialized view if exists solana.slot_map; - -drop table if exists solana.producer_slot_seen; -drop table if exists solana.shard_statistics; -drop table if exists solana.producer_info; -drop table if exists solana.consumer_info; -drop table if exists solana.consumer_producer_mapping; -drop table if exists solana.log; -drop type if exists solana.transaction_meta; -drop type if exists solana.message_addr_table_lookup; -drop type if exists solana.compiled_instr; -drop type if exists solana.tx_token_balance; -drop type if exists solana.reward; -drop type if exists solana.inner_instrs; -drop type if exists solana.inner_instr; -drop type if exists solana.return_data; - - -create table if not exists solana.consumer_info ( + +drop materialized view if exists producer_consumer_mapping_mv; +drop materialized view if exists slot_map; + +drop table if exists producer_slot_seen; +drop table if exists shard_statistics; +drop table if exists producer_info; +drop table if exists consumer_shard_offset; +drop table if exists consumer_producer_mapping; +drop table if exists log; +drop type if exists transaction_meta; +drop type if exists message_addr_table_lookup; +drop type if exists compiled_instr; +drop type if exists tx_token_balance; +drop type if exists reward; +drop type if exists inner_instrs; +drop type if exists inner_instr; +drop type if exists return_data; + + +create table if not exists consumer_shard_offset ( consumer_id text, producer_id blob, shard_id smallint, event_type smallint, offset bigint, + slot bigint, created_at timestamp, updated_at timestamp, PRIMARY KEY ((consumer_id, producer_id), shard_id, event_type) ); -create table if not exists solana.consumer_producer_mapping ( +create table if not exists consumer_info ( consumer_id text, 
producer_id blob, + subscribed_event_types frozen>, created_at timestamp, updated_at timestamp, PRIMARY KEY (consumer_id) ); -create materialized view if not exists solana.producer_consumer_mapping_mv +create materialized view if not exists producer_consumer_mapping_mv as select producer_id, consumer_id -from solana.consumer_producer_mapping +from consumer_info where consumer_id is not null and producer_id is not null primary key (producer_id, consumer_id); -create table if not exists solana.producer_info ( +create table if not exists producer_info ( producer_id blob, num_shards smallint, created_at timestamp, @@ -58,26 +62,16 @@ create table if not exists solana.producer_info ( PRIMARY KEY (producer_id) ); -create table if not exists solana.producer_lock ( +create table if not exists producer_lock ( producer_id blob, lock_id text, ifname text, ipv4 text, created_at timestamp, primary key (producer_id) -) - --- # example -insert into solana.producer_info ( - producer_id, - num_shards, - created_at, - updated_at -) -values (0x00, 256, currentTimestamp(), currentTimestamp()); - +); -create table if not exists solana.producer_period_commit_log ( +create table if not exists producer_period_commit_log ( producer_id blob, shard_id smallint, period bigint, @@ -86,58 +80,46 @@ create table if not exists solana.producer_period_commit_log ( ) with clustering order by (period DESC); -create table if not exists solana.shard_statistics ( - shard_id smallint, - period bigint, - producer_id blob, - offset bigint, - min_slot bigint, - max_slot bigint, - total_events bigint, - slot_event_counter map, - primary key(shard_id, period, offset) -) with CLUSTERING order by (period desc, offset desc); - -create type if not exists solana.message_addr_table_lookup ( +create type if not exists message_addr_table_lookup ( account_key blob, writable_indexes blob, readonly_indexes blob ); -create type if not exists solana.compiled_instr ( +create type if not exists compiled_instr ( program_id_index bigint, accounts blob, data blob ); -create type if not exists solana.inner_instr ( +create type if not exists inner_instr ( program_id_index bigint, accounts blob, data blob, stack_height bigint ); -create type if not exists solana.inner_instrs ( +create type if not exists inner_instrs ( "index" bigint, - instructions frozen> + instructions frozen> ); -create type if not exists solana.ui_token_amount ( +create type if not exists ui_token_amount ( ui_amount double, decimals bigint, amount text, ui_amount_string text ); -create type if not exists solana.tx_token_balance ( +create type if not exists tx_token_balance ( account_index bigint, mint text, --varchar(44) - ui_token_amount frozen, + ui_token_amount frozen, owner text, --varchar(44) program_id text, ); -create type if not exists solana.reward ( +create type if not exists reward ( pubkey text, -- varchar(44) lamports bigint, post_balance bigint, @@ -145,24 +127,24 @@ create type if not exists solana.reward ( commission text ); -create type if not exists solana.return_data ( +create type if not exists return_data ( program_id blob, data blob ); -create type if not exists solana.transaction_meta ( +create type if not exists transaction_meta ( error blob, fee bigint, pre_balances frozen>, post_balances frozen>, - inner_instructions frozen>, + inner_instructions frozen>, log_messages frozen>, - pre_token_balances frozen>, - post_token_balances frozen>, - rewards frozen>, + pre_token_balances frozen>, + post_token_balances frozen>, + rewards frozen>, 
loaded_writable_addresses frozen>, loaded_readonly_addresses frozen>, - return_data frozen, + return_data frozen, compute_units_consumed bigint ); @@ -171,7 +153,7 @@ create type if not exists solana.transaction_meta ( -- There is not performance advantage to have separate tables since ScyllaDB is wide-column family database. -- ScyllaDB is built to have sparse columns (alot of unused columns) -- On each query, the storage engine only retrieves what matters to the query. -create table if not exists solana.log ( +create table if not exists log ( -- commun columns shard_id smallint, @@ -202,10 +184,10 @@ create table if not exists solana.log ( num_readonly_unsigned_accounts int, account_keys frozen>, recent_blockhash blob, - instructions frozen>, + instructions frozen>, versioned boolean, - address_table_lookups frozen>, - meta solana.transaction_meta, + address_table_lookups frozen>, + meta transaction_meta, is_vote boolean, tx_index bigint, @@ -217,8 +199,18 @@ create table if not exists solana.log ( ) WITH CLUSTERING ORDER BY (offset desc); +create table if not exists producer_slot_seen ( + producer_id blob, + slot bigint, + shard_offset_map frozen>>, + created_at timestamp, + primary key (producer_id, slot) +) +with clustering order by (slot DESC); + + -create materialized view if not exists solana.slot_map_mv +create materialized view if not exists slot_map_mv as select slot, @@ -226,7 +218,7 @@ select shard_id, period, offset -from solana.log +from log where slot is not null and producer_id is not null @@ -236,10 +228,11 @@ where primary key (slot, producer_id, shard_id, period, offset); -create table if not exists solana.producer_slot_seen ( - producer_id blob, - slot bigint, - created_at timestamp, - primary key (producer_id, slot) -) -with clustering order by (slot DESC); \ No newline at end of file +-- clear all table + +truncate log; +truncate producer_period_commit_log; +truncate producer_slot_seen; + +truncate consumer_info; +truncate consumer_shard_offset; diff --git a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs index 7f3126a8..b842fc23 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs @@ -10,10 +10,7 @@ use { yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_proto::{ prelude::subscribe_update::UpdateOneof, - yellowstone::log::{ - yellowstone_log_server::{self, YellowstoneLog, YellowstoneLogServer}, - EventSubscriptionPolicy, - }, + yellowstone::log::{yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy}, }, yellowstone_grpc_tools::{ config::{load as config_load, GrpcRequestToProto}, @@ -249,8 +246,8 @@ impl ArgsAction { _ => continue, }; - if result.is_err() { - error!("errror detected in sink..."); + if let Err(e) = result { + error!("error detected in sink: {e}"); break; } } diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/common.rs b/yellowstone-grpc-tools/src/scylladb/consumer/common.rs index 0eebb1e0..d486c6b7 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/common.rs +++ b/yellowstone-grpc-tools/src/scylladb/consumer/common.rs @@ -1,13 +1,11 @@ -use crate::scylladb::types::{BlockchainEventType, ProducerId, ShardId, ShardOffset}; +use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset}; pub type OldShardOffset = ShardOffset; -pub type ConsumerId = String; - /// /// Initial position in the log when creating a new consumer. 
/// -#[derive(Default, Debug, Clone, Copy)] +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] pub enum InitialOffsetPolicy { Earliest, #[default] @@ -18,6 +16,6 @@ pub enum InitialOffsetPolicy { pub struct ConsumerInfo { pub consumer_id: ConsumerId, pub producer_id: ProducerId, - pub initital_shard_offsets: Vec<(ShardId, BlockchainEventType, ShardOffset)>, + //pub initital_shard_offsets: Vec, pub subscribed_blockchain_event_types: Vec, } diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs index c06e107b..45452f64 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs +++ b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs @@ -1,31 +1,35 @@ use { super::{ - common::{ConsumerId, ConsumerInfo, InitialOffsetPolicy}, + common::InitialOffsetPolicy, shard_iterator::{ShardFilter, ShardIterator}, }, crate::scylladb::{ sink, types::{ - BlockchainEventType, ProducerId, ProducerInfo, ShardId, ShardOffset, MAX_PRODUCER, - MIN_PROCUDER, + BlockchainEventType, ConsumerId, ConsumerInfo, ConsumerShardOffset, ProducerId, + ProducerInfo, ShardId, ShardOffset, ShardPeriod, Slot, SHARD_OFFSET_MODULO, + UNDEFINED_SLOT, }, }, chrono::{DateTime, TimeDelta, Utc}, - futures::{future::try_join_all, Stream}, + core::fmt, + futures::{ + future::{try_join, try_join_all}, + Stream, + }, scylla::{ batch::{Batch, BatchType}, - cql_to_rust::FromCqlVal, prepared_statement::PreparedStatement, transport::query_result::SingleRowTypedError, Session, }, std::{ collections::{BTreeMap, BTreeSet}, - iter::repeat, pin::Pin, sync::Arc, time::Duration, }, + thiserror::Error, tokio::{sync::mpsc, time::Instant}, tokio_stream::wrappers::ReceiverStream, tonic::Response, @@ -39,6 +43,8 @@ use { }, }; +const CHECK_PRODUCER_LIVENESS_DELAY: Duration = Duration::from_millis(600); + const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250); const FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD: Duration = Duration::from_millis(500); @@ -50,21 +56,29 @@ const DEFAULT_OFFSET_COMMIT_INTERVAL: Duration = Duration::from_secs(10); const DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY: usize = 100; const UPDATE_CONSUMER_SHARD_OFFSET: &str = r###" - UPDATE consumer_info - SET offset = ?, updated_at = currentTimestamp() + UPDATE consumer_shard_offset + SET offset = ?, slot = ?, updated_at = currentTimestamp() WHERE consumer_id = ? AND producer_id = ? AND shard_id = ? AND event_type = ? - IF offset = ? +"###; + +const LIST_PRODUCER_WITH_SLOT: &str = r###" + SELECT + producer_id, + min(slot) + FROM slot_map_mv + WHERE slot = ? + GROUP BY producer_id "###; /// /// This query leverage the fact that partition data are always sorted by the clustering key and that scylla /// always iterator or scan data in cluster order. In leyman terms that mean per partition limit will always return /// the most recent entry for each producer_id. -pub const LIST_PRODUCER_LAST_HEARBEAT: &str = r###" +const LIST_PRODUCER_LAST_HEARBEAT: &str = r###" SELECT producer_id, created_at @@ -72,56 +86,49 @@ pub const LIST_PRODUCER_LAST_HEARBEAT: &str = r###" PER PARTITION LIMIT 1 "###; -pub const GET_MIN_OFFSET_FOR_SLOT: &str = r###" +const GET_SHARD_OFFSET_AT_SLOT_APPROX: &str = r###" SELECT - shard_id, - min(offset) - FROM solana.slot_map_mv - WHERE slot = ? and producer_id = ? - ORDER BY shard_id - GROUP BY shard_id; + shard_offset_map, + slot + FROM producer_slot_seen + where + producer_id = ? + AND slot <= ? 
+ ORDER BY slot desc + LIMIT 1; "###; -pub const INSERT_CONSUMER_OFFSET: &str = r###" - INSERT INTO consumer_info ( +const INSERT_CONSUMER_OFFSET: &str = r###" + INSERT INTO consumer_shard_offset ( consumer_id, producer_id, shard_id, event_type, offset, + slot, created_at, updated_at ) VALUES - (?,?,?,?,?,currentTimestamp(), currentTimestamp()) + (?,?,?,?,?,?, currentTimestamp(), currentTimestamp()) "###; -pub const GET_CONSUMER_PRODUCER_MAPPING: &str = r###" +const GET_CONSUMER_INFO_BY_ID: &str = r###" SELECT - producer_id - FROM consumer_producer_mapping + consumer_id, + producer_id, + subscribed_event_types + FROM consumer_info where consumer_id = ? "###; -pub const GET_SHARD_OFFSETS_FOR_CONSUMER_ID: &str = r###" - SELECT - shard_id, - event_type, - offset - FROM consumer_info - WHERE - consumer_id = ? - AND producer_id = ? - ORDER BY shard_id ASC -"###; - -pub const LIST_PRODUCERS_WITH_LOCK: &str = r###" +const LIST_PRODUCERS_WITH_LOCK: &str = r###" SELECT producer_id FROM producer_lock "###; -pub const GET_PRODUCERS_CONSUMER_COUNT: &str = r###" +const GET_PRODUCERS_CONSUMER_COUNT: &str = r###" SELECT producer_id, count(1) @@ -129,69 +136,58 @@ pub const GET_PRODUCERS_CONSUMER_COUNT: &str = r###" GROUP BY producer_id "###; -pub const INSERT_CONSUMER_PRODUCER_MAPPING: &str = r###" - INSERT INTO consumer_producer_mapping ( - consumer_id, - producer_id, - created_at, - updated_at - ) - VALUES (?, ?, currentTimestamp(), currentTimestamp()) +const INSERT_CONSUMER_INFO: &str = r###" + INSERT INTO consumer_info (consumer_id, producer_id, subscribed_event_types, created_at, updated_at) + VALUES (?,?,?, currentTimestamp(), currentTimestamp()) "###; -/// -/// CQL does not support OR conditions, -/// this is why use >=/<= to emulate the following condition: (producer_id = ? or ?) -/// produ -pub const GET_PRODUCER_INFO_BY_ID_OR_ANY: &str = r###" +const UPSERT_CONSUMER_INFO: &str = r###" + UPDATE consumer_info + SET producer_id = ?, + subscribed_event_types = ?, + updated_at = currentTimestamp() + WHERE consumer_id = ? +"###; + +const GET_PRODUCER_INFO_BY_ID: &str = r###" SELECT producer_id, num_shards FROM producer_info - WHERE producer_id >= ? and producer_id <= ? - LIMIT 1 - ALLOW FILTERING + WHERE producer_id = ? "###; -/// -/// Returns the latest offset per shard for a consumer id -/// -pub async fn get_shard_offsets_info_for_consumer_id( - session: Arc, - consumer_id: impl AsRef, - producer_id: ProducerId, - ev_types_to_include: &[BlockchainEventType], -) -> anyhow::Result> { - session - .query( - GET_SHARD_OFFSETS_FOR_CONSUMER_ID, - (consumer_id.as_ref(), producer_id), - ) - .await? - .rows_typed_or_empty::<(ShardId, BlockchainEventType, ShardOffset)>() - .filter(|result| { - if let Ok(triplet) = result { - ev_types_to_include.contains(&triplet.1) - } else { - false - } - }) - .collect::, _>>() - .map_err(anyhow::Error::new) +#[derive(Clone, Debug, Error, PartialEq, Eq, Copy)] +struct ImpossibleSlotOffset(Slot); + +impl fmt::Display for ImpossibleSlotOffset { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let slot = self.0; + f.write_str(format!("ImpossbielInititalOffset({slot})").as_str()) + } +} + +#[derive(Clone, Debug, PartialEq, Error, Eq, Copy)] +struct DeadProducerErr(ProducerId); + +impl fmt::Display for DeadProducerErr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let producer_id = self.0[0]; + f.write_str(format!("ProducerStale({producer_id})").as_str()) + } } /// /// Returns the assigned producer id to specific consumer if any. 
/// -pub async fn get_producer_id_for_consumer( +pub async fn get_consumer_info_by_id( session: Arc, - consumer_id: impl AsRef, -) -> anyhow::Result> { + consumer_id: ConsumerId, +) -> anyhow::Result> { session - .query(GET_CONSUMER_PRODUCER_MAPPING, (consumer_id.as_ref(),)) + .query(GET_CONSUMER_INFO_BY_ID, (consumer_id,)) .await? - .maybe_first_row_typed::<(ProducerId,)>() - .map(|opt| opt.map(|row| row.0)) + .maybe_first_row_typed::() .map_err(anyhow::Error::new) } @@ -208,35 +204,80 @@ async fn list_producers_with_lock_held(session: Arc) -> anyhow::Result< .map_err(anyhow::Error::new) } +async fn list_producer_with_slot( + session: Arc, + slot: Slot, +) -> anyhow::Result> { + session + .query(LIST_PRODUCER_WITH_SLOT, (slot,)) + .await? + .rows_typed_or_empty::<(ProducerId, Slot)>() + .map(|result| result.map(|(producer_id, _slot)| producer_id)) + .collect::, _>>() + .map_err(anyhow::Error::new) +} + async fn list_producers_heartbeat( session: Arc, heartbeat_time_dt: Duration, ) -> anyhow::Result> { let utc_now = Utc::now(); - let min_last_heartbeat = utc_now + let heartbeat_lower_bound = utc_now .checked_sub_signed(TimeDelta::seconds(heartbeat_time_dt.as_secs().try_into()?)) .ok_or(anyhow::anyhow!("Invalid heartbeat time delta"))?; - + println!("heartbeat lower bound: {heartbeat_lower_bound}"); let producer_id_with_last_hb_datetime_pairs = session .query(LIST_PRODUCER_LAST_HEARBEAT, &[]) .await? .rows_typed::<(ProducerId, DateTime)>()? //.map(|result| result.map(|row| row.0)) .collect::, _>>()?; + + println!("{producer_id_with_last_hb_datetime_pairs:?}"); //.map_err(anyhow::Error::new) Ok(producer_id_with_last_hb_datetime_pairs .into_iter() - .filter(|(_, last_hb)| last_hb >= &min_last_heartbeat) + .filter(|(_, last_hb)| last_hb >= &heartbeat_lower_bound) .map(|(pid, _)| pid) .collect::>()) } +async fn is_producer_still_alive( + session: Arc, + producer_id: ProducerId, +) -> anyhow::Result { + let check_last_slot_seen = r###" + SELECT + slot, + created_at + FROM producer_slot_seen + WHERE + producer_id = ? + ORDER BY slot DESC + PER PARTITION LIMIT 1 + "###; + let heartbeat_lower_bound = + Utc::now() - TimeDelta::seconds(DEFAULT_LAST_HEARTBEAT_TIME_DELTA.as_secs() as i64); + let check_if_lock_held = "SELECT producer_id FROM producer_lock WHERE producer_id = ?"; + let fut1 = session.query(check_last_slot_seen, (producer_id,)); + let fut2 = session.query(check_if_lock_held, (producer_id,)); + let (qr1, qr2) = try_join(fut1, fut2).await?; + if let Some((_slot, created_at)) = qr1.maybe_first_row_typed::<(Slot, DateTime)>()? { + if created_at < heartbeat_lower_bound { + return Ok(false); + } + } + + Ok(qr2.rows.is_some()) +} + /// /// Returns the producer id with least consumer assignment. 
/// async fn get_producer_id_with_least_assigned_consumer( session: Arc, + slot_requirement: Option, ) -> anyhow::Result { let locked_producers = list_producers_with_lock_held(Arc::clone(&session)).await?; @@ -250,11 +291,29 @@ async fn get_producer_id_with_least_assigned_consumer( recently_active_producers.len() ); - let elligible_producers = locked_producers + let mut elligible_producers = locked_producers .into_iter() .filter(|producer_id| recently_active_producers.contains(producer_id)) .collect::>(); + if elligible_producers.is_empty() { + anyhow::bail!("No producer available at the moment"); + } + + if let Some(slot) = slot_requirement { + let producers_with_slot = + BTreeSet::from_iter(list_producer_with_slot(Arc::clone(&session), slot).await?); + info!( + "{} producer(s) with required slot {slot}", + producers_with_slot.len() + ); + + elligible_producers.retain(|k| producers_with_slot.contains(k)); + if elligible_producers.is_empty() { + return Err(anyhow::Error::new(ImpossibleSlotOffset(slot))); + } + }; + info!("{} elligible producer(s)", recently_active_producers.len()); let mut producer_count_pairs = session .query(GET_PRODUCERS_CONSUMER_COUNT, &[]) @@ -278,18 +337,12 @@ async fn get_producer_id_with_least_assigned_consumer( /// /// Returns a specific producer information by id or return a random producer_info if `producer_id` is None. -pub async fn get_producer_info_by_id_or_any( +pub async fn get_producer_info_by_id( session: Arc, - producer_id: Option, + producer_id: ProducerId, ) -> anyhow::Result> { let qr = session - .query( - GET_PRODUCER_INFO_BY_ID_OR_ANY, - ( - producer_id.unwrap_or(MIN_PROCUDER), - producer_id.unwrap_or(MAX_PRODUCER), - ), - ) + .query(GET_PRODUCER_INFO_BY_ID, (producer_id,)) .await?; match qr.single_row_typed::() { @@ -312,92 +365,123 @@ fn get_blockchain_event_types( } } -async fn register_new_consumer( +async fn assign_producer_to_consumer( session: Arc, - consumer_id: impl AsRef, + consumer_id: ConsumerId, initial_offset_policy: InitialOffsetPolicy, event_sub_policy: EventSubscriptionPolicy, -) -> anyhow::Result { - let producer_id = get_producer_id_with_least_assigned_consumer(Arc::clone(&session)).await?; + is_new: bool, +) -> anyhow::Result<(ConsumerInfo, Vec)> { + let maybe_slot_hint = if let InitialOffsetPolicy::SlotApprox(slot) = initial_offset_policy { + Some(slot) + } else { + None + }; - let insert_consumer_mapping_ps = session.prepare(INSERT_CONSUMER_PRODUCER_MAPPING).await?; - session - .execute( - &insert_consumer_mapping_ps, - (consumer_id.as_ref(), producer_id), - ) - .await?; + let producer_id = + get_producer_id_with_least_assigned_consumer(Arc::clone(&session), maybe_slot_hint).await?; + if is_new { + session + .query( + INSERT_CONSUMER_INFO, + ( + consumer_id.as_str(), + producer_id, + get_blockchain_event_types(event_sub_policy), + ), + ) + .await?; + } else { + session + .query( + UPSERT_CONSUMER_INFO, + ( + producer_id, + get_blockchain_event_types(event_sub_policy), + consumer_id.as_str(), + ), + ) + .await?; + } info!( "consumer {:?} successfully assigned producer {:?}", - consumer_id.as_ref(), + consumer_id.as_str(), producer_id ); let initital_shard_offsets = set_initial_consumer_shard_offsets( Arc::clone(&session), - consumer_id.as_ref(), + consumer_id.as_str(), producer_id, initial_offset_policy, event_sub_policy, ) .await?; + info!("Successfully set consumer shard offsets following {initial_offset_policy:?} policy"); let cs = ConsumerInfo { - consumer_id: String::from(consumer_id.as_ref()), + consumer_id: 
consumer_id.clone(), producer_id, - initital_shard_offsets, subscribed_blockchain_event_types: get_blockchain_event_types(event_sub_policy), }; - Ok(cs) + Ok((cs, initital_shard_offsets)) } -/// -/// Gets an existing consumer with id = `consumer_id` if exists, otherwise creates a new consumer. -/// -async fn get_or_register_consumer( +async fn get_min_offset_for_producer( session: Arc, - consumer_id: impl AsRef, - initial_offset_policy: InitialOffsetPolicy, - event_sub_policy: EventSubscriptionPolicy, -) -> anyhow::Result { - let maybe_producer_id = - get_producer_id_for_consumer(Arc::clone(&session), consumer_id.as_ref()).await?; + producer_id: ProducerId, + num_shards: usize, +) -> anyhow::Result> { + let shard_id_list = (0..num_shards) + .map(|x| format!("{x}")) + .collect::>() + .join(", "); + let query = format!( + r###" + SELECT + shard_id, + period + FROM producer_period_commit_log + WHERE + producer_id = ? + AND shard_id in ({shard_id_list}) + ORDER BY period ASC + PER PARTITION LIMIT 1 + "### + ); - if let Some(producer_id) = maybe_producer_id { - info!( - "consumer {:?} exists with producer {:?} assigned to it", - consumer_id.as_ref(), - producer_id - ); + session + .query(query, (producer_id,)) + .await? + .rows_typed::<(ShardId, ShardPeriod)>()? + .map(|result| { + result + .map(|(shard_id, period)| (shard_id, (period * SHARD_OFFSET_MODULO) as ShardOffset)) + }) + .collect::, _>>() + .map_err(anyhow::Error::new) +} - let ev_types = get_blockchain_event_types(event_sub_policy); - let shard_offsets = get_shard_offsets_info_for_consumer_id( - Arc::clone(&session), - consumer_id.as_ref(), - producer_id, - &ev_types, - ) - .await?; - if shard_offsets.is_empty() { - anyhow::bail!("Consumer state is corrupted, existing consumer should have offset already available."); - } - let cs = ConsumerInfo { - consumer_id: String::from(consumer_id.as_ref()), - producer_id, - initital_shard_offsets: shard_offsets, - subscribed_blockchain_event_types: ev_types, - }; - Ok(cs) +async fn get_slot_shard_offsets2( + session: Arc, + slot: Slot, + producer_id: ProducerId, + _num_shards: ShardId, +) -> anyhow::Result>> { + let maybe = session + .query(GET_SHARD_OFFSET_AT_SLOT_APPROX, (producer_id, slot)) + .await? + .maybe_first_row_typed::<(Vec<(ShardId, ShardOffset)>, Slot)>()?; + + if let Some((offsets, slot_approx)) = maybe { + Ok(Some( + offsets + .into_iter() + .map(|(shard_id, shard_offset)| (shard_id, shard_offset, slot_approx)) + .collect(), + )) } else { - let cid = consumer_id.as_ref(); - info!("Bootstrapping consumer {cid}"); - register_new_consumer( - session, - consumer_id, - initial_offset_policy, - event_sub_policy, - ) - .await + Ok(None) } } @@ -411,12 +495,14 @@ async fn set_initial_consumer_shard_offsets( producer_id: ProducerId, initial_offset_policy: InitialOffsetPolicy, event_sub_policy: EventSubscriptionPolicy, -) -> anyhow::Result> { +) -> anyhow::Result> { // Create all the shards counter - let producer_info = get_producer_info_by_id_or_any(Arc::clone(&session), Some(producer_id)) + let producer_info = get_producer_info_by_id(Arc::clone(&session), producer_id) .await? .unwrap_or_else(|| panic!("Producer Info `{:?}` must exists", producer_id)); + let new_consumer_id = new_consumer_id.as_ref(); + info!("consumer {new_consumer_id} will be assigned to producer {producer_id:?}"); let num_shards = producer_info.num_shards; let shard_offset_pairs = match initial_offset_policy { @@ -428,18 +514,24 @@ async fn set_initial_consumer_shard_offsets( ) .await? 
} - InitialOffsetPolicy::Earliest => repeat(0) - .take(num_shards as usize) - .enumerate() - .map(|(i, x)| (i as ShardId, x)) - .collect::>(), - InitialOffsetPolicy::SlotApprox(slot) => session - .query(GET_MIN_OFFSET_FOR_SLOT, (slot, producer_id)) - .await? - .rows_typed_or_empty::<(ShardId, ShardOffset)>() - .collect::, _>>()?, + InitialOffsetPolicy::Earliest => { + get_min_offset_for_producer(Arc::clone(&session), producer_id, num_shards as usize) + .await? + .into_iter() + .map(|(shard_id, shard_offset)| (shard_id, shard_offset, UNDEFINED_SLOT)) + .collect::>() + } + InitialOffsetPolicy::SlotApprox(slot) => { + get_slot_shard_offsets2(Arc::clone(&session), slot, producer_id, num_shards) + .await? + .ok_or(ImpossibleSlotOffset(slot))? + } }; + if shard_offset_pairs.is_empty() { + anyhow::bail!("Producer {producer_id:?} shard offsets is incomplete {new_consumer_id}"); + } + let adjustment = match initial_offset_policy { InitialOffsetPolicy::Earliest | InitialOffsetPolicy::SlotApprox(_) => -1, InitialOffsetPolicy::Latest => 0, @@ -459,17 +551,18 @@ async fn set_initial_consumer_shard_offsets( shard_offset_pairs .iter() .cloned() - .map(move |(shard_id, offset)| (ev_type, shard_id, offset)) + .map(move |(shard_id, offset, slot)| (ev_type, shard_id, offset, slot)) }) - .for_each(|(ev_type, shard_id, offset)| { + .for_each(|(ev_type, shard_id, offset, slot)| { let offset = offset + adjustment; batch.append_statement(insert_consumer_offset_ps.clone()); buffer.push(( - new_consumer_id.as_ref(), + new_consumer_id.to_owned(), producer_id, shard_id, ev_type, offset, + slot, )); }); @@ -477,7 +570,16 @@ async fn set_initial_consumer_shard_offsets( let shard_offsets = buffer .drain(..) - .map(|(_, _, shard_id, ev_type, offset)| (shard_id, ev_type, offset)) + .map( + |(consumer_id, producer_id, shard_id, event_type, offset, slot)| ConsumerShardOffset { + consumer_id, + producer_id, + shard_id, + event_type, + offset, + slot, + }, + ) .collect::>(); Ok(shard_offsets) @@ -533,38 +635,38 @@ impl YellowstoneLog for ScyllaYsLog { ); let req = SpawnGrpcConsumerReq { - consumer_id, + consumer_id: consumer_id.clone(), account_update_event_filter, tx_event_filter, buffer_capacity: None, offset_commit_interval: None, }; - let rx = spawn_grpc_consumer( + let result = spawn_grpc_consumer( Arc::clone(&self.session), req, initial_offset_policy, event_subscription_policy, ) - .await - .map_err(|_e| tonic::Status::internal("fail to spawn consumer"))?; + .await; - let ret = ReceiverStream::new(rx); - - let res = Response::new(Box::pin(ret) as Self::ConsumeStream); - Ok(res) + match result { + Ok(rx) => { + let ret = ReceiverStream::new(rx); + let res = Response::new(Box::pin(ret) as Self::ConsumeStream); + Ok(res) + } + Err(e) => { + error!(consumer_id=consumer_id, error = %e); + Err(tonic::Status::internal(format!( + "({consumer_id})fail to spawn consumer" + ))) + } + } } } -struct GrpcConsumerSource { - session: Arc, - consumer_info: ConsumerInfo, - sender: mpsc::Sender>, - // The interval at which we want to commit our Offset progression to Scylla - offset_commit_interval: Duration, - shard_iterators: Vec, -} - +#[derive(Clone)] pub struct SpawnGrpcConsumerReq { pub consumer_id: ConsumerId, pub account_update_event_filter: @@ -574,32 +676,26 @@ pub struct SpawnGrpcConsumerReq { pub offset_commit_interval: Option, } +type GrpcConsumerSender = mpsc::Sender>; type GrpcConsumerReceiver = mpsc::Receiver>; -pub async fn spawn_grpc_consumer( +async fn build_grpc_consumer_source( + sender: GrpcConsumerSender, session: 
Arc, req: SpawnGrpcConsumerReq, initial_offset_policy: InitialOffsetPolicy, event_subscription_policy: EventSubscriptionPolicy, -) -> anyhow::Result { - let consumer_info = get_or_register_consumer( + is_new: bool, +) -> anyhow::Result { + let (consumer_info, initial_shard_offsets) = assign_producer_to_consumer( Arc::clone(&session), - req.consumer_id.as_str(), + req.consumer_id.clone(), initial_offset_policy, event_subscription_policy, + is_new, ) - .await - .map_err(|e| { - error!("{:?}", e); - tonic::Status::new( - tonic::Code::Internal, - format!("failed to get or create consumer {:?}", req.consumer_id), - ) - })?; - let buffer_capacity = req - .buffer_capacity - .unwrap_or(DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY); - let (sender, receiver) = mpsc::channel(buffer_capacity); + .await?; + //let last_committed_offsets = state.shard_offsets.clone(); let consumer_session = Arc::clone(&session); @@ -620,18 +716,18 @@ pub async fn spawn_grpc_consumer( .unwrap_or_default(), }; - let shard_iterators = try_join_all(consumer_info.initital_shard_offsets.iter().cloned().map( - |(shard_id, ev_type, shard_offset)| { + let shard_iterators = try_join_all(initial_shard_offsets.iter().cloned().map( + |consumer_shard_offset| { let session = Arc::clone(&session); let producer_id = consumer_info.producer_id; let shard_filter = shard_filter.clone(); ShardIterator::new( session, producer_id, - shard_id, - shard_offset, + consumer_shard_offset.shard_id, + consumer_shard_offset.offset, // The ev_type will dictate if shard iterator streams account update or transaction. - ev_type, + consumer_shard_offset.event_type, Some(shard_filter), ) }, @@ -647,108 +743,80 @@ pub async fn spawn_grpc_consumer( shard_iterators, ) .await?; - - tokio::spawn(async move { - consumer - .into_daemon() - .await - .expect("consumer terminated abruptly"); - }); - Ok(receiver) + Ok(consumer) } -struct UpdateShardOffsetClosure { +pub async fn spawn_grpc_consumer( session: Arc, - consumer_id: ConsumerId, - producer_id: ProducerId, - update_prepared_stmt: PreparedStatement, -} + req: SpawnGrpcConsumerReq, + initial_offset_policy: InitialOffsetPolicy, + event_subscription_policy: EventSubscriptionPolicy, +) -> anyhow::Result { + let original_req = req.clone(); + let buffer_capacity = req + .buffer_capacity + .unwrap_or(DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY); + let (sender, receiver) = mpsc::channel(buffer_capacity); -impl UpdateShardOffsetClosure { - async fn new( - session: Arc, - consumer_id: ConsumerId, - producer_id: ProducerId, - ) -> anyhow::Result { - let ps = session.prepare(UPDATE_CONSUMER_SHARD_OFFSET).await?; - Ok(UpdateShardOffsetClosure { - session, - consumer_id, - producer_id, - update_prepared_stmt: ps, - }) - } + let mut grpc_consumer_source = build_grpc_consumer_source( + sender.clone(), + Arc::clone(&session), + req, + initial_offset_policy, + event_subscription_policy, + true, + ) + .await?; + let consumer_id = original_req.consumer_id.to_owned(); - async fn execute( - &self, - old_offsets: &[(ShardId, BlockchainEventType, ShardOffset)], - new_offsets: &[(ShardId, BlockchainEventType, ShardOffset)], - ) -> anyhow::Result> { - // Since the commit offset is partitionned by consumer_id/producer_id - // and that we using LWT, the entire batch will be atomic. - // - // LOGGING Batch mode is when you have a batch that span multiple partition and need some atomicity. - // In our case, we can disable batch logging since we are batching since-partition data. 
- // Apparently, this is done by default by Scylla, but we make it explicit here since the driver is not quite mature. - let mut atomic_batch = Batch::new(BatchType::Unlogged); - - let buffer = old_offsets - .iter() - .zip(new_offsets.iter()) - .filter(|((_, _, old_offset), (_, _, new_offset))| old_offset < new_offset) - .map( - |((shard_id, event_type, old_offset), (shard_id2, event_type2, new_offset))| { - if shard_id != shard_id2 { - panic!("Misaligned consumer offset update"); - } - if event_type != event_type2 { - panic!("Misaligned event type during offset update"); + info!("Spawning consumer {consumer_id} thread"); + tokio::spawn(async move { + let consumer_id = original_req.consumer_id.to_owned(); + let sender = sender; + let session = session; + while !sender.is_closed() { + match grpc_consumer_source.run_forever().await { + Ok(_) => break, + Err(e) => { + warn!("Consumer {consumer_id} source has stop with {e:?}"); + if let Some(DeadProducerErr(_producer_id)) = e.downcast_ref::() + { + let forged_offset_policy = grpc_consumer_source + .shard_iterators_slot + .into_iter() + .min() + .map(InitialOffsetPolicy::SlotApprox) + .unwrap_or(initial_offset_policy); + + grpc_consumer_source = build_grpc_consumer_source( + sender.clone(), + Arc::clone(&session), + original_req.clone(), + forged_offset_policy, + event_subscription_policy, + false, + ) + .await + .unwrap_or_else(|_| panic!("cannot translate consumer {consumer_id}")); + } else { + panic!("{e:?}") } - ( - new_offset, - self.consumer_id.clone(), - self.producer_id, - shard_id, - event_type, - old_offset, - ) - }, - ) - .collect::>(); - - if buffer.is_empty() { - return Ok(Ok(())); + } + } } + }); + Ok(receiver) +} - repeat(()) - .take(buffer.len()) - .for_each(|_| atomic_batch.append_statement(self.update_prepared_stmt.clone())); - - let query_result = self.session.batch(&atomic_batch, &buffer).await?; - - let row = query_result.first_row().map_err(anyhow::Error::new)?; - - let success = row - .columns - .first() // first column of LWT is always "success" field - .and_then(|opt| opt.to_owned()) - .map(bool::from_cql) - .transpose()? 
- .unwrap_or(false); - - let actual_offset = row - .columns - .get(5) // offset column - .and_then(|opt| opt.to_owned()) - .map(ShardOffset::from_cql) - .transpose()?; - - if success { - Ok(Ok(())) - } else { - Ok(Err(actual_offset.expect("missing actual offset from LWT"))) - } - } +struct GrpcConsumerSource { + session: Arc, + consumer_info: ConsumerInfo, + sender: mpsc::Sender>, + // The interval at which we want to commit our Offset progression to Scylla + offset_commit_interval: Duration, + shard_iterators: Vec, + shard_iterators_slot: Vec, + update_consumer_shard_offset_prepared_stmt: PreparedStatement, } impl GrpcConsumerSource { @@ -759,44 +827,63 @@ impl GrpcConsumerSource { offset_commit_interval: Duration, mut shard_iterators: Vec, ) -> anyhow::Result { + let update_consumer_shard_offset_prepared_stmt = + session.prepare(UPDATE_CONSUMER_SHARD_OFFSET).await?; // Prewarm every shard iterator try_join_all(shard_iterators.iter_mut().map(|shard_it| shard_it.warm())).await?; - + let num_shard_iterators = shard_iterators.len(); + let shard_iterators_slot = vec![UNDEFINED_SLOT; num_shard_iterators]; Ok(GrpcConsumerSource { session, consumer_info, sender, offset_commit_interval, shard_iterators, + shard_iterators_slot, + update_consumer_shard_offset_prepared_stmt, }) } - async fn into_daemon(mut self) -> anyhow::Result<()> { - let consumer_id = self.consumer_info.consumer_id; + async fn update_consumer_shard_offsets(&self) -> anyhow::Result<()> { + let mut batch = Batch::new(BatchType::Unlogged); + let mut values = Vec::with_capacity(self.shard_iterators_slot.len()); + for (i, shard_it) in self.shard_iterators.iter().enumerate() { + values.push(( + shard_it.last_offset(), + self.shard_iterators_slot[i], + self.consumer_info.consumer_id.to_owned(), + self.consumer_info.producer_id, + shard_it.shard_id, + shard_it.event_type, + )); + batch.append_statement(self.update_consumer_shard_offset_prepared_stmt.clone()); + } + + self.session.batch(&batch, values).await?; + Ok(()) + } + + async fn run_forever(&mut self) -> anyhow::Result<()> { let producer_id = self.consumer_info.producer_id; + let consumer_id = self.consumer_info.consumer_id.to_owned(); let mut commit_offset_deadline = Instant::now() + self.offset_commit_interval; - let update_shard_offset_fn = UpdateShardOffsetClosure::new( - Arc::clone(&self.session), - consumer_id.clone(), - producer_id, - ) - .await?; info!("Serving consumer: {:?}", consumer_id); - let mut last_committed_offsets = self.consumer_info.initital_shard_offsets.clone(); - last_committed_offsets.sort_by_key(|tuple| (tuple.0, tuple.1)); self.shard_iterators .sort_by_key(|it| (it.shard_id, it.event_type)); - let mut max_seen_slot = -1; + let mut max_seen_slot = UNDEFINED_SLOT; let mut num_event_between_two_slots = 0; let mut t = Instant::now(); + let mut next_producer_live_probing = Instant::now() + CHECK_PRODUCER_LIVENESS_DELAY; + let mut producer_is_dead = false; loop { - for shard_it in self.shard_iterators.iter_mut() { + for (i, shard_it) in self.shard_iterators.iter_mut().enumerate() { let maybe = shard_it.try_next().await?; if let Some(block_chain_event) = maybe { + self.shard_iterators_slot[i] = block_chain_event.slot; if t.elapsed() >= FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD { warn!( "consumer {consumer_id} micro batch took {:?} to fetch.", @@ -835,31 +922,30 @@ impl GrpcConsumerSource { } } - // Every now and then, we commit where the consumer is loc - if commit_offset_deadline.elapsed() > Duration::ZERO { - let mut new_offsets_to_commit = self - 
.shard_iterators - .iter() - .map(|shard_it| { - ( - shard_it.shard_id, - shard_it.event_type, - shard_it.last_offset(), - ) - }) - .collect::>(); - - let result = update_shard_offset_fn - .execute(&last_committed_offsets, &new_offsets_to_commit) - .await?; - - if let Err(_actual_offset_in_scylla) = result { - anyhow::bail!("two concurrent connections are using the same consumer instance") + if next_producer_live_probing.elapsed() > Duration::ZERO { + producer_is_dead = !is_producer_still_alive( + Arc::clone(&self.session), + self.consumer_info.producer_id, + ) + .await?; + if !producer_is_dead { + info!("producer {producer_id:?} is alive"); } - info!("Successfully committed offsets for consumer {consumer_id}"); - std::mem::swap(&mut new_offsets_to_commit, &mut last_committed_offsets); + next_producer_live_probing = Instant::now() + CHECK_PRODUCER_LIVENESS_DELAY; + } + + // Every now and then, we commit where the consumer is loc + if commit_offset_deadline.elapsed() > Duration::ZERO || producer_is_dead { + let t = Instant::now(); + self.update_consumer_shard_offsets().await?; + info!("updated consumer shard offset in {:?}", t.elapsed()); commit_offset_deadline = Instant::now() + self.offset_commit_interval; } + + if producer_is_dead { + warn!("Producer {producer_id:?} is considered dead"); + return Err(anyhow::Error::new(DeadProducerErr(producer_id))); + } } } } diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs b/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs index 68201471..f73868a4 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs +++ b/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs @@ -314,8 +314,9 @@ impl ShardIterator { } else { let curr_period = last_offset / SHARD_OFFSET_MODULO; if curr_period <= self.last_period_confirmed { - let last_period_offset = ((curr_period + 1) * SHARD_OFFSET_MODULO) - 1; - (ShardIteratorState::Empty(last_period_offset), None) + let last_offset_for_curr_period = + ((curr_period + 1) * SHARD_OFFSET_MODULO) - 1; + (ShardIteratorState::Empty(last_offset_for_curr_period), None) } else { // If a newly loaded row stream is already empty, we must figure out if // its because there no more data in the period or is it because we consume too fast and we should try again later. 
diff --git a/yellowstone-grpc-tools/src/scylladb/sink.rs b/yellowstone-grpc-tools/src/scylladb/sink.rs index a3c0abde..162b784a 100644 --- a/yellowstone-grpc-tools/src/scylladb/sink.rs +++ b/yellowstone-grpc-tools/src/scylladb/sink.rs @@ -6,11 +6,14 @@ use { }, types::{ AccountUpdate, BlockchainEvent, ProducerId, ProducerInfo, ShardId, ShardOffset, - ShardPeriod, Transaction, SHARD_OFFSET_MODULO, + ShardPeriod, Slot, Transaction, SHARD_OFFSET_MODULO, UNDEFINED_SLOT, }, }, deepsize::DeepSizeOf, - futures::future, + futures::{ + future::{self, try_join_all}, + Future, + }, local_ip_address::{list_afinet_netifas, local_ip}, scylla::{ batch::{Batch, BatchType}, @@ -19,7 +22,11 @@ use { FromRow, Session, SessionBuilder, }, std::{collections::BTreeMap, net::IpAddr, sync::Arc, time::Duration}, - tokio::{task::JoinHandle, time::Instant}, + tokio::{ + sync::mpsc::{error::SendError, Permit}, + task::{JoinError, JoinHandle}, + time::Instant, + }, tracing::{error, info, warn}, uuid::Uuid, }; @@ -53,8 +60,8 @@ impl FromRow for LwtSuccess { } const INSERT_PRODUCER_SLOT: &str = r###" - INSERT INTO producer_slot_seen (producer_id, slot, created_at) - VALUES (?, ?, currentTimestamp()) + INSERT INTO producer_slot_seen (producer_id, slot, shard_offset_map, created_at) + VALUES (?, ?, ?, currentTimestamp()) "###; const DROP_PRODUCER_LOCK: &str = r###" @@ -127,15 +134,15 @@ pub struct ScyllaSinkConfig { } #[allow(clippy::large_enum_variant)] -#[derive(Debug, Clone, PartialEq)] -enum ClientCommand { +#[derive(Debug, PartialEq)] +enum ShardCommand { Shutdown, // Add other action if necessary... InsertAccountUpdate(AccountUpdate), InsertTransaction(Transaction), } -/// Represents a shard responsible for processing and batching `ClientCommand` messages +/// Represents a shard responsible for processing and batching `ShardCommand` messages /// before committing them to the database in a background daemon. /// /// This struct encapsulates the state and behavior required to manage message buffering, @@ -170,6 +177,8 @@ struct Shard { /// Duration to linger before flushing the buffer. buffer_linger: Duration, + + last_committed_period: ShardPeriod, } impl Shard { @@ -198,6 +207,7 @@ impl Shard { scylla_batch: Batch::new(BatchType::Unlogged), buffer_linger, curr_batch_byte_size: 0, + last_committed_period: -1, } } @@ -225,22 +235,19 @@ impl Shard { Ok(()) } - /// Converts the current `Shard` instance into a background daemon for processing and batching `ClientCommand` messages. + /// Converts the current `Shard` instance into a background daemon for processing and batching `ShardCommand` messages. /// /// This method spawns an asynchronous task (`tokio::spawn`) to continuously receive messages from a channel (`receiver`), /// batch process them, and commit periods to the database. It handles message buffering /// and period commitment based on the configured buffer settings and period boundaries. /// /// # Returns - /// Returns a `Sender` channel (`tokio::sync::mpsc::Sender`) that can be used to send `ClientCommand` messages + /// Returns a `Sender` channel (`tokio::sync::mpsc::Sender`) that can be used to send `ShardCommand` messages /// to the background daemon for processing and batching. 
- fn into_daemon( - mut self, - ) -> ( - tokio::sync::mpsc::Sender, - JoinHandle>, - ) { - let (sender, mut receiver) = tokio::sync::mpsc::channel::(16); + fn into_daemon(mut self) -> ShardHandle { + let (sender, mut receiver) = tokio::sync::mpsc::channel::(16); + let shard_id = self.shard_id; + let (wsender, wreceiver) = tokio::sync::watch::channel(self.next_offset - 1); let handle: JoinHandle> = tokio::spawn(async move { let insert_event_ps = self.session.prepare(INSERT_BLOCKCHAIN_EVENT).await?; @@ -252,13 +259,17 @@ impl Shard { let producer_id = self.producer_id; let offset = self.next_offset; let curr_period = offset / SHARD_OFFSET_MODULO; + let prev_period = curr_period - 1; // If we started a new period - if offset % SHARD_OFFSET_MODULO == 0 && offset > 0 { + if offset % SHARD_OFFSET_MODULO == 0 + && offset > 0 + && self.last_committed_period != prev_period + { // Make sure the last period is committed let t = Instant::now(); self.session - .execute(&commit_period_ps, (producer_id, shard_id, curr_period - 1)) + .execute(&commit_period_ps, (producer_id, shard_id, prev_period)) .await?; info!( shard = shard_id, @@ -266,20 +277,25 @@ impl Shard { committed_period = curr_period, time_to_commit = ?t.elapsed() ); + self.last_committed_period = prev_period; } - self.next_offset += 1; let msg = receiver .recv() .await .ok_or(anyhow::anyhow!("Shard mailbox closed"))?; let maybe_blockchain_event = match msg { - ClientCommand::Shutdown => None, - ClientCommand::InsertAccountUpdate(acc_update) => { + ShardCommand::Shutdown => { + warn!("Shard {} received shutdown command.", shard_id); + self.flush().await?; + warn!("shard {} finished shutdown procedure", shard_id); + return Ok(()); + } + ShardCommand::InsertAccountUpdate(acc_update) => { Some(acc_update.as_blockchain_event(shard_id, producer_id, offset)) } - ClientCommand::InsertTransaction(new_tx) => { + ShardCommand::InsertTransaction(new_tx) => { Some(new_tx.as_blockchain_event(shard_id, producer_id, offset)) } }; @@ -299,22 +315,60 @@ impl Shard { self.buffer.push(blockchain_event); self.scylla_batch.append_statement(insert_event_ps.clone()); self.curr_batch_byte_size += msg_byte_size; - } else { - warn!("Shard {} received shutdown command.", shard_id); - self.flush().await?; - warn!("shard {} finished shutdown procedure", shard_id); - return Ok(()); + wsender + .send(offset) + .map_err(|_offset| anyhow::anyhow!("failed to notify committed offset"))?; + self.next_offset += 1; } } }); - (sender, handle) + + ShardHandle { + shard_id, + sender, + tokio_handle: handle, + shard_offset_watch: wreceiver, + } + } +} + +struct ShardHandle { + shard_id: ShardId, + sender: tokio::sync::mpsc::Sender, + tokio_handle: JoinHandle>, + shard_offset_watch: tokio::sync::watch::Receiver, +} + +impl ShardHandle { + async fn reserve(&self) -> Result, SendError<()>> { + self.sender.reserve().await + } + + async fn send(&self, value: ShardCommand) -> Result<(), SendError> { + self.sender.send(value).await + } + + fn get_last_committed_offset(&self) -> ShardOffset { + self.shard_offset_watch.borrow().to_owned() + } +} + +impl Future for ShardHandle { + type Output = Result, JoinError>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let handle = &mut self.tokio_handle; + tokio::pin!(handle); + handle.poll(cx) } } pub struct ScyllaSink { - router_sender: tokio::sync::mpsc::Sender, + router_sender: tokio::sync::mpsc::Sender, router_handle: JoinHandle>, - shard_handles: Vec>>, producer_lock: 
ProducerLock, } @@ -343,7 +397,7 @@ pub(crate) async fn get_max_shard_offsets_for_producer( session: Arc, producer_id: ProducerId, num_shards: usize, -) -> anyhow::Result> { +) -> anyhow::Result> { let cql_shard_list = (0..num_shards) .map(|shard_id| format!("{shard_id}")) .collect::>() @@ -378,7 +432,8 @@ pub(crate) async fn get_max_shard_offsets_for_producer( let query_max_offset_for_shard_period = r###" SELECT - offset + offset, + slot FROM log WHERE producer_id = ? @@ -396,15 +451,14 @@ pub(crate) async fn get_max_shard_offsets_for_producer( let ps = max_offset_for_shard_period_ps.clone(); let session = Arc::clone(&session); async move { - let max_offset = session + let (max_offset, slot) = session .execute(&ps, (producer_id, shard_id, curr_period)) .await? - .maybe_first_row_typed::<(ShardOffset,)>()? - .map(|tuple| tuple.0) + .maybe_first_row_typed::<(ShardOffset, Slot)>()? // If row is None, it means no period has started since the last period commit. // So we seek at the end of the previous period. - .unwrap_or((curr_period * SHARD_OFFSET_MODULO) - 1); - Ok::<_, anyhow::Error>((*shard_id, max_offset)) + .unwrap_or(((curr_period * SHARD_OFFSET_MODULO) - 1, UNDEFINED_SLOT)); + Ok::<_, anyhow::Error>((*shard_id, max_offset, slot)) } }, )) @@ -419,9 +473,9 @@ pub(crate) async fn get_max_shard_offsets_for_producer( Ok(shard_max_offset_pairs) } -/// Spawns a round-robin dispatcher for sending `ClientCommand` messages to a list of shard mailboxes. +/// Spawns a round-robin dispatcher for sending `ShardCommand` messages to a list of shard mailboxes. /// -/// This function takes a vector of shard mailboxes (`tokio::sync::mpsc::Sender`) and returns +/// This function takes a vector of shard mailboxes (`tokio::sync::mpsc::Sender`) and returns /// a new `Sender` that can be used to dispatch messages in a round-robin fashion to the provided shard mailboxes. /// /// The dispatcher cycles through the shard mailboxes indefinitely, ensuring each message is sent to the next @@ -432,13 +486,13 @@ pub(crate) async fn get_max_shard_offsets_for_producer( /// - `shard_mailboxes`: A vector of `Sender` channels representing shard mailboxes to dispatch messages to. /// /// # Returns -/// A `Sender` channel that can be used to send `ClientCommand` messages to the shard mailboxes in a round-robin manner. +/// A `Sender` channel that can be used to send `ShardCommand` messages to the shard mailboxes in a round-robin manner. 
fn spawn_round_robin( session: Arc, producer_id: ProducerId, - shard_mailboxes: Vec>, + shard_handles: Vec, ) -> ( - tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Sender, JoinHandle>, ) { let (sender, mut receiver) = tokio::sync::mpsc::channel(DEFAULT_SHARD_MAX_BUFFER_CAPACITY); @@ -448,7 +502,7 @@ fn spawn_round_robin( //session.execute(&insert_slot_ps, (producer_id,)).await?; - let iterator = shard_mailboxes.iter().enumerate().cycle(); + let iterator = shard_handles.iter().enumerate().cycle(); info!("Started round robin router"); let mut msg_between_slot = 0; let mut max_slot_seen = -1; @@ -456,15 +510,16 @@ fn spawn_round_robin( let mut background_commit_max_slot_seen = tokio::spawn(future::ready(Ok::<(), anyhow::Error>(()))); for (i, shard_sender) in iterator { - let msg = receiver.recv().await.unwrap_or(ClientCommand::Shutdown); - if msg == ClientCommand::Shutdown { + let msg = receiver.recv().await.unwrap_or(ShardCommand::Shutdown); + + if msg == ShardCommand::Shutdown { warn!("round robin router's mailbox closed unexpectly."); break; } let slot = match &msg { - ClientCommand::Shutdown => -1, - ClientCommand::InsertAccountUpdate(x) => x.slot, - ClientCommand::InsertTransaction(x) => x.slot, + ShardCommand::Shutdown => -1, + ShardCommand::InsertAccountUpdate(x) => x.slot, + ShardCommand::InsertTransaction(x) => x.slot, }; if max_slot_seen < slot { max_slot_seen = slot; @@ -476,9 +531,14 @@ fn spawn_round_robin( let session = Arc::clone(&session); let insert_slot_ps = insert_slot_ps.clone(); + let shard_offset_pairs = shard_handles + .iter() + .map(|sh| (sh.shard_id, sh.get_last_committed_offset())) + .collect::>(); + background_commit_max_slot_seen = tokio::spawn(async move { session - .execute(&insert_slot_ps, (producer_id, slot)) + .execute(&insert_slot_ps, (producer_id, slot, shard_offset_pairs)) .await?; let time_to_commit_slot = t.elapsed(); @@ -502,11 +562,13 @@ fn spawn_round_robin( } } // Send shutdown to all shards - for (i, shard_sender) in shard_mailboxes.iter().enumerate() { + for (i, shard_sender) in shard_handles.iter().enumerate() { warn!("Shutting down shard: {}", i); - shard_sender.send(ClientCommand::Shutdown).await?; + shard_sender.send(ShardCommand::Shutdown).await?; } + try_join_all(shard_handles.into_iter()).await?; + warn!("End of round robin router"); Ok(()) }); @@ -629,15 +691,13 @@ impl ScyllaSink { info!("init producer {producer_id:?} period commit log successful."); - let mut sharders = vec![]; - let shard_offsets = get_max_shard_offsets_for_producer(Arc::clone(&session), producer_id, shard_count) .await?; info!("Got back last offsets of all {shard_count} shards"); let mut shard_handles = Vec::with_capacity(shard_count); - for (shard_id, last_offset) in shard_offsets.into_iter() { + for (shard_id, last_offset, _slot) in shard_offsets.into_iter() { let session = Arc::clone(&session); let shard = Shard::new( session, @@ -648,41 +708,33 @@ impl ScyllaSink { config.batch_size_kb_limit * 1024, config.linger, ); - let (shard_mailbox, shard_handle) = shard.into_daemon(); + let shard_handle = shard.into_daemon(); shard_handles.push(shard_handle); - sharders.push(shard_mailbox); } let (sender, router_handle) = - spawn_round_robin(Arc::clone(&session), producer_id, sharders); + spawn_round_robin(Arc::clone(&session), producer_id, shard_handles); Ok(ScyllaSink { router_sender: sender, router_handle, - shard_handles, producer_lock, }) } pub async fn shutdown(self) -> anyhow::Result<()> { warn!("Shutthing down scylla sink..."); - let router_result = 
self.router_sender.send(ClientCommand::Shutdown).await; + let router_result = self.router_sender.send(ShardCommand::Shutdown).await; if router_result.is_err() { - error!("router was closed before we could gracefully shutdown all sharders. Sharder should terminate on their own...") + error!("router was closed before we could gracefully shutdown all sharders."); } - if let Ok(Err(e)) = self.router_handle.await { - error!("Router error: {e:?}"); + if let Err(e) = self.router_handle.await? { + error!("router error {e}"); } - for (i, shard_handle) in self.shard_handles.into_iter().enumerate() { - if let Ok(Err(e)) = shard_handle.await { - error!("shard {i} error: {e:?}"); - } - } - self.producer_lock.release().await?; - Ok(()) + self.producer_lock.release().await } - async fn inner_log(&mut self, cmd: ClientCommand) -> anyhow::Result<()> { + async fn inner_log(&mut self, cmd: ShardCommand) -> anyhow::Result<()> { self.router_sender .send(cmd) .await @@ -690,12 +742,12 @@ impl ScyllaSink { } pub async fn log_account_update(&mut self, update: AccountUpdate) -> anyhow::Result<()> { - let cmd = ClientCommand::InsertAccountUpdate(update); + let cmd = ShardCommand::InsertAccountUpdate(update); self.inner_log(cmd).await } pub async fn log_transaction(&mut self, tx: Transaction) -> anyhow::Result<()> { - let cmd = ClientCommand::InsertTransaction(tx); + let cmd = ShardCommand::InsertTransaction(tx); self.inner_log(cmd).await } } diff --git a/yellowstone-grpc-tools/src/scylladb/types.rs b/yellowstone-grpc-tools/src/scylladb/types.rs index 1d414e1e..53265147 100644 --- a/yellowstone-grpc-tools/src/scylladb/types.rs +++ b/yellowstone-grpc-tools/src/scylladb/types.rs @@ -16,18 +16,36 @@ use { }, }; -pub const SHARD_OFFSET_MODULO: i64 = 10000; - pub type ProgramId = [u8; 32]; - +pub type Pubkey = [u8; 32]; pub type Slot = i64; pub type ShardId = i16; pub type ShardPeriod = i64; pub type ShardOffset = i64; pub type ProducerId = [u8; 1]; // one byte is enough to assign an id to a machine - +pub type ConsumerId = String; +pub const SHARD_OFFSET_MODULO: i64 = 10000; pub const MIN_PROCUDER: ProducerId = [0x00]; pub const MAX_PRODUCER: ProducerId = [0xFF]; +pub const UNDEFINED_SLOT: Slot = -1; + +#[derive(Clone, Debug, PartialEq, Eq, FromRow)] +pub struct ConsumerInfo { + pub consumer_id: ConsumerId, + pub producer_id: ProducerId, + //pub initital_shard_offsets: Vec, + pub subscribed_blockchain_event_types: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq, FromRow)] +pub struct ConsumerShardOffset { + pub consumer_id: ConsumerId, + pub producer_id: ProducerId, + pub shard_id: ShardId, + pub event_type: BlockchainEventType, + pub offset: ShardOffset, + pub slot: Slot, +} #[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Copy, DeepSizeOf)] pub enum BlockchainEventType { @@ -115,8 +133,6 @@ pub struct BlockchainEvent { pub tx_index: Option, } -type Pubkey = [u8; 32]; - #[derive(SerializeRow, Clone, Debug, DeepSizeOf, PartialEq, Eq)] pub struct AccountUpdate { pub slot: i64,