diff --git a/Cargo.lock b/Cargo.lock index 2cdaabe..31df7e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -872,6 +872,19 @@ dependencies = [ "syn 2.0.76", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -1040,6 +1053,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "forwarded-header-value" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835f84f38484cc86f110a805655697908257fb9a7af005234060891557198e9" +dependencies = [ + "nonempty", + "thiserror", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -1117,6 +1140,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -1210,6 +1239,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand", + "smallvec", + "spinning_top", +] + [[package]] name = "h2" version = "0.4.6" @@ -1844,6 +1893,12 @@ dependencies = [ "syn 2.0.76", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -1854,6 +1909,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonempty" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" + [[package]] name = "nonzero_ext" version = "0.3.0" @@ -1874,6 +1935,7 @@ dependencies = [ "env_logger", "futures", "gcloud-sdk", + "hyper", "log", "metrics", "metrics-exporter-prometheus", @@ -1892,7 +1954,9 @@ dependencies = [ "tokio-cron-scheduler", "tokio-retry", "tokio-util", + "tower 0.5.1", "tower-http", + "tower_governor", "tracing", "tracing-subscriber", ] @@ -3092,6 +3156,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "strsim" version = "0.11.1" @@ -3491,6 +3564,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower_governor" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "313fa625fea5790ed56360a30ea980e41229cf482b4835801a67ef1922bf63b9" +dependencies = [ + "axum", + "forwarded-header-value", + "governor", + "http", + "pin-project 1.1.5", + "thiserror", + "tower 0.4.13", + "tracing", +] + [[package]] name = "tracing" version = "0.1.40" diff --git a/Cargo.toml b/Cargo.toml index 2217a03..4072747 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ config_rs = { version = "0.14", package = "config", features = ["yaml"] } env_logger = "0.11.5" futures = "0.3.30" gcloud-sdk = { version = "0.25.6", features = ["google-pubsub-v1"] } +hyper = "1.4.1" log = "0.4.22" metrics = "0.23.0" metrics-exporter-prometheus = "0.15.3" @@ -35,7 +36,9 @@ tokio = { version = "1.40.0", features = ["full", "test-util"] } tokio-cron-scheduler = "0.11.0" tokio-retry = "0.3.0" tokio-util = { version = "0.7.11", features = ["rt"] } -tower-http = { version = "0.5.2", features = ["timeout", "trace"] } +tower = "0.5.1" +tower-http = { version = "0.5.2", features = ["cors", "timeout", "trace"] } +tower_governor = "0.4.2" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter", "tracing-log"] } diff --git a/config/settings.yml b/config/settings.yml index cd15df7..25dc172 100644 --- a/config/settings.yml +++ b/config/settings.yml @@ -12,9 +12,7 @@ followers: worker_timeout_secs: 15 google_project_id: "pub-verse-app" google_topic: "follow-changes" - # how often to flush the buffer to generate messages - flush_period_seconds: 60 - # 30 minutes, so no more than 2 messages per hour - min_seconds_between_messages: 1800 - # Daily at midnight - pagerank_cron_expression: "0 0 0 * * *" \ No newline at end of file + flush_period_seconds: 60 # how often to flush the buffer to generate messages + min_seconds_between_messages: 1800 # 30 minutes, so no more than 2 messages per hour + pagerank_cron_expression: "0 0 0 * * *" # Daily at midnight + http_cache_seconds: 86400 # 24 hours \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index fd1d5ce..15212d3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -23,6 +23,7 @@ pub struct Settings { pub tcp_importer_port: u16, pub http_port: u16, pub pagerank_cron_expression: String, + pub http_cache_seconds: u32, } impl Configurable for Settings { diff --git a/src/http_server.rs b/src/http_server.rs index 2aebc14..ef06b49 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -1,5 +1,10 @@ +mod handlers; mod router; -use crate::repo::{Recommendation, Repo, RepoTrait}; + +use crate::{ + config::Settings, + repo::{Recommendation, Repo, RepoTrait}, +}; use anyhow::{Context, Result}; use axum::Router; use moka::future::Cache; @@ -47,14 +52,14 @@ pub struct HttpServer; impl HttpServer { pub fn start( task_tracker: TaskTracker, - http_port: u16, + settings: &Settings, repo: Arc, cancellation_token: CancellationToken, ) -> Result<()> { let state = Arc::new(AppState::new(repo)); - let router = create_router(state)?; + let router = create_router(state, settings)?; - start_http_server(task_tracker, http_port, router, cancellation_token); + start_http_server(task_tracker, settings.http_port, router, cancellation_token); Ok(()) } @@ -76,10 +81,13 @@ fn start_http_server( let token_clone = cancellation_token.clone(); let server_future = tokio::spawn(async { - axum::serve(listener, router) - .with_graceful_shutdown(shutdown_hook(token_clone)) - .await - .context("Failed to start HTTP server") + axum::serve( + listener, + router.into_make_service_with_connect_info::(), + ) + .with_graceful_shutdown(shutdown_hook(token_clone)) + .await + .context("Failed to start HTTP server") }); await_shutdown(cancellation_token, server_future).await; diff --git a/src/http_server/handlers.rs b/src/http_server/handlers.rs new file mode 100644 index 0000000..ba82a98 --- /dev/null +++ b/src/http_server/handlers.rs @@ -0,0 +1,96 @@ +use super::router::ApiError; +use super::AppState; +use crate::repo::{Recommendation, RepoTrait}; +use axum::{extract::State, http::HeaderMap, response::Html, response::IntoResponse, Json}; +use nostr_sdk::PublicKey; +use std::sync::Arc; +use tracing::info; + +pub async fn cached_get_recommendations( + State(state): State>>, + axum::extract::Path(pubkey): axum::extract::Path, +) -> Result>, ApiError> +where + T: RepoTrait, +{ + let public_key = PublicKey::from_hex(&pubkey).map_err(ApiError::InvalidPublicKey)?; + if let Some(cached_recommendation_result) = state.recommendation_cache.get(&pubkey).await { + return Ok(Json(cached_recommendation_result)); + } + + let recommendations = get_recommendations(&state.repo, public_key).await?; + + state + .recommendation_cache + .insert(pubkey, recommendations.clone()) + .await; + + Ok(Json(recommendations)) +} + +async fn get_recommendations( + repo: &Arc, + public_key: PublicKey, +) -> Result, ApiError> +where + T: RepoTrait, +{ + repo.get_recommendations(&public_key) + .await + .map_err(ApiError::from) +} + +pub async fn cached_maybe_spammer( + State(state): State>>, // Extract shared state with generic RepoTrait + axum::extract::Path(pubkey): axum::extract::Path, // Extract pubkey from the path +) -> Result, ApiError> +where + T: RepoTrait, +{ + println!("cached_maybe_spammer"); + let public_key = PublicKey::from_hex(&pubkey).map_err(ApiError::InvalidPublicKey)?; + println!("cached_maybe_spammer2"); + + if let Some(cached_spammer_result) = state.spammer_cache.get(&pubkey).await { + return Ok(Json(cached_spammer_result)); + } + + let is_spammer = maybe_spammer(&state.repo, public_key).await?; + + state.spammer_cache.insert(pubkey, is_spammer).await; + + Ok(Json(is_spammer)) +} + +async fn maybe_spammer(repo: &Arc, public_key: PublicKey) -> Result +where + T: RepoTrait, +{ + info!("Checking if {} is a spammer", public_key.to_hex()); + let pagerank = repo + .get_pagerank(&public_key) + .await + .map_err(|_| ApiError::NotFound)?; + + info!("Pagerank for {}: {}", public_key.to_hex(), pagerank); + + Ok(pagerank < 0.2) + // TODO don't return if it's too low, instead use other manual + // checks, nos user, nip05, check network, more hints, and only then + // give up +} + +pub async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse { + let body = r#" + + + Nos Followers Server + + +

