diff --git a/lib/src/chunking.rs b/lib/src/chunking.rs
index 3a299928..8e7f5769 100644
--- a/lib/src/chunking.rs
+++ b/lib/src/chunking.rs
@@ -3,10 +3,11 @@
 // SPDX-License-Identifier: Apache-2.0 OR MIT
 
 use std::borrow::{Borrow, Cow};
-use std::collections::{BTreeMap, BTreeSet, HashMap};
-use std::fmt::Write;
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::hash::{Hash, Hasher};
 use std::num::NonZeroU32;
 use std::rc::Rc;
+use std::time::Instant;
 
 use crate::objectsource::{ContentID, ObjectMeta, ObjectMetaMap, ObjectSourceMeta};
 use crate::objgv::*;
@@ -43,6 +44,20 @@ pub struct ObjectSourceMetaSized {
     size: u64,
 }
 
+impl Hash for ObjectSourceMetaSized {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.meta.identifier.hash(state);
+    }
+}
+
+impl Eq for ObjectSourceMetaSized {}
+
+impl PartialEq for ObjectSourceMetaSized {
+    fn eq(&self, other: &Self) -> bool {
+        self.meta.identifier == other.meta.identifier
+    }
+}
+
 /// Extend content source metadata with sizes.
 #[derive(Debug)]
 pub struct ObjectMetaSized {
@@ -294,27 +309,22 @@ impl Chunking {
             .unwrap();
 
         // TODO: Compute bin packing in a better way
+        let start = Instant::now();
         let packing = basic_packing(
             sizes,
             NonZeroU32::new(self.max).unwrap(),
             prior_build_metadata,
         );
+        let duration = start.elapsed();
+        println!("Time elapsed in packing: {:#?}", duration);
 
         for bin in packing.into_iter() {
-            let first = bin[0];
-            let first_name = &*first.meta.name;
             let name = match bin.len() {
-                0 => unreachable!(),
-                1 => Cow::Borrowed(first_name),
-                2..=5 => {
-                    let r = bin.iter().map(|v| &*v.meta.name).fold(
-                        String::from(first_name),
-                        |mut acc, v| {
-                            write!(acc, " and {}", v).unwrap();
-                            acc
-                        },
-                    );
-                    Cow::Owned(r)
+                0 => Cow::Owned("Reserved for new packages".to_string()),
+                1 => {
+                    let first = bin[0];
+                    let first_name = &*first.meta.identifier;
+                    Cow::Borrowed(first_name)
                 }
                 n => Cow::Owned(format!("{n} components")),
             };
@@ -325,9 +335,7 @@ impl Chunking {
                     self.remainder.move_obj(&mut chunk, obj.as_str());
                 }
             }
-            if !chunk.content.is_empty() {
-                self.chunks.push(chunk);
-            }
+            self.chunks.push(chunk);
         }
 
         assert_eq!(self.remainder.content.len(), 0);
@@ -372,82 +380,388 @@ impl Chunking {
     }
 }
 
-type ChunkedComponents<'a> = Vec<&'a ObjectSourceMetaSized>;
-
+#[cfg(test)]
 fn components_size(components: &[&ObjectSourceMetaSized]) -> u64 {
     components.iter().map(|k| k.size).sum()
 }
 
 /// Compute the total size of a packing
 #[cfg(test)]
-fn packing_size(packing: &[ChunkedComponents]) -> u64 {
+fn packing_size(packing: &[Vec<&ObjectSourceMetaSized>]) -> u64 {
     packing.iter().map(|v| components_size(v)).sum()
 }
 
-fn sort_packing(packing: &mut [ChunkedComponents]) {
-    packing.sort_by(|a, b| {
-        let a: u64 = components_size(a);
-        let b: u64 = components_size(b);
-        b.cmp(&a)
-    });
+fn mean(data: &[u64]) -> Option<f64> {
+    let sum = data.iter().sum::<u64>() as f64;
+    let count = data.len();
+    match count {
+        positive if positive > 0 => Some(sum / count as f64),
+        _ => None,
+    }
+}
+
+fn std_deviation(data: &[u64]) -> Option<f64> {
+    match (mean(data), data.len()) {
+        (Some(data_mean), count) if count > 0 => {
+            let variance = data
+                .iter()
+                .map(|value| {
+                    let diff = data_mean - (*value as f64);
+                    diff * diff
+                })
+                .sum::<f64>()
+                / count as f64;
+            Some(variance.sqrt())
+        }
+        _ => None,
+    }
+}
+
+fn get_partitions_with_threshold(
+    components: Vec<&ObjectSourceMetaSized>,
+    threshold: f64,
+) -> Option<BTreeMap<String, Vec<&ObjectSourceMetaSized>>> {
+    let mut bins: BTreeMap<String, Vec<&ObjectSourceMetaSized>> = BTreeMap::new();
+    let mut med_size: Vec<&ObjectSourceMetaSized> = Vec::new();
+
+    //Calculate Mean and Stddev for Size
+    let sizes: Vec<u64> = components.iter().map(|a| a.size).collect();
+    let mean_size = mean(&sizes)?;
+    let stddev_size = std_deviation(&sizes)?;
+    let mut size_low_limit = mean_size - threshold * stddev_size;
+    if size_low_limit < 0 as f64 {
+        size_low_limit = 100000_f64;
+    }
+    let size_high_limit = mean_size + threshold * stddev_size;
+
+    for pkg in components {
+        let size = pkg.size as f64;
+
+        //hs
+        if size >= size_high_limit {
+            bins.entry("1hs".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //ls
+        else if size <= size_low_limit {
+            bins.entry("2ls".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //ms
+        else {
+            med_size.push(pkg);
+        }
+    }
+
+    let med_frequencies: Vec<u64> = med_size
+        .iter()
+        .map(|a| a.meta.change_frequency.into())
+        .collect();
+    let med_sizes: Vec<u64> = med_size.iter().map(|a| a.size).collect();
+    let med_mean_freq = mean(&med_frequencies)?;
+    let med_stddev_freq = std_deviation(&med_frequencies)?;
+    let med_mean_size = mean(&med_sizes)?;
+    let med_stddev_size = std_deviation(&med_sizes)?;
+
+    let med_freq_low_limit = med_mean_freq - threshold * med_stddev_freq;
+    let med_freq_high_limit = med_mean_freq + threshold * med_stddev_freq;
+    let med_size_low_limit = med_mean_size - threshold * med_stddev_size;
+    let med_size_high_limit = med_mean_size + threshold * med_stddev_size;
+
+    for pkg in med_size {
+        let size = pkg.size as f64;
+        let freq = pkg.meta.change_frequency as f64;
+
+        //lf_hs
+        if (freq <= med_freq_low_limit) && (size >= med_size_high_limit) {
+            bins.entry("lf_hs".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //mf_hs
+        else if (freq < med_freq_high_limit)
+            && (freq > med_freq_low_limit)
+            && (size >= med_size_high_limit)
+        {
+            bins.entry("mf_hs".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //hf_hs
+        else if (freq >= med_freq_high_limit) && (size >= med_size_high_limit) {
+            bins.entry("hf_hs".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //lf_ms
+        else if (freq <= med_freq_low_limit)
+            && (size < med_size_high_limit)
+            && (size > med_size_low_limit)
+        {
+            bins.entry("lf_ms".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //mf_ms
+        else if (freq < med_freq_high_limit)
+            && (freq > med_freq_low_limit)
+            && (size < med_size_high_limit)
+            && (size > med_size_low_limit)
+        {
+            bins.entry("mf_ms".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //hf_ms
+        else if (freq >= med_freq_high_limit)
+            && (size < med_size_high_limit)
+            && (size > med_size_low_limit)
+        {
+            bins.entry("hf_ms".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //lf_ls
+        else if (freq <= med_freq_low_limit) && (size <= med_size_low_limit) {
+            bins.entry("lf_ls".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //mf_ls
+        else if (freq < med_freq_high_limit)
+            && (freq > med_freq_low_limit)
+            && (size <= med_size_low_limit)
+        {
+            bins.entry("mf_ls".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+        //hf_ls
+        else if (freq >= med_freq_high_limit) && (size <= med_size_low_limit) {
+            bins.entry("hf_ls".to_string())
+                .and_modify(|bin| bin.push(pkg))
+                .or_insert_with(|| vec![pkg]);
+        }
+    }
+
+    for (name, pkgs) in &bins {
+        println!("{:#?}: {:#?}", name, pkgs.len());
+    }
+
+    Some(bins)
+}
 
 /// Given a set of components with size metadata (e.g. boxes of a certain size)
 /// and a number of bins (possible container layers) to use, determine which components
 /// go in which bin. This algorithm is pretty simple:
-///
-/// - order by size
-/// - If we have fewer components than bins, we're done
-/// - Take the "tail" (all components past maximum), and group by source package
-/// - If we have fewer components than bins, we're done
-/// - Take the whole tail and group them toether (this is the overly simplistic part)
+
 fn basic_packing<'a>(
     components: &'a [ObjectSourceMetaSized],
-    bins: NonZeroU32,
+    bin_size: NonZeroU32,
     prior_build_metadata: &'a Option<Vec<Vec<String>>>,
-) -> Vec<ChunkedComponents<'a>> {
-    // let total_size: u64 = components.iter().map(|v| v.size).sum();
-    // let avg_size: u64 = total_size / components.len() as u64;
+) -> Vec<Vec<&ObjectSourceMetaSized>> {
     let mut r = Vec::new();
-    // And handle the easy case of enough bins for all components
-    // TODO: Possibly try to split off large files?
-    if components.len() <= bins.get() as usize {
-        r.extend(components.iter().map(|v| vec![v]));
-        return r;
-    }
-    // Create a mutable copy
     let mut components: Vec<_> = components.iter().collect();
-    // Iterate over the component tail, folding by source id
-    let mut by_src = HashMap::<_, Vec<&ObjectSourceMetaSized>>::new();
-    // Take the tail off components, then build up mapping from srcid -> Vec<component>
-    for component in components.split_off(bins.get() as usize) {
-        by_src
-            .entry(&component.meta.srcid)
-            .or_default()
-            .push(component);
+    let before_processing_pkgs_len = components.len();
+    if before_processing_pkgs_len == 0 {
+        return Vec::new();
     }
-    // Take all the non-tail (largest) components, and append them first
-    r.extend(components.into_iter().map(|v| vec![v]));
-    // Add the tail
-    r.extend(by_src.into_values());
-    // And order the new list
-    sort_packing(&mut r);
-    // It's possible that merging components gave us enough space; if so
-    // we're done!
-    if r.len() <= bins.get() as usize {
-        return r;
+    //Flatten out prior_build_metadata[i] to view all the packages in the prior build as a single vec
+    //
+    //If the current rpm-ostree commit to be encapsulated is not the one in which the packing structure changes, then
+    //  Compare flatten(prior_build_metadata[i]) to components to see if pkgs were added, updated,
+    //  removed or kept the same
+    //  if pkgs added, then add them to the last bin of prior[i][n]
+    //  if pkgs removed, then remove them from the prior[i]
+    //  iterate through prior[i] and make bins according to the name in the nevra of pkgs and return
+    //  (no need to recompute the packing structure)
+    //else if pkg structure to be changed || prior build not specified
+    //  Recompute the optimal packing structure (compute partitions, place packages and optimize the build)
+
+    if let Some(prior_build) = prior_build_metadata
+    /* && structure not to be changed */
+    {
+        println!("Keeping old package structure");
+        let mut curr_build: Vec<Vec<String>> = prior_build.clone();
+        //Packing only manages RPMs, not the OStree commit
+        curr_build.remove(0);
+        let mut prev_pkgs: Vec<String> = Vec::new();
+        for bin in &curr_build {
+            for pkg in bin {
+                prev_pkgs.push(pkg.to_string());
+            }
+        }
+        prev_pkgs.retain(|name| !name.is_empty());
+        let curr_pkgs: Vec<String> = components
+            .iter()
+            .map(|pkg| pkg.meta.name.to_string())
+            .collect();
+        let prev_pkgs_set: HashSet<String> = HashSet::from_iter(prev_pkgs);
+        let curr_pkgs_set: HashSet<String> = HashSet::from_iter(curr_pkgs);
+        let added: HashSet<&String> = curr_pkgs_set.difference(&prev_pkgs_set).collect();
+        let removed: HashSet<&String> = prev_pkgs_set.difference(&curr_pkgs_set).collect();
+        let mut add_pkgs_v: Vec<String> = Vec::new();
+        for pkg in added {
+            add_pkgs_v.push(pkg.to_string());
+        }
+        let mut rem_pkgs_v: Vec<String> = Vec::new();
+        for pkg in removed {
+            rem_pkgs_v.push(pkg.to_string());
+        }
+        let curr_build_len = &curr_build.len();
+        curr_build[curr_build_len - 1].retain(|name| !name.is_empty());
+        curr_build[curr_build_len - 1].extend(add_pkgs_v);
+        for bin in curr_build.iter_mut() {
+            bin.retain(|pkg| !rem_pkgs_v.contains(pkg));
+        }
+        let mut name_to_component: HashMap<String, &ObjectSourceMetaSized> = HashMap::new();
+        for component in &components {
+            name_to_component
+                .entry(component.meta.name.to_string())
+                .or_insert(component);
+        }
+        let mut modified_build: Vec<Vec<&ObjectSourceMetaSized>> = Vec::new();
+        for bin in curr_build {
+            let mut mod_bin = Vec::new();
+            for pkg in bin {
+                mod_bin.push(name_to_component[&pkg]);
+            }
+            modified_build.push(mod_bin);
+        }
+        let mut after_processing_pkgs_len = 0;
+        modified_build.iter().for_each(|bin| {
+            after_processing_pkgs_len += bin.len();
+        });
+        assert_eq!(after_processing_pkgs_len, before_processing_pkgs_len);
+        assert!(modified_build.len() <= bin_size.get() as usize);
+        return modified_build;
     }
-    let last = (bins.get().checked_sub(1).unwrap()) as usize;
-    // The "tail" is components past our maximum. For now, we simply group all of that together as a single unit.
-    if let Some(tail) = r.drain(last..).reduce(|mut a, b| {
-        a.extend(b.into_iter());
-        a
-    }) {
-        r.push(tail);
-    }
+    println!("Creating new packing structure");
+
+    components.sort_by(|a, b| a.meta.change_frequency.cmp(&b.meta.change_frequency));
+    let mut max_freq_components: Vec<&ObjectSourceMetaSized> = Vec::new();
+    components.retain(|pkg| {
+        let retain: bool = pkg.meta.change_frequency != u32::MAX;
+        if !retain {
+            max_freq_components.push(pkg);
+        }
+        retain
+    });
+    let components_len_after_max_freq = components.len();
+    match components_len_after_max_freq {
+        0 => (),
+        _ => {
+            let partitions = get_partitions_with_threshold(components, 0.5)
+                .expect("Partitioning components into sets");
+
+            // Max bins:
+            //      1 for max_freq
+            //      1 for new_pkgs
+            //      1 for ls
+            //      n for hs
+            //      Left for ms
+
+            let qty_hs_bins = match partitions.get("1hs") {
+                Some(n) => n.len(),
+                None => 0usize,
+            };
+            let qty_hs_pkgs = qty_hs_bins.clone();
+
+            let qty_ls_bins = 1usize;
+            let qty_ls_pkgs = match partitions.get("2ls") {
+                Some(n) => n.len(),
+                None => 0usize,
+            };
-    assert!(r.len() <= bins.get() as usize);
+            let qty_new_bins = 1usize;
+            let _qty_new_pkgs = 0usize;
+
+            let qty_max_frequency_bins = 1usize;
+            let _qty_max_frequency_pkgs = max_freq_components.len();
+
+            //Can be negative or very low if qty_hs_pkgs is very high
+            let qty_ms_bins = bin_size.get() as usize
+                - (qty_hs_bins + qty_ls_bins + qty_new_bins + qty_max_frequency_bins);
+
+            let pkg_per_bin_ms: usize =
+                match (components_len_after_max_freq - qty_hs_pkgs - qty_ls_pkgs)
+                    .checked_div(qty_ms_bins)
+                {
+                    Some(n) => {
+                        if n >= 1 {
+                            n
+                        } else {
+                            3usize
+                        }
+                    }
+                    None => 6usize,
+                };
+
+            for partition in partitions.keys() {
+                let pkgs = partitions.get(partition).expect("hashset");
+
+                if partition == "1hs" {
+                    for pkg in pkgs {
+                        r.push(vec![*pkg]);
+                    }
+                } else if partition == "2ls" {
+                    let mut bin: Vec<&ObjectSourceMetaSized> = Vec::new();
+                    for pkg in pkgs {
+                        bin.push(*pkg);
+                    }
+                    r.push(bin);
+                } else {
+                    let mut bin: Vec<&ObjectSourceMetaSized> = Vec::new();
+                    for (i, pkg) in pkgs.iter().enumerate() {
+                        if bin.len() < pkg_per_bin_ms {
+                            bin.push(*pkg);
+                        } else {
+                            r.push(bin.clone());
+                            bin.clear();
+                            bin.push(*pkg);
+                        }
+                        if i == pkgs.len() - 1 && !bin.is_empty() {
+                            r.push(bin.clone());
+                            bin.clear();
+                        }
+                    }
+                }
+            }
+
+            println!("Bins before optimization: {}", r.len());
+            //Leave second last bin for max_freq_components
+            //Leave last bin for new packages added, so as to not disturb
+            //previous bins.
+            while r.len() > (bin_size.get() - 2) as usize {
+                for i in (1..r.len() - 1).step_by(2).rev() {
+                    if r.len() <= (bin_size.get() - 2) as usize {
+                        break;
+                    }
+                    let prev = &r[i - 1];
+                    let curr = &r[i];
+                    let mut merge: Vec<&ObjectSourceMetaSized> = Vec::new();
+                    merge.extend(prev.iter());
+                    merge.extend(curr.iter());
+                    r.remove(i);
+                    r.remove(i - 1);
+                    r.insert(i, merge);
+                }
+            }
+            println!("Bins after optimization: {}", r.len());
+        }
+    }
+    r.push(max_freq_components);
+    let new_pkgs_bin: Vec<&ObjectSourceMetaSized> = Vec::new();
+    r.push(new_pkgs_bin);
+    let mut after_processing_pkgs_len = 0;
+    r.iter().for_each(|bin| {
+        after_processing_pkgs_len += bin.len();
+    });
+    assert_eq!(after_processing_pkgs_len, before_processing_pkgs_len);
+    assert!(r.len() <= bin_size.get() as usize);
     r
 }
diff --git a/lib/src/container/store.rs b/lib/src/container/store.rs
index 2bbb0a6d..22dea326 100644
--- a/lib/src/container/store.rs
+++ b/lib/src/container/store.rs
@@ -200,11 +200,10 @@ pub struct PreparedImport {
 }
 
 impl PreparedImport {
-    /// Iterate over all layers; the ostree split object layers, the commit layer, and any non-ostree layers.
+    /// Iterate over all layers; the commit layer, the ostree split object layers, and any non-ostree layers.
     pub fn all_layers(&self) -> impl Iterator<Item = &ManifestLayerState> {
-        self.ostree_layers
-            .iter()
-            .chain(std::iter::once(&self.ostree_commit_layer))
+        std::iter::once(&self.ostree_commit_layer)
+            .chain(self.ostree_layers.iter())
             .chain(self.layers.iter())
     }
diff --git a/lib/src/fixture.rs b/lib/src/fixture.rs
index 3c093473..95ab09c9 100644
--- a/lib/src/fixture.rs
+++ b/lib/src/fixture.rs
@@ -168,7 +168,8 @@ d tmp
 "##
 };
 pub const CONTENTS_CHECKSUM_V0: &str =
     "5e41de82f9f861fa51e53ce6dd640a260e4fb29b7657f5a3f14157e93d2c0659";
-pub static CONTENTS_V0_LEN: Lazy<usize> = Lazy::new(|| OWNERS.len().checked_sub(1).unwrap());
+// 1 for the ostree commit layer, 1 for the max-frequency packages layer, 1 empty layer reserved for new packages
+pub const LAYERS_V0_LEN: usize = 3usize;
 
 #[derive(Debug, PartialEq, Eq)]
 enum SeLabel {
diff --git a/lib/src/fixtures/fedora-coreos-contentmeta.json.gz b/lib/src/fixtures/fedora-coreos-contentmeta.json.gz
index a1276a3f..285d587a 100644
Binary files a/lib/src/fixtures/fedora-coreos-contentmeta.json.gz and b/lib/src/fixtures/fedora-coreos-contentmeta.json.gz differ
diff --git a/lib/src/objectsource.rs b/lib/src/objectsource.rs
index 768104c9..a16f4dcc 100644
--- a/lib/src/objectsource.rs
+++ b/lib/src/objectsource.rs
@@ -39,8 +39,7 @@ pub struct ObjectSourceMeta {
     /// Unique identifier, does not need to be human readable, but can be.
     #[serde(with = "rcstr_serialize")]
     pub identifier: ContentID,
-    /// Identifier for this source (e.g. package name-version, git repo).
-    /// Unlike the [`ContentID`], this should be human readable.
+    /// Just the name of the package (no version); needs to be human readable.
#[serde(with = "rcstr_serialize")] pub name: Rc, /// Identifier for the *source* of this content; for example, if multiple binary diff --git a/lib/tests/it/main.rs b/lib/tests/it/main.rs index a67903d2..a9db646d 100644 --- a/lib/tests/it/main.rs +++ b/lib/tests/it/main.rs @@ -20,7 +20,7 @@ use std::os::unix::fs::DirBuilderExt; use std::process::Command; use std::time::SystemTime; -use ostree_ext::fixture::{FileDef, Fixture, CONTENTS_CHECKSUM_V0, CONTENTS_V0_LEN}; +use ostree_ext::fixture::{FileDef, Fixture, CONTENTS_CHECKSUM_V0, LAYERS_V0_LEN}; const EXAMPLE_TAR_LAYER: &[u8] = include_bytes!("fixtures/hlinks.tar.gz"); const TEST_REGISTRY_DEFAULT: &str = "localhost:5000"; @@ -513,7 +513,7 @@ async fn impl_test_container_import_export(chunked: bool) -> Result<()> { "/usr/bin/bash" ); - let n_chunks = if chunked { *CONTENTS_V0_LEN } else { 1 }; + let n_chunks = if chunked { LAYERS_V0_LEN } else { 1 }; assert_eq!(cfg.rootfs().diff_ids().len(), n_chunks); assert_eq!(cfg.history().len(), n_chunks); @@ -616,7 +616,7 @@ fn validate_chunked_structure(oci_path: &Utf8Path) -> Result<()> { let d = Dir::open_ambient_dir(oci_path, cap_std::ambient_authority())?; let d = ocidir::OciDir::open(&d)?; let manifest = d.read_manifest()?; - assert_eq!(manifest.layers().len(), *CONTENTS_V0_LEN); + assert_eq!(manifest.layers().len(), LAYERS_V0_LEN); let ostree_layer = manifest.layers().first().unwrap(); let mut ostree_layer_blob = d .read_blob(ostree_layer) @@ -649,7 +649,7 @@ fn validate_chunked_structure(oci_path: &Utf8Path) -> Result<()> { #[tokio::test] async fn test_container_chunked() -> Result<()> { - let nlayers = *CONTENTS_V0_LEN - 1; + let nlayers = LAYERS_V0_LEN - 1; let mut fixture = Fixture::new_v1()?; let (imgref, expected_digest) = fixture.export_container().await.unwrap(); @@ -716,8 +716,11 @@ r usr/bin/bash bash-v0 let (first, second) = (to_fetch[0], to_fetch[1]); assert!(first.0.commit.is_none()); assert!(second.0.commit.is_none()); - assert_eq!(first.1, "testlink"); - assert_eq!(second.1, "bash"); + assert_eq!( + first.1, + "ostree export of commit 38ab1f9da373a0184b0b48db6e280076ab4b5d4691773475ae24825aae2272d4" + ); + assert_eq!(second.1, "7 components"); assert_eq!(store::list_images(fixture.destrepo()).unwrap().len(), 1); let n = store::count_layer_references(fixture.destrepo())? as i64; @@ -791,7 +794,7 @@ r usr/bin/bash bash-v0 store::remove_images(fixture.destrepo(), [&derived_imgref.imgref]).unwrap(); assert_eq!(store::list_images(fixture.destrepo()).unwrap().len(), 0); let n_removed = store::gc_image_layers(fixture.destrepo())?; - assert_eq!(n_removed, (*CONTENTS_V0_LEN + 1) as u32); + assert_eq!(n_removed, (LAYERS_V0_LEN + 1) as u32); // Repo should be clean now assert_eq!(store::count_layer_references(fixture.destrepo())?, 0);