diff --git a/src/google_publisher.rs b/src/google_publisher.rs index 99ef9d0..e3b124b 100644 --- a/src/google_publisher.rs +++ b/src/google_publisher.rs @@ -10,6 +10,15 @@ use tokio::time::{self, Duration}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; +const ALLOWED_PUBKEYS: &[&str] = &[ + "89ef92b9ebe6dc1e4ea398f6477f227e95429627b0a33dc89b640e137b256be5", // Daniel + "27cf2c68535ae1fc06510e827670053f5dcd39e6bd7e05f1ffb487ef2ac13549", // Josh + "969e6a28ee5214cb0296ee69cbdce4f43229124a78b1043d85df31e5636d0f1f", // Linda + "b29bb98ebecca7ae3a86a02ab6cf260baecf098dcd452ef8e5f9c549dfc0e0ef", // Martin + "d0a1ffb8761b974cec4a3be8cbcb2e96a7090dcf465ffeac839aa4ca20c9a59e", // Matt + "76c71aae3a491f1d9eec47cba17e229cda4113a0bbb6e6ae1776d7643e29cafa", // Rabble +]; + struct GooglePublisherClient { pubsub_client: GoogleApi>, google_full_topic: String, @@ -97,19 +106,8 @@ impl GooglePublisher { info!("Publishing batch of {} follow changes", buffer.len()); if let Err(e) = client.publish_events(buffer.split_off(0)).await { match &e { + // We've seen this happen sporadically, we don't neet to kill the look in this situation GooglePublisherError::PublishError(_) => { - // TODO: We don't break in this error for the moment while investigating the cause: - // server-1 | Caused by: - // server-1 | 0: status: Unknown, message: "transport error", details: [], metadata: MetadataMap { headers: {} } - // server-1 | 1: transport error - // server-1 | 2: connection error - // server-1 | 3: peer closed connection without sending TLS close_notify: https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof - // server-1 | - // server-1 | Stack backtrace: - // server-1 | 0: std::backtrace::Backtrace::create - // server-1 | 1: nos_followers::google_publisher::GooglePublisherClient::publish_events::{{closure}} - // server-1 | 2: nos_followers::google_publisher::GooglePublisher::create::{{closure}}::{{closure}}k - error!("{}", e); } _ => { @@ -156,7 +154,14 @@ impl GooglePublisher { &self, follow_change: FollowChange, ) -> Result<(), SendError> { - self.sender.send(follow_change).await + // TODO: Temporary filter while developing this service + if !ALLOWED_PUBKEYS.contains(&follow_change.followee.to_hex().as_str()) + && !ALLOWED_PUBKEYS.contains(&follow_change.follower.to_hex().as_str()) + { + return Ok(()); + } + + return self.sender.send(follow_change).await; } }