add commitment level supports for consumer + better metadata #350

Merged 1 commit on May 27, 2024.
yellowstone-grpc-proto/proto/yellowstone-log.proto (+90, −0)
@@ -12,32 +12,122 @@ service YellowstoneLog {
}


/// The InitialOffsetPolicy enum determines the initial offset used when subscribing to events or messages. It provides three options:
///
/// EARLIEST (0)
/// This policy subscribes to events or messages starting from the earliest available offset in the data stream. It ensures that all historical data is consumed from the beginning.
///
/// LATEST (1)
/// Subscribes to events or messages starting from the latest available offset in the data stream. It only consumes new events or messages generated after the subscription is initiated.
///
/// SLOT (2)
/// This policy subscribes to events or messages starting from a specific slot number in the data stream. It allows for precise control over where consumption begins based on slot numbers.
enum InitialOffsetPolicy {
EARLIEST = 0;
LATEST = 1;
SLOT = 2;
}


/// The EventSubscriptionPolicy enum defines the types of events to subscribe to. It offers three options:
///
/// ACCOUNT_UPDATE_ONLY (0)
/// Subscribes to account update events exclusively. It filters out other types of events, focusing solely on account-related updates.
///
/// TRANSACTION_ONLY (1)
/// Subscribes to transaction events exclusively. It filters out non-transactional events, ensuring that only transaction-related data is consumed.
///
/// BOTH (2)
/// This policy subscribes to both account update and transaction events. It enables consumption of a wider range of event types, encompassing both account-related updates and transactions.
enum EventSubscriptionPolicy {
ACCOUNT_UPDATE_ONLY = 0;
TRANSACTION_ONLY = 1;
BOTH = 2;
}
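The gating behavior described above can be sketched in Rust. This is an illustrative helper only, not part of this PR; the enum and function names are hypothetical:

```rust
// Illustrative sketch of how EventSubscriptionPolicy plausibly gates event
// types on the server side (names are assumptions, not the crate's API).
#[derive(Clone, Copy, PartialEq)]
enum EventSubscriptionPolicy {
    AccountUpdateOnly,
    TransactionOnly,
    Both,
}

#[derive(Clone, Copy, PartialEq)]
enum EventType {
    AccountUpdate,
    Transaction,
}

// Returns true when an event of the given type should be delivered under the policy.
fn accepts(policy: EventSubscriptionPolicy, event: EventType) -> bool {
    match policy {
        EventSubscriptionPolicy::AccountUpdateOnly => event == EventType::AccountUpdate,
        EventSubscriptionPolicy::TransactionOnly => event == EventType::Transaction,
        EventSubscriptionPolicy::Both => true,
    }
}

fn main() {
    assert!(accepts(EventSubscriptionPolicy::Both, EventType::Transaction));
    assert!(!accepts(EventSubscriptionPolicy::TransactionOnly, EventType::AccountUpdate));
}
```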

/// Timeline Translation Policy
/// The TimelineTranslationPolicy enum defines different strategies for translating consumers to timelines based on their last seen slot number or a specified lag. This is particularly useful for managing data consumption in conjunction with the ConsumeRequest.ttp_maximum_slot_lag parameter.
///
/// ALLOW_LAG
/// This policy allows consumers to be translated to a timeline that contains the last seen slot number or up to a certain lag. It extends the allowable lag for eligible timelines.
///
/// Example:
///
/// Suppose we have three timelines with the latest available slot numbers:
///
/// timeline1 : 10
/// timeline2 : 5
/// timeline3 : 8
/// If a consumer is assigned to timeline1 with ttp_maximum_slot_lag set to 2, then the only eligible destination timeline would be timeline3.
///
/// STRICT_SLOT
/// Under this policy, eligible destination timelines must contain the last seen slot number in the current consumer timeline; otherwise, the translation fails. This ensures strict adherence to slot numbers when translating consumers to timelines.
enum TimelineTranslationPolicy {
ALLOW_LAG = 0;
STRICT_SLOT = 1;
}
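The lag arithmetic in the ALLOW_LAG example above can be sketched in Rust. This helper is illustrative only (its name and signature are assumptions, not part of this PR):

```rust
// Eligible destination timelines are those whose latest available slot is
// within `max_slot_lag` of the current timeline's latest slot.
fn eligible_timelines(
    current_latest_slot: i64,
    candidates: &[(&str, i64)], // (timeline name, latest available slot)
    max_slot_lag: i64,
) -> Vec<String> {
    candidates
        .iter()
        .filter(|(_, latest)| current_latest_slot - latest <= max_slot_lag)
        .map(|(name, _)| name.to_string())
        .collect()
}

fn main() {
    // The documented example: consumer on timeline1 (latest slot 10),
    // ttp_maximum_slot_lag = 2; timeline2 is at 5, timeline3 at 8.
    let eligible = eligible_timelines(10, &[("timeline2", 5), ("timeline3", 8)], 2);
    assert_eq!(eligible, vec!["timeline3".to_string()]); // only timeline3 qualifies
}
```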

/// The ConsumeRequest message defines parameters for consuming events or messages from a data stream. It includes the following fields:
///
/// consumer_id (1)
/// An optional string representing the consumer's unique identifier.
///
/// initial_offset_policy (2)
/// Specifies the initial offset policy for subscribing to events. It uses values from the InitialOffsetPolicy enum.
///
/// at_slot (3)
/// An optional int64 indicating the specific slot number from which consumption should start. This is relevant when initial_offset_policy is set to SLOT.
///
/// event_subscription_policy (4)
/// Defines the event subscription policy using values from the EventSubscriptionPolicy enum.
///
/// account_update_event_filter (5)
/// An optional AccountUpdateEventFilter message specifying filters for account update events.
///
/// tx_event_filter (6)
/// An optional TransactionEventFilter message defining filters for transaction events.
///
/// commitment_level (7)
/// Specifies the commitment level for consuming events. It uses values from the geyser.CommitmentLevel enum.
///
/// timelineTranslationPolicy (8)
/// An optional TimelineTranslationPolicy describing the policy for triggering timeline translation when an ingester is out of service and consumers need to be translated to a different ingestion timeline.
///
/// ttp_maximum_slot_lag (9)
/// An optional uint32 indicating the maximum slot lag allowed for timeline translation.
message ConsumeRequest {
optional string consumer_id = 1;

InitialOffsetPolicy initial_offset_policy = 2;
optional int64 at_slot = 3;

EventSubscriptionPolicy event_subscription_policy = 4;
optional AccountUpdateEventFilter account_update_event_filter = 5;
optional TransactionEventFilter tx_event_filter = 6;
geyser.CommitmentLevel commitment_level = 7;

  // timelineTranslationPolicy is used when an ingester is out of service and we need to translate a set
  // of consumers to a different ingestion timeline. The policy describes what to do when timeline translation must be triggered.
optional TimelineTranslationPolicy timelineTranslationPolicy = 8;
optional uint32 ttp_maximum_slot_lag = 9;
}
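A minimal request resuming from a specific slot might look like the sketch below. The struct is a plain-Rust mirror of the message for illustration; the real prost-generated type lives in yellowstone-grpc-proto and its exact shape is not shown in this PR:

```rust
// Plain-struct mirror of ConsumeRequest (illustrative only; field names
// follow the proto, enum fields are kept as raw i32 tags as prost does).
#[derive(Debug, Default)]
struct ConsumeRequest {
    consumer_id: Option<String>,
    initial_offset_policy: i32,     // InitialOffsetPolicy: 0 EARLIEST, 1 LATEST, 2 SLOT
    at_slot: Option<i64>,           // only meaningful when the policy is SLOT
    event_subscription_policy: i32, // 0 ACCOUNT_UPDATE_ONLY, 1 TRANSACTION_ONLY, 2 BOTH
    commitment_level: i32,          // geyser.CommitmentLevel tag
    ttp_maximum_slot_lag: Option<u32>,
}

fn main() {
    // Resume from slot 250_000_000, consuming both account updates and transactions.
    let req = ConsumeRequest {
        consumer_id: Some("my-consumer".to_string()),
        initial_offset_policy: 2, // SLOT
        at_slot: Some(250_000_000),
        event_subscription_policy: 2, // BOTH
        ..Default::default()
    };
    assert_eq!(req.at_slot, Some(250_000_000));
}
```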

/// The AccountUpdateEventFilter message defines filters for account update events. It includes the following fields:
///
/// pubkeys (1)
/// A repeated field of bytes representing public keys. Events matching any of these public keys will be included in the filtered results.
///
/// owners (2)
/// A repeated field of bytes representing account owners. Events matching any of these account owners will be included in the filtered results.
message AccountUpdateEventFilter {
repeated bytes pubkeys = 1;
repeated bytes owners = 2;
}
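The presumed matching semantics (an assumption based on the field comments above, not confirmed by this PR) can be sketched as: an account update passes if its pubkey matches any listed pubkey, or its owner matches any listed owner.

```rust
// Illustrative mirror of AccountUpdateEventFilter with the presumed
// OR-across-fields matching semantics.
struct AccountUpdateEventFilter {
    pubkeys: Vec<Vec<u8>>,
    owners: Vec<Vec<u8>>,
}

impl AccountUpdateEventFilter {
    fn matches(&self, pubkey: &[u8], owner: &[u8]) -> bool {
        self.pubkeys.iter().any(|k| k.as_slice() == pubkey)
            || self.owners.iter().any(|o| o.as_slice() == owner)
    }
}

fn main() {
    let f = AccountUpdateEventFilter {
        pubkeys: vec![vec![1, 2, 3]],
        owners: vec![vec![9, 9]],
    };
    assert!(f.matches(&[1, 2, 3], &[0])); // pubkey hit
    assert!(f.matches(&[7], &[9, 9]));    // owner hit
    assert!(!f.matches(&[7], &[0]));      // neither matches
}
```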

/// The TransactionEventFilter message specifies filters for transaction events. It contains the following field:
///
/// account_keys (1)
/// A repeated field of bytes representing account keys. Events associated with any of these account keys will be included in the filtered results.
message TransactionEventFilter {
repeated bytes account_keys = 1;
}
yellowstone-grpc-tools/solana.cql (+14, −18)
@@ -31,12 +31,14 @@ create table if not exists consumer_shard_offset (
created_at timestamp,
updated_at timestamp,
PRIMARY KEY ((consumer_id, producer_id), shard_id, event_type)
);
)
with default_time_to_live = 3600;


create table if not exists consumer_info (
consumer_id text,
producer_id blob,
consumer_ip text,
subscribed_event_types frozen<set<smallint>>,
created_at timestamp,
updated_at timestamp,
@@ -56,6 +58,7 @@ primary key (producer_id, consumer_id);

create table if not exists producer_info (
producer_id blob,
commitment smallint,
num_shards smallint,
created_at timestamp,
updated_at timestamp,
@@ -192,7 +195,8 @@ create table if not exists log (
primary key ((shard_id, period, producer_id), offset)
)
WITH CLUSTERING ORDER BY (offset desc)
AND default_time_to_live = 86400; -- 24 hours
AND default_time_to_live = 86400
and compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'MINUTES', 'compaction_window_size' : 10};

create table if not exists producer_slot_seen (
producer_id blob,
@@ -204,22 +208,14 @@ create table if not exists producer_slot_seen (
with clustering order by (slot DESC)
AND default_time_to_live = 82800; -- 23 hours

create materialized view if not exists slot_map_mv
as
select
slot,
producer_id,
shard_id,
period,
offset
from log
where
slot is not null
and producer_id is not null
and shard_id is not null
and period is not null
and offset is not null
primary key (slot, producer_id, shard_id, period, offset);
CREATE materialized VIEW if not exists slot_producer_seen_mv
AS
SELECT slot, producer_id FROM producer_slot_seen
WHERE
producer_id is not null
and slot is not null
PRIMARY KEY (slot, producer_id);



create table if not exists producer_period_commit_log (
yellowstone-grpc-tools/src/bin/grpc-scylladb.rs (+12, −10)
@@ -10,7 +10,10 @@ use {
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
prelude::subscribe_update::UpdateOneof,
yellowstone::log::{yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy},
yellowstone::log::{
yellowstone_log_server::YellowstoneLogServer, EventSubscriptionPolicy,
TimelineTranslationPolicy,
},
},
yellowstone_grpc_tools::{
config::{load as config_load, GrpcRequestToProto},
@@ -21,11 +24,11 @@
Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo,
},
consumer::{
common::InitialOffsetPolicy,
common::InitialOffset,
grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq},
},
sink::ScyllaSink,
types::Transaction,
types::{CommitmentLevel, Transaction},
},
setup_tracing,
},
@@ -136,18 +139,17 @@ impl ArgsAction {
let session = Arc::new(session);
let req = SpawnGrpcConsumerReq {
consumer_id: String::from("test"),
consumer_ip: None,
account_update_event_filter: None,
tx_event_filter: None,
buffer_capacity: None,
offset_commit_interval: None,
event_subscription_policy: EventSubscriptionPolicy::Both,
commitment_level: CommitmentLevel::Processed,
timeline_translation_policy: TimelineTranslationPolicy::AllowLag,
timeline_translation_allowed_lag: None,
};
let mut rx = spawn_grpc_consumer(
session,
req,
InitialOffsetPolicy::Earliest,
EventSubscriptionPolicy::Both,
)
.await?;
let mut rx = spawn_grpc_consumer(session, req, InitialOffset::Earliest).await?;

let mut print_tx_secs = Instant::now() + Duration::from_secs(1);
let mut num_events = 0;
yellowstone-grpc-tools/src/scylladb/config.rs (+6, −1)
@@ -1,5 +1,5 @@
use {
super::sink::ScyllaSinkConfig,
super::{sink::ScyllaSinkConfig, types::CommitmentLevel},
crate::config::ConfigGrpcRequest,
serde::Deserialize,
serde_with::{serde_as, DurationMilliSeconds},
@@ -97,6 +97,11 @@ impl ConfigGrpc2ScyllaDB {
linger: self.linger,
keyspace: self.keyspace.clone(),
ifname: self.ifname.to_owned(),
commitment_level: match self.request.commitment.expect("Missing commitment level") {
crate::config::ConfigGrpcRequestCommitment::Processed => CommitmentLevel::Processed,
crate::config::ConfigGrpcRequestCommitment::Confirmed => CommitmentLevel::Confirmed,
crate::config::ConfigGrpcRequestCommitment::Finalized => CommitmentLevel::Finalized,
},
}
}
}
yellowstone-grpc-tools/src/scylladb/consumer/common.rs (+6, −3)
@@ -1,16 +1,19 @@
use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset};
use crate::scylladb::types::{BlockchainEventType, ConsumerId, ProducerId, ShardOffset, Slot};

pub type OldShardOffset = ShardOffset;

///
/// Initial position in the log when creating a new consumer.
///
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitialOffsetPolicy {
pub enum InitialOffset {
Earliest,
#[default]
Latest,
SlotApprox(i64),
SlotApprox {
desired_slot: Slot,
min_slot: Slot,
},
}

pub struct ConsumerInfo {
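One plausible reading of the new `InitialOffset::SlotApprox { desired_slot, min_slot }` variant in common.rs is that consumption starts at the closest available slot at or below the desired slot, but never older than the minimum. The resolution logic below is a hypothetical sketch under that assumption, not the PR's actual implementation:

```rust
type Slot = i64;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InitialOffset {
    Earliest,
    Latest,
    SlotApprox { desired_slot: Slot, min_slot: Slot },
}

// Hypothetical resolution against a set of available slots: SlotApprox picks
// the greatest slot at or below `desired_slot`, rejecting anything below `min_slot`.
fn resolve(offset: InitialOffset, available: &[Slot]) -> Option<Slot> {
    match offset {
        InitialOffset::Earliest => available.iter().min().copied(),
        InitialOffset::Latest => available.iter().max().copied(),
        InitialOffset::SlotApprox { desired_slot, min_slot } => available
            .iter()
            .filter(|&&s| s >= min_slot && s <= desired_slot)
            .max()
            .copied(),
    }
}

fn main() {
    let available = [5, 8, 12];
    assert_eq!(resolve(InitialOffset::Earliest, &available), Some(5));
    // Desired slot 10 is absent; slot 8 is the nearest candidate within bounds.
    assert_eq!(
        resolve(InitialOffset::SlotApprox { desired_slot: 10, min_slot: 8 }, &available),
        Some(8)
    );
    // No slot falls within [6, 7], so resolution fails.
    assert_eq!(
        resolve(InitialOffset::SlotApprox { desired_slot: 7, min_slot: 6 }, &available),
        None
    );
}
```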