Healthy

+ + + "#; + + Html(body) +} diff --git a/src/http_server/router.rs b/src/http_server/router.rs index 11f9240..d697fed 100644 --- a/src/http_server/router.rs +++ b/src/http_server/router.rs @@ -1,134 +1,91 @@ use super::AppState; -use crate::metrics::setup_metrics; -use crate::repo::{Recommendation, RepoError, RepoTrait}; -use anyhow::Result; -use axum::{ - extract::State, http::HeaderMap, http::StatusCode, response::Html, response::IntoResponse, - routing::get, Json, Router, +use crate::config::Settings; +use crate::http_server::handlers::{ + cached_get_recommendations, cached_maybe_spammer, serve_root_page, }; -use nostr_sdk::PublicKey; -use std::{sync::Arc, time::Duration}; +use crate::metrics::setup_metrics; +use crate::repo::{RepoError, RepoTrait}; +use axum::{body::Body, http::StatusCode, response::IntoResponse, routing::get, Router}; +use hyper::http::{header, HeaderValue, Method, Request}; +use std::{sync::Arc, time::Duration}; // Use `hyper::http` instead of `http` use thiserror::Error; +use tower_governor::{ + governor::GovernorConfigBuilder, key_extractor::SmartIpKeyExtractor, GovernorLayer, +}; use tower_http::{ + cors::{Any, CorsLayer}, timeout::TimeoutLayer, - trace::{DefaultMakeSpan, DefaultOnFailure, DefaultOnResponse, TraceLayer}, - LatencyUnit, + trace::{DefaultMakeSpan, TraceLayer}, }; -use tracing::{error, info, Level}; +use tracing::{error, Level}; -pub fn create_router(state: Arc>) -> Result +pub fn create_router(state: Arc>, settings: &Settings) -> Result where - T: RepoTrait + 'static, // 'static is needed because the router needs to be static + T: RepoTrait + 'static, { - let tracing_layer = TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) - .on_response( - DefaultOnResponse::new() - .level(Level::INFO) - .latency_unit(LatencyUnit::Millis), - ) - .on_failure(DefaultOnFailure::new().level(Level::ERROR)); - - let metrics_handle = setup_metrics()?; + let tracing_layer = + TraceLayer::new_for_http().make_span_with(DefaultMakeSpan::new().level(Level::INFO)); + + let metrics_handle = setup_metrics().map_err(|e| ApiError::InternalServerError(e.into()))?; + let rate_limit_config = GovernorConfigBuilder::default() + .key_extractor(SmartIpKeyExtractor) + .per_second(25) + .burst_size(35) + .use_headers() + .finish() + .unwrap(); + + let cors = CorsLayer::new() + .allow_methods([ + Method::GET, + Method::PUT, + Method::POST, + Method::PATCH, + Method::DELETE, + Method::OPTIONS, + ]) + .allow_headers(Any) + .allow_origin(Any) + .max_age(Duration::from_secs(86400)); + + let cache_duration = settings.http_cache_seconds; Ok(Router::new() .route("/", get(serve_root_page)) .route("/metrics", get(|| async move { metrics_handle.render() })) - .route( - "/recommendations/:pubkey", - get(cached_get_recommendations::), + .nest( + "/api/v1", + Router::new() + .route( + "/recommendations/:pubkey", + get(cached_get_recommendations::), + ) + .route("/maybe_spammer/:pubkey", get(cached_maybe_spammer::)) + .layer(TimeoutLayer::new(Duration::from_secs(5))) + .layer(GovernorLayer { + config: Arc::new(rate_limit_config), + }) + .layer(cors) + .route_layer(axum::middleware::from_fn(move |req, next| { + add_cache_header(req, next, cache_duration) + })) + .with_state(state), ) - .route("/maybe_spammer/:pubkey", get(cached_maybe_spammer::)) - .layer(tracing_layer) - .layer(TimeoutLayer::new(Duration::from_secs(5))) - .with_state(state)) -} - -async fn cached_get_recommendations( - State(state): State>>, - axum::extract::Path(pubkey): axum::extract::Path, -) -> Result>, ApiError> -where - T: RepoTrait, -{ - let public_key = PublicKey::from_hex(&pubkey).map_err(ApiError::InvalidPublicKey)?; - if let Some(cached_recommendation_result) = state.recommendation_cache.get(&pubkey).await { - return Ok(Json(cached_recommendation_result)); - } - - let recommendations = get_recommendations(&state.repo, public_key).await?; - - state - .recommendation_cache - .insert(pubkey, recommendations.clone()) - .await; - - Ok(Json(recommendations)) -} - -async fn get_recommendations( - repo: &Arc, - public_key: PublicKey, -) -> Result, ApiError> -where - T: RepoTrait, -{ - repo.get_recommendations(&public_key) - .await - .map_err(ApiError::from) + .layer(tracing_layer)) } -async fn cached_maybe_spammer( - State(state): State>>, // Extract shared state with generic RepoTrait - axum::extract::Path(pubkey): axum::extract::Path, // Extract pubkey from the path -) -> Result, ApiError> -where - T: RepoTrait, -{ - let public_key = PublicKey::from_hex(&pubkey).map_err(ApiError::InvalidPublicKey)?; - - if let Some(cached_spammer_result) = state.spammer_cache.get(&pubkey).await { - return Ok(Json(cached_spammer_result)); - } - - let is_spammer = maybe_spammer(&state.repo, public_key).await?; - - state.spammer_cache.insert(pubkey, is_spammer).await; - - Ok(Json(is_spammer)) -} - -async fn maybe_spammer(repo: &Arc, public_key: PublicKey) -> Result -where - T: RepoTrait, -{ - info!("Checking if {} is a spammer", public_key.to_hex()); - let pagerank = repo - .get_pagerank(&public_key) - .await - .map_err(|_| ApiError::NotFound)?; - - info!("Pagerank for {}: {}", public_key.to_hex(), pagerank); - - Ok(pagerank < 0.2) - // TODO don't return if it's too low, instead use other manual - // checks, nos user, nip05, check network, more hints, and only then - // give up -} - -async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse { - let body = r#" - - - Nos Followers Server - - -

