From 94dcbdf4154e0326a7ed6b1a74f75e39e9374c30 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Thu, 8 Aug 2024 10:27:07 +0200 Subject: [PATCH 1/2] Redact stream refactoring --- src/commands/copy_command.rs | 114 ++--------------------------------- src/redacters/mod.rs | 110 ++++++++++++++++++++++++++++++++- 2 files changed, 111 insertions(+), 113 deletions(-) diff --git a/src/commands/copy_command.rs b/src/commands/copy_command.rs index 5450b27..8503abe 100644 --- a/src/commands/copy_command.rs +++ b/src/commands/copy_command.rs @@ -3,14 +3,11 @@ use crate::filesystems::{ AbsoluteFilePath, DetectFileSystem, FileMatcher, FileMatcherResult, FileSystemConnection, FileSystemRef, }; -use crate::redacters::{ - RedactSupportedOptions, Redacter, RedacterDataItem, RedacterDataItemContent, RedacterOptions, - Redacters, -}; +use crate::redacters::{RedactSupportedOptions, Redacter, RedacterOptions, Redacters}; use crate::reporter::AppReporter; use crate::AppResult; use console::{Style, Term}; -use futures::{Stream, TryStreamExt}; +use futures::Stream; use gcloud_sdk::prost::bytes; use indicatif::*; use std::error::Error; @@ -235,10 +232,10 @@ async fn redact_upload_file< base_resolved_file_ref: &AbsoluteFilePath, dest_file_ref: &FileSystemRef, redacter: &impl Redacter, -) -> AppResult { +) -> AppResult { let redacter_supported_options = redacter.redact_supported_options(dest_file_ref).await?; if redacter_supported_options != RedactSupportedOptions::Unsupported { - match redact_stream( + match crate::redacters::redact_stream( redacter, &redacter_supported_options, source_reader, @@ -300,106 +297,3 @@ async fn redact_upload_file< Ok(TransferFileResult::Skipped) } } - -async fn redact_stream< - S: Stream> + Send + Unpin + Sync + 'static, ->( - redacter: &impl Redacter, - supported_options: &RedactSupportedOptions, - input: S, - file_ref: &FileSystemRef, -) -> AppResult> + Send + Sync + Unpin + 'static>> { - let content_to_redact = match file_ref.media_type { - Some(ref mime) - if Redacters::is_mime_text(mime) - || (Redacters::is_mime_table(mime) - && matches!(supported_options, RedactSupportedOptions::SupportedAsText)) => - { - let all_chunks: Vec = input.try_collect().await?; - let all_bytes = all_chunks.concat(); - let content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError { - message: format!("Failed to convert bytes to string: {}", e), - })?; - Ok(RedacterDataItem { - content: RedacterDataItemContent::Value(content), - file_ref: file_ref.clone(), - }) - } - Some(ref mime) if Redacters::is_mime_image(mime) => { - let all_chunks: Vec = input.try_collect().await?; - let all_bytes = all_chunks.concat(); - Ok(RedacterDataItem { - content: RedacterDataItemContent::Image { - mime_type: mime.clone(), - data: all_bytes.into(), - }, - file_ref: file_ref.clone(), - }) - } - Some(ref mime) if Redacters::is_mime_table(mime) => { - let reader = tokio_util::io::StreamReader::new( - input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), - ); - let mut reader = csv_async::AsyncReaderBuilder::default() - .has_headers(!redacter.options().csv_headers_disable) - .delimiter( - redacter - .options() - .csv_delimiter - .as_ref() - .cloned() - .unwrap_or(b','), - ) - .create_reader(reader); - let headers = if !redacter.options().csv_headers_disable { - reader - .headers() - .await? - .into_iter() - .map(|h| h.to_string()) - .collect() - } else { - vec![] - }; - let records: Vec = reader.records().try_collect().await?; - Ok(RedacterDataItem { - content: RedacterDataItemContent::Table { - headers, - rows: records - .iter() - .map(|r| r.iter().map(|c| c.to_string()).collect()) - .collect(), - }, - file_ref: file_ref.clone(), - }) - } - Some(ref mime) => Err(AppError::SystemError { - message: format!("Media type {} is not supported for redaction", mime), - }), - None => Err(AppError::SystemError { - message: "Media type is not provided to redact".to_string(), - }), - }?; - - let content = redacter.redact(content_to_redact).await?; - - match content { - RedacterDataItemContent::Value(content) => { - let bytes = bytes::Bytes::from(content.into_bytes()); - Ok(Box::new(futures::stream::iter(vec![Ok(bytes)]))) - } - RedacterDataItemContent::Image { data, .. } => { - Ok(Box::new(futures::stream::iter(vec![Ok(data)]))) - } - RedacterDataItemContent::Table { headers, rows } => { - let mut writer = csv_async::AsyncWriter::from_writer(vec![]); - writer.write_record(headers).await?; - for row in rows { - writer.write_record(row).await?; - } - writer.flush().await?; - let bytes = bytes::Bytes::from(writer.into_inner().await?); - Ok(Box::new(futures::stream::iter(vec![Ok(bytes)]))) - } - } -} diff --git a/src/redacters/mod.rs b/src/redacters/mod.rs index d7fc085..079a5bc 100644 --- a/src/redacters/mod.rs +++ b/src/redacters/mod.rs @@ -1,11 +1,11 @@ +use crate::filesystems::FileSystemRef; +use crate::reporter::AppReporter; use crate::AppResult; +use futures::{Stream, TryStreamExt}; use gcloud_sdk::prost::bytes; use mime::Mime; use std::fmt::Display; -use crate::filesystems::FileSystemRef; -use crate::reporter::AppReporter; - mod gcp_dlp; pub use gcp_dlp::*; @@ -16,6 +16,7 @@ mod ms_presidio; pub use ms_presidio::*; mod gemini_llm; +use crate::errors::AppError; pub use gemini_llm::*; #[derive(Debug, Clone)] @@ -171,3 +172,106 @@ impl<'a> Redacter for Redacters<'a> { } } } + +pub async fn redact_stream< + S: Stream> + Send + Unpin + Sync + 'static, +>( + redacter: &impl Redacter, + supported_options: &RedactSupportedOptions, + input: S, + file_ref: &FileSystemRef, +) -> AppResult> + Send + Sync + Unpin + 'static>> { + let content_to_redact = match file_ref.media_type { + Some(ref mime) + if Redacters::is_mime_text(mime) + || (Redacters::is_mime_table(mime) + && matches!(supported_options, RedactSupportedOptions::SupportedAsText)) => + { + let all_chunks: Vec = input.try_collect().await?; + let all_bytes = all_chunks.concat(); + let content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError { + message: format!("Failed to convert bytes to string: {}", e), + })?; + Ok(RedacterDataItem { + content: RedacterDataItemContent::Value(content), + file_ref: file_ref.clone(), + }) + } + Some(ref mime) if Redacters::is_mime_image(mime) => { + let all_chunks: Vec = input.try_collect().await?; + let all_bytes = all_chunks.concat(); + Ok(RedacterDataItem { + content: RedacterDataItemContent::Image { + mime_type: mime.clone(), + data: all_bytes.into(), + }, + file_ref: file_ref.clone(), + }) + } + Some(ref mime) if Redacters::is_mime_table(mime) => { + let reader = tokio_util::io::StreamReader::new( + input.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)), + ); + let mut reader = csv_async::AsyncReaderBuilder::default() + .has_headers(!redacter.options().csv_headers_disable) + .delimiter( + redacter + .options() + .csv_delimiter + .as_ref() + .cloned() + .unwrap_or(b','), + ) + .create_reader(reader); + let headers = if !redacter.options().csv_headers_disable { + reader + .headers() + .await? + .into_iter() + .map(|h| h.to_string()) + .collect() + } else { + vec![] + }; + let records: Vec = reader.records().try_collect().await?; + Ok(RedacterDataItem { + content: RedacterDataItemContent::Table { + headers, + rows: records + .iter() + .map(|r| r.iter().map(|c| c.to_string()).collect()) + .collect(), + }, + file_ref: file_ref.clone(), + }) + } + Some(ref mime) => Err(AppError::SystemError { + message: format!("Media type {} is not supported for redaction", mime), + }), + None => Err(AppError::SystemError { + message: "Media type is not provided to redact".to_string(), + }), + }?; + + let content = redacter.redact(content_to_redact).await?; + + match content { + RedacterDataItemContent::Value(content) => { + let bytes = bytes::Bytes::from(content.into_bytes()); + Ok(Box::new(futures::stream::iter(vec![Ok(bytes)]))) + } + RedacterDataItemContent::Image { data, .. } => { + Ok(Box::new(futures::stream::iter(vec![Ok(data)]))) + } + RedacterDataItemContent::Table { headers, rows } => { + let mut writer = csv_async::AsyncWriter::from_writer(vec![]); + writer.write_record(headers).await?; + for row in rows { + writer.write_record(row).await?; + } + writer.flush().await?; + let bytes = bytes::Bytes::from(writer.into_inner().await?); + Ok(Box::new(futures::stream::iter(vec![Ok(bytes)]))) + } + } +} From 925501979f8798bbd7b9c104f51c1f4ac57111f0 Mon Sep 17 00:00:00 2001 From: Abdulla Abdurakhmanov Date: Thu, 8 Aug 2024 10:53:30 +0200 Subject: [PATCH 2/2] Sampling size argument support for text --- README.md | 2 ++ src/args.rs | 7 +++++++ src/commands/copy_command.rs | 15 +++++++++++---- src/redacters/aws_comprehend.rs | 1 + src/redacters/gcp_dlp.rs | 1 + src/redacters/gemini_llm.rs | 1 + src/redacters/mod.rs | 17 ++++++++++++++--- src/redacters/ms_presidio.rs | 1 + 8 files changed, 38 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 8768fee..59b3b83 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,8 @@ Options: URL for image redact endpoint for MsPresidio redacter --gemini-model Gemini model name for Gemini LLM redacter. Default is 'models/gemini-1.5-flash' + --sampling-size + Sampling size in bytes before redacting files. Disabled by default -h, --help Print help ``` diff --git a/src/args.rs b/src/args.rs index 5619b17..6a2a400 100644 --- a/src/args.rs +++ b/src/args.rs @@ -132,6 +132,12 @@ pub struct RedacterArgs { help = "Gemini model name for Gemini LLM redacter. Default is 'models/gemini-1.5-flash'" )] pub gemini_model: Option, + + #[arg( + long, + help = "Sampling size in bytes before redacting files. Disabled by default" + )] + pub sampling_size: Option, } impl TryInto for RedacterArgs { @@ -189,6 +195,7 @@ impl TryInto for RedacterArgs { allow_unsupported_copies: self.allow_unsupported_copies, csv_headers_disable: self.csv_headers_disable, csv_delimiter: self.csv_delimiter.map(|c| c as u8), + sampling_size: self.sampling_size, }) } } diff --git a/src/commands/copy_command.rs b/src/commands/copy_command.rs index 8503abe..d20380f 100644 --- a/src/commands/copy_command.rs +++ b/src/commands/copy_command.rs @@ -43,20 +43,27 @@ pub async fn command_copy( redacter_options: Option, ) -> AppResult { let bold_style = Style::new().bold(); - let redacted_output = if let Some(ref options) = redacter_options { + let redacted_output = if let Some(ref options) = redacter_options.as_ref() { bold_style .clone() .green() - .apply_to(format!("✓ Yes ({})", options)) + .apply_to(format!("✓ Yes ({})", &options)) } else { bold_style.clone().red().apply_to("✗ No".to_string()) }; + let sampling_output = + if let Some(ref sampling_size) = redacter_options.as_ref().and_then(|o| o.sampling_size) { + Style::new().apply_to(format!("{} bytes.", sampling_size)) + } else { + Style::new().dim().apply_to("-".to_string()) + }; term.write_line( format!( - "Copying from {} to {}.\nRedacting: {}.", + "Copying from {} to {}.\nRedacting: {}.\nSampling: {}\n", bold_style.clone().white().apply_to(source), bold_style.clone().yellow().apply_to(destination), - redacted_output + redacted_output, + sampling_output ) .as_str(), )?; diff --git a/src/redacters/aws_comprehend.rs b/src/redacters/aws_comprehend.rs index b5b5020..3f9774c 100644 --- a/src/redacters/aws_comprehend.rs +++ b/src/redacters/aws_comprehend.rs @@ -151,6 +151,7 @@ mod tests { allow_unsupported_copies: false, csv_headers_disable: false, csv_delimiter: None, + sampling_size: None, }; let redacter = AwsComprehendRedacter::new( diff --git a/src/redacters/gcp_dlp.rs b/src/redacters/gcp_dlp.rs index d22306a..95679be 100644 --- a/src/redacters/gcp_dlp.rs +++ b/src/redacters/gcp_dlp.rs @@ -400,6 +400,7 @@ mod tests { allow_unsupported_copies: false, csv_headers_disable: false, csv_delimiter: None, + sampling_size: None, }; let redacter = GcpDlpRedacter::new( diff --git a/src/redacters/gemini_llm.rs b/src/redacters/gemini_llm.rs index 0233c8d..016aa69 100644 --- a/src/redacters/gemini_llm.rs +++ b/src/redacters/gemini_llm.rs @@ -236,6 +236,7 @@ mod tests { allow_unsupported_copies: false, csv_headers_disable: false, csv_delimiter: None, + sampling_size: None, }; let redacter = GeminiLlmRedacter::new( diff --git a/src/redacters/mod.rs b/src/redacters/mod.rs index 079a5bc..f73c933 100644 --- a/src/redacters/mod.rs +++ b/src/redacters/mod.rs @@ -52,6 +52,7 @@ pub struct RedacterOptions { pub allow_unsupported_copies: bool, pub csv_headers_disable: bool, pub csv_delimiter: Option, + pub sampling_size: Option, } #[derive(Debug, Clone)] @@ -189,9 +190,19 @@ pub async fn redact_stream< { let all_chunks: Vec = input.try_collect().await?; let all_bytes = all_chunks.concat(); - let content = String::from_utf8(all_bytes).map_err(|e| AppError::SystemError { - message: format!("Failed to convert bytes to string: {}", e), - })?; + let whole_content = + String::from_utf8(all_bytes).map_err(|e| AppError::SystemError { + message: format!("Failed to convert bytes to string: {}", e), + })?; + let content = if let Some(sampling_size) = redacter.options().sampling_size { + let sampling_size = std::cmp::min(sampling_size, whole_content.len()); + whole_content + .chars() + .take(sampling_size) + .collect::() + } else { + whole_content + }; Ok(RedacterDataItem { content: RedacterDataItemContent::Value(content), file_ref: file_ref.clone(), diff --git a/src/redacters/ms_presidio.rs b/src/redacters/ms_presidio.rs index 7d64cf0..0b17823 100644 --- a/src/redacters/ms_presidio.rs +++ b/src/redacters/ms_presidio.rs @@ -257,6 +257,7 @@ mod tests { allow_unsupported_copies: false, csv_headers_disable: false, csv_delimiter: None, + sampling_size: None, }; let redacter = MsPresidioRedacter::new(