Skip to content

Commit

Permalink
add commitment level supports for consumer + better metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre committed May 27, 2024
1 parent a641007 commit 66ded1e
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 114 deletions.
90 changes: 90 additions & 0 deletions yellowstone-grpc-proto/proto/yellowstone-log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,122 @@ service YellowstoneLog {
}


/// InitialOffsetPolicy selects where in the log a new consumer starts reading.
///
/// EARLIEST (0)
/// Start from the oldest offset still retained in the data stream, so all
/// available historical data is consumed from the beginning.
///
/// LATEST (1)
/// Start from the newest available offset; only events produced after the
/// subscription is initiated are delivered.
///
/// SLOT (2)
/// Start from a caller-specified slot number (see ConsumeRequest.at_slot),
/// giving precise control over where consumption begins.
enum InitialOffsetPolicy {
EARLIEST = 0;
LATEST = 1;
SLOT = 2;
}


/// EventSubscriptionPolicy selects which categories of events a consumer receives.
///
/// ACCOUNT_UPDATE_ONLY (0)
/// Deliver account-update events exclusively; all other event types are
/// filtered out.
///
/// TRANSACTION_ONLY (1)
/// Deliver transaction events exclusively; non-transactional events are
/// filtered out.
///
/// BOTH (2)
/// Deliver both account-update and transaction events.
enum EventSubscriptionPolicy {
ACCOUNT_UPDATE_ONLY = 0;
TRANSACTION_ONLY = 1;
BOTH = 2;
}

/// TimelineTranslationPolicy selects how a consumer is moved ("translated") to a
/// different ingestion timeline, based on its last seen slot number and an
/// optional lag bound. Used together with ConsumeRequest.ttp_maximum_slot_lag.
///
/// ALLOW_LAG (0)
/// A destination timeline is eligible if it contains the consumer's last seen
/// slot, or lags behind it by no more than the configured bound.
///
/// Example: given three timelines whose latest available slots are
///
/// timeline1 : 10
/// timeline2 : 5
/// timeline3 : 8
///
/// a consumer on timeline1 with ttp_maximum_slot_lag = 2 can only be moved to
/// timeline3 (slot 8 is within lag 2 of slot 10; timeline2 at slot 5 is not).
///
/// STRICT_SLOT (1)
/// A destination timeline is eligible only if it already contains the
/// consumer's last seen slot; otherwise the translation fails.
enum TimelineTranslationPolicy {
ALLOW_LAG = 0;
STRICT_SLOT = 1;
}

/// ConsumeRequest configures a subscription for consuming events from the log.
///
/// consumer_id (1)
/// Optional unique identifier for the consumer; used to resume a previously
/// registered consumer's position.
///
/// initial_offset_policy (2)
/// Where to start reading when the consumer has no committed offset; see
/// InitialOffsetPolicy.
///
/// at_slot (3)
/// Optional slot number to start from; only meaningful when
/// initial_offset_policy is SLOT.
///
/// event_subscription_policy (4)
/// Which event categories to deliver; see EventSubscriptionPolicy.
///
/// account_update_event_filter (5)
/// Optional filter narrowing account-update events; see AccountUpdateEventFilter.
///
/// tx_event_filter (6)
/// Optional filter narrowing transaction events; see TransactionEventFilter.
///
/// commitment_level (7)
/// Commitment level at which events are delivered (geyser.CommitmentLevel).
///
/// timelineTranslationPolicy (8)
/// Optional policy governing how this consumer is moved to another ingestion
/// timeline when its current ingester goes out of service; see
/// TimelineTranslationPolicy.
///
/// ttp_maximum_slot_lag (9)
/// Optional maximum slot lag tolerated during timeline translation (used by
/// ALLOW_LAG).
message ConsumeRequest {
optional string consumer_id = 1;

InitialOffsetPolicy initial_offset_policy = 2;
optional int64 at_slot = 3;

EventSubscriptionPolicy event_subscription_policy = 4;
optional AccountUpdateEventFilter account_update_event_filter = 5;
optional TransactionEventFilter tx_event_filter = 6;
geyser.CommitmentLevel commitment_level = 7;

// Consulted when an ingester is out of service and a set of consumers must be
// translated to a different ingestion timeline; selects how the destination
// timeline is chosen.
// NOTE(review): field name is camelCase, violating the proto3 snake_case
// convention; renaming would change generated-code identifiers and the JSON
// mapping, so it is intentionally left unchanged here.
optional TimelineTranslationPolicy timelineTranslationPolicy = 8;
optional uint32 ttp_maximum_slot_lag = 9;
}

/// AccountUpdateEventFilter narrows account-update events. An event matches if
/// it matches ANY listed pubkey or ANY listed owner.
///
/// pubkeys (1)
/// Account public keys (raw bytes); events for any of these accounts are
/// included.
///
/// owners (2)
/// Account owners (raw bytes); events for accounts owned by any of these are
/// included.
message AccountUpdateEventFilter {
repeated bytes pubkeys = 1;
repeated bytes owners = 2;
}

/// TransactionEventFilter narrows transaction events.
///
/// account_keys (1)
/// Account keys (raw bytes); transactions referencing any of these keys are
/// included in the filtered results.
message TransactionEventFilter {
repeated bytes account_keys = 1;
}
32 changes: 14 additions & 18 deletions yellowstone-grpc-tools/solana.cql
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ create table if not exists consumer_shard_offset (
created_at timestamp,
updated_at timestamp,
PRIMARY KEY ((consumer_id, producer_id), shard_id, event_type)
);
)
with default_time_to_live = 3600;