Healthy

- - - "#; - - Html(body) +async fn add_cache_header( + req: Request, + next: axum::middleware::Next, + cache_duration: u32, +) -> impl IntoResponse { + let mut response = next.run(req).await; + + response.headers_mut().insert( + header::CACHE_CONTROL, + HeaderValue::from_str(&format!("public, max-age={}", cache_duration)).unwrap(), + ); + response } #[derive(Error, Debug)] @@ -141,6 +98,8 @@ pub enum ApiError { RepoError(#[from] RepoError), #[error(transparent)] AxumError(#[from] axum::Error), + #[error("Internal server errorre")] + InternalServerError(#[from] Box), } impl IntoResponse for ApiError { @@ -150,6 +109,9 @@ impl IntoResponse for ApiError { ApiError::NotFound => (StatusCode::NOT_FOUND, self.to_string()), ApiError::RepoError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), ApiError::AxumError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Axum error".to_string()), + ApiError::InternalServerError(_) => { + (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()) + } }; error!("Api error: {}", self); diff --git a/src/main.rs b/src/main.rs index bfda396..9f07d4e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -178,9 +178,10 @@ async fn start(settings: Settings) -> Result<()> { .await .context("Failed starting the scheduler")?; + info!("Starting HTTP server at port {}", settings.http_port); HttpServer::start( task_tracker.clone(), - settings.http_port, + &settings, repo.clone(), cancellation_token.clone(), )?; diff --git a/src/metrics.rs b/src/metrics.rs index 84e8857..d1507b3 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,5 +1,5 @@ use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram}; -use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; +use metrics_exporter_prometheus::{BuildError, PrometheusBuilder, PrometheusHandle}; pub fn pubsub_messages() -> Counter { metrics::counter!("pubsub_messages") @@ -69,7 +69,7 @@ pub fn pagerank_seconds() -> Gauge { metrics::gauge!("pagerank_seconds") } -pub fn setup_metrics() -> Result { +pub fn setup_metrics() -> Result { describe_counter!( "pubsub_messages", "Number of messages published to Google Pub/Sub"