Update filesystem to process file extension changes (#325)
Co-authored-by: Joel Bredeson <[email protected]>
travolin and Joel Bredeson authored Feb 14, 2023
1 parent 3f15b87 commit 7142a6e
Showing 3 changed files with 137 additions and 8 deletions.
44 changes: 44 additions & 0 deletions crates/entities/src/models/crawl_queue.rs
@@ -960,6 +960,50 @@ pub async fn find_by_lens(
.await
}

#[derive(Debug, FromQueryResult)]
pub struct CrawlTaskIdsUrls {
    pub id: i64,
    pub url: String,
}

/// Finds the crawl-queue entries tagged with a file extension that is no
/// longer configured, deletes them from the crawl queue, and returns the
/// ids and urls of the removed tasks.
pub async fn process_urls_for_removed_exts(
    exts: Vec<String>,
    db: &DatabaseConnection,
) -> Result<Vec<CrawlTaskIdsUrls>, DbErr> {
    // Interpolate the still-configured extensions into the query; any
    // `fileext` tag whose value is not in this list marks a stale task.
    let statement = Statement::from_string(
        DatabaseBackend::Sqlite,
        format!(
            r#"
            with tags_list as (
                SELECT id FROM tags WHERE label = 'fileext' and value not in ({})
            )
            SELECT cq.id, cq.url
            FROM crawl_queue as cq join crawl_tag as ct on cq.id = ct.crawl_queue_id
            WHERE cq.url like 'file%' and ct.tag_id in tags_list order by cq.url
            "#,
            exts.iter()
                .map(|str| format!("'{str}'"))
                .collect::<Vec<String>>()
                .join(",")
        ),
    );
    let tasks = CrawlTaskIdsUrls::find_by_statement(statement)
        .all(db)
        .await?;

    if !tasks.is_empty() {
        let delete_rslt =
            delete_many_by_id(db, &tasks.iter().map(|task| task.id).collect::<Vec<i64>>()).await;

        if let Err(error) = delete_rslt {
            log::error!("Error removing from crawl queue {:?}", error);
        }
    }
    Ok(tasks)
}
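
For context, a minimal sketch of how this helper might be invoked (hypothetical caller, not part of this commit; assumes an async context with a `DatabaseConnection` named `db` in scope):

    // Prune queued crawl tasks whose file extension is no longer configured.
    let configured: Vec<String> = vec!["md".into(), "txt".into()];
    match process_urls_for_removed_exts(configured, &db).await {
        Ok(removed) => {
            // Each entry carries the id and url of a crawl task that was deleted.
            for task in &removed {
                log::debug!("removed from queue: {} (id {})", task.url, task.id);
            }
        }
        Err(err) => log::error!("queue cleanup failed: {:?}", err),
    }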

#[cfg(test)]
mod test {
use sea_orm::prelude::*;
34 changes: 33 additions & 1 deletion crates/entities/src/models/processed_files.rs
@@ -1,5 +1,5 @@
use sea_orm::entity::prelude::*;
-use sea_orm::Set;
+use sea_orm::{DatabaseBackend, FromQueryResult, Set, Statement};
use serde::Serialize;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Eq)]
@@ -72,3 +72,35 @@ pub async fn remove_unmatched_paths(
Err(error) => Err(anyhow::Error::from(error)),
}
}

#[derive(Debug, FromQueryResult)]
struct FileUrls {
    pub url: String,
}

/// Returns the paths of processed files with the given extension that no
/// longer have a matching crawl-queue entry and therefore need a recrawl.
pub async fn get_files_to_recrawl(
    ext: &str,
    db: &DatabaseConnection,
) -> Result<Vec<String>, DbErr> {
    let ext_filter = format!("%.{ext}");
    let urls = FileUrls::find_by_statement(Statement::from_sql_and_values(
        DatabaseBackend::Sqlite,
        r#"
        with possible as (
            select url
            from crawl_queue
            where url like $1
        )
        select file_path as url
        from processed_files
        where file_path like $1 and file_path not in possible;"#,
        vec![ext_filter.into()],
    ))
    .all(db)
    .await;

    match urls {
        Ok(urls) => Ok(urls.iter().map(|file| file.url.clone()).collect()),
        Err(err) => Err(err),
    }
}
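
A minimal usage sketch (hypothetical, not part of this commit; assumes an async fn returning `Result<(), DbErr>` with a `DatabaseConnection` named `db` in scope). Each returned string is a `file_path` from `processed_files` that has no matching `crawl_queue` row and should be queued again:

    // Find previously processed "md" files that are missing from the crawl queue.
    let to_recrawl = get_files_to_recrawl("md", &db).await?;
    for uri in &to_recrawl {
        log::debug!("needs recrawl: {}", uri);
    }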
67 changes: 60 additions & 7 deletions crates/spyglass/src/filesystem/mod.rs
@@ -289,7 +289,7 @@ impl SpyglassFileWatcher

if !removals.is_empty() {
if let Err(error) = processed_files::Entity::delete_many()
-.filter(processed_files::Column::Id.is_in(removals))
+.filter(processed_files::Column::FilePath.is_in(removals))
.exec(&self.db)
.await
{
@@ -605,6 +605,8 @@ pub async fn configure_watcher(state: AppState) {
.map(|path| utils::path_to_uri(path))
.collect::<Vec<String>>();

_handle_extension_reprocessing(&state, &extension).await;

let mut watcher = state.file_watcher.lock().await;
if let Some(watcher) = watcher.as_mut() {
for path in paths {
@@ -638,12 +640,6 @@ pub async fn configure_watcher(state: AppState) {
error
),
}

-// TODO remove the content from extensions that are no longer being processed, this should be the
-// purview of the document handling and not the file handling since we cannot make the assumption
-// here of what happens to files that do not meet the expected extension.
-
-// At the moment triggering a recrawl will work the best
} else {
log::info!("❌ Local file watcher is disabled");

@@ -668,6 +664,63 @@ pub async fn configure_watcher(state: AppState) {
}
}

// Helper that applies any updates required when the set of configured file
// extensions changes: queued crawls for removed extensions are deleted and
// their documents reprocessed, then files matching the configured extensions
// that are missing from the crawl queue are scheduled for a recrawl.
async fn _handle_extension_reprocessing(state: &AppState, extension: &HashSet<String>) {
    // Drop queued crawls whose extension is no longer configured and
    // reprocess the affected documents.
    match crawl_queue::process_urls_for_removed_exts(extension.iter().cloned().collect(), &state.db)
        .await
    {
        Ok(urls) => {
            let reprocessed_docs = urls
                .iter()
                .map(|url| _uri_to_debounce(&url.url))
                .collect::<Vec<DebouncedEvent>>();
            if let Err(err) = _process_file_and_dir(state, reprocessed_docs, extension).await {
                log::error!(
                    "Error processing document updates for removed extensions {:?}",
                    err
                );
            }
        }
        Err(error) => {
            log::error!("Error running recrawl {:?}", error);
        }
    }

    // Collect previously processed files that match a configured extension
    // but have no crawl-queue entry, and recrawl them.
    let mut updates: Vec<DebouncedEvent> = Vec::new();
    for ext in extension {
        match processed_files::get_files_to_recrawl(ext, &state.db).await {
            Ok(recrawls) => {
                if !recrawls.is_empty() {
                    updates.extend(recrawls.iter().map(|uri| DebouncedEvent {
                        kind: DebouncedEventKind::Any,
                        path: utils::uri_to_path(uri).unwrap_or_default(),
                    }));
                }
            }
            Err(err) => {
                log::error!("Error collecting recrawls {:?}", err);
            }
        }
    }

    if !updates.is_empty() {
        if let Err(err) = _process_file_and_dir(state, updates, extension).await {
            log::error!(
                "Error processing updates for newly added extensions {:?}",
                err
            );
        }
    }
}

fn _uri_to_debounce(uri: &str) -> DebouncedEvent {
    DebouncedEvent {
        kind: DebouncedEventKind::Any,
        path: utils::uri_to_path(uri).unwrap_or_default(),
    }
}
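
To illustrate the conversion these helpers rely on, a sketch under the assumption that `utils::path_to_uri` and `utils::uri_to_path` round-trip between plain paths and `file://` URIs:

    use std::path::{Path, PathBuf};

    let uri = utils::path_to_uri(Path::new("/home/user/notes.md"));
    // e.g. "file:///home/user/notes.md" (assumed URI form)
    let event = _uri_to_debounce(&uri);
    assert_eq!(event.path, PathBuf::from("/home/user/notes.md"));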

pub fn is_watcher_enabled() -> bool {
let config = Config::load_user_settings().unwrap_or_default();
config.filesystem_settings.enable_filesystem_scanning
