Skip to content

Commit

Permalink
Add vanish_requests stream subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Oct 16, 2024
1 parent 470bddf commit e81b8db
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 5 deletions.
69 changes: 69 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ neo4rs = "0.8.0"
nonzero_ext = "0.3.0"
nostr-sdk = "0.33.0"
ordermap = "0.5.3"
redis = { version = "0.27.4", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] }
rustls = { version = "0.23.12", features = ["ring"] }
serde = { version = "1.0.209", features = ["derive"] }
serde_json = "1.0.128"
Expand Down
4 changes: 0 additions & 4 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ services:
context: .
target: final
environment:
- APP__followers__relay=ws://relay:7777
- APP__followers__neo4j_uri=db:7687
- APP__followers__neo4j_user=neo4j
- APP__followers__neo4j_password=mydevpassword
- APP__ENVIRONMENT=development
- GOOGLE_APPLICATION_CREDENTIALS=/app/gcloud/application_default_credentials.json
- RUST_LOG=nos_followers=debug
Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Settings {
pub pagerank_cron_expression: String,
pub http_cache_seconds: u32,
pub burst: NonZeroU16,
pub redis_url: String,
}

impl Configurable for Settings {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub mod relay_subscriber;
pub mod repo;
pub mod scheduler;
pub mod tcp_importer;
pub mod vanish_subscriber;
pub mod worker_pool;
13 changes: 13 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use nos_followers::{
repo::{Repo, RepoTrait},
scheduler::start_scheduler,
tcp_importer::start_tcp_importer,
vanish_subscriber::{start_vanish_subscriber, RedisClient},
worker_pool::WorkerPool,
};
use nostr_sdk::prelude::*;
Expand Down Expand Up @@ -153,6 +154,18 @@ async fn start_server(settings: Settings) -> Result<()> {
.await
.context("Failed starting the scheduler")?;

// TODO: Now that we have redis we would use it to restore pending
// notifications between restarts and integrate it with cached crate
let redis_client = RedisClient::new(&settings.redis_url);

info!("Starting vanish subscriber");
start_vanish_subscriber(
task_tracker.clone(),
redis_client,
repo.clone(),
cancellation_token.clone(),
);

tokio::spawn(async move {
if let Err(e) = cancel_on_stop_signals(cancellation_token).await {
error!("Failed to listen stop signals: {}", e);
Expand Down
49 changes: 48 additions & 1 deletion src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl Repo {
}

// Default trait raises not implemented just to ease testing
pub trait RepoTrait: Sync + Send {
pub trait RepoTrait: Sync + Send + 'static {
/// Set the last contact list date seen for a user if it's newer than the stored value
fn update_last_contact_list_at(
&self,
Expand Down Expand Up @@ -107,6 +107,13 @@ pub trait RepoTrait: Sync + Send {
{
async { panic!("Not implemented") }
}

fn remove_pubkey(
&self,
_public_key: &PublicKey,
) -> impl std::future::Future<Output = Result<(), RepoError>> + std::marker::Send {
async { panic!("Not implemented") }
}
}

impl RepoTrait for Repo {
Expand Down Expand Up @@ -571,6 +578,43 @@ impl RepoTrait for Repo {
Err(e) => Err(RepoError::General(e)),
}
}

async fn remove_pubkey(&self, public_key: &PublicKey) -> Result<(), RepoError> {
let statement = r#"
MATCH (user:User {pubkey: $pubkey_val})
// Decrement follower_count of followees
OPTIONAL MATCH (user)-[:FOLLOWS]->(followee:User)
FOREACH (f IN CASE WHEN followee IS NOT NULL THEN [followee] ELSE [] END |
SET f.follower_count = CASE
WHEN f.follower_count > 0 THEN f.follower_count - 1
ELSE 0
END
)
// Decrement followee_count of followers
WITH user
OPTIONAL MATCH (follower:User)-[:FOLLOWS]->(user)
FOREACH (f IN CASE WHEN follower IS NOT NULL THEN [follower] ELSE [] END |
SET f.followee_count = CASE
WHEN f.followee_count > 0 THEN f.followee_count - 1
ELSE 0
END
)
WITH user
DETACH DELETE user
"#;

let query = query(statement).param("pubkey_val", public_key.to_hex());

self.graph
.run(query)
.await
.map_err(RepoError::RemovePubkey)?;

Ok(())
}
}

/// A function to read as DateTime<Utc> a value stored either as LocalDatetime or DateTime<Utc>
Expand Down Expand Up @@ -640,6 +684,9 @@ pub enum RepoError {

#[error("Failed to get pagerank: {0}")]
GetPageRank(neo4rs::Error),

#[error("Failed to remove pubkey: {0}")]
RemovePubkey(neo4rs::Error),
}

impl RepoError {
Expand Down
Loading

0 comments on commit e81b8db

Please sign in to comment.