Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: decoupled grpc from consumer_source #351

Merged
merged 2 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const-hex = "1.6.2"
crossbeam-channel = "0.5.8"
deepsize = "0.2.0"
env_logger = "0.10.0"
etcd-client = "0.12.4"
futures = "0.3.24"
git-version = "0.3.5"
google-cloud-googleapis = "0.11.0"
Expand Down
34 changes: 25 additions & 9 deletions yellowstone-grpc-proto/proto/yellowstone-log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,23 @@ option go_package = "github.com/rpcpool/solana-geyser-grpc/golang/proto";
package yellowstone.log;

// Log service: consumer-group management plus the event stream itself.
service YellowstoneLog {
  // Creates a consumer group with a static (fixed) shard-to-instance assignment.
  rpc CreateStaticConsumerGroup(CreateStaticConsumerGroupRequest) returns (CreateStaticConsumerGroupResponse) {}
  // Streams geyser updates to the consumer; see ConsumeRequest for offset/filter options.
  rpc Consume(ConsumeRequest) returns (stream geyser.SubscribeUpdate) {}
}

// Returned by CreateStaticConsumerGroup.
message CreateStaticConsumerGroupResponse {
  // Server-generated identifier of the newly created consumer group.
  string group_id = 1;
}

// Request to create a static consumer group.
// NOTE(review): field numbers start at 2 — presumably field 1 was removed or
// reserved during development; confirm and add a `reserved 1;` if so.
message CreateStaticConsumerGroupRequest {
  // Primary instance ids; shards are partitioned across these.
  repeated string instance_id_list = 2;
  // Redundant (failover) instance ids; shards are partitioned across these too.
  repeated string redundancy_instance_id_list = 3;
}

// How shards are assigned to the instances of a consumer group.
enum PartitionAssignmentStrategy {
  // Fixed assignment computed at group-creation time.
  STATIC = 0;
}


