Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for querying OCI graph #99

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions commons/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,35 @@ impl Graph {
},
};
let mut has_basearch = false;
for commit in entry.commits {
if commit.architecture != scope.basearch || commit.checksum.is_empty() {
continue;
if scope.oci {
if let Some(oci_images) = entry.oci_images {
for oci_image in oci_images {
if oci_image.architecture != scope.basearch
|| oci_image.digest_ref.is_empty()
{
continue;
}
has_basearch = true;
current.payload = oci_image.digest_ref;
current
.metadata
.insert(metadata::SCHEME.to_string(), "oci".to_string());
}
} else {
// This release doesn't have OCI images, skip it.
return None;
}
} else {
for commit in entry.commits {
if commit.architecture != scope.basearch || commit.checksum.is_empty() {
continue;
}
has_basearch = true;
current.payload = commit.checksum;
current
.metadata
.insert(metadata::SCHEME.to_string(), "checksum".to_string());
}
has_basearch = true;
current.payload = commit.checksum;
current
.metadata
.insert(metadata::SCHEME.to_string(), "checksum".to_string());
}

// Not a valid release payload for this graph scope, skip it.
Expand Down Expand Up @@ -210,4 +230,5 @@ impl Graph {
pub struct GraphScope {
pub basearch: String,
pub stream: String,
pub oci: bool,
}
32 changes: 21 additions & 11 deletions commons/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use serde_derive::Deserialize;
pub static RELEASES_JSON: &str =
"https://builds.coreos.fedoraproject.org/prod/streams/${stream}/releases.json";

/// Templated URL for stream metadata.
pub static STREAM_JSON: &str = "https://builds.coreos.fedoraproject.org/updates/${stream}.json";
/// Templated URL for updates metadata.
pub static UPDATES_JSON: &str = "https://builds.coreos.fedoraproject.org/updates/${stream}.json";

pub static SCHEME: &str = "org.fedoraproject.coreos.scheme";

Expand All @@ -24,55 +24,65 @@ pub static START_EPOCH: &str = "org.fedoraproject.coreos.updates.start_epoch";
pub static START_VALUE: &str = "org.fedoraproject.coreos.updates.start_value";

/// Fedora CoreOS release index.
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ReleasesJSON {
pub releases: Vec<Release>,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct Release {
pub commits: Vec<ReleaseCommit>,
#[serde(rename = "oci-images")]
pub oci_images: Option<Vec<ReleaseOciImage>>,
pub version: String,
pub metadata: String,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ReleaseCommit {
pub architecture: String,
pub checksum: String,
}

#[derive(Clone, Debug, Deserialize)]
pub struct ReleaseOciImage {
pub architecture: String,
pub image: String,
#[serde(rename = "digest-ref")]
pub digest_ref: String,
}

/// Fedora CoreOS updates metadata
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdatesJSON {
pub stream: String,
pub releases: Vec<ReleaseUpdate>,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ReleaseUpdate {
pub version: String,
pub metadata: UpdateMetadata,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateMetadata {
pub barrier: Option<UpdateBarrier>,
pub deadend: Option<UpdateDeadend>,
pub rollout: Option<UpdateRollout>,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateBarrier {
pub reason: String,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateDeadend {
pub reason: String,
}

#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct UpdateRollout {
pub start_epoch: Option<i64>,
pub start_percentage: Option<f64>,
Expand Down
2 changes: 1 addition & 1 deletion commons/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn throttle_rollouts(input: Graph, client_wariness: f64) -> Graph {

for (index, release) in graph.nodes.iter().enumerate() {
// Skip if this release is not being rolled out.
if release.metadata.get(metadata::ROLLOUT).is_none() {
if !release.metadata.contains_key(metadata::ROLLOUT) {
continue;
};

Expand Down
27 changes: 19 additions & 8 deletions commons/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub fn build_cors_middleware(origin_allowlist: &Option<Vec<String>>) -> CorsFact
pub fn validate_scope(
basearch: Option<String>,
stream: Option<String>,
oci: Option<bool>,
scope_allowlist: &Option<HashSet<GraphScope>>,
) -> Result<GraphScope, failure::Error> {
let basearch = basearch.ok_or_else(|| err_msg("missing basearch"))?;
Expand All @@ -34,15 +35,22 @@ pub fn validate_scope(
let stream = stream.ok_or_else(|| err_msg("missing stream"))?;
ensure!(!stream.is_empty(), "empty stream");

let scope = GraphScope { basearch, stream };
let oci = oci.unwrap_or_default();

let scope = GraphScope {
basearch,
stream,
oci,
};

// Optionally filter out scope according to given allowlist, if any.
if let Some(allowlist) = scope_allowlist {
if !allowlist.contains(&scope) {
bail!(
"scope not allowed: basearch='{}', stream='{}'",
"scope not allowed: basearch='{}', stream='{}', oci='{}'",
scope.basearch,
scope.stream
scope.stream,
scope.oci,
);
}
}
Expand All @@ -57,26 +65,28 @@ mod tests {
#[test]
fn test_validate_scope() {
{
let r = validate_scope(None, None, &None);
let r = validate_scope(None, None, None, &None);
assert!(r.is_err());
}
{
let basearch = Some("test_empty".to_string());
let stream = Some("".to_string());
let r = validate_scope(basearch, stream, &None);
let oci = None;
let r = validate_scope(basearch, stream, oci, &None);
assert!(r.is_err());
}
{
let basearch = Some("x86_64".to_string());
let stream = Some("stable".to_string());
let r = validate_scope(basearch, stream, &None);
let oci = Some(false);
let r = validate_scope(basearch, stream, oci, &None);
assert!(r.is_ok());
}
{
let basearch = Some("x86_64".to_string());
let stream = Some("stable".to_string());
let filter_none_allowed = Some(HashSet::new());
let r = validate_scope(basearch, stream, &filter_none_allowed);
let r = validate_scope(basearch, stream, None, &filter_none_allowed);
assert!(r.is_err());
}
{
Expand All @@ -85,9 +95,10 @@ mod tests {
let allowed_scope = GraphScope {
basearch: "x86_64".to_string(),
stream: "stable".to_string(),
oci: false,
};
let filter = Some(maplit::hashset! {allowed_scope});
let r = validate_scope(basearch, stream, &filter);
let r = validate_scope(basearch, stream, None, &filter);
assert!(r.is_ok());
}
}
Expand Down
40 changes: 25 additions & 15 deletions fcos-graph-builder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,27 @@ lazy_static::lazy_static! {
static ref CACHED_GRAPH_REQUESTS: IntCounterVec = register_int_counter_vec!(
"fcos_cincinnati_gb_cache_graph_requests_total",
"Total number of requests for a cached graph",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref GRAPH_FINAL_EDGES: IntGaugeVec = register_int_gauge_vec!(
"fcos_cincinnati_gb_scraper_graph_final_edges",
"Number of edges in the cached graph, after processing",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref GRAPH_FINAL_RELEASES: IntGaugeVec = register_int_gauge_vec!(
"fcos_cincinnati_gb_scraper_graph_final_releases",
"Number of releases in the cached graph, after processing",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref LAST_REFRESH: IntGaugeVec = register_int_gauge_vec!(
"fcos_cincinnati_gb_scraper_graph_last_refresh_timestamp",
"UTC timestamp of last graph refresh",
&["basearch", "stream"]
&["basearch", "stream", "type"]
).unwrap();
static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!(
"fcos_cincinnati_gb_scraper_upstream_scrapes_total",
"Total number of upstream scrapes",
&["basearch", "stream"]
&["stream"]
).unwrap();
// NOTE(lucab): alternatively this could come from the runtime library, see
// https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics
Expand Down Expand Up @@ -76,10 +76,14 @@ fn main() -> Fallible<()> {
(settings.service, settings.status)
};

let mut scrapers = HashMap::with_capacity(service_settings.scopes.len());
for scope in &service_settings.scopes {
let addr = scraper::Scraper::new(scope.clone())?.start();
scrapers.insert(scope.clone(), addr);
let mut scrapers = HashMap::with_capacity(service_settings.streams.len());
for (&stream, &arches) in &service_settings.streams {
let addr = scraper::Scraper::new(
stream.to_string(),
arches.iter().map(|&arch| String::from(arch)).collect(),
)?
.start();
scrapers.insert(stream.to_string(), addr);
}

// TODO(lucab): get allowed scopes from config file.
Expand Down Expand Up @@ -126,37 +130,43 @@ fn main() -> Fallible<()> {
#[derive(Clone, Debug)]
pub(crate) struct AppState {
scope_filter: Option<HashSet<graph::GraphScope>>,
scrapers: HashMap<graph::GraphScope, Addr<scraper::Scraper>>,
scrapers: HashMap<String, Addr<scraper::Scraper>>,
}

/// Mandatory parameters for querying a graph from graph-builder.
#[derive(Deserialize)]
struct GraphQuery {
basearch: Option<String>,
stream: Option<String>,
oci: Option<bool>,
}

pub(crate) async fn gb_serve_graph(
data: web::Data<AppState>,
web::Query(query): web::Query<GraphQuery>,
) -> Result<HttpResponse, failure::Error> {
let scope = match commons::web::validate_scope(query.basearch, query.stream, &data.scope_filter)
{
let scope = match commons::web::validate_scope(
query.basearch,
query.stream,
query.oci,
&data.scope_filter,
) {
Err(e) => {
log::error!("graph request with invalid scope: {}", e);
return Ok(HttpResponse::BadRequest().finish());
}
Ok(s) => {
log::trace!(
"serving request for valid scope: basearch='{}', stream='{}'",
"serving request for valid scope: basearch='{}', stream='{}', oci='{}'",
s.basearch,
s.stream
s.stream,
s.oci,
);
s
}
};

let addr = match data.scrapers.get(&scope) {
let addr = match data.scrapers.get(&scope.stream) {
None => {
log::error!(
"no scraper configured for scope: basearch='{}', stream='{}'",
Expand Down
Loading
Loading