Change how max offset is computed + support edge cases for period commitment
lvboudre committed May 14, 2024
1 parent 4eca14b commit 7d87921
Showing 7 changed files with 378 additions and 342 deletions.
23 changes: 23 additions & 0 deletions yellowstone-grpc-tools/config-scylladb2.json
@@ -0,0 +1,23 @@
+{
+  "prometheus": "127.0.0.1:8873",
+  "scylladb": {
+    "hostname": "localhost:9042",
+    "username": "cassandra",
+    "password": "cassandra"
+  },
+  "grpc2scylladb": {
+    "endpoint": "localhost:10000",
+    "x_token": "",
+    "request": {
+      "accounts": {
+        "my_filter": {
+          "owner": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"]
+        }
+      }
+    },
+    "batch_size_limit": 1000,
+    "linger": 10,
+    "keyspace": "solana",
+    "max_inflight_batch_delivery": 80
+  }
+}
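Note: the new example config wires a gRPC subscription (one account filter on the SPL Token program owner) into the ScyllaDB sink. As a rough sketch of what the sink knobs control — the struct and field types below are assumptions for illustration, not the tool's actual config types — batch_size_limit caps how many events go into one ScyllaDB batch, linger is how long the batcher waits for a batch to fill (presumably milliseconds), and max_inflight_batch_delivery bounds concurrent batch writes:

use serde::Deserialize;

// Hypothetical mapping of the "grpc2scylladb" object above.
#[derive(Debug, Deserialize)]
struct Grpc2ScyllaDbConfig {
    endpoint: String,                   // gRPC source to subscribe to
    x_token: String,                    // auth token, empty for local testing
    batch_size_limit: usize,            // max events per ScyllaDB batch
    linger: u64,                        // wait time for a batch to fill (presumably ms)
    keyspace: String,                   // target keyspace, e.g. "solana"
    max_inflight_batch_delivery: usize, // concurrent batch writes in flight
}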
25 changes: 5 additions & 20 deletions yellowstone-grpc-tools/solana.cql
@@ -82,9 +82,9 @@ create table if not exists solana.producer_period_commit_log (
     producer_id blob,
     shard_id smallint,
     period bigint,
-    created_at timestamp,
-    PRIMARY KEY(producer_id, shard_id, period)
-) with CLUSTERING order by (shard_id desc, period desc);
+    updated_at timestamp,
+    PRIMARY KEY(producer_id, shard_id)
+);
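This is the schema half of the period-commitment edge-case fix: instead of one row per committed period, the commit log now keeps a single row per (producer_id, shard_id) holding the most recently committed period. Committing a period then becomes a plain upsert over that row — a minimal sketch, assuming a statement like the following (not part of this diff):

// Sketch only: with PRIMARY KEY (producer_id, shard_id) and no period
// clustering column, each insert overwrites the shard's single row, so the
// table always answers "what is the latest committed period for this shard?".
const COMMIT_SHARD_PERIOD: &str = r###"
    INSERT INTO producer_period_commit_log (producer_id, shard_id, period, updated_at)
    VALUES (?, ?, ?, ?)
"###;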

create table if not exists solana.shard_statistics (
shard_id smallint,
@@ -214,24 +214,9 @@ create table if not exists solana.log (
     created_at timestamp,
 
     primary key ((shard_id, period, producer_id), offset)
-);
+)
+WITH CLUSTERING ORDER BY (offset desc);
 
-create materialized view if not exists solana.shard_max_offset_mv
-as
-select
-    producer_id,
-    shard_id,
-    offset,
-    period
-from solana.log
-where
-    shard_id is not null
-    and offset is not null
-    and period is not null
-    and producer_id is not null
-primary key((producer_id, shard_id), offset, period)
-with clustering order by (offset desc, period desc);
-
 create materialized view if not exists solana.slot_map_mv
 as
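This is the other half of the commit title: the shard_max_offset_mv materialized view is dropped, and max offsets are instead computed on demand (see sink::get_max_shard_offsets_for_producer in the consumer changes below). One plausible shape for that computation, given that solana.log partitions are now clustered by offset desc — the query below is an assumed sketch, not code from this commit:

// Assumed sketch: resolve each shard's latest committed period from
// producer_period_commit_log, then read the head row of the corresponding
// (shard_id, period, producer_id) partition; with the new "offset desc"
// clustering order, that head row carries the partition's max offset.
const GET_SHARD_MAX_OFFSET: &str = r###"
    SELECT offset
    FROM log
    WHERE shard_id = ? AND period = ? AND producer_id = ?
    LIMIT 1
"###;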
15 changes: 10 additions & 5 deletions yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
@@ -5,7 +5,7 @@ use {
     scylla::{frame::Compression, Session, SessionBuilder},
     std::{net::SocketAddr, sync::Arc, time::Duration},
     tokio::time::Instant,
-    tracing::{info, warn},
+    tracing::{error, info, warn},
     yellowstone_grpc_client::GeyserGrpcClient,
     yellowstone_grpc_proto::{
         prelude::subscribe_update::UpdateOneof, yellowstone::log::EventSubscriptionPolicy,
@@ -96,7 +96,7 @@ impl ArgsAction {
             Arc::clone(&session),
             "test",
             InitialOffsetPolicy::Earliest,
-            EventSubscriptionPolicy::TransactionOnly,
+            EventSubscriptionPolicy::Both,
         )
         .await?;
 
@@ -187,7 +187,7 @@ impl ArgsAction {
                    None => unreachable!("Expect valid message"),
                };
 
-                match message {
+                let result = match message {
                    UpdateOneof::Account(msg) => {
                        let acc_update = msg.clone().try_into();
                        if acc_update.is_err() {
@@ -199,18 +199,23 @@
                            continue;
                        }
                        // If the sink is closed, let it crash...
-                        sink.log_account_update(acc_update.unwrap()).await.unwrap();
+                        sink.log_account_update(acc_update.unwrap()).await
                    }
                    UpdateOneof::Transaction(msg) => {
                        let tx: Result<Transaction, anyhow::Error> = msg.try_into();
                        if tx.is_err() {
                            warn!("failed to convert update tx: {:?}", tx.err().unwrap());
                            continue;
                        }
-                        sink.log_transaction(tx.unwrap()).await.unwrap();
+                        sink.log_transaction(tx.unwrap()).await
                    }
                    _ => continue,
                };
 
+                if result.is_err() {
+                    error!("error detected in sink...");
+                    break;
+                }
            }
        }
        sink.shutdown().await
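The consume loop no longer unwrap()s sink writes: each match arm now yields its Result, and the first failure logs an error and breaks, so sink.shutdown() still runs instead of the task panicking mid-stream. A self-contained sketch of the pattern (the handler and messages are hypothetical stand-ins):

// Handlers return Results instead of unwrapping; the first error exits the
// loop so the shutdown path below is always reached.
fn handle(msg: i32) -> Result<(), String> {
    if msg < 0 {
        Err(format!("sink rejected {msg}"))
    } else {
        Ok(())
    }
}

fn main() {
    for msg in [1, 2, -3, 4] {
        let result = handle(msg);
        if result.is_err() {
            eprintln!("error detected in sink: {:?}", result.err());
            break; // stop consuming; -3 failed, 4 is never processed
        }
    }
    println!("sink shutdown"); // graceful shutdown instead of a panic
}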
81 changes: 26 additions & 55 deletions yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
@@ -3,16 +3,18 @@ use {
        common::{self, ConsumerId, ConsumerInfo, InitialOffsetPolicy},
        shard_iterator::{ShardFilter, ShardIterator},
    },
-    crate::scylladb::types::{
-        BlockchainEventType, ProducerId, ProducerInfo, ShardId, ShardOffset, MAX_PRODUCER,
-        MIN_PROCUDER,
+    crate::scylladb::{
+        sink,
+        types::{
+            BlockchainEventType, ProducerId, ProducerInfo, ShardId, ShardOffset, MAX_PRODUCER,
+            MIN_PROCUDER,
+        },
    },
    futures::Stream,
    scylla::{
        batch::{Batch, BatchType},
        cql_to_rust::FromCqlVal,
        prepared_statement::PreparedStatement,
-        query::Query,
        transport::query_result::SingleRowTypedError,
        Session,
    },
@@ -293,37 +295,6 @@ pub async fn get_or_register_consumer(
    Ok(cs)
}
 
-fn build_offset_per_shard_query(num_shards: ShardId, ordering: &str) -> impl Into<Query> {
-    let shard_bind_markers = (0..num_shards)
-        .map(|x| format!("{}", x))
-        .collect::<Vec<_>>()
-        .join(", ");
-
-    format!(
-        r###"
-        SELECT
-            shard_id,
-            offset
-        FROM shard_max_offset_mv
-        WHERE
-            producer_id = ?
-            AND shard_id IN ({shard_bind_markers})
-        ORDER BY offset {ordering}, period {ordering}
-        PER PARTITION LIMIT 1
-        "###,
-        shard_bind_markers = shard_bind_markers,
-        ordering = ordering
-    )
-}
-
-fn get_max_offset_per_shard_query(num_shards: ShardId) -> impl Into<Query> {
-    build_offset_per_shard_query(num_shards, "DESC")
-}
-
-fn get_min_offset_per_shard_query(num_shards: ShardId) -> impl Into<Query> {
-    build_offset_per_shard_query(num_shards, "ASC")
-}
-
/// Sets the initial shard offsets for a newly created consumer based on [[`InitialOffsetPolicy`]].
///
/// Similar to seeking in a file, we can seek right at the beginning of the log, completely at the end or at first
@@ -342,22 +313,25 @@ async fn set_initial_consumer_shard_offsets(
 
    let num_shards = producer_info.num_shards;
 
-    let shard_offsets_query_result = match initial_offset_policy {
+    let shard_offset_pairs = match initial_offset_policy {
        InitialOffsetPolicy::Latest => {
-            session
-                .query(get_max_offset_per_shard_query(num_shards), (producer_id,))
-                .await?
-        }
-        InitialOffsetPolicy::Earliest => {
-            session
-                .query(get_min_offset_per_shard_query(num_shards), (producer_id,))
-                .await?
-        }
-        InitialOffsetPolicy::SlotApprox(slot) => {
-            session
-                .query(GET_MIN_OFFSET_FOR_SLOT, (slot, producer_id))
-                .await?
+            sink::get_max_shard_offsets_for_producer(
+                Arc::clone(&session),
+                producer_id,
+                num_shards as usize,
+            )
+            .await?
        }
+        InitialOffsetPolicy::Earliest => repeat(0)
+            .take(num_shards as usize)
+            .enumerate()
+            .map(|(i, x)| (i as ShardId, x))
+            .collect::<Vec<_>>(),
+        InitialOffsetPolicy::SlotApprox(slot) => session
+            .query(GET_MIN_OFFSET_FOR_SLOT, (slot, producer_id))
+            .await?
+            .rows_typed_or_empty::<(ShardId, ShardOffset)>()
+            .collect::<Result<Vec<_>, _>>()?,
    };
 
    let adjustment = match initial_offset_policy {
@@ -368,19 +342,16 @@
    let insert_consumer_offset_ps: PreparedStatement =
        session.prepare(INSERT_CONSUMER_OFFSET).await?;
 
-    let rows = shard_offsets_query_result
-        .rows_typed_or_empty::<(ShardId, ShardOffset)>()
-        .collect::<Result<Vec<_>, _>>()?;
-
    let mut batch = Batch::new(BatchType::Unlogged);
-    let mut buffer = Vec::with_capacity(rows.len());
+    let mut buffer = Vec::with_capacity(shard_offset_pairs.len());
 
    let ev_types = get_blockchain_event_types(event_sub_policy);
 
    ev_types
        .into_iter()
        .flat_map(|ev_type| {
-            rows.iter()
+            shard_offset_pairs
+                .iter()
+                .cloned()
                .map(move |(shard_id, offset)| (ev_type, shard_id, offset))
        })
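Worth noting in the hunk above: the Earliest policy no longer queries the database at all. Since every shard's log begins at offset 0, the earliest position is known statically and the arm simply synthesizes (shard_id, 0) pairs. A runnable illustration of that arm in isolation (the type aliases mirror the smallint/bigint columns; the helper name is hypothetical):

use std::iter::repeat;

type ShardId = i16;     // matches the smallint column
type ShardOffset = i64; // matches the bigint column

// Mirrors the new Earliest arm: every shard starts at offset 0, so the
// initial (shard, offset) pairs are synthesized locally with no query.
fn earliest_offsets(num_shards: ShardId) -> Vec<(ShardId, ShardOffset)> {
    repeat(0)
        .take(num_shards as usize)
        .enumerate()
        .map(|(i, x)| (i as ShardId, x))
        .collect()
}

fn main() {
    assert_eq!(earliest_offsets(3), vec![(0, 0), (1, 0), (2, 0)]);
}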
yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs
@@ -51,12 +51,11 @@ pub const GET_NEW_TRANSACTION_EVENT: &str = r###"
 
const PRODUCER_SHARD_PERIOD_COMMIT_EXISTS: &str = r###"
    SELECT
-        producer_id
+        period
    FROM producer_period_commit_log
    WHERE
        producer_id = ?
        AND shard_id = ?
-        AND period = ?
"###;
 
/// Represents the state of a shard iterator, which is used to manage the iteration
@@ -203,12 +202,12 @@ impl ShardIterator {
        let (sender, receiver) = oneshot::channel();
        tokio::spawn(async move {
            let result = session
-                .execute(&ps, (producer_id, shard_id, period))
+                .execute(&ps, (producer_id, shard_id))
                .await
                .expect("failed to query period commit state")
-                .maybe_first_row()
+                .maybe_first_row_typed::<(ShardPeriod,)>()
                .expect("query not eligible to return rows")
-                .map(|_row| true)
+                .map(|row| row.0 >= period)
                .unwrap_or(false);
            sender.send(result).map_err(|_| ()).unwrap_or_else(|_| {
                panic!(
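This rewrites the commit check to match the new single-row commit log: rather than probing for an exact (producer, shard, period) row, it fetches the shard's latest committed period and treats any period at or below it as committed — the edge case the old exact-match query missed once per-period rows went away. The semantics, distilled into a runnable sketch (types simplified):

// The commit log keeps only the latest committed period per (producer,
// shard), so a period is committed iff it is <= that stored value.
type ShardPeriod = i64;

fn is_period_committed(latest: Option<ShardPeriod>, period: ShardPeriod) -> bool {
    latest.map(|p| p >= period).unwrap_or(false)
}

fn main() {
    assert!(is_period_committed(Some(10), 9));   // older period: committed
    assert!(is_period_committed(Some(10), 10));  // exact match: committed
    assert!(!is_period_committed(Some(10), 11)); // not yet produced
    assert!(!is_period_committed(None, 0));      // no commits recorded yet
}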
