diff --git a/commons/src/graph.rs b/commons/src/graph.rs index 358bd29..363a602 100644 --- a/commons/src/graph.rs +++ b/commons/src/graph.rs @@ -37,15 +37,35 @@ impl Graph { }, }; let mut has_basearch = false; - for commit in entry.commits { - if commit.architecture != scope.basearch || commit.checksum.is_empty() { - continue; + if scope.oci { + if let Some(oci_images) = entry.oci_images { + for oci_image in oci_images { + if oci_image.architecture != scope.basearch + || oci_image.digest_ref.is_empty() + { + continue; + } + has_basearch = true; + current.payload = oci_image.digest_ref; + current + .metadata + .insert(metadata::SCHEME.to_string(), "oci".to_string()); + } + } else { + // This release doesn't have OCI images, skip it. + return None; + } + } else { + for commit in entry.commits { + if commit.architecture != scope.basearch || commit.checksum.is_empty() { + continue; + } + has_basearch = true; + current.payload = commit.checksum; + current + .metadata + .insert(metadata::SCHEME.to_string(), "checksum".to_string()); } - has_basearch = true; - current.payload = commit.checksum; - current - .metadata - .insert(metadata::SCHEME.to_string(), "checksum".to_string()); } // Not a valid release payload for this graph scope, skip it. @@ -210,4 +230,5 @@ impl Graph { pub struct GraphScope { pub basearch: String, pub stream: String, + pub oci: bool, } diff --git a/commons/src/metadata.rs b/commons/src/metadata.rs index 02e1036..d5f436e 100644 --- a/commons/src/metadata.rs +++ b/commons/src/metadata.rs @@ -6,8 +6,8 @@ use serde_derive::Deserialize; pub static RELEASES_JSON: &str = "https://builds.coreos.fedoraproject.org/prod/streams/${stream}/releases.json"; -/// Templated URL for stream metadata. -pub static STREAM_JSON: &str = "https://builds.coreos.fedoraproject.org/updates/${stream}.json"; +/// Templated URL for updates metadata. +pub static UPDATES_JSON: &str = "https://builds.coreos.fedoraproject.org/updates/${stream}.json"; pub static SCHEME: &str = "org.fedoraproject.coreos.scheme"; @@ -24,55 +24,65 @@ pub static START_EPOCH: &str = "org.fedoraproject.coreos.updates.start_epoch"; pub static START_VALUE: &str = "org.fedoraproject.coreos.updates.start_value"; /// Fedora CoreOS release index. -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct ReleasesJSON { pub releases: Vec, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Release { pub commits: Vec, + #[serde(rename = "oci-images")] + pub oci_images: Option>, pub version: String, pub metadata: String, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct ReleaseCommit { pub architecture: String, pub checksum: String, } +#[derive(Clone, Debug, Deserialize)] +pub struct ReleaseOciImage { + pub architecture: String, + pub image: String, + #[serde(rename = "digest-ref")] + pub digest_ref: String, +} + /// Fedora CoreOS updates metadata -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct UpdatesJSON { pub stream: String, pub releases: Vec, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct ReleaseUpdate { pub version: String, pub metadata: UpdateMetadata, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct UpdateMetadata { pub barrier: Option, pub deadend: Option, pub rollout: Option, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct UpdateBarrier { pub reason: String, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct UpdateDeadend { pub reason: String, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct UpdateRollout { pub start_epoch: Option, pub start_percentage: Option, diff --git a/commons/src/policy.rs b/commons/src/policy.rs index d39f9ed..10e33ab 100644 --- a/commons/src/policy.rs +++ b/commons/src/policy.rs @@ -30,7 +30,7 @@ pub fn throttle_rollouts(input: Graph, client_wariness: f64) -> Graph { for (index, release) in graph.nodes.iter().enumerate() { // Skip if this release is not being rolled out. - if release.metadata.get(metadata::ROLLOUT).is_none() { + if !release.metadata.contains_key(metadata::ROLLOUT) { continue; }; diff --git a/commons/src/web.rs b/commons/src/web.rs index e3f0cf3..6661d4f 100644 --- a/commons/src/web.rs +++ b/commons/src/web.rs @@ -26,6 +26,7 @@ pub fn build_cors_middleware(origin_allowlist: &Option>) -> CorsFact pub fn validate_scope( basearch: Option, stream: Option, + oci: Option, scope_allowlist: &Option>, ) -> Result { let basearch = basearch.ok_or_else(|| err_msg("missing basearch"))?; @@ -34,15 +35,22 @@ pub fn validate_scope( let stream = stream.ok_or_else(|| err_msg("missing stream"))?; ensure!(!stream.is_empty(), "empty stream"); - let scope = GraphScope { basearch, stream }; + let oci = oci.unwrap_or_default(); + + let scope = GraphScope { + basearch, + stream, + oci, + }; // Optionally filter out scope according to given allowlist, if any. if let Some(allowlist) = scope_allowlist { if !allowlist.contains(&scope) { bail!( - "scope not allowed: basearch='{}', stream='{}'", + "scope not allowed: basearch='{}', stream='{}', oci='{}'", scope.basearch, - scope.stream + scope.stream, + scope.oci, ); } } @@ -57,26 +65,28 @@ mod tests { #[test] fn test_validate_scope() { { - let r = validate_scope(None, None, &None); + let r = validate_scope(None, None, None, &None); assert!(r.is_err()); } { let basearch = Some("test_empty".to_string()); let stream = Some("".to_string()); - let r = validate_scope(basearch, stream, &None); + let oci = None; + let r = validate_scope(basearch, stream, oci, &None); assert!(r.is_err()); } { let basearch = Some("x86_64".to_string()); let stream = Some("stable".to_string()); - let r = validate_scope(basearch, stream, &None); + let oci = Some(false); + let r = validate_scope(basearch, stream, oci, &None); assert!(r.is_ok()); } { let basearch = Some("x86_64".to_string()); let stream = Some("stable".to_string()); let filter_none_allowed = Some(HashSet::new()); - let r = validate_scope(basearch, stream, &filter_none_allowed); + let r = validate_scope(basearch, stream, None, &filter_none_allowed); assert!(r.is_err()); } { @@ -85,9 +95,10 @@ mod tests { let allowed_scope = GraphScope { basearch: "x86_64".to_string(), stream: "stable".to_string(), + oci: false, }; let filter = Some(maplit::hashset! {allowed_scope}); - let r = validate_scope(basearch, stream, &filter); + let r = validate_scope(basearch, stream, None, &filter); assert!(r.is_ok()); } } diff --git a/fcos-graph-builder/src/main.rs b/fcos-graph-builder/src/main.rs index 0d2767a..c916d75 100644 --- a/fcos-graph-builder/src/main.rs +++ b/fcos-graph-builder/src/main.rs @@ -24,27 +24,27 @@ lazy_static::lazy_static! { static ref CACHED_GRAPH_REQUESTS: IntCounterVec = register_int_counter_vec!( "fcos_cincinnati_gb_cache_graph_requests_total", "Total number of requests for a cached graph", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref GRAPH_FINAL_EDGES: IntGaugeVec = register_int_gauge_vec!( "fcos_cincinnati_gb_scraper_graph_final_edges", "Number of edges in the cached graph, after processing", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref GRAPH_FINAL_RELEASES: IntGaugeVec = register_int_gauge_vec!( "fcos_cincinnati_gb_scraper_graph_final_releases", "Number of releases in the cached graph, after processing", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref LAST_REFRESH: IntGaugeVec = register_int_gauge_vec!( "fcos_cincinnati_gb_scraper_graph_last_refresh_timestamp", "UTC timestamp of last graph refresh", - &["basearch", "stream"] + &["basearch", "stream", "type"] ).unwrap(); static ref UPSTREAM_SCRAPES: IntCounterVec = register_int_counter_vec!( "fcos_cincinnati_gb_scraper_upstream_scrapes_total", "Total number of upstream scrapes", - &["basearch", "stream"] + &["stream"] ).unwrap(); // NOTE(lucab): alternatively this could come from the runtime library, see // https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics @@ -76,10 +76,14 @@ fn main() -> Fallible<()> { (settings.service, settings.status) }; - let mut scrapers = HashMap::with_capacity(service_settings.scopes.len()); - for scope in &service_settings.scopes { - let addr = scraper::Scraper::new(scope.clone())?.start(); - scrapers.insert(scope.clone(), addr); + let mut scrapers = HashMap::with_capacity(service_settings.streams.len()); + for (&stream, &arches) in &service_settings.streams { + let addr = scraper::Scraper::new( + stream.to_string(), + arches.iter().map(|&arch| String::from(arch)).collect(), + )? + .start(); + scrapers.insert(stream.to_string(), addr); } // TODO(lucab): get allowed scopes from config file. @@ -126,7 +130,7 @@ fn main() -> Fallible<()> { #[derive(Clone, Debug)] pub(crate) struct AppState { scope_filter: Option>, - scrapers: HashMap>, + scrapers: HashMap>, } /// Mandatory parameters for querying a graph from graph-builder. @@ -134,29 +138,35 @@ pub(crate) struct AppState { struct GraphQuery { basearch: Option, stream: Option, + oci: Option, } pub(crate) async fn gb_serve_graph( data: web::Data, web::Query(query): web::Query, ) -> Result { - let scope = match commons::web::validate_scope(query.basearch, query.stream, &data.scope_filter) - { + let scope = match commons::web::validate_scope( + query.basearch, + query.stream, + query.oci, + &data.scope_filter, + ) { Err(e) => { log::error!("graph request with invalid scope: {}", e); return Ok(HttpResponse::BadRequest().finish()); } Ok(s) => { log::trace!( - "serving request for valid scope: basearch='{}', stream='{}'", + "serving request for valid scope: basearch='{}', stream='{}', oci='{}'", s.basearch, - s.stream + s.stream, + s.oci, ); s } }; - let addr = match data.scrapers.get(&scope) { + let addr = match data.scrapers.get(&scope.stream) { None => { log::error!( "no scraper configured for scope: basearch='{}', stream='{}'", diff --git a/fcos-graph-builder/src/scraper.rs b/fcos-graph-builder/src/scraper.rs index b1ff438..3aacef3 100644 --- a/fcos-graph-builder/src/scraper.rs +++ b/fcos-graph-builder/src/scraper.rs @@ -3,6 +3,7 @@ use actix_web::web::Bytes; use commons::{graph, metadata}; use failure::{Error, Fallible}; use reqwest::Method; +use std::collections::HashMap; use std::num::NonZeroU64; use std::time::Duration; @@ -12,40 +13,51 @@ const DEFAULT_HTTP_REQ_TIMEOUT: Duration = Duration::from_secs(30 * 60); /// Release scraper. #[derive(Clone, Debug)] pub struct Scraper { - graph: Bytes, + stream: String, + /// arch -> graph + graphs: HashMap, + /// arch -> graph + oci_graphs: HashMap, hclient: reqwest::Client, pause_secs: NonZeroU64, release_index_url: reqwest::Url, - scope: graph::GraphScope, - stream_metadata_url: reqwest::Url, + updates_url: reqwest::Url, } impl Scraper { - pub(crate) fn new(scope: graph::GraphScope) -> Fallible { - let graph = { + pub(crate) fn new(stream: String, arches: Vec) -> Fallible { + let empty = { let empty_graph = graph::Graph::default(); let data = serde_json::to_vec(&empty_graph)?; Bytes::from(data) }; + let graphs = arches + .iter() + .map(|arch| (arch.clone(), empty.clone())) + .collect(); + let oci_graphs = arches + .into_iter() + .map(|arch| (arch, empty.clone())) + .collect(); let vars = maplit::hashmap! { - "basearch".to_string() => scope.basearch.clone(), - "stream".to_string() => scope.stream.clone(), + "stream".to_string() => stream.clone(), }; let releases_json = envsubst::substitute(metadata::RELEASES_JSON, &vars)?; - let stream_json = envsubst::substitute(metadata::STREAM_JSON, &vars)?; + let updates_json = envsubst::substitute(metadata::UPDATES_JSON, &vars)?; let hclient = reqwest::ClientBuilder::new() .pool_idle_timeout(Some(Duration::from_secs(10))) .timeout(DEFAULT_HTTP_REQ_TIMEOUT) .build()?; let scraper = Self { - graph, + graphs, + oci_graphs, hclient, pause_secs: NonZeroU64::new(30).expect("non-zero pause"), - scope, + stream, release_index_url: reqwest::Url::parse(&releases_json)?, - stream_metadata_url: reqwest::Url::parse(&stream_json)?, + updates_url: reqwest::Url::parse(&updates_json)?, }; Ok(scraper) } @@ -56,6 +68,7 @@ impl Scraper { method: reqwest::Method, url: reqwest::Url, ) -> Fallible { + log::trace!("building new request for {url}"); let builder = self.hclient.request(method, url); Ok(builder) } @@ -75,7 +88,7 @@ impl Scraper { /// Fetch updates metadata. fn fetch_updates(&self) -> impl Future> { - let target = self.stream_metadata_url.clone(); + let target = self.updates_url.clone(); let req = self.new_request(Method::GET, target); async { @@ -87,44 +100,92 @@ impl Scraper { } /// Combine release-index and updates metadata. - fn assemble_graph(&self) -> impl Future> { + fn assemble_graphs( + &self, + ) -> impl Future< + Output = Result<(HashMap, HashMap), Error>, + > { let stream_releases = self.fetch_releases(); let stream_updates = self.fetch_updates(); - let scope = self.scope.clone(); - // NOTE(lucab): this inner scope is in order to get a 'static lifetime on - // the future for actix compatibility. - async { + // yuck... we clone a bunch here to keep the async closure 'static + let stream = self.stream.clone(); + let arches: Vec = self.graphs.keys().cloned().collect(); + + async move { let (graph, updates) = futures::future::try_join(stream_releases, stream_updates).await?; - graph::Graph::from_metadata(graph, updates, scope) + // first the legacy graphs + let mut map = HashMap::with_capacity(arches.len()); + for arch in &arches { + map.insert( + arch.clone(), + graph::Graph::from_metadata( + graph.clone(), + updates.clone(), + graph::GraphScope { + basearch: arch.clone(), + stream: stream.clone(), + oci: false, + }, + )?, + ); + } + // now the OCI graphs + let mut oci_map = HashMap::with_capacity(arches.len()); + for arch in &arches { + oci_map.insert( + arch.clone(), + graph::Graph::from_metadata( + graph.clone(), + updates.clone(), + graph::GraphScope { + basearch: arch.clone(), + stream: stream.clone(), + oci: true, + }, + )?, + ); + } + Ok((map, oci_map)) } } /// Update cached graph. - fn update_cached_graph(&mut self, graph: graph::Graph) -> Result<(), Error> { + fn update_cached_graph( + &mut self, + arch: String, + oci: bool, + graph: graph::Graph, + ) -> Result<(), Error> { let data = serde_json::to_vec_pretty(&graph).map_err(|e| failure::format_err!("{}", e))?; - self.graph = Bytes::from(data); + let graph_type = if oci { "oci" } else { "checksum" }; let refresh_timestamp = chrono::Utc::now(); crate::LAST_REFRESH - .with_label_values(&[&self.scope.basearch, &self.scope.stream]) + .with_label_values(&[&arch, &self.stream, graph_type]) .set(refresh_timestamp.timestamp()); crate::GRAPH_FINAL_EDGES - .with_label_values(&[&self.scope.basearch, &self.scope.stream]) + .with_label_values(&[&arch, &self.stream, graph_type]) .set(graph.edges.len() as i64); crate::GRAPH_FINAL_RELEASES - .with_label_values(&[&self.scope.basearch, &self.scope.stream]) + .with_label_values(&[&arch, &self.stream, graph_type]) .set(graph.nodes.len() as i64); log::trace!( - "cached graph for {}/{}: releases={}, edges={}", - self.scope.basearch, - self.scope.stream, + "cached graph for {}/{}/oci={}: releases={}, edges={}", + &arch, + self.stream, + oci, graph.nodes.len(), graph.edges.len() ); + if oci { + self.oci_graphs.insert(arch, Bytes::from(data)); + } else { + self.graphs.insert(arch, Bytes::from(data)); + } Ok(()) } } @@ -149,13 +210,20 @@ impl Handler for Scraper { fn handle(&mut self, _msg: RefreshTick, _ctx: &mut Self::Context) -> Self::Result { crate::UPSTREAM_SCRAPES - .with_label_values(&[&self.scope.basearch, &self.scope.stream]) + .with_label_values(&[&self.stream]) .inc(); - let latest_graph = self.assemble_graph(); - let update_graph = actix::fut::wrap_future::<_, Self>(latest_graph) - .map(|graph, actor, _ctx| { - let res = graph.and_then(|g| actor.update_cached_graph(g)); + let latest_graphs = self.assemble_graphs(); + let update_graphs = actix::fut::wrap_future::<_, Self>(latest_graphs) + .map(|graphs, actor, _ctx| { + let res: Result<(), Error> = graphs.and_then(|(g, oci_g)| { + g.into_iter() + .map(|(arch, graph)| (arch, false, graph)) + .chain(oci_g.into_iter().map(|(arch, graph)| (arch, true, graph))) + .try_for_each(|(arch, oci, graph)| { + actor.update_cached_graph(arch, oci, graph) + }) + }); if let Err(e) = res { log::error!("transient scraping failure: {}", e); }; @@ -166,7 +234,7 @@ impl Handler for Scraper { actix::fut::ok(()) }); - Box::new(update_graph) + Box::new(update_graphs) } } @@ -183,25 +251,31 @@ impl Handler for Scraper { fn handle(&mut self, msg: GetCachedGraph, _ctx: &mut Self::Context) -> Self::Result { use failure::format_err; + let graph_type = if msg.scope.oci { "oci" } else { "checksum" }; - if msg.scope.basearch != self.scope.basearch { - return Box::new(actix::fut::err(format_err!( - "unexpected basearch '{}'", - msg.scope.basearch - ))); - } - if msg.scope.stream != self.scope.stream { + if msg.scope.stream != self.stream { return Box::new(actix::fut::err(format_err!( "unexpected stream '{}'", msg.scope.stream ))); } - - crate::CACHED_GRAPH_REQUESTS - .with_label_values(&[&self.scope.basearch, &self.scope.stream]) - .inc(); - - Box::new(actix::fut::ok(self.graph.clone())) + let target_graphmap = if msg.scope.oci { + &self.oci_graphs + } else { + &self.graphs + }; + if let Some(graph) = target_graphmap.get(&msg.scope.basearch) { + crate::CACHED_GRAPH_REQUESTS + .with_label_values(&[&msg.scope.basearch, &msg.scope.stream, graph_type]) + .inc(); + + Box::new(actix::fut::ok(graph.clone())) + } else { + Box::new(actix::fut::err(format_err!( + "unexpected basearch '{}'", + msg.scope.basearch + ))) + } } } diff --git a/fcos-graph-builder/src/settings.rs b/fcos-graph-builder/src/settings.rs index 2799d2d..40c7913 100644 --- a/fcos-graph-builder/src/settings.rs +++ b/fcos-graph-builder/src/settings.rs @@ -1,7 +1,6 @@ use crate::config::FileConfig; -use commons::graph::GraphScope; use failure::Fallible; -use std::collections::BTreeSet; +use std::collections::BTreeMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; /// Runtime settings for the graph-builder. @@ -25,7 +24,8 @@ pub struct ServiceSettings { pub(crate) origin_allowlist: Option>, pub(crate) ip_addr: IpAddr, pub(crate) port: u16, - pub(crate) scopes: BTreeSet, + // stream --> set of valid arches for it + pub(crate) streams: BTreeMap<&'static str, &'static [&'static str]>, } impl ServiceSettings { @@ -33,20 +33,11 @@ impl ServiceSettings { const DEFAULT_GB_SERVICE_ADDR: Ipv4Addr = Ipv4Addr::UNSPECIFIED; /// Default TCP port for graph-builder main service. const DEFAULT_GB_SERVICE_PORT: u16 = 8080; - /// Default scopes (basearch plus stream) to process. - const DEFAULT_SCOPES: [(&'static str, &'static str); 12] = [ - ("aarch64", "next"), - ("aarch64", "stable"), - ("aarch64", "testing"), - ("ppc64le", "next"), - ("ppc64le", "stable"), - ("ppc64le", "testing"), - ("s390x", "next"), - ("s390x", "stable"), - ("s390x", "testing"), - ("x86_64", "next"), - ("x86_64", "stable"), - ("x86_64", "testing"), + /// Default streams and their basearches to process. + const DEFAULT_STREAMS: [(&'static str, &'static [&'static str]); 3] = [ + ("stable", &["x86_64", "aarch64", "s390x", "ppc64le"]), + ("testing", &["x86_64", "aarch64", "s390x", "ppc64le"]), + ("next", &["x86_64", "aarch64", "s390x", "ppc64le"]), ]; pub fn socket_addr(&self) -> SocketAddr { @@ -60,13 +51,7 @@ impl Default for ServiceSettings { origin_allowlist: None, ip_addr: Self::DEFAULT_GB_SERVICE_ADDR.into(), port: Self::DEFAULT_GB_SERVICE_PORT, - scopes: Self::DEFAULT_SCOPES - .iter() - .map(|(basearch, stream)| GraphScope { - basearch: basearch.to_string(), - stream: stream.to_string(), - }) - .collect(), + streams: Self::DEFAULT_STREAMS.iter().copied().collect(), } } } diff --git a/fcos-policy-engine/src/main.rs b/fcos-policy-engine/src/main.rs index 7861675..68f7bea 100644 --- a/fcos-policy-engine/src/main.rs +++ b/fcos-policy-engine/src/main.rs @@ -130,6 +130,7 @@ pub struct GraphQuery { stream: Option, rollout_wariness: Option, node_uuid: Option, + oci: Option, } pub(crate) async fn pe_serve_graph( @@ -141,6 +142,7 @@ pub(crate) async fn pe_serve_graph( let scope = match commons::web::validate_scope( query.basearch.clone(), query.stream.clone(), + query.oci, &data.scope_filter, ) { Err(e) => { @@ -160,6 +162,7 @@ pub(crate) async fn pe_serve_graph( data.upstream_endpoint.clone(), scope.stream, scope.basearch, + scope.oci, data.upstream_req_timeout, ) .await?; @@ -187,7 +190,7 @@ fn compute_wariness(params: &GraphQuery) -> f64 { .unwrap_or_default() .parse::() { - let wariness = input.max(0.0).min(1.0); + let wariness = input.clamp(0.0, 1.0); return wariness; } @@ -204,9 +207,9 @@ fn compute_wariness(params: &GraphQuery) -> f64 { uuid.hash(&mut hasher); let digest = hasher.finish(); // Scale down. - let scaled = (digest as f64) / (std::u64::MAX as f64); + let scaled = (digest as f64) / (u64::MAX as f64); // Clamp within limits. - scaled.max(COMPUTED_MIN).min(COMPUTED_MAX) + scaled.clamp(COMPUTED_MIN, COMPUTED_MAX) }; wariness diff --git a/fcos-policy-engine/src/utils.rs b/fcos-policy-engine/src/utils.rs index b459cbe..bd68760 100644 --- a/fcos-policy-engine/src/utils.rs +++ b/fcos-policy-engine/src/utils.rs @@ -19,6 +19,7 @@ pub(crate) async fn fetch_graph_from_gb( upstream_base: reqwest::Url, stream: String, basearch: String, + oci: bool, req_timeout: Duration, ) -> Result { if stream.trim().is_empty() { @@ -32,6 +33,7 @@ pub(crate) async fn fetch_graph_from_gb( basearch: Some(basearch), rollout_wariness: None, node_uuid: None, + oci: Some(oci), }; // Cannot use `?` directly here otherwise will produce the error: // the trait `std::marker::Sync` is not implemented for `(dyn std::error::Error + std::marker::Send + 'static)`