Skip to content

Commit

Permalink
Merge pull request #92 from Urban-Analytics-Technology-Platform/belgi…
Browse files Browse the repository at this point in the history
…um_spec_new

Update output types of publishing assets, revise Belgium DAG and cloud sensors (#75)
  • Loading branch information
sgreenbury authored May 17, 2024
2 parents 07014d2 + 3b2a0c2 commit aa1df17
Show file tree
Hide file tree
Showing 17 changed files with 1,357 additions and 539 deletions.
18 changes: 12 additions & 6 deletions output_spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The top level should contain:
├── metric_metadata.parquet
├── source_data_releases.parquet
├── data_publishers.parquet
├── geometry_metadata.parquet
├── country_metadata.parquet
├── metrics/
│   ├── {metric_filename_a}.parquet
Expand All @@ -51,6 +52,8 @@ with tabulated metadata:
`SourceDataRelease`
- A `data_publishers.parquet` file containing a serialised list of
`DataPublisher`
- A `geometry_metadata.parquet` file containing a serialised list of
`GeometryMetadata`
- A `country_metadata.parquet` file containing a serialised list of
`CountryMetadata` (most countries should only have one entry, but there may be
more than one because of entities like the UK)
Expand All @@ -63,17 +66,20 @@ serialised as parquet files, and can be given any filename, as the
`MetricMetadata` struct should contain the filepath to them.

Likewise, geometries should be placed in the `geometries` subdirectory. Each set
of geometries should consist of two files, with the same filename stem and
different extensions:

- `{filename}.fgb` - a FlatGeobuf file with the geoIDs stored in the `GEO_ID`
column
of geometries should consist of four files, with the same filename stem and
different extensions. The filename stem is specified inside the
`GeometryMetadata` struct.

- `{filename}.flatgeobuf` - a FlatGeobuf file with the geoIDs stored in the
`GEO_ID` column
- `{filename}.geojsonseq` - corresponding GeoJSONSeq file
- `{filename}.pmtiles` - corresponding PMTiles file
- `{filename}.parquet` - a serialised dataframe storing the names of the
corresponding areas. This dataframe must have:

- a `GEO_ID` column whose values correspond exactly to those in the FlatGeobuf file.
- one or more other columns, whose names are
[lowercase ISO 639-3 chdes](https://iso639-3.sil.org/code_tables/639/data),
[lowercase ISO 639-3 codes](https://iso639-3.sil.org/code_tables/639/data),
and contain the names of each region in those specified languages.

For example, the parquet file corresponding to the Belgian regions (with
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ dependencies = [
"jcs >=0.2.1", # For generating IDs from class attributes
]



[project.optional-dependencies]
test = [
"pytest >=6",
Expand Down
72 changes: 63 additions & 9 deletions python/popgetter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
from __future__ import annotations

import os
import warnings
from collections.abc import Sequence
from pathlib import Path

from dagster import ExperimentalWarning

from popgetter.io_managers.azure import (
AzureGeneralIOManager,
AzureGeoIOManager,
AzureMetadataIOManager,
AzureMetricsIOManager,
)
from popgetter.io_managers.local import (
LocalGeoIOManager,
LocalMetadataIOManager,
LocalMetricsIOManager,
)
from popgetter.utils import StagingDirResource

__version__ = "0.1.0"

__all__ = ["__version__"]


if "IGNORE_EXPERIMENTAL_WARNINGS" in os.environ:
warnings.filterwarnings("ignore", category=ExperimentalWarning)


import os

from dagster import (
AssetsDefinition,
AssetSelection,
Expand All @@ -27,13 +48,18 @@
UnresolvedAssetJobDefinition,
)

from popgetter import assets, cloud_outputs
from popgetter import assets, azure_test, cloud_outputs

all_assets: Sequence[AssetsDefinition | SourceAsset | CacheableAssetsDefinition] = [
*load_assets_from_package_module(assets.us, group_name="us"),
*load_assets_from_package_module(assets.be, group_name="be"),
*load_assets_from_package_module(assets.uk, group_name="uk"),
*load_assets_from_modules([cloud_outputs], group_name="cloud_assets"),
*load_assets_from_package_module(cloud_outputs, group_name="cloud_outputs"),
*(
load_assets_from_modules([azure_test], group_name="azure_test")
if os.getenv("ENV") == "prod"
else []
),
]

job_be: UnresolvedAssetJobDefinition = define_asset_job(
Expand All @@ -56,15 +82,43 @@
)


def resources_by_env():
    """Build the IO-manager resources for the environment named by ``$ENV``.

    ``$ENV`` is read from the process environment and falls back to ``"dev"``
    when it is not set. ``"prod"`` selects the Azure IO managers (including
    the general Azure IO manager, constructed with ``".bin"``); ``"dev"``
    selects the local IO managers.

    Returns:
        A dict mapping resource keys to IO manager instances.

    Raises:
        ValueError: if ``$ENV`` is anything other than ``"dev"`` or ``"prod"``.
    """
    mode = os.environ.get("ENV", "dev")

    # Reject unknown environments first; no IO manager is constructed for them.
    if mode not in ("prod", "dev"):
        err = f"$ENV should be either 'dev' or 'prod', but received '{mode}'"
        raise ValueError(err)

    # Only the managers for the selected environment are instantiated.
    if mode == "dev":
        managers = {
            "metadata_io_manager": LocalMetadataIOManager(),
            "geometry_io_manager": LocalGeoIOManager(),
            "metrics_io_manager": LocalMetricsIOManager(),
        }
    else:
        managers = {
            "metadata_io_manager": AzureMetadataIOManager(),
            "geometry_io_manager": AzureGeoIOManager(),
            "metrics_io_manager": AzureMetricsIOManager(),
            "azure_general_io_manager": AzureGeneralIOManager(".bin"),
        }
    return managers


resources = {
"pipes_subprocess_client": PipesSubprocessClient(),
"staging_res": StagingDirResource(
staging_dir=str(Path(__file__).parent.joinpath("staging_dir").resolve())
),
}

resources.update(resources_by_env())

defs: Definitions = Definitions(
assets=all_assets,
schedules=[],
sensors=[cloud_outputs.country_outputs_sensor],
resources={
"pipes_subprocess_client": PipesSubprocessClient(),
"staging_res": StagingDirResource(
staging_dir=str(Path(__file__).parent.joinpath("staging_dir").resolve())
),
},
sensors=[
cloud_outputs.metadata_sensor,
cloud_outputs.geometry_sensor,
cloud_outputs.metrics_sensor,
],
resources=resources,
jobs=[job_be, job_us, job_uk],
)
17 changes: 0 additions & 17 deletions python/popgetter/assets/be/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,11 @@
#!/usr/bin/python3
from __future__ import annotations

from dagster import (
asset,
)

from popgetter.metadata import (
CountryMetadata,
)

from . import (
census_derived, # noqa: F401
census_geometry, # noqa: F401
census_tables, # noqa: F401
)
from .belgium import asset_prefix, country


@asset(key_prefix=asset_prefix)
def get_country_metadata() -> CountryMetadata:
    """
    Expose the ``country`` object imported from ``.belgium`` as a Dagster asset.

    The asset key is prefixed with ``asset_prefix``.

    Returns:
        The module-level ``country`` value (annotated as ``CountryMetadata``).
    """
    return country


# @asset(key_prefix=asset_prefix)
Expand Down
Loading

0 comments on commit aa1df17

Please sign in to comment.