diff --git a/yellowstone-grpc-tools/solana.cql b/yellowstone-grpc-tools/solana.cql
index 176782ca..3af1703e 100644
--- a/yellowstone-grpc-tools/solana.cql
+++ b/yellowstone-grpc-tools/solana.cql
@@ -67,18 +67,12 @@ create table if not exists producer_lock (
     lock_id text,
     ifname text,
     ipv4 text,
+    is_ready boolean,
+    minimum_shard_offset frozen<set<tuple<smallint, bigint, bigint>>>,
     created_at timestamp,
     primary key (producer_id)
 );
 
-create table if not exists producer_period_commit_log (
-    producer_id blob,
-    shard_id smallint,
-    period bigint,
-    created_at timestamp,
-    PRIMARY KEY((producer_id, shard_id), period)
-)
-with clustering order by (period DESC);
 
 create type if not exists message_addr_table_lookup (
     account_key blob,
@@ -197,7 +191,8 @@ create table if not exists log (
     primary key ((shard_id, period, producer_id), offset)
 )
-WITH CLUSTERING ORDER BY (offset desc);
+WITH CLUSTERING ORDER BY (offset desc)
+    AND default_time_to_live = 86400; -- 24 hours
 
 create table if not exists producer_slot_seen (
     producer_id blob,
@@ -206,9 +201,8 @@ create table if not exists producer_slot_seen (
     created_at timestamp,
     primary key (producer_id, slot)
 )
-with clustering order by (slot DESC);
-
-
+with clustering order by (slot DESC)
+    AND default_time_to_live = 82800; -- 23 hours
 
 create materialized view if not exists slot_map_mv
 as
@@ -228,11 +222,21 @@ where
     primary key (slot, producer_id, shard_id, period, offset);
 
+create table if not exists producer_period_commit_log (
+    producer_id blob,
+    shard_id smallint,
+    period bigint,
+    created_at timestamp,
+    PRIMARY KEY((producer_id, shard_id), period)
+) with clustering order by (period desc)
+    AND default_time_to_live = 82800; -- 23 hours
+
 -- clear all table
 truncate log;
 truncate producer_period_commit_log;
 truncate producer_slot_seen;
+truncate producer_lock;
 truncate consumer_info;
-truncate consumer_shard_offset;
+truncate consumer_shard_offset;
\ No newline at end of file
diff --git a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
index 45452f64..9b66b934 100644
--- a/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
+++ b/yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
@@ -7,8 +7,7 @@ use {
         sink,
         types::{
             BlockchainEventType, ConsumerId, ConsumerInfo, ConsumerShardOffset, ProducerId,
-            ProducerInfo, ShardId, ShardOffset, ShardPeriod, Slot, SHARD_OFFSET_MODULO,
-            UNDEFINED_SLOT,
+            ProducerInfo, ShardId, ShardOffset, Slot, UNDEFINED_SLOT,
         },
     },
     chrono::{DateTime, TimeDelta, Utc},
@@ -43,6 +42,8 @@ use {
     },
 };
 
+type ProducerLockId = String;
+
 const CHECK_PRODUCER_LIVENESS_DELAY: Duration = Duration::from_millis(600);
 
 const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250);
@@ -126,6 +127,8 @@ const LIST_PRODUCERS_WITH_LOCK: &str = r###"
     SELECT
         producer_id
     FROM producer_lock
+    WHERE is_ready = true
+    ALLOW FILTERING
 "###;
 
 const GET_PRODUCERS_CONSUMER_COUNT: &str = r###"
@@ -430,39 +433,21 @@ async fn assign_producer_to_consumer(
 async fn get_min_offset_for_producer(
     session: Arc<Session>,
     producer_id: ProducerId,
-    num_shards: usize,
-) -> anyhow::Result<Vec<(ShardId, ShardOffset)>> {
-    let shard_id_list = (0..num_shards)
-        .map(|x| format!("{x}"))
-        .collect::<Vec<_>>()
-        .join(", ");
-    let query = format!(
-        r###"
-        SELECT
-            shard_id,
-            period
-        FROM producer_period_commit_log
-        WHERE
-            producer_id = ?
-            AND shard_id in ({shard_id_list})
-        ORDER BY period ASC
-        PER PARTITION LIMIT 1
-        "###
-    );
-
+) -> anyhow::Result<Vec<(ShardId, ShardOffset, Slot)>> {
     session
-        .query(query, (producer_id,))
+        .query(
+            "SELECT minimum_shard_offset FROM producer_lock WHERE producer_id = ?",
+            (producer_id,),
+        )
         .await?
-        .rows_typed::<(ShardId, ShardPeriod)>()?
-        .map(|result| {
-            result
-                .map(|(shard_id, period)| (shard_id, (period * SHARD_OFFSET_MODULO) as ShardOffset))
-        })
-        .collect::<Result<Vec<_>, _>>()
-        .map_err(anyhow::Error::new)
+        .first_row_typed::<(Option<Vec<(ShardId, ShardOffset, Slot)>>,)>()?
+        .0
+        .ok_or(anyhow::anyhow!(
+            "Producer lock exists, but its minimum shard offset is not set."
+        ))
 }
 
-async fn get_slot_shard_offsets2(
+async fn get_slot_shard_offsets(
     session: Arc<Session>,
     slot: Slot,
     producer_id: ProducerId,
@@ -515,20 +500,40 @@ async fn set_initial_consumer_shard_offsets(
                 .await?
         }
         InitialOffsetPolicy::Earliest => {
-            get_min_offset_for_producer(Arc::clone(&session), producer_id, num_shards as usize)
-                .await?
-                .into_iter()
-                .map(|(shard_id, shard_offset)| (shard_id, shard_offset, UNDEFINED_SLOT))
-                .collect::<Vec<_>>()
+            get_min_offset_for_producer(Arc::clone(&session), producer_id).await?
         }
         InitialOffsetPolicy::SlotApprox(slot) => {
-            get_slot_shard_offsets2(Arc::clone(&session), slot, producer_id, num_shards)
-                .await?
-                .ok_or(ImpossibleSlotOffset(slot))?
+            let minimum_producer_offsets =
+                get_min_offset_for_producer(Arc::clone(&session), producer_id)
+                    .await?
+                    .into_iter()
+                    .map(|(shard_id, shard_offset, slot)| (shard_id, (shard_offset, slot)))
+                    .collect::<BTreeMap<_, _>>();
+
+            let shard_offsets_contain_slot =
+                get_slot_shard_offsets(Arc::clone(&session), slot, producer_id, num_shards)
+                    .await?
+                    .ok_or(ImpossibleSlotOffset(slot))?;
+
+            let are_shard_offset_reachable =
+                shard_offsets_contain_slot
+                    .iter()
+                    .all(|(shard_id, offset1, _)| {
+                        minimum_producer_offsets
+                            .get(shard_id)
+                            .filter(|(offset2, _)| offset1 > offset2)
+                            .is_some()
+                    });
+
+            if !are_shard_offset_reachable {
+                anyhow::bail!(ImpossibleSlotOffset(slot))
+            }
+
+            shard_offsets_contain_slot
         }
     };
 
-    if shard_offset_pairs.is_empty() {
+    if shard_offset_pairs.len() != (num_shards as usize) {
         anyhow::bail!("Producer {producer_id:?} shard offsets is incomplete {new_consumer_id}");
     }
diff --git a/yellowstone-grpc-tools/src/scylladb/sink.rs b/yellowstone-grpc-tools/src/scylladb/sink.rs
index 162b784a..f2a36833 100644
--- a/yellowstone-grpc-tools/src/scylladb/sink.rs
+++ b/yellowstone-grpc-tools/src/scylladb/sink.rs
@@ -21,7 +21,12 @@ use {
         frame::Compression,
         FromRow, Session, SessionBuilder,
     },
-    std::{collections::BTreeMap, net::IpAddr, sync::Arc, time::Duration},
+    std::{
+        collections::{BTreeMap, BTreeSet},
+        net::IpAddr,
+        sync::Arc,
+        time::Duration,
+    },
     tokio::{
         sync::mpsc::{error::SendError, Permit},
         task::{JoinError, JoinHandle},
     },
 };
@@ -71,8 +76,8 @@ const DROP_PRODUCER_LOCK: &str = r###"
 "###;
 
 const TRY_ACQUIRE_PRODUCER_LOCK: &str = r###"
-    INSERT INTO producer_lock (producer_id, lock_id, ifname, ipv4, created_at)
-    VALUES (?, ?, ?, ?, currentTimestamp())
+    INSERT INTO producer_lock (producer_id, lock_id, ifname, ipv4, is_ready, minimum_shard_offset, created_at)
+    VALUES (?, ?, ?, ?, false, null, currentTimestamp())
     IF NOT EXISTS
 "###;
 
@@ -500,6 +505,8 @@ fn spawn_round_robin(
     let h: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
         let insert_slot_ps = session.prepare(INSERT_PRODUCER_SLOT).await?;
+        // One hour's worth of slots
+        const SLOT_SEEN_RETENTION: usize = 9000;
 
         //session.execute(&insert_slot_ps, (producer_id,)).await?;
 
         let iterator = shard_handles.iter().enumerate().cycle();
@@ -507,8 +514,11 @@ fn spawn_round_robin(
         let mut msg_between_slot = 0;
         let mut max_slot_seen = -1;
         let mut time_since_new_max_slot = Instant::now();
-        let mut background_commit_max_slot_seen =
+        let mut background_commit_slot_seen =
             tokio::spawn(future::ready(Ok::<(), anyhow::Error>(())));
+
+        let mut slots_seen = BTreeSet::<Slot>::new();
+
         for (i, shard_sender) in iterator {
             let msg = receiver.recv().await.unwrap_or(ShardCommand::Shutdown);
@@ -521,13 +531,22 @@ fn spawn_round_robin(
                 ShardCommand::InsertAccountUpdate(x) => x.slot,
                 ShardCommand::InsertTransaction(x) => x.slot,
             };
-            if max_slot_seen < slot {
-                max_slot_seen = slot;
+
+            if slots_seen.insert(slot) {
+                while slots_seen.len() >= SLOT_SEEN_RETENTION {
+                    slots_seen.pop_first();
+                }
+
+                if max_slot_seen > slot {
+                    warn!("Slot {slot} arrived late after seeing {max_slot_seen}");
+                } else {
+                    max_slot_seen = slot;
+                }
                 let time_elapsed_between_last_max_slot = time_since_new_max_slot.elapsed();
                 // We only commit every 3 slots
 
                 let t = Instant::now();
-                background_commit_max_slot_seen.await??;
+                background_commit_slot_seen.await??;
 
                 let session = Arc::clone(&session);
                 let insert_slot_ps = insert_slot_ps.clone();
@@ -536,7 +555,7 @@ fn spawn_round_robin(
                     .map(|sh| (sh.shard_id, sh.get_last_committed_offset()))
                     .collect::<BTreeMap<_, _>>();
 
-                background_commit_max_slot_seen = tokio::spawn(async move {
+                background_commit_slot_seen = tokio::spawn(async move {
                     session
                         .execute(&insert_slot_ps, (producer_id, slot, shard_offset_pairs))
                         .await?;
@@ -657,6 +676,35 @@ async fn try_acquire_lock(
     }
 }
 
+async fn set_minimum_producer_offsets(
+    session: Arc<Session>,
+    producer_lock: &ProducerLock,
+    minimum_shard_offsets: &[(ShardId, ShardOffset, Slot)],
+) -> anyhow::Result<()> {
+    let ps = session
+        .prepare(
+            r###"
+            UPDATE producer_lock
+            SET minimum_shard_offset = ?, is_ready = true
+            WHERE
+                producer_id = ?
+            IF EXISTS
+            "###,
+        )
+        .await?;
+
+    let lwt = session
+        .execute(&ps, (minimum_shard_offsets, producer_lock.producer_id))
+        .await?
+        .first_row_typed::<LwtSuccess>()?;
+
+    if let LwtSuccess(false) = lwt {
+        anyhow::bail!("Producer lock is corrupted; it may be caused by concurrent lock acquisition");
+    }
+
+    Ok(())
+}
+
 impl ScyllaSink {
     pub async fn new(
         config: ScyllaSinkConfig,
@@ -689,12 +737,24 @@ impl ScyllaSink {
         let shard_count = producer_info.num_shards as usize;
 
-        info!("init producer {producer_id:?} period commit log successful.");
-
+        // On init, we record where the producer last left off (its max shard offsets).
+        // That point becomes the new earliest offset available, so consumers cannot be
+        // assigned offsets this producer instance will never serve.
         let shard_offsets =
             get_max_shard_offsets_for_producer(Arc::clone(&session), producer_id, shard_count)
                 .await?;
 
+        let result =
+            set_minimum_producer_offsets(Arc::clone(&session), &producer_lock, &shard_offsets)
+                .await;
+        if let Err(e) = result {
+            let result2 = producer_lock.release().await;
+            if let Err(e2) = result2 {
+                error!("Releasing lock failed during error handling: {e2:?}");
+            }
+            anyhow::bail!(e);
+        }
+
         info!("Got back last offsets of all {shard_count} shards");
 
         let mut shard_handles = Vec::with_capacity(shard_count);
         for (shard_id, last_offset, _slot) in shard_offsets.into_iter() {
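
The snippet below is not part of the patch. It is a minimal sketch of how a client or operator could list the producers that have finished initialization under the new schema, using the same is_ready filter that the consumer-side LIST_PRODUCERS_WITH_LOCK query now applies. It assumes the scylla, tokio, and anyhow crates; the contact point "127.0.0.1:9042" and the keyspace name "solana" are placeholders.

    // Illustrative sketch only; node address and keyspace are placeholders.
    use scylla::SessionBuilder;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let session = SessionBuilder::new()
            .known_node("127.0.0.1:9042")
            .use_keyspace("solana", false)
            .build()
            .await?;

        // Same filter the consumer now uses when picking a producer:
        // only rows whose lock has been marked ready.
        let rows = session
            .query(
                "SELECT producer_id FROM producer_lock WHERE is_ready = true ALLOW FILTERING",
                (),
            )
            .await?
            .rows_typed::<(Vec<u8>,)>()?;

        for row in rows {
            let (producer_id,) = row?;
            println!("ready producer: {producer_id:?}");
        }
        Ok(())
    }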