Skip to content

Commit

Permalink
tools: support grpc-scylladb consumer timeline-translation (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre authored May 23, 2024
1 parent c963bae commit 35e9476
Show file tree
Hide file tree
Showing 9 changed files with 655 additions and 510 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion yellowstone-grpc-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ serde_json = { workspace = true }
serde_with = { workspace = true, optional = true }
serde_yaml = { workspace = true }
sha2 = { workspace = true, optional = true }
thiserror = { workspace = true, optional = true }
tokio = { workspace = true, features = ["signal", "time"] }
tokio-stream = { workspace = true }
tonic = { workspace = true, features = ["gzip"] }
Expand Down Expand Up @@ -74,4 +75,4 @@ vergen = { workspace = true, features = ["build", "rustc"] }
default = ["google-pubsub", "kafka"]
google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"]
kafka = ["const-hex", "rdkafka", "sha2"]
scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono"]
scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror"]
149 changes: 71 additions & 78 deletions yellowstone-grpc-tools/solana.cql
Original file line number Diff line number Diff line change
@@ -1,83 +1,77 @@
CREATE KEYSPACE solana WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'} AND durable_writes = true;
CREATE KEYSPACE solana2 WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'} AND durable_writes = true;

drop materialized view if exists solana.producer_consumer_mapping_mv;
drop materialized view if exists solana.slot_map;

drop table if exists solana.producer_slot_seen;
drop table if exists solana.shard_statistics;
drop table if exists solana.producer_info;
drop table if exists solana.consumer_info;
drop table if exists solana.consumer_producer_mapping;
drop table if exists solana.log;
drop type if exists solana.transaction_meta;
drop type if exists solana.message_addr_table_lookup;
drop type if exists solana.compiled_instr;
drop type if exists solana.tx_token_balance;
drop type if exists solana.reward;
drop type if exists solana.inner_instrs;
drop type if exists solana.inner_instr;
drop type if exists solana.return_data;


create table if not exists solana.consumer_info (

drop materialized view if exists producer_consumer_mapping_mv;
drop materialized view if exists slot_map;

drop table if exists producer_slot_seen;
drop table if exists shard_statistics;
drop table if exists producer_info;
drop table if exists consumer_shard_offset;
drop table if exists consumer_producer_mapping;
drop table if exists log;
drop type if exists transaction_meta;
drop type if exists message_addr_table_lookup;
drop type if exists compiled_instr;
drop type if exists tx_token_balance;
drop type if exists reward;
drop type if exists inner_instrs;
drop type if exists inner_instr;
drop type if exists return_data;


create table if not exists consumer_shard_offset (
consumer_id text,
producer_id blob,
shard_id smallint,
event_type smallint,
offset bigint,
slot bigint,
created_at timestamp,
updated_at timestamp,
PRIMARY KEY ((consumer_id, producer_id), shard_id, event_type)
);


create table if not exists solana.consumer_producer_mapping (
create table if not exists consumer_info (
consumer_id text,
producer_id blob,
subscribed_event_types frozen<set<smallint>>,
created_at timestamp,
updated_at timestamp,
PRIMARY KEY (consumer_id)
);

create materialized view if not exists solana.producer_consumer_mapping_mv
create materialized view if not exists producer_consumer_mapping_mv
as
select
producer_id,
consumer_id
from solana.consumer_producer_mapping
from consumer_info
where
consumer_id is not null
and producer_id is not null
primary key (producer_id, consumer_id);

create table if not exists solana.producer_info (
create table if not exists producer_info (
producer_id blob,
num_shards smallint,
created_at timestamp,
updated_at timestamp,
PRIMARY KEY (producer_id)
);

create table if not exists solana.producer_lock (
create table if not exists producer_lock (
producer_id blob,
lock_id text,
ifname text,
ipv4 text,
created_at timestamp,
primary key (producer_id)
)

-- # example
insert into solana.producer_info (
producer_id,
num_shards,
created_at,
updated_at
)
values (0x00, 256, currentTimestamp(), currentTimestamp());

);

create table if not exists solana.producer_period_commit_log (
create table if not exists producer_period_commit_log (
producer_id blob,
shard_id smallint,
period bigint,
Expand All @@ -86,83 +80,71 @@ create table if not exists solana.producer_period_commit_log (
)
with clustering order by (period DESC);

create table if not exists solana.shard_statistics (
shard_id smallint,
period bigint,
producer_id blob,
offset bigint,
min_slot bigint,
max_slot bigint,
total_events bigint,
slot_event_counter map<bigint, int>,
primary key(shard_id, period, offset)
) with CLUSTERING order by (period desc, offset desc);

create type if not exists solana.message_addr_table_lookup (
create type if not exists message_addr_table_lookup (
account_key blob,
writable_indexes blob,
readonly_indexes blob
);

create type if not exists solana.compiled_instr (
create type if not exists compiled_instr (
program_id_index bigint,
accounts blob,
data blob
);

create type if not exists solana.inner_instr (
create type if not exists inner_instr (
program_id_index bigint,
accounts blob,
data blob,
stack_height bigint
);

create type if not exists solana.inner_instrs (
create type if not exists inner_instrs (
"index" bigint,
instructions frozen<list<solana.inner_instr>>
instructions frozen<list<inner_instr>>
);

create type if not exists solana.ui_token_amount (
create type if not exists ui_token_amount (
ui_amount double,
decimals bigint,
amount text,
ui_amount_string text
);

create type if not exists solana.tx_token_balance (
create type if not exists tx_token_balance (
account_index bigint,
mint text, --varchar(44)
ui_token_amount frozen<solana.ui_token_amount>,
ui_token_amount frozen<ui_token_amount>,
owner text, --varchar(44)
program_id text,
);

create type if not exists solana.reward (
create type if not exists reward (
pubkey text, -- varchar(44)
lamports bigint,
post_balance bigint,
reward_type int, --Fee, Rent, Staking, Voting
commission text
);

create type if not exists solana.return_data (
create type if not exists return_data (
program_id blob,
data blob
);

create type if not exists solana.transaction_meta (
create type if not exists transaction_meta (
error blob,
fee bigint,
pre_balances frozen<list<bigint>>,
post_balances frozen<list<bigint>>,
inner_instructions frozen<list<solana.inner_instrs>>,
inner_instructions frozen<list<inner_instrs>>,
log_messages frozen<list<text>>,
pre_token_balances frozen<list<solana.tx_token_balance>>,
post_token_balances frozen<list<solana.tx_token_balance>>,
rewards frozen<list<solana.reward>>,
pre_token_balances frozen<list<tx_token_balance>>,
post_token_balances frozen<list<tx_token_balance>>,
rewards frozen<list<reward>>,
loaded_writable_addresses frozen<list<blob>>,
loaded_readonly_addresses frozen<list<blob>>,
return_data frozen<solana.return_data>,
return_data frozen<return_data>,
compute_units_consumed bigint
);

Expand All @@ -171,7 +153,7 @@ create type if not exists solana.transaction_meta (
-- There is not performance advantage to have separate tables since ScyllaDB is wide-column family database.
-- ScyllaDB is built to have sparse columns (alot of unused columns)
-- On each query, the storage engine only retrieves what matters to the query.
create table if not exists solana.log (
create table if not exists log (

-- commun columns
shard_id smallint,
Expand Down Expand Up @@ -202,10 +184,10 @@ create table if not exists solana.log (
num_readonly_unsigned_accounts int,
account_keys frozen<list<blob>>,
recent_blockhash blob,
instructions frozen<list<solana.compiled_instr>>,
instructions frozen<list<compiled_instr>>,
versioned boolean,
address_table_lookups frozen<list<solana.message_addr_table_lookup>>,
meta solana.transaction_meta,
address_table_lookups frozen<list<message_addr_table_lookup>>,
meta transaction_meta,
is_vote boolean,
tx_index bigint,

Expand All @@ -217,16 +199,26 @@ create table if not exists solana.log (
)
WITH CLUSTERING ORDER BY (offset desc);

create table if not exists producer_slot_seen (
producer_id blob,
slot bigint,
shard_offset_map frozen<set<tuple<smallint, bigint>>>,
created_at timestamp,
primary key (producer_id, slot)
)
with clustering order by (slot DESC);



create materialized view if not exists solana.slot_map_mv
create materialized view if not exists slot_map_mv
as
select
slot,
producer_id,
shard_id,
period,
offset
from solana.log
from log
where
slot is not null
and producer_id is not null
Expand All @@ -236,10 +228,11 @@ where
primary key (slot, producer_id, shard_id, period, offset);


create table if not exists solana.producer_slot_seen (
producer_id blob,
slot bigint,
created_at timestamp,
primary key (producer_id, slot)
)
with clustering order by (slot DESC);
-- clear all table

truncate log;
truncate producer_period_commit_log;
truncate producer_slot_seen;

truncate consumer_info;
truncate consumer_shard_offset;
9 changes: 3 additions & 6 deletions yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use {
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
prelude::subscribe_update::UpdateOneof,
yellowstone::log::{
yellowstone_log_server::{self, YellowstoneLog, YellowstoneLogServer},
EventSubscriptionPolicy,
},
yellowstone::log::{yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy},
},
yellowstone_grpc_tools::{
config::{load as config_load, GrpcRequestToProto},
Expand Down Expand Up @@ -249,8 +246,8 @@ impl ArgsAction {
_ => continue,
};

if result.is_err() {
error!("errror detected in sink...");
if let Err(e) = result {
error!("error detected in sink: {e}");
break;
}
}
Expand Down
8 changes: 3 additions & 5 deletions yellowstone-grpc-tools/src/scylladb/consumer/common.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use crate::scylladb::types::{BlockchainEventType, ProducerId, ShardId, ShardOffset};
use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset};

pub type OldShardOffset = ShardOffset;

pub type ConsumerId = String;

///
/// Initial position in the log when creating a new consumer.
///
#[derive(Default, Debug, Clone, Copy)]
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitialOffsetPolicy {
Earliest,
#[default]
Expand All @@ -18,6 +16,6 @@ pub enum InitialOffsetPolicy {
pub struct ConsumerInfo {
pub consumer_id: ConsumerId,
pub producer_id: ProducerId,
pub initital_shard_offsets: Vec<(ShardId, BlockchainEventType, ShardOffset)>,
//pub initital_shard_offsets: Vec<ConsumerShardOffset>,
pub subscribed_blockchain_event_types: Vec<BlockchainEventType>,
}
Loading

0 comments on commit 35e9476

Please sign in to comment.