From f28384ff559c359deb8ae40531bd6c14d37bc801 Mon Sep 17 00:00:00 2001 From: lvboudre Date: Mon, 27 May 2024 16:08:02 -0400 Subject: [PATCH] tools: add commitment level supports for consumer + better metadata (#350) --- .../proto/yellowstone-log.proto | 90 +++++ yellowstone-grpc-tools/solana.cql | 32 +- .../src/bin/grpc-scylladb.rs | 22 +- yellowstone-grpc-tools/src/scylladb/config.rs | 7 +- .../src/scylladb/consumer/common.rs | 9 +- .../src/scylladb/consumer/grpc.rs | 314 +++++++++++++----- yellowstone-grpc-tools/src/scylladb/sink.rs | 12 +- yellowstone-grpc-tools/src/scylladb/types.rs | 68 ++++ 8 files changed, 440 insertions(+), 114 deletions(-) diff --git a/yellowstone-grpc-proto/proto/yellowstone-log.proto b/yellowstone-grpc-proto/proto/yellowstone-log.proto index bec67e39..6a88b559 100644 --- a/yellowstone-grpc-proto/proto/yellowstone-log.proto +++ b/yellowstone-grpc-proto/proto/yellowstone-log.proto @@ -12,32 +12,122 @@ service YellowstoneLog { } +/// The InitialOffsetPolicy enum determines the initial offset used when subscribing to events or messages. It provides three options: +/// +/// EARLIEST (0) +/// This policy subscribes to events or messages starting from the earliest available offset in the data stream. It ensures that all historical data is consumed from the beginning. +/// +/// LATEST (1) +/// Subscribes to events or messages starting from the latest available offset in the data stream. It only consumes new events or messages generated after the subscription is initiated. +/// +/// SLOT (2) +/// This policy subscribes to events or messages starting from a specific slot number in the data stream. It allows for precise control over where consumption begins based on slot numbers. enum InitialOffsetPolicy { EARLIEST = 0; LATEST = 1; SLOT = 2; } + +/// The EventSubscriptionPolicy enum defines the types of events to subscribe to. It offers three options: +/// +/// ACCOUNT_UPDATE_ONLY (0) +/// Subscribes to account update events exclusively. It filters out other types of events, focusing solely on account-related updates. +/// +/// TRANSACTION_ONLY (1) +/// Subscribes to transaction events exclusively. It filters out non-transactional events, ensuring that only transaction-related data is consumed. +/// +/// BOTH (2) +/// This policy subscribes to both account update and transaction events. It enables consumption of a wider range of event types, encompassing both account-related updates and transactions. enum EventSubscriptionPolicy { ACCOUNT_UPDATE_ONLY = 0; TRANSACTION_ONLY = 1; BOTH = 2; } +/// Timeline Translation Policy +/// The TimelineTranslationPolicy enum defines different strategies for translating users to timelines based on their last seen slot number or a specified lag. This is particularly useful for managing data consumption in conjunction with the ConsumeRequest.ttp_maximum_slot_lag parameter. +/// +/// ALLOW_LAG +/// This policy allows users to be translated to a timeline that contains the last seen slot number or up to a certain lag. It extends the allowable lag for eligible timelines. +/// +/// Example: +/// +/// Suppose we have three timelines with the latest available slot numbers: +/// +/// timeline1 : 10 +/// timeline2 : 5 +/// timeline3 : 8 +/// If a consumer is assigned to timeline1 with ttp_maximum_slot_lag set to 2, then the only eligible destination timeline would be timeline3. +/// +/// STRICT_SLOT +/// Under this policy, eligible destination timelines must contain the last seen slot number in the current consumer timeline; otherwise, the translation fails. This ensures strict adherence to slot numbers when translating users to timelines. +enum TimelineTranslationPolicy { + ALLOW_LAG = 0; + STRICT_SLOT = 1; +} + +/// The ConsumeRequest message defines parameters for consuming events or messages from a data stream. It includes the following fields: +/// +/// consumer_id (1) +/// An optional string representing the consumer's unique identifier. +/// +/// initial_offset_policy (2) +/// Specifies the initial offset policy for subscribing to events. It uses values from the InitialOffsetPolicy enum. +/// +/// at_slot (3) +/// An optional int64 indicating the specific slot number from which consumption should start. This is relevant when initial_offset_policy is set to SLOT. +/// +/// event_subscription_policy (4) +/// Defines the event subscription policy using values from the EventSubscriptionPolicy enum. +/// +/// account_update_event_filter (5) +/// An optional AccountUpdateEventFilter message specifying filters for account update events. +/// +/// tx_event_filter (6) +/// An optional TransactionEventFilter message defining filters for transaction events. +/// +/// commitment_level (7) +/// Specifies the commitment level for consuming events. It uses values from the geyser.CommitmentLevel enum. +/// +/// timelineTranslationPolicy (8) +/// An optional TimelineTranslationPolicy describing the policy for triggering timeline translation when an ingester is out of service and consumers need to be translated to a different ingestion timeline. +/// +/// ttp_maximum_slot_lag (9) +/// An optional uint32 indicating the maximum slot lag allowed for timeline translation. message ConsumeRequest { optional string consumer_id = 1; + InitialOffsetPolicy initial_offset_policy = 2; optional int64 at_slot = 3; + EventSubscriptionPolicy event_subscription_policy = 4; optional AccountUpdateEventFilter account_update_event_filter = 5; optional TransactionEventFilter tx_event_filter = 6; + geyser.CommitmentLevel commitment_level = 7; + + // timelineTranslationPolicy is used when an ingester is out of service and we need to translate a set + // of consumer to a different ingestion timeline. The policy describe what to do when we need to trigger timeline translation. + optional TimelineTranslationPolicy timelineTranslationPolicy = 8; + optional uint32 ttp_maximum_slot_lag = 9; } +/// The AccountUpdateEventFilter message defines filters for account update events. It includes the following fields: +/// +/// pubkeys (1) +/// A repeated field of bytes representing public keys. Events matching any of these public keys will be included in the filtered results. +/// +/// owners (2) +/// A repeated field of bytes representing account owners. Events matching any of these account owners will be included in the filtered results. message AccountUpdateEventFilter { repeated bytes pubkeys = 1; repeated bytes owners = 2; } +/// The TransactionEventFilter message specifies filters for transaction events. It contains the following field: + +/// account_keys (1) +/// A repeated field of bytes representing account keys. Events associated with any of these account keys will be included in the filtered results. message TransactionEventFilter { repeated bytes account_keys = 1; } \ No newline at end of file diff --git a/yellowstone-grpc-tools/solana.cql b/yellowstone-grpc-tools/solana.cql index 3af1703e..4a620157 100644 --- a/yellowstone-grpc-tools/solana.cql +++ b/yellowstone-grpc-tools/solana.cql @@ -31,12 +31,14 @@ create table if not exists consumer_shard_offset ( created_at timestamp, updated_at timestamp, PRIMARY KEY ((consumer_id, producer_id), shard_id, event_type) -); +) +with default_time_to_live = 3600; create table if not exists consumer_info ( consumer_id text, producer_id blob, + consumer_ip: text, subscribed_event_types frozen>, created_at timestamp, updated_at timestamp, @@ -56,6 +58,7 @@ primary key (producer_id, consumer_id); create table if not exists producer_info ( producer_id blob, + commitment smallint, num_shards smallint, created_at timestamp, updated_at timestamp, @@ -192,7 +195,8 @@ create table if not exists log ( primary key ((shard_id, period, producer_id), offset) ) WITH CLUSTERING ORDER BY (offset desc) - AND default_time_to_live = 86400; -- 24 hours + AND default_time_to_live = 86400 + and compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'MINUTES', 'compaction_window_size' : 10}; create table if not exists producer_slot_seen ( producer_id blob, @@ -204,22 +208,14 @@ create table if not exists producer_slot_seen ( with clustering order by (slot DESC) AND default_time_to_live = 82800; -- 23 hours -create materialized view if not exists slot_map_mv -as -select - slot, - producer_id, - shard_id, - period, - offset -from log -where - slot is not null - and producer_id is not null - and shard_id is not null - and period is not null - and offset is not null -primary key (slot, producer_id, shard_id, period, offset); +CREATE materialized VIEW if not exists slot_producer_seen_mv +AS +SELECT slot, producer_id FROM producer_slot_seen +WHERE + producer_id is not null + and slot is not null +PRIMARY KEY (slot, producer_id); + create table if not exists producer_period_commit_log ( diff --git a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs index b842fc23..3612be42 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs @@ -10,7 +10,10 @@ use { yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_proto::{ prelude::subscribe_update::UpdateOneof, - yellowstone::log::{yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy}, + yellowstone::log::{ + yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy, + TimelineTranslationPolicy, + }, }, yellowstone_grpc_tools::{ config::{load as config_load, GrpcRequestToProto}, @@ -21,11 +24,11 @@ use { Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo, }, consumer::{ - common::InitialOffsetPolicy, + common::InitialOffset, grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq}, }, sink::ScyllaSink, - types::Transaction, + types::{CommitmentLevel, Transaction}, }, setup_tracing, }, @@ -136,18 +139,17 @@ impl ArgsAction { let session = Arc::new(session); let req = SpawnGrpcConsumerReq { consumer_id: String::from("test"), + consumer_ip: None, account_update_event_filter: None, tx_event_filter: None, buffer_capacity: None, offset_commit_interval: None, + event_subscription_policy: EventSubscriptionPolicy::Both, + commitment_level: CommitmentLevel::Processed, + timeline_translation_policy: TimelineTranslationPolicy::AllowLag, + timeline_translation_allowed_lag: None, }; - let mut rx = spawn_grpc_consumer( - session, - req, - InitialOffsetPolicy::Earliest, - EventSubscriptionPolicy::Both, - ) - .await?; + let mut rx = spawn_grpc_consumer(session, req, InitialOffset::Earliest).await?; let mut print_tx_secs = Instant::now() + Duration::from_secs(1); let mut num_events = 0; diff --git a/yellowstone-grpc-tools/src/scylladb/config.rs b/yellowstone-grpc-tools/src/scylladb/config.rs index 94b2cccd..a52787a8 100644 --- a/yellowstone-grpc-tools/src/scylladb/config.rs +++ b/yellowstone-grpc-tools/src/scylladb/config.rs @@ -1,5 +1,5 @@ use { - super::sink::ScyllaSinkConfig, + super::{sink::ScyllaSinkConfig, types::CommitmentLevel}, crate::config::ConfigGrpcRequest, serde::Deserialize, serde_with::{serde_as, DurationMilliSeconds}, @@ -97,6 +97,11 @@ impl ConfigGrpc2ScyllaDB { linger: self.linger, keyspace: self.keyspace.clone(), ifname: self.ifname.to_owned(), + commitment_level: match self.request.commitment.expect("Missing commitment level") { + crate::config::ConfigGrpcRequestCommitment::Processed => CommitmentLevel::Processed, + crate::config::ConfigGrpcRequestCommitment::Confirmed => CommitmentLevel::Confirmed, + crate::config::ConfigGrpcRequestCommitment::Finalized => CommitmentLevel::Finalized, + }, } } } diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/common.rs b/yellowstone-grpc-tools/src/scylladb/consumer/common.rs index d486c6b7..4c990746 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/common.rs +++ b/yellowstone-grpc-tools/src/scylladb/consumer/common.rs @@ -1,4 +1,4 @@ -use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset}; +use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset, Slot}; pub type OldShardOffset = ShardOffset; @@ -6,11 +6,14 @@ pub type OldShardOffset = ShardOffset; /// Initial position in the log when creating a new consumer. /// #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] -pub enum InitialOffsetPolicy { +pub enum InitialOffset { Earliest, #[default] Latest, - SlotApprox(i64), + SlotApprox { + desired_slot: Slot, + min_slot: Slot, + }, } pub struct ConsumerInfo { diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs index 9b66b934..486c2a15 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs +++ b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs @@ -1,13 +1,13 @@ use { super::{ - common::InitialOffsetPolicy, + common::InitialOffset, shard_iterator::{ShardFilter, ShardIterator}, }, crate::scylladb::{ sink, types::{ - BlockchainEventType, ConsumerId, ConsumerInfo, ConsumerShardOffset, ProducerId, - ProducerInfo, ShardId, ShardOffset, Slot, UNDEFINED_SLOT, + BlockchainEventType, CommitmentLevel, ConsumerId, ConsumerInfo, ConsumerShardOffset, + ProducerId, ProducerInfo, ShardId, ShardOffset, Slot, UNDEFINED_SLOT, }, }, chrono::{DateTime, TimeDelta, Utc}, @@ -24,6 +24,8 @@ use { }, std::{ collections::{BTreeMap, BTreeSet}, + net::IpAddr, + ops::RangeInclusive, pin::Pin, sync::Arc, time::Duration, @@ -38,12 +40,11 @@ use { geyser::{subscribe_update::UpdateOneof, SubscribeUpdate}, yellowstone::log::{ yellowstone_log_server::YellowstoneLog, ConsumeRequest, EventSubscriptionPolicy, + TimelineTranslationPolicy, }, }, }; -type ProducerLockId = String; - const CHECK_PRODUCER_LIVENESS_DELAY: Duration = Duration::from_millis(600); const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250); @@ -66,13 +67,12 @@ const UPDATE_CONSUMER_SHARD_OFFSET: &str = r###" AND event_type = ? "###; -const LIST_PRODUCER_WITH_SLOT: &str = r###" +const LIST_PRODUCER_WITH_COMMITMENT_LEVEL: &str = r###" SELECT - producer_id, - min(slot) - FROM slot_map_mv - WHERE slot = ? - GROUP BY producer_id + producer_id + FROM producer_info + WHERE commitment_level = ? + ALLOW FILTERING "###; /// @@ -94,7 +94,8 @@ const GET_SHARD_OFFSET_AT_SLOT_APPROX: &str = r###" FROM producer_slot_seen where producer_id = ? - AND slot <= ? + AND slot <= ? + AND slot >= ? ORDER BY slot desc LIMIT 1; "###; @@ -140,8 +141,8 @@ const GET_PRODUCERS_CONSUMER_COUNT: &str = r###" "###; const INSERT_CONSUMER_INFO: &str = r###" - INSERT INTO consumer_info (consumer_id, producer_id, subscribed_event_types, created_at, updated_at) - VALUES (?,?,?, currentTimestamp(), currentTimestamp()) + INSERT INTO consumer_info (consumer_id, producer_id, consumer_ip, subscribed_event_types, created_at, updated_at) + VALUES (?, ?, ?, ?, currentTimestamp(), currentTimestamp()) "###; const UPSERT_CONSUMER_INFO: &str = r###" @@ -155,18 +156,58 @@ const UPSERT_CONSUMER_INFO: &str = r###" const GET_PRODUCER_INFO_BY_ID: &str = r###" SELECT producer_id, - num_shards + num_shards, + commitment_level FROM producer_info WHERE producer_id = ? "###; +/// +/// This error is raised when no lock is held by any producer. +/// +#[derive(Error, PartialEq, Eq, Debug)] +struct NoActiveProducer; + +impl fmt::Display for NoActiveProducer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("NoActiveProducer") + } +} + +/// +/// This error is raised when there is no active producer for the desired commitment level. +/// +#[derive(Copy, Error, PartialEq, Eq, Debug, Clone)] +struct ImpossibleCommitmentLevel(CommitmentLevel); + +impl fmt::Display for ImpossibleCommitmentLevel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let cl = self.0; + f.write_fmt(format_args!("ImpossibleCommitmentLevel({})", cl)) + } +} + +/// +/// This error is raised when the combination of consumer critera result in an empty set of elligible producer timeline. +/// +#[derive(Error, PartialEq, Eq, Debug)] +struct ImpossibleTimelineSelection; + +impl fmt::Display for ImpossibleTimelineSelection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("ImpossibleTimelineSelection") + } +} +/// +/// This error is raised when no producer as seen the desired `slot`. +/// #[derive(Clone, Debug, Error, PartialEq, Eq, Copy)] struct ImpossibleSlotOffset(Slot); impl fmt::Display for ImpossibleSlotOffset { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let slot = self.0; - f.write_str(format!("ImpossbielInititalOffset({slot})").as_str()) + f.write_fmt(format_args!("ImpossbielInititalOffset({})", slot)) } } @@ -176,7 +217,7 @@ struct DeadProducerErr(ProducerId); impl fmt::Display for DeadProducerErr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let producer_id = self.0[0]; - f.write_str(format!("ProducerStale({producer_id})").as_str()) + f.write_fmt(format_args!("ProducerStale({})", producer_id)) } } @@ -201,7 +242,7 @@ async fn list_producers_with_lock_held(session: Arc) -> anyhow::Result< session .query(LIST_PRODUCERS_WITH_LOCK, &[]) .await? - .rows_typed::<(ProducerId,)>()? + .rows_typed_or_empty::<(ProducerId,)>() .map(|result| result.map(|row| row.0)) .collect::, _>>() .map_err(anyhow::Error::new) @@ -209,13 +250,43 @@ async fn list_producers_with_lock_held(session: Arc) -> anyhow::Result< async fn list_producer_with_slot( session: Arc, - slot: Slot, + slot_range: RangeInclusive, ) -> anyhow::Result> { + let slot_values = slot_range + .map(|slot| format!("{slot}")) + .collect::>() + .join(", "); + + let query_template = format!( + r###" + SELECT + producer_id, + slot + FROM slot_producer_seen_mv + WHERE slot IN ({slot_values}) + "### + ); + info!("query {query_template}"); + session - .query(LIST_PRODUCER_WITH_SLOT, (slot,)) + .query(query_template, &[]) .await? .rows_typed_or_empty::<(ProducerId, Slot)>() .map(|result| result.map(|(producer_id, _slot)| producer_id)) + .collect::, _>>() + .map_err(anyhow::Error::new) + .map(|btree_set| btree_set.into_iter().collect()) +} + +async fn list_producer_with_commitment_level( + session: Arc, + commitment_level: CommitmentLevel, +) -> anyhow::Result> { + session + .query(LIST_PRODUCER_WITH_COMMITMENT_LEVEL, (commitment_level,)) + .await? + .rows_typed_or_empty::<(ProducerId,)>() + .map(|result| result.map(|row| row.0)) .collect::, _>>() .map_err(anyhow::Error::new) } @@ -280,44 +351,66 @@ async fn is_producer_still_alive( /// async fn get_producer_id_with_least_assigned_consumer( session: Arc, - slot_requirement: Option, + opt_slot_range: Option>, + commitment_level: CommitmentLevel, ) -> anyhow::Result { let locked_producers = list_producers_with_lock_held(Arc::clone(&session)).await?; - info!("{} producer lock(s) detected", locked_producers.len()); + + anyhow::ensure!(!locked_producers.is_empty(), NoActiveProducer); + let recently_active_producers = BTreeSet::from_iter( list_producers_heartbeat(Arc::clone(&session), DEFAULT_LAST_HEARTBEAT_TIME_DELTA).await?, ); - info!( "{} living producer(s) detected", recently_active_producers.len() ); + anyhow::ensure!(!recently_active_producers.is_empty(), NoActiveProducer); + + let producers_with_commitment_level = + list_producer_with_commitment_level(Arc::clone(&session), commitment_level).await?; + info!( + "{} producer(s) with {commitment_level:?} commitment level", + producers_with_commitment_level.len() + ); + + if producers_with_commitment_level.is_empty() { + anyhow::bail!(ImpossibleCommitmentLevel(commitment_level)) + } + let mut elligible_producers = locked_producers .into_iter() .filter(|producer_id| recently_active_producers.contains(producer_id)) .collect::>(); - if elligible_producers.is_empty() { - anyhow::bail!("No producer available at the moment"); - } + anyhow::ensure!(!elligible_producers.is_empty(), ImpossibleTimelineSelection); - if let Some(slot) = slot_requirement { - let producers_with_slot = - BTreeSet::from_iter(list_producer_with_slot(Arc::clone(&session), slot).await?); + if let Some(slot_range) = opt_slot_range { + info!("Producer needs slot in {slot_range:?}"); + let producers_with_slot = BTreeSet::from_iter( + list_producer_with_slot( + Arc::clone(&session), + *slot_range.start()..=*slot_range.end(), + ) + .await?, + ); info!( - "{} producer(s) with required slot {slot}", + "{} producer(s) with required slot range: {slot_range:?}", producers_with_slot.len() ); elligible_producers.retain(|k| producers_with_slot.contains(k)); - if elligible_producers.is_empty() { - return Err(anyhow::Error::new(ImpossibleSlotOffset(slot))); - } + + anyhow::ensure!( + !elligible_producers.is_empty(), + ImpossibleSlotOffset(*slot_range.end()) + ); }; info!("{} elligible producer(s)", recently_active_producers.len()); + let mut producer_count_pairs = session .query(GET_PRODUCERS_CONSUMER_COUNT, &[]) .await? @@ -371,18 +464,28 @@ fn get_blockchain_event_types( async fn assign_producer_to_consumer( session: Arc, consumer_id: ConsumerId, - initial_offset_policy: InitialOffsetPolicy, + consumer_ip: Option, + initial_offset: InitialOffset, event_sub_policy: EventSubscriptionPolicy, + commitment_level: CommitmentLevel, is_new: bool, ) -> anyhow::Result<(ConsumerInfo, Vec)> { - let maybe_slot_hint = if let InitialOffsetPolicy::SlotApprox(slot) = initial_offset_policy { - Some(slot) + let maybe_slot_range = if let InitialOffset::SlotApprox { + desired_slot, + min_slot, + } = initial_offset + { + Some(min_slot..=desired_slot) } else { None }; - let producer_id = - get_producer_id_with_least_assigned_consumer(Arc::clone(&session), maybe_slot_hint).await?; + let producer_id = get_producer_id_with_least_assigned_consumer( + Arc::clone(&session), + maybe_slot_range, + commitment_level, + ) + .await?; if is_new { session .query( @@ -390,6 +493,7 @@ async fn assign_producer_to_consumer( ( consumer_id.as_str(), producer_id, + consumer_ip.map(|ipaddr| ipaddr.to_string()), get_blockchain_event_types(event_sub_policy), ), ) @@ -416,11 +520,11 @@ async fn assign_producer_to_consumer( Arc::clone(&session), consumer_id.as_str(), producer_id, - initial_offset_policy, + initial_offset, event_sub_policy, ) .await?; - info!("Successfully set consumer shard offsets following {initial_offset_policy:?} policy"); + info!("Successfully set consumer shard offsets following {initial_offset:?} policy"); let cs = ConsumerInfo { consumer_id: consumer_id.clone(), producer_id, @@ -450,15 +554,22 @@ async fn get_min_offset_for_producer( async fn get_slot_shard_offsets( session: Arc, slot: Slot, + min_slot: Slot, producer_id: ProducerId, _num_shards: ShardId, ) -> anyhow::Result>> { let maybe = session - .query(GET_SHARD_OFFSET_AT_SLOT_APPROX, (producer_id, slot)) + .query( + GET_SHARD_OFFSET_AT_SLOT_APPROX, + (producer_id, slot, min_slot), + ) .await? .maybe_first_row_typed::<(Vec<(ShardId, ShardOffset)>, Slot)>()?; if let Some((offsets, slot_approx)) = maybe { + info!( + "found producer({producer_id:?}) shard offsets within slot range: {min_slot}..={slot}" + ); Ok(Some( offsets .into_iter() @@ -478,7 +589,7 @@ async fn set_initial_consumer_shard_offsets( session: Arc, new_consumer_id: impl AsRef, producer_id: ProducerId, - initial_offset_policy: InitialOffsetPolicy, + initial_offset_policy: InitialOffset, event_sub_policy: EventSubscriptionPolicy, ) -> anyhow::Result> { // Create all the shards counter @@ -491,7 +602,7 @@ async fn set_initial_consumer_shard_offsets( let num_shards = producer_info.num_shards; let shard_offset_pairs = match initial_offset_policy { - InitialOffsetPolicy::Latest => { + InitialOffset::Latest => { sink::get_max_shard_offsets_for_producer( Arc::clone(&session), producer_id, @@ -499,21 +610,32 @@ async fn set_initial_consumer_shard_offsets( ) .await? } - InitialOffsetPolicy::Earliest => { + InitialOffset::Earliest => { get_min_offset_for_producer(Arc::clone(&session), producer_id).await? } - InitialOffsetPolicy::SlotApprox(slot) => { + InitialOffset::SlotApprox { + desired_slot, + min_slot, + } => { let minium_producer_offsets = get_min_offset_for_producer(Arc::clone(&session), producer_id) .await? .into_iter() .map(|(shard_id, shard_offset, slot)| (shard_id, (shard_offset, slot))) .collect::>(); + info!("(consumer-id={new_consumer_id}) SlotApprox step 1: retrieved minimum producer({producer_id:?}) offset."); - let shard_offsets_contain_slot = - get_slot_shard_offsets(Arc::clone(&session), slot, producer_id, num_shards) - .await? - .ok_or(ImpossibleSlotOffset(slot))?; + let shard_offsets_contain_slot = get_slot_shard_offsets( + Arc::clone(&session), + desired_slot, + min_slot, + producer_id, + num_shards, + ) + .await? + .ok_or(ImpossibleSlotOffset(desired_slot))?; + + info!("(consumer-id={new_consumer_id}) SlotApprox step 2: producer({producer_id:?}) shard offsets containing slot range."); let are_shard_offset_reachable = shard_offsets_contain_slot @@ -525,8 +647,9 @@ async fn set_initial_consumer_shard_offsets( .is_some() }); + info!("(consumer-id={new_consumer_id}) SlotApprox step 3: producer({producer_id:?}) shard offset reachability: {are_shard_offset_reachable}"); if !are_shard_offset_reachable { - anyhow::bail!(ImpossibleSlotOffset(slot)) + anyhow::bail!(ImpossibleSlotOffset(desired_slot)) } shard_offsets_contain_slot @@ -536,10 +659,14 @@ async fn set_initial_consumer_shard_offsets( if shard_offset_pairs.len() != (num_shards as usize) { anyhow::bail!("Producer {producer_id:?} shard offsets is incomplete {new_consumer_id}"); } - + info!("Shard offset has been computed successfully"); let adjustment = match initial_offset_policy { - InitialOffsetPolicy::Earliest | InitialOffsetPolicy::SlotApprox(_) => -1, - InitialOffsetPolicy::Latest => 0, + InitialOffset::Earliest + | InitialOffset::SlotApprox { + desired_slot: _, + min_slot: _, + } => -1, + InitialOffset::Latest => 0, }; let insert_consumer_offset_ps: PreparedStatement = @@ -611,49 +738,59 @@ impl YellowstoneLog for ScyllaYsLog { &self, request: tonic::Request, ) -> Result, tonic::Status> { + let consumer_ip = request.remote_addr().map(|addr| addr.ip()); let cr = request.into_inner(); let consumer_id = cr.consumer_id.clone().unwrap_or(Uuid::new_v4().to_string()); let initial_offset_policy = match cr.initial_offset_policy() { yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Earliest => { - InitialOffsetPolicy::Earliest + InitialOffset::Earliest } yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Latest => { - InitialOffsetPolicy::Latest + InitialOffset::Latest } yellowstone_grpc_proto::yellowstone::log::InitialOffsetPolicy::Slot => { let slot = cr.at_slot.ok_or(tonic::Status::invalid_argument( "Expected at_lot when initital_offset_policy is to `Slot`", ))?; - InitialOffsetPolicy::SlotApprox(slot) + InitialOffset::SlotApprox { + desired_slot: slot, + min_slot: slot, + } } }; + let timeline_translation_policy = cr.timeline_translation_policy(); + let event_subscription_policy = cr.event_subscription_policy(); let account_update_event_filter = cr.account_update_event_filter; let tx_event_filter = cr.tx_event_filter; + let commitment_level: CommitmentLevel = (cr.commitment_level as i16) + .try_into() + .map_err(|_| tonic::Status::invalid_argument("commitment level is invalid"))?; info!( consumer_id = consumer_id, initital_offset_policy = ?initial_offset_policy, event_subscription_policy = ?event_subscription_policy, + commitment_level = ?commitment_level, ); - let req = SpawnGrpcConsumerReq { + let req: SpawnGrpcConsumerReq = SpawnGrpcConsumerReq { consumer_id: consumer_id.clone(), + consumer_ip, account_update_event_filter, tx_event_filter, buffer_capacity: None, offset_commit_interval: None, + timeline_translation_policy, + timeline_translation_allowed_lag: cr.ttp_maximum_slot_lag, + event_subscription_policy, + commitment_level, }; - let result = spawn_grpc_consumer( - Arc::clone(&self.session), - req, - initial_offset_policy, - event_subscription_policy, - ) - .await; + let result = + spawn_grpc_consumer(Arc::clone(&self.session), req, initial_offset_policy).await; match result { Ok(rx) => { @@ -664,7 +801,7 @@ impl YellowstoneLog for ScyllaYsLog { Err(e) => { error!(consumer_id=consumer_id, error = %e); Err(tonic::Status::internal(format!( - "({consumer_id})fail to spawn consumer" + "({consumer_id}) fail to spawn consumer" ))) } } @@ -674,11 +811,16 @@ impl YellowstoneLog for ScyllaYsLog { #[derive(Clone)] pub struct SpawnGrpcConsumerReq { pub consumer_id: ConsumerId, + pub consumer_ip: Option, pub account_update_event_filter: Option, pub tx_event_filter: Option, pub buffer_capacity: Option, pub offset_commit_interval: Option, + pub timeline_translation_policy: TimelineTranslationPolicy, + pub timeline_translation_allowed_lag: Option, + pub event_subscription_policy: EventSubscriptionPolicy, + pub commitment_level: CommitmentLevel, } type GrpcConsumerSender = mpsc::Sender>; @@ -688,15 +830,16 @@ async fn build_grpc_consumer_source( sender: GrpcConsumerSender, session: Arc, req: SpawnGrpcConsumerReq, - initial_offset_policy: InitialOffsetPolicy, - event_subscription_policy: EventSubscriptionPolicy, + initial_offset_policy: InitialOffset, is_new: bool, ) -> anyhow::Result { let (consumer_info, initial_shard_offsets) = assign_producer_to_consumer( Arc::clone(&session), req.consumer_id.clone(), + req.consumer_ip, initial_offset_policy, - event_subscription_policy, + req.event_subscription_policy, + req.commitment_level, is_new, ) .await?; @@ -754,21 +897,19 @@ async fn build_grpc_consumer_source( pub async fn spawn_grpc_consumer( session: Arc, req: SpawnGrpcConsumerReq, - initial_offset_policy: InitialOffsetPolicy, - event_subscription_policy: EventSubscriptionPolicy, + initial_offset_policy: InitialOffset, ) -> anyhow::Result { let original_req = req.clone(); let buffer_capacity = req .buffer_capacity .unwrap_or(DEFAULT_CONSUMER_STREAM_BUFFER_CAPACITY); let (sender, receiver) = mpsc::channel(buffer_capacity); - + const DEFAULT_ALLOWED_LAG: u32 = 10; let mut grpc_consumer_source = build_grpc_consumer_source( sender.clone(), Arc::clone(&session), req, initial_offset_policy, - event_subscription_policy, true, ) .await?; @@ -790,7 +931,22 @@ pub async fn spawn_grpc_consumer( .shard_iterators_slot .into_iter() .min() - .map(InitialOffsetPolicy::SlotApprox) + .map(|slot| { + let min_slot = match &original_req.timeline_translation_policy { + TimelineTranslationPolicy::AllowLag => { + let lag = original_req + .timeline_translation_allowed_lag + .unwrap_or(DEFAULT_ALLOWED_LAG); + slot - (lag as Slot) + } + TimelineTranslationPolicy::StrictSlot => slot, + }; + + InitialOffset::SlotApprox { + desired_slot: slot, + min_slot, + } + }) .unwrap_or(initial_offset_policy); grpc_consumer_source = build_grpc_consumer_source( @@ -798,7 +954,6 @@ pub async fn spawn_grpc_consumer( Arc::clone(&session), original_req.clone(), forged_offset_policy, - event_subscription_policy, false, ) .await @@ -872,7 +1027,7 @@ impl GrpcConsumerSource { let producer_id = self.consumer_info.producer_id; let consumer_id = self.consumer_info.consumer_id.to_owned(); let mut commit_offset_deadline = Instant::now() + self.offset_commit_interval; - + const PRINT_CONSUMER_SLOT_REACH_DELAY: Duration = Duration::from_secs(5); info!("Serving consumer: {:?}", consumer_id); self.shard_iterators @@ -881,6 +1036,7 @@ impl GrpcConsumerSource { let mut max_seen_slot = UNDEFINED_SLOT; let mut num_event_between_two_slots = 0; + let mut next_trace_schedule = Instant::now() + PRINT_CONSUMER_SLOT_REACH_DELAY; let mut t = Instant::now(); let mut next_producer_live_probing = Instant::now() + CHECK_PRODUCER_LIVENESS_DELAY; let mut producer_is_dead = false; @@ -896,7 +1052,10 @@ impl GrpcConsumerSource { ); } if max_seen_slot < block_chain_event.slot { - info!("Consumer {consumer_id} reach slot {max_seen_slot} after {num_event_between_two_slots} blockchain event(s)"); + if next_trace_schedule.elapsed() > Duration::ZERO { + info!("Consumer {consumer_id} reach slot {max_seen_slot} after {num_event_between_two_slots} blockchain event(s)"); + next_trace_schedule = Instant::now() + PRINT_CONSUMER_SLOT_REACH_DELAY; + } max_seen_slot = block_chain_event.slot; num_event_between_two_slots = 0; } @@ -933,9 +1092,6 @@ impl GrpcConsumerSource { self.consumer_info.producer_id, ) .await?; - if !producer_is_dead { - info!("producer {producer_id:?} is alive"); - } next_producer_live_probing = Instant::now() + CHECK_PRODUCER_LIVENESS_DELAY; } diff --git a/yellowstone-grpc-tools/src/scylladb/sink.rs b/yellowstone-grpc-tools/src/scylladb/sink.rs index f2a36833..b7041c3d 100644 --- a/yellowstone-grpc-tools/src/scylladb/sink.rs +++ b/yellowstone-grpc-tools/src/scylladb/sink.rs @@ -5,8 +5,8 @@ use { scylladb_batch_sent_inc, scylladb_batch_size_observe, scylladb_batchitem_sent_inc_by, }, types::{ - AccountUpdate, BlockchainEvent, ProducerId, ProducerInfo, ShardId, ShardOffset, - ShardPeriod, Slot, Transaction, SHARD_OFFSET_MODULO, UNDEFINED_SLOT, + AccountUpdate, BlockchainEvent, CommitmentLevel, ProducerId, ProducerInfo, ShardId, + ShardOffset, ShardPeriod, Slot, Transaction, SHARD_OFFSET_MODULO, UNDEFINED_SLOT, }, }, deepsize::DeepSizeOf, @@ -84,7 +84,8 @@ const TRY_ACQUIRE_PRODUCER_LOCK: &str = r###" const GET_PRODUCER_INFO_BY_ID: &str = r###" SELECT producer_id, - num_shards + num_shards, + commitment_level FROM producer_info WHERE producer_id = ? "###; @@ -136,6 +137,7 @@ pub struct ScyllaSinkConfig { pub linger: Duration, pub keyspace: String, pub ifname: Option, + pub commitment_level: CommitmentLevel, } #[allow(clippy::large_enum_variant)] @@ -728,6 +730,10 @@ impl ScyllaSink { .await? .unwrap_or_else(|| panic!("producer {:?} has not yet been registered", producer_id)); + if producer_info.commitment_level != config.commitment_level { + anyhow::bail!("Commitment level in configuration ({:?}) don't match producer info in database ({:?})", config.commitment_level, producer_info.commitment_level); + } + info!("Producer {producer_id:?} is registered"); let producer_lock = diff --git a/yellowstone-grpc-tools/src/scylladb/types.rs b/yellowstone-grpc-tools/src/scylladb/types.rs index 53265147..05c6c132 100644 --- a/yellowstone-grpc-tools/src/scylladb/types.rs +++ b/yellowstone-grpc-tools/src/scylladb/types.rs @@ -1,5 +1,6 @@ use { anyhow::{anyhow, Ok}, + core::fmt, deepsize::DeepSizeOf, scylla::{ cql_to_rust::{FromCqlVal, FromCqlValError}, @@ -97,6 +98,72 @@ impl FromCqlVal for BlockchainEventType { } } +#[derive(Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Copy)] +pub enum CommitmentLevel { + Processed = 0, + Confirmed = 1, + Finalized = 2, +} + +impl fmt::Display for CommitmentLevel { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CommitmentLevel::Processed => f.write_str("Processed"), + CommitmentLevel::Confirmed => f.write_str("Confirmed"), + CommitmentLevel::Finalized => f.write_str("Finalized"), + } + } +} + +impl From for i16 { + fn from(val: CommitmentLevel) -> Self { + match val { + CommitmentLevel::Processed => 0, + CommitmentLevel::Confirmed => 1, + CommitmentLevel::Finalized => 2, + } + } +} + +impl TryFrom for CommitmentLevel { + type Error = anyhow::Error; + + fn try_from(value: i16) -> Result { + match value { + 0 => Ok(CommitmentLevel::Processed), + 1 => Ok(CommitmentLevel::Confirmed), + 2 => Ok(CommitmentLevel::Finalized), + x => Err(anyhow!( + "Unknown CommitmentLevel equivalent for code {:?}", + x + )), + } + } +} + +impl SerializeCql for CommitmentLevel { + fn serialize<'b>( + &self, + typ: &scylla::frame::response::result::ColumnType, + writer: scylla::serialize::CellWriter<'b>, + ) -> Result< + scylla::serialize::writers::WrittenCellProof<'b>, + scylla::serialize::SerializationError, + > { + let x: i16 = (*self).into(); + SerializeCql::serialize(&x, typ, writer) + } +} + +impl FromCqlVal for CommitmentLevel { + fn from_cql(cql_val: CqlValue) -> Result { + match cql_val { + CqlValue::SmallInt(x) => x.try_into().map_err(|_| FromCqlValError::BadVal), + _ => Err(FromCqlValError::BadCqlType), + } + } +} + #[derive(SerializeRow, Clone, Debug, FromRow, DeepSizeOf, PartialEq)] pub struct BlockchainEvent { // Common @@ -971,6 +1038,7 @@ impl From for AccountUpdate { pub struct ProducerInfo { pub producer_id: ProducerId, pub num_shards: ShardId, + pub commitment_level: CommitmentLevel, } impl TryFrom for SubscribeUpdateAccount {