Skip to content

Commit

Permalink
Merge pull request #75 from Urban-Analytics-Technology-Platform/74-caching
Browse files Browse the repository at this point in the history

Add caching for CLI (#74)
  • Loading branch information
sgreenbury authored Sep 3, 2024
2 parents e6f1d3a + ef826ee commit 8004fb8
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 21 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ path = "src/lib.rs"
[[bin]]
name = "popgetter"
path = "src/main.rs"
required-features = ["cache", "formatters"]

[dependencies]
anyhow = "1.0.75"
Expand Down Expand Up @@ -58,5 +59,6 @@ tempfile = "3.12"


[features]
default = ["formatters"]
default = ["cache", "formatters"]
cache = []
formatters = ["dep:geojson"]
37 changes: 32 additions & 5 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// FromStr is required by EnumString. The compiler seems to not be able to
// see that and so is giving a warning. Don't remove it.
use anyhow::Result;
use anyhow::{Context, Result};
use clap::{Args, Parser, Subcommand};
use enum_dispatch::enum_dispatch;
use log::{debug, info};
use log::{debug, error, info};
use nonempty::nonempty;
use polars::frame::DataFrame;
use popgetter::{
Expand Down Expand Up @@ -144,6 +144,33 @@ impl From<OutputFormat> for OutputFormatter {
}
}

async fn read_popgetter(config: Config) -> anyhow::Result<Popgetter> {
let xdg_dirs = xdg::BaseDirectories::with_prefix("popgetter")?;
// TODO: enable cache to be optional
let path = xdg_dirs.get_cache_home();
// Try to read metadata from cache
if path.exists() {
match Popgetter::new_with_config_and_cache(config.clone(), &path) {
Ok(popgetter) => return Ok(popgetter),
Err(err) => {
// Log error, continue without cache and attempt to create one
error!("Failed to read metadata from cache with error: {err}");
}
}
}
// If no metadata cache, get metadata and try to cache
std::fs::create_dir_all(&path)?;
let popgetter = Popgetter::new_with_config(config).await?;

// If error creating cache, remove cache path
if let Err(err) = popgetter.metadata.write_cache(&path) {
std::fs::remove_dir_all(&path)
.with_context(|| "Failed to remove cache dir following error writing cache: {err}")?;
Err(err)?
}
Ok(popgetter)
}

impl RunCommand for DataCommand {
async fn run(&self, config: Config) -> Result<()> {
info!("Running `data` subcommand");
Expand All @@ -153,7 +180,7 @@ impl RunCommand for DataCommand {
DOWNLOADING_SEARCHING_STRING.to_string() + RUNNING_TAIL_STRING,
)
});
let popgetter = Popgetter::new_with_config(config).await?;
let popgetter = read_popgetter(config).await?;
let search_params: SearchParams = self.search_params_args.clone().into();
let search_results = popgetter.search(&search_params);

Expand Down Expand Up @@ -360,7 +387,7 @@ impl RunCommand for MetricsCommand {
DOWNLOADING_SEARCHING_STRING.into(),
)
});
let popgetter = Popgetter::new_with_config(config).await?;
let popgetter = read_popgetter(config).await?;
let search_results = popgetter.search(&self.search_params_args.to_owned().into());
if let Some(mut s) = sp {
s.stop_with_symbol(COMPLETE_PROGRESS_STRING);
Expand Down Expand Up @@ -400,7 +427,7 @@ impl RunCommand for CountriesCommand {
spinner_message.to_string() + RUNNING_TAIL_STRING,
)
});
let popgetter = Popgetter::new_with_config(config).await?;
let popgetter = read_popgetter(config).await?;
if let Some(mut s) = sp {
s.stop_with_symbol(COMPLETE_PROGRESS_STRING);
}
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(default)]
pub struct Config {
pub base_path: String,
Expand Down
36 changes: 34 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::path::Path;

use anyhow::Result;
use data_request_spec::DataRequestSpec;
use log::debug;
use metadata::Metadata;
use polars::frame::DataFrame;
use search::{DownloadParams, Params, SearchParams, SearchResults};
use search::{Params, SearchParams, SearchResults};

use crate::config::Config;

Expand All @@ -22,7 +24,8 @@ pub mod metadata;
pub mod parquet;
pub mod search;

/// Type for popgetter data and API
/// Type for popgetter metadata, config and API
#[derive(Debug, PartialEq)]
pub struct Popgetter {
pub metadata: Metadata,
pub config: Config,
Expand All @@ -41,7 +44,16 @@ impl Popgetter {
Ok(Self { metadata, config })
}

// Only include method with "cache" feature since it requires a filesystem
#[cfg(feature = "cache")]
/// Setup the Popgetter object with custom configuration, loading the
/// metadata from an on-disk cache directory instead of fetching it.
pub fn new_with_config_and_cache<P: AsRef<Path>>(config: Config, cache: P) -> Result<Self> {
    // Combinator form: propagate the cache-read error, otherwise pair the
    // loaded metadata with the supplied config.
    Metadata::from_cache(cache).map(|metadata| Self { metadata, config })
}

/// Generates `SearchResults` using popgetter given `SearchParams`
// TODO: consider reverting to an API where `SearchParams` are moved, add benches
pub fn search(&self, search_params: &SearchParams) -> SearchResults {
search_params
.clone()
Expand All @@ -67,3 +79,23 @@ impl Popgetter {
.await
}
}

#[cfg(test)]
#[cfg(feature = "cache")]
mod tests {

    use tempfile::TempDir;

    use super::*;

