Iss 217 refactor task context access #219

Merged
merged 8 commits on Dec 14, 2024
2 changes: 1 addition & 1 deletion airflow/dags/cc_utils/census/api.py
@@ -179,7 +179,7 @@ def __init__(self, dataset_base_url: str):
self.geographies_df = get_dataset_geography_metadata(geog_url=self.geographies_url)
self.groups_df = get_dataset_groups_metadata(groups_url=self.groups_url)
self.tags_df = get_dataset_tags_metadata(tags_url=self.tags_url)
self.time_of_check = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
self.time_of_check = dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
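# The timezone-aware dt.datetime.now(dt.UTC) above replaces dt.datetime.utcnow(), which is
# deprecated as of Python 3.12 (dt.UTC itself requires Python 3.11+).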

def set_dataset_metadata_urls(self):
if (self.metadata_catalog_df["dataset_base_url"] == self.base_url).sum() == 0:
2 changes: 1 addition & 1 deletion airflow/dags/cc_utils/census/ftp.py
@@ -22,7 +22,7 @@ class CensusTableMetadata:
def __init__(self, metadata_url: str = BASE_URL):
self.metadata_url = re.sub("/$", "", metadata_url)
self.page_metadata = self.get_page_metadata()
self.time_of_check = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
self.time_of_check = dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
self.data_freshness_check = self.initialize_data_freshness_check()

def parse_census_metadata_page(self, resp: requests.models.Response) -> pd.DataFrame:
7 changes: 3 additions & 4 deletions airflow/dags/cc_utils/socrata.py
@@ -4,14 +4,13 @@
from logging import Logger
import re
from pathlib import Path
from typing import Dict, Optional, Union
from typing import Any, Dict, Optional, Union

import pandas as pd
import requests
from sqlalchemy.engine.base import Engine
from sqlalchemy import select, insert, update

# for airflow container
from cc_utils.db import (
execute_result_returning_query,
get_reflected_db_table,
@@ -47,7 +46,7 @@ def get_table_metadata(self) -> Dict:
response_json = response.json()
metadata = {
"_id": self.table_id,
"time_of_collection": dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"time_of_collection": dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
}
metadata.update(response_json["results"][0])
return metadata
@@ -260,7 +259,7 @@ def get_prior_metadata_checks_from_db(self, engine: Engine) -> pd.DataFrame:
)
return results_df

def initialize_data_freshness_check_record(self) -> None:
def initialize_data_freshness_check_record(self) -> dict[str, Any]:
"""There's probably a better name for this idea than 'table_check_metadata'. The goal
is to see if fresh data is available, log the results of that freshness-check in the dwh,
and then trigger data refreshing if appropriate."""
10 changes: 10 additions & 0 deletions airflow/dags/cc_utils/utils.py
@@ -5,6 +5,8 @@
import re
import subprocess

from airflow.models.taskinstance import TaskInstance


def typeset_zulu_tz_datetime_str(datetime_str: str) -> dt.datetime:
datetime_str = re.sub("Z$", " +0000", datetime_str)
@@ -62,6 +64,14 @@ def produce_offset_and_nrows_counts_for_pd_read_csv(file_path: Path, rows_per_ba
return nrow_and_offset_nums


def get_task_group_id_prefix(task_instance: TaskInstance) -> str:
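"""Return the dotted task-group prefix of the running task's task_id, e.g. "transform_data_tg."
for a task_id of "transform_data_tg.dbt_clean_model_ready", or "" for a task outside any group."""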
task_id_parts = task_instance.task_id.split(".")
if len(task_id_parts) > 1:
return ".".join(task_id_parts[:-1]) + "."
else:
return ""


def log_as_info(logger: logging.Logger, msg: str) -> None:
try:
original_level = logger.level
@@ -0,0 +1,28 @@
import datetime as dt
import logging

from airflow.decorators import dag

from tasks.census_tasks import update_census_table

from sources.census_api_datasets import CC_HH_INCOME_IN_LAST_12MO_BY_TRACT as CENSUS_DATASET

task_logger = logging.getLogger("airflow.task")
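# Thin per-dataset DAG: the schedule comes from the dataset definition in sources.census_api_datasets,
# and the actual work is delegated to the shared update_census_table flow from tasks.census_tasks.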


@dag(
schedule=CENSUS_DATASET.schedule,
start_date=dt.datetime(2022, 11, 1),
catchup=False,
tags=["cook_county", "census"],
)
def update_cc_hh_income_in_last_12mo_by_tract():
update_census_table(
census_dataset=CENSUS_DATASET,
datasource_name="fluent_dwh_source",
conn_id="dwh_db_conn",
task_logger=task_logger,
)


update_cc_hh_income_in_last_12mo_by_tract()
28 changes: 28 additions & 0 deletions airflow/dags/geography/update_united_states_counties_2024.py
@@ -0,0 +1,28 @@
import datetime as dt
import logging

from airflow.decorators import dag

from tasks.tiger_tasks import update_tiger_table

from sources.tiger_datasets import UNITED_STATES_COUNTIES_2024 as TIGER_DATASET

task_logger = logging.getLogger("airflow.task")


@dag(
schedule=TIGER_DATASET.schedule,
start_date=dt.datetime(2022, 11, 1),
catchup=False,
tags=["census", "geospatial", "TIGER"],
)
def update_united_states_counties_2024():
update_tiger_table(
tiger_dataset=TIGER_DATASET,
datasource_name="fluent_dwh_source",
conn_id="dwh_db_conn",
task_logger=task_logger,
)


update_united_states_counties_2024()
@@ -20,7 +20,7 @@
@task
def request_and_ingest_dataset_metadata_catalog(conn_id: str, task_logger: Logger) -> str:
catalog_df = get_dataset_metadata_catalog(dataset_base_url="https://api.census.gov/data.json")
catalog_df["time_of_check"] = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
catalog_df["time_of_check"] = dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
task_logger.info(f"Datasets in Census API Dataset Metadata Catalog: {len(catalog_df)}")
engine = get_pg_engine(conn_id=conn_id)
api_dataset_metadata_table = get_reflected_db_table(
42 changes: 11 additions & 31 deletions airflow/dags/run_a_dbt_run_command.py
@@ -8,44 +8,24 @@
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

from sources.tables import CHICAGO_DIVVY_STATIONS as SOCRATA_TABLE
from sources.tables import CHICAGO_POLICE_DISTRICT_BOUNDARIES as SOCRATA_TABLE
from cc_utils.utils import log_as_info
from cc_utils.transform import execute_dbt_cmd

task_logger = logging.getLogger("airflow.task")

# REFERENCE
# Running a model with all of its downstream models
# dbt_cmd = f"""cd /opt/airflow/dbt && \
# dbt --warn-error run --select \
# re_dbt.dwh.{table_name}_fact+"""
# Running all models in the `dwh` level
# dbt_cmd = f"""cd /opt/airflow/dbt &&
# dbt --warn-error run --select
# re_dbt.dwh.*"""


@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def run_specific_dbt_model_for_a_data_set(table_name: str, task_logger: Logger) -> None:
@task
def run_specific_dbt_model_for_a_data_set(table_name: str, task_logger: Logger) -> bool:
dbt_cmd = f"""cd /opt/airflow/dbt && \
dbt --warn-error run --full-refresh --select \
re_dbt.standardized.{table_name}_standardized"""
result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger)
dbt_cmd = f"""cd /opt/airflow/dbt && \
dbt --warn-error run --full-refresh --select \
re_dbt.clean.chicago_towed_vehicles*+"""
task_logger.info(f"dbt run command: {dbt_cmd}")
try:
subproc_output = subprocess.run(
dbt_cmd, shell=True, capture_output=True, text=True, check=False
)
log_as_info(task_logger, f"subproc_output.stderr: {subproc_output.stderr}")
log_as_info(task_logger, f"subproc_output.stdout: {subproc_output.stdout}")
raise_exception = False
for el in subproc_output.stdout.split("\n"):
log_as_info(task_logger, f"{el}")
if re.search("(\\d* of \\d* ERROR)", el):
raise_exception = True
if raise_exception:
raise Exception("dbt model failed. Review the above outputs")
except subprocess.CalledProcessError as err:
log_as_info(task_logger, f"Error {err} while running dbt models. {type(err)}")
raise
re_dbt.clean.{table_name}_clean"""
result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger)
return result


@dag(
116 changes: 116 additions & 0 deletions airflow/dags/run_transform_data_tg_for_a_socrata_dataset.py
@@ -0,0 +1,116 @@
import datetime as dt
import logging
import os
from logging import Logger
from pathlib import Path

from cc_utils.utils import get_task_group_id_prefix
from cc_utils.transform import execute_dbt_cmd
from sources.tables import CHICAGO_VACANT_AND_ABANDONED_BUILDINGS as SOCRATA_TABLE
from tasks.socrata_tasks import (
dbt_make_clean_model,
dbt_standardized_model_ready,
endpoint,
get_socrata_table_metadata,
highlight_unfinished_dbt_standardized_stub,
log_as_info,
make_dbt_standardized_model,
)

from airflow.decorators import dag, task, task_group
from airflow.models.baseoperator import chain
from airflow.operators.python import get_current_context
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

task_logger = logging.getLogger("airflow.task")


@task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def dbt_clean_model_ready(task_logger: Logger) -> str:
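# Task ids inside a task group are prefixed with "<group_id>.", and a branch task must return
# fully qualified ids, so the prefix is derived from the current TaskInstance and prepended below.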
context = get_current_context()
task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"])
socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key")
airflow_home = os.environ["AIRFLOW_HOME"]
file_path = Path(airflow_home).joinpath(
"dbt",
"models",
"clean",
f"{socrata_metadata.table_name}_clean.sql",
)
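# The _clean model is treated as ready when a <table_name>_clean.sql file already exists under
# dbt/models/clean; otherwise branch to dbt_make_clean_model to create it.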
if file_path.is_file():
log_as_info(task_logger, f"headed to {task_group_id_prefix}run_dbt_std_model")
return f"{task_group_id_prefix}run_dbt_std_model"
else:
return f"{task_group_id_prefix}dbt_make_clean_model"


@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def run_dbt_std_model(table_name: str, task_logger: Logger) -> bool:
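# dbt flags: --warn-error promotes warnings to errors so the task fails loudly, and --select
# limits the run to this table's _standardized model.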
dbt_cmd = f"""cd /opt/airflow/dbt && \
dbt --warn-error run --select \
re_dbt.standardized.{table_name}_standardized"""
result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger)
return result


@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def run_dbt_clean_model(table_name: str, task_logger: Logger) -> bool:
dbt_cmd = f"""cd /opt/airflow/dbt && \
dbt --warn-error run --select \
re_dbt.clean.{table_name}_clean"""
result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger)
return result


@task_group
def transform_data_tg(socrata_table, conn_id: str, task_logger: Logger):
std_model_ready_1 = dbt_standardized_model_ready(task_logger=task_logger)
highlight_std_stub_1 = highlight_unfinished_dbt_standardized_stub(task_logger=task_logger)
make_std_model_1 = make_dbt_standardized_model(conn_id=conn_id, task_logger=task_logger)
clean_model_ready_1 = dbt_clean_model_ready(task_logger=task_logger)
make_clean_model_1 = dbt_make_clean_model(task_logger=task_logger)
# run_dbt_models_1 = run_dbt_models__standardized_onward(task_logger=task_logger)
run_dbt_std_model_1 = run_dbt_std_model(
table_name=socrata_table.table_name, task_logger=task_logger
)
run_dbt_clean_model_1 = run_dbt_clean_model(
table_name=socrata_table.table_name, task_logger=task_logger
)
endpoint_1 = endpoint(task_logger=task_logger)
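# Wire the branching flow: dbt_standardized_model_ready fans out along labeled edges to
# make_std_model_1, highlight_std_stub_1, or clean_model_ready_1; the clean-model path then runs
# the _standardized and _clean dbt models before all paths converge on endpoint_1.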

chain(
std_model_ready_1,
[
Label("No dbt _standardized model found"),
Label("dbt _standardized model needs review"),
Label("dbt _standardized model looks good"),
],
[make_std_model_1, highlight_std_stub_1, clean_model_ready_1],
)
chain([make_std_model_1, highlight_std_stub_1], endpoint_1)
chain(
clean_model_ready_1,
[Label("dbt _clean model looks good!"), make_clean_model_1],
run_dbt_std_model_1,
run_dbt_clean_model_1,
endpoint_1,
)


@dag(
schedule=SOCRATA_TABLE.schedule,
start_date=dt.datetime(2022, 11, 1),
catchup=False,
tags=["dbt", "utility"],
)
def run_socrata_transform_data_tg():
metadata_1 = get_socrata_table_metadata(socrata_table=SOCRATA_TABLE, task_logger=task_logger)
transform_data_1 = transform_data_tg(
socrata_table=SOCRATA_TABLE, conn_id="dwh_db_conn", task_logger=task_logger
)

chain(metadata_1, transform_data_1)


run_socrata_transform_data_tg()
10 changes: 10 additions & 0 deletions airflow/dags/sources/census_api_datasets.py
@@ -52,6 +52,16 @@
schedule="40 5 20 3,9 *",
)

CC_HH_INCOME_IN_LAST_12MO_BY_TRACT = CensusVariableGroupDataset(
dataset_name="cc_hh_income_in_last_12mo_by_tract",
api_call_obj=CensusVariableGroupAPICall(
dataset_base_url="http://api.census.gov/data/2023/acs/acs5",
group_name="B19001",
geographies=COOK_COUNTY_CENSUS_TRACTS,
),
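# ACS 5-year group B19001 is "Household Income in the Past 12 Months"; the cron below runs
# at 05:30 on the 5th day of May and November.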
schedule="30 5 5 5,11 *",
)

CC_HH_INTERNET_ACCESS_BY_AGE_BY_TRACT = CensusVariableGroupDataset(
dataset_name="cc_hh_internet_access_by_age_by_tract",
api_call_obj=CensusVariableGroupAPICall(
4 changes: 2 additions & 2 deletions airflow/dags/sources/tables.py
@@ -13,7 +13,7 @@
)

CHICAGO_BIKE_PATHS = SocrataTable(
table_id="3w5d-sru8",
table_id="hvv9-38ut",
table_name="chicago_bike_paths",
schedule="0 22 10 */3 *",
)
Expand Down Expand Up @@ -43,7 +43,7 @@
)

CHICAGO_CTA_TRAIN_LINES = SocrataTable(
table_id="53r7-y88m",
table_id="xbyr-jnvx",
table_name="chicago_cta_train_lines",
schedule="20 5 1 * *",
)