diff --git a/output_spec.md b/output_spec.md index 8ad02cd..96caf76 100644 --- a/output_spec.md +++ b/output_spec.md @@ -30,6 +30,7 @@ The top level should contain: ├── metric_metadata.parquet ├── source_data_releases.parquet ├── data_publishers.parquet + ├── geometry_metadata.parquet ├── country_metadata.parquet ├── metrics/ │   ├── {metric_filename_a}.parquet @@ -51,6 +52,8 @@ with tabulated metadata: `SourceDataRelease` - A `data_publishers.parquet` file containing a serialised list of `DataPublisher` +- A `geometry_metadata.parquet` file containing a serialised list of + `GeometryMetadata` - A `country_metadata.parquet` file containing a serialised list of `CountryMetadata` (most countries should only have one entry, but there may be more than one because of entities like the UK) @@ -63,17 +66,20 @@ serialised as parquet files, and can be given any filename, as the `MetricMetadata` struct should contain the filepath to them. Likewise, geometries should be placed in the `geometries` subdirectory. Each set -of geometries should consist of two files, with the same filename stem and -different extensions: - -- `{filename}.fgb` - a FlatGeobuf file with the geoIDs stored in the `GEO_ID` - column +of geometries should consist of four files, with the same filename stem and +different extensions. The filename stem is specified inside the +`GeometryMetadata` struct. + +- `{filename}.flatgeobuf` - a FlatGeobuf file with the geoIDs stored in the + `GEO_ID` column +- `{filename}.geojsonseq` - corresponding GeoJSONSeq file +- `{filename}.pmtiles` - corresponding PMTiles file - `{filename}.parquet` - a serialised dataframe storing the names of the corresponding areas. This dataframe must have: - a `GEO_ID` column which corresponds exactly to those in the FlatGeobuf file. - one or more other columns, whose names are - [lowercase ISO 639-3 chdes](https://iso639-3.sil.org/code_tables/639/data), + [lowercase ISO 639-3 codes](https://iso639-3.sil.org/code_tables/639/data), and contain the names of each region in those specified languages. 
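As an illustration (not part of the spec itself), here is a minimal sketch of how such a names file could be produced with pandas. The `GEO_ID` values and the `region_2023` filename stem are hypothetical placeholders; the three language columns (`nld`, `fra`, `deu`) follow the lowercase ISO 639-3 convention used for the Belgian geometries elsewhere in this PR.

```python
import pandas as pd

# Hypothetical GEO_IDs and multilingual names for the Belgian regions.
# In practice the GEO_ID values must match exactly those stored in the
# GEO_ID column of the accompanying FlatGeobuf file.
names = pd.DataFrame(
    {
        "GEO_ID": ["2000", "3000", "4000"],  # placeholder region identifiers
        "nld": ["Vlaams Gewest", "Waals Gewest", "Brussels Hoofdstedelijk Gewest"],
        "fra": ["Région flamande", "Région wallonne", "Région de Bruxelles-Capitale"],
        "deu": ["Flämische Region", "Wallonische Region", "Region Brüssel-Hauptstadt"],
    }
)

# Written alongside region_2023.flatgeobuf / .geojsonseq / .pmtiles in the
# geometries/ subdirectory, sharing the same filename stem.
names.to_parquet("geometries/region_2023.parquet", index=False)
```

Any number of additional language columns may be added, provided each column name is a valid lowercase ISO 639-3 code.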
For example, the parquet file corresponding to the Belgian regions (with diff --git a/pyproject.toml b/pyproject.toml index 5ff16c2..00743ca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,8 @@ dependencies = [ "jcs >=0.2.1", # For generating IDs from class attributes ] + + [project.optional-dependencies] test = [ "pytest >=6", diff --git a/python/popgetter/__init__.py b/python/popgetter/__init__.py index eccfaba..144be45 100644 --- a/python/popgetter/__init__.py +++ b/python/popgetter/__init__.py @@ -1,8 +1,23 @@ from __future__ import annotations +import os +import warnings from collections.abc import Sequence from pathlib import Path +from dagster import ExperimentalWarning + +from popgetter.io_managers.azure import ( + AzureGeneralIOManager, + AzureGeoIOManager, + AzureMetadataIOManager, + AzureMetricsIOManager, +) +from popgetter.io_managers.local import ( + LocalGeoIOManager, + LocalMetadataIOManager, + LocalMetricsIOManager, +) from popgetter.utils import StagingDirResource __version__ = "0.1.0" @@ -10,6 +25,12 @@ __all__ = ["__version__"] +if "IGNORE_EXPERIMENTAL_WARNINGS" in os.environ: + warnings.filterwarnings("ignore", category=ExperimentalWarning) + + +import os + from dagster import ( AssetsDefinition, AssetSelection, @@ -27,13 +48,18 @@ UnresolvedAssetJobDefinition, ) -from popgetter import assets, cloud_outputs +from popgetter import assets, azure_test, cloud_outputs all_assets: Sequence[AssetsDefinition | SourceAsset | CacheableAssetsDefinition] = [ *load_assets_from_package_module(assets.us, group_name="us"), *load_assets_from_package_module(assets.be, group_name="be"), *load_assets_from_package_module(assets.uk, group_name="uk"), - *load_assets_from_modules([cloud_outputs], group_name="cloud_assets"), + *load_assets_from_package_module(cloud_outputs, group_name="cloud_outputs"), + *( + load_assets_from_modules([azure_test], group_name="azure_test") + if os.getenv("ENV") == "prod" + else [] + ), ] job_be: UnresolvedAssetJobDefinition = define_asset_job( @@ -56,15 +82,43 @@ ) +def resources_by_env(): + env = os.getenv("ENV", "dev") + if env == "prod": + return { + "metadata_io_manager": AzureMetadataIOManager(), + "geometry_io_manager": AzureGeoIOManager(), + "metrics_io_manager": AzureMetricsIOManager(), + "azure_general_io_manager": AzureGeneralIOManager(".bin"), + } + if env == "dev": + return { + "metadata_io_manager": LocalMetadataIOManager(), + "geometry_io_manager": LocalGeoIOManager(), + "metrics_io_manager": LocalMetricsIOManager(), + } + + err = f"$ENV should be either 'dev' or 'prod', but received '{env}'" + raise ValueError(err) + + +resources = { + "pipes_subprocess_client": PipesSubprocessClient(), + "staging_res": StagingDirResource( + staging_dir=str(Path(__file__).parent.joinpath("staging_dir").resolve()) + ), +} + +resources.update(resources_by_env()) + defs: Definitions = Definitions( assets=all_assets, schedules=[], - sensors=[cloud_outputs.country_outputs_sensor], - resources={ - "pipes_subprocess_client": PipesSubprocessClient(), - "staging_res": StagingDirResource( - staging_dir=str(Path(__file__).parent.joinpath("staging_dir").resolve()) - ), - }, + sensors=[ + cloud_outputs.metadata_sensor, + cloud_outputs.geometry_sensor, + cloud_outputs.metrics_sensor, + ], + resources=resources, jobs=[job_be, job_us, job_uk], ) diff --git a/python/popgetter/assets/be/__init__.py b/python/popgetter/assets/be/__init__.py index 60f9d46..46f9119 100755 --- a/python/popgetter/assets/be/__init__.py +++ b/python/popgetter/assets/be/__init__.py @@ -1,28 
+1,11 @@ #!/usr/bin/python3 from __future__ import annotations -from dagster import ( - asset, -) - -from popgetter.metadata import ( - CountryMetadata, -) - from . import ( census_derived, # noqa: F401 census_geometry, # noqa: F401 census_tables, # noqa: F401 ) -from .belgium import asset_prefix, country - - -@asset(key_prefix=asset_prefix) -def get_country_metadata() -> CountryMetadata: - """ - Returns a CountryMetadata of metadata about the country. - """ - return country # @asset(key_prefix=asset_prefix) diff --git a/python/popgetter/assets/be/census_derived.py b/python/popgetter/assets/be/census_derived.py index 94eaf9f..8e3bbcc 100644 --- a/python/popgetter/assets/be/census_derived.py +++ b/python/popgetter/assets/be/census_derived.py @@ -1,21 +1,24 @@ from __future__ import annotations +from collections.abc import Callable +from dataclasses import dataclass +from functools import reduce + import pandas as pd from dagster import ( AssetIn, - AssetOut, + IdentityPartitionMapping, MetadataValue, SpecificPartitionsPartitionMapping, StaticPartitionsDefinition, asset, - multi_asset, ) from icecream import ic -from popgetter.metadata import MetricMetadata +from popgetter.metadata import MetricMetadata, SourceDataRelease, metadata_to_dataframe from .belgium import asset_prefix -from .census_tables import dataset_node_partition, source +from .census_tables import dataset_node_partition _needed_dataset = [ { @@ -48,55 +51,66 @@ needed_dataset_mapping = SpecificPartitionsPartitionMapping(_needed_dataset_nodes) needed_dataset_partition = StaticPartitionsDefinition(_needed_dataset_nodes) -# Using HXL tags for variable names (https://hxlstandard.org/standard/1-1final/dictionary/#tag_population) -_derived_columns: list[dict] = [ - { - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "population_children_age5_17", - "group_by_column": "CD_REFNIS", - "filter_func": lambda df: (df["CD_AGE"] >= 5) & (df["CD_AGE"] < 18), - }, - { - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "population_infants_age0_4", - "group_by_column": "CD_REFNIS", - "filter_func": lambda df: (df["CD_AGE"] <= 4), - }, - { - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "population_children_age0_17", - "group_by_column": "CD_REFNIS", - "filter_func": lambda df: (df["CD_AGE"] >= 0) & (df["CD_AGE"] < 18), - }, - { - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "population_adults_f", - "group_by_column": "CD_REFNIS", - "filter_func": lambda df: (df["CD_AGE"] > 18) & (df["CD_SEX"] == "F"), - }, - { - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "population_adults_m", - "group_by_column": "CD_REFNIS", - "filter_func": lambda df: (df["CD_AGE"] > 18) & (df["CD_SEX"] == "M"), - }, - { - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "population_adults", - "group_by_column": "CD_REFNIS", - "filter_func": lambda df: (df["CD_AGE"] > 18), - }, - { - "node": "https://statbel.fgov.be/node/4689", - "hxltag": "population_ind", - "group_by_column": "CD_REFNIS", - "filter_func": lambda df: (df["CD_AGE"] >= 0), - }, -] -derived_columns = pd.DataFrame( - _derived_columns, columns=["node", "hxltag", "group_by_column", "filter_func"] -) +@dataclass +class DerivedColumn: + hxltag: str + filter_func: Callable[[pd.DataFrame], pd.DataFrame] + output_column_name: str + human_readable_name: str + + +# The keys of this dict are the nodes (i.e. partition keys). The values are a +# list of all columns of data derived from this node. 
+DERIVED_COLUMN_SPECIFICATIONS: dict[str, tuple[str, list[DerivedColumn]]] = { + "https://statbel.fgov.be/node/4689": ( + "CD_REFNIS", + [ + DerivedColumn( + hxltag="#population+children+age5_17", + filter_func=lambda df: df.query("CD_AGE >= 5 and CD_AGE < 18"), + output_column_name="children_5_17", + human_readable_name="Children aged 5 to 17", + ), + DerivedColumn( + hxltag="#population+infants+age0_4", + filter_func=lambda df: df.query("CD_AGE >= 0 and CD_AGE < 5"), + output_column_name="infants_0_4", + human_readable_name="Infants aged 0 to 4", + ), + DerivedColumn( + hxltag="#population+children+age0_17", + filter_func=lambda df: df.query("CD_AGE >= 0 and CD_AGE < 18"), + output_column_name="children_0_17", + human_readable_name="Children aged 0 to 17", + ), + DerivedColumn( + hxltag="#population+adults+f", + filter_func=lambda df: df.query("CD_AGE >= 18 and CD_SEX == 'F'"), + output_column_name="adults_f", + human_readable_name="Female adults", + ), + DerivedColumn( + hxltag="#population+adults+m", + filter_func=lambda df: df.query("CD_AGE >= 18 and CD_SEX == 'M'"), + output_column_name="adults_m", + human_readable_name="Male adults", + ), + DerivedColumn( + hxltag="#population+adults", + filter_func=lambda df: df.query("CD_AGE >= 18"), + output_column_name="adults", + human_readable_name="Adults", + ), + DerivedColumn( + hxltag="#population+ind", + filter_func=lambda df: df, + output_column_name="individuals", + human_readable_name="Total individuals", + ), + ], + ) +} @asset(key_prefix=asset_prefix) @@ -123,24 +137,24 @@ def needed_datasets(context) -> pd.DataFrame: return needed_df -def census_table_metadata(catalog_row: dict) -> MetricMetadata: +def make_census_table_metadata( + catalog_row: dict, source_data_release: SourceDataRelease +) -> MetricMetadata: return MetricMetadata( human_readable_name=catalog_row["human_readable_name"], source_download_url=catalog_row["source_download_url"], source_archive_file_path=catalog_row["source_archive_file_path"], source_documentation_url=catalog_row["source_documentation_url"], - source_data_release_id=source.id, - # TODO - this is a placeholder - parent_metric_id="unknown_at_this_stage", + source_data_release_id=source_data_release.id, + parent_metric_id=None, potential_denominator_ids=None, parquet_margin_of_error_file=None, parquet_margin_of_error_column=None, parquet_column_name=catalog_row["source_column"], - # TODO - this is a placeholder - metric_parquet_file_url="unknown_at_this_stage", + metric_parquet_path="__PLACEHOLDER__", hxl_tag=catalog_row["hxltag"], description=catalog_row["description"], - source_metric_id=catalog_row["hxltag"], + source_metric_id=catalog_row["source_column"], ) @@ -159,9 +173,7 @@ def filter_needed_catalog( needed_df = needed_datasets.merge(catalog_as_dataframe, how="inner", on="node") - # Now add some metadata to the context context.add_output_metadata( - # Metadata can be any key-value pair metadata={ "num_records": len(needed_df), "columns": MetadataValue.md( @@ -174,142 +186,163 @@ def filter_needed_catalog( return needed_df -@multi_asset( +@asset( ins={ "individual_census_table": AssetIn( key_prefix=asset_prefix, partition_mapping=needed_dataset_mapping ), - # "individual_census_table": AssetIn(key_prefix=asset_prefix), "filter_needed_catalog": AssetIn(key_prefix=asset_prefix), - }, - outs={ - "source_table": AssetOut(key_prefix=asset_prefix), - "source_mmd": AssetOut(key_prefix=asset_prefix), + "source_data_releases": AssetIn(key_prefix=asset_prefix), }, partitions_def=dataset_node_partition, + 
key_prefix=asset_prefix, ) -def get_enriched_tables( - context, individual_census_table, filter_needed_catalog -) -> tuple[pd.DataFrame, MetricMetadata]: - ic(context) - partition_keys = context.asset_partition_keys_for_input( +def source_metrics_by_partition( + context, + individual_census_table: dict[str, pd.DataFrame], + filter_needed_catalog: pd.DataFrame, + # TODO: generalise to list or dict of SourceDataReleases as there may be + # tables in here that are not at the same release level + # E.g. keys as Geography level ID + source_data_releases: dict[str, SourceDataRelease], + # TODO: return an intermediate type instead of MetricMetadata +) -> tuple[MetricMetadata, pd.DataFrame]: + input_partition_keys = context.asset_partition_keys_for_input( input_name="individual_census_table" ) - output_partition = context.asset_partition_key_for_output("source_table") - ic(partition_keys) - ic(len(partition_keys)) - ic(output_partition) - ic(type(output_partition)) + output_partition_key = context.partition_key - if output_partition not in partition_keys: - err_msg = f"Requested partition {output_partition} not found in the subset of 'needed' partitions {partition_keys}" - raise ValueError(err_msg) + if output_partition_key not in input_partition_keys: + skip_reason = f"Skipping as requested partition {output_partition_key} is not part of the 'needed' partitions {input_partition_keys}" + context.log.warning(skip_reason) + raise RuntimeError(skip_reason) - if output_partition not in individual_census_table: + try: + result_df = individual_census_table[output_partition_key] + except KeyError: err_msg = ( - f"Partition key {output_partition} not found in individual_census_table\n" + f"Partition key {output_partition_key} not found in individual_census_table\n" f"Available keys are {individual_census_table.keys()}" ) - raise ValueError(err_msg) + raise ValueError(err_msg) from None - result_df = individual_census_table[output_partition] catalog_row = filter_needed_catalog[ - filter_needed_catalog["node"].eq(output_partition) - ] - ic(catalog_row) - ic(type(catalog_row)) - catalog_row = catalog_row.to_dict(orient="index") - ic(catalog_row) - ic(type(catalog_row)) - catalog_row = catalog_row.popitem()[1] - ic(catalog_row) - ic(type(catalog_row)) + filter_needed_catalog["node"] == output_partition_key + ].to_dict(orient="records")[0] - result_mmd = census_table_metadata(catalog_row) + # TODO: refine upon more general level handling with derived column config. + # This config is currently called `DERIVED_COLUMN_SPECIFICATIONS` here and the + # level can also be included there. 
+ key = "municipality" + result_mmd = make_census_table_metadata(catalog_row, source_data_releases[key]) - # pivot_data(context, result_df, catalog_row) + return result_mmd, result_df - return result_df, result_mmd - -@multi_asset( +@asset( partitions_def=dataset_node_partition, ins={ - "source_table": AssetIn( - key_prefix=asset_prefix, partition_mapping=needed_dataset_mapping + "source_metrics_by_partition": AssetIn( + key_prefix=asset_prefix, partition_mapping=IdentityPartitionMapping() ), - "source_mmd": AssetIn( - key_prefix=asset_prefix, partition_mapping=needed_dataset_mapping - ), - }, - outs={ - "derived_table": AssetOut(key_prefix=asset_prefix), - "derived_mmds": AssetOut(key_prefix=asset_prefix), }, + key_prefix=asset_prefix, ) -def pivot_data( +def derived_metrics_by_partition( context, - source_table: dict[str, pd.DataFrame], - source_mmd: dict[str, MetricMetadata], -) -> tuple[pd.DataFrame, list[MetricMetadata]]: - node = context.asset_partition_key_for_output("derived_table") - - census_table = source_table[node] - parent_mmd = source_mmd[node] - - ic(census_table.columns) - ic(parent_mmd.parquet_column_name) - assert parent_mmd.parquet_column_name in census_table.columns - assert len(census_table) > 0 - - ic(census_table.head()) - ic(parent_mmd) - - source_column = parent_mmd.parquet_column_name - metrics = derived_columns[derived_columns["node"].eq(node)] - - # TODO, check whether it is necessary to forcibly remove columns that are not - # meaningful for the aggregation. - - new_table: pd.DataFrame = pd.DataFrame() - - new_mmds: list[MetricMetadata] = [] - - for row_tuple in metrics.itertuples(): - ic(row_tuple) - _, _, col_name, group_by_column, filter = row_tuple - new_col_def = {col_name: pd.NamedAgg(column=source_column, aggfunc="sum")} - subset = census_table.loc[filter] - ic(subset.head()) - ic(len(subset)) - temp_table: pd.DataFrame = subset.groupby( - by=group_by_column, as_index=True - ).agg( - func=None, - **new_col_def, # type: ignore TODO, don't know why pyright is complaining here + source_metrics_by_partition: tuple[MetricMetadata, pd.DataFrame], +) -> tuple[list[MetricMetadata], pd.DataFrame]: + node = context.partition_key + + source_mmd, source_table = source_metrics_by_partition + source_column = source_mmd.parquet_column_name + assert source_column in source_table.columns + assert len(source_table) > 0 + + try: + geo_id_col_name, metric_specs = DERIVED_COLUMN_SPECIFICATIONS[node] + except KeyError: + skip_reason = ( + f"Skipping as no derived columns are to be created for node {node}" ) + context.log.warning(skip_reason) + raise RuntimeError(skip_reason) from None + + # Rename the geoID column to GEO_ID + source_table = source_table.rename(columns={geo_id_col_name: "GEO_ID"}) - new_mmd = parent_mmd.copy() - new_mmd.parent_metric_id = parent_mmd.source_metric_id - new_mmd.hxl_tag = col_name - new_mmds.append(new_mmd) + derived_metrics: list[pd.DataFrame] = [] + derived_mmd: list[MetricMetadata] = [] - if len(new_table) == 0: - new_table = temp_table - else: - new_table = new_table.merge( - temp_table, left_index=True, right_index=True, how="inner" - ) + parquet_file_name = "".join(c for c in node if c.isalnum()) + ".parquet" + + for metric_spec in metric_specs: + new_table = ( + source_table.pipe(metric_spec.filter_func) + .groupby(by="GEO_ID", as_index=True) + .sum() + .rename(columns={source_column: metric_spec.output_column_name}) + .filter(items=["GEO_ID", metric_spec.output_column_name]) + ) + derived_metrics.append(new_table) + + new_mmd = 
source_mmd.copy() + new_mmd.parent_metric_id = source_mmd.source_metric_id + new_mmd.metric_parquet_path = parquet_file_name + new_mmd.hxl_tag = metric_spec.hxltag + new_mmd.parquet_column_name = metric_spec.output_column_name + new_mmd.human_readable_name = metric_spec.human_readable_name + derived_mmd.append(new_mmd) + + joined_metrics = reduce( + lambda left, right: left.merge( + right, on="GEO_ID", how="inner", validate="one_to_one" + ), + derived_metrics, + ) context.add_output_metadata( - output_name="derived_table", metadata={ - "num_records": len(new_table), # Metadata can be any key-value pair - "columns": MetadataValue.md( - "\n".join([f"- '`{col}`'" for col in new_table.columns.to_list()]) + "metadata_preview": MetadataValue.md( + metadata_to_dataframe(derived_mmd).head().to_markdown() + ), + "metrics_shape": f"{joined_metrics.shape[0]} rows x {joined_metrics.shape[1]} columns", + "metrics_preview": MetadataValue.md(joined_metrics.head().to_markdown()), + }, + ) + + return derived_mmd, joined_metrics + + +@asset( + ins={ + "derived_metrics_by_partition": AssetIn( + key_prefix=asset_prefix, + partition_mapping=SpecificPartitionsPartitionMapping( + ["https://statbel.fgov.be/node/4689"] ), - "preview": MetadataValue.md(new_table.head().to_markdown()), + ), + }, + key_prefix=asset_prefix, +) +def metrics( + context, derived_metrics_by_partition: tuple[list[MetricMetadata], pd.DataFrame] +) -> list[tuple[str, list[MetricMetadata], pd.DataFrame]]: + """ + This asset exists solely to aggregate all the derived tables into one + single unpartitioned asset, which the downstream publishing tasks can use. + + Right now it is a bit boring because it only relies on one partition, but + it could be extended when we have more data products. + """ + mmds, table = derived_metrics_by_partition + filepath = mmds[0].metric_parquet_path + + context.add_output_metadata( + metadata={ + "num_metrics": len(mmds), + "num_parquets": 1, }, ) - return new_table, new_mmds + return [(filepath, mmds, table)] diff --git a/python/popgetter/assets/be/census_geometry.py b/python/popgetter/assets/be/census_geometry.py index 8dc52c3..f58a12a 100644 --- a/python/popgetter/assets/be/census_geometry.py +++ b/python/popgetter/assets/be/census_geometry.py @@ -1,57 +1,189 @@ from __future__ import annotations +from dataclasses import dataclass +from datetime import date + import geopandas as gpd import matplotlib.pyplot as plt +import pandas as pd from dagster import ( + AssetIn, MetadataValue, + SpecificPartitionsPartitionMapping, + asset, ) from icecream import ic +from popgetter.metadata import ( + GeometryMetadata, + SourceDataRelease, +) from popgetter.utils import markdown_from_plot -# TODO: Need to re-implement aggregate_sectors_to_municipalities to work with the sectors coming from the partitioned asset. +from .belgium import asset_prefix +from .census_tables import publisher -# @asset( -# key_prefix=asset_prefix, -# ins={ -# "sector_geometries": AssetIn(key_prefix=asset_prefix), -# }, -# ) -def aggregate_sectors_to_municipalities(context, sector_geometries) -> gpd.GeoDataFrame: - """ - Aggregates a GeoDataFrame of the Statistical Sectors to Municipalities. 
+@dataclass +class BelgiumGeometryLevel: + level: str + hxl_tag: str + geo_id_column: str + name_columns: dict[str, str] # keys = language codes, values = column names + + +BELGIUM_GEOMETRY_LEVELS = { + "province": BelgiumGeometryLevel( + level="province", + hxl_tag="adm1", + geo_id_column="cd_prov_refnis", + name_columns={ + "nld": "tx_prov_descr_nl", + "fra": "tx_prov_descr_fr", + "deu": "tx_prov_descr_de", + }, + ), + "region": BelgiumGeometryLevel( + level="region", + hxl_tag="adm2", + geo_id_column="cd_rgn_refnis", + name_columns={ + "nld": "tx_rgn_descr_nl", + "fra": "tx_rgn_descr_fr", + "deu": "tx_rgn_descr_de", + }, + ), + "arrondisement": BelgiumGeometryLevel( + level="arrondisement", + hxl_tag="adm3", + geo_id_column="cd_dstr_refnis", + name_columns={ + "nld": "tx_adm_dstr_descr_nl", + "fra": "tx_adm_dstr_descr_fr", + "deu": "tx_adm_dstr_descr_de", + }, + ), + "municipality": BelgiumGeometryLevel( + level="municipality", + hxl_tag="adm4", + geo_id_column="cd_munty_refnis", + name_columns={ + "nld": "tx_munty_descr_nl", + "fra": "tx_munty_descr_fr", + "deu": "tx_munty_descr_de", + }, + ), + "statistical_sector": BelgiumGeometryLevel( + level="statistical_sector", + hxl_tag="adm5", + geo_id_column="cd_sector", + name_columns={ + "nld": "tx_sector_descr_nl", + "fra": "tx_sector_descr_fr", + "deu": "tx_sector_descr_de", + }, + ), +} - The `sectors_gdf` is assumed to be produced by `get_geometries()`. - Also saves the result to a GeoPackage file in the output_dir - returns a GeoDataFrame of the Municipalities. +@asset( + ins={ + "sector_geometries": AssetIn( + key=[asset_prefix, "individual_census_table"], + partition_mapping=SpecificPartitionsPartitionMapping( + ["https://statbel.fgov.be/node/4726"] + ), + ), + }, + key_prefix=asset_prefix, +) +def geometry( + context, sector_geometries +) -> list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]]: """ - # output_dir = WORKING_DIR / "statistical_sectors" - munty_gdf = sector_geometries.dissolve(by="cd_munty_refnis") - # munty_gdf.to_file(output_dir / "municipalities.gpkg", driver="GPKG") - munty_gdf.index = munty_gdf.index.astype(str) - ic(munty_gdf.head()) - ic(len(munty_gdf)) - - # Plot and convert the image to Markdown to preview it within Dagster - # Yes we do pass the `plt` object to the markdown_from_plot function and not the `ax` object - # ax = munty_gdf.plot(color="green") - ax = munty_gdf.plot(column="tx_sector_descr_nl", legend=False) - ax.set_title("Municipalities in Belgium") - md_plot = markdown_from_plot(plt) + Produces the full set of data / metadata associated with Belgian + municipalities. The outputs, in order, are: + + 1. A DataFrame containing a serialised GeometryMetadata object. + 2. A GeoDataFrame containing the geometries of the municipalities. + 3. A DataFrame containing the names of the municipalities (in this case, + they are in Dutch, French, and German). 
+ """ + geometries_to_return = [] + for level_details in BELGIUM_GEOMETRY_LEVELS.values(): + geometry_metadata = GeometryMetadata( + validity_period_start=date(2023, 1, 1), + validity_period_end=date(2023, 12, 31), + level=level_details.level, + hxl_tag=level_details.hxl_tag, + ) + + region_geometries = ( + sector_geometries.dissolve(by=level_details.geo_id_column) + .reset_index() + .rename(columns={level_details.geo_id_column: "GEO_ID"}) + .loc[:, ["geometry", "GEO_ID"]] + ) + ic(region_geometries.head()) + + region_names = ( + sector_geometries.rename( + columns={ + level_details.geo_id_column: "GEO_ID", + level_details.name_columns["nld"]: "nld", + level_details.name_columns["fra"]: "fra", + level_details.name_columns["deu"]: "deu", + } + ) + .loc[:, ["GEO_ID", "nld", "fra", "deu"]] + .drop_duplicates() + ) + ic(region_names.head()) + + geometries_to_return.append( + (geometry_metadata, region_geometries, region_names) + ) + + # Add output metadata + first_metadata, first_gdf, first_names = geometries_to_return[0] + first_joined_gdf = first_gdf.merge(first_names, on="GEO_ID") + ax = first_joined_gdf.plot(column="nld", legend=False) + ax.set_title(f"Belgium 2023 {first_metadata.level}") + md_plot = markdown_from_plot(plt) context.add_output_metadata( metadata={ - "num_records": len(munty_gdf), # Metadata can be any key-value pair - "columns": MetadataValue.md( - "\n".join([f"- '`{col}`'" for col in munty_gdf.columns.to_list()]) + "all_geom_levels": MetadataValue.md( + ",".join([metadata.level for metadata, _, _ in geometries_to_return]) ), - "preview": MetadataValue.md( - munty_gdf.loc[:, munty_gdf.columns != "geometry"].head().to_markdown() - ), - "plot": MetadataValue.md(md_plot), + "first_geometry_plot": MetadataValue.md(md_plot), + "first_names_preview": MetadataValue.md(first_names.head().to_markdown()), } ) - return munty_gdf + return geometries_to_return + + +@asset(key_prefix=asset_prefix) +def source_data_releases( + geometry: list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]] +) -> dict[str, SourceDataRelease]: + """ + Returns all SourceDataReleases for each geometry level. 
+ """ + return { + geo_metadata.level: SourceDataRelease( + name="StatBel Open Data", + date_published=date(2015, 10, 22), + reference_period_start=date(2015, 10, 22), + reference_period_end=date(2015, 10, 22), + collection_period_start=date(2015, 10, 22), + collection_period_end=date(2015, 10, 22), + expect_next_update=date(2022, 1, 1), + url="https://statbel.fgov.be/en/open-data", + description="TBC", + data_publisher_id=publisher.id, + geometry_metadata_id=geo_metadata.id, + ) + for geo_metadata, _, _ in geometry + } diff --git a/python/popgetter/assets/be/census_tables.py b/python/popgetter/assets/be/census_tables.py index 1ae7202..fabd0f2 100644 --- a/python/popgetter/assets/be/census_tables.py +++ b/python/popgetter/assets/be/census_tables.py @@ -3,7 +3,6 @@ import csv import sqlite3 import zipfile -from datetime import date from pathlib import Path, PurePath from tempfile import TemporaryDirectory from urllib.parse import urlparse @@ -22,8 +21,8 @@ from rdflib.namespace import DCAT, DCTERMS, SKOS from popgetter.metadata import ( + CountryMetadata, DataPublisher, - SourceDataRelease, ) from popgetter.utils import extract_main_file_from_zip, markdown_from_plot @@ -38,28 +37,21 @@ opendata_catalog_root = URIRef("http://data.gov.be/catalog/statbelopen") -source: SourceDataRelease = SourceDataRelease( - name="StatBel Open Data", - date_published=date(2015, 10, 22), - reference_period_start=date(2015, 10, 22), - reference_period_end=date(2015, 10, 22), - collection_period_start=date(2015, 10, 22), - collection_period_end=date(2015, 10, 22), - expect_next_update=date(2022, 1, 1), - url="https://statbel.fgov.be/en/open-data", - data_publisher_id=publisher.id, - description="TBC", - geography_file="TBC", - geography_level="Municipality", -) - dataset_node_partition = DynamicPartitionsDefinition(name="dataset_nodes") @asset(key_prefix=asset_prefix) -def get_publisher_metadata(): +def country_metadata() -> CountryMetadata: + """ + Returns the CountryMetadata for this country. + """ + return country + + +@asset(key_prefix=asset_prefix) +def data_publisher() -> DataPublisher: """ - Returns a DataPublisher of metadata about the publisher. + Returns the DataPublisher for this country. 
""" return publisher @@ -102,7 +94,7 @@ def catalog_as_dataframe(context, opendata_dataset_list: Graph) -> pd.DataFrame: "node": [], "human_readable_name": [], "description": [], - "metric_parquet_file_url": [], + "metric_parquet_path": [], "parquet_column_name": [], "parquet_margin_of_error_column": [], "parquet_margin_of_error_file": [], @@ -132,14 +124,13 @@ def catalog_as_dataframe(context, opendata_dataset_list: Graph) -> pd.DataFrame: ) ) - catalog_summary["metric_parquet_file_url"].append(None) + # This is unknown at this stage + catalog_summary["metric_parquet_path"].append(None) catalog_summary["parquet_margin_of_error_column"].append(None) catalog_summary["parquet_margin_of_error_file"].append(None) catalog_summary["potential_denominator_ids"].append(None) catalog_summary["parent_metric_id"].append(None) - catalog_summary["source_data_release_id"].append(source.id) - - # This is unknown at this stage + catalog_summary["source_data_release_id"].append(None) catalog_summary["parquet_column_name"].append(None) download_url, archive_file_path, format = get_distribution_url( diff --git a/python/popgetter/azure_test.py b/python/popgetter/azure_test.py new file mode 100644 index 0000000..41380d0 --- /dev/null +++ b/python/popgetter/azure_test.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +import pandas as pd +from dagster import asset + + +@asset(io_manager_key="azure_general_io_manager") +def test_azure(): + return pd.DataFrame({"col1": [1, 2], "col2": [3, 4]}).to_parquet(None) + + +@asset(io_manager_key="azure_general_io_manager") +def test_azure_large(): + return b"0" * (450 * 1024 * 1024 + 100) diff --git a/python/popgetter/cloud_outputs/__init__.py b/python/popgetter/cloud_outputs/__init__.py new file mode 100644 index 0000000..1256933 --- /dev/null +++ b/python/popgetter/cloud_outputs/__init__.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from .sensor_class import CloudAssetSensor + +metadata_factory = CloudAssetSensor( + asset_names_to_monitor=[ + "be/country_metadata", + "be/data_publisher", + "be/source_data_releases", + ], + io_manager_key="metadata_io_manager", + prefix="metadata", + interval=20, +) + +metadata_sensor = metadata_factory.create_sensor() +metadata_asset = metadata_factory.create_publishing_asset() + +geometry_factory = CloudAssetSensor( + asset_names_to_monitor=[ + "be/geometry", + ], + io_manager_key="geometry_io_manager", + prefix="geometry", + interval=60, +) + +geometry_sensor = geometry_factory.create_sensor() +geometry_asset = geometry_factory.create_publishing_asset() + +metrics_factory = CloudAssetSensor( + asset_names_to_monitor=[ + "be/metrics", + ], + io_manager_key="metrics_io_manager", + prefix="metrics", + interval=60, +) + +metrics_sensor = metrics_factory.create_sensor() +metrics_asset = metrics_factory.create_publishing_asset() diff --git a/python/popgetter/cloud_outputs/sensor_class.py b/python/popgetter/cloud_outputs/sensor_class.py new file mode 100644 index 0000000..d0b75fe --- /dev/null +++ b/python/popgetter/cloud_outputs/sensor_class.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from functools import reduce + +from dagster import ( + AssetSelection, + DefaultSensorStatus, + Output, + RunRequest, + StaticPartitionsDefinition, + asset, + multi_asset_sensor, +) + + +class CloudAssetSensor: + """ + Class which scaffolds a cloud sensor. This class defines a publishing + asset, which uses a custom IO manager to publish data either to Azure or + local storage. 
+ + The publishing asset in turn monitors a list of country-specific assets, + which are responsible for generating the data that are to be published. + + Arguments + --------- + + `asset_names_to_monitor`: list[str] + A list of asset names in the country pipelines that we want to monitor. + Each asset name should be a single string and should be prefixed with + the asset group name and a forward slash (e.g. 'be/geometry'). + + `io_manager_key`: str + The key of the IO manager used for publishing. See the 'resources' and + 'resources_by_env' dicts in python/popgetter/__init__.py. + + `prefix`: str + A string used to disambiguate the different assets / sensors that are + being generated here, as Dagster does not allow duplicated names. + + `interval`: int + The minimum interval at which the sensor should run, in seconds. + """ + + def __init__( + self, + asset_names_to_monitor: list[str], + io_manager_key: str, + prefix: str, + interval: int, + ): + self.asset_names_to_monitor = asset_names_to_monitor + self.io_manager_key = io_manager_key + self.publishing_asset_name = f"publish_{prefix}" + self.sensor_name = f"sensor_{prefix}" + self.interval = interval + + self.partition_definition = StaticPartitionsDefinition( + self.asset_names_to_monitor + ) + self.assets_to_monitor = reduce( + lambda x, y: x | y, + [AssetSelection.keys(k) for k in self.asset_names_to_monitor], + ) + + def create_publishing_asset(self): + @asset( + name=self.publishing_asset_name, + partitions_def=self.partition_definition, + io_manager_key=self.io_manager_key, + ) + def publish(context): + from popgetter import defs as popgetter_defs + + # load_asset_value expects a list of strings + output = popgetter_defs.load_asset_value(context.partition_key.split("/")) + return Output(output) + + return publish + + def create_sensor(self): + @multi_asset_sensor( + monitored_assets=self.assets_to_monitor, + request_assets=AssetSelection.keys(self.publishing_asset_name), + name=self.sensor_name, + minimum_interval_seconds=self.interval, + default_status=DefaultSensorStatus.RUNNING, + ) + def inner_sensor(context): + asset_events = context.latest_materialization_records_by_key() + for asset_key, execution_value in asset_events.items(): + # Assets which were materialised since the last time the sensor + # was run will have non-None execution_values (it will be an + # dagster.EventLogRecord class, which contains more information + # if needed). 
+ if execution_value is not None: + yield RunRequest( + run_key=None, + partition_key="/".join(asset_key.path), + ) + context.advance_cursor({asset_key: execution_value}) + + return inner_sensor diff --git a/python/popgetter/cloud_outputs.py b/python/popgetter/cloud_outputs_old.py similarity index 70% rename from python/popgetter/cloud_outputs.py rename to python/popgetter/cloud_outputs_old.py index e923c0b..1d885e7 100644 --- a/python/popgetter/cloud_outputs.py +++ b/python/popgetter/cloud_outputs_old.py @@ -1,16 +1,16 @@ from __future__ import annotations +import tempfile +import uuid from pathlib import Path import geopandas as gpd from dagster import ( AssetKey, - AssetOut, AssetSelection, Config, DefaultSensorStatus, DynamicPartitionsDefinition, - EnvVar, Noneable, Output, RunConfig, @@ -19,28 +19,11 @@ asset, define_asset_job, load_assets_from_current_module, - local_file_manager, - multi_asset, multi_asset_sensor, ) -from dagster_azure.adls2 import adls2_file_manager from icecream import ic from slugify import slugify -resources = { - "DEV": {"publishing_file_manager": local_file_manager}, - "PRODUCTION": { - "publishing_file_manager": adls2_file_manager - # See https://docs.dagster.io/_apidocs/libraries/dagster-azure#dagster_azure.adls2.adls2_file_manager - # storage_account="tbc", # The storage account name. - # credential={}, # The credential used to authenticate the connection. - # adls2_file_system="tbc", - # adls2_prefix="tbc", - }, -} - -current_resource = resources[EnvVar("DEV")] - cloud_assets = load_assets_from_current_module(group_name="cloud_assets") @@ -80,7 +63,10 @@ def __init__(self, **kwargs): # Belgium Geography + tables | ( AssetSelection.groups("be") - & AssetSelection.keys("be/source_table").downstream(include_self=False) + # Note: include_self=True as the geodataframe is currently not output for + # derived tables but only for source table: + # be-source-table-https-statbel-fgov-be-node-4726 + & AssetSelection.keys("be/source_table").downstream(include_self=True) ) ) @@ -183,8 +169,8 @@ def country_outputs_sensor(context): @asset( - # TODO: This feels like a code smell. (mixing my metaphors) - # It feels that this structure is duplicated and it ought + # todo: this feels like a code smell. (mixing my metaphors) + # it feels that this structure is duplicated and it ought # to be possible to have some reusable structure. 
config_schema={ "asset_to_load": str, @@ -226,84 +212,34 @@ def upstream_df(context): raise ValueError(err_msg) -@multi_asset( - outs={ - "parquet_path": AssetOut(is_required=False), - "flatgeobuff_path": AssetOut(is_required=False), - "geojson_seq_path": AssetOut(is_required=False), - }, - can_subset=True, - required_resource_keys={"staging_res"}, - partitions_def=publishing_partition, -) -def cartography_in_cloud_formats(context, upstream_df): - """ " - Returns dict of parquet, FlatGeobuf and GeoJSONSeq paths - """ - staging_res = context.resources.staging_res - # ic(staging_res) - # ic(staging_res.staging_dir) - staging_dir_str = staging_res.staging_dir - - staging_dir = Path(staging_dir_str) - - # Extract the selected keys from the context - selected_keys = [ - key.to_user_string() for key in context.op_execution_context.selected_asset_keys - ] - - def _parquet_helper(output_path): - upstream_df.to_parquet(output_path) - - def _flatgeobuff_helper(output_path): - if output_path.exists(): - if output_path.is_dir(): - # Assuming that the directory is only one level deep - for file in output_path.iterdir(): - file.unlink() - output_path.rmdir() - else: - output_path.unlink() - upstream_df.to_file(output_path, driver="FlatGeobuf") - - def _geojson_seq_helper(output_path): - upstream_df.to_file(output_path, driver="GeoJSONSeq") - - # helper functions - format_helpers = { - "parquet_path": ("parquet", _parquet_helper), - "flatgeobuff_path": ("flatgeobuff", _flatgeobuff_helper), - "geojson_seq_path": ("geojsonseq", _geojson_seq_helper), - } - - for output_type in selected_keys: - # output_type = output_asset_key.to_user_string() - context.log.debug(ic(f"yielding {output_type}")) - extension, helper_function = format_helpers[output_type] - output_file_base = context.partition_key - output_path = staging_dir / f"{output_file_base}.{extension}" - output_path.parent.mkdir(exist_ok=True) - output_path.touch(exist_ok=True) - helper_function(output_path) - - yield Output( - value=output_path, - output_name=output_type, - metadata={ - "file_type": output_type, - "file_path": output_path, - }, - ) +def df_to_bytes(df: gpd.GeoDataFrame, output_type: str) -> bytes: + tmp = tempfile.NamedTemporaryFile(prefix=str(uuid.uuid4())) + if output_type.lower() == "parquet": + df.to_parquet(tmp.name + ".parquet") + elif output_type.lower() == "flatgeobuf": + df.to_file(tmp.name + ".flatgeobuf", driver="FlatGeobuf") + elif output_type.lower() == "geojsonseq": + df.to_file(tmp.name + ".geojsonseq", driver="GeoJSONSeq") + else: + value_error: str = f"'{output_type}' is not currently supported." + raise ValueError(value_error) + with Path(tmp.name).open(mode="rb") as f: + return f.read() + + +@asset(io_manager_key="publishing_io_manager", partitions_def=publishing_partition) +def parquet(context, upstream_df): # noqa: ARG001 + return df_to_bytes(upstream_df, "parquet") - ic("end of cartography_in_cloud_formats") +@asset(io_manager_key="publishing_io_manager", partitions_def=publishing_partition) +def flatgeobuf(context, upstream_df): # noqa: ARG001 + return df_to_bytes(upstream_df, "flatgeobuf") -# def upload_cartography_to_cloud(context, cartography_in_cloud_formats): -# """ -# Uploads the cartography files to the cloud. 
-# """ -# log_msg = f"Uploading cartography to the cloud - {cartography_in_cloud_formats}" -# context.log.info(log_msg) + +@asset(io_manager_key="publishing_io_manager", partitions_def=publishing_partition) +def geojsonseq(context, upstream_df): # noqa: ARG001 + return df_to_bytes(upstream_df, "geojsonseq") # Not working yet - need to figure out questions about how we run docker @@ -312,7 +248,6 @@ def _geojson_seq_helper(output_path): # def generate_pmtiles(context, geojson_seq_path): # client = docker.from_env() # mount_folder = Path(geojson_seq_path).parent.resolve() - # container = client.containers.run( # "stuartlynn/tippecanoe:latest", # "tippecanoe -o tracts.pmtiles tracts.geojsonseq", @@ -320,7 +255,6 @@ def _geojson_seq_helper(output_path): # detach=True, # remove=True, # ) - # output = container.attach(stdout=True, stream=True, logs=True) # for line in output: # context.log.info(line) diff --git a/python/popgetter/io_managers/__init__.py b/python/popgetter/io_managers/__init__.py new file mode 100644 index 0000000..985f30b --- /dev/null +++ b/python/popgetter/io_managers/__init__.py @@ -0,0 +1,240 @@ +from __future__ import annotations + +from dataclasses import dataclass + +import geopandas as gpd +import pandas as pd +from dagster import InputContext, IOManager, MetadataValue, OutputContext +from upath import UPath + +from popgetter.metadata import ( + CountryMetadata, + DataPublisher, + GeometryMetadata, + MetricMetadata, + SourceDataRelease, + metadata_to_dataframe, +) + + +class IOManagerError(Exception): + pass + + +class PopgetterIOManager(IOManager): + def get_base_path(self) -> UPath: + raise NotImplementedError + + def handle_df( + self, context: OutputContext, df: pd.DataFrame, full_path: UPath + ) -> None: + raise NotImplementedError + + def load_input(self, _context: InputContext) -> pd.DataFrame: + err_msg = "This IOManager is only for writing outputs" + raise RuntimeError(err_msg) + + +class MetadataIOManager(PopgetterIOManager): + def get_output_filename( + self, obj_entry: CountryMetadata | DataPublisher | SourceDataRelease + ) -> str: + if isinstance(obj_entry, CountryMetadata): + return "country_metadata.parquet" + if isinstance(obj_entry, DataPublisher): + return "data_publishers.parquet" + if isinstance(obj_entry, SourceDataRelease): + return "source_data_releases.parquet" + + err_msg = "This IO manager only accepts CountryMetadata, DataPublisher, and SourceDataRelease" + raise ValueError(err_msg) + + def get_full_path( + self, + context: OutputContext, + obj_entry: CountryMetadata | DataPublisher | SourceDataRelease, + ) -> UPath: + path_prefixes = list(context.partition_key.split("/"))[:-1] + filename = self.get_output_filename(obj_entry) + return self.get_base_path() / UPath("/".join([*path_prefixes, filename])) + + def handle_output( + self, + context: OutputContext, + obj: ( + CountryMetadata + | DataPublisher + | SourceDataRelease + | list[CountryMetadata] + | list[DataPublisher] + | list[SourceDataRelease] + | dict[str, CountryMetadata] + | dict[str, DataPublisher] + | dict[str, SourceDataRelease] + ), + ): + # Handling multiple obj types at runtime to simplify the DAG + + # TODO: add handling of: incorrect type of obj passed, empty dictionary and + # any other potential errors + if isinstance(obj, dict): + vals = list(obj.values()) + elif isinstance(obj, list): + vals = obj + else: + vals = [obj] + val = vals[0] + full_path = self.get_full_path(context, val) + context.add_output_metadata(metadata={"parquet_path": str(full_path)}) + 
self.handle_df(context, metadata_to_dataframe(vals), full_path) + + +class GeoIOManager(PopgetterIOManager): + def handle_flatgeobuf( + self, context: OutputContext, geo_df: gpd.GeoDataFrame, full_path: UPath + ) -> None: + raise NotImplementedError + + def handle_geojsonseq( + self, context: OutputContext, geo_df: gpd.GeoDataFrame, full_path: UPath + ) -> None: + raise NotImplementedError + + def handle_pmtiles( + self, _context: OutputContext, _geo_df: gpd.GeoDataFrame, _full_path: UPath + ) -> None: + err_msg = "Pmtiles not currently implemented. You shouldn't be calling this." + raise RuntimeError(err_msg) + + @dataclass + class GeometryOutputPaths: + flatgeobuf: UPath + pmtiles: UPath + geojsonseq: UPath + names: UPath + + def get_full_paths_geoms( + self, + context: OutputContext, + geo_metadata: GeometryMetadata, + ) -> GeometryOutputPaths: + filename_stem = geo_metadata.filename_stem + asset_prefix = list(context.partition_key.split("/"))[:-1] # e.g. ['be'] + base_path = self.get_base_path() + return self.GeometryOutputPaths( + flatgeobuf=base_path + / UPath("/".join([*asset_prefix, "geometries", f"{filename_stem}.fgb"])), + pmtiles=base_path + / UPath( + "/".join([*asset_prefix, "geometries", f"TODO_{filename_stem}.pmtiles"]) + ), + geojsonseq=base_path + / UPath( + "/".join([*asset_prefix, "geometries", f"{filename_stem}.geojsonseq"]) + ), + names=base_path + / UPath( + "/".join([*asset_prefix, "geometries", f"{filename_stem}.parquet"]) + ), + ) + + def get_full_path_metadata( + self, + context: OutputContext, + ) -> UPath: + base_path = self.get_base_path() + asset_prefix = list(context.partition_key.split("/"))[:-1] + return base_path / UPath("/".join([*asset_prefix, "geometry_metadata.parquet"])) + + def handle_output( + self, + context: OutputContext, + obj: list[tuple[GeometryMetadata, gpd.GeoDataFrame, pd.DataFrame]], + ) -> None: + output_metadata = { + "flatgeobuf_paths": [], + "pmtiles_paths": [], + "geojsonseq_paths": [], + "names_paths": [], + } + + for geo_metadata, gdf, names_df in obj: + full_paths = self.get_full_paths_geoms(context, geo_metadata) + + self.handle_flatgeobuf(context, gdf, full_paths.flatgeobuf) + self.handle_geojsonseq(context, gdf, full_paths.geojsonseq) + # TODO self.handle_pmtiles(context, gdf, full_paths.pmtiles) + self.handle_df(context, names_df, full_paths.names) + + output_metadata["flatgeobuf_paths"].append(str(full_paths.flatgeobuf)) + output_metadata["pmtiles_paths"].append(str(full_paths.pmtiles)) + output_metadata["geojsonseq_paths"].append(str(full_paths.geojsonseq)) + output_metadata["names_paths"].append(str(full_paths.names)) + + metadata_df_filepath = self.get_full_path_metadata(context) + metadata_df = metadata_to_dataframe([md for md, _, _ in obj]) + self.handle_df(context, metadata_df, metadata_df_filepath) + + context.add_output_metadata( + metadata={ + "metadata_path": str(metadata_df_filepath), + "metadata_preview": MetadataValue.md(metadata_df.head().to_markdown()), + **output_metadata, + } + ) + + +class MetricsIOManager(PopgetterIOManager): + def get_full_path_metadata( + self, + context: OutputContext, + ) -> UPath: + base_path = self.get_base_path() + asset_prefix = list(context.partition_key.split("/"))[:-1] + return base_path / UPath("/".join([*asset_prefix, "metric_metadata.parquet"])) + + def get_full_path_metrics( + self, + context: OutputContext, + parquet_path: str, + ) -> UPath: + base_path = self.get_base_path() + asset_prefix = list(context.partition_key.split("/"))[:-1] + return base_path / 
UPath("/".join([*asset_prefix, "metrics", parquet_path])) + + def handle_output( + self, + context: OutputContext, + obj: list[tuple[str, list[MetricMetadata], pd.DataFrame]], + ) -> None: + # Aggregate all the MetricMetadatas into a single dataframe, then + # serialise + all_metadatas_df = metadata_to_dataframe( + [md for _, per_file_metadatas, _ in obj for md in per_file_metadatas] + ) + metadata_df_filepath = self.get_full_path_metadata(context) + self.handle_df(context, all_metadatas_df, metadata_df_filepath) + + # Write dataframes to the parquet files specified in the first element + # of the tuple + for rel_path, _, df in obj: + full_path = self.get_full_path_metrics(context, rel_path) + self.handle_df(context, df, full_path) + + # Add metadata + context.add_output_metadata( + metadata={ + "metric_parquet_paths": [ + str(self.get_full_path_metrics(context, rel_path)) + for rel_path, _, _ in obj + ], + "num_metrics": len(all_metadatas_df), + "metric_human_readable_names": all_metadatas_df[ + "human_readable_name" + ].tolist(), + "metadata_parquet_path": str(metadata_df_filepath), + "metadata_preview": MetadataValue.md( + all_metadatas_df.head().to_markdown() + ), + } + ) diff --git a/python/popgetter/io_managers/azure.py b/python/popgetter/io_managers/azure.py new file mode 100644 index 0000000..fdc8155 --- /dev/null +++ b/python/popgetter/io_managers/azure.py @@ -0,0 +1,205 @@ +from __future__ import annotations + +import os +import tempfile +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path + +import geopandas as gpd +import pandas as pd +from azure.core.credentials import ( + AzureSasCredential, +) +from azure.storage.filedatalake import ( + DataLakeLeaseClient, + DataLakeServiceClient, + FileSystemClient, +) +from dagster import ( + Any, + InputContext, + IOManager, + OutputContext, +) +from dagster_azure.adls2.utils import ResourceNotFoundError, create_adls2_client +from dagster_azure.blob.utils import create_blob_client +from icecream import ic +from upath import UPath + +from . import GeoIOManager, MetadataIOManager, MetricsIOManager + +# Set no time limit on lease duration to enable large files to be uploaded +_LEASE_DURATION = -1 + +# Set connection timeout to be larger than default: +# https://github.com/Azure/azure-sdk-for-python/issues/26993#issuecomment-1289799860 +_CONNECTION_TIMEOUT = 6000 + + +class AzureMixin: + storage_account: str | None = os.getenv("AZURE_STORAGE_ACCOUNT") + container: str | None = os.getenv("AZURE_CONTAINER") + prefix: str | None = os.getenv("AZURE_DIRECTORY") + sas_token: str | None = os.getenv("SAS_TOKEN") + adls2_client: DataLakeServiceClient + file_system_client: FileSystemClient + + def __init__(self): + if self.storage_account is None: + err_msg = "Storage account needs to be provided." + raise ValueError(err_msg) + if self.sas_token is None: + err_msg = "Credenital (SAS) needs to be provided." + raise ValueError(err_msg) + if self.container is None: + err_msg = "Container needs to be provided." 
+ raise ValueError(err_msg) + + self.adls2_client = create_adls2_client( + self.storage_account, AzureSasCredential(self.sas_token) + ) + self.file_system_client = self.adls2_client.get_file_system_client( + self.container + ) + # Blob client needed to handle copying as ADLS doesn't have a copy API yet + self.blob_client = create_blob_client( + self.storage_account, AzureSasCredential(self.sas_token) + ) + self.blob_container_client = self.blob_client.get_container_client( + self.container + ) + + self.lease_duration = _LEASE_DURATION + self.file_system_client.get_file_system_properties() + + def get_base_path(self) -> UPath: + return UPath(self.prefix) + + @property + def lease_client_constructor(self) -> Any: + return DataLakeLeaseClient + + def _uri_for_path(self, path: UPath, protocol: str = "abfss://") -> str: + return "{protocol}{filesystem}@{account}.dfs.core.windows.net/{key}".format( + protocol=protocol, + filesystem=self.file_system_client.file_system_name, + account=self.file_system_client.account_name, + key=path.as_posix(), + ) + + def get_loading_input_log_message(self, path: UPath) -> str: + return f"Loading ADLS2 object from: {self._uri_for_path(path)}" + + def get_writing_output_log_message(self, path: UPath) -> str: + return f"Writing ADLS2 object at: {self._uri_for_path(path)}" + + def unlink(self, path: UPath) -> None: + file_client = self.file_system_client.get_file_client(path.as_posix()) + with self._acquire_lease(file_client, is_rm=True) as lease: + file_client.delete_file(lease=lease, recursive=True) + + def dump_to_path(self, context: OutputContext, obj: bytes, path: UPath) -> None: + if self.path_exists(path): + context.log.warning(f"Removing existing ADLS2 key: {path}") # noqa: G004 + self.unlink(path) + + file = self.file_system_client.create_file(path.as_posix()) + with self._acquire_lease(file) as lease: + # Note: chunk_size can also be specified, see API for Azure SDK for Python, DataLakeFileClient: + # https://learn.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.datalakefileclient + file.upload_data( + obj, + lease=lease, + overwrite=True, + connection_timeout=_CONNECTION_TIMEOUT, + ) + + def path_exists(self, path: UPath) -> bool: + try: + self.file_system_client.get_file_client( + path.as_posix() + ).get_file_properties() + except ResourceNotFoundError: + return False + return True + + @contextmanager + def _acquire_lease(self, client: Any, is_rm: bool = False) -> Iterator[str]: + lease_client = self.lease_client_constructor(client=client) + try: + lease_client.acquire(lease_duration=self.lease_duration) + yield lease_client.id + finally: + # cannot release a lease on a file that no longer exists, so need to check + if not is_rm: + lease_client.release() + + def handle_df(self, context: OutputContext, df: pd.DataFrame, path: UPath) -> None: + self.dump_to_path(context, df.to_parquet(None), path) + + +class AzureMetadataIOManager(AzureMixin, MetadataIOManager): + pass + + +class AzureGeoIOManager(AzureMixin, GeoIOManager): + def handle_flatgeobuf( + self, context: OutputContext, geo_df: gpd.GeoDataFrame, full_path: UPath + ) -> None: + tmp = tempfile.NamedTemporaryFile() + fname = tmp.name + ".fgb" + geo_df.to_file(fname, driver="FlatGeobuf") + with Path(fname).open(mode="rb") as f: + b: bytes = f.read() + context.log.debug(ic(f"Size: {len(b) / (1_024 * 1_024):.3f}MB")) + self.dump_to_path(context, b, full_path) + + def handle_geojsonseq( + self, context: OutputContext, geo_df: gpd.GeoDataFrame, full_path: UPath + ) 
-> None: + tmp = tempfile.NamedTemporaryFile() + fname = tmp.name + ".geojsonseq" + geo_df.to_file(fname, driver="GeoJSONSeq") + with Path(fname).open(mode="rb") as f: + b: bytes = f.read() + context.log.debug(ic(f"Size: {len(b) / (1_024 * 1_024):.3f}MB")) + self.dump_to_path(context, b, full_path) + + def handle_names( + self, context: OutputContext, names_df: pd.DataFrame, full_path: UPath + ) -> None: + self.dump_to_path(context, names_df.to_parquet(None), full_path) + + def handle_geo_metadata( + self, context: OutputContext, geo_metadata_df: pd.DataFrame, full_path: UPath + ) -> None: + self.dump_to_path(context, geo_metadata_df.to_parquet(None), full_path) + + +class AzureMetricsIOManager(AzureMixin, MetricsIOManager): + pass + + +class AzureGeneralIOManager(AzureMixin, IOManager): + """This class is used only for an asset which tests the Azure functionality + (see cloud_outputs/azure_test.py). It is not used for publishing any + popgetter data.""" + + extension: str + + def __init__(self, extension: str | None = None): + super().__init__() + if extension is not None and not extension.startswith("."): + err_msg = f"Provided extension ('{extension}') does not begin with '.'" + raise ValueError(err_msg) + self.extension = "" if extension is None else extension + + def handle_output(self, context: OutputContext, obj: bytes) -> None: + path = self.get_base_path() / ".".join( + [*context.asset_key.path, self.extension] + ) + self.dump_to_path(context, obj, path) + + def load_input(self, context: InputContext) -> Any: + return super().load_input(context) diff --git a/python/popgetter/io_managers/local.py b/python/popgetter/io_managers/local.py new file mode 100644 index 0000000..a1f6163 --- /dev/null +++ b/python/popgetter/io_managers/local.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import os + +import geopandas as gpd +import pandas as pd +from dagster import OutputContext +from upath import UPath + +from . import GeoIOManager, MetadataIOManager, MetricsIOManager + + +class LocalMixin: + dagster_home: str | None = os.getenv("DAGSTER_HOME") + + def get_base_path(self) -> UPath: + if not self.dagster_home: + err = "The DAGSTER_HOME environment variable must be set." 
+ raise ValueError(err) + return UPath(self.dagster_home) / "cloud_outputs" + + def make_parent_dirs(self, full_path: UPath) -> None: + full_path.parent.mkdir(parents=True, exist_ok=True) + + def handle_df( + self, _context: OutputContext, df: pd.DataFrame, full_path: UPath + ) -> None: + self.make_parent_dirs(full_path) + df.to_parquet(full_path) + + +class LocalMetadataIOManager(LocalMixin, MetadataIOManager): + pass + + +class LocalGeoIOManager(LocalMixin, GeoIOManager): + def handle_flatgeobuf( + self, _context: OutputContext, geo_df: gpd.GeoDataFrame, full_path: UPath + ) -> None: + self.make_parent_dirs(full_path) + geo_df.to_file(full_path, driver="FlatGeobuf") + + def handle_geojsonseq( + self, _context: OutputContext, geo_df: gpd.GeoDataFrame, full_path: UPath + ) -> None: + self.make_parent_dirs(full_path) + geo_df.to_file(full_path, driver="GeoJSONSeq") + + +class LocalMetricsIOManager(LocalMixin, MetricsIOManager): + pass diff --git a/python/popgetter/metadata.py b/python/popgetter/metadata.py index 13aa197..c8d148a 100644 --- a/python/popgetter/metadata.py +++ b/python/popgetter/metadata.py @@ -1,10 +1,12 @@ from __future__ import annotations +from collections.abc import Sequence from datetime import date from hashlib import sha256 from typing import Self import jcs +import pandas as pd from pydantic import BaseModel, Field, computed_field, model_validator @@ -16,7 +18,10 @@ def hash_class_vars(class_instance): Note that `vars()` does not include properties, so the IDs themselves are not part of the hash, which avoids self-reference issues. """ - variables = vars(class_instance) + # Must copy the dict to avoid overriding the actual instance attributes! + # Because we're only modifying dates -> strings, we don't need to perform a + # deepcopy + variables = dict(**vars(class_instance)) # Python doesn't serialise dates to JSON, have to convert to ISO 8601 first for key, val in variables.items(): if isinstance(val, date): @@ -24,6 +29,16 @@ def hash_class_vars(class_instance): return sha256(jcs.canonicalize(variables)).hexdigest() +def metadata_to_dataframe( + metadata_instances: Sequence[BaseModel], +): + """ + Convert a list of metadata instances to a pandas DataFrame. Any of the four + metadata classes defined in this module can be used here. + """ + return pd.DataFrame([md.model_dump() for md in metadata_instances]) + + class CountryMetadata(BaseModel): @computed_field @property @@ -65,6 +80,33 @@ def id(self) -> str: ) +class GeometryMetadata(BaseModel): + @computed_field + @property + def id(self) -> str: + return hash_class_vars(self) + + @computed_field + @property + def filename_stem(self) -> str: + level = "_".join(self.level.lower().split()) + year = self.validity_period_start.year + return f"{level}_{year}" + + validity_period_start: date = Field( + description="The start of the range of time for which the regions are valid (inclusive)" + ) + validity_period_end: date = Field( + description="The end of the range of time for which the regions are valid (inclusive). If the data is a single-day snapshot, this should be the same as `validity_period_start`." + ) + level: str = Field( + description="The geography level contained in the file (e.g. 
diff --git a/python/popgetter/metadata.py b/python/popgetter/metadata.py
index 13aa197..c8d148a 100644
--- a/python/popgetter/metadata.py
+++ b/python/popgetter/metadata.py
@@ -1,10 +1,12 @@
 from __future__ import annotations

+from collections.abc import Sequence
 from datetime import date
 from hashlib import sha256
 from typing import Self

 import jcs
+import pandas as pd
 from pydantic import BaseModel, Field, computed_field, model_validator


@@ -16,7 +18,10 @@ def hash_class_vars(class_instance):
     Note that `vars()` does not include properties, so the IDs themselves are
     not part of the hash, which avoids self-reference issues.
     """
-    variables = vars(class_instance)
+    # Must copy the dict to avoid overriding the actual instance attributes!
+    # Because we're only modifying dates -> strings, we don't need to perform a
+    # deepcopy
+    variables = dict(**vars(class_instance))
     # Python doesn't serialise dates to JSON, have to convert to ISO 8601 first
     for key, val in variables.items():
         if isinstance(val, date):
@@ -24,6 +29,16 @@ def hash_class_vars(class_instance):
     return sha256(jcs.canonicalize(variables)).hexdigest()

+def metadata_to_dataframe(
+    metadata_instances: Sequence[BaseModel],
+):
+    """
+    Convert a list of metadata instances to a pandas DataFrame. Any of the five
+    metadata classes defined in this module can be used here.
+    """
+    return pd.DataFrame([md.model_dump() for md in metadata_instances])
+
+
 class CountryMetadata(BaseModel):
     @computed_field
     @property
     def id(self) -> str:
@@ -65,6 +80,33 @@ def id(self) -> str:
     )

+class GeometryMetadata(BaseModel):
+    @computed_field
+    @property
+    def id(self) -> str:
+        return hash_class_vars(self)
+
+    @computed_field
+    @property
+    def filename_stem(self) -> str:
+        level = "_".join(self.level.lower().split())
+        year = self.validity_period_start.year
+        return f"{level}_{year}"
+
+    validity_period_start: date = Field(
+        description="The start of the range of time for which the regions are valid (inclusive)"
+    )
+    validity_period_end: date = Field(
+        description="The end of the range of time for which the regions are valid (inclusive). If the data is a single-day snapshot, this should be the same as `validity_period_start`."
+    )
+    level: str = Field(
+        description="The geography level contained in the file (e.g. output area, LSOA, MSOA, etc)"
+    )
+    hxl_tag: str = Field(
+        description="Humanitarian eXchange Language (HXL) description for the geography level"
+    )
+
+
 class SourceDataRelease(BaseModel):
     @computed_field
     @property
     def id(self) -> str:
@@ -95,11 +137,8 @@ def id(self) -> str:
         description="The ID of the publisher of the data release"
     )
     description: str = Field(description="A description of the data release")
-    geography_file: str = Field(
-        description="The path of the geography FlatGeobuf file, relative to the top level of the data release"
-    )
-    geography_level: str = Field(
-        description="The geography level contained in the file (e.g. output area, LSOA, MSOA, etc)"
+    geometry_metadata_id: str = Field(
+        description="The ID of the geometry metadata associated with this data release"
     )

     @model_validator(mode="after")
@@ -124,7 +163,7 @@ def id(self) -> str:
         description='A human readable name for the metric, something like "Total Population under 12 years old"'
     )
     source_metric_id: str = Field(
-        description='The name of the metric that comes from the source dataset ( for example in the ACS this might be "B001_E001" or something similar'
+        description='The name of the metric that comes from the source dataset (for example in the ACS this might be "B001_E001" or something similar)'
     )
     description: str = Field(
         description="A longer description of the metric which might include info on the caveats for the metric"
     )
     hxl_tag: str = Field(
         description="Field description using the Humanitarian eXchange Language (HXL) standard"
     )
-    metric_parquet_file_url: str | None = Field(
-        description="The relative path output file that contains this metric value. This should be relative to the root of a base URL defined at project level and should NOT include the file extension"
+    metric_parquet_path: str = Field(
+        description="The path to the parquet file that contains the metric"
     )
     parquet_column_name: str = Field(
         description="Name of column in the outputted parquet file which contains the metric"
     )
@@ -164,7 +203,13 @@ def id(self) -> str:
     )

-EXPORTED_MODELS = [CountryMetadata, DataPublisher, SourceDataRelease, MetricMetadata]
+EXPORTED_MODELS = [
+    CountryMetadata,
+    DataPublisher,
+    SourceDataRelease,
+    MetricMetadata,
+    GeometryMetadata,
+]

 def export_schema():
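For reference, a small sketch of how the new `GeometryMetadata` model and the `metadata_to_dataframe` helper fit together. The dates and HXL tag below are invented for illustration, and it assumes the package layout above plus pydantic v2 (where `model_dump()` includes computed fields such as `id` and `filename_stem`):

```python
from datetime import date

from popgetter.metadata import GeometryMetadata, metadata_to_dataframe

# Hypothetical values; the HXL tag and dates are made up for this example.
geo_metadata = GeometryMetadata(
    validity_period_start=date(2023, 1, 1),
    validity_period_end=date(2023, 12, 31),
    level="output area",
    hxl_tag="#geo+bounds+output_area",
)

# filename_stem is derived from the level and the start year:
# "output area" -> "output_area", 2023 -> "output_area_2023"
assert geo_metadata.filename_stem == "output_area_2023"

# Serialise a list of metadata models to a single DataFrame, ready to be
# written out as e.g. geometry_metadata.parquet.
df = metadata_to_dataframe([geo_metadata])
print(df[["id", "filename_stem", "level"]])
```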
diff --git a/tests/test_cloud_outputs.py b/tests/test_cloud_outputs.py
index 452a7b5..7e65cfd 100644
--- a/tests/test_cloud_outputs.py
+++ b/tests/test_cloud_outputs.py
@@ -1,180 +1,168 @@
 from __future__ import annotations

-from datetime import datetime
-
 import pytest
-from dagster import (
-    AssetKey,
-    RunRequest,
-    SkipReason,
-    StaticPartitionsDefinition,
-    asset,
-    build_asset_context,
-    build_multi_asset_sensor_context,
-    instance_for_test,
-    materialize_to_memory,
-)
-from icecream import ic
 # generate_pmtiles,
-from popgetter import defs
-from popgetter.cloud_outputs import (
-    cartography_in_cloud_formats,
-    country_outputs_sensor,
-)
-
+# from popgetter.cloud_outputs import (
+#     cartography_in_cloud_formats,
+#     country_outputs_sensor,
+# )
 # generate_pmtiles,
-from popgetter.utils import StagingDirResource
-
 # TODO, Move this to a fixture to somewhere more universal
 from .test_be import demo_sectors  # noqa: F401

-
-def test_country_outputs_sensor():
-    """ ""
-    This test checks that the country_outputs_sensor function returns a RunRequest
-    with the correct AssetKey and PartitionKey.
-
-    Three upstream assets are mocked, which different qualities.
-    The context object is mocked and this also defines which assets are monitored. Hence the test does not attempt to
-    check that the sensor is correctly monitoring the required production assets.
-
-    Hence this test only checks that the correct config parameters are returned via the RunRequest object(s).
-    """
-
-    @asset
-    def irrelevant_asset():
-        return 1
-
-    @asset
-    def monitored_asset():
-        return 2
-
-    @asset(
-        partitions_def=StaticPartitionsDefinition(["A", "B", "C"]),
-    )
-    def partitioned_monitored_asset(context):
-        return ic(context.partition_key)
-
-    # instance = DagsterInstance.ephemeral()
-    with instance_for_test() as instance:
-        ctx = build_multi_asset_sensor_context(
-            monitored_assets=[
-                AssetKey("monitored_asset"),
-                AssetKey("partitioned_monitored_asset"),
-            ],
-            instance=instance,
-            definitions=defs,
-        )
-
-        # Nothing is materialized yet
-        result = list(country_outputs_sensor(ctx))
-        assert len(result) == 1
-        assert isinstance(result[0], SkipReason)
-
-        # Only the irrelevant asset is materialized
-        materialize_to_memory([irrelevant_asset], instance=instance)
-        result = list(country_outputs_sensor(ctx))
-        assert len(result) == 1
-        assert isinstance(result[0], SkipReason)
-
-        # Only the monitored asset is materialized
-        materialize_to_memory([monitored_asset], instance=instance)
-        result = list(country_outputs_sensor(ctx))
-        assert len(result) == 1
-        assert isinstance(result[0], RunRequest)
-
-        # Both non-partitioned assets are materialized
-        materialize_to_memory([monitored_asset, irrelevant_asset], instance=instance)
-        result = list(country_outputs_sensor(ctx))
-        assert len(result) == 1
-        assert isinstance(result[0], RunRequest)
-        assert result[0].partition_key == "monitored-asset"
-        assert (
-            result[0].run_config["ops"]["upstream_df"]["config"]["asset_to_load"]
-            == "monitored_asset"
-        )
-        assert (
-            result[0].run_config["ops"]["upstream_df"]["config"]["partition_to_load"]
-            is None
-        )
-
-        # All three assets are materialized
-        materialize_to_memory(
-            [monitored_asset, irrelevant_asset, partitioned_monitored_asset],
-            instance=instance,
-            partition_key="A",
-        )
-        result = list(country_outputs_sensor(ctx))
-        assert len(result) == 2
-
-        # The order of the results does not need to be guaranteed
-        # These two functions are used to check the results below
-        def assert_for_non_partitioned_assets(r):
-            assert isinstance(r, RunRequest)
-            assert r.partition_key == "monitored-asset"
-            assert (
-                r.run_config["ops"]["upstream_df"]["config"]["asset_to_load"]
-                == "monitored_asset"
-            )
-            assert (
-                r.run_config["ops"]["upstream_df"]["config"]["partition_to_load"]
-                is None
-            )
-
-        def assert_for_partitioned_assets(r):
-            assert r.partition_key == "partitioned-monitored-asset-a"
-            assert (
-                r.run_config["ops"]["upstream_df"]["config"]["asset_to_load"]
-                == "partitioned_monitored_asset"
-            )
-            assert (
-                r.run_config["ops"]["upstream_df"]["config"]["partition_to_load"] == "A"
-            )
-
-        # Now check that the results include one partitioned and one non-partitioned asset
-        try:
-            assert_for_non_partitioned_assets(result[0])
-            assert_for_partitioned_assets(result[1])
-        except AssertionError:
-            assert_for_non_partitioned_assets(result[1])
-            assert_for_partitioned_assets(result[0])
-
-
-# TODO: The no QA comment below is pending moving the fixture to a more
-# universal location
-def test_cartography_in_cloud_formats(tmp_path, demo_sectors):  # noqa: F811
-    # This test is outputs each of the cartography outputs to individual files
-    # in the staging directory. The test then checks that the files exist and
-    # that they were created after the test started.
-    time_at_start = datetime.now()
-
-    staging_res = StagingDirResource(staging_dir=str(tmp_path))
-    resources_for_test = {
-        "staging_res": staging_res,
-        "unit_test_key": "test_cartography_in_cloud_formats",
-    }
-
-    with (
-        instance_for_test() as instance,
-        build_asset_context(
-            resources=resources_for_test,
-            instance=instance,
-            partition_key="historic-european-region",
-        ) as context,
-    ):
-        # Collect the results
-        # Results should be a generator which produces three Output objects
-        results = cartography_in_cloud_formats(context, demo_sectors)
-
-        output_paths = [r.value for r in list(results)]
-        # There should be 3 outputs
-        assert len(output_paths) == 3
-
-        # Check that the output paths exist and were created after the test started
-        for output_path in output_paths:
-            assert output_path.exists()
-            assert output_path.stat().st_mtime > time_at_start.timestamp()
+# Commented out test as part of #92 as functions no longer importable
+# @pytest.mark.skip(
+#     reason="This tests old code that is no longer imported since cloud outputs were reworked in #92"
+# )
+# def test_country_outputs_sensor():
+#     """ ""
+#     This test checks that the country_outputs_sensor function returns a RunRequest
+#     with the correct AssetKey and PartitionKey.
+
+#     Three upstream assets are mocked, with different qualities.
+#     The context object is mocked and this also defines which assets are monitored. Hence the test does not attempt to
+#     check that the sensor is correctly monitoring the required production assets.
+
+#     Hence this test only checks that the correct config parameters are returned via the RunRequest object(s).
+#     """
+
+#     @asset
+#     def irrelevant_asset():
+#         return 1
+
+#     @asset
+#     def monitored_asset():
+#         return 2
+
+#     @asset(
+#         partitions_def=StaticPartitionsDefinition(["A", "B", "C"]),
+#     )
+#     def partitioned_monitored_asset(context):
+#         return ic(context.partition_key)
+
+#     # instance = DagsterInstance.ephemeral()
+#     with instance_for_test() as instance:
+#         ctx = build_multi_asset_sensor_context(
+#             monitored_assets=[
+#                 AssetKey("monitored_asset"),
+#                 AssetKey("partitioned_monitored_asset"),
+#             ],
+#             instance=instance,
+#             definitions=defs,
+#         )
+
+#         # Nothing is materialized yet
+#         result = list(country_outputs_sensor(ctx))
+#         assert len(result) == 1
+#         assert isinstance(result[0], SkipReason)
+
+#         # Only the irrelevant asset is materialized
+#         materialize_to_memory([irrelevant_asset], instance=instance)
+#         result = list(country_outputs_sensor(ctx))
+#         assert len(result) == 1
+#         assert isinstance(result[0], SkipReason)
+
+#         # Only the monitored asset is materialized
+#         materialize_to_memory([monitored_asset], instance=instance)
+#         result = list(country_outputs_sensor(ctx))
+#         assert len(result) == 1
+#         assert isinstance(result[0], RunRequest)
+
+#         # Both non-partitioned assets are materialized
+#         materialize_to_memory([monitored_asset, irrelevant_asset], instance=instance)
+#         result = list(country_outputs_sensor(ctx))
+#         assert len(result) == 1
+#         assert isinstance(result[0], RunRequest)
+#         assert result[0].partition_key == "monitored-asset"
+#         assert (
+#             result[0].run_config["ops"]["upstream_df"]["config"]["asset_to_load"]
+#             == "monitored_asset"
+#         )
+#         assert (
+#             result[0].run_config["ops"]["upstream_df"]["config"]["partition_to_load"]
+#             is None
+#         )
+
+#         # All three assets are materialized
+#         materialize_to_memory(
+#             [monitored_asset, irrelevant_asset, partitioned_monitored_asset],
+#             instance=instance,
+#             partition_key="A",
+#         )
+#         result = list(country_outputs_sensor(ctx))
+#         assert len(result) == 2
+
+#         # The order of the results does not need to be guaranteed
+#         # These two functions are used to check the results below
+#         def assert_for_non_partitioned_assets(r):
+#             assert isinstance(r, RunRequest)
+#             assert r.partition_key == "monitored-asset"
+#             assert (
+#                 r.run_config["ops"]["upstream_df"]["config"]["asset_to_load"]
+#                 == "monitored_asset"
+#             )
+#             assert (
+#                 r.run_config["ops"]["upstream_df"]["config"]["partition_to_load"]
+#                 is None
+#             )
+
+#         def assert_for_partitioned_assets(r):
+#             assert r.partition_key == "partitioned-monitored-asset-a"
+#             assert (
+#                 r.run_config["ops"]["upstream_df"]["config"]["asset_to_load"]
+#                 == "partitioned_monitored_asset"
+#             )
+#             assert (
+#                 r.run_config["ops"]["upstream_df"]["config"]["partition_to_load"] == "A"
+#             )
+
+#         # Now check that the results include one partitioned and one non-partitioned asset
+#         try:
+#             assert_for_non_partitioned_assets(result[0])
+#             assert_for_partitioned_assets(result[1])
+#         except AssertionError:
+#             assert_for_non_partitioned_assets(result[1])
+#             assert_for_partitioned_assets(result[0])
+
+# Commented out test as part of #92 as functions no longer importable
+# @pytest.mark.skip(
+#     reason="This tests old code that is no longer imported since cloud outputs were reworked in #92"
+# )
+# # TODO: The no QA comment below is pending moving the fixture to a more
+# # universal location
+# def test_cartography_in_cloud_formats(tmp_path, demo_sectors):
+#     # This test outputs each of the cartography outputs to individual files
+#     # in the staging directory. The test then checks that the files exist and
+#     # that they were created after the test started.
+#     time_at_start = datetime.now()
+
+#     staging_res = StagingDirResource(staging_dir=str(tmp_path))
+#     resources_for_test = {
+#         "staging_res": staging_res,
+#         "unit_test_key": "test_cartography_in_cloud_formats",
+#     }
+
+#     with (
+#         instance_for_test() as instance,
+#         build_asset_context(
+#             resources=resources_for_test,
+#             instance=instance,
+#             partition_key="historic-european-region",
+#         ) as context,
+#     ):
+#         # Collect the results
+#         # Results should be a generator which produces three Output objects
+#         results = cartography_in_cloud_formats(context, demo_sectors)
+
+#         output_paths = [r.value for r in list(results)]
+#         # There should be 3 outputs
+#         assert len(output_paths) == 3
+
+#         # Check that the output paths exist and were created after the test started
+#         for output_path in output_paths:
+#             assert output_path.exists()
+#             assert output_path.stat().st_mtime > time_at_start.timestamp()

 @pytest.mark.skip(reason="Test not implemented yet")
url="https://example.com", data_publisher_id="test_publisher_id", description="This is a test data release", - geography_file="test_geography_file", - geography_level="test_geography_level", + geometry_metadata_id="test_geom_id", ) assert ( source_data_release.id - == "15e6144c641c637247bde426fba653f209717799e41df6709a589bafbb4014c1" + == "9ec7e234d73664339e4c1f04bfa485dbb17e204dd72dc3ffbb9cab6870475597" ) source_data_release2 = SourceDataRelease( @@ -74,8 +71,7 @@ def test_source_data_release_hash(): url="https://example.com", data_publisher_id="test_publisher_id", description="This is a test data release", - geography_file="test_geography_file", - geography_level="test_geography_level", + geometry_metadata_id="test_geom_id", ) assert source_data_release.id != source_data_release2.id