    // Round-trip check: metadata written to a cache directory must load
    // back into an identical `Popgetter`.
    #[tokio::test]
    async fn test_popgetter_cache() -> anyhow::Result<()> {
        let cache_dir = TempDir::new()?;
        let config = Config::default();
        let from_network = Popgetter::new_with_config(config.clone()).await?;
        from_network.metadata.write_cache(&cache_dir)?;
        let from_cache = Popgetter::new_with_config_and_cache(config, &cache_dir)?;
        assert_eq!(from_network, from_cache);
        Ok(())
    }
}
76 changes: 64 additions & 12 deletions src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
use std::default::Default;
use std::fmt::Display;
use std::path::Path;

use anyhow::{anyhow, Result};
use futures::future::join_all;
use log::debug;
use log::info;
use polars::{
frame::DataFrame,
lazy::{
dsl::col,
frame::{IntoLazy, LazyFrame, ScanArgsParquet},
},
prelude::{JoinArgs, JoinType, UnionArgs},
prelude::{DataFrame, JoinArgs, JoinType, ParquetCompression, ParquetWriter, UnionArgs},
};
use tokio::try_join;

use crate::{config::Config, search::MetricId, COL};

/// This struct contains the base url and names of the files that contain the metadata.
pub struct PATHS {}

impl PATHS {
pub const GEOMETRY_METADATA: &'static str = "geometry_metadata.parquet";
pub const METRIC_METADATA: &'static str = "metric_metadata.parquet";
pub const COUNTRY: &'static str = "country_metadata.parquet";
pub const SOURCE: &'static str = "source_data_releases.parquet";
pub const PUBLISHER: &'static str = "data_publishers.parquet";
/// Names of the parquet files that hold each piece of metadata.
pub mod paths {
    pub const COUNTRY: &str = "country_metadata.parquet";
    pub const GEOMETRY_METADATA: &str = "geometry_metadata.parquet";
    pub const METRIC_METADATA: &str = "metric_metadata.parquet";
    pub const PUBLISHER: &str = "data_publishers.parquet";
    pub const SOURCE: &str = "source_data_releases.parquet";
}
use paths as PATHS;

/// `CountryMetadataLoader` takes a country iso string
/// along with a CountryMetadataPaths and provides methods
Expand All @@ -50,7 +49,7 @@ impl ExpandedMetadata {
/// from a single `CountryMetadataLoader` or for all countries.
/// It also provides the various functions for searching and
/// getting `MetricRequests` from the catalogue.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Metadata {
pub metrics: DataFrame,
pub geometries: DataFrame,
Expand All @@ -59,6 +58,59 @@ pub struct Metadata {
pub countries: DataFrame,
}

#[cfg(feature = "cache")]
/// Eagerly load a parquet file at `path` into a `DataFrame`.
fn path_to_df<P: AsRef<Path>>(path: P) -> anyhow::Result<DataFrame> {
    let frame = LazyFrame::scan_parquet(path, ScanArgsParquet::default())?.collect()?;
    Ok(frame)
}

#[cfg(feature = "cache")]
/// Write `df` to `path` as a zstd-compressed parquet file.
fn df_to_file<P: AsRef<Path>>(path: P, df: &DataFrame) -> anyhow::Result<()> {
    // `ParquetWriter::finish` wants a `&mut DataFrame`, but we only hold a
    // shared borrow, so write a clone.
    let mut owned = df.clone();
    let writer = ParquetWriter::new(std::fs::File::create(path)?)
        .with_compression(ParquetCompression::Zstd(None));
    writer.finish(&mut owned)?;
    Ok(())
}

#[cfg(feature = "cache")]
/// Join `file_name` onto the cache directory path.
fn prepend<P: AsRef<Path>>(cache_path: P, file_name: &str) -> std::path::PathBuf {
    // Equivalent to `Path::join`: copy the base path, then append the name.
    let mut full = cache_path.as_ref().to_path_buf();
    full.push(file_name);
    full
}

// Only include methods with "cache" feature since it requires a filesystem
#[cfg(feature = "cache")]
impl Metadata {
    /// Load all five metadata `DataFrame`s from parquet files in `cache_dir`.
    pub fn from_cache<P: AsRef<Path>>(cache_dir: P) -> anyhow::Result<Self> {
        // Small helper so each field is a one-liner; errors short-circuit in
        // the same order as the struct fields below.
        let load = |file_name| path_to_df(prepend(&cache_dir, file_name));
        Ok(Self {
            metrics: load(PATHS::METRIC_METADATA)?,
            geometries: load(PATHS::GEOMETRY_METADATA)?,
            source_data_releases: load(PATHS::SOURCE)?,
            data_publishers: load(PATHS::PUBLISHER)?,
            countries: load(PATHS::COUNTRY)?,
        })
    }

    /// Write all five metadata `DataFrame`s as parquet files into `cache_dir`.
    pub fn write_cache<P: AsRef<Path>>(&self, cache_dir: P) -> anyhow::Result<()> {
        // Table-driven: each (file name, frame) pair is written in turn,
        // stopping at the first failure.
        let pairs: [(&str, &DataFrame); 5] = [
            (PATHS::METRIC_METADATA, &self.metrics),
            (PATHS::GEOMETRY_METADATA, &self.geometries),
            (PATHS::SOURCE, &self.source_data_releases),
            (PATHS::PUBLISHER, &self.data_publishers),
            (PATHS::COUNTRY, &self.countries),
        ];
        for (file_name, df) in pairs {
            df_to_file(prepend(&cache_dir, file_name), df)?;
        }
        Ok(())
    }
}

/// Describes a fully specified selection plan. The MetricIds should all
/// be the ID variant. Geometry and years are backed in now.
/// Advice specifies and alternative options that the user should
Expand Down

0 comments on commit 8004fb8

Please sign in to comment.