create table if not exists consumer_info (
consumer_id text,
producer_id blob,
consumer_ip: text,
subscribed_event_types frozen<set<smallint>>,
created_at timestamp,
updated_at timestamp,
Expand All @@ -56,6 +58,7 @@ primary key (producer_id, consumer_id);

create table if not exists producer_info (
producer_id blob,
commitment smallint,
num_shards smallint,
created_at timestamp,
updated_at timestamp,
Expand Down Expand Up @@ -192,7 +195,8 @@ create table if not exists log (
primary key ((shard_id, period, producer_id), offset)
)
WITH CLUSTERING ORDER BY (offset desc)
AND default_time_to_live = 86400; -- 24 hours
AND default_time_to_live = 86400
and compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'MINUTES', 'compaction_window_size' : 10};

create table if not exists producer_slot_seen (
producer_id blob,
Expand All @@ -204,22 +208,14 @@ create table if not exists producer_slot_seen (
with clustering order by (slot DESC)
AND default_time_to_live = 82800; -- 23 hours

-- Secondary index over the log table keyed by slot: for a given slot, locates
-- every (producer_id, shard_id, period, offset) entry carrying it. CQL requires
-- every base-table primary-key column to be filtered IS NOT NULL in the view.
CREATE MATERIALIZED VIEW IF NOT EXISTS slot_map_mv
AS
SELECT slot, producer_id, shard_id, period, offset
FROM log
WHERE slot IS NOT NULL
    AND producer_id IS NOT NULL
    AND shard_id IS NOT NULL
    AND period IS NOT NULL
    AND offset IS NOT NULL
PRIMARY KEY (slot, producer_id, shard_id, period, offset);
-- Maps a slot to every producer that has reported seeing it, derived from
-- producer_slot_seen; lets readers find an alternate producer for a given slot.
create materialized view if not exists slot_producer_seen_mv
as
select slot, producer_id
from producer_slot_seen
where slot is not null
    and producer_id is not null
primary key (slot, producer_id);



create table if not exists producer_period_commit_log (
Expand Down
22 changes: 12 additions & 10 deletions yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use {
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
prelude::subscribe_update::UpdateOneof,
yellowstone::log::{yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy},
yellowstone::log::{
yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy,
TimelineTranslationPolicy,
},
},
yellowstone_grpc_tools::{
config::{load as config_load, GrpcRequestToProto},
Expand All @@ -21,11 +24,11 @@ use {
Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo,
},
consumer::{
common::InitialOffsetPolicy,
common::InitialOffset,
grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq},
},
sink::ScyllaSink,
types::Transaction,
types::{CommitmentLevel, Transaction},
},
setup_tracing,
},
Expand Down Expand Up @@ -136,18 +139,17 @@ impl ArgsAction {
let session = Arc::new(session);
let req = SpawnGrpcConsumerReq {
consumer_id: String::from("test"),
consumer_ip: None,
account_update_event_filter: None,
tx_event_filter: None,
buffer_capacity: None,
offset_commit_interval: None,
event_subscription_policy: EventSubscriptionPolicy::Both,
commitment_level: CommitmentLevel::Processed,
timeline_translation_policy: TimelineTranslationPolicy::AllowLag,
timeline_translation_allowed_lag: None,
};
let mut rx = spawn_grpc_consumer(
session,
req,
InitialOffsetPolicy::Earliest,
EventSubscriptionPolicy::Both,
)
.await?;
let mut rx = spawn_grpc_consumer(session, req, InitialOffset::Earliest).await?;

let mut print_tx_secs = Instant::now() + Duration::from_secs(1);
let mut num_events = 0;
Expand Down
7 changes: 6 additions & 1 deletion yellowstone-grpc-tools/src/scylladb/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
super::sink::ScyllaSinkConfig,
super::{sink::ScyllaSinkConfig, types::CommitmentLevel},
crate::config::ConfigGrpcRequest,
serde::Deserialize,
serde_with::{serde_as, DurationMilliSeconds},
Expand Down Expand Up @@ -97,6 +97,11 @@ impl ConfigGrpc2ScyllaDB {
linger: self.linger,
keyspace: self.keyspace.clone(),
ifname: self.ifname.to_owned(),
commitment_level: match self.request.commitment.expect("Missing commitment level") {
crate::config::ConfigGrpcRequestCommitment::Processed => CommitmentLevel::Processed,
crate::config::ConfigGrpcRequestCommitment::Confirmed => CommitmentLevel::Confirmed,
crate::config::ConfigGrpcRequestCommitment::Finalized => CommitmentLevel::Finalized,
},
}
}
}
9 changes: 6 additions & 3 deletions yellowstone-grpc-tools/src/scylladb/consumer/common.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset};
use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset, Slot};

/// Alias of [`ShardOffset`].
/// NOTE(review): the "Old" prefix suggests this names the pre-migration offset
/// representation kept for existing call sites — confirm against callers.
pub type OldShardOffset = ShardOffset;

///
/// Initial position in the log when creating a new consumer.
///
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitialOffsetPolicy {
pub enum InitialOffset {
Earliest,
#[default]
Latest,
SlotApprox(i64),
SlotApprox {
desired_slot: Slot,
min_slot: Slot,
},
}

pub struct ConsumerInfo {
Expand Down
Loading

0 comments on commit 66ded1e

Please sign in to comment.