From b19250362e0af0a8f125eddaa20890708f4a7267 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Fri, 16 Aug 2024 12:13:00 +0200 Subject: [PATCH] Gemini LLM based coarse image redaction --- Cargo.lock | 1 + Cargo.toml | 1 + src/commands/copy_command.rs | 5 +- src/errors.rs | 2 + src/redacters/gemini_llm.rs | 193 ++++++++++++++- src/redacters/mod.rs | 317 +------------------------ src/redacters/simple_image_redacter.rs | 44 ++++ src/redacters/stream_redacter.rs | 315 ++++++++++++++++++++++++ 8 files changed, 560 insertions(+), 318 deletions(-) create mode 100644 src/redacters/simple_image_redacter.rs create mode 100644 src/redacters/stream_redacter.rs diff --git a/Cargo.lock b/Cargo.lock index caf750b..b353356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3098,6 +3098,7 @@ dependencies = [ "rsb_derive", "rvstruct", "serde", + "serde_json", "sha2", "sync_wrapper 1.0.1", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index 8b493fc..3633d84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ pdfium-render = { version = "0.8", features = ["thread_safe", "image"], optional image = "0.25" bytes = { version = "1" } arboard = { version = "3", features = ["image"], optional = true } +serde_json = "1" [dev-dependencies] diff --git a/src/commands/copy_command.rs b/src/commands/copy_command.rs index 7b9b9a5..01186b3 100644 --- a/src/commands/copy_command.rs +++ b/src/commands/copy_command.rs @@ -3,7 +3,8 @@ use crate::file_converters::FileConverters; use crate::file_systems::{DetectFileSystem, FileSystemConnection, FileSystemRef}; use crate::file_tools::{FileMatcher, FileMatcherResult, FileMimeOverride}; use crate::redacters::{ - RedactSupportedOptions, Redacter, RedacterBaseOptions, RedacterOptions, Redacters, + redact_stream, RedactSupportedOptions, Redacter, RedacterBaseOptions, RedacterOptions, + Redacters, }; use crate::reporter::AppReporter; use crate::AppResult; @@ -327,7 +328,7 @@ async fn redact_upload_file< } } if !support_redacters.is_empty() { - match crate::redacters::redact_stream( + match redact_stream( &support_redacters, redacter_base_options, source_reader, diff --git a/src/errors.rs b/src/errors.rs index 58ac5d6..ce14db1 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -44,6 +44,8 @@ pub enum AppError { ClipboardError(#[from] arboard::Error), #[error("SystemTimeError: {0}")] SystemTimeError(#[from] SystemTimeError), + #[error("JSON serialization error: {0}")] + JsonSerializeError(#[from] serde_json::Error), #[error("System error: {message}")] SystemError { message: String }, } diff --git a/src/redacters/gemini_llm.rs b/src/redacters/gemini_llm.rs index a2233ac..ca3f2c7 100644 --- a/src/redacters/gemini_llm.rs +++ b/src/redacters/gemini_llm.rs @@ -3,7 +3,8 @@ use crate::common_types::GcpProjectId; use crate::errors::AppError; use crate::file_systems::FileSystemRef; use crate::redacters::{ - RedactSupportedOptions, Redacter, RedacterDataItem, RedacterDataItemContent, Redacters, + redact_image_at_coords, PiiImageCoords, RedactSupportedOptions, Redacter, RedacterDataItem, + RedacterDataItemContent, Redacters, }; use crate::reporter::AppReporter; use crate::AppResult; @@ -161,17 +162,191 @@ impl<'a> GeminiLlmRedacter<'a> { }), } } + + pub async fn redact_image_file(&self, input: RedacterDataItem) -> AppResult { + let model_name = self + .gemini_llm_options + .gemini_model + .as_ref() + .map(|model_name| model_name.value().to_string()) + .unwrap_or_else(|| Self::DEFAULT_GEMINI_MODEL.to_string()); + + match input.content { + RedacterDataItemContent::Image { mime_type, data } => { + let image_format = + image::ImageFormat::from_mime_type(&mime_type).ok_or_else(|| { + AppError::SystemError { + message: format!("Unsupported image mime type: {}", mime_type), + } + })?; + let image = image::load_from_memory_with_format(&data, image_format)?; + let resized_image = image.resize( + image.width().min(1024), + image.height().min(1024), + image::imageops::FilterType::Gaussian, + ); + let mut resized_image_bytes = std::io::Cursor::new(Vec::new()); + resized_image.write_to(&mut resized_image_bytes, image_format)?; + let resized_image_data = resized_image_bytes.into_inner(); + + let mut request = tonic::Request::new( + gcloud_sdk::google::ai::generativelanguage::v1beta::GenerateContentRequest { + model: model_name, + safety_settings: vec![ + gcloud_sdk::google::ai::generativelanguage::v1beta::HarmCategory::HateSpeech, + gcloud_sdk::google::ai::generativelanguage::v1beta::HarmCategory::SexuallyExplicit, + gcloud_sdk::google::ai::generativelanguage::v1beta::HarmCategory::DangerousContent, + gcloud_sdk::google::ai::generativelanguage::v1beta::HarmCategory::Harassment, + ].into_iter().map(|category| gcloud_sdk::google::ai::generativelanguage::v1beta::SafetySetting { + category: category.into(), + threshold: gcloud_sdk::google::ai::generativelanguage::v1beta::safety_setting::HarmBlockThreshold::BlockNone.into(), + }).collect(), + contents: vec![ + gcloud_sdk::google::ai::generativelanguage::v1beta::Content { + parts: vec![ + gcloud_sdk::google::ai::generativelanguage::v1beta::Part { + data: Some( + gcloud_sdk::google::ai::generativelanguage::v1beta::part::Data::Text( + format!("Find anything in the attached image that look like personal information. \ + Return their coordinates with x1,y1,x2,y2 as pixel coordinates and the corresponding text. \ + The image width is: {}. The image height is: {}.", resized_image.width(), resized_image.height()), + ), + ), + }, + gcloud_sdk::google::ai::generativelanguage::v1beta::Part { + data: Some( + gcloud_sdk::google::ai::generativelanguage::v1beta::part::Data::InlineData( + gcloud_sdk::google::ai::generativelanguage::v1beta::Blob { + mime_type: mime_type.to_string(), + data: resized_image_data.clone(), + } + ), + ), + } + ], + role: "user".to_string(), + }, + ], + generation_config: Some( + gcloud_sdk::google::ai::generativelanguage::v1beta::GenerationConfig { + candidate_count: Some(1), + temperature: Some(0.2), + response_mime_type: mime::APPLICATION_JSON.to_string(), + response_schema: Some( + gcloud_sdk::google::ai::generativelanguage::v1beta::Schema { + r#type: gcloud_sdk::google::ai::generativelanguage::v1beta::Type::Array.into(), + items: Some(Box::new( + gcloud_sdk::google::ai::generativelanguage::v1beta::Schema { + r#type: gcloud_sdk::google::ai::generativelanguage::v1beta::Type::Object.into(), + properties: vec![ + ( + "x1".to_string(), + gcloud_sdk::google::ai::generativelanguage::v1beta::Schema { + r#type: gcloud_sdk::google::ai::generativelanguage::v1beta::Type::Number.into(), + ..std::default::Default::default() + }, + ), + ( + "y1".to_string(), + gcloud_sdk::google::ai::generativelanguage::v1beta::Schema { + r#type: gcloud_sdk::google::ai::generativelanguage::v1beta::Type::Number.into(), + ..std::default::Default::default() + }, + ), + ( + "x2".to_string(), + gcloud_sdk::google::ai::generativelanguage::v1beta::Schema { + r#type: gcloud_sdk::google::ai::generativelanguage::v1beta::Type::Number.into(), + ..std::default::Default::default() + }, + ), + ( + "y2".to_string(), + gcloud_sdk::google::ai::generativelanguage::v1beta::Schema { + r#type: gcloud_sdk::google::ai::generativelanguage::v1beta::Type::Number.into(), + ..std::default::Default::default() + }, + ), + ( + "text".to_string(), + gcloud_sdk::google::ai::generativelanguage::v1beta::Schema { + r#type: gcloud_sdk::google::ai::generativelanguage::v1beta::Type::String.into(), + ..std::default::Default::default() + }, + ), + ].into_iter().collect(), + required: vec!["x1".to_string(), "y1".to_string(), "x2".to_string(), "y2".to_string()], + ..std::default::Default::default() + } + )), + ..std::default::Default::default() + } + ), + ..std::default::Default::default() + }, + ), + ..std::default::Default::default() + }, + ); + request.metadata_mut().insert( + "x-goog-user-project", + gcloud_sdk::tonic::metadata::MetadataValue::::try_from( + self.gemini_llm_options.project_id.as_ref(), + )?, + ); + let response = self.client.get().generate_content(request).await?; + + let inner = response.into_inner(); + if let Some(content) = inner.candidates.first().and_then(|c| c.content.as_ref()) { + let content_json = + content + .parts + .iter() + .fold("".to_string(), |acc, entity| match &entity.data { + Some( + gcloud_sdk::google::ai::generativelanguage::v1beta::part::Data::Text( + text, + ), + ) => acc + text, + _ => acc, + }); + let pii_image_coords: Vec = + serde_json::from_str(&content_json)?; + Ok(RedacterDataItem { + file_ref: input.file_ref, + content: RedacterDataItemContent::Image { + mime_type: mime_type.clone(), + data: redact_image_at_coords( + mime_type.clone(), + resized_image_data.into(), + pii_image_coords, + 0.25, + )?, + }, + }) + } else { + Err(AppError::SystemError { + message: "No content item in the response".to_string(), + }) + } + } + _ => Err(AppError::SystemError { + message: "Unsupported item for text redacting".to_string(), + }), + } + } } impl<'a> Redacter for GeminiLlmRedacter<'a> { async fn redact(&self, input: RedacterDataItem) -> AppResult { match &input.content { RedacterDataItemContent::Value(_) => self.redact_text_file(input).await, - RedacterDataItemContent::Table { .. } - | RedacterDataItemContent::Image { .. } - | RedacterDataItemContent::Pdf { .. } => Err(AppError::SystemError { - message: "Attempt to redact of unsupported type".to_string(), - }), + RedacterDataItemContent::Image { .. } => self.redact_image_file(input).await, + RedacterDataItemContent::Table { .. } | RedacterDataItemContent::Pdf { .. } => { + Err(AppError::SystemError { + message: "Attempt to redact of unsupported type".to_string(), + }) + } } } @@ -183,9 +358,15 @@ impl<'a> Redacter for GeminiLlmRedacter<'a> { Some(media_type) if Redacters::is_mime_text(media_type) => { RedactSupportedOptions::Supported } + Some(media_type) if Redacters::is_mime_image(media_type) => { + RedactSupportedOptions::Supported + } Some(media_type) if Redacters::is_mime_table(media_type) => { RedactSupportedOptions::SupportedAsText } + Some(media_type) if Redacters::is_mime_pdf(media_type) => { + RedactSupportedOptions::SupportedAsImages + } _ => RedactSupportedOptions::Unsupported, }) } diff --git a/src/redacters/mod.rs b/src/redacters/mod.rs index 9eb8ff3..334adec 100644 --- a/src/redacters/mod.rs +++ b/src/redacters/mod.rs @@ -1,11 +1,7 @@ -use crate::errors::AppError; use crate::file_systems::FileSystemRef; use crate::reporter::AppReporter; use crate::AppResult; -use futures::{Stream, TryStreamExt}; use gcloud_sdk::prost::bytes; -use image::ImageFormat; -use indicatif::ProgressBar; use mime::Mime; use std::fmt::Display; @@ -22,11 +18,15 @@ mod gemini_llm; pub use gemini_llm::*; mod open_ai_llm; -use crate::args::RedacterType; -use crate::file_converters::pdf::{PdfInfo, PdfPageInfo, PdfToImage}; -use crate::file_converters::FileConverters; pub use open_ai_llm::*; +mod simple_image_redacter; +pub use simple_image_redacter::*; +mod stream_redacter; +pub use stream_redacter::*; + +use crate::args::RedacterType; + #[derive(Debug, Clone)] pub struct RedacterDataItem { pub content: RedacterDataItemContent, @@ -207,306 +207,3 @@ impl<'a> Redacter for Redacters<'a> { } } } - -pub struct RedactStreamResult { - pub number_of_redactions: usize, - pub stream: Box> + Send + Sync + Unpin + 'static>, -} - -pub async fn redact_stream< - S: Stream> + Send + Unpin + Sync + 'static, ->( - redacters: &Vec<&impl Redacter>, - redacter_base_options: &RedacterBaseOptions, - input: S, - file_ref: &FileSystemRef, - file_converters: &FileConverters, - bar: &ProgressBar, -) -> AppResult { - let mut redacters_supported_options = Vec::with_capacity(redacters.len()); - for redacter in redacters { - let supported_options = redacter.redact_supported_options(file_ref).await?; - redacters_supported_options.push((*redacter, supported_options)); - } - - let mut redacted = stream_to_redact_item( - redacter_base_options, - input, - file_ref, - &redacters_supported_options, - ) - .await?; - let mut number_of_redactions = 0; - - for (index, (redacter, options)) in redacters_supported_options.iter().enumerate() { - let width = " ".repeat(index); - match options { - RedactSupportedOptions::Supported => { - bar.println(format!( - "{width}↳ Redacting using {} redacter", - redacter.redacter_type() - )); - redacted = redacter.redact(redacted).await?; - number_of_redactions += 1; - } - RedactSupportedOptions::SupportedAsImages => { - match file_converters.pdf_image_converter { - Some(ref converter) => { - redacted = redact_pdf_with_images_converter( - file_ref, - bar, - redacted, - *redacter, - &width, - converter.as_ref(), - ) - .await?; - number_of_redactions += 1; - } - None => { - bar.println(format!( - "{width}↲ Skipping redaction because PDF to image converter is not available", - )); - } - } - } - RedactSupportedOptions::SupportedAsText => { - if matches!(redacted.content, RedacterDataItemContent::Value(_)) { - bar.println(format!( - "{width}↳ Redacting as text using {} redacter", - redacter.redacter_type() - )); - redacted = redacter.redact(redacted).await?; - number_of_redactions += 1; - } - } - RedactSupportedOptions::Unsupported => {} - } - } - - let output_stream = match redacted.content { - RedacterDataItemContent::Value(content) => { - let bytes = bytes::Bytes::from(content.into_bytes()); - Box::new(futures::stream::iter(vec![Ok(bytes)])) - } - RedacterDataItemContent::Image { data, .. } => { - Box::new(futures::stream::iter(vec![Ok(data)])) - } - RedacterDataItemContent::Pdf { data } => Box::new(futures::stream::iter(vec![Ok(data)])), - RedacterDataItemContent::Table { headers, rows } => { - let mut writer = csv_async::AsyncWriter::from_writer(vec![]); - writer.write_record(headers).await?; - for row in rows { - writer.write_record(row).await?; - } - writer.flush().await?; - let bytes = bytes::Bytes::from(writer.into_inner().await?); - Box::new(futures::stream::iter(vec![Ok(bytes)])) - } - }; - - Ok(RedactStreamResult { - number_of_redactions, - stream: output_stream, - }) -} - -async fn stream_to_redact_item< - S: Stream> + Send + Unpin + Sync + 'static, ->( - redacter_base_options: &RedacterBaseOptions, - input: S, - file_ref: &FileSystemRef, - redacters_supported_options: &[(&impl Redacter, RedactSupportedOptions)], -) -> AppResult { - match file_ref.media_type { - Some(ref mime) - if Redacters::is_mime_text(mime) - || (Redacters::is_mime_table(mime) - && redacters_supported_options - .iter() - .any(|(_, o)| matches!(o, RedactSupportedOptions::SupportedAsText)) - && !redacters_supported_options - .iter() - .all(|(_, o)| matches!(o, RedactSupportedOptions::Supported))) => - { - stream_to_text_redact_item(redacter_base_options, input, file_ref).await - } - Some(ref mime) if Redacters::is_mime_image(mime) => { - stream_to_image_redact_item(input, file_ref, mime.clone()).await - } - Some(ref mime) if Redacters::is_mime_table(mime) => { - stream_to_table_redact_item(redacter_base_options, input, file_ref).await - } - Some(ref mime) if Redacters::is_mime_pdf(mime) => { - stream_to_pdf_redact_item(input, file_ref).await - } - Some(ref mime) => Err(AppError::SystemError { - message: format!("Media type {} is not supported for redaction", mime), - }), - None => Err(AppError::SystemError { - message: "Media type is not provided to redact".to_string(), - }), - } -} - -async fn stream_to_text_redact_item< - S: Stream> + Send + Unpin + Sync + 'static, ->( - redacter_base_options: &RedacterBaseOptions, - input: S, - file_ref: &FileSystemRef, -) -> AppResult { - let all_chunks: Vec = input.try_collect().await?; - let all_bytes = all_chunks.concat(); - let whole_content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError { - message: format!("Failed to convert bytes to string: {}", e), - })?; - let content = if let Some(sampling_size) = redacter_base_options.sampling_size { - let sampling_size = std::cmp::min(sampling_size, whole_content.len()); - whole_content - .chars() - .take(sampling_size) - .collect::() - } else { - whole_content - }; - Ok(RedacterDataItem { - content: RedacterDataItemContent::Value(content), - file_ref: file_ref.clone(), - }) -} - -async fn stream_to_table_redact_item< - S: Stream> + Send + Unpin + Sync + 'static, ->( - redacter_base_options: &RedacterBaseOptions, - input: S, - file_ref: &FileSystemRef, -) -> AppResult { - let reader = tokio_util::io::StreamReader::new( - input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), - ); - let mut reader = csv_async::AsyncReaderBuilder::default() - .has_headers(!redacter_base_options.csv_headers_disable) - .delimiter( - redacter_base_options - .csv_delimiter - .as_ref() - .cloned() - .unwrap_or(b','), - ) - .create_reader(reader); - let headers = if !redacter_base_options.csv_headers_disable { - reader - .headers() - .await? - .into_iter() - .map(|h| h.to_string()) - .collect() - } else { - vec![] - }; - let records: Vec = reader.records().try_collect().await?; - Ok(RedacterDataItem { - content: RedacterDataItemContent::Table { - headers, - rows: records - .iter() - .map(|r| r.iter().map(|c| c.to_string()).collect()) - .collect(), - }, - file_ref: file_ref.clone(), - }) -} - -async fn stream_to_image_redact_item< - S: Stream> + Send + Unpin + Sync + 'static, ->( - input: S, - file_ref: &FileSystemRef, - mime: Mime, -) -> AppResult { - let all_chunks: Vec = input.try_collect().await?; - let all_bytes = all_chunks.concat(); - Ok(RedacterDataItem { - content: RedacterDataItemContent::Image { - mime_type: mime.clone(), - data: all_bytes.into(), - }, - file_ref: file_ref.clone(), - }) -} - -async fn stream_to_pdf_redact_item< - S: Stream> + Send + Unpin + Sync + 'static, ->( - input: S, - file_ref: &FileSystemRef, -) -> AppResult { - let all_chunks: Vec = input.try_collect().await?; - let all_bytes = all_chunks.concat(); - Ok(RedacterDataItem { - content: RedacterDataItemContent::Pdf { - data: all_bytes.into(), - }, - file_ref: file_ref.clone(), - }) -} - -async fn redact_pdf_with_images_converter( - file_ref: &FileSystemRef, - bar: &ProgressBar, - redacted: RedacterDataItem, - redacter: &impl Redacter, - width: &String, - converter: &dyn PdfToImage, -) -> Result { - match redacted.content { - RedacterDataItemContent::Pdf { data } => { - bar.println(format!( - "{width}↳ Redacting using {} redacter and converting the PDF to images", - redacter.redacter_type() - )); - let pdf_info = converter.convert_to_images(data)?; - bar.println(format!( - "{width} ↳ Converting {pdf_info_pages} images", - pdf_info_pages = pdf_info.pages.len() - )); - let mut redacted_pages = Vec::with_capacity(pdf_info.pages.len()); - for page in pdf_info.pages { - let mut png_image_bytes = std::io::Cursor::new(Vec::new()); - page.page_as_images - .write_to(&mut png_image_bytes, ImageFormat::Png)?; - let image_to_redact = RedacterDataItem { - content: RedacterDataItemContent::Image { - mime_type: mime::IMAGE_PNG, - data: png_image_bytes.into_inner().into(), - }, - file_ref: file_ref.clone(), - }; - let redacted_image = redacter.redact(image_to_redact).await?; - if let RedacterDataItemContent::Image { data, .. } = redacted_image.content { - redacted_pages.push(PdfPageInfo { - page_as_images: image::load_from_memory_with_format( - &data, - ImageFormat::Png, - )?, - ..page - }); - } - } - let redacted_pdf_info = PdfInfo { - pages: redacted_pages, - }; - let redact_pdf_as_images = converter.images_to_pdf(redacted_pdf_info)?; - Ok(RedacterDataItem { - content: RedacterDataItemContent::Pdf { - data: redact_pdf_as_images, - }, - file_ref: file_ref.clone(), - }) - } - _ => Ok(redacted), - } -} diff --git a/src/redacters/simple_image_redacter.rs b/src/redacters/simple_image_redacter.rs new file mode 100644 index 0000000..c752bb5 --- /dev/null +++ b/src/redacters/simple_image_redacter.rs @@ -0,0 +1,44 @@ +use crate::errors::AppError; +use crate::AppResult; +use bytes::Bytes; +use image::ImageFormat; +use mime::Mime; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct PiiImageCoords { + pub x1: f32, + pub y1: f32, + pub x2: f32, + pub y2: f32, + pub text: Option, +} + +pub fn redact_image_at_coords( + mime: Mime, + data: Bytes, + pii_coords: Vec, + approximation_factor: f32, +) -> AppResult { + let image_format = ImageFormat::from_mime_type(&mime).ok_or_else(|| AppError::SystemError { + message: format!("Unsupported image mime type: {}", mime), + })?; + let image = image::load_from_memory_with_format(&data, image_format)?; + let mut image = image.to_rgba8(); + for PiiImageCoords { x1, y1, x2, y2, .. } in pii_coords { + for x in + ((x1 - x1 * approximation_factor) as u32)..((x2 + x2 * approximation_factor) as u32) + { + for y in + ((y1 - y1 * approximation_factor) as u32)..((y2 + y2 * approximation_factor) as u32) + { + let safe_x = x.min(image.width() - 1).max(0); + let safe_y = y.min(image.height() - 1).max(0); + image.put_pixel(safe_x, safe_y, image::Rgba([0, 0, 0, 255])); + } + } + } + let mut output = std::io::Cursor::new(Vec::new()); + image.write_to(&mut output, image_format)?; + Ok(output.into_inner().into()) +} diff --git a/src/redacters/stream_redacter.rs b/src/redacters/stream_redacter.rs new file mode 100644 index 0000000..af5dcd7 --- /dev/null +++ b/src/redacters/stream_redacter.rs @@ -0,0 +1,315 @@ +use crate::errors::AppError; +use crate::file_converters::pdf::{PdfInfo, PdfPageInfo, PdfToImage}; +use crate::file_converters::FileConverters; +use crate::file_systems::FileSystemRef; +use crate::redacters::{ + RedactSupportedOptions, Redacter, RedacterBaseOptions, RedacterDataItem, + RedacterDataItemContent, Redacters, +}; +use crate::AppResult; +use futures::{Stream, TryStreamExt}; +use image::ImageFormat; +use indicatif::ProgressBar; + +pub struct RedactStreamResult { + pub number_of_redactions: usize, + pub stream: Box> + Send + Sync + Unpin + 'static>, +} + +pub async fn redact_stream< + S: Stream> + Send + Unpin + Sync + 'static, +>( + redacters: &Vec<&impl Redacter>, + redacter_base_options: &RedacterBaseOptions, + input: S, + file_ref: &FileSystemRef, + file_converters: &FileConverters, + bar: &ProgressBar, +) -> AppResult { + let mut redacters_supported_options = Vec::with_capacity(redacters.len()); + for redacter in redacters { + let supported_options = redacter.redact_supported_options(file_ref).await?; + redacters_supported_options.push((*redacter, supported_options)); + } + + let mut redacted = stream_to_redact_item( + redacter_base_options, + input, + file_ref, + &redacters_supported_options, + ) + .await?; + let mut number_of_redactions = 0; + + for (index, (redacter, options)) in redacters_supported_options.iter().enumerate() { + let width = " ".repeat(index); + match options { + RedactSupportedOptions::Supported => { + bar.println(format!( + "{width}↳ Redacting using {} redacter", + redacter.redacter_type() + )); + redacted = redacter.redact(redacted).await?; + number_of_redactions += 1; + } + RedactSupportedOptions::SupportedAsImages => { + match file_converters.pdf_image_converter { + Some(ref converter) => { + redacted = redact_pdf_with_images_converter( + file_ref, + bar, + redacted, + *redacter, + &width, + converter.as_ref(), + ) + .await?; + number_of_redactions += 1; + } + None => { + bar.println(format!( + "{width}↲ Skipping redaction because PDF to image converter is not available", + )); + } + } + } + RedactSupportedOptions::SupportedAsText => { + if matches!(redacted.content, RedacterDataItemContent::Value(_)) { + bar.println(format!( + "{width}↳ Redacting as text using {} redacter", + redacter.redacter_type() + )); + redacted = redacter.redact(redacted).await?; + number_of_redactions += 1; + } + } + RedactSupportedOptions::Unsupported => {} + } + } + + let output_stream = match redacted.content { + RedacterDataItemContent::Value(content) => { + let bytes = bytes::Bytes::from(content.into_bytes()); + Box::new(futures::stream::iter(vec![Ok(bytes)])) + } + RedacterDataItemContent::Image { data, .. } => { + Box::new(futures::stream::iter(vec![Ok(data)])) + } + RedacterDataItemContent::Pdf { data } => Box::new(futures::stream::iter(vec![Ok(data)])), + RedacterDataItemContent::Table { headers, rows } => { + let mut writer = csv_async::AsyncWriter::from_writer(vec![]); + writer.write_record(headers).await?; + for row in rows { + writer.write_record(row).await?; + } + writer.flush().await?; + let bytes = bytes::Bytes::from(writer.into_inner().await?); + Box::new(futures::stream::iter(vec![Ok(bytes)])) + } + }; + + Ok(RedactStreamResult { + number_of_redactions, + stream: output_stream, + }) +} + +async fn stream_to_redact_item< + S: Stream> + Send + Unpin + Sync + 'static, +>( + redacter_base_options: &RedacterBaseOptions, + input: S, + file_ref: &FileSystemRef, + redacters_supported_options: &[(&impl Redacter, RedactSupportedOptions)], +) -> AppResult { + match file_ref.media_type { + Some(ref mime) + if Redacters::is_mime_text(mime) + || (Redacters::is_mime_table(mime) + && redacters_supported_options + .iter() + .any(|(_, o)| matches!(o, RedactSupportedOptions::SupportedAsText)) + && !redacters_supported_options + .iter() + .all(|(_, o)| matches!(o, RedactSupportedOptions::Supported))) => + { + stream_to_text_redact_item(redacter_base_options, input, file_ref).await + } + Some(ref mime) if Redacters::is_mime_image(mime) => { + stream_to_image_redact_item(input, file_ref, mime.clone()).await + } + Some(ref mime) if Redacters::is_mime_table(mime) => { + stream_to_table_redact_item(redacter_base_options, input, file_ref).await + } + Some(ref mime) if Redacters::is_mime_pdf(mime) => { + stream_to_pdf_redact_item(input, file_ref).await + } + Some(ref mime) => Err(AppError::SystemError { + message: format!("Media type {} is not supported for redaction", mime), + }), + None => Err(AppError::SystemError { + message: "Media type is not provided to redact".to_string(), + }), + } +} + +async fn stream_to_text_redact_item< + S: Stream> + Send + Unpin + Sync + 'static, +>( + redacter_base_options: &RedacterBaseOptions, + input: S, + file_ref: &FileSystemRef, +) -> AppResult { + let all_chunks: Vec = input.try_collect().await?; + let all_bytes = all_chunks.concat(); + let whole_content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError { + message: format!("Failed to convert bytes to string: {}", e), + })?; + let content = if let Some(sampling_size) = redacter_base_options.sampling_size { + let sampling_size = std::cmp::min(sampling_size, whole_content.len()); + whole_content + .chars() + .take(sampling_size) + .collect::() + } else { + whole_content + }; + Ok(RedacterDataItem { + content: RedacterDataItemContent::Value(content), + file_ref: file_ref.clone(), + }) +} + +async fn stream_to_table_redact_item< + S: Stream> + Send + Unpin + Sync + 'static, +>( + redacter_base_options: &RedacterBaseOptions, + input: S, + file_ref: &FileSystemRef, +) -> AppResult { + let reader = tokio_util::io::StreamReader::new( + input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), + ); + let mut reader = csv_async::AsyncReaderBuilder::default() + .has_headers(!redacter_base_options.csv_headers_disable) + .delimiter( + redacter_base_options + .csv_delimiter + .as_ref() + .cloned() + .unwrap_or(b','), + ) + .create_reader(reader); + let headers = if !redacter_base_options.csv_headers_disable { + reader + .headers() + .await? + .into_iter() + .map(|h| h.to_string()) + .collect() + } else { + vec![] + }; + let records: Vec = reader.records().try_collect().await?; + Ok(RedacterDataItem { + content: RedacterDataItemContent::Table { + headers, + rows: records + .iter() + .map(|r| r.iter().map(|c| c.to_string()).collect()) + .collect(), + }, + file_ref: file_ref.clone(), + }) +} + +async fn stream_to_image_redact_item< + S: Stream> + Send + Unpin + Sync + 'static, +>( + input: S, + file_ref: &FileSystemRef, + mime: mime::Mime, +) -> AppResult { + let all_chunks: Vec = input.try_collect().await?; + let all_bytes = all_chunks.concat(); + Ok(RedacterDataItem { + content: RedacterDataItemContent::Image { + mime_type: mime.clone(), + data: all_bytes.into(), + }, + file_ref: file_ref.clone(), + }) +} + +async fn stream_to_pdf_redact_item< + S: Stream> + Send + Unpin + Sync + 'static, +>( + input: S, + file_ref: &FileSystemRef, +) -> AppResult { + let all_chunks: Vec = input.try_collect().await?; + let all_bytes = all_chunks.concat(); + Ok(RedacterDataItem { + content: RedacterDataItemContent::Pdf { + data: all_bytes.into(), + }, + file_ref: file_ref.clone(), + }) +} + +async fn redact_pdf_with_images_converter( + file_ref: &FileSystemRef, + bar: &ProgressBar, + redacted: RedacterDataItem, + redacter: &impl Redacter, + width: &String, + converter: &dyn PdfToImage, +) -> Result { + match redacted.content { + RedacterDataItemContent::Pdf { data } => { + bar.println(format!( + "{width}↳ Redacting using {} redacter and converting the PDF to images", + redacter.redacter_type() + )); + let pdf_info = converter.convert_to_images(data)?; + bar.println(format!( + "{width} ↳ Converting {pdf_info_pages} images", + pdf_info_pages = pdf_info.pages.len() + )); + let mut redacted_pages = Vec::with_capacity(pdf_info.pages.len()); + for page in pdf_info.pages { + let mut png_image_bytes = std::io::Cursor::new(Vec::new()); + page.page_as_images + .write_to(&mut png_image_bytes, ImageFormat::Png)?; + let image_to_redact = RedacterDataItem { + content: RedacterDataItemContent::Image { + mime_type: mime::IMAGE_PNG, + data: png_image_bytes.into_inner().into(), + }, + file_ref: file_ref.clone(), + }; + let redacted_image = redacter.redact(image_to_redact).await?; + if let RedacterDataItemContent::Image { data, .. } = redacted_image.content { + redacted_pages.push(PdfPageInfo { + page_as_images: image::load_from_memory_with_format( + &data, + ImageFormat::Png, + )?, + ..page + }); + } + } + let redacted_pdf_info = PdfInfo { + pages: redacted_pages, + }; + let redact_pdf_as_images = converter.images_to_pdf(redacted_pdf_info)?; + Ok(RedacterDataItem { + content: RedacterDataItemContent::Pdf { + data: redact_pdf_as_images, + }, + file_ref: file_ref.clone(), + }) + } + _ => Ok(redacted), + } +}