Automated publishing of all data + metadata #131

Open · wants to merge 17 commits into main
50 changes: 50 additions & 0 deletions deploy.sh
@@ -0,0 +1,50 @@
#!/usr/bin/env bash
set -e

if [ -z "$ENV" ]; then
    echo "ENV environment variable not set; must be either 'dev' or 'prod'"
    exit 1
fi

if [ -z "$POPGETTER_COUNTRIES" ]; then
    echo "POPGETTER_COUNTRIES environment variable not set; must be a comma-separated list of country IDs"
    exit 1
fi

if [ -z "$DAGSTER_HOME" ]; then
    echo "DAGSTER_HOME environment variable not set; setting to a temporary directory"
    if ! DAGSTER_HOME=$(mktemp -d); then
        echo "Failed to create temporary directory for DAGSTER_HOME"
        exit 1
    fi
    export DAGSTER_HOME
    echo "DAGSTER_HOME set to $DAGSTER_HOME"
fi

export IGNORE_EXPERIMENTAL_WARNINGS=1
export DAGSTER_MODULE_NAME="${DAGSTER_MODULE_NAME:=popgetter}"

echo "Relevant environment variables:"
echo " - POPGETTER_COUNTRIES: $POPGETTER_COUNTRIES"
echo " - ENV: $ENV"
if [ "$ENV" == "prod" ]; then
    export AZURE_STORAGE_ACCOUNT="${AZURE_STORAGE_ACCOUNT:=popgetter}"
    export AZURE_CONTAINER="${AZURE_CONTAINER:=prod}"
    if ! AZURE_DIRECTORY=$(python -c 'import popgetter; print(popgetter.__version__)' 2>/dev/null); then
        echo "Failed to get popgetter version"
        exit 1
    fi
    export AZURE_DIRECTORY
    if [ -z "$SAS_TOKEN" ]; then
        echo "SAS_TOKEN environment variable not set; it is required for Azure deployments"
        exit 1
    else
        echo " - SAS_TOKEN: (exists)"
    fi
    echo " - AZURE_STORAGE_ACCOUNT: $AZURE_STORAGE_ACCOUNT"
    echo " - AZURE_CONTAINER: $AZURE_CONTAINER"
    echo " - AZURE_DIRECTORY: $AZURE_DIRECTORY"
fi

echo "Generating popgetter data. This may take a while."
python -m popgetter.run all
54 changes: 54 additions & 0 deletions docs/deployment.md
@@ -0,0 +1,54 @@
# Deployment

This page explains how to generate a complete set of metadata and data
conforming to the [output specification](output_structure.md), and how to
upload it to Azure blob storage.

## Overview

The deployment process is divided into two main steps:

1. **Fetching the data.** This means running the entire job for each country.
   This step does not publish anything to Azure; instead, it generates pickled
   data that is stored inside `$DAGSTER_HOME`.

2. **Uploading the data.** This means running each of the cloud sensor assets,
   whose names begin with `publish`. There are four of these assets: one for
   the `countries.txt` file, one for the metadata structs, one for the
   geometries, and one for the metrics. These assets are tied to custom IO
   managers, which read the pickled data from `$DAGSTER_HOME` and upload it to
   Azure blob storage.

Both of these steps are automated using the `popgetter.run` module, which is in
turn invoked by the `deploy.sh` script in the repository root.

## Required environment variables

The deployment process requires the following environment variables to be set:

- `$POPGETTER_COUNTRIES`: A comma-separated list of country IDs to generate
  data for. This list also feeds into the `countries.txt` file.

- `$ENV`: Set this to `prod` to deploy to Azure. (You can set it to `dev` too,
  but this will publish the data to a local temporary directory, so it's only
  useful for testing the script.)

- `$SAS_TOKEN`: The SAS token for the Azure blob storage account. Contact a
  popgetter maintainer if you need this.

So, a typical deployment command might look like this:

```bash
POPGETTER_COUNTRIES=bel,gb_nir ENV=prod SAS_TOKEN="..." ./deploy.sh
```

Note that the `SAS_TOKEN` value must be quoted, because it contains ampersands,
which the shell would otherwise interpret as instructions to run the preceding
command in the background.
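As an illustration (the token value below is made up, not a real SAS token):

```bash
# Hypothetical SAS token; real tokens contain several '&'-separated fields.
SAS_TOKEN='sv=2022-11-02&ss=b&sig=abc123'

# Because the assignment is quoted, the ampersands stay literal and the
# whole token is stored in the variable.
printf '%s\n' "$SAS_TOKEN"
```

Unquoted, the shell would instead split the line at each `&` and try to run
the fragments as separate background commands.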

## Where are the data stored?

The data and metadata will be uploaded to the
['popgetter' Azure storage account](https://portal.azure.com/#@turing.ac.uk/resource/subscriptions/06e7b12a-f395-4021-9fa2-5305fa01903e/resourceGroups/popgetter/providers/Microsoft.Storage/storageAccounts/popgetter/containersList)
under the Urban Analytics Technology Platform subscription: specifically, they
will be placed in the container named `prod`, in a directory named after the
current version of popgetter.
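As a sketch of the resulting layout (the account and container names are the
defaults from `deploy.sh`; the version string is a placeholder for
`popgetter.__version__`), the URL of the published `countries.txt` would look
like:

```bash
# Assemble the blob URL for one published file using the standard Azure
# blob addressing scheme; "0.1.0" stands in for the real popgetter version
# that deploy.sh exports as AZURE_DIRECTORY.
ACCOUNT=popgetter    # AZURE_STORAGE_ACCOUNT default
CONTAINER=prod       # AZURE_CONTAINER default
VERSION=0.1.0        # placeholder for popgetter.__version__

URL="https://${ACCOUNT}.blob.core.windows.net/${CONTAINER}/${VERSION}/countries.txt"
echo "$URL"
```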
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -73,7 +73,8 @@ dev = [
    "types-requests", # Required for type checking requests
    "urllib3<2", # Pin this, pending this PR for dagster https://github.com/dagster-io/dagster/pull/16738
    "pre-commit", # Used for managing pre-commit hooks
    "pyright >=1.1.339" # Used for static type checking (mypy is not yet compatible with Dagster)
    "pyright >=1.1.339", # Used for static type checking (mypy is not yet compatible with Dagster)
    "python-dotenv >=1.0.1", # For sourcing .env
]
docs = [
    "mkdocs >=1.6.0"
9 changes: 6 additions & 3 deletions python/popgetter/__init__.py
@@ -12,7 +12,9 @@
if "IGNORE_EXPERIMENTAL_WARNINGS" in os.environ:
    warnings.filterwarnings("ignore", category=ExperimentalWarning)

from popgetter.env import PROD
from popgetter.io_managers.azure import (
    AzureCountriesTextIOManager,
    AzureGeneralIOManager,
    AzureGeoIOManager,
    AzureMetadataIOManager,
@@ -21,6 +23,7 @@
    AzureMetricsPartitionedIOManager,
)
from popgetter.io_managers.local import (
    LocalCountriesTextIOManager,
    LocalGeoIOManager,
    LocalMetadataIOManager,
    LocalMetricsIOManager,
@@ -56,15 +59,13 @@
from popgetter import azure_test, cloud_outputs
from popgetter.assets import countries

PROD = os.getenv("ENV") == "prod"

all_assets: Sequence[AssetsDefinition | SourceAsset | CacheableAssetsDefinition] = [
    *[
        asset
        for (module, name) in countries
        for asset in load_assets_from_package_module(module, group_name=name)
    ],
    *load_assets_from_package_module(cloud_outputs, group_name="cloud_outputs"),
    *load_assets_from_modules([cloud_outputs], group_name="cloud_outputs"),
    *(load_assets_from_modules([azure_test], group_name="azure_test") if PROD else []),
]

@@ -84,6 +85,7 @@ def resources_by_env():
"metadata_io_manager": AzureMetadataIOManager(),
"geometry_io_manager": AzureGeoIOManager(),
"metrics_io_manager": AzureMetricsIOManager(),
"countries_text_io_manager": AzureCountriesTextIOManager(),
"azure_general_io_manager": AzureGeneralIOManager(".bin"),
"metrics_partitioned_io_manager": AzureMetricsPartitionedIOManager(),
"metrics_metadata_io_manager": AzureMetricsMetadataIOManager(),
@@ -95,6 +97,7 @@
"metrics_io_manager": LocalMetricsIOManager(),
"metrics_partitioned_io_manager": LocalMetricsPartitionedIOManager(),
"metrics_metadata_io_manager": LocalMetricsMetadataIOManager(),
"countries_text_io_manager": LocalCountriesTextIOManager(),
}
)

4 changes: 2 additions & 2 deletions python/popgetter/assets/gb_nir/__init__.py
@@ -2,7 +2,6 @@

import asyncio
import io
import os
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date
@@ -21,6 +20,7 @@

from popgetter.assets.country import Country
from popgetter.cloud_outputs import GeometryOutput, MetricsOutput
from popgetter.env import PROD
from popgetter.metadata import (
    COL,
    CountryMetadata,
@@ -97,7 +97,7 @@ class NIGeometryLevel:
}

# Required tables
TABLES_TO_PROCESS = ["MS-A09", "DT-0018"] if os.getenv("ENV") == "dev" else None
TABLES_TO_PROCESS = None if PROD else ["MS-A09", "DT-0018"]

# 2021 census collection date
CENSUS_COLLECTION_DATE = date(2021, 3, 21)
24 changes: 20 additions & 4 deletions python/popgetter/cloud_outputs/__init__.py
@@ -1,16 +1,32 @@
from __future__ import annotations

import os
from dataclasses import dataclass

import geopandas as gpd
import pandas as pd
from dagster import AssetsDefinition
from dagster import AssetsDefinition, asset

from popgetter.metadata import GeometryMetadata, MetricMetadata

from .sensor_class import CloudAssetSensor


@asset(io_manager_key="countries_text_io_manager")
def publish_country_list() -> list[str]:
    """
    Generates a top-level countries.txt file in Azure. Each line of this file
    contains one country ID. The country IDs to be published are read from the
    POPGETTER_COUNTRIES environment variable, which is a comma-separated list
    of country IDs.
    """
    countries = os.getenv("POPGETTER_COUNTRIES")
    if countries is None:
        err = "POPGETTER_COUNTRIES environment variable not set"
        raise RuntimeError(err)
    return [c.lower().strip() for c in countries.split(",")]


@dataclass
class GeometryOutput:
    """This class conceptualises the expected output types of a geometry
@@ -83,17 +99,17 @@ class MetricsOutput:


def send_to_metadata_sensor(asset: AssetsDefinition):
    metadata_factory.monitored_asset_keys.append(asset.key)
    metadata_factory.add_monitored_asset(asset.key)
    return asset


def send_to_geometry_sensor(asset: AssetsDefinition):
    geometry_factory.monitored_asset_keys.append(asset.key)
    geometry_factory.add_monitored_asset(asset.key)
    return asset


def send_to_metrics_sensor(asset: AssetsDefinition):
    metrics_factory.monitored_asset_keys.append(asset.key)
    metrics_factory.add_monitored_asset(asset.key)
    return asset


29 changes: 15 additions & 14 deletions python/popgetter/cloud_outputs/sensor_class.py
@@ -53,9 +53,9 @@ def __init__(
        self.partition_definition = DynamicPartitionsDefinition(
            name=self.partition_definition_name
        )
        # Upon initialisation, remove all partitions so that the web UI doesn't
        # show any stray partitions left over from old Dagster launches (this
        # can happen when e.g. switching between different git branches)
        # Upon initialisation, regenerate all partitions so that the web UI
        # doesn't show any stray partitions left over from old Dagster launches
        # (this can happen when e.g. switching between different git branches)
        try:
            with DagsterInstance.get() as instance:
                for partition_key in instance.get_dynamic_partitions(
@@ -67,6 +67,17 @@
        except DagsterHomeNotSetError:
            pass

    def add_monitored_asset(self, asset_key: AssetKey):
        self.monitored_asset_keys.append(asset_key)
        try:
            with DagsterInstance.get() as instance:
                instance.add_dynamic_partitions(
                    partitions_def_name=self.partition_definition_name,
                    partition_keys=["/".join(asset_key.path)],
                )
        except DagsterHomeNotSetError:
            pass

    def create_publishing_asset(self):
        @asset(
            name=self.publishing_asset_name,
Expand Down Expand Up @@ -102,16 +113,6 @@ def inner_sensor(context):
for monitored_asset_key, execution_value in asset_events.items():
monitored_asset_name = "/".join(monitored_asset_key.path)

# Add the monitored asset to the list of dynamic partitions of
# the publishing asset, if it's not already there.
if monitored_asset_name not in context.instance.get_dynamic_partitions(
self.partition_definition_name
):
context.instance.add_dynamic_partitions(
partitions_def_name=self.partition_definition_name,
partition_keys=[monitored_asset_name],
)

if execution_value is not None:
# Trigger a run request for the publishing asset, if it has
# been materialised.
Expand All @@ -121,7 +122,7 @@ def inner_sensor(context):
# multiple times for the same asset materialisation.
yield RunRequest(
run_key=f"{monitored_asset_name}_{execution_value.run_id}",
partition_key="/".join(monitored_asset_key.path),
partition_key=monitored_asset_name,
)
context.advance_cursor({monitored_asset_key: execution_value})

12 changes: 12 additions & 0 deletions python/popgetter/env.py
@@ -0,0 +1,12 @@
from __future__ import annotations

import os

env = os.getenv("ENV", "dev")
if env.lower() == "dev":
    PROD = False
elif env.lower() == "prod":
    PROD = True
else:
    err = f"Invalid ENV value: {env}, must be 'dev' or 'prod'."
    raise ValueError(err)
16 changes: 16 additions & 0 deletions python/popgetter/io_managers/__init__.py
@@ -450,3 +450,19 @@ def handle_output(
),
}
)


class CountriesTextIOManager(PopgetterIOManager):
    def handle_text(self, context: OutputContext, text: str, full_path: UPath) -> None:
        raise NotImplementedError

    def handle_output(self, context: OutputContext, obj: list[str]) -> None:
        self.handle_text(
            context, "\n".join(obj), self.get_base_path() / UPath("countries.txt")
        )
        context.add_output_metadata(
            metadata={
                "num_countries": len(obj),
                "countries": obj,
            }
        )
10 changes: 10 additions & 0 deletions python/popgetter/io_managers/azure.py
@@ -28,6 +28,7 @@
from upath import UPath

from . import (
    CountriesTextIOManager,
    GeoIOManager,
    MetadataIOManager,
    MetricsIOManager,
@@ -195,6 +196,15 @@ class AzureMetricsMetadataIOManager(AzureMixin, MetricsMetdataIOManager):
pass


class AzureCountriesTextIOManager(AzureMixin, CountriesTextIOManager):
    def handle_text(self, context: OutputContext, text: str, full_path: UPath) -> None:
        self.dump_to_path(context, bytes(text, "utf-8"), full_path)

    def load_input(self, _context: InputContext) -> Any:
        err = "This IO manager is only for publishing; it should not be used to read in data"
        raise NotImplementedError(err)


class AzureGeneralIOManager(AzureMixin, IOManager):
"""This class is used only for an asset which tests the Azure functionality
(see cloud_outputs/azure_test.py). It is not used for publishing any
8 changes: 8 additions & 0 deletions python/popgetter/io_managers/local.py
@@ -8,6 +8,7 @@
from upath import UPath

from . import (
    CountriesTextIOManager,
    GeoIOManager,
    MetadataIOManager,
    MetricsIOManager,
@@ -63,3 +64,10 @@ class LocalMetricsMetadataIOManager(LocalMixin, MetricsMetdataIOManager):

class LocalMetricsPartitionedIOManager(LocalMixin, MetricsPartitionedIOManager):
pass


class LocalCountriesTextIOManager(LocalMixin, CountriesTextIOManager):
    def handle_text(self, _context: OutputContext, text: str, full_path: UPath) -> None:
        self.make_parent_dirs(full_path)
        with full_path.open("w") as f:
            f.write(text)