Skip to content

Commit

Permalink
Fixed redacting as text for CSV
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence committed Aug 16, 2024
1 parent fd274b8 commit 813f93c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 13 deletions.
17 changes: 15 additions & 2 deletions src/commands/copy_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,25 @@ async fn redact_upload_file<
)
.await
{
Ok(redacted_reader) => {
Ok(redacted_result)
if redacted_result.number_of_redactions > 0
|| redacter_base_options.allow_unsupported_copies =>
{
destination_fs
.upload(redacted_reader, Some(dest_file_ref))
.upload(redacted_result.stream, Some(dest_file_ref))
.await?;
Ok(TransferFileResult::Copied)
}
Ok(_) => {
bar.println(
format!(
"↲ Skipping redaction because {} redactions were applied",
bold_style.yellow().apply_to("no suitable".to_string())
)
.as_str(),
);
Ok(TransferFileResult::Skipped)
}
Err(ref error) => {
bar.println(
format!(
Expand Down
43 changes: 32 additions & 11 deletions src/redacters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ impl<'a> Redacter for Redacters<'a> {
}
}

pub struct RedactStreamResult {
pub number_of_redactions: usize,
pub stream: Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>,
}

pub async fn redact_stream<
S: Stream<Item = AppResult<bytes::Bytes>> + Send + Unpin + Sync + 'static,
>(
Expand All @@ -217,7 +222,7 @@ pub async fn redact_stream<
file_ref: &FileSystemRef,
file_converters: &FileConverters,
bar: &ProgressBar,
) -> AppResult<Box<dyn Stream<Item = AppResult<bytes::Bytes>> + Send + Sync + Unpin + 'static>> {
) -> AppResult<RedactStreamResult> {
let mut redacters_supported_options = Vec::with_capacity(redacters.len());
for redacter in redacters {
let supported_options = redacter.redact_supported_options(file_ref).await?;
Expand All @@ -231,6 +236,7 @@ pub async fn redact_stream<
&redacters_supported_options,
)
.await?;
let mut number_of_redactions = 0;

for (index, (redacter, options)) in redacters_supported_options.iter().enumerate() {
let width = " ".repeat(index);
Expand All @@ -241,6 +247,7 @@ pub async fn redact_stream<
redacter.redacter_type()
));
redacted = redacter.redact(redacted).await?;
number_of_redactions += 1;
}
RedactSupportedOptions::SupportedAsImages => {
match file_converters.pdf_image_converter {
Expand All @@ -253,7 +260,8 @@ pub async fn redact_stream<
&width,
converter.as_ref(),
)
.await?
.await?;
number_of_redactions += 1;
}
None => {
bar.println(format!(
Expand All @@ -262,21 +270,29 @@ pub async fn redact_stream<
}
}
}
RedactSupportedOptions::SupportedAsText | RedactSupportedOptions::Unsupported => {}
RedactSupportedOptions::SupportedAsText => {
if matches!(redacted.content, RedacterDataItemContent::Value(_)) {
bar.println(format!(
"{width}↳ Redacting as text using {} redacter",
redacter.redacter_type()
));
redacted = redacter.redact(redacted).await?;
number_of_redactions += 1;
}
}
RedactSupportedOptions::Unsupported => {}
}
}

match redacted.content {
let output_stream = match redacted.content {
RedacterDataItemContent::Value(content) => {
let bytes = bytes::Bytes::from(content.into_bytes());
Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
Box::new(futures::stream::iter(vec![Ok(bytes)]))
}
RedacterDataItemContent::Image { data, .. } => {
Ok(Box::new(futures::stream::iter(vec![Ok(data)])))
}
RedacterDataItemContent::Pdf { data } => {
Ok(Box::new(futures::stream::iter(vec![Ok(data)])))
Box::new(futures::stream::iter(vec![Ok(data)]))
}
RedacterDataItemContent::Pdf { data } => Box::new(futures::stream::iter(vec![Ok(data)])),
RedacterDataItemContent::Table { headers, rows } => {
let mut writer = csv_async::AsyncWriter::from_writer(vec![]);
writer.write_record(headers).await?;
Expand All @@ -285,9 +301,14 @@ pub async fn redact_stream<
}
writer.flush().await?;
let bytes = bytes::Bytes::from(writer.into_inner().await?);
Ok(Box::new(futures::stream::iter(vec![Ok(bytes)])))
Box::new(futures::stream::iter(vec![Ok(bytes)]))
}
}
};

Ok(RedactStreamResult {
number_of_redactions,
stream: output_stream,
})
}

async fn stream_to_redact_item<
Expand Down

0 comments on commit 813f93c

Please sign in to comment.