Skip to content

Commit

Permalink
fix formatting and added docs
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre committed May 10, 2024
1 parent 3bb1f56 commit b91a0b8
Show file tree
Hide file tree
Showing 7 changed files with 598 additions and 405 deletions.
2 changes: 1 addition & 1 deletion yellowstone-grpc-proto/proto/yellowstone-log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ message ConsumeRequest {
optional string consumer_id = 1;
InitialOffsetPolicy initial_offset_policy = 2;
optional int64 at_slot = 3;
EventSubscriptionPolicy event_subscription_polocy = 4;
EventSubscriptionPolicy event_subscription_policy = 4;
optional AccountUpdateEventFilter account_update_event_filter = 5;
optional TransactionEventFilter tx_event_filter = 6;
}
Expand Down
96 changes: 55 additions & 41 deletions yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
use {
anyhow::Ok, clap::{Parser, Subcommand}, futures::{future::BoxFuture, stream::StreamExt}, scylla::{frame::Compression, Session, SessionBuilder}, std::{net::SocketAddr, sync::Arc, time::Duration}, tokio::time::Instant, tracing::{info, warn}, yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_proto::{geyser::SubscribeUpdate, prelude::subscribe_update::UpdateOneof, yellowstone::log::{AccountUpdateEventFilter, EventSubscriptionPolicy, TransactionEventFilter}}, yellowstone_grpc_tools::{
anyhow::Ok,
clap::{Parser, Subcommand},
futures::{future::BoxFuture, stream::StreamExt},
scylla::{frame::Compression, Session, SessionBuilder},
std::{net::SocketAddr, sync::Arc, time::Duration},
tokio::time::Instant,
tracing::{info, warn},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
prelude::subscribe_update::UpdateOneof, yellowstone::log::EventSubscriptionPolicy,
},
yellowstone_grpc_tools::{
config::{load as config_load, GrpcRequestToProto},
create_shutdown,
prom::run_server as prometheus_run_server,
scylladb::{
config::{Config, ConfigGrpc2ScyllaDB, ScyllaDbConnectionInfo}, consumer::{common::InitialOffsetPolicy, grpc::{get_or_register_consumer, spawn_grpc_consumer, SpawnGrpcConsumerReq}}, sink::ScyllaSink, types::Transaction
config::{Config, ConfigGrpc2ScyllaDB, ScyllaDbConnectionInfo},
consumer::{
common::InitialOffsetPolicy,
grpc::{get_or_register_consumer, spawn_grpc_consumer, SpawnGrpcConsumerReq},
},
sink::ScyllaSink,
types::Transaction,
},
setup_tracing,
}
},
};

// 512MB
Expand Down Expand Up @@ -77,18 +94,19 @@ impl ArgsAction {
let session = Arc::new(session);
let ci = get_or_register_consumer(
Arc::clone(&session),
"test",
"test",
InitialOffsetPolicy::Earliest,
EventSubscriptionPolicy::TransactionOnly,
).await?;
)
.await?;

let hexstr = "16daf15e85d893b89d83a8ca7d7f86416f134905d1d79e4f62e3da70a3a20a7d";
let pubkey = (0..hexstr.len())
let _pubkey = (0..hexstr.len())
.step_by(2)
.map(|i| u8::from_str_radix(&hexstr[i..i + 2], 16))
.collect::<Result<Vec<_>, _>>()?;
let req = SpawnGrpcConsumerReq {
session: Arc::clone(&session),
session: Arc::clone(&session),
consumer_info: ci,
// account_update_event_filter: Some(
// AccountUpdateEventFilter {
Expand All @@ -98,10 +116,9 @@ impl ArgsAction {
// ),
// tx_event_filter: Some(
// TransactionEventFilter {
// account_keys: vec![pubkey]
// account_keys: vec![pubkey]
// }
// ),

account_update_event_filter: None,
tx_event_filter: None,
buffer_capacity: None,
Expand All @@ -123,7 +140,7 @@ impl ArgsAction {
if result.is_err() {
anyhow::bail!("fail!!!")
}
let x = result?.update_oneof.expect("got none");
let _x = result?.update_oneof.expect("got none");
// match x {
// UpdateOneof::Account(acc) => println!("acc, slot {:?}", acc.slot),
// UpdateOneof::Transaction(tx) => panic!("got tx"),
Expand Down Expand Up @@ -175,39 +192,36 @@ impl ArgsAction {
}
.transpose()?;

match message {
Some(message) => {
let message = match message.update_oneof {
Some(value) => value,
None => unreachable!("Expect valid message"),
};

match message {
UpdateOneof::Account(msg) => {
let acc_update = msg.clone().try_into();
if acc_update.is_err() {
// Drop the message if invalid
warn!(
"failed to parse account update: {:?}",
acc_update.err().unwrap()
);
continue;
}
// If the sink is close, let it crash...
sink.log_account_update(acc_update.unwrap()).await.unwrap();
if let Some(message) = message {
let message = match message.update_oneof {
Some(value) => value,
None => unreachable!("Expect valid message"),
};

match message {
UpdateOneof::Account(msg) => {
let acc_update = msg.clone().try_into();
if acc_update.is_err() {
// Drop the message if invalid
warn!(
"failed to parse account update: {:?}",
acc_update.err().unwrap()
);
continue;
}
UpdateOneof::Transaction(msg) => {
let tx: Result<Transaction, anyhow::Error> = msg.try_into();
if tx.is_err() {
warn!("failed to convert update tx: {:?}", tx.err().unwrap());
continue;
}
sink.log_transaction(tx.unwrap()).await.unwrap();
// If the sink is close, let it crash...
sink.log_account_update(acc_update.unwrap()).await.unwrap();
}
UpdateOneof::Transaction(msg) => {
let tx: Result<Transaction, anyhow::Error> = msg.try_into();
if tx.is_err() {
warn!("failed to convert update tx: {:?}", tx.err().unwrap());
continue;
}
_ => continue,
};
}
_ => (),
sink.log_transaction(tx.unwrap()).await.unwrap();
}
_ => continue,
};
}
}
Ok(())
Expand Down
5 changes: 1 addition & 4 deletions yellowstone-grpc-tools/src/scylladb/consumer/common.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use crate::scylladb::types::{BlockchainEventType, ProducerId, ShardId, ShardOffset};



pub type OldShardOffset = ShardOffset;


pub type ConsumerId = String;

///
Expand All @@ -23,4 +20,4 @@ pub struct ConsumerInfo {
pub producer_id: ProducerId,
pub shard_offsets: Vec<(ShardId, BlockchainEventType, ShardOffset)>,
pub subscribed_blockchain_event_types: Vec<BlockchainEventType>,
}
}
Loading

0 comments on commit b91a0b8

Please sign in to comment.