Skip to content

Commit

Permalink
Settings as struct, tests for google publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Aug 26, 2024
1 parent f2560ee commit 916a983
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 95 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ anyhow = "1.0.86"
axum = "0.7.5"
cached = { version = "0.53.1", features = ["async"] }
chrono = { version = "0.4.38", features = ["serde"] }
config = "0.14.0"
config_rs = { version = "0.14", package = "config", features = ["yaml"] }
env_logger = "0.11.5"
futures = "0.3.30"
gcloud-sdk = { version = "0.25.5", features = ["google-pubsub-v1"] }
Expand Down
20 changes: 14 additions & 6 deletions config/settings.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
relay: wss://relay.nos.social
event_channel_size: 1000
event_workers: 10
follow_change_channel_size: 100000
follow_change_workers: 50
worker_timeout_secs: 10
followers:
relay: wss://relay.nos.social
neo4j_uri: "bolt://db:7687"
neo4j_user: "neo4j"
neo4j_password: "password"
event_channel_size: 500
event_workers: 5
follow_change_channel_size: 50000
follow_change_workers: 30
worker_timeout_secs: 10
google_project_id: "pub-verse-app"
google_topic: "follow-changes"
seconds_threshold: 5
size_threshold: 500
68 changes: 42 additions & 26 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
use anyhow::{Context, Result};
use config::{Config as ConfigTree, Environment, File};
use config_rs::{Config as ConfigTree, ConfigError, Environment, File};
use serde::de::DeserializeOwned;
use std::{any::type_name, env};
use serde::Deserialize;
use std::env;
use tracing::info;

/*
* Constants
*/
#[derive(Debug, Deserialize)]
pub struct Settings {
pub relay: String,
pub neo4j_uri: String,
pub neo4j_user: String,
pub neo4j_password: String,
pub event_channel_size: usize,
pub event_workers: usize,
pub follow_change_channel_size: usize,
pub follow_change_workers: usize,
pub worker_timeout_secs: u64,
pub google_project_id: String,
pub google_topic: String,
pub seconds_threshold: u64,
pub size_threshold: usize,
}

impl Configurable for Settings {
fn key() -> &'static str {
// Root key
"followers"
}
}

pub const ENVIRONMENT_PREFIX: &str = "APP";
pub const CONFIG_SEPARATOR: &str = "__";
Expand All @@ -16,10 +37,6 @@ pub fn environment() -> String {
.unwrap_or_else(|_| "development".into())
}

/*
* Configuration
*/

pub trait Configurable {
fn key() -> &'static str;
}
Expand All @@ -30,12 +47,19 @@ pub struct Config {
}

impl Config {
pub fn new(config_dir: &str) -> Result<Self> {
pub fn new(config_dir: &str) -> Result<Self, ConfigError> {
let environment = environment();

let default_config_path = format!("{}/settings", &config_dir);
let env_config_path = format!("{}/settings.{}", &config_dir, &environment);
let local_config_path = format!("{}/settings.local", &config_dir);
let default_config_path = format!("{}/settings.yml", &config_dir);
let env_config_path = format!("{}/settings.{}.yml", &config_dir, &environment);
let local_config_path = format!("{}/settings.local.yml", &config_dir);

info!("Loading configuration from: {}", default_config_path);
info!(
"Loading environment-specific configuration from: {}",
env_config_path
);
info!("Loading local overrides from: {}", local_config_path);

ConfigTree::builder()
.add_source(File::with_name(&default_config_path))
Expand All @@ -44,28 +68,20 @@ impl Config {
.add_source(Environment::with_prefix(ENVIRONMENT_PREFIX).separator(CONFIG_SEPARATOR))
.build()
.map(|c| Config { config: c })
.map_err(Into::into)
}

pub fn get<T>(&self) -> Result<T>
pub fn get<T>(&self) -> Result<T, ConfigError>
where
T: Configurable,
T: DeserializeOwned,
{
self.config.get::<T>(T::key()).context(format!(
"Error loading configuration for `{}` at `{}`",
type_name::<T>(),
T::key(),
))
self.config.get::<T>(T::key())
}

pub fn get_by_key<T>(&self, key: &str) -> Result<T>
pub fn get_by_key<T>(&self, key: &str) -> Result<T, ConfigError>
where
T: DeserializeOwned,
{
self.config.get::<T>(key).context(format!(
"Error loading configuration for `{}` at `{key}`",
type_name::<T>(),
))
self.config.get::<T>(key)
}
}
19 changes: 11 additions & 8 deletions src/follow_change_handler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config::Settings;
use crate::domain::follow_change::FollowChange;
use crate::google_publisher::GooglePublisher;
use crate::google_pubsub_client::GooglePubSubClient;
Expand All @@ -10,7 +11,6 @@ use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::debug;

/// Fetches friendly ids and then sends follow change to google pubsub
pub struct FollowChangeHandler {
repo: Arc<Repo>,
Expand All @@ -24,20 +24,23 @@ impl FollowChangeHandler {
repo: Arc<Repo>,
nostr_client: Client,
cancellation_token: CancellationToken,
timeout_secs: u64,
google_project_id: &str,
google_topic: &str,
settings: &Settings,
) -> Result<Self> {
let google_publisher_client =
GooglePubSubClient::new(google_project_id, google_topic).await?;
let google_publisher =
GooglePublisher::create(cancellation_token.clone(), google_publisher_client).await?;
GooglePubSubClient::new(&settings.google_project_id, &settings.google_topic).await?;
let google_publisher = GooglePublisher::create(
cancellation_token.clone(),
google_publisher_client,
settings.seconds_threshold,
settings.size_threshold,
)
.await?;

Ok(Self {
repo,
nostr_client,
google_publisher,
timeout_secs,
timeout_secs: settings.worker_timeout_secs,
})
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/follows_differ.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ where

if let Some(log_line) = log_line(
follower,
event.created_at,
followed_counter,
unfollowed_counter,
unchanged,
Expand All @@ -196,7 +195,6 @@ where

fn log_line(
follower: PublicKey,
event_created_at: Timestamp,
followed_counter: usize,
unfollowed_counter: usize,
unchanged: usize,
Expand All @@ -209,14 +207,15 @@ fn log_line(
return None;
}

let human_event_created_at = event.created_at.to_human_datetime();
let timestamp_diff = if let Some(latest_stored_updated_at) = maybe_latest_stored_updated_at {
format!(
"[{}->{}]",
latest_stored_updated_at.to_human_datetime(),
event_created_at.to_human_datetime()
human_event_created_at
)
} else {
format!("[new->{}]", event_created_at.to_human_datetime())
format!("[new->{}]", human_event_created_at)
};

if first_seen && followed_counter > 0 {
Expand Down
Loading

0 comments on commit 916a983

Please sign in to comment.