diff --git a/Cargo.lock b/Cargo.lock index 8fb7f43..650c663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3148,7 +3148,7 @@ dependencies = [ [[package]] name = "redacter" -version = "0.10.1" +version = "0.11.0" dependencies = [ "anyhow", "arboard", diff --git a/Cargo.toml b/Cargo.toml index 0885202..8e57fb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "redacter" -version = "0.10.1" +version = "0.11.0" edition = "2021" authors = ["Abdulla Abdurakhmanov "] license = "Apache-2.0" diff --git a/src/args.rs b/src/args.rs index cca720c..b84fd89 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,4 +1,4 @@ -use crate::common_types::{GcpProjectId, GcpRegion}; +use crate::common_types::{DlpRequestLimit, GcpProjectId, GcpRegion}; use crate::errors::AppError; use crate::redacters::{ GcpDlpRedacterOptions, GcpVertexAiModelName, GeminiLlmModelName, OpenAiLlmApiKey, @@ -214,6 +214,12 @@ pub struct RedacterArgs { help = "Open AI model name for OpenAI LLM redacter. Default is 'gpt-4o-mini'" )] pub open_ai_model: Option, + + #[arg( + long, + help = "Limit the number of DLP requests. Some DLPs has strict quotas and to avoid errors, limit the number of requests delaying them. Default is disabled" + )] + pub limit_dlp_requests: Option, } impl TryInto for RedacterArgs { @@ -314,6 +320,7 @@ impl TryInto for RedacterArgs { csv_headers_disable: self.csv_headers_disable, csv_delimiter: self.csv_delimiter.map(|c| c as u8), sampling_size: self.sampling_size, + limit_dlp_requests: self.limit_dlp_requests, }; Ok(RedacterOptions { provider_options, diff --git a/src/commands/copy_command.rs b/src/commands/copy_command.rs index 9d5d180..70dc990 100644 --- a/src/commands/copy_command.rs +++ b/src/commands/copy_command.rs @@ -2,7 +2,9 @@ use crate::errors::AppError; use crate::file_converters::FileConverters; use crate::file_systems::{DetectFileSystem, FileSystemConnection, FileSystemRef}; use crate::file_tools::{FileMatcher, FileMatcherResult, FileMimeOverride}; -use crate::redacters::{Redacter, RedacterBaseOptions, RedacterOptions, Redacters, StreamRedacter}; +use crate::redacters::{ + Redacter, RedacterBaseOptions, RedacterOptions, RedacterThrottler, Redacters, StreamRedacter, +}; use crate::reporter::AppReporter; use crate::AppResult; use console::{pad_str, Alignment, Style, Term}; @@ -10,7 +12,7 @@ use futures::Stream; use gcloud_sdk::prost::bytes; use indicatif::*; use std::error::Error; -use std::time::Duration; +use std::time::{Duration, Instant}; pub struct CopyCommandResult { pub files_copied: usize, @@ -73,6 +75,10 @@ pub async fn command_copy( let mut source_fs = DetectFileSystem::open(source, &app_reporter).await?; let mut destination_fs = DetectFileSystem::open(destination, &app_reporter).await?; + let mut redacter_throttler = redacter_options + .as_ref() + .and_then(|o| o.base_options.limit_dlp_requests.clone()) + .map(|limit| limit.to_throttling_counter()); let maybe_redacters = match redacter_options { Some(options) => { @@ -126,6 +132,7 @@ pub async fn command_copy( &options, &maybe_redacters, &file_converters, + &mut redacter_throttler, ) .await? { @@ -148,6 +155,7 @@ pub async fn command_copy( &options, &maybe_redacters, &file_converters, + &mut redacter_throttler, ) .await? { @@ -246,6 +254,7 @@ async fn transfer_and_redact_file< options: &CopyCommandOptions, redacter: &Option<(RedacterBaseOptions, Vec)>, file_converters: &FileConverters<'a>, + redacter_throttler: &mut Option, ) -> AppResult { let bold_style = Style::new().bold().white(); let (base_file_ref, source_reader) = source_fs.download(source_file_ref).await?; @@ -317,6 +326,7 @@ async fn transfer_and_redact_file< options, redacter_with_options, file_converters, + redacter_throttler, ) .await? } else { @@ -344,6 +354,7 @@ async fn redact_upload_file< options: &CopyCommandOptions, redacter_with_options: &(RedacterBaseOptions, Vec), file_converters: &FileConverters<'a>, + redacter_throttler: &mut Option, ) -> AppResult { let (redacter_base_options, redacters) = redacter_with_options; let stream_redacter = StreamRedacter::new(redacter_base_options, file_converters, bar); @@ -357,6 +368,23 @@ async fn redact_upload_file< .await?; if !supported_redacters.is_empty() { + if let Some(ref mut throttler) = redacter_throttler { + let delay = throttler.delay(); + if delay.as_millis() > 0 { + bar.println( + format!( + "⧗ Delaying redaction for {} seconds", + bold_style + .clone() + .yellow() + .apply_to(throttler.delay().as_secs().to_string()) + ) + .as_str(), + ); + tokio::time::sleep(*delay).await; + } + *throttler = throttler.update(Instant::now()); + } match stream_redacter .redact_stream( source_reader, diff --git a/src/common_types.rs b/src/common_types.rs index b92333a..d814473 100644 --- a/src/common_types.rs +++ b/src/common_types.rs @@ -1,5 +1,7 @@ +use crate::redacters::RedacterThrottler; use rvstruct::ValueStruct; use serde::{Deserialize, Serialize}; +use std::str::FromStr; #[derive(Debug, Clone, ValueStruct)] pub struct GcpProjectId(String); @@ -18,3 +20,57 @@ pub struct TextImageCoords { pub y2: f32, pub text: Option, } + +#[derive(Debug, Clone)] +pub struct DlpRequestLimit { + pub value: usize, + pub per: std::time::Duration, +} + +impl DlpRequestLimit { + pub fn new(value: usize, per: std::time::Duration) -> Self { + assert!(value > 0, "Limit value should be more than zero"); + assert!( + per.as_millis() > 0, + "Limit duration should be more than zero" + ); + + Self { value, per } + } + + pub fn to_rate_limit_in_ms(&self) -> u64 { + self.per.as_millis() as u64 / self.value as u64 + } + + pub fn to_rate_limit_capacity(&self) -> usize { + self.per.as_millis() as usize / self.to_rate_limit_in_ms() as usize + } + + pub fn to_throttling_counter(&self) -> RedacterThrottler { + RedacterThrottler::new(self.to_rate_limit_capacity(), self.to_rate_limit_in_ms()) + } +} + +impl FromStr for DlpRequestLimit { + type Err = String; + + fn from_str(s: &str) -> Result { + let index = s.find(|c: char| !c.is_numeric()).unwrap_or(s.len()); + let (number, unit) = s.split_at(index); + let max_ops_in_units = number + .parse::() + .map_err(|e| format!("Failed to parse number in DlpRequestLimit: {}", e))?; + println!("max_ops_in_units: {}", max_ops_in_units); + match unit { + "rps" => Ok(DlpRequestLimit::new( + max_ops_in_units, + std::time::Duration::from_secs(1), + )), + "rpm" => Ok(DlpRequestLimit::new( + max_ops_in_units, + std::time::Duration::from_secs(60), + )), + unknown => Err(format!("Unknown unit specified: {}", unknown)), + } + } +} diff --git a/src/redacters/mod.rs b/src/redacters/mod.rs index d029b76..aa525d9 100644 --- a/src/redacters/mod.rs +++ b/src/redacters/mod.rs @@ -28,7 +28,11 @@ pub use simple_image_redacter::*; mod stream_redacter; pub use stream_redacter::*; +mod redacter_throttler; +pub use redacter_throttler::*; + use crate::args::RedacterType; +use crate::common_types::DlpRequestLimit; #[derive(Debug, Clone)] pub struct RedacterDataItem { @@ -74,6 +78,7 @@ pub struct RedacterBaseOptions { pub csv_headers_disable: bool, pub csv_delimiter: Option, pub sampling_size: Option, + pub limit_dlp_requests: Option, } #[derive(Debug, Clone)] diff --git a/src/redacters/redacter_throttler.rs b/src/redacters/redacter_throttler.rs new file mode 100644 index 0000000..e2ff4d8 --- /dev/null +++ b/src/redacters/redacter_throttler.rs @@ -0,0 +1,138 @@ +use std::ops::Add; +use std::time::{Duration, Instant}; + +#[derive(Clone, Debug)] +pub struct RedacterThrottler { + capacity: i64, + max_capacity: usize, + last_arrived: Instant, + last_updated: Instant, + rate_limit_in_millis: u64, + delay: Duration, +} + +impl RedacterThrottler { + pub fn new(max_capacity: usize, rate_limit_in_millis: u64) -> Self { + Self { + capacity: max_capacity as i64, + max_capacity, + last_arrived: Instant::now(), + last_updated: Instant::now(), + rate_limit_in_millis, + delay: Duration::from_millis(0), + } + } + + pub fn update(&self, now: Instant) -> Self { + let time_elapsed_millis = now + .checked_duration_since(self.last_arrived) + .unwrap_or_else(|| Duration::from_millis(0)) + .as_millis() as u64; + + let (arrived, new_last_arrived) = { + if time_elapsed_millis >= self.rate_limit_in_millis { + let arrived_in_time = time_elapsed_millis / self.rate_limit_in_millis; + let new_last_updated = self.last_arrived.add(Duration::from_millis( + arrived_in_time * self.rate_limit_in_millis, + )); + (arrived_in_time as usize, new_last_updated) + } else { + (0usize, self.last_arrived) + } + }; + + let new_available_capacity = + std::cmp::min(self.capacity + arrived as i64, self.max_capacity as i64); + + if new_available_capacity > 0 { + Self { + capacity: new_available_capacity - 1, + last_arrived: new_last_arrived, + last_updated: now, + delay: Duration::from_millis(0), + ..self.clone() + } + } else { + let updated_capacity = new_available_capacity - 1; + + let base_delay_in_millis = now + .checked_duration_since(self.last_updated) + .map_or(0u64, |d| d.as_millis() as u64); + + let delay_penalty = (self.rate_limit_in_millis as f64 * self.capacity.abs() as f64 + / self.max_capacity as f64) as u64; + + let delay_in_millis = if base_delay_in_millis < self.rate_limit_in_millis { + self.rate_limit_in_millis - base_delay_in_millis + } else { + 0 + }; + let delay_with_penalty = + Duration::from_millis(delay_in_millis.saturating_add(delay_penalty)); + + Self { + capacity: updated_capacity, + last_arrived: new_last_arrived, + last_updated: now, + delay: delay_with_penalty, + ..self.clone() + } + } + } + + pub fn delay(&self) -> &Duration { + &self.delay + } +} + +#[allow(unused_imports)] +mod tests { + use super::*; + use crate::common_types::DlpRequestLimit; + use crate::redacters::RedacterProviderOptions; + use console::Term; + + #[test] + fn check_decreased() { + let rate_limit = DlpRequestLimit::new(15, Duration::from_secs(60)); + let rate_limit_in_ms = rate_limit.to_rate_limit_in_ms(); + let rate_limit_capacity = rate_limit.to_rate_limit_capacity(); + + let now = Instant::now(); + let counter = RedacterThrottler::new(rate_limit_capacity, rate_limit_in_ms); + let updated_counter = counter.update(now.add(Duration::from_millis(rate_limit_in_ms - 1))); + + assert_eq!(updated_counter.last_arrived, counter.last_arrived); + assert_eq!(updated_counter.delay, Duration::from_millis(0)); + assert_eq!(updated_counter.capacity, counter.capacity - 1); + } + + #[test] + fn check_max_available() { + let rate_limit = DlpRequestLimit::new(15, Duration::from_secs(60)); + let rate_limit_in_ms = rate_limit.to_rate_limit_in_ms(); + let rate_limit_capacity = rate_limit.to_rate_limit_capacity(); + + let now = Instant::now(); + let counter = RedacterThrottler::new(rate_limit_capacity, rate_limit_in_ms); + let updated_counter = counter.update(now.add(Duration::from_millis(rate_limit_in_ms + 1))); + + assert_eq!(updated_counter.delay, Duration::from_millis(0)); + assert_eq!(updated_counter.capacity, (counter.max_capacity - 1) as i64); + } + + #[test] + fn check_delay() { + let counter = DlpRequestLimit::new(15, Duration::from_secs(60)).to_throttling_counter(); + + let now = Instant::now(); + + let updated_counter = + (0..counter.capacity + 1).fold(counter.clone(), |result, _| result.update(now)); + + assert_eq!( + updated_counter.delay, + Duration::from_millis(counter.rate_limit_in_millis) + ); + } +}