From b13bd756e04ebb7acca9d17e12505c5da90d82e7 Mon Sep 17 00:00:00 2001 From: link2xt Date: Tue, 10 Dec 2024 02:29:34 +0000 Subject: [PATCH 1/6] feat: cache HTTP GET requests --- src/net/http.rs | 198 ++++++++++++++++++++++++++++++++++++++++-- src/sql.rs | 7 ++ src/sql/migrations.rs | 15 ++++ 3 files changed, 215 insertions(+), 5 deletions(-) diff --git a/src/net/http.rs b/src/net/http.rs index 94e85f68c1..570e6ec9dd 100644 --- a/src/net/http.rs +++ b/src/net/http.rs @@ -6,14 +6,18 @@ use http_body_util::BodyExt; use hyper_util::rt::TokioIo; use mime::Mime; use serde::Serialize; +use tokio::fs; +use crate::blob::BlobObject; use crate::context::Context; +use crate::log::LogExt; use crate::net::proxy::ProxyConfig; use crate::net::session::SessionStream; use crate::net::tls::wrap_rustls; +use crate::tools::{create_id, time}; /// HTTP(S) GET response. -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Response { /// Response body. pub blob: Vec, @@ -90,9 +94,120 @@ where Ok(sender) } +/// Converts the URL to expiration timestamp. +fn http_url_cache_expires(url: &str, mimetype: Option<&str>) -> i64 { + let now = time(); + if url.ends_with(".xdc") { + // WebXDCs expire in 5 weeks. + now + 3600 * 24 * 35 + } else if mimetype.is_some_and(|s| s.starts_with("image/")) { + // Cache images for 1 day. + now + 3600 * 24 + } else { + // Cache everything else for 1 hour. + now + 3600 + } +} + +/// Places the binary into HTTP cache. +async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> { + let blob = BlobObject::create( + context, + &format!("http_cache_{}", create_id()), + response.blob.as_slice(), + ) + .await?; + + context + .sql + .insert( + "INSERT OR IGNORE INTO http_cache (url, expires, blob, mimetype, encoding) + VALUES (?, ?, ?, ?, ?)", + ( + url, + http_url_cache_expires(url, response.mimetype.as_deref()), + blob.as_name(), + response.mimetype.as_deref().unwrap_or_default(), + response.encoding.as_deref().unwrap_or_default(), + ), + ) + .await?; + + Ok(()) +} + +/// Retrieves the binary from HTTP cache. +async fn http_cache_get(context: &Context, url: &str) -> Result> { + let Some((blob_name, mimetype, encoding)) = context + .sql + .query_row_optional( + "SELECT blob, mimetype, encoding + FROM http_cache WHERE url=? AND expires > ?", + (url, time()), + |row| { + let blob_name: String = row.get(0)?; + let mimetype: Option = Some(row.get(1)?).filter(|s: &String| !s.is_empty()); + let encoding: Option = Some(row.get(2)?).filter(|s: &String| !s.is_empty()); + Ok((blob_name, mimetype, encoding)) + }, + ) + .await? + else { + return Ok(None); + }; + + let blob_object = BlobObject::from_name(context, blob_name)?; + let blob_abs_path = blob_object.to_abs_path(); + let blob = fs::read(blob_abs_path).await?; + + let expires = http_url_cache_expires(url, mimetype.as_deref()); + let response = Response { + blob, + mimetype, + encoding, + }; + + // Update expiration timestamp. + context + .sql + .execute( + "UPDATE http_cache SET expires=? WHERE url=?", + (expires, url), + ) + .await?; + + Ok(Some(response)) +} + +/// Removes expired cache entries. +pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> { + // Remove cache entries that are already expired + // or entries that will not expire in a year + // to make sure we don't have invalid timestamps that are way forward in the future. + context + .sql + .execute( + "DELETE FROM http_cache + WHERE ?1 > expires OR expires > ?1 + 31536000", + (time(),), + ) + .await?; + Ok(()) +} + /// Retrieves the binary contents of URL using HTTP GET request. -pub async fn read_url_blob(context: &Context, url: &str) -> Result { - let mut url = url.to_string(); +pub async fn read_url_blob(context: &Context, original_url: &str) -> Result { + if let Some(response) = http_cache_get(context, original_url) + .await + .log_err(context) + .unwrap_or_default() + { + info!(context, "Returning {original_url:?} from cache."); + return Ok(response); + } + + info!(context, "Not found {original_url:?} in cache."); + let mut url = original_url.to_string(); // Follow up to 10 http-redirects for _i in 0..10 { @@ -139,11 +254,14 @@ pub async fn read_url_blob(context: &Context, url: &str) -> Result { }); let body = response.collect().await?.to_bytes(); let blob: Vec = body.to_vec(); - return Ok(Response { + let response = Response { blob, mimetype, encoding, - }); + }; + info!(context, "Inserting {original_url:?} into cache."); + http_cache_put(context, &url, &response).await?; + return Ok(response); } Err(anyhow!("Followed 10 redirections")) @@ -241,3 +359,73 @@ pub(crate) async fn post_form( let bytes = response.collect().await?.to_bytes(); Ok(bytes) } + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + use crate::test_utils::TestContext; + use crate::tools::SystemTime; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_http_cache() -> Result<()> { + let t = &TestContext::new().await; + + assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None); + + let html_response = Response { + blob: b" ...".to_vec(), + mimetype: Some("text/html".to_string()), + encoding: None, + }; + + let xdc_response = Response { + blob: b"PK...".to_vec(), + mimetype: Some("application/octet-stream".to_string()), + encoding: None, + }; + let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc"; + let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc"; + + http_cache_put(t, "https://webxdc.org/", &html_response).await?; + + assert_eq!(http_cache_get(t, xdc_editor_url).await?, None); + assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None); + assert_eq!( + http_cache_get(t, "https://webxdc.org/").await?, + Some(html_response.clone()) + ); + + http_cache_put(t, xdc_editor_url, &xdc_response).await?; + assert_eq!( + http_cache_get(t, xdc_editor_url).await?, + Some(xdc_response.clone()) + ); + + http_cache_cleanup(t).await?; + assert_eq!( + http_cache_get(t, "https://webxdc.org/").await?, + Some(html_response.clone()) + ); + + // HTML expires after 1 hour, but .xdc does not. + SystemTime::shift(Duration::from_secs(3600 + 100)); + assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None); + assert_eq!( + http_cache_get(t, xdc_editor_url).await?, + Some(xdc_response.clone()) + ); + + // 35 days later pixel .xdc expires because we did not request it for 35 days and 1 hour. + // But editor is still there because we did not request it for just 35 days. + SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100)); + assert_eq!( + http_cache_get(t, xdc_editor_url).await?, + Some(xdc_response.clone()) + ); + assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None); + + Ok(()) + } +} diff --git a/src/sql.rs b/src/sql.rs index c1c46e2fb8..28bcf90886 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -19,6 +19,7 @@ use crate::location::delete_orphaned_poi_locations; use crate::log::LogExt; use crate::message::{Message, MsgId}; use crate::net::dns::prune_dns_cache; +use crate::net::http::http_cache_cleanup; use crate::net::prune_connection_history; use crate::param::{Param, Params}; use crate::peerstate::Peerstate; @@ -788,6 +789,12 @@ pub async fn housekeeping(context: &Context) -> Result<()> { .log_err(context) .ok(); + http_cache_cleanup(context) + .await + .context("Failed to cleanup HTTP cache") + .log_err(context) + .ok(); + info!(context, "Housekeeping done."); Ok(()) } diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 15d89c6f1a..8be0504ea1 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -1088,6 +1088,21 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); .await?; } + inc_and_check(&mut migration_version, 125)?; + if dbversion < migration_version { + sql.execute_migration( + "CREATE TABLE http_cache ( + url TEXT PRIMARY KEY, + expires INTEGER NOT NULL, -- When the cache entry is considered expired, timestamp in seconds. + blob TEXT NOT NULL, + mimetype TEXT NOT NULL DEFAULT '', -- MIME type extracted from Content-Type header. + encoding TEXT NOT NULL DEFAULT '' -- Encoding from Content-Type header. + ) STRICT", + migration_version, + ) + .await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await? From 2b723c921c0082ed396829d8c17bd82f234f4ed9 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 11 Dec 2024 17:26:06 +0000 Subject: [PATCH 2/6] Rename `blob` column into `blobname` --- src/net/http.rs | 4 ++-- src/sql/migrations.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/net/http.rs b/src/net/http.rs index 570e6ec9dd..0f8576fb96 100644 --- a/src/net/http.rs +++ b/src/net/http.rs @@ -121,7 +121,7 @@ async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Re context .sql .insert( - "INSERT OR IGNORE INTO http_cache (url, expires, blob, mimetype, encoding) + "INSERT OR IGNORE INTO http_cache (url, expires, blobname, mimetype, encoding) VALUES (?, ?, ?, ?, ?)", ( url, @@ -141,7 +141,7 @@ async fn http_cache_get(context: &Context, url: &str) -> Result let Some((blob_name, mimetype, encoding)) = context .sql .query_row_optional( - "SELECT blob, mimetype, encoding + "SELECT blobname, mimetype, encoding FROM http_cache WHERE url=? AND expires > ?", (url, time()), |row| { diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 8be0504ea1..0827f924e5 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -1094,7 +1094,7 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); "CREATE TABLE http_cache ( url TEXT PRIMARY KEY, expires INTEGER NOT NULL, -- When the cache entry is considered expired, timestamp in seconds. - blob TEXT NOT NULL, + blobname TEXT NOT NULL, mimetype TEXT NOT NULL DEFAULT '', -- MIME type extracted from Content-Type header. encoding TEXT NOT NULL DEFAULT '' -- Encoding from Content-Type header. ) STRICT", From e24c5e1c79e01c3ae0f6afc95c876f399519c5dd Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 11 Dec 2024 17:54:01 +0000 Subject: [PATCH 3/6] Keep files used by http_cache during housekeeping --- src/sql.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/sql.rs b/src/sql.rs index 28bcf90886..6448272687 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -853,6 +853,22 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> { .await .context("housekeeping: failed to SELECT value FROM config")?; + context + .sql + .query_map( + "SELECT blobname FROM http_cache", + (), + |row| row.get::<_, String>(0), + |rows| { + for row in rows { + maybe_add_file(&mut files_in_use, &row?); + } + Ok(()) + }, + ) + .await + .context("Failed to SELECT blobname FROM http_cache")?; + info!(context, "{} files in use.", files_in_use.len()); /* go through directories and delete unused files */ let blobdir = context.get_blobdir(); From ad6f521716515623054411f84a87b0d34e836f24 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 11 Dec 2024 17:54:44 +0000 Subject: [PATCH 4/6] Cleanup http cache before removing unused blobs --- src/sql.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/sql.rs b/src/sql.rs index 6448272687..62a0b44b93 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -721,6 +721,12 @@ pub async fn housekeeping(context: &Context) -> Result<()> { warn!(context, "Can't set config: {e:#}."); } + http_cache_cleanup(context) + .await + .context("Failed to cleanup HTTP cache") + .log_err(context) + .ok(); + if let Err(err) = remove_unused_files(context).await { warn!( context, @@ -789,12 +795,6 @@ pub async fn housekeeping(context: &Context) -> Result<()> { .log_err(context) .ok(); - http_cache_cleanup(context) - .await - .context("Failed to cleanup HTTP cache") - .log_err(context) - .ok(); - info!(context, "Housekeeping done."); Ok(()) } From b66075a538b7bd3b83fd68f809d6722a9bb6fbb5 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 11 Dec 2024 17:58:21 +0000 Subject: [PATCH 5/6] Run housekeeping in HTTP cache test --- src/net/http.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/net/http.rs b/src/net/http.rs index 0f8576fb96..8618213ad0 100644 --- a/src/net/http.rs +++ b/src/net/http.rs @@ -365,6 +365,7 @@ mod tests { use super::*; use std::time::Duration; + use crate::sql::housekeeping; use crate::test_utils::TestContext; use crate::tools::SystemTime; @@ -403,7 +404,6 @@ mod tests { Some(xdc_response.clone()) ); - http_cache_cleanup(t).await?; assert_eq!( http_cache_get(t, "https://webxdc.org/").await?, Some(html_response.clone()) @@ -420,6 +420,10 @@ mod tests { // 35 days later pixel .xdc expires because we did not request it for 35 days and 1 hour. // But editor is still there because we did not request it for just 35 days. SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100)); + + // Run housekeeping to test that it does not delete the blob too early. + housekeeping(t).await?; + assert_eq!( http_cache_get(t, xdc_editor_url).await?, Some(xdc_response.clone()) From b164cc23ac7d67d9f7763efdf454cc9b7cb57e5a Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 11 Dec 2024 18:25:36 +0000 Subject: [PATCH 6/6] Test that blob file of HTTP cached entry can be safely removed --- src/net/http.rs | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/src/net/http.rs b/src/net/http.rs index 8618213ad0..8006ba2675 100644 --- a/src/net/http.rs +++ b/src/net/http.rs @@ -10,7 +10,6 @@ use tokio::fs; use crate::blob::BlobObject; use crate::context::Context; -use crate::log::LogExt; use crate::net::proxy::ProxyConfig; use crate::net::session::SessionStream; use crate::net::tls::wrap_rustls; @@ -158,7 +157,18 @@ async fn http_cache_get(context: &Context, url: &str) -> Result let blob_object = BlobObject::from_name(context, blob_name)?; let blob_abs_path = blob_object.to_abs_path(); - let blob = fs::read(blob_abs_path).await?; + let blob = match fs::read(blob_abs_path) + .await + .with_context(|| format!("Failed to read blob for {url:?} cache entry.")) + { + Ok(blob) => blob, + Err(err) => { + // This should not happen, but user may go into the blobdir and remove files, + // antivirus may delete the file or there may be a bug in housekeeping. + warn!(context, "{err:?}."); + return Ok(None); + } + }; let expires = http_url_cache_expires(url, mimetype.as_deref()); let response = Response { @@ -197,11 +207,7 @@ pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> { /// Retrieves the binary contents of URL using HTTP GET request. pub async fn read_url_blob(context: &Context, original_url: &str) -> Result { - if let Some(response) = http_cache_get(context, original_url) - .await - .log_err(context) - .unwrap_or_default() - { + if let Some(response) = http_cache_get(context, original_url).await? { info!(context, "Returning {original_url:?} from cache."); return Ok(response); } @@ -430,6 +436,21 @@ mod tests { ); assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None); + // Test that if the file is accidentally removed from the blobdir, + // there is no error when trying to load the cache entry. + for entry in std::fs::read_dir(t.get_blobdir())? { + let entry = entry.unwrap(); + let path = entry.path(); + std::fs::remove_file(path).expect("Failed to remove blob"); + } + + assert_eq!( + http_cache_get(t, xdc_editor_url) + .await + .context("Failed to get no cache response")?, + None + ); + Ok(()) } }