From 7142a6ecec3ba3fc00f5bafca0eb539b2aa48269 Mon Sep 17 00:00:00 2001
From: travolin
Date: Mon, 13 Feb 2023 17:03:21 -0800
Subject: [PATCH] Update filesystem to process file extension changes (#325)

Co-authored-by: Joel Bredeson
---
 crates/entities/src/models/crawl_queue.rs     | 44 ++++++++++++
 crates/entities/src/models/processed_files.rs | 34 +++++++++-
 crates/spyglass/src/filesystem/mod.rs         | 67 +++++++++++++++++--
 3 files changed, 137 insertions(+), 8 deletions(-)

diff --git a/crates/entities/src/models/crawl_queue.rs b/crates/entities/src/models/crawl_queue.rs
index 89c865178..5ca434969 100644
--- a/crates/entities/src/models/crawl_queue.rs
+++ b/crates/entities/src/models/crawl_queue.rs
@@ -960,6 +960,50 @@ pub async fn find_by_lens(
     .await
 }
 
+#[derive(Debug, FromQueryResult)]
+pub struct CrawlTaskIdsUrls {
+    pub id: i64,
+    pub url: String,
+}
+
+/// Helper method to access the urls for any extensions that have been removed
+/// and delete those from the crawl queue.
+pub async fn process_urls_for_removed_exts(
+    exts: Vec<String>,
+    db: &DatabaseConnection,
+) -> Result<Vec<CrawlTaskIdsUrls>, DbErr> {
+    let statement = Statement::from_string(
+        DatabaseBackend::Sqlite,
+        format!(
+            r#"
+            with tags_list as (
+                SELECT id FROM tags WHERE label = 'fileext' and value not in ({})
+            )
+            SELECT cq.id, cq.url
+            FROM crawl_queue as cq join crawl_tag as ct on cq.id = ct.crawl_queue_id
+            WHERE cq.url like 'file%' and ct.tag_id in tags_list order by cq.url
+            "#,
+            exts.iter()
+                .map(|str| format!("'{str}'"))
+                .collect::<Vec<String>>()
+                .join(",")
+        ),
+    );
+    let tasks = CrawlTaskIdsUrls::find_by_statement(statement)
+        .all(db)
+        .await?;
+
+    if !tasks.is_empty() {
+        let delete_rslt =
+            delete_many_by_id(db, &tasks.iter().map(|task| task.id).collect::<Vec<i64>>()).await;
+
+        if let Err(error) = delete_rslt {
+            log::error!("Error removing from crawl queue {:?}", error);
+        }
+    }
+    Ok(tasks)
+}
+
 #[cfg(test)]
 mod test {
     use sea_orm::prelude::*;
diff --git a/crates/entities/src/models/processed_files.rs b/crates/entities/src/models/processed_files.rs
index 3846d179a..6f68b2106 100644
--- a/crates/entities/src/models/processed_files.rs
+++ b/crates/entities/src/models/processed_files.rs
@@ -1,5 +1,5 @@
 use sea_orm::entity::prelude::*;
-use sea_orm::Set;
+use sea_orm::{DatabaseBackend, FromQueryResult, Set, Statement};
 use serde::Serialize;
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Eq)]
@@ -72,3 +72,35 @@ pub async fn remove_unmatched_paths(
         Err(error) => Err(anyhow::Error::from(error)),
     }
 }
+
+#[derive(Debug, FromQueryResult)]
+struct FileUrls {
+    pub url: String,
+}
+
+pub async fn get_files_to_recrawl(
+    ext: &str,
+    db: &DatabaseConnection,
+) -> Result<Vec<String>, DbErr> {
+    let ext_filter = format!("%.{ext}");
+    let urls = FileUrls::find_by_statement(Statement::from_sql_and_values(
+        DatabaseBackend::Sqlite,
+        r#"
+        with possible as (
+            select url
+            from crawl_queue
+            where url like $1
+        )
+        select file_path as url
+        from processed_files
+        where file_path like $1 and file_path not in possible;"#,
+        vec![ext_filter.into()],
+    ))
+    .all(db)
+    .await;
+
+    match urls {
+        Ok(urls) => Ok(urls.iter().map(|file| file.url.clone()).collect()),
+        Err(err) => Err(err),
+    }
+}
diff --git a/crates/spyglass/src/filesystem/mod.rs b/crates/spyglass/src/filesystem/mod.rs
index c58c79dd8..1f56bf1f7 100644
--- a/crates/spyglass/src/filesystem/mod.rs
+++ b/crates/spyglass/src/filesystem/mod.rs
@@ -289,7 +289,7 @@ impl SpyglassFileWatcher {
 
         if !removals.is_empty() {
             if let Err(error) = processed_files::Entity::delete_many()
-                .filter(processed_files::Column::Id.is_in(removals))
+                .filter(processed_files::Column::FilePath.is_in(removals))
                 .exec(&self.db)
                 .await
             {
@@ -605,6 +605,8 @@ pub async fn configure_watcher(state: AppState) {
             .map(|path| utils::path_to_uri(path))
             .collect::<Vec<String>>();
 
+        _handle_extension_reprocessing(&state, &extension).await;
+
         let mut watcher = state.file_watcher.lock().await;
         if let Some(watcher) = watcher.as_mut() {
             for path in paths {
@@ -638,12 +640,6 @@
                     error
                 ),
             }
-
-            // TODO remove the content from extensions that are no longer being processed, this should be the
-            // purview of the document handling and not the file handling since we cannot make the assumption
-            // here of what happens to files that do not meet the expected extension.
-
-            // At the moment triggering a recrawl will work the best
         } else {
            log::info!("❌ Local file watcher is disabled");
 
@@ -668,6 +664,63 @@
     }
 }
 
+// Helper method used to process any updates required for changes in the configured
+// extensions
+async fn _handle_extension_reprocessing(state: &AppState, extension: &HashSet<String>) {
+    match crawl_queue::process_urls_for_removed_exts(extension.iter().cloned().collect(), &state.db)
+        .await
+    {
+        Ok(urls) => {
+            let reprocessed_docs = urls
+                .iter()
+                .map(|url| _uri_to_debounce(&url.url))
+                .collect::<Vec<DebouncedEvent>>();
+            if let Err(err) = _process_file_and_dir(state, reprocessed_docs, extension).await {
+                log::error!(
+                    "Error processing document updates for removed extensions {:?}",
+                    err
+                );
+            }
+        }
+        Err(error) => {
+            log::error!("Error running recrawl {:?}", error);
+        }
+    }
+
+    let mut updates: Vec<DebouncedEvent> = Vec::new();
+    for ext in extension {
+        match processed_files::get_files_to_recrawl(ext, &state.db).await {
+            Ok(recrawls) => {
+                if !recrawls.is_empty() {
+                    updates.extend(recrawls.iter().map(|uri| DebouncedEvent {
+                        kind: DebouncedEventKind::Any,
+                        path: utils::uri_to_path(uri).unwrap_or_default(),
+                    }));
+                }
+            }
+            Err(err) => {
+                log::error!("Error collecting recrawls {:?}", err);
+            }
+        }
+    }
+
+    if !updates.is_empty() {
+        if let Err(err) = _process_file_and_dir(state, updates, extension).await {
+            log::error!(
+                "Error processing updates for newly added extensions {:?}",
+                err
+            );
+        }
+    }
+}
+
+fn _uri_to_debounce(uri: &str) -> DebouncedEvent {
+    DebouncedEvent {
+        kind: DebouncedEventKind::Any,
+        path: utils::uri_to_path(uri).unwrap_or_default(),
+    }
+}
+
 pub fn is_watcher_enabled() -> bool {
     let config = Config::load_user_settings().unwrap_or_default();
     config.filesystem_settings.enable_filesystem_scanning
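
Usage sketch (illustrative, not part of the patch): the core of
`process_urls_for_removed_exts` is how it folds the still-enabled extension
list into the tag query's `not in (...)` clause. The snippet below is a
self-contained, std-only approximation of that query construction; the
function name `build_removed_ext_query` and the `main` driver are invented
for the example, while the SQL text and the quote-and-join step mirror the
statement in the patch.

// Build the statement that finds queued file:// URLs whose `fileext` tag is
// no longer in the enabled set.
fn build_removed_ext_query(exts: &[String]) -> String {
    // Quote each enabled extension and join into a SQL IN list,
    // e.g. ["md", "txt"] -> "'md','txt'".
    let in_list = exts
        .iter()
        .map(|ext| format!("'{ext}'"))
        .collect::<Vec<String>>()
        .join(",");
    format!(
        r#"
        with tags_list as (
            SELECT id FROM tags WHERE label = 'fileext' and value not in ({in_list})
        )
        SELECT cq.id, cq.url
        FROM crawl_queue as cq join crawl_tag as ct on cq.id = ct.crawl_queue_id
        WHERE cq.url like 'file%' and ct.tag_id in tags_list order by cq.url
        "#
    )
}

fn main() {
    // With only "md" and "txt" enabled, the statement selects every queued
    // file:// URL tagged with any other extension, ready for deletion.
    let exts = vec!["md".to_string(), "txt".to_string()];
    println!("{}", build_removed_ext_query(&exts));
}

Note the design contrast within the patch itself: this statement interpolates
the extension values directly (they come from the user's own settings), while
`get_files_to_recrawl` binds its single pattern as `$1` via
`Statement::from_sql_and_values`.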