tools: scylla added ttl + fixed some slot location safeguards (#347)
lvboudre authored May 24, 2024
1 parent 35e9476 commit a641007
Showing 3 changed files with 132 additions and 63 deletions.
30 changes: 17 additions & 13 deletions yellowstone-grpc-tools/solana.cql
@@ -67,18 +67,12 @@ create table if not exists producer_lock (
lock_id text,
ifname text,
ipv4 text,
is_ready boolean,
minimum_shard_offset frozen<set<tuple<smallint, bigint, bigint>>>,
created_at timestamp,
primary key (producer_id)
);

create table if not exists producer_period_commit_log (
producer_id blob,
shard_id smallint,
period bigint,
created_at timestamp,
PRIMARY KEY((producer_id, shard_id), period)
)
with clustering order by (period DESC);

create type if not exists message_addr_table_lookup (
account_key blob,
@@ -197,7 +191,8 @@ create table if not exists log (

primary key ((shard_id, period, producer_id), offset)
)
WITH CLUSTERING ORDER BY (offset desc);
WITH CLUSTERING ORDER BY (offset desc)
AND default_time_to_live = 86400; -- 24 hours

create table if not exists producer_slot_seen (
producer_id blob,
@@ -206,9 +201,8 @@ create table if not exists producer_slot_seen (
created_at timestamp,
primary key (producer_id, slot)
)
with clustering order by (slot DESC);


with clustering order by (slot DESC)
AND default_time_to_live = 82800; -- 23 hours

create materialized view if not exists slot_map_mv
as
@@ -228,11 +222,21 @@ where
primary key (slot, producer_id, shard_id, period, offset);


create table if not exists producer_period_commit_log (
producer_id blob,
shard_id smallint,
period bigint,
created_at timestamp,
PRIMARY KEY((producer_id, shard_id), period)
) with clustering order by (period desc)
AND default_time_to_live = 82800; -- 23 hours

-- clear all tables

truncate log;
truncate producer_period_commit_log;
truncate producer_slot_seen;
truncate producer_lock;

truncate consumer_info;
truncate consumer_shard_offset;
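
The retention windows introduced above are deliberately staggered: the bookkeeping tables (producer_slot_seen, producer_period_commit_log) expire rows after 23 hours, one hour before the log table's 24-hour TTL, presumably so that a slot or period found in the bookkeeping tables always still has its underlying log rows. As a hedged sketch (not part of this commit; remaining_ttl_secs is a hypothetical helper), the remaining lifetime of a row under default_time_to_live can be inspected with CQL's TTL() function, using the same scylla driver calls this repository already relies on:

async fn remaining_ttl_secs(
    session: &scylla::Session,
    producer_id: Vec<u8>, // assumed to bind as the producer_id blob
) -> anyhow::Result<Option<i32>> {
    // TTL(col) returns the seconds left before the row expires, or null if
    // the column was written without any TTL.
    let (ttl,) = session
        .query(
            "SELECT TTL(created_at) FROM producer_slot_seen WHERE producer_id = ? LIMIT 1",
            (producer_id,),
        )
        .await?
        .first_row_typed::<(Option<i32>,)>()?;
    Ok(ttl)
}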
85 changes: 45 additions & 40 deletions yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
@@ -7,8 +7,7 @@ use {
sink,
types::{
BlockchainEventType, ConsumerId, ConsumerInfo, ConsumerShardOffset, ProducerId,
ProducerInfo, ShardId, ShardOffset, ShardPeriod, Slot, SHARD_OFFSET_MODULO,
UNDEFINED_SLOT,
ProducerInfo, ShardId, ShardOffset, Slot, UNDEFINED_SLOT,
},
},
chrono::{DateTime, TimeDelta, Utc},
@@ -43,6 +42,8 @@
},
};

type ProducerLockId = String;

const CHECK_PRODUCER_LIVENESS_DELAY: Duration = Duration::from_millis(600);

const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250);
@@ -126,6 +127,8 @@ const LIST_PRODUCERS_WITH_LOCK: &str = r###"
SELECT
producer_id
FROM producer_lock
WHERE is_ready = true
ALLOW FILTERING
"###;

const GET_PRODUCERS_CONSUMER_COUNT: &str = r###"
@@ -430,39 +433,21 @@ async fn assign_producer_to_consumer(
async fn get_min_offset_for_producer(
session: Arc<Session>,
producer_id: ProducerId,
num_shards: usize,
) -> anyhow::Result<Vec<(ShardId, ShardOffset)>> {
let shard_id_list = (0..num_shards)
.map(|x| format!("{x}"))
.collect::<Vec<_>>()
.join(", ");
let query = format!(
r###"
SELECT
shard_id,
period
FROM producer_period_commit_log
WHERE
producer_id = ?
AND shard_id in ({shard_id_list})
ORDER BY period ASC
PER PARTITION LIMIT 1
"###
);

) -> anyhow::Result<Vec<(ShardId, ShardOffset, Slot)>> {
session
.query(query, (producer_id,))
.query(
"SELECT minimum_shard_offset FROM producer_lock WHERE producer_id = ?",
(producer_id,),
)
.await?
.rows_typed::<(ShardId, ShardPeriod)>()?
.map(|result| {
result
.map(|(shard_id, period)| (shard_id, (period * SHARD_OFFSET_MODULO) as ShardOffset))
})
.collect::<Result<Vec<_>, _>>()
.map_err(anyhow::Error::new)
.first_row_typed::<(Option<Vec<(ShardId, ShardOffset, Slot)>>,)>()?
.0
.ok_or(anyhow::anyhow!(
"Producer lock exists, but its minimum shard offset is not set."
))
}
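
For context (this is the scylla driver's standard type mapping, not something the diff itself spells out): the minimum_shard_offset column is declared as frozen<set<tuple<smallint, bigint, bigint>>> in solana.cql, and the driver deserializes a set of tuples as a Vec of Rust tuples, which is why the query above can read it directly into Vec<(ShardId, ShardOffset, Slot)>. Assuming the aliases in scylladb::types are plain integer widths:

// Assumed definitions, mirroring the CQL column types.
type ShardId = i16;     // CQL smallint
type ShardOffset = i64; // CQL bigint
type Slot = i64;        // CQL bigint
// frozen<set<tuple<smallint, bigint, bigint>>>  <->  Vec<(ShardId, ShardOffset, Slot)>

The write side in sink.rs binds a &[(ShardId, ShardOffset, Slot)] to the same column when publishing the minimum offsets.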

async fn get_slot_shard_offsets2(
async fn get_slot_shard_offsets(
session: Arc<Session>,
slot: Slot,
producer_id: ProducerId,
@@ -515,20 +500,40 @@ async fn set_initial_consumer_shard_offsets(
.await?
}
InitialOffsetPolicy::Earliest => {
get_min_offset_for_producer(Arc::clone(&session), producer_id, num_shards as usize)
.await?
.into_iter()
.map(|(shard_id, shard_offset)| (shard_id, shard_offset, UNDEFINED_SLOT))
.collect::<Vec<_>>()
get_min_offset_for_producer(Arc::clone(&session), producer_id).await?
}
InitialOffsetPolicy::SlotApprox(slot) => {
get_slot_shard_offsets2(Arc::clone(&session), slot, producer_id, num_shards)
.await?
.ok_or(ImpossibleSlotOffset(slot))?
let minimum_producer_offsets =
get_min_offset_for_producer(Arc::clone(&session), producer_id)
.await?
.into_iter()
.map(|(shard_id, shard_offset, slot)| (shard_id, (shard_offset, slot)))
.collect::<BTreeMap<_, _>>();

let shard_offsets_contain_slot =
get_slot_shard_offsets(Arc::clone(&session), slot, producer_id, num_shards)
.await?
.ok_or(ImpossibleSlotOffset(slot))?;

let are_shard_offsets_reachable =
shard_offsets_contain_slot
.iter()
.all(|(shard_id, offset1, _)| {
minimum_producer_offsets
.get(shard_id)
.filter(|(offset2, _)| offset1 > offset2)
.is_some()
});

if !are_shard_offsets_reachable {
anyhow::bail!(ImpossibleSlotOffset(slot))
}

shard_offsets_contain_slot
}
};

if shard_offset_pairs.is_empty() {
if shard_offset_pairs.len() != (num_shards as usize) {
anyhow::bail!("Producer {producer_id:?} shard offsets is incomplete {new_consumer_id}");
}

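The SlotApprox branch above is the "slot location safeguard" from the commit title: an offset located for the requested slot is only usable if it sits strictly above the producer's published minimum for that shard, otherwise it may point into data already dropped by the new TTLs. A minimal standalone sketch of the check (illustrative names; assumes ShardId = i16 and ShardOffset = i64):

use std::collections::BTreeMap;

fn offsets_reachable(
    minimums: &BTreeMap<i16, i64>, // shard_id -> minimum offset still stored
    located: &[(i16, i64)],        // shard_id -> offset found for the slot
) -> bool {
    located.iter().all(|(shard_id, offset)| {
        minimums
            .get(shard_id)
            .map_or(false, |min| offset > min)
    })
}
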
80 changes: 70 additions & 10 deletions yellowstone-grpc-tools/src/scylladb/sink.rs
@@ -21,7 +21,12 @@ use {
frame::Compression,
FromRow, Session, SessionBuilder,
},
std::{collections::BTreeMap, net::IpAddr, sync::Arc, time::Duration},
std::{
collections::{BTreeMap, BTreeSet},
net::IpAddr,
sync::Arc,
time::Duration,
},
tokio::{
sync::mpsc::{error::SendError, Permit},
task::{JoinError, JoinHandle},
@@ -71,8 +76,8 @@ const DROP_PRODUCER_LOCK: &str = r###"
"###;

const TRY_ACQUIRE_PRODUCER_LOCK: &str = r###"
INSERT INTO producer_lock (producer_id, lock_id, ifname, ipv4, created_at)
VALUES (?, ?, ?, ?, currentTimestamp())
INSERT INTO producer_lock (producer_id, lock_id, ifname, ipv4, is_ready, minimum_shard_offset, created_at)
VALUES (?, ?, ?, ?, false, null, currentTimestamp())
IF NOT EXISTS
"###;

@@ -500,15 +505,20 @@ fn spawn_round_robin(
let h: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let insert_slot_ps = session.prepare(INSERT_PRODUCER_SLOT).await?;

// One hour's worth of slots (~400 ms per slot).
const SLOT_SEEN_RETENTION: usize = 9000;

let iterator = shard_handles.iter().enumerate().cycle();
info!("Started round robin router");
let mut msg_between_slot = 0;
let mut max_slot_seen = -1;
let mut time_since_new_max_slot = Instant::now();
let mut background_commit_max_slot_seen =
let mut background_commit_slot_seen =
tokio::spawn(future::ready(Ok::<(), anyhow::Error>(())));

let mut slots_seen = BTreeSet::<Slot>::new();

for (i, shard_sender) in iterator {
let msg = receiver.recv().await.unwrap_or(ShardCommand::Shutdown);

@@ -521,13 +531,22 @@
ShardCommand::InsertAccountUpdate(x) => x.slot,
ShardCommand::InsertTransaction(x) => x.slot,
};
if max_slot_seen < slot {
max_slot_seen = slot;

if slots_seen.insert(slot) {
while slots_seen.len() >= SLOT_SEEN_RETENTION {
slots_seen.pop_first();
}

if max_slot_seen > slot {
warn!("Slot {slot} arrived late after seeing {max_slot_seen}");
} else {
max_slot_seen = slot;
}
let time_elapsed_between_last_max_slot = time_since_new_max_slot.elapsed();
// Commit the latest shard offsets in the background each time a new slot is seen.

let t = Instant::now();
background_commit_max_slot_seen.await??;
background_commit_slot_seen.await??;

let session = Arc::clone(&session);
let insert_slot_ps = insert_slot_ps.clone();
@@ -536,7 +555,7 @@
.map(|sh| (sh.shard_id, sh.get_last_committed_offset()))
.collect::<Vec<_>>();

background_commit_max_slot_seen = tokio::spawn(async move {
background_commit_slot_seen = tokio::spawn(async move {
session
.execute(&insert_slot_ps, (producer_id, slot, shard_offset_pairs))
.await?;
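
The BTreeSet above turns slot tracking into a bounded, ordered sliding window: insert() reports whether a slot is genuinely new (deduplicating repeats), pop_first() evicts the oldest entries once the window is full, and a late-arriving slot can be warned about instead of being mistaken for progress. A hedged standalone sketch of the same pattern (names are illustrative):

use std::collections::BTreeSet;

const RETENTION: usize = 9_000; // mirrors SLOT_SEEN_RETENTION above

/// Returns true if `slot` was not already inside the retention window.
fn note_slot(seen: &mut BTreeSet<i64>, slot: i64) -> bool {
    let is_new = seen.insert(slot);
    while seen.len() >= RETENTION {
        seen.pop_first(); // evict the oldest slot first
    }
    is_new
}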
@@ -657,6 +676,35 @@ async fn try_acquire_lock(
}
}

async fn set_minimum_producer_offsets(
session: Arc<Session>,
producer_lock: &ProducerLock,
minimum_shard_offsets: &[(ShardId, ShardOffset, Slot)],
) -> anyhow::Result<()> {
let ps = session
.prepare(
r###"
UPDATE producer_lock
SET minimum_shard_offset = ?, is_ready = true
WHERE
producer_id = ?
IF EXISTS
"###,
)
.await?;

let lwt = session
.execute(&ps, (minimum_shard_offsets, producer_lock.producer_id))
.await?
.first_row_typed::<LwtSuccess>()?;

if let LwtSuccess(false) = lwt {
anyhow::bail!("Producer lock is corrupted, it may be cause by concurrent lock acquisition");
}

Ok(())
}
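
Background for the LwtSuccess check above (standard Scylla/Cassandra behavior rather than anything specific to this commit): a conditional statement such as UPDATE ... IF EXISTS is a lightweight transaction, and the first column of its result set is the boolean "[applied]" flag, which LwtSuccess evidently wraps via FromRow. A hedged sketch of reading the flag without the wrapper, mirroring the call pattern used in this file:

async fn lwt_applied(
    session: &scylla::Session,
    producer_id: Vec<u8>, // assumed to bind as the producer_id blob
) -> anyhow::Result<bool> {
    // The "[applied]" boolean is the first column of any LWT result row.
    let (applied,) = session
        .query(
            "UPDATE producer_lock SET is_ready = false WHERE producer_id = ? IF EXISTS",
            (producer_id,),
        )
        .await?
        .first_row_typed::<(bool,)>()?;
    Ok(applied)
}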

impl ScyllaSink {
pub async fn new(
config: ScyllaSinkConfig,
@@ -689,12 +737,24 @@ impl ScyllaSink {

let shard_count = producer_info.num_shards as usize;

info!("init producer {producer_id:?} period commit log successful.");

// On init, we collect where the producer left off, i.e. the max shard offsets.
// Where we left off becomes the new earliest offset available to consumers;
// this prevents consumers from seeking offsets the producer can no longer serve.
let shard_offsets =
get_max_shard_offsets_for_producer(Arc::clone(&session), producer_id, shard_count)
.await?;

let result =
set_minimum_producer_offsets(Arc::clone(&session), &producer_lock, &shard_offsets)
.await;
if let Err(e) = result {
let result2 = producer_lock.release().await;
if let Err(e2) = result2 {
error!("Releasing lock failed during error handling: {e2:?}");
}
anyhow::bail!(e);
}

info!("Got back last offsets of all {shard_count} shards");
let mut shard_handles = Vec::with_capacity(shard_count);
for (shard_id, last_offset, _slot) in shard_offsets.into_iter() {
