diff --git a/brro-compressor/src/compare.rs b/brro-compressor/src/compare.rs index 9925fe2..d6fbe47 100644 --- a/brro-compressor/src/compare.rs +++ b/brro-compressor/src/compare.rs @@ -14,17 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -use crate::{ - compressor::{ - constant::constant_compressor, - fft::fft, - polynomial::{polynomial, PolynomialType}, - }, - optimizer::utils::DataStats, -}; -use std::thread; - -/// Enum to represent the decision between compressors. #[derive(PartialEq, Debug)] enum CompressionDecision { Constant, @@ -32,66 +21,7 @@ enum CompressionDecision { Polynomial, } -impl CompressionDecision { - /// Function to perform compression and make a decision based on the results. - pub fn compress_and_decide() -> Result<(), Box> { - // Sample data for testing - let data = vec![1.0, 2.0, 3.0, 4.0, 5.0]; - let stats = DataStats::new(&data); - - // Clone data for each compressor - let data_constant = data.clone(); - let data_fft = data.clone(); - let data_polynomial = data.clone(); - - // Create threads for each compressor - let thread_constant = thread::spawn(move || constant_compressor(&data_constant, stats)); - let thread_fft = thread::spawn(move || fft(&data_fft)); - let thread_polynomial = - thread::spawn(move || polynomial(&data_polynomial, PolynomialType::Polynomial)); - - // Wait for threads to finish and collect their results with error handling - let result_constant = thread_constant - .join() - .map_err(|e| format!("Constant thread error: {:?}", e))?; - let result_fft = thread_fft - .join() - .map_err(|e| format!("FFT thread error: {:?}", e))?; - let result_polynomial = thread_polynomial - .join() - .map_err(|e| format!("Polynomial thread error: {:?}", e))?; - - // Use the decision logic to determine the compression decision - let decision = match ( - result_constant.compressed_data.len(), - result_fft.len(), - result_polynomial.len(), - ) { - (constant_len, fft_len, poly_len) - if constant_len < fft_len && constant_len < poly_len => - { - CompressionDecision::Constant - } - (_, fft_len, poly_len) if fft_len < poly_len => CompressionDecision::Fft, - _ => CompressionDecision::Polynomial, - }; - - // Use the decision to perform further actions - match decision { - CompressionDecision::Constant => { - println!("Selected Constant Compressor"); - } - CompressionDecision::Fft => { - println!("Selected FFT Compressor"); - } - CompressionDecision::Polynomial => { - println!("Selected Polynomial Compressor"); - } - } - - Ok(()) - } -} +impl CompressionDecision {} fn get_compression_decision( result_constant: &[f64], result_fft: &[f64], diff --git a/brro-compressor/src/compressor/constant.rs b/brro-compressor/src/compressor/constant.rs index 99b1dab..cc82e21 100644 --- a/brro-compressor/src/compressor/constant.rs +++ b/brro-compressor/src/compressor/constant.rs @@ -69,7 +69,6 @@ impl Decode for Constant { ) -> Result { let id = Decode::decode(decoder)?; let bitdepth = Decode::decode(decoder)?; - // Here is where the pig twists the tail let constant: f64 = match bitdepth { Bitdepth::U8 => { debug!("Decoding as u8"); @@ -112,11 +111,6 @@ impl Constant { } } - /// This compressor is about having a single constant for the whole segment - pub fn set_constant(&mut self, constant_value: f64) { - self.constant = constant_value; - } - /// Receives a data stream and generates a Constant pub fn decompress(data: &[u8]) -> Self { let config = BinConfig::get(); @@ -126,7 +120,6 @@ impl Constant { /// This function transforms 
the structure into a Binary stream pub fn to_bytes(&self) -> Vec { - // Use Bincode and flate2-rs? Do this at the Stream Level? let config = BinConfig::get(); bincode::encode_to_vec(self, config).unwrap() } @@ -141,9 +134,7 @@ impl Constant { pub fn constant_compressor(data: &[f64], stats: DataStats) -> CompressorResult { debug!("Initializing Constant Compressor. Error and Stats provided"); - // Initialize the compressor let c = Constant::new(data.len(), stats.min, stats.bitdepth); - // Convert to bytes CompressorResult::new(c.to_bytes(), 0.0) } diff --git a/brro-compressor/src/compressor/fft.rs b/brro-compressor/src/compressor/fft.rs index 4e00b15..4c9fa07 100644 --- a/brro-compressor/src/compressor/fft.rs +++ b/brro-compressor/src/compressor/fft.rs @@ -39,30 +39,6 @@ pub struct FrequencyPoint { } impl FrequencyPoint { - pub fn new(real: f32, img: f32) -> Self { - FrequencyPoint { - pos: 0, - freq_real: real, - freq_img: img, - } - } - - pub fn with_position(real: f32, img: f32, pos: u16) -> Self { - FrequencyPoint { - pos, - freq_real: real, - freq_img: img, - } - } - - pub fn from_complex(complex: Complex) -> Self { - FrequencyPoint { - pos: 0, - freq_real: complex.re, - freq_img: complex.im, - } - } - pub fn from_complex_with_position(complex: Complex, pos: u16) -> Self { FrequencyPoint { pos, @@ -132,15 +108,10 @@ impl Ord for FrequencyPoint { /// FFT Compressor. Applies FFT to a signal, picks the N best frequencies, discards the rest. Always LOSSY #[derive(PartialEq, Debug)] pub struct FFT { - /// Compressor ID pub id: u8, - /// Stored frequencies pub frequencies: Vec, - /// The maximum numeric value of the points in the frame pub max_value: f32, - /// The minimum numeric value of the points in the frame pub min_value: f32, - /// Compression error pub error: Option, } @@ -391,7 +362,7 @@ impl FFT { } /// Compresses data via FFT - /// The set of frequencies to store is 1/100 of the data lenght OR 3, which is bigger. + /// The set of frequencies to store is 1/100 of the data length OR 3, which is bigger. 
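For reference, the rule described in the doc comment above amounts to the following (a minimal sketch; `max_frequencies` is a hypothetical helper, not code from this patch):

```rust
// Number of frequencies the FFT compressor keeps: 1/100 of the data
// length, with a floor of 3 (the minimum the format works with).
fn max_frequencies(data_len: usize) -> usize {
    (data_len / 100).max(3)
}
```

So a 512-sample frame keeps 5 frequencies, while any frame shorter than 400 samples keeps the 3-frequency minimum.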
pub fn compress(&mut self, data: &[f64]) { if self.max_value == self.min_value { debug!("Same max and min, we're done here!"); @@ -415,8 +386,6 @@ impl FFT { buffer.truncate(size); self.frequencies = FFT::fft_trim(&mut buffer, max_freq); } - - /// Decompresses data pub fn decompress(data: &[u8]) -> Self { let config = BinConfig::get(); let (fft, _) = bincode::decode_from_slice(data, config).unwrap(); diff --git a/brro-compressor/src/compressor/mod.rs b/brro-compressor/src/compressor/mod.rs index 75f0370..4b71cd3 100644 --- a/brro-compressor/src/compressor/mod.rs +++ b/brro-compressor/src/compressor/mod.rs @@ -118,7 +118,6 @@ pub struct BinConfig { impl BinConfig { pub fn get() -> Configuration { - // Little endian and Variable int encoding config::standard() } } diff --git a/brro-compressor/src/compressor/noop.rs b/brro-compressor/src/compressor/noop.rs index 87ee570..e622fd4 100644 --- a/brro-compressor/src/compressor/noop.rs +++ b/brro-compressor/src/compressor/noop.rs @@ -34,17 +34,14 @@ impl Noop { data: Vec::with_capacity(sample_count), } } - ///Optimize pub fn optimize(data: &[f64]) -> Vec { let mut out_vec = Vec::with_capacity(data.len()); for &element in data { - // Round the floating-point number before casting to i64 out_vec.push(element.round() as i64); } out_vec } - /// "Compress" pub fn compress(&mut self, data: &[f64]) { self.data = Noop::optimize(data); debug!( @@ -54,20 +51,17 @@ impl Noop { ); } - /// Receives a data stream and generates a Noop pub fn decompress(data: &[u8]) -> Self { let config = BinConfig::get(); let (noop, _) = bincode::decode_from_slice(data, config).unwrap(); noop } - /// This function transforms the structure in a Binary stream to be appended to the frame pub fn to_bytes(&self) -> Vec { let config = BinConfig::get(); bincode::encode_to_vec(self, config).unwrap() } - /// Returns an array of data pub fn to_data(&self, _frame_size: usize) -> Vec { self.data.clone() } @@ -116,9 +110,8 @@ mod tests { #[test] fn test_optimize() { - // Test case with floating-point numbers that have fractional parts let input_data = [1.5, 2.7, 3.3, 4.9]; - let expected_output = [2, 3, 3, 5]; // Rounded to the nearest integer + let expected_output = [2, 3, 3, 5]; let result = Noop::optimize(&input_data); assert_eq!(result, expected_output); diff --git a/brro-compressor/src/compressor/polynomial.rs b/brro-compressor/src/compressor/polynomial.rs index 9d25791..531a82a 100644 --- a/brro-compressor/src/compressor/polynomial.rs +++ b/brro-compressor/src/compressor/polynomial.rs @@ -42,17 +42,12 @@ pub enum Method { #[derive(PartialEq, Debug, Clone)] pub struct Polynomial { - /// Compressor ID pub id: PolynomialType, - /// Stored Points pub data_points: Vec, pub min: f64, pub max: f64, - /// What is the base step between points pub point_step: u8, - /// Compression error pub error: Option, - /// Target bitdepth pub bitdepth: Bitdepth, } @@ -97,7 +92,6 @@ impl Decode for Polynomial { ) -> Result { let id = Decode::decode(decoder)?; let bitdepth = Decode::decode(decoder)?; - // Here is where the pig twists the tail let data_points: Vec = match bitdepth { Bitdepth::U8 => { debug!("Decoding as u8"); @@ -205,10 +199,6 @@ impl Polynomial { } } - fn locate_in_data_points(&self, point: f64) -> bool { - self.data_points.iter().any(|&i| i == point) - } - fn get_method(&self) -> Method { match self.id { PolynomialType::Idw => Method::Idw, @@ -223,7 +213,7 @@ impl Polynomial { } // TODO: Big one, read below // To reduce error we add more points to the polynomial, but, we also might add 
residuals - // each residual is 1/data_lenght * 100% less compression, each jump is 5% less compression. + // each residual is 1/data_length * 100% less compression, each jump is 5% less compression. // We can do the math and pick the one which fits better. let method = self.get_method(); let data_len = data.len(); @@ -280,7 +270,7 @@ impl Polynomial { } self.error = Some(current_err); debug!( - "Final Stored Data Lenght: {} Iterations: {}", + "Final Stored Data Length: {} Iterations: {}", self.data_points.len(), iterations ); @@ -314,7 +304,6 @@ impl Polynomial { self.point_step = step as u8; } - // --- MANDATORY METHODS --- pub fn compress(&mut self, data: &[f64]) { let points = if 3 >= (data.len() / 100) { 3 @@ -324,7 +313,6 @@ impl Polynomial { self.compress_hinted(data, points) } - /// Decompresses data pub fn decompress(data: &[u8]) -> Self { let config = BinConfig::get(); let (poly, _) = bincode::decode_from_slice(data, config).unwrap(); @@ -336,7 +324,6 @@ impl Polynomial { bincode::encode_to_vec(self, config).unwrap() } - // --- END OF MANDATORY METHODS --- /// Since IDW and Polynomial are the same code everywhere, this function prepares the data /// to be used by one of the polynomial decompression methods fn get_positions(&self, frame_size: usize) -> Vec { @@ -393,7 +380,6 @@ impl Polynomial { .map(|&f| f as f64) .collect(); let idw = IDW::new(points, self.data_points.clone()); - // Build the data (0..frame_size) .map(|f| { round_and_limit_f64( @@ -421,11 +407,8 @@ impl Polynomial { pub fn polynomial(data: &[f64], p_type: PolynomialType) -> Vec { info!("Initializing Polynomial Compressor"); let stats = DataStats::new(data); - // Initialize the compressor let mut c = Polynomial::new(data.len(), stats.min, stats.max, p_type, stats.bitdepth); - // Convert the data c.compress(data); - // Convert to bytes c.to_bytes() } @@ -436,14 +419,11 @@ pub fn polynomial_allowed_error( ) -> CompressorResult { info!("Initializing Polynomial Compressor"); let stats = DataStats::new(data); - // Initialize the compressor let mut c = Polynomial::new(data.len(), stats.min, stats.max, p_type, stats.bitdepth); - // Convert the data c.compress_bounded(data, allowed_error); CompressorResult::new(c.to_bytes(), c.error.unwrap_or(0.0)) } -/// Uncompress pub fn to_data(sample_number: usize, compressed_data: &[u8]) -> Vec { let c = Polynomial::decompress(compressed_data); c.to_data(sample_number) diff --git a/brro-compressor/src/data.rs b/brro-compressor/src/data.rs index 339cf2d..ea97dd9 100644 --- a/brro-compressor/src/data.rs +++ b/brro-compressor/src/data.rs @@ -27,7 +27,6 @@ pub struct CompressedStream { } impl CompressedStream { - /// Creates an empty compressor stream pub fn new() -> Self { CompressedStream { header: CompressorHeader::new(), @@ -87,8 +86,6 @@ impl CompressedStream { let (compressed_stream, _) = bincode::decode_from_slice(data, config).unwrap(); compressed_stream } - - /// Decompresses all the frames and returns a vector with the data pub fn decompress(&self) -> Vec { self.data_frames .iter() diff --git a/brro-compressor/src/frame/mod.rs b/brro-compressor/src/frame/mod.rs index caef28d..2a56612 100644 --- a/brro-compressor/src/frame/mod.rs +++ b/brro-compressor/src/frame/mod.rs @@ -21,14 +21,11 @@ use std::mem::size_of_val; const COMPRESSION_SPEED: [i32; 7] = [i32::MAX, 4096, 2048, 1024, 512, 256, 128]; -/// This is the structure of a compressor frame #[derive(Encode, Decode, Debug, Clone)] pub struct CompressorFrame { /// The frame size in bytes, frame_size: usize, - /// The number of 
samples in this frame, sample_count: usize, - /// The compressor used in the current frame compressor: Compressor, /// Output from the compressor data: Vec, @@ -48,7 +45,7 @@ impl CompressorFrame { } /// Calculates the size of the Frame and "closes it" - // TODO this is probably wrong, so we have to use the write stream to dump the bytes writen + // TODO this is probably wrong, so we have to use the write stream to dump the bytes written pub fn close(&mut self) { let size = size_of_val(&self.sample_count) + size_of_val(&self.compressor) @@ -57,13 +54,11 @@ impl CompressorFrame { self.frame_size = size; } - /// Compress a data and stores the result in the frame pub fn compress(&mut self, data: &[f64]) { self.sample_count = data.len(); self.data = self.compressor.compress(data); } - /// Compress a data and stores the result in the frame pub fn compress_bounded(&mut self, data: &[f64], max_error: f32) { self.sample_count = data.len(); self.data = self.compressor.compress_bounded(data, max_error as f64); @@ -75,15 +70,12 @@ impl CompressorFrame { // Speed factor limits the amount of data that is sampled to calculate the best compressor. // We need enough samples to do decent compression, minimum is 128 (2^7) let data_sample = COMPRESSION_SPEED[compression_speed] as usize; - // Eligible compressors for use let compressor_list = [Compressor::FFT, Compressor::Polynomial]; - // Do a statistical analysis of the data, let's see if we can pick a compressor out of this. let stats = DataStats::new(data); // Checking the statistical analysis and chose, if possible, a compressor // If the data is constant, well, constant frame if stats.min == stats.max { self.compressor = Compressor::Constant; - // Now do the full data compression self.data = self .compressor .get_compress_bounded_results(data, max_error as f64) @@ -106,13 +98,11 @@ impl CompressorFrame { .min_by_key(|x| x.0.compressed_data.len()) .unwrap(); self.compressor = *chosen_compressor; - // Now do the full data compression self.data = self .compressor .get_compress_bounded_results(data, max_error as f64) .compressed_data; } else { - // Run all the eligible compressors and choose smallest let compressor_results: Vec<_> = compressor_list .iter() .map(|compressor| { @@ -150,7 +140,6 @@ impl CompressorFrame { debug!("Auto Compressor Selection: {:?}", self.compressor); } - /// Decompresses a frame and returns the resulting data array pub fn decompress(&self) -> Vec { debug!( "Decompressing Frame. Size: {}, Samples: {}", diff --git a/brro-compressor/src/header.rs b/brro-compressor/src/header.rs index e7f398d..080beb0 100644 --- a/brro-compressor/src/header.rs +++ b/brro-compressor/src/header.rs @@ -16,7 +16,6 @@ limitations under the License. use bincode::{Decode, Encode}; -/// This will write the file headers #[derive(Encode, Decode, Debug, Clone)] pub struct CompressorHeader { initial_segment: [u8; 4], diff --git a/brro-compressor/src/lib.rs b/brro-compressor/src/lib.rs index 2b1d4ae..1b6492d 100644 --- a/brro-compressor/src/lib.rs +++ b/brro-compressor/src/lib.rs @@ -15,7 +15,7 @@ limitations under the License. 
*/ #![allow(clippy::new_without_default)] -// Lucas - Once the project is far enough along I strongly reccomend reenabling dead code checks +// TODO: re-enable dead code checks #![allow(dead_code)] pub mod compare; diff --git a/brro-compressor/src/main.rs b/brro-compressor/src/main.rs index 69e7561..d261382 100644 --- a/brro-compressor/src/main.rs +++ b/brro-compressor/src/main.rs @@ -24,31 +24,23 @@ use std::error::Error; use std::path::PathBuf; use wavbrro::wavbrro::WavBrro; -/// Processes the given input based on the provided arguments. fn process_args(arguments: &Args) -> Result<(), Box> { let metadata = std::fs::metadata(&arguments.input)?; - // If the input path points to a single file if metadata.is_file() { debug!("Target is a file"); process_single_file(arguments.input.clone(), arguments)?; - } - // If the input path points to a directory - else if metadata.is_dir() { + } else if metadata.is_dir() { debug!("Target is a directory"); process_directory(arguments)?; - } - // If the input path is neither a file nor a directory - else { + } else { return Err("The provided path is neither a file nor a directory.".into()); } Ok(()) } -/// Processes all files in a given directory. fn process_directory(arguments: &Args) -> Result<(), Box> { - // Assuming you want to process each file inside this directory for entry in std::fs::read_dir(arguments.input.clone())? { let path = entry?.path(); if path.is_file() { @@ -67,14 +59,11 @@ fn process_directory(arguments: &Args) -> Result<(), Box> { Ok(()) } -/// Processes a single file. fn process_single_file(mut file_path: PathBuf, arguments: &Args) -> Result<(), Box> { debug!("Processing single file..."); if arguments.uncompress { - //read if let Some(vec) = bro_reader::read_file(&file_path)? { let arr: &[u8] = &vec; - //decompress let decompressed_data = decompress_data(arr); if arguments.verbose { println!("Output={:?}", decompressed_data); @@ -83,29 +72,22 @@ fn process_single_file(mut file_path: PathBuf, arguments: &Args) -> Result<(), B WavBrro::to_file_with_data(&file_path, &decompressed_data) } } else { - // Read an WavBRRO file and compress it let data = WavBrro::from_file(&file_path)?; if arguments.verbose { println!("Input={:?}", data); } - //compress let compressed_data = compress_data(&data, arguments); - //write file_path.set_extension("bro"); std::fs::write(file_path, compressed_data)?; } Ok(()) } -/// Compresses the data based on the provided tag and arguments. fn compress_data(vec: &[f64], arguments: &Args) -> Vec { debug!("Compressing data!"); - //let optimizer_results = optimizer::process_data(vec, tag); - // Create Optimization Plan and Stream for the data. let mut op = OptimizerPlan::plan(vec); let mut cs = CompressedStream::new(); - // Assign the compressor if it was selected match arguments.compressor { CompressorType::Noop => op.set_compressor(Compressor::Noop), CompressorType::Constant => op.set_compressor(Compressor::Constant), @@ -116,7 +98,6 @@ fn compress_data(vec: &[f64], arguments: &Args) -> Vec { } for (cpr, data) in op.get_execution().into_iter() { debug!("Chunk size: {}", data.len()); - // If compressor is a losseless one, compress with the error defined, or default match arguments.compressor { CompressorType::Fft | CompressorType::Polynomial @@ -133,7 +114,6 @@ fn compress_data(vec: &[f64], arguments: &Args) -> Vec { cs.to_bytes() } -/// Compresses the data based on the provided tag and arguments. 
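For reference, the round-trip between `compress_data` above and `decompress_data` below looks roughly like this. This is a sketch assuming the signatures shown in this diff and an input free of NaN/infinite values (the optimizer drops those, which would change the sample count):

```rust
// Hypothetical round-trip check, not part of this patch. Lossy compressors
// (FFT, Polynomial/Idw) only reproduce values within the configured error
// bound, so only the sample count is compared, not the exact values.
fn round_trip(samples: &[f64], args: &Args) {
    let bytes = compress_data(samples, args); // serialized CompressedStream
    let restored = decompress_data(&bytes);   // Vec<f64>
    assert_eq!(samples.len(), restored.len());
}
```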
 fn decompress_data(compressed_data: &[u8]) -> Vec<f64> {
     debug!("decompressing data!");
     let cs = CompressedStream::from_bytes(compressed_data);
@@ -143,10 +123,8 @@ fn decompress_data(compressed_data: &[u8]) -> Vec<f64> {
 #[derive(Parser, Default, Debug)]
 #[command(author, version, about="A Time-Series compressor", long_about = None)]
 struct Args {
-    /// input file
     input: PathBuf,

-    /// Select a compressor, default is auto
     #[arg(long, value_enum, default_value = "auto")]
     compressor: CompressorType,

@@ -157,7 +135,6 @@ struct Args {
     #[arg(short, long, default_value_t = 5, value_parser = clap::value_parser!(u8).range(0..51))]
     error: u8,

-    /// Uncompresses the input file/directory
     #[arg(short, action)]
     uncompress: bool,

@@ -169,7 +146,6 @@ struct Args {
     #[arg(short, long, default_value_t = 0, value_parser = clap::value_parser!(u8).range(0..7))]
     compression_selection_sample_level: u8,

-    /// Verbose output, dumps everysample in the input file (for compression) and in the ouput file (for decompression)
     #[arg(long, action)]
     verbose: bool,
 }
diff --git a/brro-compressor/src/optimizer/mod.rs b/brro-compressor/src/optimizer/mod.rs
index 2756aaa..2351d01 100644
--- a/brro-compressor/src/optimizer/mod.rs
+++ b/brro-compressor/src/optimizer/mod.rs
@@ -16,18 +16,14 @@ limitations under the License.
 use crate::{
     compressor::Compressor,
-    types,
     utils::{f64_to_u64, prev_power_of_two},
 };
-use log::debug;
-use types::metric_tag::MetricTag;

 pub mod utils;

 /// Max Frame size: this holds approx. 36h of data at 1 point/sec, a little more than 1 week at 1 point/5sec,
 /// and 1 month (30 days) at 1 point/20sec.
 /// This would be approx. 1MB of raw data (131072 * 64bits).
-/// We wouldn't want to decompressed a ton of uncessary data, but for historical view of the data, looking into 1day/week/month at once is very reasonable
 const MAX_FRAME_SIZE: usize = 131072; // 2^17
 /// The Min frame size is one that allows our compressors to potentially achieve 100x compression. Currently the most
 /// limited one is the FFT compressor, which needs at least 3 frequencies: 3x100 = 300, next power of 2 is 512.
@@ -47,7 +43,6 @@ pub struct OptimizerPlan {
 }

 impl OptimizerPlan {
-    /// Creates an optimal data compression plan
     pub fn plan(data: &[f64]) -> Self {
         let c_data = OptimizerPlan::clean_data(data);
         let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
@@ -59,28 +54,12 @@ impl OptimizerPlan {
         }
     }

-    /// Creates an optimal plan for compression for the data set provided bound by a given error
-    pub fn plan_bounded(data: &[f64], max_error: f32) -> Self {
-        // TODO: Check error limits
-        let c_data = OptimizerPlan::clean_data(data);
-        let chunks = OptimizerPlan::get_chunks_sizes(c_data.len());
-        let optimizer = OptimizerPlan::assign_compressor(&c_data, &chunks, Some(max_error));
-        OptimizerPlan {
-            data: c_data,
-            chunk_sizes: chunks,
-            compressors: optimizer,
-        }
-    }
-
-    /// Sets a given compressor for all data chunks
     pub fn set_compressor(&mut self, compressor: Compressor) {
         let new_compressors = vec![compressor; self.compressors.len()];
         self.compressors = new_compressors;
     }

-    /// Removes NaN and infinite references from the data
     pub fn clean_data(wav_data: &[f64]) -> Vec<f64> {
-        // Cleaning data, removing NaN, etc.
This might reduce sample count wav_data .iter() .filter(|x| !(x.is_nan() || x.is_infinite())) @@ -115,7 +94,6 @@ impl OptimizerPlan { chunk_sizes } - /// Returns a vector with the data slice and the compressor associated pub fn get_execution(&self) -> Vec<(&Compressor, &[f64])> { let mut output = Vec::with_capacity(self.chunk_sizes.len()); let mut s = 0; @@ -126,15 +104,11 @@ impl OptimizerPlan { output } - /// Walks the data, checks how much variability is in the data, and assigns a compressor based on that - /// NOTE: Is this any good? fn get_compressor(data: &[f64]) -> Compressor { let _ = data.iter().map(|&f| f64_to_u64(f, 0)); - // For now, let's just return FFT Compressor::FFT } - /// Assigns a compressor to a chunk of data fn assign_compressor( clean_data: &[f64], chunks: &[usize], @@ -155,17 +129,6 @@ impl OptimizerPlan { } } -/// This should look at the data and return an optimized dataset for a specific compressor, -/// If a compressor is hand picked, this should be skipped. -pub fn process_data(wav_data: &[f64], tag: &MetricTag) -> Vec { - debug!("Tag: {:?} Len: {}", tag, wav_data.len()); - wav_data - .iter() - .filter(|x| !(x.is_nan() || x.is_infinite())) - .copied() - .collect() -} - #[cfg(test)] mod tests { use super::*; diff --git a/brro-compressor/src/optimizer/utils.rs b/brro-compressor/src/optimizer/utils.rs index 3486729..4700d2f 100644 --- a/brro-compressor/src/optimizer/utils.rs +++ b/brro-compressor/src/optimizer/utils.rs @@ -24,26 +24,19 @@ pub enum Bitdepth { I16, U8, } -/// Data structure that holds statictical information about the data provided + pub struct DataStats { - // Max value pub max: f64, - // Max value location in the array pub max_loc: usize, - // Min value pub min: f64, - // Min value location in the array pub min_loc: usize, - // Mean of the data pub mean: f64, - // Bitdepth that this data can be pub bitdepth: Bitdepth, pub fractional: bool, } impl DataStats { pub fn new(data: &[f64]) -> Self { - // Statistical data stored let mut min: f64 = data[0]; let mut min_loc = 0; let mut max: f64 = data[0]; @@ -52,7 +45,6 @@ impl DataStats { let mut mean: f64 = 0.0; let mut recommended_bitdepth = Bitdepth::F64; - // Walk the data and perform the analysis for (i, value) in data.iter().enumerate() { let t_value = *value; mean += value; @@ -70,7 +62,7 @@ impl DataStats { } mean /= data.len() as f64; // Check max size of values - // For very large numbers (i32 and i64), it might be ideal to detect the dc component + // TODO: for very large numbers (i32 and i64), it might be ideal to detect the dc component // of the signal. 
And then remove it later let max_int = split_n(max).0; // This is the DC component let min_int = split_n(min).0; @@ -118,19 +110,6 @@ impl DataStats { } } } - -fn as_i8(value: f64) -> i8 { - split_n(value).0 as i8 -} - -fn as_i16(value: f64) -> i16 { - split_n(value).0 as i16 -} - -fn as_i32(value: f64) -> i32 { - split_n(value).0 as i32 -} - fn split_n(x: f64) -> (i64, f64) { const FRACT_SCALE: f64 = 1.0 / (65536.0 * 65536.0 * 65536.0 * 65536.0); // 1_f64.exp(-64) const STORED_MANTISSA_DIGITS: u32 = f64::MANTISSA_DIGITS - 1; @@ -177,57 +156,6 @@ fn split_n(x: f64) -> (i64, f64) { (0, 0.0) } } - -fn analyze_data(data: &Vec) -> (i32, i64, bool) { - let mut min: f64 = 0.0; - let mut max: f64 = 0.0; - let mut fractional = false; - for value in data { - let t_value = *value; - if split_n(t_value).1 != 0.0 { - fractional = true; - } - if t_value > max { - max = t_value - }; - if t_value < min { - min = t_value - }; - } - // Check max size of values - // For very large numbers (i32 and i64), it might be ideal to detect the dc component - // of the signal. And then remove it later - let max_int = split_n(max).0; // This is the DC component - let min_int = split_n(min).0; - - // Finding the bitdepth without the DC component - let recommended_bitdepth = find_bitdepth(max_int - min_int, min_int); - debug!( - "Recommended Bitdepth: {}, Fractional: {}", - recommended_bitdepth, fractional - ); - (recommended_bitdepth, min_int, fractional) -} - -fn find_bitdepth(max_int: i64, min_int: i64) -> i32 { - // Check where those ints fall into - let bitdepth = match max_int { - _ if max_int <= u8::MAX.into() => 8, - _ if max_int <= i16::MAX.into() => 16, - _ if max_int <= i32::MAX.into() => 32, - _ => 64, - }; - - let bitdepth_signed = match min_int { - _ if min_int == 0 => 8, - _ if min_int >= i16::MIN.into() => 16, - _ if min_int >= i32::MIN.into() => 32, - _ => 64, - }; - - bitdepth.max(bitdepth_signed) -} - #[cfg(test)] mod tests { use super::*; diff --git a/brro-compressor/src/types/metric_tag.rs b/brro-compressor/src/types/metric_tag.rs index 7748a4b..eb4b7c8 100644 --- a/brro-compressor/src/types/metric_tag.rs +++ b/brro-compressor/src/types/metric_tag.rs @@ -14,21 +14,22 @@ See the License for the specific language governing permissions and limitations under the License. */ -use median::Filter; - #[derive(Debug)] pub enum MetricTag { + // Represents a percentage value. Precision is reduced to 2 significant digits. Percent(i32), - // If it is a percent reduce significant digits to 2 + // Represents a duration value. Precision is reduced to 1 microsecond. Duration(i32), - // if it is a duration reduce precision to 1 microsecond + // Represents a metric with a float representation where precision is not required. NotFloat, - // A metric that has a float representation but shouldn't (Eg. Precision is not needed) + // Represents a quasi-random metric, such as network deltas or heap memory changes, + // which exhibit unpredictable behavior. QuasiRandom, - // A metric that exhibits a quasi random sample behavior. (E.g. Network deltas, heap memory) + // Represents data in bytes. Should be converted to a human-readable format, + // such as KB or MB. Bytes(i32), - // Data that is in bytes... Make it MB, or KB - Other, // Everything else + // Represents any other type of metric that does not fit into the predefined categories. 
+ Other, } impl MetricTag { @@ -47,16 +48,4 @@ impl MetricTag { fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 { (number * mul as f64) as i64 } - - fn to_median_filter(data: &[f64]) -> Vec { - let mut filtered = Vec::with_capacity(data.len()); - // 10minutes of data - let mut filter = Filter::new(50); - for point in data { - let point_int = MetricTag::QuasiRandom.from_float(*point); - let median = filter.consume(point_int); - filtered.push(median) - } - filtered - } } diff --git a/brro-compressor/src/utils/error.rs b/brro-compressor/src/utils/error.rs index 9303ae5..d472b5d 100644 --- a/brro-compressor/src/utils/error.rs +++ b/brro-compressor/src/utils/error.rs @@ -40,17 +40,11 @@ impl ErrorMethod { /// This function calculates the error between 2 arrays of f64. The results are from 0 to .. /// Being 0, no error, 1 - 100% error and so on. -/// This uses the default function to calculte it. +/// This uses the default function to calculate it. pub fn calculate_error(original: &[f64], generated: &[f64]) -> f64 { ErrorMethod::error(ErrorMethod::default(), original, generated) } -/// This function calculates the error between 2 arrays of f64. The results are from 0 to .. -/// Being 0, no error, 1 - 100% error and so on. -/// This uses the provided method to calculte it. -pub fn calculate_error_method(original: &[f64], generated: &[f64], method: ErrorMethod) -> f64 { - ErrorMethod::error(method, original, generated) -} /// Calculates the mean squared error between two vectors. /// /// # Arguments diff --git a/brro-compressor/src/utils/mod.rs b/brro-compressor/src/utils/mod.rs index 263ab0e..c00015d 100644 --- a/brro-compressor/src/utils/mod.rs +++ b/brro-compressor/src/utils/mod.rs @@ -20,7 +20,7 @@ pub mod writers; pub const DECIMAL_PRECISION: u32 = 5; -// Is this the right place? +// TODO: check if it is the right place? pub fn prev_power_of_two(n: usize) -> usize { // n = 0 gives highest_bit_set_idx = 0. let highest_bit_set_idx = 63 - (n | 1).leading_zeros(); @@ -58,11 +58,6 @@ pub fn f64_to_u64(number: f64, precision: usize) -> u64 { (number * mul as f64) as u64 } -pub fn round_f32(x: f32, decimals: u32) -> f64 { - let y = 10i32.pow(decimals) as f64; - (x as f64 * y).round() / y -} - pub fn round_f64(x: f64, decimals: u32) -> f64 { let y = 10i32.pow(decimals) as f64; (x * y).round() / y diff --git a/brro-compressor/src/utils/readers/bro_reader.rs b/brro-compressor/src/utils/readers/bro_reader.rs index 3ee0b36..f2b7e37 100644 --- a/brro-compressor/src/utils/readers/bro_reader.rs +++ b/brro-compressor/src/utils/readers/bro_reader.rs @@ -44,28 +44,3 @@ fn is_bro_file(file_path: &Path) -> io::Result { // Check if the file starts with "BRRO" Ok(header.starts_with(b"BRRO")) } - -/// Read a file by chunks and processes the chunks -pub fn process_by_chunk(file_path: &Path) -> Result<(), std::io::Error> { - let mut file = std::fs::File::open(file_path)?; - - let mut list_of_chunks = Vec::new(); - // 64KB at a time, assuming 64Bit samples, ~1024 samples. 
- let chunk_size = 0x10000; - - loop { - let mut chunk = Vec::with_capacity(chunk_size); - let n = file - .by_ref() - .take(chunk_size as u64) - .read_to_end(&mut chunk)?; - if n == 0 { - break; - } - list_of_chunks.push(chunk); - if n < chunk_size { - break; - } - } - Ok(()) -} diff --git a/csv-compressor/src/csv.rs b/csv-compressor/src/csv.rs index d8372aa..fb38ddb 100644 --- a/csv-compressor/src/csv.rs +++ b/csv-compressor/src/csv.rs @@ -46,7 +46,6 @@ pub fn read_samples_from_csv_file(dest: &Path) -> Result, csv::Error reader.deserialize().collect() } -/// Writes samples to file at dest as csv pub fn write_samples_to_csv_file(dest: &Path, samples: &[Sample]) -> Result<(), csv::Error> { let mut csv_file = File::create(dest)?; let mut writer = csv::Writer::from_writer(&mut csv_file); @@ -89,7 +88,6 @@ mod tests { TempDir::new("test_read_samples").expect("Unable to create temporary directory"); let path = temp_dir.path().join("samples.csv"); - // Writing content to test file let mut file = File::create(&path).expect("Unable to create test file"); file.write_all(csv_content.as_bytes()) .expect("Unable to write data"); diff --git a/csv-compressor/src/main.rs b/csv-compressor/src/main.rs index 17ebb26..04a6299 100644 --- a/csv-compressor/src/main.rs +++ b/csv-compressor/src/main.rs @@ -34,10 +34,8 @@ mod metric; author, version, about = "A Time-Series compressor utilizes Brro Compressor for CSV format", long_about = None )] pub struct Args { - /// Path to input input: PathBuf, - /// Defines where the result will be stored #[arg(short, long, action)] output: Option, @@ -92,14 +90,10 @@ enum CompressorType { Idw, } -/// Compresses the data based on the provided tag and arguments. fn compress_data(vec: &[f64], arguments: &Args) -> Vec { debug!("Compressing data!"); - //let optimizer_results = optimizer::process_data(vec, tag); - // Create Optimization Plan and Stream for the data. let mut op = OptimizerPlan::plan(vec); let mut cs = CompressedStream::new(); - // Assign the compressor if it was selected match arguments.compressor { CompressorType::Noop => op.set_compressor(Compressor::Noop), CompressorType::Constant => op.set_compressor(Compressor::Constant), @@ -110,7 +104,6 @@ fn compress_data(vec: &[f64], arguments: &Args) -> Vec { } for (cpr, data) in op.get_execution().into_iter() { debug!("Chunk size: {}", data.len()); - // If compressor is a losseless one, compress with the error defined, or default match arguments.compressor { CompressorType::Fft | CompressorType::Polynomial @@ -127,14 +120,12 @@ fn compress_data(vec: &[f64], arguments: &Args) -> Vec { cs.to_bytes() } -/// Compresses the data based on the provided tag and arguments. 
fn decompress_data(compressed_data: &[u8]) -> Vec { debug!("decompressing data!"); let cs = CompressedStream::from_bytes(compressed_data); cs.decompress() } -/// process_csv opens and parses the content of file at path pub fn process_csv(path: &Path) -> Metric { let samples = csv::read_samples_from_csv_file(path).expect("failed to read samples from file"); Metric::from_samples(&samples).expect("failed to create metric from samples") @@ -147,18 +138,15 @@ fn process_args(args: Args) { .unwrap_or_else(|| args.input.clone()) .clone(); - // uncompressing input if args.uncompress { debug!("Starting uncompressing of {:?}", &args.input); if let Some(data) = read_file(&args.input).expect("failed to read bro file") { - // decomressing data and creating wavbrro from it let decompressed_data = decompress_data(&data); let mut wbro = WavBrro::new(); for data in decompressed_data.iter() { wbro.add_sample(*data); } - // // reading existing index let mut vsri_file_path = args.input.clone(); vsri_file_path.set_extension("vsri"); debug!("Reading vsri at {:?}", &output_base); @@ -174,7 +162,6 @@ fn process_args(args: Args) { let samples = metric.get_samples(); - // creating csv output file let mut csv_file_path = file_path.clone(); csv_file_path.set_extension("csv"); debug!("Writing samples into csv file"); @@ -199,7 +186,6 @@ fn process_args(args: Args) { .expect("failed to flush vsri to the file"); } - // compressing input if no_compression is not set if !args.no_compression { debug!("Starting compressing"); let data = metric.wbro.get_samples(); diff --git a/csv-compressor/src/metric.rs b/csv-compressor/src/metric.rs index d1c0747..6f0f615 100644 --- a/csv-compressor/src/metric.rs +++ b/csv-compressor/src/metric.rs @@ -20,12 +20,9 @@ use std::path::Path; use vsri::{day_elapsed_seconds, Vsri}; use wavbrro::wavbrro::WavBrro; -/// Metric is responsible for generating WavBrro and VSRI from parsed Samples #[derive(Default)] pub struct Metric { - /// Metric data itself pub wbro: WavBrro, - /// Metric indexes pub vsri: Vsri, } @@ -49,15 +46,12 @@ impl Display for Error { impl std::error::Error for Error {} impl Metric { - /// Creates new WavBrro instance pub fn new(wbro: WavBrro, vsri: Vsri) -> Self { Metric { wbro, vsri } } - /// Appends samples to the metric pub fn append_samples(&mut self, samples: &[Sample]) -> Result<(), Error> { for sample in samples { - // For solution simplification it generates only 1 WavBrro and 1 VSRI let ts = day_elapsed_seconds(sample.timestamp / 1000); self.vsri .update_for_point(ts) @@ -69,19 +63,16 @@ impl Metric { Ok(()) } - /// Creates default metric from the existing samples pub fn from_samples(samples: &[Sample]) -> Result { let mut metric = Metric::default(); metric.append_samples(samples)?; Ok(metric) } - /// Flushes underlying WavBrro formatted metrics to the file at path pub fn flush_wavbrro(&self, path: &Path) { self.wbro.to_file(path) } - /// Flushes underlying VSRI to the file at path pub fn flush_indexes(&self, path: &Path) -> Result<(), std::io::Error> { self.vsri.flush_to(path) } diff --git a/optimizer/src/main.rs b/optimizer/src/main.rs index 03f0bab..244cefc 100644 --- a/optimizer/src/main.rs +++ b/optimizer/src/main.rs @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -// Lucas - Once the project is far enough along I strongly reccomend reenabling dead code checks +// TODO: re-enable dead code checks #![allow(dead_code)] use clap::{arg, command, Parser}; @@ -29,12 +29,20 @@ use std::{fs::File, path::Path}; #[derive(Debug)] enum MetricTag { - Percent(i32), // If it is a percent reduce significant digits to 2 - Duration(i32), // if it is a duration reduce precision to 1 microsecond - NotFloat, // A metric that has a float representation but shouldn't (Eg. Precision is not needed) - QuasiRandom, // A metric that exhibits a quasi random sample behavior. (E.g. Network deltas, heap memory) - Bytes(i32), // Data that is in bytes... Make it MB, or KB - Other, // Everything else + // Represents a percentage value. Precision is reduced to 2 significant digits. + Percent(i32), + // Represents a duration value. Precision is reduced to 1 microsecond. + Duration(i32), + // Represents a metric with a float representation where precision is not required. + NotFloat, + // Represents a quasi-random metric, such as network deltas or heap memory changes, + // which exhibit unpredictable behavior. + QuasiRandom, + // Represents data in bytes. Should be converted to a human-readable format, + // such as KB or MB. + Bytes(i32), + // Represents any other type of metric that does not fit into the predefined categories. + Other, } impl MetricTag { @@ -50,13 +58,6 @@ impl MetricTag { } } -/* -Reads a WAV file, checks the channels and the information contained there. From that -information takes a decision on the best channel, block size and bitrate for the BRRO -encoders. -*/ - -/* Read a WAV file, */ fn read_metrics_from_wav(filename: &str) -> Vec { let r_reader = hound::WavReader::open(filename); let mut reader = match r_reader { @@ -70,7 +71,6 @@ fn read_metrics_from_wav(filename: &str) -> Vec { let mut raw_data: Vec = Vec::new(); let mut u64_holder: [u16; 4] = [0, 0, 0, 0]; - // Iterate over the samples and channels and push each sample to the vector let mut current_channel: usize = 0; for sample in reader.samples::() { u64_holder[current_channel] = sample.unwrap() as u16; @@ -93,9 +93,7 @@ fn generate_wav_header(channels: Option, bitdepth: u16, samplerate: u32) -> } } -/// Write a WAV file with the outputs of data analysis for float data fn write_optimal_wav(filename: &str, data: Vec, bitdepth: i32, dc: i64, channels: i32) { - // Make DC a float for operations let fdc = dc as f64; let header: WavSpec = generate_wav_header(Some(channels), bitdepth as u16, 8000); let mut file_path = filename.to_string(); @@ -142,7 +140,6 @@ fn as_i32(value: f64) -> i32 { split_n(value).0 as i32 } -// Split a float into an integer fn split_n(x: f64) -> (i64, f64) { const FRACT_SCALE: f64 = 1.0 / (65536.0 * 65536.0 * 65536.0 * 65536.0); // 1_f64.exp(-64) const STORED_MANTISSA_DIGITS: u32 = f64::MANTISSA_DIGITS - 1; @@ -203,7 +200,6 @@ fn get_max(a: i32, b: i32) -> i32 { a.max(b) } -/// Converts a float via multiplication and truncation fn to_multiply_and_truncate(number: f64, mul: i32) -> i64 { (number * mul as f64) as i64 } @@ -220,7 +216,6 @@ fn to_median_filter(data: &Vec) -> Vec { filtered } -/// Check the type of metric and tag it fn tag_metric(filename: &str) -> MetricTag { // Should sort this by the probability of each tag, so the ones that are more common are dealt first // If it says percent_ or _utilization @@ -274,7 +269,7 @@ fn analyze_data(data: &Vec) -> (i32, i64, bool) { let max_int = split_n(max).0; // This is the DC component let min_int = split_n(min).0; - // If fractional 
is it relevant? + // TODO: check is it relevant in fractional case? let max_frac = split_n(max).1; // Finding the bitdepth without the DC component @@ -397,7 +392,6 @@ fn process_data_and_write_output(full_path: &Path, file: &mut File, arguments: & MetricTag::QuasiRandom => to_median_filter(&wav_data), _ => wav_data.iter().map(|x| tag.from_float(*x)).collect(), }; - // We split the code here if !iwav_data.is_empty() { _fractional = false; if arguments.dump_optimized { @@ -421,7 +415,6 @@ fn process_data_and_write_output(full_path: &Path, file: &mut File, arguments: & #[derive(Parser, Default, Debug)] #[command(author, version, about, long_about = None)] struct Args { - /// input file input: String, /// Write a new file with optimized settings, named filename_OPT.wav @@ -445,8 +438,6 @@ struct Args { } fn main() { - // How to break the float part??? --> THERE ARE NO FLOATS! - // https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/deployment_guide/s2-proc-stat env_logger::init(); let arguments = Args::parse(); debug!("{:?}", arguments); diff --git a/prometheus-remote/src/flac_reader.rs b/prometheus-remote/src/flac_reader.rs index c8739fe..0b2d805 100644 --- a/prometheus-remote/src/flac_reader.rs +++ b/prometheus-remote/src/flac_reader.rs @@ -16,15 +16,7 @@ limitations under the License. use std::fs::File; -use symphonia::core::audio::SampleBuffer; -use symphonia::core::codecs::{Decoder, DecoderOptions}; use symphonia::core::errors::Error as SymphoniaError; -use symphonia::core::formats::{FormatOptions, FormatReader}; -use symphonia::core::io::MediaSourceStream; -use symphonia::core::meta::MetadataOptions; -use symphonia::core::probe::Hint; - -use chrono::{DateTime, Utc}; use crate::lib_vsri; @@ -114,191 +106,3 @@ impl SimpleFlacReader { f64::from_bits(u64_bits) } } - -pub struct FlacMetric { - timeseries_data: Vec<(i64, f64)>, // Sample Data - file: File, // The File where the metric is - interval_start: i64, // The start interval in timestamp with miliseconds - decoder: Option>, // Flac decoder - format_reader: Option>, // Flac format reader -} - -impl FlacMetric { - pub fn new(file: File, start_ts: i64) -> Self { - FlacMetric { - timeseries_data: Vec::new(), - file, - interval_start: start_ts, - decoder: None, - format_reader: None, - } - } - - fn datetime_from_ms(real_time: i64) -> String { - // Time is in ms, convert it to seconds - let datetime = DateTime::::from_timestamp(real_time / 1000, 0).unwrap(); - // Transform datetime to string with the format YYYY-MM-DD - let datetime_str = datetime.format("%Y-%m-%d").to_string(); - datetime_str - } - - /// Load sample data into the Flac Object - fn load_samples(self) -> Vec<(i64, f64)> { - Vec::new() - } - - fn get_format_reader(&self) -> Box { - // TODO: One more unwrap to deal with - let owned_file = self.file.try_clone().unwrap(); - debug!("[READ][FLAC] Probing file: {:?}", owned_file); - let file = Box::new(owned_file); - // Create the media source stream using the boxed media source from above. - let mss = MediaSourceStream::new(file, Default::default()); - // Use the default options when reading and decoding. - let format_opts: FormatOptions = Default::default(); - let metadata_opts: MetadataOptions = Default::default(); - // Probe the media source stream for a format. - let probed = symphonia::default::get_probe() - .format( - Hint::new().mime_type("FLaC"), - mss, - &format_opts, - &metadata_opts, - ) - .unwrap(); - // Get the format reader yielded by the probe operation. 
- probed.format - } - - fn get_decoder(&self) -> Box { - let decoder_opts: DecoderOptions = Default::default(); - let format = self.get_format_reader(); - // Get the default track. - let track = format.default_track().unwrap(); - // Create a decoder for the track. - symphonia::default::get_codecs() - .make(&track.codec_params, &decoder_opts) - .unwrap() - } - - /// Read samples from a file with an optional start and end point. - pub fn get_samples( - &self, - start: Option, - end: Option, - ) -> std::result::Result, SymphoniaError> { - let mut sample_vec: Vec = Vec::new(); - let mut format_reader = self.get_format_reader(); - let mut decoder = self.get_decoder(); - let channels = decoder.codec_params().channels.unwrap().count(); - let mut sample_buf = None; - let mut frame_counter: i32 = 0; - let start_frame = start.unwrap_or(0); - let end_frame = end.unwrap_or(lib_vsri::MAX_INDEX_SAMPLES); - // Loop over all the packets, get all the samples and return them - loop { - let packet = match format_reader.next_packet() { - Ok(packet) => packet, - Err(err) => break error!("[READ]Reader error: {}", err), - }; - // How many frames inside the packet - let dur = packet.dur() as i32; - // Check if we need to decode this packet or not - if !(start_frame < frame_counter + dur && end_frame > frame_counter + dur) { - continue; - } - // Decode the packet into samples. - // TODO: This is overly complex, split into its own code - match decoder.decode(&packet) { - Ok(decoded) => { - // Consume the decoded samples (see below). - if sample_buf.is_none() { - // Get the audio buffer specification. - let spec = *decoded.spec(); - // Get the capacity of the decoded buffer. Note: This is capacity, not length! - let duration = decoded.capacity() as u64; - // Create the sample buffer. - sample_buf = Some(SampleBuffer::::new(duration, spec)); - } - // Each frame contains several samples, we need to get the frame not the sample. Since samples = frames * channels - if let Some(buf) = &mut sample_buf { - buf.copy_interleaved_ref(decoded); - let mut i16_samples: [u16; 4] = [0, 0, 0, 0]; - let mut i = 1; // Starting at 1, channel number is not 0 indexed... - for sample in buf.samples() { - if i >= channels { - frame_counter += 1; - if frame_counter >= start_frame && frame_counter <= end_frame { - sample_vec.push(FlacMetric::join_u16_into_f64(i16_samples)); - } - i = 1; - } - i16_samples[i - 1] = *sample as u16; - i += 1; - } - } - } - Err(SymphoniaError::DecodeError(err)) => error!("[READ]Decode error: {}", err), - Err(err) => break error!("[READ]Unexpeted Decode error: {}", err), - } - } - Ok(sample_vec) - } - - /// Read all samples from a file - pub fn get_all_samples(&self) -> std::result::Result, SymphoniaError> { - let mut sample_vec: Vec = Vec::new(); - let mut format_reader = self.get_format_reader(); - let mut decoder = self.get_decoder(); - let channels = decoder.codec_params().channels.unwrap().count(); - let mut sample_buf = None; - // Loop over all the packets, get all the samples and return them - loop { - let packet = match format_reader.next_packet() { - Ok(packet) => packet, - Err(err) => break debug!("[READ]Reader error: {}", err), - }; - // Decode the packet into audio samples. - match decoder.decode(&packet) { - Ok(decoded) => { - // Consume the decoded audio samples (see below). - if sample_buf.is_none() { - // Get the audio buffer specification. - let spec = *decoded.spec(); - // Get the capacity of the decoded buffer. Note: This is capacity, not length! 
- let duration = decoded.capacity() as u64; - // Create the sample buffer. - sample_buf = Some(SampleBuffer::::new(duration, spec)); - } - if let Some(buf) = &mut sample_buf { - buf.copy_interleaved_ref(decoded); - let mut i16_samples: [u16; 4] = [0, 0, 0, 0]; - let mut i = 1; // Starting at 1, channel number is not 0 indexed... - for sample in buf.samples() { - if i >= channels { - sample_vec.push(FlacMetric::join_u16_into_f64(i16_samples)); - i = 1; - } - i16_samples[i - 1] = *sample as u16; - i += 1; - } - } - } - Err(SymphoniaError::DecodeError(err)) => error!("[READ]Decode error: {}", err), - Err(err) => break error!("[READ]Unexpeted Decode error: {}", err), - } - } - // Just to make it compile - Ok(sample_vec) - } - - /// Recreate a f64 - fn join_u16_into_f64(bits: [u16; 4]) -> f64 { - let u64_bits = (bits[0] as u64) - | ((bits[1] as u64) << 16) - | ((bits[2] as u64) << 32) - | ((bits[3] as u64) << 48); - - f64::from_bits(u64_bits) - } -} diff --git a/prometheus-remote/src/fs_utils.rs b/prometheus-remote/src/fs_utils.rs index f4aa629..9374063 100644 --- a/prometheus-remote/src/fs_utils.rs +++ b/prometheus-remote/src/fs_utils.rs @@ -41,7 +41,6 @@ use crate::lib_vsri::{day_elapsed_seconds, start_day_ts, Vsri, MAX_INDEX_SAMPLES struct DateRange(DateTime, DateTime); -// Iterator for Day to Day // TODO: move this to several impl? So we can return iterators over several time periods? impl Iterator for DateRange { type Item = DateTime; @@ -71,7 +70,6 @@ impl PromDataPoint { } } } -/// Holds a time range for the file and index #[derive(Debug, Clone, Copy)] struct FileTimeRange { start: i32, @@ -95,9 +93,6 @@ pub struct DataLocator { } impl DataLocator { - /// Creates a new DataLocator, includes the File, Index and the Time Range for the data it is expected to return. - /// This is a lazy, doesn't check for the intersection between the time range and data owned until the data is - /// requested. 
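The removed doc comment above notes that `DataLocator` checks range intersection lazily. The test that `do_intersect` below builds on is ordinary closed-interval overlap, sketched here with a hypothetical free function:

```rust
// Two closed ranges [a0, a1] and [b0, b1] overlap unless one of them
// ends before the other starts. do_intersect applies this to the index
// min/max first, then asks the index about per-segment ownership.
fn ranges_overlap(a: [i32; 2], b: [i32; 2]) -> bool {
    !(a[1] < b[0] || b[1] < a[0])
}
```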
fn new(file: File, index: Vsri, time_range: FileTimeRange, date: DateTime) -> Self { DataLocator { file, @@ -107,20 +102,16 @@ impl DataLocator { } } - /// Checks if the Locator time_range intersects with the Index data fn do_intersect(&self) -> bool { - // If the data start after the end of the range or the data ends before the beggining of the range if self.index.min() > self.time_range.end || self.index.max() < self.time_range.start { return false; } - // index function checks for no ownership, this function checks for ownership, invert the result !self .index .is_empty([self.time_range.start, self.time_range.end]) } fn get_samples_from_range(&self) -> Option<[i32; 2]> { - // By default, get all the samples let mut sample_range: [i32; 2] = [0, MAX_INDEX_SAMPLES]; if !self.do_intersect() { return None; @@ -130,17 +121,14 @@ impl DataLocator { sample_range[0] = 0; } _ => { - // There is intersection, it can unwrap safely sample_range[0] = self.index.get_this_or_next(self.time_range.start).unwrap(); } } match self.time_range.end { - // Match cannot shadow statics and whatever _ if self.time_range.end == MAX_INDEX_SAMPLES => { sample_range[1] = self.index.get_sample_count(); } _ => { - // There is intersection, it can unwrap safely sample_range[1] = self .index .get_this_or_previous(self.time_range.start) @@ -150,13 +138,11 @@ impl DataLocator { Some(sample_range) } - /// Consumes the DataLocator to return a Vec of PromDataPoints pub fn into_prom_data_point(self) -> Vec { let mut prom_data = Vec::new(); let samples_locations = self.get_samples_from_range(); let flac_metric = SimpleFlacReader::new(self.file, self.time_range.start as i64); let tmp_vec = self.index.get_all_timestamps(); - // There goes an empty arry if samples_locations.is_none() { return prom_data; } @@ -169,14 +155,12 @@ impl DataLocator { tmp_vec.len() ); let time_for_samples = &tmp_vec[start as usize..=end as usize]; - // The time I learned if..else is an expression! let temp_result = if start == 0 && end == self.index.get_sample_count() { flac_metric.get_all_samples() } else { flac_metric.get_samples(Some(start), Some(end)) }; match temp_result { - // Pack this into DataPoints Ok(samples) => { for (v, t) in samples.into_iter().zip(time_for_samples.iter()) { let ts = *t as i64 + start_day_ts(self.date); @@ -191,7 +175,8 @@ impl DataLocator { prom_data } - /// Given a metric name and a time interval, returns all the files handles for the files that *might* contain that data (No data range intersection is done here) + /// Given a metric name and a time interval, returns all the files handles for the files that *might* contain that + /// data (No data range intersection is done here) pub fn get_locators_for_range( metric_name: &str, start_time: i64, @@ -225,10 +210,8 @@ impl DataLocator { continue; } }; - // If I got here, I should be able to unwrap Vsri safely. 
file_index_vec.push((file, vsri.unwrap(), date)); } - // Creating the Time Range array let start_ts_i32 = day_elapsed_seconds(start_time); let end_ts_i32 = day_elapsed_seconds(end_time); let mut time_intervals = Vec::new(); @@ -249,7 +232,6 @@ impl DataLocator { } } - // We have at least one file create the Object if !file_index_vec.is_empty() { data_locator_vec = file_index_vec .into_iter() @@ -262,7 +244,6 @@ impl DataLocator { } } -/// Returns a Vector of array of time intervals (in seconds) for the interval of time fn time_intervals(start_time: i64, end_time: i64) -> Vec<[i32; 2]> { let mut time_intervals = Vec::new(); let start_date = DateTime::::from_timestamp(start_time / 1000, 0).unwrap(); @@ -289,7 +270,6 @@ fn time_intervals(start_time: i64, end_time: i64) -> Vec<[i32; 2]> { time_intervals } -/// Given a metric name and a time interval, returns all the files handles for the files that contain that data pub fn get_file_index_time( metric_name: &str, start_time: i64, @@ -309,122 +289,3 @@ pub fn data_locator_into_prom_data_point(data: Vec) -> Vec, -) -> Vec { - let mut data_points = Vec::new(); - /* Processing logic: - Case 1 (2+ files): - The first file, the period if from `start_time` to end of the file (use index), - The second until the last file (exclusive), we need all the data points we can get (read full file). - The last file we need from start until the `end_time` (use index). - Case 2 (Single file): - Read the index to locate the start sample and the end sample. - Read the file and obtain said samples. - */ - // How many files to process - let file_count = file_vec.len(); - // Get the baseline timestamps to add to the index timestamps - let start_date = DateTime::::from_timestamp(start_time / 1000, 0).unwrap(); - let end_date = DateTime::::from_timestamp(end_time / 1000, 0).unwrap(); - let ts_bases: Vec = DateRange(start_date, end_date).map(start_day_ts).collect(); - let start_ts_i32 = day_elapsed_seconds(start_time); - let end_ts_i32 = day_elapsed_seconds(end_time); - // Files might not match the intervals of time, a time array of time intervals need to be done. - - // Where the samples land in the indexes - let mut samples_locations: [i32; 2]; - for pack in file_vec.into_iter().enumerate() { - let iter_index = pack.0; - let file = pack.1 .0; - let vsri = pack.1 .1; - debug!( - "[READ] Locating samples. VSRI {:?} TS: {} - {}", - vsri, start_ts_i32, end_ts_i32 - ); - // Check if the timestamps intercept the index space - if file_count == 1 { - debug!("[READ] Processing single file..."); - // Case 2 - // get_sample can return None - if vsri.min() > end_ts_i32 || vsri.max() < start_ts_i32 { - debug!("[READ] No intersection. Returning."); - return data_points; - } - let start_sample = vsri.get_this_or_next(start_ts_i32); - if start_sample.is_none() { - // No sample in the file fits the current requested interval - debug!("[READ] No intersection (Part2). Returning."); - return data_points; - } - // If I can start reading the file, I can get at least one sample, so it is safe to unwrap. 
- let end_sample = vsri.get_this_or_previous(end_ts_i32).unwrap(); - samples_locations = [start_sample.unwrap(), end_sample]; - } else { - // Case 1 - debug!("[READ] Processing multiple files..."); - match pack.0 { - // First file - 0 => { - let start_sample = vsri.get_this_or_next(start_ts_i32); - if start_sample.is_none() { - continue; - } - samples_locations = [start_sample.unwrap(), vsri.get_sample_count()]; - } - // Last file - _ if iter_index == file_count - 1 => { - let end_sample = vsri.get_this_or_previous(end_ts_i32); - if end_sample.is_none() { - continue; - } - samples_locations = [0, end_sample.unwrap()]; - } - // Other files - _ => { - // Collect the full file - samples_locations = [0, vsri.get_sample_count()]; - } - } - } - // Collect the data points - let flac_metric = SimpleFlacReader::new(file, start_time); - let tmp_vec = vsri.get_all_timestamps(); - let start = samples_locations[0]; - let end = samples_locations[1] - 1; - debug!( - "[READ] Samples located! From {} to {}. TS available: {}", - start, - end, - tmp_vec.len() - ); - // !@)(#*&!@)# usize and ints... - let time_for_samples = &tmp_vec[start as usize..=end as usize]; - // The time I learned if..else is an expression! - let temp_result = if start == 0 && end == vsri.get_sample_count() { - flac_metric.get_all_samples() - } else { - flac_metric.get_samples(Some(start), Some(end)) - }; - - match temp_result { - // Pack this into DataPoints - Ok(samples) => { - for (v, t) in samples.into_iter().zip(time_for_samples.iter()) { - let ts = *t as i64 + ts_bases[iter_index]; - data_points.push(PromDataPoint::new(v, ts)); - } - } - Err(err) => { - error!("[READ] Error processing FLaC file {:?}", err); - continue; - } - } - } - debug!("[READ] Returning datapoints: {:?}", data_points); - data_points -} diff --git a/prometheus-remote/src/lib_vsri.rs b/prometheus-remote/src/lib_vsri.rs index 7f146b2..4383243 100644 --- a/prometheus-remote/src/lib_vsri.rs +++ b/prometheus-remote/src/lib_vsri.rs @@ -43,19 +43,15 @@ use std::io::{BufRead, BufReader, BufWriter, Write}; // TODO: This should be configurable. Indexes are build for 1 day worth of samples, at 1 sample per second pub static MAX_INDEX_SAMPLES: i32 = 86400; -// Helper functions, this should be moved somewhere -/// Returns the number of seconds elapsed for the day provided in the `timestamp_sec` +// TODO: Move these helper functions pub fn day_elapsed_seconds(timestamp_sec: i64) -> i32 { let datetime = DateTime::::from_timestamp(timestamp_sec, 0).unwrap(); - // Extract the time components (hour, minute, and second) from the DateTime let hour = datetime.time().hour(); let minute = datetime.time().minute(); let second = datetime.time().second(); - // Calculate the total seconds since the start of the day (hour * 3600 + minute * 60 + second) as i32 } -/// Returns the timestamp for the beginning of the day given a DateTime object. pub fn start_day_ts(dt: DateTime) -> i64 { let hour = dt.time().hour(); let minute = dt.time().minute(); @@ -63,35 +59,6 @@ pub fn start_day_ts(dt: DateTime) -> i64 { dt.timestamp() - (hour * 3600 + minute * 60 + second) as i64 } -/// In this implementation we are writing sample by sample to the WAV file, so -/// we can't do a proper segment calculation. So there will a special first segment -/// that will hold the first point so we can calculate the segments from there. 
-/// In this implementation we are writing sample by sample to the WAV file, so
-/// we can't do a proper segment calculation. So there will a special first segment
-/// that will hold the first point so we can calculate the segments from there.
-///
-/// # Examples
-/// Creating a new index, metric is of expected time 0, but for sure location of X is 0
-/// ```no_run
-/// let vsri = Vsri::new("metric_name", 0, 0);
-/// vsri.flush();
-/// ```
-/// Updating an index, adding point at time 5sec
-/// ```no_run
-/// let vsri = Vsri::load("metric_name").unwrap().update_for_point(5);
-/// vsri.flush();
-/// ```
-/// Fetch a sample location from the index given a timestamp
-/// ```no_run
-/// let vsri = Vsri::load("metric_name").unwrap();
-/// vsri.get_sample_location("metric_name", 5);
-/// ```
-
-/// Index Structure
-/// index_name: Name of the index file we are indexing
-/// min_ts: the minimum TS available in this file
-/// max_ts: the highest TS available in this file
-/// vsri_segments: Description of each segment
-///                [sample_rate (m), initial_point(x,y), # of samples(length)]
-/// Each segments describes a line with the form of mX + B that has a lenght
-/// of # of samples.
 #[derive(Debug)]
 pub struct Vsri {
     index_file: String,
@@ -102,8 +69,6 @@ pub struct Vsri {
 }
 
 impl Vsri {
-    /// Creates the index, it doesn't create the file in the disk
-    /// flush needs to be called for that
     pub fn new(filename: &str) -> Self {
         debug!("[INDEX] Creating new index!");
         let segments: Vec<[i32; 4]> = Vec::new();
@@ -115,44 +80,24 @@ impl Vsri {
         }
     }
 
-    /// Given a filename and a time location, returns the sample location in the
-    /// data file. Or None in case it doesn't exist.
-    pub fn get_sample_location(filename: String, y: i32) -> Option<i32> {
-        let vsri = match Vsri::load(&filename) {
-            Ok(vsri) => vsri,
-            Err(_err) => return None,
-        };
-        if vsri.min() <= y && y <= vsri.max() {
-            return vsri.get_sample(y);
-        }
-        None
-    }
-
-    /// Get the sample for this timestamp or the next one
     pub fn get_this_or_next(&self, y: i32) -> Option<i32> {
         let r = self.get_sample(y).or(self.get_next_sample(y));
         debug!("[INDEX] This or next location {:?} for TS {}", r, y);
         r
     }
 
-    /// Get the sample for this timestamp or the previous one
     pub fn get_this_or_previous(&self, y: i32) -> Option<i32> {
         let r = self.get_sample(y).or(self.get_previous_sample(y));
         debug!("[INDEX] This or previous location {:?} for TS {}", r, y);
         r
     }
 
-    /// Returns the next sample for the provided timestamp.
-    /// This might be useful to find the next segment timestamp if the timestamp
-    /// is in between segments. It will return None in case the timestamp is over
-    /// the maximum timestamp of the index.
     pub fn get_next_sample(&self, y: i32) -> Option<i32> {
         if y < self.min() {
             return Some(0);
         } else if y >= self.max() {
             return None;
         }
-        // It wasn't smaller, so let's see if we have a sample that matches
         for segment in self.vsri_segments.clone().into_iter().rev() {
             let first_sample = segment[1];
             let y0 = segment[2];
@@ -163,23 +108,16 @@ impl Vsri {
         None
     }
 
-    /// Returns the previous sample for the provided timestamp.
-    /// This might be useful to find the previous segment timestamp if the timestamp
-    /// is in between segments. It will return None in case the timestamp is bellow
-    /// the minimum timestamp of the index.
     pub fn get_previous_sample(&self, y: i32) -> Option<i32> {
         if y < self.min() {
             return None;
         } else if y >= self.max() {
-            // Return the last segment, # of samples. That is the total # of samples in a file
            return Some(self.get_sample_count());
        }
-        // Cycle through the segments
        for segment in &self.vsri_segments {
            let first_sample = segment[1];
            let y0 = segment[2];
            if y < y0 {
-                // Return the last sample of the previous segment
                return Some(first_sample - 1);
            }
        }
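The `get_this_or_next` / `get_this_or_previous` pair implements the "snap to the nearest available sample" semantics the data reader relies on. A toy model over a plain sorted slice, illustrative only (the real methods walk `vsri_segments` instead of a timestamp list):

```rust
// Snap a requested timestamp to this-or-next / this-or-previous sample index,
// using a sorted Vec<i32> in place of the segment walk.
fn this_or_next(ts_list: &[i32], y: i32) -> Option<usize> {
    let i = ts_list.partition_point(|&t| t < y); // first index with t >= y
    (i < ts_list.len()).then_some(i)
}

fn this_or_previous(ts_list: &[i32], y: i32) -> Option<usize> {
    let i = ts_list.partition_point(|&t| t <= y); // first index with t > y
    i.checked_sub(1)
}

fn main() {
    let ts = [10, 20, 30];
    assert_eq!(this_or_next(&ts, 15), Some(1));     // snaps forward to 20
    assert_eq!(this_or_previous(&ts, 15), Some(0)); // snaps back to 10
    assert_eq!(this_or_next(&ts, 31), None);        // past the end: nothing "next"
}
```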
@@ -190,17 +128,13 @@ impl Vsri {
     /// This is useful to check intersections. If this function returns false the provided
     /// time segment does overlap with the existing time segments in the file
     pub fn is_empty(&self, time_segment: [i32; 2]) -> bool {
-        // I could simple try to get 2 samples and if one of the returns, it is not empty
-        // but I would walk segments twice instead of once
         match &self.vsri_segments.len() {
             1 => {
-                // It starts or ends inside the segment (might be a single sample)
                 if (time_segment[0] >= self.min() && time_segment[0] <= self.max())
                     || (time_segment[1] <= self.max() && time_segment[1] >= self.min())
                 {
                     return false;
                 }
-                // Or it contains the whole segment
                 if time_segment[0] < self.min() && time_segment[1] > self.max() {
                     return false;
                 }
@@ -213,39 +147,30 @@ impl Vsri {
                     let y0 = segment[2];
                     let num_samples = segment[3];
                     let segment_end_y = y0 + (sample_rate * (num_samples - 1));
-                    // If we are in the 2+ segment, lets test if the time falls in the middle
                     if segment_count >= 1
                         && (time_segment[0] > previous_seg_end && time_segment[1] < y0)
                     {
                         return true;
                     }
-                    // Could this be simplified with Karnaugh map? I'll dig my books later
-                    // It starts or ends inside the segment
+                    // TODO: simplify this with a Karnaugh map if possible
                     if (time_segment[0] >= y0 && time_segment[0] < segment_end_y)
                         || (time_segment[1] < segment_end_y && time_segment[1] >= y0)
                     {
                         return false;
                     }
-                    // Or it contains the whole segment
                     if time_segment[0] < y0 && time_segment[1] > segment_end_y {
                         return false;
                     }
-                    // At this point, time segments doesn't touch this segment.
                     previous_seg_end = segment_end_y;
                 }
             }
         }
-        // Didn't find any intersection, or left in the middle, it is empty
         true
     }
 
-    /// Update the index for the provided point
-    /// y - time in seconds
     pub fn update_for_point(&mut self, y: i32) -> Result<(), ()> {
-        // Y needs to be bigger that the current max_ts, otherwise we are appending a point in the past
         // TODO: #11 Quantiles sends several metrics for the same time, how to handle it?
         if y < self.max_ts {
-            // Is this always a period (day) change? Assuming so
             warn!(
                 "[INDEX] Trying to index a point in the past: {}, provided point: {}",
                 self.max_ts, y
@@ -254,47 +179,36 @@ impl Vsri {
         }
         self.max_ts = y;
         let segment_count = self.vsri_segments.len();
-        // Empty segments, create a new one, this is also a new index, update the timestamps
         if segment_count == 0 {
             self.min_ts = y;
             self.vsri_segments.push(self.create_fake_segment(y));
             return Ok(());
         }
         if self.is_fake_segment() {
-            // In the presence of a fake segment (where m is 0), and a new point, we are now
-            // in a situation we can calculate a decent segment
             self.vsri_segments[segment_count - 1] = self.generate_segment(y);
         } else {
-            // Check ownership by the current segment
            if self.fits_segment(y) {
-                // It fits, increase the sample count and it's done
                debug!("[INDEX] Same segment, updating. TS: {}", y);
                self.vsri_segments[segment_count - 1][3] += 1;
                return Ok(());
            }
-            // If it doesn't fit, create a new fake segment
            self.vsri_segments.push(self.create_fake_segment(y));
        }
        Ok(())
    }
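The lifecycle `update_for_point` drives (a "fake" zero-slope segment on first sight, promotion once a slope exists, growth while points keep the rate, a new fake segment on a gap) can be reproduced in miniature. This is a simplified re-implementation for illustration, not the code under review:

```rust
/// Minimal re-implementation of the segment lifecycle, for illustration only.
/// Segments are [m, x0, y0, len], i.e. a line y = m*x + b of `len` samples.
#[derive(Debug, PartialEq)]
struct MiniIndex {
    segments: Vec<[i32; 4]>,
}

impl MiniIndex {
    fn push_point(&mut self, y: i32) {
        match self.segments.last_mut() {
            // First point ever: "fake" segment with m = 0.
            None => self.segments.push([0, 0, y, 1]),
            // Second point on a fake segment: we can now compute a slope.
            Some(last) if last[0] == 0 && last[3] == 1 => {
                *last = [y - last[2], last[1], last[2], 2];
            }
            Some(last) => {
                let expected = last[2] + last[0] * last[3]; // y0 + m * len
                if y == expected {
                    last[3] += 1; // point continues the line: just count it
                } else {
                    let x0 = last[1] + last[3];
                    self.segments.push([0, x0, y, 1]); // gap or rate change
                }
            }
        }
    }
}

fn main() {
    let mut idx = MiniIndex { segments: vec![] };
    for y in [100, 105, 110, 300] {
        idx.push_point(y);
    }
    // One 5s-rate segment of 3 samples, then a fresh fake segment after the gap.
    assert_eq!(idx.segments, vec![[5, 0, 100, 3], [0, 3, 300, 1]]);
}
```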
-    /// Minimum time stamp
     pub fn min(&self) -> i32 {
         self.min_ts
     }
 
-    /// Maximum time stamp
     pub fn max(&self) -> i32 {
         self.max_ts
     }
 
     fn calculate_b(&self, segment: &[i32; 4]) -> i32 {
-        // b = y - mx
         segment[2] - segment[0] * segment[1]
     }
 
-    /// Returns the most recent (the last) calculated segment
     fn current_segment(&self) -> [i32; 4] {
         match self.vsri_segments.len() {
             0 => [0, 0, 0, 0],
@@ -302,7 +216,6 @@ impl Vsri {
         }
     }
 
-    /// Get the sample location for a given point in time, or None if there is no sample for that specific TS
     pub fn get_sample(&self, y: i32) -> Option<i32> {
         for segment in &self.vsri_segments {
             let sample_rate = segment[0];
@@ -312,7 +225,6 @@ impl Vsri {
             let segment_end_y = y0 + (sample_rate * (num_samples - 1));
 
             if y >= y0 && y <= segment_end_y {
-                // x = (y - b)/ m
                 // TODO: This can return floats!
                 let x_value = (y - self.calculate_b(segment)) / sample_rate;
                 return Some(x_value);
@@ -321,29 +233,6 @@ impl Vsri {
         None // No matching segment found for the given Y value
     }
 
-    /// For a given sample position, return the timestamp associated
-    pub fn get_time(&self, x: i32) -> Option<i32> {
-        match x {
-            0 => Some(self.min()),
-            _ if x > self.get_sample_count() => None,
-            _ if x == self.get_sample_count() => Some(self.max()),
-            // it is somewhere in the middle
-            _ => {
-                // Find the segment where X fits
-                for segment in &self.vsri_segments {
-                    if x >= segment[1] && x < (segment[1] + segment[3]) {
-                        // Belongs here! Return Segment TS + the TS interval * x
-                        let y = segment[2] + segment[0] * x;
-                        return Some(y);
-                    }
-                    continue;
-                }
-                None
-            }
-        }
-    }
-
-    /// Returns a vector will all the timestamps covered by this index
     pub fn get_all_timestamps(&self) -> Vec<i32> {
         let mut time_vec = Vec::new();
         for segment in &self.vsri_segments {
@@ -361,12 +250,8 @@ impl Vsri {
         last_segment[3] + last_segment[1]
     }
 
-    /// Generates a segment from a point. It uses information stored in the segment
-    /// to regenerate the same segment with the new point information.
     fn generate_segment(&self, y: i32) -> [i32; 4] {
-        // Retrieve the last segment
         let last_segment = self.current_segment();
-        // double check for correctness
         if last_segment[0] != 0 {
             return last_segment;
         }
@@ -376,15 +261,6 @@ impl Vsri {
         // We got m, the initial points are the same, and now we have 2 samples
         [m, last_segment[1], last_segment[2], 2]
     }
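`get_sample` is just inverting the segment's line y = m*x + b, with `calculate_b` supplying b = y0 - m*x0. A worked example with a hypothetical segment `[m, x0, y0, len] = [5, 0, 100, 3]`, including the integer-division caveat the TODO above mentions:

```rust
// Invert y = m*x + b over one segment; illustrative re-statement of get_sample.
fn sample_for(segment: [i32; 4], y: i32) -> Option<i32> {
    let [m, x0, y0, len] = segment;
    let end_y = y0 + m * (len - 1); // last timestamp covered by the segment
    if y < y0 || y > end_y {
        return None;
    }
    let b = y0 - m * x0; // calculate_b: b = y - m*x
    // NOTE: like the original TODO warns, (y - b) may not be divisible by m;
    // integer division silently floors here.
    Some((y - b) / m)
}

fn main() {
    assert_eq!(sample_for([5, 0, 100, 3], 110), Some(2)); // 110 = 5*2 + 100
    assert_eq!(sample_for([5, 0, 100, 3], 112), Some(2)); // floored, per the TODO
    assert_eq!(sample_for([5, 0, 100, 3], 120), None);    // past the segment
}
```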
-
-    fn update_segment_samples(mut self) {
-        let segment_count = self.vsri_segments.len();
-        self.vsri_segments[segment_count - 1][3] += 1;
-    }
-
-    /// Generate a fake segment, this can't be used for ownership testing
-    /// x is the previous segment sample number
-    /// We only have the first y0 point, nothing else
     fn create_fake_segment(&self, y: i32) -> [i32; 4] {
         debug!("[INDEX] New segment, creating for point: {}", y);
         let segment = self.current_segment();
@@ -393,13 +269,11 @@ impl Vsri {
         [0, x, y, 1]
     }
 
-    /// Checks if the most recent segment is a fake segment
     fn is_fake_segment(&self) -> bool {
         let last_segment = self.current_segment();
         last_segment[0] == 0
     }
 
-    /// Returns true if a point fits the last segment of the index
     fn fits_segment(&self, y: i32) -> bool {
         let last_segment = self.current_segment();
         let b = self.calculate_b(&last_segment);
@@ -446,7 +320,6 @@ impl Vsri {
         Ok(())
     }
 
-    /// Reads an index file and loads the content into the structure
     /// TODO: Add error control (Unwrap hell)
     pub fn load(filename: &String) -> Result<Self, std::io::Error> {
         debug!("[INDEX] Load existing index");
diff --git a/prometheus-remote/src/main.rs b/prometheus-remote/src/main.rs
index 94d6006..e674da9 100644
--- a/prometheus-remote/src/main.rs
+++ b/prometheus-remote/src/main.rs
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// Lucas - Once the project is far enough along I strongly reccomend reenabling dead code checks
+// TODO: re-enable dead code checks
 #![allow(dead_code)]
 
 mod flac_reader;
@@ -57,7 +57,6 @@ fn get_flac_samples_to_prom(
     // TODO: #6 Count the number of samples for the given metric! -> Can be done with the Index alone \m/ \m/
     // TODO: #1 Do not ignore Job!
     // TODO: #2 Do not ignore Step!
-    // Just for today, Step in the files is always 15sec, 15000 ms.
     let sample_step = (step_ms / 15000) as usize;
     if step_ms == 0 {
         return vec![Sample {
@@ -65,7 +64,6 @@ fn get_flac_samples_to_prom(
             timestamp: start_ms,
         }];
     }
-    // Build the metric name
     let metric_name = format!("{}_{}", metric, source);
     let files_to_parse = get_file_index_time(&metric_name, start_ms, end_ms);
     if files_to_parse.is_none() {
@@ -75,31 +73,12 @@ fn get_flac_samples_to_prom(
             timestamp: start_ms,
         }];
     }
-    //let prom_vec = get_data_between_timestamps(start_ms, end_ms, files_to_parse.unwrap());
     let prom_vec = data_locator_into_prom_data_point(files_to_parse.unwrap());
     let prom_len = prom_vec.len();
-    //debug!("[MAIN] Prom data points: {:?}", prom_vec);
     debug!(
         "[MAIN] Returned samples: {:?} Requested Step: {:?} Proposed Steps: {:?}",
         prom_len, step_ms, sample_step
     );
-    // Convert into Samples and apply step_ms
-    //let mut out = Vec::new();
-    //let mut prev_sample_ts: i64 = 0;
-    /*
-    for (i, pdp) in prom_vec.into_iter().enumerate() {
-        if i == 0 {
-            out.push(Sample{value: pdp.point, timestamp: pdp.time});
-            prev_sample_ts = pdp.time;
-            continue;
-        }
-        if pdp.time < prev_sample_ts + step_ms { continue; }
-        out.push(Sample{value: pdp.point, timestamp: pdp.time});
-        prev_sample_ts = pdp.time;
-    }
-    debug!("[MAIN] Requested Step: {:?} Proposed Steps: {:?} Original len {:?} Final len {:?}", step_ms, sample_step, prom_len, out.len());
-
-    out */
     prom_vec
         .iter()
         .map(|pdp| Sample {
@@ -107,13 +86,6 @@ fn get_flac_samples_to_prom(
             timestamp: pdp.time,
         })
         .collect()
-    //prom_vec.iter().step_by(sample_step).map(|pdp| Sample{value: pdp.point, timestamp: pdp.time}).collect()
-    //let flac_content = get_flac_samples(metric, start_ms, end_ms).unwrap();
-    // Flac reader is ignoring step returning way to many samples. So we have to deal with step here
-    // Transforming the result into Samples
-    //let step_size: usize = (step_ms/DATA_INTERVAL_MSEC).try_into().unwrap();
-    //debug!(" # of FLaC samples: {} Step size ms: {} Internal step: {}", flac_content.len(), step_ms, step_size);
-    //flac_content.iter().step_by(step_size).enumerate().map(|(i, sample)| Sample{value: *sample as f64, timestamp: (start_ms + (i as i64)*step_ms) as i64}).collect()
 }
 
 fn parse_remote_write_request(
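The commented-out block removed from `get_flac_samples_to_prom` was an attempt at applying `step_ms` on the reader side: keep one point per step window. If that ever comes back, a hedged sketch in the same spirit (the types are redeclared locally for the example and are not the project's actual `PromDataPoint`/`Sample` definitions):

```rust
// Local stand-ins for the example; field names mirror the usage above.
struct PromDataPoint {
    point: f64,
    time: i64,
}
struct Sample {
    value: f64,
    timestamp: i64,
}

/// Keep at most one sample per `step_ms` window, like the deleted loop intended.
fn apply_step(points: Vec<PromDataPoint>, step_ms: i64) -> Vec<Sample> {
    let mut out = Vec::new();
    let mut next_ts = i64::MIN;
    for pdp in points {
        if pdp.time < next_ts {
            continue; // still inside the current step window
        }
        out.push(Sample { value: pdp.point, timestamp: pdp.time });
        next_ts = pdp.time + step_ms;
    }
    out
}
```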
diff --git a/prometheus-remote/src/wav_writer.rs b/prometheus-remote/src/wav_writer.rs
index 86a4b90..481dbc2 100644
--- a/prometheus-remote/src/wav_writer.rs
+++ b/prometheus-remote/src/wav_writer.rs
@@ -18,7 +18,6 @@ use chrono::{DateTime, Utc};
 use hound::{WavSpec, WavWriter};
 use std::fs::File;
 use std::fs::{metadata, OpenOptions};
-use std::process::Command;
 
 use crate::lib_vsri::{day_elapsed_seconds, Vsri};
 
@@ -173,37 +172,10 @@ impl WavMetric {
         }
     }
 
-    /// Add a single metric value to the structure
-    pub fn add_timeseries(mut self, ts: i64, value: f64) {
-        self.timeseries_data.push((ts, value))
-    }
-
-    /// Add a vector of data to the existing timeseries
     pub fn add_bulk_timeseries(&mut self, timeseries: &mut Vec<(i64, f64)>) {
         self.timeseries_data.append(timeseries)
     }
 
-    /// Read a range in the structure
-    pub fn get_range(self, ts_start: i64, ts_end: i64) -> Vec<(i64, f64)> {
-        let mut i = 0;
-        let mut j = 0;
-        for (count, (ts, _)) in self.timeseries_data.iter().enumerate() {
-            if *ts < ts_start {
-                i = count
-            }
-            if *ts < ts_end {
-                j = count;
-                break;
-            }
-        }
-        if i > 0 {
-            return self.timeseries_data[i - 1..j].to_vec();
-        }
-        self.timeseries_data[..j].to_vec()
-    }
-
-    /// Instead of chasing data types and converting stuff, let's just unpack the f64 and
-    /// put it into different channels. This way we can always garantee a clean i16 Wave file
     fn split_f64_into_i16s(value: f64) -> [i16; 4] {
         let bits: u64 = value.to_bits();
 
@@ -214,40 +186,4 @@ impl WavMetric {
 
         [i16_1, i16_2, i16_3, i16_4]
     }
-
-    /// Recreate a f64
-    fn create_f64_from_16bits(bits: [u16; 4]) -> f64 {
-        let u64_bits = (bits[0] as u64)
-            | ((bits[1] as u64) << 16)
-            | ((bits[2] as u64) << 32)
-            | ((bits[3] as u64) << 48);
-
-        f64::from_bits(u64_bits)
-    }
-
-    /// Rotate the wav file after the interval and save it as a FLaC file
-    fn rotate_wav_into_flac(self) {
-        let file_in = format!(
-            "{}_{}_{}.wav",
-            self.metric_name, self.instance, self.creation_time
-        );
-        let file_out = format!(
-            "{}_{}_{}.flac",
-            self.metric_name, self.instance, self.creation_time
-        );
-        // Command: sox input.wav output.flac
-        let output = Command::new("sox")
-            .arg(file_in)
-            .arg(file_out)
-            .output()
-            .expect("Error converting WAV to FLAC");
-        if !output.status.success() {
-            panic!("Could not rotate file!")
-        }
-    }
-
-    /// Check if the current timestamp is within the file period
-    fn is_ts_valid(_ts: i64) -> bool {
-        true
-    }
 }
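Since this hunk deletes `create_f64_from_16bits`, it is worth preserving the shape of the round trip: an f64 is bit-split into four 16-bit words (one per WAV channel) and reassembled losslessly. The exact word order inside the original `split_f64_into_i16s` is elided in this diff, so the low-word-first order below is an assumption; the round-trip property is the point:

```rust
// Assumed low-word-first packing; mirrors the deleted create_f64_from_16bits
// on the way back.
fn split_f64_into_i16s(value: f64) -> [i16; 4] {
    let bits: u64 = value.to_bits();
    [
        (bits & 0xFFFF) as i16,
        ((bits >> 16) & 0xFFFF) as i16,
        ((bits >> 32) & 0xFFFF) as i16,
        ((bits >> 48) & 0xFFFF) as i16,
    ]
}

fn join_i16s_into_f64(parts: [i16; 4]) -> f64 {
    let bits = (parts[0] as u16 as u64)
        | ((parts[1] as u16 as u64) << 16)
        | ((parts[2] as u16 as u64) << 32)
        | ((parts[3] as u16 as u64) << 48);
    f64::from_bits(bits)
}

fn main() {
    let x = 1234.5678_f64;
    // Bit-exact round trip: no precision is lost in the channel packing.
    assert_eq!(join_i16s_into_f64(split_f64_into_i16s(x)), x);
}
```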
diff --git a/tools/src/bin/dwt_finder.rs b/tools/src/bin/dwt_finder.rs
index 55d239e..6466838 100644
--- a/tools/src/bin/dwt_finder.rs
+++ b/tools/src/bin/dwt_finder.rs
@@ -30,7 +30,6 @@ fn read_metrics_from_wav(filename: &str) -> Vec<f64> {
     let mut raw_data: Vec<f64> = Vec::new();
     let mut u64_holder: [u16; 4] = [0, 0, 0, 0];
 
-    // Iterate over the samples and channels and push each sample to the vector
     let mut current_channel: usize = 0;
     for sample in reader.samples::<i16>() {
         u64_holder[current_channel] = sample.unwrap() as u16;
@@ -54,13 +53,9 @@ fn join_u16_into_f64(bits: [u16; 4]) -> f64 {
 
 #[derive(Parser, Default, Debug)]
 struct Arguments {
-    /// First wav file
     file_one: String,
-    /// Second wav file
     file_two: String,
-    /// Distance
     distance: usize,
-    /// Block size
     block: usize,
 }
 
diff --git a/tools/src/bin/flac_reader_tester.rs b/tools/src/bin/flac_reader_tester.rs
index 47727fa..b5785d4 100644
--- a/tools/src/bin/flac_reader_tester.rs
+++ b/tools/src/bin/flac_reader_tester.rs
@@ -25,20 +25,16 @@ fn _read_metrics_from_wav(filename: &str) -> Vec<i16> {
     let num_samples = reader.len() as usize / reader.spec().channels as usize;
     let num_channels = reader.spec().channels as usize;
 
-    // Create a vector to hold the audio data
     let mut samples = Vec::with_capacity(num_samples * num_channels);
 
-    // Iterate over the samples and channels and push each sample to the vector
     for sample in reader.samples::<i16>() {
         samples.push(sample.unwrap());
     }
     samples
 }
 
-/* Read a FLAC file */
 fn read_metrics_from_flac(filename: &str) -> Vec<u16> {
     let mut reader = claxon::FlacReader::open(filename).unwrap();
-    // Create a vector to hold the audio data
     let mut samples = Vec::with_capacity(reader.streaminfo().samples.unwrap() as usize);
     for sample in reader.samples() {
         samples.push(sample.unwrap() as u16);
@@ -55,8 +51,6 @@ fn read_metrics_from_flac_by_bloc(filename: &str) -> Vec<u16> {
     let mut block = claxon::Block::empty();
 
     loop {
-        // Read a single frame. Recycle the buffer from the previous frame to
-        // avoid allocations as much as possible.
         match frame_reader.read_next_or_eof(block.into_buffer()) {
             Ok(Some(next_block)) => block = next_block,
             Ok(None) => break, // EOF.
@@ -68,12 +62,10 @@ fn read_metrics_from_flac_by_bloc(filename: &str) -> Vec<u16> {
                 sample_channel_data[channel] = block.sample(channel as u32, sample) as u16;
             }
 
-            // Process the sample_channel_data as needed
             for &sample in &sample_channel_data {
                 sample_vec.push(sample);
             }
 
-            // Optionally, can print debug information
             println!(
                 "Sample {}/{}, Channels: {:?}",
                 sample,
@@ -87,10 +79,8 @@ fn read_metrics_from_flac_by_bloc(filename: &str) -> Vec<u16> {
 
 fn _read_metrics_from_flac_in_interval(filename: &str, start: u32, end: u32) -> Vec<u16> {
     let mut reader = claxon::FlacReader::open(filename).unwrap();
-    // Create a vector to hold the audio data
     let start_sample = start * reader.streaminfo().sample_rate;
     let end_sample = end * reader.streaminfo().sample_rate;
-    //let mut samples = Vec::with_capacity(reader.streaminfo().samples.unwrap() as usize);
     let mut samples: Vec<u16> = Vec::new();
     for (i, sample) in reader.samples().enumerate() {
         let i = i as u32;
@@ -109,14 +99,11 @@ fn main() {
     let filename_flac =
         "/home/crolo/code/prom_data/go_memstats_frees_total_localhost:9090_2023-07-07.flac";
     let _filename_flac_single = "3_single_channel.flac";
-    //let samples = read_metrics_from_wav(filename);
-    //println!("{:?}", samples);
     let samples_flac = read_metrics_from_flac(filename_flac);
     let samples_flac_b = read_metrics_from_flac_by_bloc(filename_flac);
     println!("{:?}", samples_flac);
     println!("{:?}", samples_flac_b);
     assert_eq!(samples_flac_b, samples_flac);
-    //let samples_flac_in_interval = read_metrics_from_flac_in_interval(filename_flac, 5, 7);
     println!("Sample Flac {:?}", samples_flac.len());
     println!("Sample Flac {:?}", samples_flac_b.len());
 }
diff --git a/tools/src/bin/wav2wbro.rs b/tools/src/bin/wav2wbro.rs
index 2ecf975..be7d8f3 100644
--- a/tools/src/bin/wav2wbro.rs
+++ b/tools/src/bin/wav2wbro.rs
@@ -23,12 +23,10 @@ use wavbrro::wavbrro::WavBrro;
 // --- Legacy stuff to read brro "wav" files ---
 fn is_wav_file(file_path: &Path) -> bool {
-    // Open the file for reading and read the first 12 bytes (header) of the file
     let mut file = File::open(file_path).expect("Can't open file!");
     let mut header = [0u8; 12];
     file.read_exact(&mut header).expect("File too small!");
 
-    // Check if the file starts with "RIFF" and ends with "WAVE" in the header
     header.starts_with(b"RIFF") && &header[8..12] == b"WAVE"
 }
 
@@ -45,7 +43,6 @@ fn read_metrics_from_wav(filename: &str) -> Vec<f64> {
     let mut raw_data: Vec<f64> = Vec::new();
     let mut u64_holder: [u16; 4] = [0, 0, 0, 0];
 
-    // Iterate over the samples and channels and push each sample to the vector
     let mut current_channel: usize = 0;
     for sample in reader.samples::<i16>() {
         u64_holder[current_channel] = sample.unwrap() as u16;
@@ -70,15 +67,13 @@ fn join_u16_into_f64(bits: [u16; 4]) -> f64 {
     }
     out
 }
-// --- Legacy ends (I need to stop lying to myself...) ---
+// --- Legacy ends ---
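For readers unfamiliar with the header check in the legacy block above: the first 12 bytes of a WAV file are the standard RIFF preamble, which is all `is_wav_file` inspects. A small sketch of the layout it relies on (the size field value here is an arbitrary example):

```rust
// Standard RIFF/WAVE preamble, first 12 bytes of the file:
//   bytes 0..4  = b"RIFF"
//   bytes 4..8  = little-endian u32: total file size minus 8
//   bytes 8..12 = b"WAVE"
fn looks_like_wav(header: &[u8; 12]) -> bool {
    header.starts_with(b"RIFF") && &header[8..12] == b"WAVE"
}

fn main() {
    let header = *b"RIFF\x24\x08\x00\x00WAVE"; // example 12-byte preamble
    assert!(looks_like_wav(&header));
}
```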
 
 #[derive(Parser, Default, Debug)]
 #[command(author, version, about="WAV to WAVBRRO converter", long_about = None)]
 struct Args {
-    /// input file
     input: PathBuf,
 
-    /// Verbose output, dumps everysample in the input file (for compression) and in the ouput file (for decompression)
     #[arg(long, action)]
     validate: bool,
 }
@@ -91,16 +86,13 @@ fn main() {
     assert!(is_wav_file(&arguments.input));
     let wav_data = read_metrics_from_wav(filename);
     let mut wb = WavBrro::new();
-    // Clean NaN
     wav_data.iter().for_each(|x| {
         if !x.is_nan() {
             wb.add_sample(*x)
         }
     });
-    // Write the file
     let wavbrro_file = format!("{}wbro", filename.strip_suffix("wav").unwrap());
     wb.to_file(Path::new(&wavbrro_file));
-    // Checking the results
     if arguments.validate {
         let brro_data = wb.get_samples();
         assert_eq!(wav_data, brro_data);
diff --git a/vsri/src/lib.rs b/vsri/src/lib.rs
index a9a4f00..0487e53 100644
--- a/vsri/src/lib.rs
+++ b/vsri/src/lib.rs
@@ -96,7 +96,7 @@ pub fn start_day_ts(dt: DateTime<Utc>) -> i64 {
 /// max_ts: the highest TS available in this file
 /// vsri_segments: Description of each segment
 ///                [sample_rate (m), initial_point(x,y), # of samples(length)]
-/// Each segments describes a line with the form of mX + B that has a lenght
+/// Each segment describes a line of the form mX + B that has a length
 /// of # of samples.
 #[derive(Debug, Default)]
 pub struct Vsri {
diff --git a/wavbrro/src/read.rs b/wavbrro/src/read.rs
index 000379e..709e430 100644
--- a/wavbrro/src/read.rs
+++ b/wavbrro/src/read.rs
@@ -19,9 +19,7 @@ use std::fs::File;
 use std::io::{self, Read, Seek, SeekFrom};
 use std::path::Path;
 
-// Function to check if a file is a WAV file
 pub fn is_wavbrro_file(file_path: &Path) -> io::Result<bool> {
-    // Open the file for reading and read the first 12 bytes (header) of the file
     let mut file = fs::File::open(file_path)?;
     let mut header = [0u8; 12];
     file.read_exact(&mut header)?;
diff --git a/wavbrro/src/wavbrro.rs b/wavbrro/src/wavbrro.rs
index 1839d11..74de5e6 100644
--- a/wavbrro/src/wavbrro.rs
+++ b/wavbrro/src/wavbrro.rs
@@ -32,7 +32,6 @@ const MAX_CHUNK_SIZE: usize = 2048;
 // API, you have to derive CheckBytes for the archived type:
     check_bytes,
 )]
-// Derives can be passed through to the generated type:
 #[archive_attr(derive(Debug))]
 pub struct WavBrro {
     // We can infer chunk count from here -> chunk count = ceil(sample_count/MAX_CHUNK_SIZE)
@@ -74,7 +73,6 @@ impl WavBrro {
         self.chunks.push(Vec::with_capacity(MAX_CHUNK_SIZE));
     }
 
-    // Receives a slice of f64 and writes in it's internal structure
     fn from_slice(data: &[f64]) -> Self {
         let sample_count = data.len();
         WavBrro {
@@ -92,7 +90,7 @@ impl WavBrro {
         self.sample_count += 1;
     }
 
-    // This should be generic, but first implementation is going to be Vec f64
+    // TODO: This should be generic, but the first implementation is going to be Vec<f64>
     // This consumes self!
     pub fn get_samples(self) -> Vec<f64> {
         self.chunks.into_iter().flatten().collect::<Vec<f64>>()
diff --git a/wavbrro/src/write.rs b/wavbrro/src/write.rs
index c73a861..e21cd25 100644
--- a/wavbrro/src/write.rs
+++ b/wavbrro/src/write.rs
@@ -19,9 +19,7 @@ use std::os::unix::prelude::FileExt;
 use std::path::Path;
 
 pub fn write_wavbrro_file(file_path: &Path, content: &[u8]) {
-    // The content of the header
     let header: [u8; 12] = *b"WBRO0000WBRO";
-    // We need to put the header in front
     let file = File::create(file_path).expect("Can't create file!");
     file.write_at(&header, 0).expect("Fail to write header");
     file.write_at(content, header.len() as u64)
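Closing note on the wavbrro hunks: the chunked layout keeps the invariant stated in the struct comment, chunk count == ceil(sample_count / MAX_CHUNK_SIZE). A standalone illustration of that invariant (simplified; the real `WavBrro` routes this through its own chunk helpers):

```rust
const MAX_CHUNK_SIZE: usize = 2048;

// Simplified chunk store: append to the last chunk, open a new one when full.
struct Chunks {
    chunks: Vec<Vec<f64>>,
    sample_count: usize,
}

impl Chunks {
    fn add_sample(&mut self, sample: f64) {
        match self.chunks.last_mut() {
            Some(last) if last.len() < MAX_CHUNK_SIZE => last.push(sample),
            _ => {
                let mut chunk = Vec::with_capacity(MAX_CHUNK_SIZE);
                chunk.push(sample);
                self.chunks.push(chunk);
            }
        }
        self.sample_count += 1;
    }
}

fn main() {
    let mut c = Chunks { chunks: vec![], sample_count: 0 };
    for i in 0..5000 {
        c.add_sample(i as f64);
    }
    // chunk count == ceil(sample_count / MAX_CHUNK_SIZE), here ceil(5000/2048) = 3
    assert_eq!(
        c.chunks.len(),
        (c.sample_count + MAX_CHUNK_SIZE - 1) / MAX_CHUNK_SIZE
    );
}
```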