Skip to content

Commit

Permalink
Only publish to pubsub if part of allowed list
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Aug 19, 2024
1 parent f366fb2 commit 97fab2c
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions src/google_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ use tokio::time::{self, Duration};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

const ALLOWED_PUBKEYS: &[&str] = &[
"89ef92b9ebe6dc1e4ea398f6477f227e95429627b0a33dc89b640e137b256be5", // Daniel
"27cf2c68535ae1fc06510e827670053f5dcd39e6bd7e05f1ffb487ef2ac13549", // Josh
"969e6a28ee5214cb0296ee69cbdce4f43229124a78b1043d85df31e5636d0f1f", // Linda
"b29bb98ebecca7ae3a86a02ab6cf260baecf098dcd452ef8e5f9c549dfc0e0ef", // Martin
"d0a1ffb8761b974cec4a3be8cbcb2e96a7090dcf465ffeac839aa4ca20c9a59e", // Matt
"76c71aae3a491f1d9eec47cba17e229cda4113a0bbb6e6ae1776d7643e29cafa", // Rabble
];

struct GooglePublisherClient {
pubsub_client: GoogleApi<PublisherClient<GoogleAuthMiddleware>>,
google_full_topic: String,
Expand Down Expand Up @@ -97,19 +106,8 @@ impl GooglePublisher {
info!("Publishing batch of {} follow changes", buffer.len());
if let Err(e) = client.publish_events(buffer.split_off(0)).await {
match &e {
// We've seen this happen sporadically, we don't neet to kill the look in this situation
GooglePublisherError::PublishError(_) => {
// TODO: We don't break in this error for the moment while investigating the cause:
// server-1 | Caused by:
// server-1 | 0: status: Unknown, message: "transport error", details: [], metadata: MetadataMap { headers: {} }
// server-1 | 1: transport error
// server-1 | 2: connection error
// server-1 | 3: peer closed connection without sending TLS close_notify: https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof
// server-1 |
// server-1 | Stack backtrace:
// server-1 | 0: std::backtrace::Backtrace::create
// server-1 | 1: nos_followers::google_publisher::GooglePublisherClient::publish_events::{{closure}}
// server-1 | 2: nos_followers::google_publisher::GooglePublisher::create::{{closure}}::{{closure}}k

error!("{}", e);
}
_ => {
Expand Down Expand Up @@ -156,7 +154,14 @@ impl GooglePublisher {
&self,
follow_change: FollowChange,
) -> Result<(), SendError<FollowChange>> {
self.sender.send(follow_change).await
// TODO: Temporary filter while developing this service
if !ALLOWED_PUBKEYS.contains(&follow_change.followee.to_hex().as_str())
&& !ALLOWED_PUBKEYS.contains(&follow_change.follower.to_hex().as_str())
{
return Ok(());
}

return self.sender.send(follow_change).await;
}
}

Expand Down

0 comments on commit 97fab2c

Please sign in to comment.