Merge pull request #19 from Urban-Analytics-Technology-Platform/combine_geoms_and_metrics

V1 of bringing everything together
yongrenjie authored May 7, 2024
2 parents a1b40cf + b7e0837 commit ccbed08
Showing 9 changed files with 535,758 additions and 47 deletions.
15 changes: 10 additions & 5 deletions schema/popgetter_0.1.0.json
@@ -164,6 +164,14 @@
"items": {
"$ref": "#/definitions/CountryMetadata"
}
},
"available_metrics":{
"title":"The metrics from this data source",
"description":"A list of metrics that are provided by this source",
"type":"array",
"items":{
"$ref":"#/definitions/MetricMetadata"
}
}
},
"required": [
@@ -267,12 +275,9 @@
"source_metric_id",
"description",
"hxl_tag",
"parquet_column_name",
"source_data_release_id",
"source_download_url",
"source_documentation_url"
"parquet_column_name"
]
}
},
"version": "0.1.0"
}
}
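The schema change adds an `available_metrics` array of `MetricMetadata` to each source data release and appears to pare the metric's required fields down to the four listed above. As a rough, hypothetical sketch of the Rust shape this implies (the real types are generated from the schema by `typify` in src/metadata.rs; only the fields this PR relies on are shown, everything else is assumed):

```rust
// Hypothetical sketch only — the actual types come from
// import_types!("schema/popgetter_0.1.0.json") in src/metadata.rs.
pub struct SourceDataRelease {
    // ...other release fields elided...
    pub available_metrics: Vec<MetricMetadata>,
}

pub struct MetricMetadata {
    pub source_metric_id: String,
    pub description: String,
    pub hxl_tag: String,
    pub parquet_column_name: String,
    // Assumed optional here; the From impls in src/metadata.rs unwrap it.
    pub metric_parquet_file_url: Option<String>,
}
```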
37 changes: 25 additions & 12 deletions src/cli.rs
@@ -3,7 +3,10 @@
use anyhow::Result;
use clap::{Args, Parser, Subcommand};
use enum_dispatch::enum_dispatch;
use popgetter::data_request_spec::{BBox, DataRequestSpec, RegionSpec};
use popgetter::{
data_request_spec::{BBox, DataRequestSpec, MetricSpec, RegionSpec},
Popgetter,
};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use strum_macros::EnumString;
@@ -21,7 +24,7 @@ pub enum OutputFormat {
/// Trait that defines what to run when a given subcommand is invoked.
#[enum_dispatch]
pub trait RunCommand {
fn run(&self) -> Result<()>;
async fn run(&self) -> Result<()>;
}

/// The Data command is the one we invoke to get a set of metrics and geometry
@@ -45,21 +48,31 @@ pub struct DataCommand {
}

impl RunCommand for DataCommand {
fn run(&self) -> Result<()> {
println!("Running Data Command");
println!("{self:#?}");
async fn run(&self) -> Result<()> {
let popgetter = Popgetter::new()?;
let data_request = DataRequestSpec::from(self);
let results = popgetter.get_data_request(&data_request).await?;
println!("{results:#?}");
Ok(())
}
}

impl From<DataCommand> for DataRequestSpec {
fn from(value: DataCommand) -> Self {
let region = if let Some(bbox) = value.bbox {
impl From<&DataCommand> for DataRequestSpec {
fn from(value: &DataCommand) -> Self {
let region = if let Some(bbox) = value.bbox.clone() {
vec![RegionSpec::BoundingBox(bbox)]
} else {
vec![]
};
let metrics = vec![];

let metrics: Vec<MetricSpec> = if let Some(metric_string) = &value.metrics {
metric_string
.split(',')
.map(|s| MetricSpec::NamedMetric(s.trim().into()))
.collect()
} else {
vec![]
};
DataRequestSpec { region, metrics }
}
}
@@ -77,7 +90,7 @@ pub struct MetricsCommand {
}

impl RunCommand for MetricsCommand {
fn run(&self) -> Result<()> {
async fn run(&self) -> Result<()> {
println!("Running Metrics Command");
Ok(())
}
@@ -89,7 +102,7 @@ impl RunCommand for MetricsCommand {
pub struct CountriesCommand;

impl RunCommand for CountriesCommand {
fn run(&self) -> Result<()> {
async fn run(&self) -> Result<()> {
println!("Running Countries Command");
Ok(())
}
@@ -101,7 +114,7 @@ impl RunCommand for CountriesCommand {
pub struct SurveysCommand;

impl RunCommand for SurveysCommand {
fn run(&self) -> Result<()> {
async fn run(&self) -> Result<()> {
println!("Running Surveys Command");
Ok(())
}
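With this change the metrics flag (assumed to be `--metrics` from the `metrics` field on `DataCommand`) is read as one comma-separated string and split into `MetricSpec::NamedMetric` values. A minimal, self-contained sketch of that parsing, with the enum reduced to the single variant used here and an illustrative `main`:

```rust
// Standalone sketch of the comma-separated metric parsing used in
// `From<&DataCommand> for DataRequestSpec`; names here are illustrative.
#[derive(Debug)]
enum MetricSpec {
    NamedMetric(String),
}

fn parse_metrics(metric_string: Option<&str>) -> Vec<MetricSpec> {
    metric_string
        .map(|s| {
            s.split(',')
                .map(|m| MetricSpec::NamedMetric(m.trim().to_owned()))
                .collect()
        })
        .unwrap_or_default()
}

fn main() {
    // Whitespace around names is trimmed; omitting the flag yields an empty list.
    println!("{:?}", parse_metrics(Some("pop_total, pop_male")));
    println!("{:?}", parse_metrics(None));
}
```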
29 changes: 29 additions & 0 deletions src/data_request_spec.rs
@@ -1,15 +1,44 @@
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::{
any::Any,
ops::{Index, IndexMut},
str::FromStr,
};

use crate::{metadata::SourceDataRelease, parquet::MetricRequest};

#[derive(Serialize, Deserialize, Debug)]
pub struct DataRequestSpec {
pub region: Vec<RegionSpec>,
pub metrics: Vec<MetricSpec>,
}

impl DataRequestSpec {
pub fn metric_requests(&self, catalogue: &SourceDataRelease) -> Result<Vec<MetricRequest>> {
let mut metric_requests: Vec<MetricRequest> = vec![];
println!("Try to get metrics {:#?}", self.metrics);
for metric_spec in &self.metrics {
match metric_spec {
MetricSpec::NamedMetric(name) => {
metric_requests.push(
catalogue
.get_metric_details(&name)
.with_context(|| "Failed to find metric")?
.into(),
);
}
_ => todo!("unsupported metric spec"),
}
}
Ok(metric_requests)
}

pub fn geom_details(&self, catalogue: &SourceDataRelease) -> Result<String> {
Ok(catalogue.geography_file.clone())
}
}

#[derive(Serialize, Deserialize, Debug)]
pub enum MetricSpec {
NamedMetric(String),
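The new `metric_requests` method walks the requested metric names and resolves each one against the release catalogue, failing with context if a name is unknown. A reduced, hypothetical sketch of that lookup loop (simplified types, not the real `SourceDataRelease`):

```rust
// Reduced sketch of resolving named metrics against a catalogue, collecting
// into a Result and attaching anyhow context on the first missing name.
use anyhow::{Context, Result};

#[derive(Debug, Clone)]
struct MetricRequest {
    column: String,
    file: String,
}

struct Catalogue(Vec<(String, MetricRequest)>);

impl Catalogue {
    fn get_metric_details(&self, id: &str) -> Option<&MetricRequest> {
        self.0.iter().find(|(k, _)| k == id).map(|(_, v)| v)
    }
}

fn metric_requests(names: &[String], catalogue: &Catalogue) -> Result<Vec<MetricRequest>> {
    names
        .iter()
        .map(|name| {
            catalogue
                .get_metric_details(name)
                .cloned()
                .with_context(|| format!("Failed to find metric {name}"))
        })
        .collect()
}

fn main() -> Result<()> {
    let catalogue = Catalogue(vec![(
        "pop_total".to_owned(),
        MetricRequest {
            column: "pop_total_col".to_owned(),
            file: "https://example.com/metrics.parquet".to_owned(),
        },
    )]);
    println!("{:?}", metric_requests(&["pop_total".to_owned()], &catalogue)?);
    Ok(())
}
```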
57 changes: 30 additions & 27 deletions src/geo.rs
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use flatgeobuf::{ FeatureProperties, HttpFgbReader, geozero};
use geozero::ToWkt;
use anyhow::Result;
use anyhow::{Context, Result};
use polars::{frame::DataFrame, prelude::NamedFrom, series::Series};
use crate::data_request_spec::BBox;

/// Function to request geometries from a remotely hosted FGB
@@ -11,7 +12,7 @@ use crate::data_request_spec::BBox;
/// `bbox`: an optional bounding box to filter the features by
///
/// Returns: a Result wrapping a DataFrame with a "GEOID" column and a "geometry" column of WKT strings, one row per feature.
pub async fn get_geometries(file_url:&str, bbox:Option<&BBox>) -> Result<Vec<(String, HashMap<String,String>)>>{
pub async fn get_geometries(file_url:&str, bbox:Option<&BBox>, geoid_col: Option<String>) -> Result<DataFrame>{
let fgb = HttpFgbReader::open(file_url)
.await?;

Expand All @@ -24,15 +25,23 @@ pub async fn get_geometries(file_url:&str, bbox:Option<&BBox>) -> Result<Vec<(St
.select_all()
.await?
};

let mut features = Vec::new();

let mut geoms: Vec<String> = vec![];
let mut ids: Vec<String> = vec![];
let geoid_col = geoid_col.unwrap_or_else(|| "GEOID".to_owned());

while let Some(feature) = fgb.next().await? {
let props = feature.properties()?;
let geom = feature.to_wkt()?;
features.push((geom, props));
geoms.push(feature.to_wkt()?);
let id = props.get(&geoid_col)
.with_context(|| "failed to get id")?;
ids.push(id.clone());
}
Ok(features)

let ids = Series::new("GEOID",ids);
let geoms = Series::new("geometry", geoms);
let result = DataFrame::new(vec![ids,geoms])?;
Ok(result)
}

#[cfg(test)]
@@ -41,11 +50,11 @@ mod tests {
use flatgeobuf::{geozero::PropertyProcessor, ColumnType, FgbWriter, GeometryType};
use ::geozero::{ColumnValue, geojson::GeoJson};
use httpmock::prelude::*;
use polars::datatypes::AnyValue;

fn test_fgb()-> FgbWriter<'static>{
let mut fgb = FgbWriter::create("countries", GeometryType::Polygon).unwrap();
fgb.add_column("id", ColumnType::String ,|_fbb,col| { col.nullable=false} );
fgb.add_column("val", ColumnType::Float,|_fbb,col| { col.nullable=false} );
fgb.add_column("GEOID", ColumnType::String ,|_fbb,col| { col.nullable=false} );
let geom1 = GeoJson(r#"
{
"coordinates": [
@@ -116,12 +125,10 @@

fgb.add_feature_geom(geom1,|feat|{
feat.property(0,"id" ,&ColumnValue::String("one")).unwrap();
feat.property(1,"val" ,&ColumnValue::Float(0.2)).unwrap();
}).unwrap();

fgb.add_feature_geom(geom2,|feat|{
feat.property(0,"id" ,&ColumnValue::String("two")).unwrap();
feat.property(1,"val" ,&ColumnValue::Float(30.0) ).unwrap();
}).unwrap();

fgb
@@ -150,20 +157,18 @@
let server = mock_fgb_server();

// Get the geometries
let geoms = get_geometries(&server.url("/fgb_example.fgb"),None ).await;
let geoms = get_geometries(&server.url("/fgb_example.fgb"),None, None).await;
println!("{geoms:#?}");
assert!(geoms.is_ok(),"The geometry call should not error");
let geoms = geoms.unwrap();

assert_eq!(geoms.len(),2, "Should recover two features");
assert_eq!(geoms.shape(),(2,2), "Should recover two features");
// Order seems to get moved around when reading back
let feature2 = &geoms[0];
let feature1 = &geoms[1];
assert_eq!(feature1.0, "POLYGON((-2.3188783647485707 52.69979322604925,-2.5454719671669466 52.15312184913398,-2.013470682738614 51.91675689282138,-1.6883550645436003 52.297957365071426,-1.865690550690033 52.79521231460612,-2.3188783647485707 52.69979322604925))" , "feature_1 should have the correct geometry");
assert_eq!(feature2.0, "POLYGON((-0.12189002707808072 51.69745608244253,-0.663758311409083 51.538416425565146,-0.3977386344766103 51.38495470191404,-0.0627732105033374 51.37880690189908,0.16381686538463214 51.54453889108953,-0.12189002707808072 51.69745608244253))", "feature_2 should have the correct geometry");
assert_eq!(feature1.1.get("id").unwrap(), "one" , "feature_1 Should have the correct id");
assert_eq!(feature2.1.get("id").unwrap(), "two" , "feature_2 Should have the correct id");
assert_eq!(feature1.1.get("val").unwrap(), "0.2" , "feature_1 Should have the correct value");
assert_eq!(feature2.1.get("val").unwrap(), "30" , "feature_2 Should have the correct value");

let row1 = geoms.get(0).unwrap();
let row2 = geoms.get(1).unwrap();
assert!(row1[0].eq(&AnyValue::String("two")), "Feature 1 should have the right ID");
assert!(row2[0].eq(&AnyValue::String("one")), "Feature 2 should have the right ID");
}

#[tokio::test]
@@ -176,18 +181,16 @@
-1.373_095_490_899_146_4,
53.026_908_220_355_35,
]);
let geoms = get_geometries(&server.url("/fgb_example.fgb"),Some(&bbox)).await;
let geoms = get_geometries(&server.url("/fgb_example.fgb"),Some(&bbox), None).await;

assert!(geoms.is_ok(),"The geometry call should not error");
let geoms = geoms.unwrap();
println!("{geoms:#?}");

assert_eq!(geoms.len(),1, "Should recover one features");
assert_eq!(geoms.shape(),(1,2), "Should recover one feature");
// Order seems to get moved around when reading back
println!("{geoms:#?}");
let feature1 = &geoms[0];
assert_eq!(feature1.0, "POLYGON((-2.3188783647485707 52.69979322604925,-2.5454719671669466 52.15312184913398,-2.013470682738614 51.91675689282138,-1.6883550645436003 52.297957365071426,-1.865690550690033 52.79521231460612,-2.3188783647485707 52.69979322604925))" , "feature_1 should have the correct geometry");
assert_eq!(feature1.1.get("id").unwrap(), "one" , "feature_1 Should have the correct id");
assert_eq!(feature1.1.get("val").unwrap(), "0.2" , "feature_1 Should have the correct value");

}

}
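With this change `get_geometries` returns a two-column polars `DataFrame` instead of a `Vec` of (WKT, properties) tuples. A small sketch of the shape the tests above now assert on (the values are placeholders, not real geometries):

```rust
// Sketch of the DataFrame shape produced by get_geometries: one row per
// feature, a "GEOID" string column and a "geometry" column of WKT strings.
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let ids = Series::new("GEOID", vec!["one", "two"]);
    let geoms = Series::new(
        "geometry",
        vec!["POLYGON((0 0,1 0,1 1,0 0))", "POLYGON((2 2,3 2,3 3,2 2))"],
    );
    let df = DataFrame::new(vec![ids, geoms])?;
    assert_eq!(df.shape(), (2, 2));
    println!("{df}");
    Ok(())
}
```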
31 changes: 30 additions & 1 deletion src/lib.rs
@@ -1,5 +1,11 @@
use anyhow::Result;
use data_request_spec::DataRequestSpec;
use metadata::{load_metadata, SourceDataRelease};
use parquet::{get_metrics, MetricRequest};
use polars::{frame::DataFrame, prelude::DataFrameJoinOps};
use tokio::try_join;

use crate::geo::get_geometries;
pub mod data_request_spec;
pub mod geo;
pub mod metadata;
@@ -10,8 +16,31 @@ pub struct Popgetter {
}

impl Popgetter {
/// Setup the Popgetter object
pub fn new() -> Result<Self> {
let metadata = load_metadata("us_metadata.json")?;
let metadata = load_metadata("us_metadata_test2.json")?;
Ok(Self { metadata })
}

/// Given a Data Request Spec
/// Return a DataFrame of the selected dataset
pub async fn get_data_request(&self, data_request: &DataRequestSpec) -> Result<DataFrame> {
let metric_requests = data_request.metric_requests(&self.metadata)?;
let geom_file = data_request.geom_details(&self.metadata)?;

// Required because polars is blocking
let metrics = tokio::task::spawn_blocking(move || {
get_metrics(&metric_requests,None)
});

let geoms = get_geometries(&geom_file, None, None);

// try_join requires us to have the errors from all futures be the same.
// We use anyhow to get it back properly
let (metrics,geoms) = try_join!(async move { metrics.await.map_err(anyhow::Error::from)}, geoms)?;

let result = metrics?.left_join(&geoms, ["GEO_ID"], ["GEOID"])?;
Ok(result)
}
}
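Because the polars read is blocking, `get_data_request` pushes the metric fetch onto a blocking thread while the geometry fetch stays async, then awaits both with `try_join!`. A self-contained sketch of that pattern (string payloads stand in for the real DataFrames):

```rust
// Sketch of the spawn_blocking + try_join! pattern used in get_data_request.
use anyhow::Result;
use tokio::try_join;

async fn fetch_geoms() -> Result<String> {
    Ok("geometries".to_owned())
}

#[tokio::main]
async fn main() -> Result<()> {
    // Blocking work (polars in the real code) goes to a blocking thread.
    let metrics = tokio::task::spawn_blocking(|| -> Result<String> { Ok("metrics".to_owned()) });
    let geoms = fetch_geoms();

    // try_join! needs both futures to share an error type, so the JoinError
    // from the blocking task is converted to anyhow::Error first.
    let (metrics, geoms) = try_join!(
        async move { metrics.await.map_err(anyhow::Error::from) },
        geoms
    )?;

    // As in the diff, the blocking task's own Result still has to be unwrapped.
    let metrics = metrics?;
    println!("{metrics} / {geoms}");
    Ok(())
}
```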

5 changes: 3 additions & 2 deletions src/main.rs
@@ -4,10 +4,11 @@ use anyhow::Result;
use clap::Parser;
use cli::{Cli, RunCommand};

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let args = Cli::parse();
if let Some(command) = args.command {
command.run()?;
command.run().await?;
}
Ok(())
}
30 changes: 30 additions & 0 deletions src/metadata.rs
@@ -4,8 +4,38 @@ use anyhow::Result;
use serde::{Deserialize, Serialize};
use typify::import_types;

use crate::parquet::MetricRequest;

import_types!("schema/popgetter_0.1.0.json");

impl SourceDataRelease{
pub fn get_metric_details(&self,metric_id: &str)->Option<&MetricMetadata>{
self.available_metrics.iter().find(
|m| m.source_metric_id == metric_id
)
}
}

// TODO we might want to just pass the MetricMetaData rather than
// having to
impl From<MetricMetadata> for MetricRequest{
fn from(value: MetricMetadata) -> Self {
MetricRequest {
column: value.parquet_column_name.clone(),
file: value.metric_parquet_file_url.clone().unwrap()
}
}
}

impl From<&MetricMetadata> for MetricRequest{
fn from(value: &MetricMetadata) -> Self {
MetricRequest {
column: value.parquet_column_name.clone(),
file: value.metric_parquet_file_url.clone().unwrap()
}
}
}

pub fn load_metadata(path: &str) -> Result<SourceDataRelease> {
let file = File::open(path)?;
let reader = BufReader::new(file);
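The lookup added here finds a metric by `source_metric_id` and converts it into a `MetricRequest` holding the parquet column name and file URL; note that both `From` impls unwrap `metric_parquet_file_url`, so they panic if the URL is missing. A reduced, hypothetical sketch of that flow with simplified types and illustrative values:

```rust
// Reduced sketch of the lookup-then-convert flow in metadata.rs; the unwrap
// mirrors the diff and panics when metric_parquet_file_url is None.
struct MetricMetadata {
    source_metric_id: String,
    parquet_column_name: String,
    metric_parquet_file_url: Option<String>,
}

struct MetricRequest {
    column: String,
    file: String,
}

impl From<&MetricMetadata> for MetricRequest {
    fn from(value: &MetricMetadata) -> Self {
        MetricRequest {
            column: value.parquet_column_name.clone(),
            file: value.metric_parquet_file_url.clone().unwrap(),
        }
    }
}

fn get_metric_details<'a>(metrics: &'a [MetricMetadata], id: &str) -> Option<&'a MetricMetadata> {
    metrics.iter().find(|m| m.source_metric_id == id)
}

fn main() {
    // Hypothetical metric entry for illustration only.
    let metrics = vec![MetricMetadata {
        source_metric_id: "pop_total".into(),
        parquet_column_name: "pop_total_col".into(),
        metric_parquet_file_url: Some("https://example.com/metrics.parquet".into()),
    }];
    let req: MetricRequest = get_metric_details(&metrics, "pop_total").unwrap().into();
    println!("{} from {}", req.column, req.file);
}
```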