Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
lvboudre committed May 23, 2024
1 parent 7cf07de commit e9fa886
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion yellowstone-grpc-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ serde_json = { workspace = true }
serde_with = { workspace = true, optional = true }
serde_yaml = { workspace = true }
sha2 = { workspace = true, optional = true }
thiserror = { workspace = true, optional = true }
tokio = { workspace = true, features = ["signal", "time"] }
tokio-stream = { workspace = true }
tonic = { workspace = true, features = ["gzip"] }
Expand Down Expand Up @@ -74,4 +75,4 @@ vergen = { workspace = true, features = ["build", "rustc"] }
default = ["google-pubsub", "kafka"]
google-pubsub = ["google-cloud-googleapis", "google-cloud-pubsub"]
kafka = ["const-hex", "rdkafka", "sha2"]
scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono"]
scylladb = ["scylla", "serde_with", "deepsize", "uuid", "local-ip-address", "chrono", "thiserror"]
28 changes: 10 additions & 18 deletions yellowstone-grpc-tools/src/scylladb/consumer/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use {
},
std::{
collections::{BTreeMap, BTreeSet},
error::Error,
pin::Pin,
sync::Arc,
time::Duration,
},
thiserror::Error,
tokio::{sync::mpsc, time::Instant},
tokio_stream::wrappers::ReceiverStream,
tonic::Response,
Expand Down Expand Up @@ -157,7 +157,7 @@ const GET_PRODUCER_INFO_BY_ID: &str = r###"
WHERE producer_id = ?
"###;

#[derive(Clone, Debug, PartialEq, Eq, Copy)]
#[derive(Clone, Debug, Error, PartialEq, Eq, Copy)]
struct ImpossibleSlotOffset(Slot);

impl fmt::Display for ImpossibleSlotOffset {
Expand All @@ -167,13 +167,9 @@ impl fmt::Display for ImpossibleSlotOffset {
}
}

impl Error for ImpossibleSlotOffset {}

#[derive(Clone, Debug, PartialEq, Eq, Copy)]
#[derive(Clone, Debug, PartialEq, Error, Eq, Copy)]
struct DeadProducerErr(ProducerId);

impl Error for DeadProducerErr {}

impl fmt::Display for DeadProducerErr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let producer_id = self.0[0];
Expand Down Expand Up @@ -305,18 +301,14 @@ async fn get_producer_id_with_least_assigned_consumer(
}

if let Some(slot) = slot_requirement {
let ret = BTreeSet::from_iter(list_producer_with_slot(Arc::clone(&session), slot).await?);
info!("{} producer(s) with required slot {slot}", ret.len());
let to_remove = elligible_producers
.iter()
.cloned()
.filter(|k| !ret.contains(k))
.collect::<Vec<_>>();

for k in to_remove {
elligible_producers.remove(&k);
}
let producers_with_slot =
BTreeSet::from_iter(list_producer_with_slot(Arc::clone(&session), slot).await?);
info!(
"{} producer(s) with required slot {slot}",
producers_with_slot.len()
);

elligible_producers.retain(|k| producers_with_slot.contains(k));
if elligible_producers.is_empty() {
return Err(anyhow::Error::new(ImpossibleSlotOffset(slot)));
}
Expand Down

0 comments on commit e9fa886

Please sign in to comment.