tools: decoupled grpc from consumer_source (#351)
* tools: decoupled grpc from consumer_source

* add protobuf-compiler in test dependencies
lvboudre authored May 28, 2024
1 parent f28384f commit f7e2f5c
Showing 17 changed files with 528 additions and 182 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -57,7 +57,7 @@ jobs:
      - name: Install dependencies
        run: |
          sudo apt-get update
-          sudo apt-get install -y libsasl2-dev
+          sudo apt-get install -y libsasl2-dev protobuf-compiler
      - name: cargo tree
        run: |
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -32,6 +32,7 @@ const-hex = "1.6.2"
crossbeam-channel = "0.5.8"
deepsize = "0.2.0"
env_logger = "0.10.0"
+etcd-client = "0.12.4"
futures = "0.3.24"
git-version = "0.3.5"
google-cloud-googleapis = "0.11.0"
34 changes: 25 additions & 9 deletions yellowstone-grpc-proto/proto/yellowstone-log.proto
@@ -8,9 +8,23 @@ option go_package = "github.com/rpcpool/solana-geyser-grpc/golang/proto";
package yellowstone.log;

service YellowstoneLog {
+  rpc CreateStaticConsumerGroup(CreateStaticConsumerGroupRequest) returns (CreateStaticConsumerGroupResponse) {}
  rpc Consume(ConsumeRequest) returns (stream geyser.SubscribeUpdate) {}
}

+message CreateStaticConsumerGroupResponse {
+  string group_id = 1;
+}
+
+message CreateStaticConsumerGroupRequest {
+  repeated string instance_id_list = 2;
+  repeated string redundancy_instance_id_list = 3;
+}
+
+enum PartitionAssignmentStrategy {
+  STATIC = 0;
+}

/// The InitialOffsetPolicy enum determines the initial offset used when subscribing to events or messages. It provides three options:
///
@@ -96,20 +110,22 @@ enum TimelineTranslationPolicy {
/// ttp_maximum_slot_lag (9)
/// An optional uint32 indicating the maximum slot lag allowed for timeline translation.
message ConsumeRequest {
-  optional string consumer_id = 1;
+  optional string consumer_group_id = 1;
+  optional string consumer_id = 2;
+  optional string instance_id = 3;

-  InitialOffsetPolicy initial_offset_policy = 2;
-  optional int64 at_slot = 3;
+  InitialOffsetPolicy initial_offset_policy = 4;
+  geyser.CommitmentLevel commitment_level = 5;
+  EventSubscriptionPolicy event_subscription_policy = 6;

-  EventSubscriptionPolicy event_subscription_policy = 4;
-  optional AccountUpdateEventFilter account_update_event_filter = 5;
-  optional TransactionEventFilter tx_event_filter = 6;
-  geyser.CommitmentLevel commitment_level = 7;
+  optional int64 at_slot = 7;
+  optional AccountUpdateEventFilter account_update_event_filter = 8;
+  optional TransactionEventFilter tx_event_filter = 9;

  // timelineTranslationPolicy is used when an ingester is out of service and we need to translate a set
  // of consumers to a different ingestion timeline. The policy describes what to do when we need to trigger timeline translation.
-  optional TimelineTranslationPolicy timelineTranslationPolicy = 8;
-  optional uint32 ttp_maximum_slot_lag = 9;
+  optional TimelineTranslationPolicy timelineTranslationPolicy = 10;
+  optional uint32 ttp_maximum_slot_lag = 11;
}

/// The AccountUpdateEventFilter message defines filters for account update events. It includes the following fields:
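For orientation, a ConsumeRequest under the renumbered layout might be built like this from Rust, assuming prost-generated bindings; the module path yellowstone_log, the reliance on Default, and all field values here are illustrative assumptions rather than something this diff defines:

use yellowstone_log::ConsumeRequest;

fn example_request() -> ConsumeRequest {
    ConsumeRequest {
        // identity fields introduced by this change (values are hypothetical)
        consumer_group_id: Some("example-group".to_owned()),
        consumer_id: Some("example-consumer".to_owned()),
        instance_id: Some("a".to_owned()),
        // the policies keep their meaning; only the field numbers moved
        initial_offset_policy: 0, // same value as in the grpcurl example below
        event_subscription_policy: 0,
        ..Default::default()
    }
}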
3 changes: 2 additions & 1 deletion yellowstone-grpc-tools/Cargo.toml
@@ -30,6 +30,7 @@ chrono = { workspace = true, optional = true }
clap = { workspace = true, features = ["derive"] }
const-hex = { workspace = true, optional = true }
deepsize = { workspace = true, optional = true }
+etcd-client = { workspace = true, optional = true }
futures = { workspace = true }
google-cloud-googleapis = { workspace = true, optional = true }
google-cloud-pubsub = { workspace = true, optional = true }
@@ -75,4 +76,4 @@ vergen = { workspace = true, features = ["build", "rustc"] }
default = ["google-pubsub", "kafka"]
google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"]
kafka = ["const-hex", "rdkafka", "sha2"]
-scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror"]
+scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror", "etcd-client"]
16 changes: 14 additions & 2 deletions yellowstone-grpc-tools/grpcurl_yellowstone_log_server_example.sh
@@ -1,6 +1,18 @@
#!/usr/bin/bash


-grpcurl -plaintext -import-path . -proto yellowstone-log.proto \
+# Subscribe to account updates only
+grpcurl \
+  -max-msg-sz 10000000 \
+  -plaintext -import-path . \
+  -proto yellowstone-log.proto \
  -d '{"initial_offset_policy": 0, "event_subscription_policy": 0 }' \
-  '127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume
+  '127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume
+
+
+# Create a static consumer group
+grpcurl \
+  -plaintext -import-path . \
+  -proto yellowstone-log.proto \
+  -d '{"instance_id_list": ["a", "b"], "redundancy_instance_id_list": ["c", "d"] }' \
+  '127.0.0.1:10001' yellowstone.log.YellowstoneLog.CreateStaticConsumerGroup
17 changes: 17 additions & 0 deletions yellowstone-grpc-tools/solana.cql
@@ -35,11 +35,28 @@ create table if not exists consumer_shard_offset (
with default_time_to_live = 3600;


+create table if not exists consumer_groups (
+  consumer_group_id blob,
+  group_type smallint,
+
+  last_access_ip_address text,
+
+  -- fields for static consumer group only
+  instance_id_shard_assignments frozen<map<text, set<smallint>>>,
+  redundant_id_shard_assignments frozen<map<text, set<smallint>>>,
+
+  created_at timestamp,
+  updated_at timestamp,
+
+  primary key (consumer_group_id)
+);
+
create table if not exists consumer_info (
  consumer_id text,
  producer_id blob,
+  consumer_ip text,
  subscribed_event_types frozen<set<smallint>>,
  last_connection timestamp,
  created_at timestamp,
  updated_at timestamp,
  PRIMARY KEY (consumer_id)
8 changes: 4 additions & 4 deletions yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
@@ -23,12 +23,12 @@ use {
        config::{
            Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo,
        },
-       consumer::{
-           sink::ScyllaSink,
-           types::{CommitmentLevel, Transaction},
-       },
        yellowstone_log::{
            common::InitialOffset,
            grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq},
        },
+       sink::ScyllaSink,
+       types::{CommitmentLevel, Transaction},
        setup_tracing,
    },
@@ -107,7 +107,7 @@ impl ArgsAction {
            .await?;

        let session = Arc::new(session);
-       let scylla_ys_log = ScyllaYsLog::new(session);
+       let scylla_ys_log = ScyllaYsLog::new(session).await?;
        let ys_log_server = YellowstoneLogServer::new(scylla_ys_log);

        println!("YellowstoneLogServer listening on {}", addr);
3 changes: 0 additions & 3 deletions yellowstone-grpc-tools/src/scylladb/consumer/mod.rs

This file was deleted.

2 changes: 1 addition & 1 deletion yellowstone-grpc-tools/src/scylladb/mod.rs
@@ -1,5 +1,5 @@
pub mod config;
-pub mod consumer;
pub mod prom;
pub mod sink;
pub mod types;
+pub mod yellowstone_log;
@@ -0,0 +1 @@
pub mod repo;
@@ -0,0 +1,153 @@
use {
    crate::scylladb::types::ShardId,
    scylla::{
        cql_to_rust::{FromCqlVal, FromCqlValError},
        frame::response::result::CqlValue,
        prepared_statement::PreparedStatement,
        serialize::value::SerializeCql,
        Session,
    },
    std::{collections::BTreeMap, net::IpAddr, sync::Arc},
    uuid::Uuid,
};

const NUM_SHARDS: usize = 64;

type ConsumerGroupId = Uuid;
type InstanceId = String;

const CREATE_STATIC_CONSUMER_GROUP: &str = r###"
    INSERT INTO consumer_groups (
        consumer_group_id,
        group_type,
        last_access_ip_address,
        instance_id_shard_assignments,
        redundant_id_shard_assignments,
        created_at,
        updated_at
    )
    VALUES (?, ?, ?, ?, ?, currentTimestamp(), currentTimestamp())
"###;

#[derive(Clone, Debug, PartialEq, Eq, Copy)]
enum ConsumerGroupType {
    Static = 0,
}

impl TryFrom<i16> for ConsumerGroupType {
    type Error = anyhow::Error;

    fn try_from(value: i16) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(ConsumerGroupType::Static),
            x => Err(anyhow::anyhow!(
                "Unknown ConsumerGroupType equivalent for {:?}",
                x
            )),
        }
    }
}

impl From<ConsumerGroupType> for i16 {
    fn from(val: ConsumerGroupType) -> Self {
        match val {
            ConsumerGroupType::Static => 0,
        }
    }
}

impl SerializeCql for ConsumerGroupType {
    fn serialize<'b>(
        &self,
        typ: &scylla::frame::response::result::ColumnType,
        writer: scylla::serialize::CellWriter<'b>,
    ) -> Result<
        scylla::serialize::writers::WrittenCellProof<'b>,
        scylla::serialize::SerializationError,
    > {
        let x: i16 = (*self).into();
        SerializeCql::serialize(&x, typ, writer)
    }
}

impl FromCqlVal<CqlValue> for ConsumerGroupType {
    fn from_cql(cql_val: CqlValue) -> Result<Self, scylla::cql_to_rust::FromCqlValError> {
        match cql_val {
            CqlValue::SmallInt(x) => x.try_into().map_err(|_| FromCqlValError::BadVal),
            _ => Err(FromCqlValError::BadCqlType),
        }
    }
}

pub(crate) struct ConsumerGroupRepo {
    session: Arc<Session>,
    create_static_consumer_group_ps: PreparedStatement,
}

fn assign_shards(ids: &[InstanceId], num_shards: usize) -> BTreeMap<InstanceId, Vec<ShardId>> {
    let mut ids = ids.to_vec();
    ids.sort();

    let num_parts_per_id = num_shards / ids.len();
    let shard_vec = (0..num_shards).map(|x| x as ShardId).collect::<Vec<_>>();
    let chunk_it = shard_vec
        .chunks(num_parts_per_id)
        .into_iter()
        .map(|chunk| chunk.iter().cloned().collect());

    ids.into_iter().zip(chunk_it).collect()
}
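
// Example: with num_shards = 64 and ids = ["a", "b"], each instance receives a
// contiguous run of 32 shard ids: "a" -> [0..=31], "b" -> [32..=63].
// Note that when num_shards is not evenly divisible by ids.len(), the trailing
// remainder chunk produced by `chunks` is dropped by the `zip` above, and an
// empty ids slice panics on the division.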

pub(crate) struct StaticConsumerGroupInfo {
    pub(crate) consumer_group_id: ConsumerGroupId,
    pub(crate) instance_id_assignments: BTreeMap<InstanceId, Vec<ShardId>>,
    pub(crate) redundant_instance_id_assignments: BTreeMap<InstanceId, Vec<ShardId>>,
}

impl ConsumerGroupRepo {
    pub async fn new(session: Arc<Session>) -> anyhow::Result<Self> {
        let create_static_consumer_group_ps = session.prepare(CREATE_STATIC_CONSUMER_GROUP).await?;

        let this = ConsumerGroupRepo {
            session,
            create_static_consumer_group_ps,
        };

        Ok(this)
    }

    pub async fn create_static_consumer_group(
        &self,
        instance_ids: &[InstanceId],
        redundant_instance_ids: &[InstanceId],
        remote_ip_addr: Option<IpAddr>,
    ) -> anyhow::Result<StaticConsumerGroupInfo> {
        let consumer_group_id = Uuid::new_v4();
        anyhow::ensure!(
            instance_ids.len() == redundant_instance_ids.len(),
            "mismatched number of instance/redundant ids"
        );
        let shard_assignments = assign_shards(instance_ids, NUM_SHARDS);
        let shard_assignments2 = assign_shards(redundant_instance_ids, NUM_SHARDS);
        self.session
            .execute(
                &self.create_static_consumer_group_ps,
                (
                    consumer_group_id.as_bytes(),
                    ConsumerGroupType::Static,
                    remote_ip_addr.map(|ipaddr| ipaddr.to_string()),
                    &shard_assignments,
                    &shard_assignments2,
                ),
            )
            .await?;

        let ret = StaticConsumerGroupInfo {
            consumer_group_id,
            instance_id_assignments: shard_assignments,
            redundant_instance_id_assignments: shard_assignments2,
        };

        Ok(ret)
    }
}
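
To see how the pieces fit together, here is a minimal usage sketch of the repository above; the function name and argument values are hypothetical, session construction is elided, and this is an illustration of the API rather than code from the commit:

use {scylla::Session, std::sync::Arc};

async fn example(session: Arc<Session>) -> anyhow::Result<()> {
    let repo = ConsumerGroupRepo::new(session).await?;

    // The instance and redundancy lists must have the same length,
    // per the ensure! check in create_static_consumer_group.
    let info = repo
        .create_static_consumer_group(
            &["a".to_owned(), "b".to_owned()],
            &["c".to_owned(), "d".to_owned()],
            None, // no remote ip recorded
        )
        .await?;

    println!("created consumer group {}", info.consumer_group_id);
    Ok(())
}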