/// The InitialOffsetPolicy enum determines the initial offset used when subscribing to events or messages. It provides three options:
///
Expand Down Expand Up @@ -96,20 +110,22 @@ enum TimelineTranslationPolicy {
/// ttp_maximum_slot_lag (9)
/// An optional uint32 indicating the maximum slot lag allowed for timeline translation.
message ConsumeRequest {
optional string consumer_id = 1;
optional string consumer_group_id = 1;
optional string consumer_id = 2;
optional string instance_id = 3;

InitialOffsetPolicy initial_offset_policy = 2;
optional int64 at_slot = 3;
InitialOffsetPolicy initial_offset_policy = 4;
geyser.CommitmentLevel commitment_level = 5;
EventSubscriptionPolicy event_subscription_policy = 6;

EventSubscriptionPolicy event_subscription_policy = 4;
optional AccountUpdateEventFilter account_update_event_filter = 5;
optional TransactionEventFilter tx_event_filter = 6;
geyser.CommitmentLevel commitment_level = 7;
optional int64 at_slot = 7;
optional AccountUpdateEventFilter account_update_event_filter = 8;
optional TransactionEventFilter tx_event_filter = 9;

// timelineTranslationPolicy is used when an ingester is out of service and we need to translate a set
// of consumer to a different ingestion timeline. The policy describe what to do when we need to trigger timeline translation.
optional TimelineTranslationPolicy timelineTranslationPolicy = 8;
optional uint32 ttp_maximum_slot_lag = 9;
optional TimelineTranslationPolicy timelineTranslationPolicy = 10;
optional uint32 ttp_maximum_slot_lag = 11;
}

/// The AccountUpdateEventFilter message defines filters for account update events. It includes the following fields:
Expand Down
3 changes: 2 additions & 1 deletion yellowstone-grpc-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ chrono = { workspace = true, optional = true }
clap = { workspace = true, features = ["derive"] }
const-hex = { workspace = true, optional = true }
deepsize = { workspace = true, optional = true }
etcd-client = { workspace = true, optional = true }
futures = { workspace = true }
google-cloud-googleapis = { workspace = true, optional = true }
google-cloud-pubsub = { workspace = true, optional = true }
Expand Down Expand Up @@ -75,4 +76,4 @@ vergen = { workspace = true, features = ["build", "rustc"] }
default = ["google-pubsub", "kafka"]
google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"]
kafka = ["const-hex", "rdkafka", "sha2"]
scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror"]
scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror", "etcd-client"]
16 changes: 14 additions & 2 deletions yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
#!/usr/bin/bash


grpcurl -plaintext -import-path . -proto yellowstone-log.proto \
# Subscribe to account updates only
grpcurl \
-max-msg-sz 10000000 \
-plaintext -import-path . \
-proto yellowstone-log.proto \
-d '{"initial_offset_policy": 0, "event_subscription_policy": 0 }' \
'127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume
'127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume


# Create a static consumer group
grpcurl \
-plaintext -import-path . \
-proto yellowstone-log.proto \
-d '{"instance_id_list": ["a", "b"], "redundancy_instance_id_list": ["c", "d"] }' \
'127.0.0.1:10001' yellowstone.log.YellowstoneLog.CreateStaticConsumerGroup
17 changes: 17 additions & 0 deletions yellowstone-grpc-tools/solana.cql
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,28 @@ create table if not exists consumer_shard_offset (
with default_time_to_live = 3600;


-- One row per consumer group; shard assignments are stored denormalized as
-- frozen maps of instance id -> set of shard ids.
create table if not exists consumer_groups (
consumer_group_id blob,
-- discriminant matching the Rust ConsumerGroupType enum (0 = static)
group_type smallint,

last_access_ip_address text,

-- fields for static consumer group only
instance_id_shard_assignments frozen<map<text, set<smallint>>>,
redundant_id_shard_assignments frozen<map<text, set<smallint>>>,

created_at timestamp,
updated_at timestamp,

primary key (consumer_group_id)
);

create table if not exists consumer_info (
consumer_id text,
producer_id blob,
consumer_ip: text,
subscribed_event_types frozen<set<smallint>>,
last_connection timestamp,
created_at timestamp,
updated_at timestamp,
PRIMARY KEY (consumer_id)
Expand Down
8 changes: 4 additions & 4 deletions yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use {
config::{
Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo,
},
consumer::{
sink::ScyllaSink,
types::{CommitmentLevel, Transaction},
yellowstone_log::{
common::InitialOffset,
grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq},
},
sink::ScyllaSink,
types::{CommitmentLevel, Transaction},
},
setup_tracing,
},
Expand Down Expand Up @@ -107,7 +107,7 @@ impl ArgsAction {
.await?;

let session = Arc::new(session);
let scylla_ys_log = ScyllaYsLog::new(session);
let scylla_ys_log = ScyllaYsLog::new(session).await?;
let ys_log_server = YellowstoneLogServer::new(scylla_ys_log);

println!("YellowstoneLogServer listening on {}", addr);
Expand Down
3 changes: 0 additions & 3 deletions yellowstone-grpc-tools/src/scylladb/consumer/mod.rs

This file was deleted.

2 changes: 1 addition & 1 deletion yellowstone-grpc-tools/src/scylladb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod config;
pub mod consumer;
pub mod prom;
pub mod sink;
pub mod types;
pub mod yellowstone_log;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod repo;
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use {
crate::scylladb::types::ShardId,
scylla::{
cql_to_rust::{FromCqlVal, FromCqlValError},
frame::response::result::CqlValue,
prepared_statement::PreparedStatement,
serialize::value::SerializeCql,
Session,
},
std::{collections::BTreeMap, net::IpAddr, sync::Arc},
uuid::Uuid,
};

// Total number of shards partitioned across a consumer group's instances.
const NUM_SHARDS: usize = 64;

// Groups are keyed by a random UUID; instances are identified by free-form strings.
type ConsumerGroupId = Uuid;
type InstanceId = String;

// Prepared-statement text inserting one consumer_groups row.
// created_at/updated_at are set server-side via currentTimestamp(), so the
// bind tuple carries only the first five columns.
const CREATE_STATIC_CONSUMER_GROUP: &str = r###"
INSERT INTO consumer_groups (
consumer_group_id,
group_type,
last_access_ip_address,
instance_id_shard_assignments,
redundant_id_shard_assignments,
created_at,
updated_at
)
VALUES (?, ?, ?, ?, ?, currentTimestamp(), currentTimestamp())
"###;

/// Kind of consumer group, stored in the `group_type` smallint column.
/// Only static (fixed shard assignment) groups exist today.
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
enum ConsumerGroupType {
    Static = 0,
}

/// Decodes the on-disk `smallint` discriminant back into a `ConsumerGroupType`.
impl TryFrom<i16> for ConsumerGroupType {
    type Error = anyhow::Error;

    fn try_from(value: i16) -> Result<Self, Self::Error> {
        if value == 0 {
            Ok(ConsumerGroupType::Static)
        } else {
            Err(anyhow::anyhow!(
                "Unknown ConsumerGroupType equivalent for {:?}",
                value
            ))
        }
    }
}

impl From<ConsumerGroupType> for i16 {
fn from(val: ConsumerGroupType) -> Self {
match val {
ConsumerGroupType::Static => 0,
}
}
}

/// Serializes a `ConsumerGroupType` as a CQL `smallint` so it can be bound
/// directly in prepared statements.
impl SerializeCql for ConsumerGroupType {
    fn serialize<'b>(
        &self,
        typ: &scylla::frame::response::result::ColumnType,
        writer: scylla::serialize::CellWriter<'b>,
    ) -> Result<
        scylla::serialize::writers::WrittenCellProof<'b>,
        scylla::serialize::SerializationError,
    > {
        // Convert to the canonical i16 discriminant and delegate to the stock
        // i16 serializer so the wire encoding matches the column type.
        let x: i16 = (*self).into();
        SerializeCql::serialize(&x, typ, writer)
    }
}

impl FromCqlVal<CqlValue> for ConsumerGroupType {
fn from_cql(cql_val: CqlValue) -> Result<Self, scylla::cql_to_rust::FromCqlValError> {
match cql_val {
CqlValue::SmallInt(x) => x.try_into().map_err(|_| FromCqlValError::BadVal),
_ => Err(FromCqlValError::BadCqlType),
}
}
}

/// Data-access object for the `consumer_groups` table.
/// Holds the session plus statements prepared once in [`ConsumerGroupRepo::new`].
pub(crate) struct ConsumerGroupRepo {
    session: Arc<Session>,
    // Prepared form of CREATE_STATIC_CONSUMER_GROUP.
    create_static_consumer_group_ps: PreparedStatement,
}

/// Deterministically partitions shard ids `0..num_shards` across `ids`.
///
/// Ids are sorted first so the result does not depend on caller ordering.
/// When `num_shards` is not divisible by `ids.len()`, the remainder shards
/// are spread over the first ids, one extra each, so that EVERY shard is
/// assigned — the previous `chunks(..).zip(..)` scheme silently dropped the
/// leftover chunk. An empty `ids` slice now yields an empty map instead of a
/// division-by-zero panic. For divisible counts (the common 64 % n == 0 case)
/// the output is identical to the old implementation.
fn assign_shards(ids: &[InstanceId], num_shards: usize) -> BTreeMap<InstanceId, Vec<ShardId>> {
    let mut sorted_ids = ids.to_vec();
    sorted_ids.sort();

    let mut assignments = BTreeMap::new();
    if sorted_ids.is_empty() {
        return assignments;
    }

    let base = num_shards / sorted_ids.len();
    let remainder = num_shards % sorted_ids.len();

    let mut next_shard = 0usize;
    for (i, id) in sorted_ids.into_iter().enumerate() {
        // The first `remainder` ids receive one extra shard each.
        let count = base + usize::from(i < remainder);
        let shards = (next_shard..next_shard + count)
            .map(|shard| shard as ShardId)
            .collect();
        next_shard += count;
        assignments.insert(id, shards);
    }
    assignments
}

/// Result of creating a static consumer group: the generated group id plus
/// the primary and redundant instance-to-shard assignments that were persisted.
pub(crate) struct StaticConsumerGroupInfo {
    pub(crate) consumer_group_id: ConsumerGroupId,
    pub(crate) instance_id_assignments: BTreeMap<InstanceId, Vec<ShardId>>,
    pub(crate) redundant_instance_id_assignments: BTreeMap<InstanceId, Vec<ShardId>>,
}

impl ConsumerGroupRepo {
    /// Builds the repository, preparing all CQL statements up front so later
    /// calls pay no per-request preparation cost.
    pub async fn new(session: Arc<Session>) -> anyhow::Result<Self> {
        let create_static_consumer_group_ps = session.prepare(CREATE_STATIC_CONSUMER_GROUP).await?;

        Ok(ConsumerGroupRepo {
            session,
            create_static_consumer_group_ps,
        })
    }

    /// Creates a static consumer group: assigns shards to the given primary
    /// and redundant instances and persists one `consumer_groups` row.
    ///
    /// # Errors
    /// Fails if the instance list is empty, if the primary/redundant lists
    /// differ in length, or if the insert itself fails.
    pub async fn create_static_consumer_group(
        &self,
        instance_ids: &[InstanceId],
        redundant_instance_ids: &[InstanceId],
        remote_ip_addr: Option<IpAddr>,
    ) -> anyhow::Result<StaticConsumerGroupInfo> {
        // Validate before doing any work: assign_shards divides by the
        // instance count, so an empty list would otherwise panic.
        anyhow::ensure!(
            !instance_ids.is_empty(),
            "instance id list must not be empty"
        );
        anyhow::ensure!(
            instance_ids.len() == redundant_instance_ids.len(),
            // Fixed typo in the original message ("number if" -> "number of").
            "mismatch number of instance/redundant ids"
        );

        let consumer_group_id = Uuid::new_v4();
        // `instance_ids` is already a slice reference; no extra borrow needed.
        let shard_assignments = assign_shards(instance_ids, NUM_SHARDS);
        let redundant_shard_assignments = assign_shards(redundant_instance_ids, NUM_SHARDS);

        // created_at/updated_at are filled server-side (currentTimestamp()),
        // so only five values are bound.
        self.session
            .execute(
                &self.create_static_consumer_group_ps,
                (
                    consumer_group_id.as_bytes(),
                    ConsumerGroupType::Static,
                    remote_ip_addr.map(|ipaddr| ipaddr.to_string()),
                    &shard_assignments,
                    &redundant_shard_assignments,
                ),
            )
            .await?;

        Ok(StaticConsumerGroupInfo {
            consumer_group_id,
            instance_id_assignments: shard_assignments,
            redundant_instance_id_assignments: redundant_shard_assignments,
        })
    }
}
Loading
Loading