Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update output types of publishing assets, add cloud sensors #92

Merged
merged 37 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
cf23864
Add non-pickle azure_io_manager, revise cloud_assets to write to azure
sgreenbury May 9, 2024
274a785
Add publishing_io_manager resources by ENV
sgreenbury May 9, 2024
ee6f37a
Remove references to pickling from popgetter ADLS IO manager
sgreenbury May 9, 2024
f01ec92
Add IO managers for local and Azure
sgreenbury May 10, 2024
4f52818
Merge remote-tracking branch 'origin/main' into 75-azure-tmp
sgreenbury May 10, 2024
03380be
Fix for updated metadata structure
sgreenbury May 10, 2024
c8e4977
Revise azure_io_manager _get_path() method
sgreenbury May 10, 2024
fc9f05f
Update metadata & output spec to include GeometryRelease
yongrenjie May 10, 2024
6e03d4c
Add assets for CountryMetadata and SourceDataRelease
yongrenjie May 10, 2024
201adb3
(Failed) attempt to generate geometries for multiple levels
yongrenjie May 10, 2024
5db26ab
Generate a single geometry asset instead of multiasset
yongrenjie May 10, 2024
138b836
Add a local IO manager for geometry
yongrenjie May 10, 2024
0f034e7
Handle outputs implementation for ADLS2IOManager
sgreenbury May 10, 2024
e817fa3
Merge remote-tracking branch 'origin/belgium_spec_new' into 75-io-man…
sgreenbury May 13, 2024
99120bc
Initial Azure geo IO manager
sgreenbury May 14, 2024
7dc103d
Azure IO manager, top level Azure metadata IO manager
sgreenbury May 15, 2024
993ed73
Remove obsolete Azure IO manager for partitioned assets
sgreenbury May 15, 2024
cf1ee54
Add blob client, import adls constructor fn
sgreenbury May 15, 2024
1dda122
Update lease duration, add extension for general IO manager
sgreenbury May 15, 2024
88bcab1
Enable general_io_manager for both prod & dev
yongrenjie May 15, 2024
85474e1
Rework local IO managers to use new input types as discussed
yongrenjie May 15, 2024
cdd5c70
Implement cloud output sensors for TLM + geometries
yongrenjie May 15, 2024
e9d167a
Update Azure IO managers to work with cloud sensors (to a certain ext…
yongrenjie May 15, 2024
e657a82
Rename IOManagers to Mixins, re-add Azure test jobs
yongrenjie May 15, 2024
e97730c
Refactor classes so that everything is in the right place
yongrenjie May 15, 2024
9c93b22
Fix all the errors 🤪
yongrenjie May 15, 2024
bd1ad96
Fix even more errors
yongrenjie May 15, 2024
1de2a88
Create a factory class that generates both sensors
yongrenjie May 15, 2024
72f279c
Add one unpartitioned output for Belgian metrics, tidy some code
yongrenjie May 16, 2024
bca1f45
rename 'TopLevelMetadata' -> just 'Metadata'
yongrenjie May 16, 2024
d5d5ef1
Metrics IO manager + cloud sensor
yongrenjie May 16, 2024
3f3e62e
Set $IGNORE_EXPERIMENTAL_WARNINGS to any nonempty value to suppress them
yongrenjie May 16, 2024
fd5cabb
Add output metadata to MetricsIOManager
yongrenjie May 17, 2024
77e645d
Stop the sensor from rerunning on assets that aren't freshly material…
yongrenjie May 17, 2024
697efbc
Fix type errors
yongrenjie May 17, 2024
1ff9abb
Revise metadata IO manager to handle individual, list or dict metadata
sgreenbury May 17, 2024
3b2a0c2
Fix tests and revise resources imported for provided env
sgreenbury May 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions output_spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The top level should contain:
├── metric_metadata.parquet
├── source_data_releases.parquet
├── data_publishers.parquet
├── geometry_metadata.parquet
├── country_metadata.parquet
├── metrics/
│   ├── {metric_filename_a}.parquet
Expand All @@ -51,6 +52,8 @@ with tabulated metadata:
`SourceDataRelease`
- A `data_publishers.parquet` file containing a serialised list of
`DataPublisher`
- A `geometry_metadata.parquet` file containing a serialised list of
`GeometryMetadata`
- A `country_metadata.parquet` file containing a serialised list of
`CountryMetadata` (most countries should only have one entry, but there may be
more than one because of entities like the UK)
Expand All @@ -63,17 +66,20 @@ serialised as parquet files, and can be given any filename, as the
`MetricMetadata` struct should contain the filepath to them.

Likewise, geometries should be placed in the `geometries` subdirectory. Each set
of geometries should consist of two files, with the same filename stem and
different extensions:

- `{filename}.fgb` - a FlatGeobuf file with the geoIDs stored in the `GEO_ID`
column
of geometries should consist of four files, with the same filename stem and
different extensions. The filename stem is specified inside the
`GeometryMetadata` struct.

- `{filename}.flatgeobuf` - a FlatGeobuf file with the geoIDs stored in the
`GEO_ID` column
- `{filename}.geojsonseq` - corresponding GeoJSONSeq file
- `{filename}.pmtiles` - corresponding PMTiles file
- `{filename}.parquet` - a serialised dataframe storing the names of the
corresponding areas. This dataframe must have:

- a `GEO_ID` column which corresponds exactly to those in the FlatGeobuf file.
- one or more other columns, whose names are
[lowercase ISO 639-3 chdes](https://iso639-3.sil.org/code_tables/639/data),
[lowercase ISO 639-3 codes](https://iso639-3.sil.org/code_tables/639/data),
and contain the names of each region in those specified languages.

For example, the parquet file corresponding to the Belgian regions (with
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ dependencies = [
"jcs >=0.2.1", # For generating IDs from class attributes
]



[project.optional-dependencies]
test = [
"pytest >=6",
Expand Down
59 changes: 50 additions & 9 deletions python/popgetter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,43 @@
from __future__ import annotations

import os
import warnings
from collections.abc import Sequence
from pathlib import Path

from dagster import ExperimentalWarning

from popgetter.io_managers.azure import (
AzureGeneralIOManager,
AzureGeoIOManager,
AzureMetadataIOManager,
AzureMetricsIOManager,
)
from popgetter.io_managers.local import (
LocalGeoIOManager,
LocalMetadataIOManager,
LocalMetricsIOManager,
)
from popgetter.utils import StagingDirResource

__version__ = "0.1.0"

__all__ = ["__version__"]


if "IGNORE_EXPERIMENTAL_WARNINGS" in os.environ:
warnings.filterwarnings("ignore", category=ExperimentalWarning)


import os

from dagster import (
AssetsDefinition,
AssetSelection,
Definitions,
PipesSubprocessClient,
SourceAsset,
define_asset_job,
load_assets_from_modules,
load_assets_from_package_module,
)
from dagster._core.definitions.cacheable_assets import (
Expand All @@ -33,7 +53,7 @@
*load_assets_from_package_module(assets.us, group_name="us"),
*load_assets_from_package_module(assets.be, group_name="be"),
*load_assets_from_package_module(assets.uk, group_name="uk"),
*load_assets_from_modules([cloud_outputs], group_name="cloud_assets"),
*load_assets_from_package_module(cloud_outputs, group_name="cloud_outputs"),
]

job_be: UnresolvedAssetJobDefinition = define_asset_job(
Expand All @@ -55,16 +75,37 @@
description="Downloads UK data.",
)

resources_by_env = {
"prod": {
"metadata_io_manager": AzureMetadataIOManager(),
"geometry_io_manager": AzureGeoIOManager(),
"metrics_io_manager": AzureMetricsIOManager(),
},
"dev": {
"metadata_io_manager": LocalMetadataIOManager(),
"geometry_io_manager": LocalGeoIOManager(),
"metrics_io_manager": LocalMetricsIOManager(),
},
}

resources = {
"pipes_subprocess_client": PipesSubprocessClient(),
"staging_res": StagingDirResource(
staging_dir=str(Path(__file__).parent.joinpath("staging_dir").resolve())
),
"azure_general_io_manager": AzureGeneralIOManager(".bin"),
}

resources.update(resources_by_env[os.getenv("ENV", "dev")])

defs: Definitions = Definitions(
assets=all_assets,
schedules=[],
sensors=[cloud_outputs.country_outputs_sensor],
resources={
"pipes_subprocess_client": PipesSubprocessClient(),
"staging_res": StagingDirResource(
staging_dir=str(Path(__file__).parent.joinpath("staging_dir").resolve())
),
},
sensors=[
cloud_outputs.metadata_sensor,
cloud_outputs.geometry_sensor,
cloud_outputs.metrics_sensor,
],
resources=resources,
jobs=[job_be, job_us, job_uk],
)
17 changes: 0 additions & 17 deletions python/popgetter/assets/be/__init__.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,11 @@
#!/usr/bin/python3
from __future__ import annotations

from dagster import (
asset,
)

from popgetter.metadata import (
CountryMetadata,
)

from . import (
census_derived, # noqa: F401
census_geometry, # noqa: F401
census_tables, # noqa: F401
)
from .belgium import asset_prefix, country


@asset(key_prefix=asset_prefix)
def get_country_metadata() -> CountryMetadata:
"""
Returns a CountryMetadata of metadata about the country.
"""
return country


# @asset(key_prefix=asset_prefix)
Expand Down
Loading
Loading