Skip to content

Commit

Permalink
tools: added yellowstone-log-server command (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre authored May 16, 2024
1 parent c88df3b commit e33f925
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 25 deletions.
12 changes: 12 additions & 0 deletions yellowstone-grpc-tools/config-ys-log-server.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"prometheus": "127.0.0.1:8873",
"scylladb": {
"hostname": "localhost:9042",
"username": "cassandra",
"password": "cassandra"
},
"yellowstone_log_server": {
"listen": "localhost:10001",
"keyspace": "solana"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/bash

# Calls the yellowstone.log.YellowstoneLog.Consume RPC on 127.0.0.1:10001 over
# plaintext, loading yellowstone-log.proto from the current directory and
# sending both policy fields set to 0.

grpcurl -plaintext -import-path . -proto yellowstone-log.proto \
-d '{"initial_offset_policy": 0, "event_subscription_policy": 0 }' \
'127.0.0.1:10001' yellowstone.log.YellowstoneLog.Consume
3 changes: 3 additions & 0 deletions yellowstone-grpc-tools/solana_keyspace_setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/bash

# Disables automatic compaction on the `log`, `producer_period_commit_log` and
# `producer_slot_seen` tables of the `solana` keyspace.
nodetool disableautocompaction solana log producer_period_commit_log producer_slot_seen
61 changes: 53 additions & 8 deletions yellowstone-grpc-tools/src/bin/grpc-scylladb.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
use {
anyhow::Ok,
clap::{Parser, Subcommand},
futures::{future::BoxFuture, stream::StreamExt},
futures::{future::BoxFuture, stream::StreamExt, TryFutureExt},
scylla::{frame::Compression, Session, SessionBuilder},
std::{net::SocketAddr, sync::Arc, time::Duration},
tokio::time::Instant,
tonic::transport::Server,
tracing::{error, info, warn},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
prelude::subscribe_update::UpdateOneof, yellowstone::log::EventSubscriptionPolicy,
prelude::subscribe_update::UpdateOneof,
yellowstone::log::{
yellowstone_log_server::{self, YellowstoneLog, YellowstoneLogServer},
EventSubscriptionPolicy,
},
},
yellowstone_grpc_tools::{
config::{load as config_load, GrpcRequestToProto},
create_shutdown,
prom::run_server as prometheus_run_server,
scylladb::{
config::{Config, ConfigGrpc2ScyllaDB, ScyllaDbConnectionInfo},
config::{
Config, ConfigGrpc2ScyllaDB, ConfigYellowstoneLogServer, ScyllaDbConnectionInfo,
},
consumer::{
common::InitialOffsetPolicy,
grpc::{spawn_grpc_consumer, SpawnGrpcConsumerReq},
grpc::{spawn_grpc_consumer, ScyllaYsLog, SpawnGrpcConsumerReq},
},
sink::ScyllaSink,
types::Transaction,
Expand Down Expand Up @@ -50,9 +57,11 @@ enum ArgsAction {
/// Receive data from gRPC and send them to ScyllaDB
#[command(name = "grpc2scylla")]
Grpc2Scylla,

/// Receive data from Kafka and send them over gRPC
#[command(name = "scylla2grpc")]
Scylla2Grpc,
#[command(name = "yellowstone-log-server")]
YellowstoneLogServer,

#[command(name = "test")]
Test,
}
Expand All @@ -67,8 +76,11 @@ impl ArgsAction {
})?;
Self::grpc2scylladb(config2, config.scylladb, shutdown).await
}
ArgsAction::Scylla2Grpc => {
unimplemented!();
ArgsAction::YellowstoneLogServer => {
    // The error names the config section this action actually reads, so a
    // missing section is reported under the right key.
    let config2 = config.yellowstone_log_server.ok_or_else(|| {
        anyhow::anyhow!("`yellowstone_log_server` section in config should be defined")
    })?;
    Self::yellowstone_log_server(config2, config.scylladb, shutdown).await
}
ArgsAction::Test => {
let config2 = config.grpc2scylladb.ok_or_else(|| {
Expand All @@ -79,6 +91,39 @@ impl ArgsAction {
}
}

async fn yellowstone_log_server(
config: ConfigYellowstoneLogServer,
scylladb_conn_config: ScyllaDbConnectionInfo,
mut shutdown: BoxFuture<'static, ()>,
) -> anyhow::Result<()> {
let addr = config.listen.parse().unwrap();

let session: Session = SessionBuilder::new()
.known_node(scylladb_conn_config.hostname)
.user(scylladb_conn_config.username, scylladb_conn_config.password)
.compression(Some(Compression::Lz4))
.use_keyspace(config.keyspace.clone(), false)
.build()
.await?;

let session = Arc::new(session);
let scylla_ys_log = ScyllaYsLog::new(session);
let ys_log_server = YellowstoneLogServer::new(scylla_ys_log);

println!("YellowstoneLogServer listening on {}", addr);

let server_fut = Server::builder()
// GrpcWeb is over http1 so we must enable it.
.add_service(ys_log_server)
.serve(addr)
.map_err(anyhow::Error::new);

tokio::select! {
_ = &mut shutdown => Ok(()),
result = server_fut => result,
}
}

async fn test(
config: ConfigGrpc2ScyllaDB,
scylladb_conn_config: ScyllaDbConnectionInfo,
Expand Down
9 changes: 9 additions & 0 deletions yellowstone-grpc-tools/src/scylladb/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct Config {
pub prometheus: Option<SocketAddr>,
pub scylladb: ScyllaDbConnectionInfo,
pub grpc2scylladb: Option<ConfigGrpc2ScyllaDB>,
pub yellowstone_log_server: Option<ConfigYellowstoneLogServer>,
}

#[derive(Debug, Default, Deserialize)]
Expand All @@ -53,6 +54,14 @@ pub struct ScyllaDbConnectionInfo {
pub password: String,
}

/// Settings deserialized from the `yellowstone_log_server` section of the config.
#[serde_as]
#[derive(Debug, Deserialize)]
pub struct ConfigYellowstoneLogServer {
/// Address the log server listens on, kept as the raw string from the config.
pub listen: String,
/// ScyllaDB keyspace to use; filled in by `default_keyspace` when omitted.
#[serde(default = "default_keyspace")]
pub keyspace: String,
}

#[serde_as]
#[derive(Debug, Deserialize)]
pub struct ConfigGrpc2ScyllaDB {
Expand Down
70 changes: 59 additions & 11 deletions yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use {
tokio::{sync::mpsc, time::Instant},
tokio_stream::wrappers::ReceiverStream,
tonic::Response,
tracing::{error, info},
tracing::{error, info, warn},
uuid::Uuid,
yellowstone_grpc_proto::{
geyser::{subscribe_update::UpdateOneof, SubscribeUpdate},
Expand All @@ -39,6 +39,10 @@ use {
},
};

/// Send latency to a consumer at or above which a "slow read" warning is logged.
const CLIENT_LAG_WARN_THRESHOLD: Duration = Duration::from_millis(250);

/// Time between two delivered events at or above which a slow micro-batch fetch warning is logged.
const FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD: Duration = Duration::from_millis(500);

const DEFAULT_LAST_HEARTBEAT_TIME_DELTA: Duration = Duration::from_secs(10);

const DEFAULT_OFFSET_COMMIT_INTERVAL: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -235,19 +239,27 @@ async fn get_producer_id_with_least_assigned_consumer(
session: Arc<Session>,
) -> anyhow::Result<ProducerId> {
let locked_producers = list_producers_with_lock_held(Arc::clone(&session)).await?;

info!("{} producer lock(s) detected", locked_producers.len());
let recently_active_producers = BTreeSet::from_iter(
list_producers_heartbeat(Arc::clone(&session), DEFAULT_LAST_HEARTBEAT_TIME_DELTA).await?,
);

info!(
"{} living producer(s) detected",
recently_active_producers.len()
);

let elligible_producers = locked_producers
.into_iter()
.filter(|producer_id| recently_active_producers.contains(producer_id))
.collect::<BTreeSet<_>>();

// Report the size of the filtered set (locked AND recently active), not the
// heartbeat-only set.
info!("{} elligible producer(s)", elligible_producers.len());
let mut producer_count_pairs = session
.query(GET_PRODUCERS_CONSUMER_COUNT, &[])
.await?
.rows_typed::<(ProducerId, i32)>()?
.rows_typed::<(ProducerId, i64)>()?
.collect::<Result<BTreeMap<_, _>, _>>()?;

elligible_producers.iter().for_each(|producer_id| {
Expand Down Expand Up @@ -308,9 +320,10 @@ async fn register_new_consumer(
) -> anyhow::Result<ConsumerInfo> {
let producer_id = get_producer_id_with_least_assigned_consumer(Arc::clone(&session)).await?;

let insert_consumer_mapping_ps = session.prepare(INSERT_CONSUMER_PRODUCER_MAPPING).await?;
session
.query(
INSERT_CONSUMER_PRODUCER_MAPPING,
.execute(
&insert_consumer_mapping_ps,
(consumer_id.as_ref(), producer_id),
)
.await?;
Expand Down Expand Up @@ -376,6 +389,8 @@ async fn get_or_register_consumer(
};
Ok(cs)
} else {
let cid = consumer_id.as_ref();
info!("Bootstrapping consumer {cid}");
register_new_consumer(
session,
consumer_id,
Expand Down Expand Up @@ -472,6 +487,12 @@ pub struct ScyllaYsLog {
session: Arc<Session>,
}

impl ScyllaYsLog {
pub fn new(session: Arc<Session>) -> Self {
ScyllaYsLog { session }
}
}

pub type LogStream = Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, tonic::Status>> + Send>>;

#[tonic::async_trait]
Expand Down Expand Up @@ -505,6 +526,12 @@ impl YellowstoneLog for ScyllaYsLog {
let account_update_event_filter = cr.account_update_event_filter;
let tx_event_filter = cr.tx_event_filter;

// Emit the request parameters as structured fields; the key is spelled
// `initial_offset_policy` so it matches the variable it records.
info!(
    consumer_id = consumer_id,
    initial_offset_policy = ?initial_offset_policy,
    event_subscription_policy = ?event_subscription_policy,
);

let req = SpawnGrpcConsumerReq {
consumer_id,
account_update_event_filter,
Expand Down Expand Up @@ -762,10 +789,25 @@ impl GrpcConsumerSource {
self.shard_iterators
.sort_by_key(|it| (it.shard_id, it.event_type));

let mut max_seen_slot = -1;
let mut num_event_between_two_slots = 0;

let mut t = Instant::now();
loop {
for shard_it in self.shard_iterators.iter_mut() {
let maybe = shard_it.try_next().await?;
if let Some(block_chain_event) = maybe {
if t.elapsed() >= FETCH_MICRO_BATCH_LATENCY_WARN_THRESHOLD {
warn!(
"consumer {consumer_id} micro batch took {:?} to fetch.",
t.elapsed()
);
}
if max_seen_slot < block_chain_event.slot {
info!("Consumer {consumer_id} reach slot {max_seen_slot} after {num_event_between_two_slots} blockchain event(s)");
max_seen_slot = block_chain_event.slot;
num_event_between_two_slots = 0;
}
let geyser_event = match block_chain_event.event_type {
BlockchainEventType::AccountUpdate => {
UpdateOneof::Account(block_chain_event.try_into()?)
Expand All @@ -778,9 +820,18 @@ impl GrpcConsumerSource {
filters: Default::default(),
update_oneof: Some(geyser_event),
};
self.sender.send(Ok(subscribe_update)).await.map_err(|_| {
anyhow::anyhow!("Failed to deliver message to consumer {}", consumer_id)
})?;
let t_send = Instant::now();

if self.sender.send(Ok(subscribe_update)).await.is_err() {
warn!("Consumer {consumer_id} closed its streaming half");
return Ok(());
}
let send_latency = t_send.elapsed();
if send_latency >= CLIENT_LAG_WARN_THRESHOLD {
warn!("Slow read from consumer {consumer_id}, recorded latency: {send_latency:?}")
}
num_event_between_two_slots += 1;
t = Instant::now();
}
}

Expand All @@ -805,10 +856,7 @@ impl GrpcConsumerSource {
if let Err(_actual_offset_in_scylla) = result {
anyhow::bail!("two concurrent connections are using the same consumer instance")
}
info!(
"Successfully committed offsets for consumer {:?}",
consumer_id
);
info!("Successfully committed offsets for consumer {consumer_id}");
std::mem::swap(&mut new_offsets_to_commit, &mut last_committed_offsets);
commit_offset_deadline = Instant::now() + self.offset_commit_interval;
}
Expand Down
10 changes: 4 additions & 6 deletions yellowstone-grpc-tools/src/scylladb/consumer/shard_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
scylla::{prepared_statement::PreparedStatement, Session},
std::{collections::VecDeque, sync::Arc},
tokio::sync::oneshot::{self, error::TryRecvError},
tracing::warn,
};

const MICRO_BATCH_SIZE: usize = 40;
Expand Down Expand Up @@ -240,12 +241,9 @@ impl ShardIterator {
.rows_typed_or_empty::<BlockchainEvent>()
.collect::<Result<VecDeque<_>, _>>()
.expect("failed to typed scylladb rows");
sender
.send(micro_batch)
.map_err(|_| ())
.unwrap_or_else(|_| {
panic!("Failed to send micro batch to shard iterator {}", shard_id)
});
if sender.send(micro_batch).is_err() {
warn!("Shard iterator {shard_id} was fetching micro batch, but client closed its stream half.")
}
});
receiver
}
Expand Down

0 comments on commit e33f925

Please sign in to comment.