Skip to content

Commit

Permalink
Limiting/throttling DLP requests support (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
abdolence authored Aug 22, 2024
1 parent 6e22267 commit fff730e
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "redacter"
version = "0.10.1"
version = "0.11.0"
edition = "2021"
authors = ["Abdulla Abdurakhmanov <[email protected]>"]
license = "Apache-2.0"
Expand Down
9 changes: 8 additions & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common_types::{GcpProjectId, GcpRegion};
use crate::common_types::{DlpRequestLimit, GcpProjectId, GcpRegion};
use crate::errors::AppError;
use crate::redacters::{
GcpDlpRedacterOptions, GcpVertexAiModelName, GeminiLlmModelName, OpenAiLlmApiKey,
Expand Down Expand Up @@ -214,6 +214,12 @@ pub struct RedacterArgs {
help = "Open AI model name for OpenAI LLM redacter. Default is 'gpt-4o-mini'"
)]
pub open_ai_model: Option<OpenAiModelName>,

#[arg(
long,
help = "Limit the number of DLP requests. Some DLPs has strict quotas and to avoid errors, limit the number of requests delaying them. Default is disabled"
)]
pub limit_dlp_requests: Option<DlpRequestLimit>,
}

impl TryInto<RedacterOptions> for RedacterArgs {
Expand Down Expand Up @@ -314,6 +320,7 @@ impl TryInto<RedacterOptions> for RedacterArgs {
csv_headers_disable: self.csv_headers_disable,
csv_delimiter: self.csv_delimiter.map(|c| c as u8),
sampling_size: self.sampling_size,
limit_dlp_requests: self.limit_dlp_requests,
};
Ok(RedacterOptions {
provider_options,
Expand Down
32 changes: 30 additions & 2 deletions src/commands/copy_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ use crate::errors::AppError;
use crate::file_converters::FileConverters;
use crate::file_systems::{DetectFileSystem, FileSystemConnection, FileSystemRef};
use crate::file_tools::{FileMatcher, FileMatcherResult, FileMimeOverride};
use crate::redacters::{Redacter, RedacterBaseOptions, RedacterOptions, Redacters, StreamRedacter};
use crate::redacters::{
Redacter, RedacterBaseOptions, RedacterOptions, RedacterThrottler, Redacters, StreamRedacter,
};
use crate::reporter::AppReporter;
use crate::AppResult;
use console::{pad_str, Alignment, Style, Term};
use futures::Stream;
use gcloud_sdk::prost::bytes;
use indicatif::*;
use std::error::Error;
use std::time::Duration;
use std::time::{Duration, Instant};

pub struct CopyCommandResult {
pub files_copied: usize,
Expand Down Expand Up @@ -73,6 +75,10 @@ pub async fn command_copy(

let mut source_fs = DetectFileSystem::open(source, &app_reporter).await?;
let mut destination_fs = DetectFileSystem::open(destination, &app_reporter).await?;
let mut redacter_throttler = redacter_options
.as_ref()
.and_then(|o| o.base_options.limit_dlp_requests.clone())
.map(|limit| limit.to_throttling_counter());

let maybe_redacters = match redacter_options {
Some(options) => {
Expand Down Expand Up @@ -126,6 +132,7 @@ pub async fn command_copy(
&options,
&maybe_redacters,
&file_converters,
&mut redacter_throttler,
)
.await?
{
Expand All @@ -148,6 +155,7 @@ pub async fn command_copy(
&options,
&maybe_redacters,
&file_converters,
&mut redacter_throttler,
)
.await?
{
Expand Down Expand Up @@ -246,6 +254,7 @@ async fn transfer_and_redact_file<
options: &CopyCommandOptions,
redacter: &Option<(RedacterBaseOptions, Vec<impl Redacter>)>,
file_converters: &FileConverters<'a>,
redacter_throttler: &mut Option<RedacterThrottler>,
) -> AppResult<TransferFileResult> {
let bold_style = Style::new().bold().white();
let (base_file_ref, source_reader) = source_fs.download(source_file_ref).await?;
Expand Down Expand Up @@ -317,6 +326,7 @@ async fn transfer_and_redact_file<
options,
redacter_with_options,
file_converters,
redacter_throttler,
)
.await?
} else {
Expand Down Expand Up @@ -344,6 +354,7 @@ async fn redact_upload_file<
options: &CopyCommandOptions,
redacter_with_options: &(RedacterBaseOptions, Vec<impl Redacter>),
file_converters: &FileConverters<'a>,
redacter_throttler: &mut Option<RedacterThrottler>,
) -> AppResult<TransferFileResult> {
let (redacter_base_options, redacters) = redacter_with_options;
let stream_redacter = StreamRedacter::new(redacter_base_options, file_converters, bar);
Expand All @@ -357,6 +368,23 @@ async fn redact_upload_file<
.await?;

if !supported_redacters.is_empty() {
if let Some(ref mut throttler) = redacter_throttler {
let delay = throttler.delay();
if delay.as_millis() > 0 {
bar.println(
format!(
"⧗ Delaying redaction for {} seconds",
bold_style
.clone()
.yellow()
.apply_to(throttler.delay().as_secs().to_string())
)
.as_str(),
);
tokio::time::sleep(*delay).await;
}
*throttler = throttler.update(Instant::now());
}
match stream_redacter
.redact_stream(
source_reader,
Expand Down
56 changes: 56 additions & 0 deletions src/common_types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::redacters::RedacterThrottler;
use rvstruct::ValueStruct;
use serde::{Deserialize, Serialize};
use std::str::FromStr;

#[derive(Debug, Clone, ValueStruct)]
pub struct GcpProjectId(String);
Expand All @@ -18,3 +20,57 @@ pub struct TextImageCoords {
pub y2: f32,
pub text: Option<String>,
}

/// Request-rate limit for DLP calls: at most `value` requests per `per` interval.
#[derive(Debug, Clone)]
pub struct DlpRequestLimit {
    /// Maximum number of requests allowed within one `per` interval.
    pub value: usize,
    /// Length of the interval that `value` applies to.
    pub per: std::time::Duration,
}

impl DlpRequestLimit {
    /// Creates a limit of `value` requests per `per` interval.
    ///
    /// # Panics
    ///
    /// Panics if `value` is zero or if `per` is shorter than one millisecond.
    pub fn new(value: usize, per: std::time::Duration) -> Self {
        assert!(value > 0, "Limit value should be more than zero");
        assert!(
            per.as_millis() > 0,
            "Limit duration should be more than zero"
        );

        Self { value, per }
    }

    /// Returns the interval between two consecutive requests, in milliseconds.
    ///
    /// The result is never smaller than 1 ms: limits finer than the millisecond
    /// resolution (for example 2000 requests per second) would otherwise round
    /// down to zero and make every later division by this value panic.
    pub fn to_rate_limit_in_ms(&self) -> u64 {
        let per_ms = u64::try_from(self.per.as_millis()).unwrap_or(u64::MAX);
        // `value` is validated by `new`, but the field is public, so guard the
        // divisor as well instead of trusting it.
        let requests = u64::try_from(self.value).unwrap_or(u64::MAX).max(1);
        (per_ms / requests).max(1)
    }

    /// Returns how many requests fit into one `per` interval at the rate
    /// reported by [`Self::to_rate_limit_in_ms`].
    pub fn to_rate_limit_capacity(&self) -> usize {
        let per_ms = u64::try_from(self.per.as_millis()).unwrap_or(u64::MAX);
        // `to_rate_limit_in_ms` is at least 1, so this division cannot panic.
        usize::try_from(per_ms / self.to_rate_limit_in_ms()).unwrap_or(usize::MAX)
    }

    /// Builds a [`RedacterThrottler`] from this limit's capacity and
    /// per-request interval.
    pub fn to_throttling_counter(&self) -> RedacterThrottler {
        RedacterThrottler::new(self.to_rate_limit_capacity(), self.to_rate_limit_in_ms())
    }
}

/// Parses limits written as `<count><unit>`, where the unit is `rps`
/// (requests per second) or `rpm` (requests per minute), e.g. `10rps`.
impl FromStr for DlpRequestLimit {
    type Err = String;

    /// Returns an error (never panics) when the count is missing, is not a
    /// valid `usize`, is zero, or when the unit is not `rps`/`rpm`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `usize::from_str` only accepts ASCII digits, so split on exactly those.
        let unit_start = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len());
        let (number, unit) = s.split_at(unit_start);
        let max_ops_in_units = number
            .parse::<usize>()
            .map_err(|e| format!("Failed to parse number in DlpRequestLimit: {}", e))?;

        let per = match unit {
            "rps" => std::time::Duration::from_secs(1),
            "rpm" => std::time::Duration::from_secs(60),
            unknown => return Err(format!("Unknown unit specified: {}", unknown)),
        };

        // `DlpRequestLimit::new` asserts on zero; this is user input, so report
        // it as a parse error instead of panicking.
        if max_ops_in_units == 0 {
            return Err("Limit value should be more than zero".to_string());
        }

        Ok(DlpRequestLimit::new(max_ops_in_units, per))
    }
}
5 changes: 5 additions & 0 deletions src/redacters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ pub use simple_image_redacter::*;
mod stream_redacter;
pub use stream_redacter::*;

mod redacter_throttler;
pub use redacter_throttler::*;

use crate::args::RedacterType;
use crate::common_types::DlpRequestLimit;

#[derive(Debug, Clone)]
pub struct RedacterDataItem {
Expand Down Expand Up @@ -74,6 +78,7 @@ pub struct RedacterBaseOptions {
pub csv_headers_disable: bool,
pub csv_delimiter: Option<u8>,
pub sampling_size: Option<usize>,
pub limit_dlp_requests: Option<DlpRequestLimit>,
}

#[derive(Debug, Clone)]
Expand Down
138 changes: 138 additions & 0 deletions src/redacters/redacter_throttler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::ops::Add;
use std::time::{Duration, Instant};

/// Immutable request throttler state.
///
/// The available capacity is replenished by one slot per
/// `rate_limit_in_millis` (up to `max_capacity`), and every call to
/// [`RedacterThrottler::update`] consumes one slot. Once no slot is left, the
/// returned state carries a non-zero [`RedacterThrottler::delay`].
#[derive(Clone, Debug)]
pub struct RedacterThrottler {
    /// Remaining slots; becomes negative while requests outpace the limit.
    capacity: i64,
    /// Upper bound for `capacity`.
    max_capacity: usize,
    /// Start of the replenishment interval currently in progress.
    last_arrived: Instant,
    /// The `now` passed to the most recent `update` (construction time initially).
    last_updated: Instant,
    /// Time needed to replenish one slot, in milliseconds (always >= 1).
    rate_limit_in_millis: u64,
    /// Delay computed by the most recent `update`.
    delay: Duration,
}

impl RedacterThrottler {
    /// Creates a throttler that starts with `max_capacity` free slots and
    /// replenishes one slot every `rate_limit_in_millis` milliseconds.
    ///
    /// A `rate_limit_in_millis` of zero is raised to 1 ms, because `update`
    /// divides by this value and would panic otherwise.
    pub fn new(max_capacity: usize, rate_limit_in_millis: u64) -> Self {
        let created_at = Instant::now();
        Self {
            capacity: max_capacity as i64,
            max_capacity,
            last_arrived: created_at,
            last_updated: created_at,
            rate_limit_in_millis: rate_limit_in_millis.max(1),
            delay: Duration::from_millis(0),
        }
    }

    /// Registers one request at `now` and returns the resulting state.
    ///
    /// Slots replenished since `last_arrived` are credited first (capped at
    /// `max_capacity`), then one slot is consumed. If no slot was available,
    /// the returned state's `delay` is the remainder of the current rate
    /// interval plus a penalty proportional to the existing overdraft.
    pub fn update(&self, now: Instant) -> Self {
        let rate = self.rate_limit_in_millis;

        // A `now` earlier than `last_arrived` counts as no time elapsed.
        let time_elapsed_millis = now
            .checked_duration_since(self.last_arrived)
            .unwrap_or_default()
            .as_millis() as u64;

        // Credit only whole intervals and advance `last_arrived` by exactly
        // that amount, so the partial interval is carried to the next call.
        let (arrived, new_last_arrived) = if time_elapsed_millis >= rate {
            let arrived_in_time = time_elapsed_millis / rate;
            (
                arrived_in_time as usize,
                self.last_arrived
                    .add(Duration::from_millis(arrived_in_time * rate)),
            )
        } else {
            (0usize, self.last_arrived)
        };

        let new_available_capacity =
            std::cmp::min(self.capacity + arrived as i64, self.max_capacity as i64);

        let delay = if new_available_capacity > 0 {
            Duration::from_millis(0)
        } else {
            let base_delay_in_millis = now
                .checked_duration_since(self.last_updated)
                .map_or(0u64, |d| d.as_millis() as u64);

            // The penalty is based on the overdraft before this request; the
            // divisor is guarded so a zero `max_capacity` cannot yield inf/NaN.
            let delay_penalty = (rate as f64 * self.capacity.abs() as f64
                / self.max_capacity.max(1) as f64) as u64;

            let delay_in_millis = rate.saturating_sub(base_delay_in_millis);
            Duration::from_millis(delay_in_millis.saturating_add(delay_penalty))
        };

        Self {
            capacity: new_available_capacity - 1,
            max_capacity: self.max_capacity,
            last_arrived: new_last_arrived,
            last_updated: now,
            rate_limit_in_millis: rate,
            delay,
        }
    }

    /// Returns the delay computed by the most recent [`Self::update`] call
    /// (zero for a freshly created throttler).
    pub fn delay(&self) -> &Duration {
        &self.delay
    }
}

// Unit tests for `RedacterThrottler`; compiled only for test builds.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common_types::DlpRequestLimit;

    /// A request arriving before one full rate interval has elapsed consumes
    /// a slot without crediting any new one and without any delay.
    #[test]
    fn check_decreased() {
        let rate_limit = DlpRequestLimit::new(15, Duration::from_secs(60));
        let rate_limit_in_ms = rate_limit.to_rate_limit_in_ms();
        let rate_limit_capacity = rate_limit.to_rate_limit_capacity();

        let now = Instant::now();
        let counter = RedacterThrottler::new(rate_limit_capacity, rate_limit_in_ms);
        let updated_counter = counter.update(now.add(Duration::from_millis(rate_limit_in_ms - 1)));

        assert_eq!(updated_counter.last_arrived, counter.last_arrived);
        assert_eq!(updated_counter.delay, Duration::from_millis(0));
        assert_eq!(updated_counter.capacity, counter.capacity - 1);
    }

    /// Replenished slots never push the capacity above `max_capacity`.
    #[test]
    fn check_max_available() {
        let rate_limit = DlpRequestLimit::new(15, Duration::from_secs(60));
        let rate_limit_in_ms = rate_limit.to_rate_limit_in_ms();
        let rate_limit_capacity = rate_limit.to_rate_limit_capacity();

        let now = Instant::now();
        let counter = RedacterThrottler::new(rate_limit_capacity, rate_limit_in_ms);
        let updated_counter = counter.update(now.add(Duration::from_millis(rate_limit_in_ms + 1)));

        assert_eq!(updated_counter.delay, Duration::from_millis(0));
        assert_eq!(updated_counter.capacity, (counter.max_capacity - 1) as i64);
    }

    /// One request more than the capacity at the same instant yields a delay
    /// of exactly one rate interval.
    #[test]
    fn check_delay() {
        let counter = DlpRequestLimit::new(15, Duration::from_secs(60)).to_throttling_counter();

        let now = Instant::now();

        let updated_counter =
            (0..counter.capacity + 1).fold(counter.clone(), |result, _| result.update(now));

        assert_eq!(
            updated_counter.delay,
            Duration::from_millis(counter.rate_limit_in_millis)
        );
    }
}

0 comments on commit fff730e

Please sign in to comment.