diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3f3d3bca..01e3f93b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -57,7 +57,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update - sudo apt-get install -y libsasl2-dev + sudo apt-get install -y libsasl2-dev protobuf-compiler - name: cargo tree run: | diff --git a/Cargo.lock b/Cargo.lock index fd300d89..7e29bfdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,6 +1290,22 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "etcd-client" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae697f3928e8c89ae6f4dcf788059f49fd01a76dc53e63628f5a33881f5715e" +dependencies = [ + "http", + "prost 0.12.3", + "tokio", + "tokio-stream", + "tonic 0.10.2", + "tonic-build", + "tower", + "tower-service", +] + [[package]] name = "event-listener" version = "2.5.3" @@ -5576,6 +5592,7 @@ dependencies = [ "clap", "const-hex", "deepsize", + "etcd-client", "futures", "git-version", "google-cloud-googleapis", diff --git a/Cargo.toml b/Cargo.toml index 0e785070..1a0b79f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ const-hex = "1.6.2" crossbeam-channel = "0.5.8" deepsize = "0.2.0" env_logger = "0.10.0" +etcd-client = "0.12.4" futures = "0.3.24" git-version = "0.3.5" google-cloud-googleapis = "0.11.0" diff --git a/yellowstone-grpc-proto/proto/yellowstone-log.proto b/yellowstone-grpc-proto/proto/yellowstone-log.proto index 6a88b559..c453f123 100644 --- a/yellowstone-grpc-proto/proto/yellowstone-log.proto +++ b/yellowstone-grpc-proto/proto/yellowstone-log.proto @@ -8,9 +8,23 @@ option go_package = "github.com/rpcpool/solana-geyser-grpc/golang/proto"; package yellowstone.log; service YellowstoneLog { + rpc CreateStaticConsumerGroup(CreateStaticConsumerGroupRequest) returns (CreateStaticConsumerGroupResponse) {} rpc Consume(ConsumeRequest) returns (stream geyser.SubscribeUpdate) {} } +message CreateStaticConsumerGroupResponse { + string group_id = 1; +} + +message CreateStaticConsumerGroupRequest { + repeated string instance_id_list = 2; + repeated string redundancy_instance_id_list = 3; +} + +enum PartitionAssignmentStrategy { + STATIC = 0; +} + /// The InitialOffsetPolicy enum determines the initial offset used when subscribing to events or messages. It provides three options: /// @@ -96,20 +110,22 @@ enum TimelineTranslationPolicy { /// ttp_maximum_slot_lag (9) /// An optional uint32 indicating the maximum slot lag allowed for timeline translation. message ConsumeRequest { - optional string consumer_id = 1; + optional string consumer_group_id = 1; + optional string consumer_id = 2; + optional string instance_id = 3; - InitialOffsetPolicy initial_offset_policy = 2; - optional int64 at_slot = 3; + InitialOffsetPolicy initial_offset_policy = 4; + geyser.CommitmentLevel commitment_level = 5; + EventSubscriptionPolicy event_subscription_policy = 6; - EventSubscriptionPolicy event_subscription_policy = 4; - optional AccountUpdateEventFilter account_update_event_filter = 5; - optional TransactionEventFilter tx_event_filter = 6; - geyser.CommitmentLevel commitment_level = 7; + optional int64 at_slot = 7; + optional AccountUpdateEventFilter account_update_event_filter = 8; + optional TransactionEventFilter tx_event_filter = 9; // timelineTranslationPolicy is used when an ingester is out of service and we need to translate a set // of consumer to a different ingestion timeline. 
The policy describes what to do when we need to trigger timeline translation. - optional TimelineTranslationPolicy timelineTranslationPolicy = 8; - optional uint32 ttp_maximum_slot_lag = 9; + optional TimelineTranslationPolicy timelineTranslationPolicy = 10; + optional uint32 ttp_maximum_slot_lag = 11; } /// The AccountUpdateEventFilter message defines filters for account update events. It includes the following fields: diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index 4b8d34ed..31a2ed49 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ b/yellowstone-grpc-tools/Cargo.toml @@ -30,6 +30,7 @@ chrono = { workspace = true, optional = true } clap = { workspace = true, features = ["derive"] } const-hex = { workspace = true, optional = true } deepsize = { workspace = true, optional = true } +etcd-client = { workspace = true, optional = true } futures = { workspace = true } google-cloud-googleapis = { workspace = true, optional = true } google-cloud-pubsub = { workspace = true, optional = true } @@ -75,4 +76,4 @@ vergen = { workspace = true, features = ["build", "rustc"] } default = ["google-pubsub", "kafka"] google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"] kafka = ["const-hex", "rdkafka", "sha2"] -scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror"] +scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror", "etcd-client"] diff --git a/yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh b/yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh index 0ffca32b..c7d0bd4b 100644 --- a/yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh +++ b/yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh @@ -1,6 +1,18 @@ #!/usr/bin/bash -grpcurl -plaintext -import-path . -proto yellowstone-log.proto \ +# Subscribe to account updates only +grpcurl \ + -max-msg-sz 10000000 \ + -plaintext -import-path . \ + -proto yellowstone-log.proto \ -d '{"initial_offset_policy": 0, "event_subscription_policy": 0 }' \ - '127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume \ No newline at end of file + '127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume + + +# Create a static consumer group +grpcurl \ + -plaintext -import-path . 
\ + -proto yellowstone-log.proto \ + -d '{"instance_id_list": ["a", "b"], "redundancy_instance_id_list": ["c", "d"] }' \ + '127.0.0.1:10001' yellowstone.log.YellowstoneLog.CreateStaticConsumerGroup \ No newline at end of file diff --git a/yellowstone-grpc-tools/solana.cql b/yellowstone-grpc-tools/solana.cql index 4a620157..a0afdcd8 100644 --- a/yellowstone-grpc-tools/solana.cql +++ b/yellowstone-grpc-tools/solana.cql @@ -35,11 +35,28 @@ create table if not exists consumer_shard_offset ( with default_time_to_live = 3600; +create table if not exists consumer_groups ( + consumer_group_id blob, + group_type smallint, + + last_access_ip_address text, + + -- fields for static consumer group only + instance_id_shard_assignments frozen<map<text, list<smallint>>>, + redundant_id_shard_assignments frozen<map<text, list<smallint>>>, + + created_at timestamp, + updated_at timestamp, + + primary key (consumer_group_id) +); + create table if not exists consumer_info ( consumer_id text, producer_id blob, consumer_ip: text, subscribed_event_types frozen<set<smallint>>, + last_connection timestamp, created_at timestamp, updated_at timestamp, PRIMARY KEY (consumer_id) diff --git a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs index 3612be42..a9004584 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-scylladb.rs @@ -23,12 +23,12 @@ use { config::{ Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo, }, - consumer::{ + sink::ScyllaSink, + types::{CommitmentLevel, Transaction}, + yellowstone_log::{ common::InitialOffset, grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq}, }, - sink::ScyllaSink, - types::{CommitmentLevel, Transaction}, }, setup_tracing, }, @@ -107,7 +107,7 @@ impl ArgsAction { .await?; let session = Arc::new(session); - let scylla_ys_log = ScyllaYsLog::new(session); + let scylla_ys_log = ScyllaYsLog::new(session).await?; let ys_log_server = YellowstoneLogServer::new(scylla_ys_log); println!("YellowstoneLogServer listening on {}", addr); diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/mod.rs b/yellowstone-grpc-tools/src/scylladb/consumer/mod.rs deleted file mode 100644 index 6d3b120c..00000000 --- a/yellowstone-grpc-tools/src/scylladb/consumer/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod common; -pub mod grpc; -mod shard_iterator; diff --git a/yellowstone-grpc-tools/src/scylladb/mod.rs b/yellowstone-grpc-tools/src/scylladb/mod.rs index 3ff178a6..6f321506 100644 --- a/yellowstone-grpc-tools/src/scylladb/mod.rs +++ b/yellowstone-grpc-tools/src/scylladb/mod.rs @@ -1,5 +1,5 @@ pub mod config; -pub mod consumer; pub mod prom; pub mod sink; pub mod types; +pub mod yellowstone_log; diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/common.rs b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/common.rs similarity index 100% rename from yellowstone-grpc-tools/src/scylladb/consumer/common.rs rename to yellowstone-grpc-tools/src/scylladb/yellowstone_log/common.rs diff --git a/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_group/mod.rs b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_group/mod.rs new file mode 100644 index 00000000..c426b23e --- /dev/null +++ b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_group/mod.rs @@ -0,0 +1 @@ +pub mod repo; diff --git a/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_group/repo.rs b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_group/repo.rs new file mode 100644 index 00000000..a638ce11 --- 
/dev/null +++ b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_group/repo.rs @@ -0,0 +1,153 @@ +use { + crate::scylladb::types::ShardId, + scylla::{ + cql_to_rust::{FromCqlVal, FromCqlValError}, + frame::response::result::CqlValue, + prepared_statement::PreparedStatement, + serialize::value::SerializeCql, + Session, + }, + std::{collections::BTreeMap, net::IpAddr, sync::Arc}, + uuid::Uuid, +}; + +const NUM_SHARDS: usize = 64; + +type ConsumerGroupId = Uuid; +type InstanceId = String; + +const CREATE_STATIC_CONSUMER_GROUP: &str = r###" + INSERT INTO consumer_groups ( + consumer_group_id, + group_type, + last_access_ip_address, + instance_id_shard_assignments, + redundant_id_shard_assignments, + created_at, + updated_at + ) + VALUES (?, ?, ?, ?, ?, currentTimestamp(), currentTimestamp()) +"###; + +#[derive(Clone, Debug, PartialEq, Eq, Copy)] +enum ConsumerGroupType { + Static = 0, +} + +impl TryFrom<i16> for ConsumerGroupType { + type Error = anyhow::Error; + + fn try_from(value: i16) -> Result<Self, Self::Error> { + match value { + 0 => Ok(ConsumerGroupType::Static), + x => Err(anyhow::anyhow!( + "Unknown ConsumerGroupType equivalent for {:?}", + x + )), + } + } +} + +impl From<ConsumerGroupType> for i16 { + fn from(val: ConsumerGroupType) -> Self { + match val { + ConsumerGroupType::Static => 0, + } + } +} + +impl SerializeCql for ConsumerGroupType { + fn serialize<'b>( + &self, + typ: &scylla::frame::response::result::ColumnType, + writer: scylla::serialize::CellWriter<'b>, + ) -> Result< + scylla::serialize::writers::WrittenCellProof<'b>, + scylla::serialize::SerializationError, + > { + let x: i16 = (*self).into(); + SerializeCql::serialize(&x, typ, writer) + } +} + +impl FromCqlVal<CqlValue> for ConsumerGroupType { + fn from_cql(cql_val: CqlValue) -> Result<Self, FromCqlValError> { + match cql_val { + CqlValue::SmallInt(x) => x.try_into().map_err(|_| FromCqlValError::BadVal), + _ => Err(FromCqlValError::BadCqlType), + } + } +} + +pub(crate) struct ConsumerGroupRepo { + session: Arc<Session>, + create_static_consumer_group_ps: PreparedStatement, +} + +fn assign_shards(ids: &[InstanceId], num_shards: usize) -> BTreeMap<InstanceId, Vec<ShardId>> { + let mut ids = ids.to_vec(); + ids.sort(); + + let num_parts_per_id = num_shards / ids.len(); + let shard_vec = (0..num_shards).map(|x| x as ShardId).collect::<Vec<_>>(); + let chunk_it = shard_vec + .chunks(num_parts_per_id) + .into_iter() + .map(|chunk| chunk.iter().cloned().collect()); + + ids.into_iter().zip(chunk_it).collect() +} + +pub(crate) struct StaticConsumerGroupInfo { + pub(crate) consumer_group_id: ConsumerGroupId, + pub(crate) instance_id_assignments: BTreeMap<InstanceId, Vec<ShardId>>, + pub(crate) redundant_instance_id_assignments: BTreeMap<InstanceId, Vec<ShardId>>, +} + +impl ConsumerGroupRepo { + pub async fn new(session: Arc<Session>) -> anyhow::Result<Self> { + let create_static_consumer_group_ps = session.prepare(CREATE_STATIC_CONSUMER_GROUP).await?; + + let this = ConsumerGroupRepo { + session, + create_static_consumer_group_ps, + }; + + Ok(this) + } + + pub async fn create_static_consumer_group( + &self, + instance_ids: &[InstanceId], + redundant_instance_ids: &[InstanceId], + remote_ip_addr: Option<IpAddr>, + ) -> anyhow::Result<StaticConsumerGroupInfo> { + let consumer_group_id = Uuid::new_v4(); + anyhow::ensure!( + instance_ids.len() == redundant_instance_ids.len(), + "mismatched number of instance/redundant ids" + ); + let shard_assignments = assign_shards(&instance_ids, NUM_SHARDS); + let shard_assignments2 = assign_shards(&redundant_instance_ids, NUM_SHARDS); + self.session + .execute( + &self.create_static_consumer_group_ps, + ( + consumer_group_id.as_bytes(), + ConsumerGroupType::Static, 
remote_ip_addr.map(|ipaddr| ipaddr.to_string()), + &shard_assignments, + &shard_assignments2, + ), + ) + .await?; + + let ret = StaticConsumerGroupInfo { + consumer_group_id, + instance_id_assignments: shard_assignments, + redundant_instance_id_assignments: shard_assignments2, + }; + + Ok(ret) + } +} diff --git a/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_source.rs b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_source.rs new file mode 100644 index 00000000..4a2d00cf --- /dev/null +++ b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/consumer_source.rs @@ -0,0 +1,187 @@ +use { + crate::scylladb::{ + types::{BlockchainEvent, ConsumerInfo, ProducerId, ShardId, Slot, UNDEFINED_SLOT}, + yellowstone_log::shard_iterator::ShardIterator, + }, + core::fmt, + futures::future::try_join_all, + scylla::{ + batch::{Batch, BatchType}, + prepared_statement::PreparedStatement, + Session, + }, + std::{collections::BTreeMap, sync::Arc, time::Duration}, + thiserror::Error, + tokio::{ + sync::{ + mpsc, + oneshot::{self, error::TryRecvError}, + }, + time::Instant, + }, + tracing::{info, warn}, +}; + +const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250); + +const FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD: Duration = Duration::from_millis(500); + +const UPDATE_CONSUMER_SHARD_OFFSET: &str = r###" + UPDATE consumer_shard_offset + SET offset = ?, slot = ?, updated_at = currentTimestamp() + WHERE + consumer_id = ? + AND producer_id = ? + AND shard_id = ? + AND event_type = ? +"###; + +pub(crate) struct ConsumerSource<T: FromBlockchainEvent> { + session: Arc<Session>, + consumer_info: ConsumerInfo, + sender: mpsc::Sender<T>, + // The interval at which we want to commit our Offset progression to Scylla + offset_commit_interval: Duration, + shard_iterators: BTreeMap<ShardId, ShardIterator>, + pub(crate) shard_iterators_slot: BTreeMap<ShardId, Slot>, + update_consumer_shard_offset_prepared_stmt: PreparedStatement, +} + +pub type InterruptSignal = oneshot::Receiver<()>; + +#[derive(Clone, Debug, PartialEq, Error, Eq, Copy)] +pub(crate) struct Interrupted; + +impl fmt::Display for Interrupted { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Interrupted") + } +} + +pub(crate) trait FromBlockchainEvent { + type Output; + + fn from(blockchain_event: BlockchainEvent) -> Self::Output; +} + +impl<T: FromBlockchainEvent<Output = T>> ConsumerSource<T> { + pub(crate) async fn new( + session: Arc<Session>, + consumer_info: ConsumerInfo, + sender: mpsc::Sender<T>, + offset_commit_interval: Duration, + mut shard_iterators: Vec<ShardIterator>, + ) -> anyhow::Result<Self> { + let update_consumer_shard_offset_prepared_stmt = + session.prepare(UPDATE_CONSUMER_SHARD_OFFSET).await?; + // Prewarm every shard iterator + try_join_all(shard_iterators.iter_mut().map(|shard_it| shard_it.warm())).await?; + let shard_iterators_slot = shard_iterators + .iter() + .map(|shard_it| (shard_it.shard_id, UNDEFINED_SLOT)) + .collect(); + Ok(ConsumerSource { + session, + consumer_info, + sender, + offset_commit_interval, + shard_iterators: shard_iterators + .into_iter() + .map(|shard_it| (shard_it.shard_id, shard_it)) + .collect(), + shard_iterators_slot, + update_consumer_shard_offset_prepared_stmt, + }) + } + + pub(crate) fn producer_id(&self) -> ProducerId { + self.consumer_info.producer_id + } + + async fn update_consumer_shard_offsets(&self) -> anyhow::Result<()> { + let mut batch = Batch::new(BatchType::Unlogged); + let mut values = Vec::with_capacity(self.shard_iterators_slot.len()); + for (shard_id, shard_it) in self.shard_iterators.iter() { + values.push(( + shard_it.last_offset(), + self.shard_iterators_slot 
.get(shard_id) + .expect("missing shard slot info"), + self.consumer_info.consumer_id.to_owned(), + self.consumer_info.producer_id, + shard_it.shard_id, + shard_it.event_type, + )); + batch.append_statement(self.update_consumer_shard_offset_prepared_stmt.clone()); + } + + self.session.batch(&batch, values).await?; + Ok(()) + } + + pub async fn run(&mut self, mut interrupt: InterruptSignal) -> anyhow::Result<()> { + let consumer_id = self.consumer_info.consumer_id.to_owned(); + let mut commit_offset_deadline = Instant::now() + self.offset_commit_interval; + const PRINT_CONSUMER_SLOT_REACH_DELAY: Duration = Duration::from_secs(5); + info!("Serving consumer: {:?}", consumer_id); + + let mut max_seen_slot = UNDEFINED_SLOT; + let mut num_event_between_two_slots = 0; + + let mut next_trace_schedule = Instant::now() + PRINT_CONSUMER_SLOT_REACH_DELAY; + let mut t = Instant::now(); + loop { + for (shard_id, shard_it) in self.shard_iterators.iter_mut() { + match interrupt.try_recv() { + Ok(_) => { + warn!("consumer {consumer_id} received an interrupted signal"); + self.update_consumer_shard_offsets().await?; + anyhow::bail!(Interrupted) + } + Err(TryRecvError::Closed) => anyhow::bail!("detected orphan consumer source"), + Err(TryRecvError::Empty) => (), + } + + let maybe = shard_it.try_next().await?; + + if let Some(block_chain_event) = maybe { + self.shard_iterators_slot + .insert(*shard_id, block_chain_event.slot); + if t.elapsed() >= FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD { + warn!( + "consumer {consumer_id} micro batch took {:?} to fetch.", + t.elapsed() + ); + } + if max_seen_slot < block_chain_event.slot { + if next_trace_schedule.elapsed() > Duration::ZERO { + info!("Consumer {consumer_id} reach slot {max_seen_slot} after {num_event_between_two_slots} blockchain event(s)"); + next_trace_schedule = Instant::now() + PRINT_CONSUMER_SLOT_REACH_DELAY; + } + max_seen_slot = block_chain_event.slot; + num_event_between_two_slots = 0; + } + let t_send = Instant::now(); + + if self.sender.send(T::from(block_chain_event)).await.is_err() { + warn!("Consumer {consumer_id} closed its streaming half"); + return Ok(()); + } + let send_latency = t_send.elapsed(); + if send_latency >= CLIENT_LAG_WARN_THRESHOLD { + warn!("Slow read from consumer {consumer_id}, recorded latency: {send_latency:?}") + } + num_event_between_two_slots += 1; + t = Instant::now(); + } + } + // Every now and then, we commit where the consumer is loc + if commit_offset_deadline.elapsed() > Duration::ZERO { + let t = Instant::now(); + self.update_consumer_shard_offsets().await?; + info!("updated consumer shard offset in {:?}", t.elapsed()); + commit_offset_deadline = Instant::now() + self.offset_commit_interval; + } + } + } +} diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/grpc.rs similarity index 81% rename from yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs rename to yellowstone-grpc-tools/src/scylladb/yellowstone_log/grpc.rs index 486c2a15..fd60db57 100644 --- a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs +++ b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/grpc.rs @@ -1,14 +1,17 @@ use { super::{ common::InitialOffset, + consumer_group::repo::ConsumerGroupRepo, + consumer_source::{ConsumerSource, FromBlockchainEvent}, shard_iterator::{ShardFilter, ShardIterator}, }, crate::scylladb::{ sink, types::{ BlockchainEventType, CommitmentLevel, ConsumerId, ConsumerInfo, ConsumerShardOffset, - ProducerId, ProducerInfo, ShardId, ShardOffset, Slot, 
UNDEFINED_SLOT, + ProducerId, ProducerInfo, ShardId, ShardOffset, Slot, }, + yellowstone_log::consumer_source::Interrupted, }, chrono::{DateTime, TimeDelta, Utc}, core::fmt, @@ -31,7 +34,7 @@ use { time::Duration, }, thiserror::Error, - tokio::{sync::mpsc, time::Instant}, + tokio::sync::{mpsc, oneshot}, tokio_stream::wrappers::ReceiverStream, tonic::Response, tracing::{error, info, warn}, @@ -39,18 +42,13 @@ use { yellowstone_grpc_proto::{ geyser::{subscribe_update::UpdateOneof, SubscribeUpdate}, yellowstone::log::{ - yellowstone_log_server::YellowstoneLog, ConsumeRequest, EventSubscriptionPolicy, - TimelineTranslationPolicy, + yellowstone_log_server::YellowstoneLog, ConsumeRequest, + CreateStaticConsumerGroupRequest, CreateStaticConsumerGroupResponse, + EventSubscriptionPolicy, TimelineTranslationPolicy, }, }, }; -const CHECK_PRODUCER_LIVENESS_DELAY: Duration = Duration::from_millis(600); - -const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250); - -const FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD: Duration = Duration::from_millis(500); - const DEFAULT_LAST_HEARTBEAT_TIME_DELTA: Duration = Duration::from_secs(10); const DEFAULT_OFFSET_COMMIT_INTERVAL: Duration = Duration::from_secs(10); @@ -346,6 +344,32 @@ async fn is_producer_still_alive( Ok(qr2.rows.is_some()) } +fn wait_for_producer_is_dead( + session: Arc<Session>, + producer_id: ProducerId, +) -> oneshot::Receiver<()> { + let (sender, receiver) = oneshot::channel(); + + tokio::spawn(async move { + let session = session; + loop { + let is_alive = is_producer_still_alive(Arc::clone(&session), producer_id) + .await + .expect("checking producer is alive failed"); + if !is_alive { + info!("producer {producer_id:?} is dead"); + sender + .send(()) + .expect(format!("the receiving half closed while waiting for producer({producer_id:?}) liveness status").as_str()); + break; + } + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); + + receiver +} + /// /// Returns the producer id with least consumer assignment. 
/// @@ -719,11 +743,16 @@ async fn set_initial_consumer_shard_offsets( pub struct ScyllaYsLog { session: Arc<Session>, + consumer_group_repo: ConsumerGroupRepo, } impl ScyllaYsLog { - pub fn new(session: Arc<Session>) -> Self { - ScyllaYsLog { session } + pub async fn new(session: Arc<Session>) -> anyhow::Result<Self> { + let consumer_group_repo = ConsumerGroupRepo::new(Arc::clone(&session)).await?; + Ok(ScyllaYsLog { + session, + consumer_group_repo, + }) + } } @@ -734,6 +763,29 @@ impl YellowstoneLog for ScyllaYsLog { #[doc = r" Server streaming response type for the consume method."] type ConsumeStream = LogStream; + async fn create_static_consumer_group( + &self, + request: tonic::Request<CreateStaticConsumerGroupRequest>, + ) -> Result<tonic::Response<CreateStaticConsumerGroupResponse>, tonic::Status> { + let remote_ip_addr = request.remote_addr().map(|addr| addr.ip()); + let request = request.into_inner(); + + let instance_ids = request.instance_id_list; + let redundant_instance_ids = request.redundancy_instance_id_list; + + let consumer_group_info = self + .consumer_group_repo + .create_static_consumer_group(&instance_ids, &redundant_instance_ids, remote_ip_addr) + .await + .map_err(|e| { + error!("create_static_consumer_group: {e:?}"); + tonic::Status::internal("failed to create consumer group") + })?; + Ok(Response::new(CreateStaticConsumerGroupResponse { + group_id: consumer_group_info.consumer_group_id.to_string(), + })) + } + async fn consume( + &self, + request: tonic::Request<ConsumeRequest>, @@ -825,6 +877,33 @@ pub struct SpawnGrpcConsumerReq { type GrpcConsumerSender = mpsc::Sender<Result<SubscribeUpdate, tonic::Status>>; type GrpcConsumerReceiver = mpsc::Receiver<Result<SubscribeUpdate, tonic::Status>>; +type GrpcEvent = Result<SubscribeUpdate, tonic::Status>; + +impl FromBlockchainEvent for GrpcEvent { + type Output = Self; + fn from(blockchain_event: crate::scylladb::types::BlockchainEvent) -> Self::Output { + let geyser_event = match blockchain_event.event_type { + BlockchainEventType::AccountUpdate => { + UpdateOneof::Account(blockchain_event.try_into().map_err(|e| { + error!(error=?e); + tonic::Status::internal("corrupted account update event in the stream") + })?) + } + BlockchainEventType::NewTransaction => { + UpdateOneof::Transaction(blockchain_event.try_into().map_err(|e| { + error!(error=?e); + tonic::Status::internal("corrupted new transaction event in the stream") + })?) 
+ } + }; + let subscribe_update = SubscribeUpdate { + filters: Default::default(), + update_oneof: Some(geyser_event), + }; + + Ok(subscribe_update) + } +} async fn build_grpc_consumer_source( sender: GrpcConsumerSender, @@ -832,7 +911,7 @@ async fn build_grpc_consumer_source( req: SpawnGrpcConsumerReq, initial_offset_policy: InitialOffset, is_new: bool, -) -> anyhow::Result<GrpcConsumerSource> { +) -> anyhow::Result<ConsumerSource<GrpcEvent>> { let (consumer_info, initial_shard_offsets) = assign_producer_to_consumer( Arc::clone(&session), req.consumer_id.clone(), @@ -882,7 +961,7 @@ async fn build_grpc_consumer_source( )) .await?; - let consumer = GrpcConsumerSource::new( + let consumer = ConsumerSource::new( consumer_session, consumer_info, sender, @@ -921,17 +1000,20 @@ pub async fn spawn_grpc_consumer( let sender = sender; let session = session; while !sender.is_closed() { - match grpc_consumer_source.run_forever().await { + let current_producer_id = grpc_consumer_source.producer_id(); + let interrupt_signal = + wait_for_producer_is_dead(Arc::clone(&session), current_producer_id); + + match grpc_consumer_source.run(interrupt_signal).await { Ok(_) => break, Err(e) => { warn!("Consumer {consumer_id} source has stop with {e:?}"); - if let Some(DeadProducerErr(_producer_id)) = e.downcast_ref::<DeadProducerErr>() - { + if let Some(Interrupted) = e.downcast_ref::<Interrupted>() { let forged_offset_policy = grpc_consumer_source .shard_iterators_slot .into_iter() .min() - .map(|slot| { + .map(|(_shard_id, slot)| { let min_slot = match &original_req.timeline_translation_policy { TimelineTranslationPolicy::AllowLag => { let lag = original_req @@ -967,146 +1049,3 @@ pub async fn spawn_grpc_consumer( }); Ok(receiver) } - -struct GrpcConsumerSource { - session: Arc<Session>, - consumer_info: ConsumerInfo, - sender: mpsc::Sender<Result<SubscribeUpdate, tonic::Status>>, - // The interval at which we want to commit our Offset progression to Scylla - offset_commit_interval: Duration, - shard_iterators: Vec<ShardIterator>, - shard_iterators_slot: Vec<Slot>, - update_consumer_shard_offset_prepared_stmt: PreparedStatement, -} - -impl GrpcConsumerSource { - async fn new( - session: Arc<Session>, - consumer_info: ConsumerInfo, - sender: mpsc::Sender<Result<SubscribeUpdate, tonic::Status>>, - offset_commit_interval: Duration, - mut shard_iterators: Vec<ShardIterator>, - ) -> anyhow::Result<Self> { - let update_consumer_shard_offset_prepared_stmt = - session.prepare(UPDATE_CONSUMER_SHARD_OFFSET).await?; - // Prewarm every shard iterator - try_join_all(shard_iterators.iter_mut().map(|shard_it| shard_it.warm())).await?; - let num_shard_iterators = shard_iterators.len(); - let shard_iterators_slot = vec![UNDEFINED_SLOT; num_shard_iterators]; - Ok(GrpcConsumerSource { - session, - consumer_info, - sender, - offset_commit_interval, - shard_iterators, - shard_iterators_slot, - update_consumer_shard_offset_prepared_stmt, - }) - } - - async fn update_consumer_shard_offsets(&self) -> anyhow::Result<()> { - let mut batch = Batch::new(BatchType::Unlogged); - let mut values = Vec::with_capacity(self.shard_iterators_slot.len()); - for (i, shard_it) in self.shard_iterators.iter().enumerate() { - values.push(( - shard_it.last_offset(), - self.shard_iterators_slot[i], - self.consumer_info.consumer_id.to_owned(), - self.consumer_info.producer_id, - shard_it.shard_id, - shard_it.event_type, - )); - batch.append_statement(self.update_consumer_shard_offset_prepared_stmt.clone()); - } - - self.session.batch(&batch, values).await?; - Ok(()) - } - - async fn run_forever(&mut self) -> anyhow::Result<()> { - let producer_id = self.consumer_info.producer_id; - let consumer_id = self.consumer_info.consumer_id.to_owned(); - let mut 
commit_offset_deadline = Instant::now() + self.offset_commit_interval; - const PRINT_CONSUMER_SLOT_REACH_DELAY: Duration = Duration::from_secs(5); - info!("Serving consumer: {:?}", consumer_id); - - self.shard_iterators - .sort_by_key(|it| (it.shard_id, it.event_type)); - - let mut max_seen_slot = UNDEFINED_SLOT; - let mut num_event_between_two_slots = 0; - - let mut next_trace_schedule = Instant::now() + PRINT_CONSUMER_SLOT_REACH_DELAY; - let mut t = Instant::now(); - let mut next_producer_live_probing = Instant::now() + CHECK_PRODUCER_LIVENESS_DELAY; - let mut producer_is_dead = false; - loop { - for (i, shard_it) in self.shard_iterators.iter_mut().enumerate() { - let maybe = shard_it.try_next().await?; - if let Some(block_chain_event) = maybe { - self.shard_iterators_slot[i] = block_chain_event.slot; - if t.elapsed() >= FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD { - warn!( - "consumer {consumer_id} micro batch took {:?} to fetch.", - t.elapsed() - ); - } - if max_seen_slot < block_chain_event.slot { - if next_trace_schedule.elapsed() > Duration::ZERO { - info!("Consumer {consumer_id} reach slot {max_seen_slot} after {num_event_between_two_slots} blockchain event(s)"); - next_trace_schedule = Instant::now() + PRINT_CONSUMER_SLOT_REACH_DELAY; - } - max_seen_slot = block_chain_event.slot; - num_event_between_two_slots = 0; - } - let geyser_event = match block_chain_event.event_type { - BlockchainEventType::AccountUpdate => { - UpdateOneof::Account(block_chain_event.try_into()?) - } - BlockchainEventType::NewTransaction => { - UpdateOneof::Transaction(block_chain_event.try_into()?) - } - }; - let subscribe_update = SubscribeUpdate { - filters: Default::default(), - update_oneof: Some(geyser_event), - }; - let t_send = Instant::now(); - - if self.sender.send(Ok(subscribe_update)).await.is_err() { - warn!("Consumer {consumer_id} closed its streaming half"); - return Ok(()); - } - let send_latency = t_send.elapsed(); - if send_latency >= CLIENT_LAG_WARN_THRESHOLD { - warn!("Slow read from consumer {consumer_id}, recorded latency: {send_latency:?}") - } - num_event_between_two_slots += 1; - t = Instant::now(); - } - } - - if next_producer_live_probing.elapsed() > Duration::ZERO { - producer_is_dead = !is_producer_still_alive( - Arc::clone(&self.session), - self.consumer_info.producer_id, - ) - .await?; - next_producer_live_probing = Instant::now() + CHECK_PRODUCER_LIVENESS_DELAY; - } - - // Every now and then, we commit where the consumer is loc - if commit_offset_deadline.elapsed() > Duration::ZERO || producer_is_dead { - let t = Instant::now(); - self.update_consumer_shard_offsets().await?; - info!("updated consumer shard offset in {:?}", t.elapsed()); - commit_offset_deadline = Instant::now() + self.offset_commit_interval; - } - - if producer_is_dead { - warn!("Producer {producer_id:?} is considered dead"); - return Err(anyhow::Error::new(DeadProducerErr(producer_id))); - } - } - } -} diff --git a/yellowstone-grpc-tools/src/scylladb/yellowstone_log/mod.rs b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/mod.rs new file mode 100644 index 00000000..6689e781 --- /dev/null +++ b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/mod.rs @@ -0,0 +1,5 @@ +pub mod common; +mod consumer_group; +mod consumer_source; +pub mod grpc; +pub mod shard_iterator; diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs b/yellowstone-grpc-tools/src/scylladb/yellowstone_log/shard_iterator.rs similarity index 100% rename from yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs 
rename to yellowstone-grpc-tools/src/scylladb/yellowstone_log/shard_iterator.rs
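Review note (not part of the diff): consumer_group/repo.rs introduces the static partition-assignment scheme behind CreateStaticConsumerGroup — instance ids are sorted, then each id receives one contiguous, equally sized chunk of the NUM_SHARDS (64) shard ids. The snippet below is a minimal, self-contained sketch of that behaviour for reviewers to try out; it assumes ShardId = i16 (matching the smallint shard columns in solana.cql), and the main() driver and its sample instance ids are purely illustrative.

```rust
// Standalone sketch of the assign_shards() logic added in consumer_group/repo.rs.
// Assumptions: ShardId = i16 and NUM_SHARDS = 64, as suggested by the diff.
use std::collections::BTreeMap;

type ShardId = i16;
type InstanceId = String;

fn assign_shards(ids: &[InstanceId], num_shards: usize) -> BTreeMap<InstanceId, Vec<ShardId>> {
    // Sort so the assignment is deterministic regardless of request order.
    let mut ids = ids.to_vec();
    ids.sort();

    // Each instance gets num_shards / ids.len() consecutive shards. When the division
    // leaves a remainder, the trailing chunk is dropped by `zip`, so those shards end
    // up unassigned in this sketch (which mirrors the chunks/zip structure of the diff).
    let num_parts_per_id = num_shards / ids.len();
    let shard_vec = (0..num_shards).map(|x| x as ShardId).collect::<Vec<_>>();
    let chunk_it = shard_vec.chunks(num_parts_per_id).map(|chunk| chunk.to_vec());

    ids.into_iter().zip(chunk_it).collect()
}

fn main() {
    // Two instances splitting 64 shards: "a" gets 0..=31, "b" gets 32..=63.
    let assignments = assign_shards(&["b".to_string(), "a".to_string()], 64);
    assert_eq!(assignments["a"], (0..32).collect::<Vec<ShardId>>());
    assert_eq!(assignments["b"], (32..64).collect::<Vec<ShardId>>());
    println!("{assignments:?}");
}
```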