From ca694b1e3e48827c72f02f28a072f3264e2e4baa Mon Sep 17 00:00:00 2001 From: matttriano Date: Wed, 11 Dec 2024 21:27:23 -0600 Subject: [PATCH 1/8] Starts updating socrata_tasks. --- airflow/dags/cc_utils/utils.py | 10 ++ airflow/dags/tasks/socrata_tasks.py | 156 ++++++++++++++++++---------- 2 files changed, 112 insertions(+), 54 deletions(-) diff --git a/airflow/dags/cc_utils/utils.py b/airflow/dags/cc_utils/utils.py index bc44821..e63c3f7 100644 --- a/airflow/dags/cc_utils/utils.py +++ b/airflow/dags/cc_utils/utils.py @@ -5,6 +5,8 @@ import re import subprocess +from airflow.models.taskinstance import TaskInstance + def typeset_zulu_tz_datetime_str(datetime_str: str) -> dt.datetime: datetime_str = re.sub("Z$", " +0000", datetime_str) @@ -62,6 +64,14 @@ def produce_offset_and_nrows_counts_for_pd_read_csv(file_path: Path, rows_per_ba return nrow_and_offset_nums +def get_task_group_id_prefix(task_instance: TaskInstance) -> str: + task_id_parts = task_instance.task_id.split(".") + if len(task_id_parts) > 1: + return ".".join(task_id_parts[:-1]) + "." + else: + return "" + + def log_as_info(logger: logging.Logger, msg: str) -> None: try: original_level = logger.level diff --git a/airflow/dags/tasks/socrata_tasks.py b/airflow/dags/tasks/socrata_tasks.py index e969e50..dbea2c8 100644 --- a/airflow/dags/tasks/socrata_tasks.py +++ b/airflow/dags/tasks/socrata_tasks.py @@ -7,6 +7,7 @@ from airflow.decorators import task, task_group from airflow.models.baseoperator import chain +from airflow.operators.python import get_current_context from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.edgemodifier import Label from airflow.utils.trigger_rule import TriggerRule @@ -28,6 +29,7 @@ from cc_utils.utils import ( get_local_data_raw_dir, get_lines_in_geojson_file, + get_task_group_id_prefix, produce_slice_indices_for_gpd_read_file, log_as_info, ) @@ -96,12 +98,41 @@ def ingest_into_table( def get_socrata_table_metadata( socrata_table: SocrataTable, task_logger: Logger ) -> SocrataTableMetadata: + context = get_current_context() + # log_as_info(task_logger, f"dir(context): {dir(context)}, keys: {context.keys()}") + # log_as_info(task_logger, f"context['ti']: {context['ti']}, dir(context['ti']): {dir(context['ti'])}") + log_as_info(task_logger, f"context['ti'].task_id: {context['ti'].task_id}") + log_as_info(task_logger, f"context['ti'].task_display_name: {context['ti'].task_display_name}") + log_as_info(task_logger, f"context['ti'].task: {context['ti'].task}") + log_as_info(task_logger, f"context['ti'].get_previous_ti(): {context['ti'].get_previous_ti()}") + # log_as_info(task_logger, f"context['ti'].previous_ti: {context['ti'].previous_ti}") # deprecated + # log_as_info(task_logger, f"context['ti'].previous_ti_success: {context['ti'].previous_ti_success}") # deprecated + log_as_info(task_logger, f"context['ti'].metadata: {context['ti'].metadata}") + log_as_info(task_logger, f"context['ti'].hostname: {context['ti'].hostname}") + log_as_info(task_logger, f"context['ti'].run_id: {context['ti'].run_id}") + log_as_info(task_logger, f"context['ti'].pid: {context['ti'].pid}") + log_as_info(task_logger, f"context['ti'].job_id: {context['ti'].job_id}") + log_as_info(task_logger, f"context['ti'].updated_at: {context['ti'].updated_at}") + log_as_info(task_logger, f"context['ti'].key: {context['ti'].key}") + log_as_info(task_logger, f"context['ti'].log: {context['ti'].log}") + log_as_info(task_logger, f"context['ti'].command_as_list(): {context['ti'].command_as_list()}") + 
log_as_info(task_logger, f"context['ti'].queue: {context['ti'].queue}") + log_as_info(task_logger, f"context['ti'].is_premature: {context['ti'].is_premature}") + log_as_info(task_logger, f"context['ti'].raw: {context['ti'].raw}") + log_as_info(task_logger, f"context['ti'].executor_config: {context['ti'].executor_config}") + log_as_info(task_logger, f"context['ti'].next_kwargs: {context['ti'].next_kwargs}") + log_as_info(task_logger, f"context['ti'].next_method: {context['ti'].next_method}") + log_as_info(task_logger, f"context['ti'].registry: {context['ti'].registry}") + socrata_metadata = SocrataTableMetadata(socrata_table=socrata_table) log_as_info( task_logger, f"Retrieved metadata for socrata table {socrata_metadata.table_name} and table_id" + f" {socrata_metadata.table_id}.", ) + log_as_info(task_logger, f" --- socrata_metadata pre push: {socrata_metadata}") + context["ti"].xcom_push(key="socrata_metadata_key", value=socrata_metadata) + log_as_info(task_logger, f" --- socrata_metadata post push: {socrata_metadata}") return socrata_metadata @@ -156,6 +187,10 @@ def check_table_metadata( def fresher_source_data_available( socrata_metadata: SocrataTableMetadata, conn_id: str, task_logger: Logger ) -> str: + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + # socrata_metadata_from_xcom = context["ti"].xcom_pull(key="socrata_metadata_key") + # log_as_info(task_logger, f" --- socrata_metadata_from_xcom: {socrata_metadata_from_xcom}") tables_in_data_raw_schema = get_data_table_names_in_schema( engine=get_pg_engine(conn_id=conn_id), schema_name="data_raw" ) @@ -167,18 +202,17 @@ def fresher_source_data_available( ) log_as_info(task_logger, f" --- table_does_not_exist: {table_does_not_exist}") log_as_info(task_logger, f" --- update_availble: {update_availble}") + log_as_info(task_logger, f" --- task_group_id_prefix: {task_group_id_prefix}") if table_does_not_exist or update_availble: - return "update_socrata_table.download_fresh_data" + return f"{task_group_id_prefix}download_fresh_data" else: - return "update_socrata_table.update_result_of_check_in_metadata_table" + return f"{task_group_id_prefix}update_result_of_check_in_metadata_table" @task -def download_fresh_data(task_logger: Logger, **kwargs) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.check_table_metadata.ingest_table_freshness_check_metadata" - ) +def download_fresh_data(task_logger: Logger) -> SocrataTableMetadata: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") output_file_path = get_local_file_path(socrata_metadata=socrata_metadata) log_as_info(task_logger, f"Started downloading data at {dt.datetime.utcnow()} UTC") urlretrieve(url=socrata_metadata.data_download_url, filename=output_file_path) @@ -189,20 +223,22 @@ def download_fresh_data(task_logger: Logger, **kwargs) -> SocrataTableMetadata: @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def file_ext_branch_router(socrata_metadata: SocrataTableMetadata) -> str: dl_format = socrata_metadata.download_format + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) if dl_format.lower() == "geojson": - return "update_socrata_table.load_data_tg.load_geojson_data.drop_temp_table" + return f"{task_group_id_prefix}load_geojson_data.drop_temp_table" elif dl_format.lower() == "csv": - return 
"update_socrata_table.load_data_tg.load_csv_data.drop_temp_table" + return f"{task_group_id_prefix}load_csv_data.drop_temp_table" else: raise Exception(f"Download format '{dl_format}' not supported yet. CSV or GeoJSON for now") @task def drop_temp_table( - route_str: str, conn_id: str, task_logger: Logger, **kwargs + route_str: str, conn_id: str, task_logger: Logger ) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") log_as_info(task_logger, f"inside drop_temp_table, from {route_str}") engine = get_pg_engine(conn_id=conn_id) @@ -218,9 +254,9 @@ def drop_temp_table( @task -def create_temp_data_raw_table(conn_id: str, task_logger: Logger, **kwargs) -> None: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def create_temp_data_raw_table(conn_id: str, task_logger: Logger) -> None: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") table_name = f"temp_{socrata_metadata.table_name}" local_file_path = get_local_file_path(socrata_metadata=socrata_metadata) log_as_info( @@ -343,6 +379,11 @@ def ingest_geojson_data( socrata_metadata = ti.xcom_pull( task_ids="update_socrata_table.load_data_tg.load_geojson_data.drop_temp_table" ) + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + sm_xcom = context["ti"].xcom_pull(key="socrata_metadata_key") + log_as_info(task_logger, f"sm_xcom: {sm_xcom}, table_name: {sm_xcom.table_name}, source_data_last_updated: {sm_xcom.data_freshness_check['source_data_last_updated']}") + log_as_info(task_logger, f"socrata_metadata: {socrata_metadata}, table_name: {socrata_metadata.table_name}, source_data_last_updated: {socrata_metadata.data_freshness_check['source_data_last_updated']}") engine = get_pg_engine(conn_id=conn_id) temp_table_name = f"temp_{socrata_metadata.table_name}" @@ -418,9 +459,10 @@ def load_data_tg( @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def register_temp_table_asset(datasource_name: str, task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def register_temp_table_asset(datasource_name: str, task_logger: Logger) -> str: + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") datasource = get_datasource(datasource_name=datasource_name, task_logger=task_logger) register_data_asset( schema_name="data_raw", @@ -432,9 +474,9 @@ def register_temp_table_asset(datasource_name: str, task_logger: Logger, **kwarg @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def socrata_table_checkpoint_exists(task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def socrata_table_checkpoint_exists(task_logger: Logger) -> str: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") checkpoint_name = f"data_raw.temp_{socrata_metadata.socrata_table.table_name}" if check_if_checkpoint_exists(checkpoint_name=checkpoint_name, task_logger=task_logger): log_as_info(task_logger, f"GE checkpoint for {checkpoint_name} exists") @@ 
-447,9 +489,9 @@ def socrata_table_checkpoint_exists(task_logger: Logger, **kwargs) -> str: @task -def run_socrata_checkpoint(task_logger: Logger, **kwargs) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def run_socrata_checkpoint(task_logger: Logger) -> SocrataTableMetadata: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") checkpoint_name = f"data_raw.temp_{socrata_metadata.socrata_table.table_name}" checkpoint_run_results = run_checkpoint( checkpoint_name=checkpoint_name, task_logger=task_logger @@ -493,21 +535,20 @@ def raw_data_validation_tg( @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def table_exists_in_data_raw(conn_id: str, task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.raw_data_validation_tg.validation_endpoint" - ) +def table_exists_in_data_raw(conn_id: str, task_logger: Logger) -> str: + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") tables_in_data_raw_schema = get_data_table_names_in_schema( engine=get_pg_engine(conn_id=conn_id), schema_name="data_raw" ) log_as_info(task_logger, f"tables_in_data_raw_schema: {tables_in_data_raw_schema}") if socrata_metadata.table_name not in tables_in_data_raw_schema: log_as_info(task_logger, f"Table {socrata_metadata.table_name} not in data_raw; creating.") - return "update_socrata_table.persist_new_raw_data_tg.create_table_in_data_raw" + return f"{task_group_id_prefix}create_table_in_data_raw" else: log_as_info(task_logger, f"Table {socrata_metadata.table_name} in data_raw; skipping.") - return "update_socrata_table.persist_new_raw_data_tg.dbt_data_raw_model_exists" + return f"{task_group_id_prefix}dbt_data_raw_model_exists" @task @@ -516,6 +557,11 @@ def create_table_in_data_raw(conn_id: str, task_logger: Logger, **kwargs) -> Soc socrata_metadata = ti.xcom_pull( task_ids="update_socrata_table.raw_data_validation_tg.validation_endpoint" ) + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + sm_xcom = context["ti"].xcom_pull(key="socrata_metadata_key") + log_as_info(task_logger, f"sm_xcom: {sm_xcom}, table_name: {sm_xcom.table_name}, source_data_last_updated: {sm_xcom.data_freshness_check['source_data_last_updated']}") + log_as_info(task_logger, f"socrata_metadata: {socrata_metadata}, table_name: {socrata_metadata.table_name}, source_data_last_updated: {socrata_metadata.data_freshness_check['source_data_last_updated']}") try: table_name = socrata_metadata.table_name log_as_info(task_logger, f"Creating table data_raw.{table_name}") @@ -580,9 +626,9 @@ def update_data_raw_table(task_logger: Logger, **kwargs) -> str: @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def register_data_raw_table_asset(datasource_name: str, task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def register_data_raw_table_asset(datasource_name: str, task_logger: Logger) -> str: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") datasource = get_datasource(datasource_name=datasource_name, task_logger=task_logger) register_data_asset( schema_name="data_raw", @@ -641,9 
+687,10 @@ def persist_new_raw_data_tg(conn_id: str, datasource_name: str, task_logger: Log @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def dbt_standardized_model_ready(task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def dbt_standardized_model_ready(task_logger: Logger) -> str: + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") airflow_home = os.environ["AIRFLOW_HOME"] file_path = Path(airflow_home).joinpath( "dbt", @@ -665,18 +712,18 @@ def dbt_standardized_model_ready(task_logger: Logger, **kwargs) -> str: f"Found unfinished stub for dbt _standardized model in {host_file_path}." + " Please update that model before proceeding to feature engineering.", ) - return "update_socrata_table.transform_data_tg.highlight_unfinished_dbt_standardized_stub" + return f"{task_group_id_prefix}highlight_unfinished_dbt_standardized_stub" log_as_info( task_logger, f"Found a _standardized stage dbt model that looks finished; Proceeding" ) - return "update_socrata_table.transform_data_tg.dbt_clean_model_ready" + return f"{task_group_id_prefix}dbt_clean_model_ready" else: log_as_info(task_logger, f"No _standardized stage dbt model found.") log_as_info(task_logger, f"Creating a stub in loc: {host_file_path}") log_as_info( task_logger, f"Edit the stub before proceeding to generate _clean stage dbt models." ) - return "update_socrata_table.transform_data_tg.make_dbt_standardized_model" + return f"{task_group_id_prefix}make_dbt_standardized_model" @task @@ -691,9 +738,9 @@ def highlight_unfinished_dbt_standardized_stub(task_logger: Logger) -> str: @task -def make_dbt_standardized_model(conn_id: str, task_logger: Logger, **kwargs) -> None: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def make_dbt_standardized_model(conn_id: str, task_logger: Logger) -> None: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") engine = get_pg_engine(conn_id=conn_id) std_file_lines = format_dbt_stub_for_standardized_stage( table_name=socrata_metadata.table_name, engine=engine @@ -710,9 +757,10 @@ def make_dbt_standardized_model(conn_id: str, task_logger: Logger, **kwargs) -> @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def dbt_clean_model_ready(task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def dbt_clean_model_ready(task_logger: Logger) -> str: + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") airflow_home = os.environ["AIRFLOW_HOME"] file_path = Path(airflow_home).joinpath( "dbt", @@ -722,15 +770,15 @@ def dbt_clean_model_ready(task_logger: Logger, **kwargs) -> str: ) if file_path.is_file(): log_as_info(task_logger, f"Found a _clean stage dbt model that looks finished; Ending") - return "update_socrata_table.transform_data_tg.run_dbt_models__standardized_onward" + return f"{task_group_id_prefix}run_dbt_models__standardized_onward" else: - return "update_socrata_table.transform_data_tg.dbt_make_clean_model" + return f"{task_group_id_prefix}dbt_make_clean_model" 
@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def dbt_make_clean_model(task_logger: Logger, **kwargs) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def dbt_make_clean_model(task_logger: Logger) -> SocrataTableMetadata: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") airflow_home = os.environ["AIRFLOW_HOME"] clean_file_path = Path(airflow_home).joinpath( "dbt", @@ -745,9 +793,9 @@ def dbt_make_clean_model(task_logger: Logger, **kwargs) -> SocrataTableMetadata: @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def run_dbt_models__standardized_onward(task_logger: Logger, **kwargs) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") +def run_dbt_models__standardized_onward(task_logger: Logger) -> SocrataTableMetadata: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") dbt_cmd = format_dbt_run_cmd( dataset_name=socrata_metadata.table_name, schema="standardized", From cfcc7245653281a2e6b4c3cf1618c6d1168c9d18 Mon Sep 17 00:00:00 2001 From: matttriano Date: Thu, 12 Dec 2024 14:26:39 -0600 Subject: [PATCH 2/8] Refactors out **kwargs and starts changing xcoms strategy. --- airflow/dags/tasks/socrata_tasks.py | 146 +++++++++------------------- 1 file changed, 46 insertions(+), 100 deletions(-) diff --git a/airflow/dags/tasks/socrata_tasks.py b/airflow/dags/tasks/socrata_tasks.py index dbea2c8..b7ab029 100644 --- a/airflow/dags/tasks/socrata_tasks.py +++ b/airflow/dags/tasks/socrata_tasks.py @@ -95,54 +95,27 @@ def ingest_into_table( @task -def get_socrata_table_metadata( - socrata_table: SocrataTable, task_logger: Logger -) -> SocrataTableMetadata: +def get_socrata_table_metadata(socrata_table: SocrataTable, task_logger: Logger) -> bool: context = get_current_context() - # log_as_info(task_logger, f"dir(context): {dir(context)}, keys: {context.keys()}") - # log_as_info(task_logger, f"context['ti']: {context['ti']}, dir(context['ti']): {dir(context['ti'])}") - log_as_info(task_logger, f"context['ti'].task_id: {context['ti'].task_id}") - log_as_info(task_logger, f"context['ti'].task_display_name: {context['ti'].task_display_name}") - log_as_info(task_logger, f"context['ti'].task: {context['ti'].task}") - log_as_info(task_logger, f"context['ti'].get_previous_ti(): {context['ti'].get_previous_ti()}") - # log_as_info(task_logger, f"context['ti'].previous_ti: {context['ti'].previous_ti}") # deprecated - # log_as_info(task_logger, f"context['ti'].previous_ti_success: {context['ti'].previous_ti_success}") # deprecated - log_as_info(task_logger, f"context['ti'].metadata: {context['ti'].metadata}") - log_as_info(task_logger, f"context['ti'].hostname: {context['ti'].hostname}") - log_as_info(task_logger, f"context['ti'].run_id: {context['ti'].run_id}") - log_as_info(task_logger, f"context['ti'].pid: {context['ti'].pid}") - log_as_info(task_logger, f"context['ti'].job_id: {context['ti'].job_id}") - log_as_info(task_logger, f"context['ti'].updated_at: {context['ti'].updated_at}") - log_as_info(task_logger, f"context['ti'].key: {context['ti'].key}") - log_as_info(task_logger, f"context['ti'].log: {context['ti'].log}") - log_as_info(task_logger, f"context['ti'].command_as_list(): {context['ti'].command_as_list()}") - log_as_info(task_logger, f"context['ti'].queue: {context['ti'].queue}") - 
log_as_info(task_logger, f"context['ti'].is_premature: {context['ti'].is_premature}") - log_as_info(task_logger, f"context['ti'].raw: {context['ti'].raw}") - log_as_info(task_logger, f"context['ti'].executor_config: {context['ti'].executor_config}") - log_as_info(task_logger, f"context['ti'].next_kwargs: {context['ti'].next_kwargs}") - log_as_info(task_logger, f"context['ti'].next_method: {context['ti'].next_method}") - log_as_info(task_logger, f"context['ti'].registry: {context['ti'].registry}") - socrata_metadata = SocrataTableMetadata(socrata_table=socrata_table) log_as_info( task_logger, f"Retrieved metadata for socrata table {socrata_metadata.table_name} and table_id" + f" {socrata_metadata.table_id}.", ) - log_as_info(task_logger, f" --- socrata_metadata pre push: {socrata_metadata}") + log_as_info(task_logger, f" --- socrata_metadata: {socrata_metadata.metadata}") context["ti"].xcom_push(key="socrata_metadata_key", value=socrata_metadata) - log_as_info(task_logger, f" --- socrata_metadata post push: {socrata_metadata}") - return socrata_metadata + return True @task def extract_table_freshness_info( - socrata_metadata: SocrataTableMetadata, conn_id: str, task_logger: Logger, -) -> SocrataTableMetadata: +) -> bool: engine = get_pg_engine(conn_id=conn_id) + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") socrata_metadata.check_warehouse_data_freshness(engine=engine) log_as_info( task_logger, @@ -150,23 +123,26 @@ def extract_table_freshness_info( + f"Fresh source data available: {socrata_metadata.data_freshness_check['updated_data_available']} " + f"Fresh source metadata available: {socrata_metadata.data_freshness_check['updated_metadata_available']}", ) - return socrata_metadata + context["ti"].xcom_push(key="socrata_metadata_key", value=socrata_metadata) + return True @task def ingest_table_freshness_check_metadata( - socrata_metadata: SocrataTableMetadata, conn_id: str, task_logger: Logger, -) -> None: +) -> bool: engine = get_pg_engine(conn_id=conn_id) + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") socrata_metadata.insert_current_freshness_check_to_db(engine=engine) log_as_info( task_logger, f"Ingested table freshness check results into metadata table. 
" + f"Freshness check id: {socrata_metadata.freshness_check_id}", ) - return socrata_metadata + context["ti"].xcom_push(key="socrata_metadata_key", value=socrata_metadata) + return True @task_group @@ -174,23 +150,18 @@ def check_table_metadata( socrata_table: SocrataTable, conn_id: str, task_logger: Logger ) -> SocrataTableMetadata: metadata_1 = get_socrata_table_metadata(socrata_table=socrata_table, task_logger=task_logger) - metadata_2 = extract_table_freshness_info(metadata_1, conn_id=conn_id, task_logger=task_logger) - metadata_3 = ingest_table_freshness_check_metadata( - metadata_2, conn_id=conn_id, task_logger=task_logger - ) + metadata_2 = extract_table_freshness_info(conn_id=conn_id, task_logger=task_logger) + metadata_3 = ingest_table_freshness_check_metadata(conn_id=conn_id, task_logger=task_logger) chain(metadata_1, metadata_2, metadata_3) return metadata_3 @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def fresher_source_data_available( - socrata_metadata: SocrataTableMetadata, conn_id: str, task_logger: Logger -) -> str: +def fresher_source_data_available(conn_id: str, task_logger: Logger) -> str: context = get_current_context() task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) - # socrata_metadata_from_xcom = context["ti"].xcom_pull(key="socrata_metadata_key") - # log_as_info(task_logger, f" --- socrata_metadata_from_xcom: {socrata_metadata_from_xcom}") + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") tables_in_data_raw_schema = get_data_table_names_in_schema( engine=get_pg_engine(conn_id=conn_id), schema_name="data_raw" ) @@ -234,9 +205,7 @@ def file_ext_branch_router(socrata_metadata: SocrataTableMetadata) -> str: @task -def drop_temp_table( - route_str: str, conn_id: str, task_logger: Logger -) -> SocrataTableMetadata: +def drop_temp_table(route_str: str, conn_id: str, task_logger: Logger) -> SocrataTableMetadata: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") @@ -372,18 +341,12 @@ def get_geospatial_load_indices( @task def ingest_geojson_data( - start_index: int, end_index: int, conn_id: str, task_logger: Logger, **kwargs -) -> None: + start_index: int, end_index: int, conn_id: str, task_logger: Logger +) -> bool: try: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.load_data_tg.load_geojson_data.drop_temp_table" - ) context = get_current_context() task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) - sm_xcom = context["ti"].xcom_pull(key="socrata_metadata_key") - log_as_info(task_logger, f"sm_xcom: {sm_xcom}, table_name: {sm_xcom.table_name}, source_data_last_updated: {sm_xcom.data_freshness_check['source_data_last_updated']}") - log_as_info(task_logger, f"socrata_metadata: {socrata_metadata}, table_name: {socrata_metadata.table_name}, source_data_last_updated: {socrata_metadata.data_freshness_check['source_data_last_updated']}") + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") engine = get_pg_engine(conn_id=conn_id) temp_table_name = f"temp_{socrata_metadata.table_name}" @@ -412,6 +375,7 @@ def ingest_geojson_data( task_logger, f"Successfully ingested records {start_index} to {end_index} using gpd.to_postgis()", ) + return True except Exception as e: task_logger.error( f"Failed to ingest geojson file to temp table. 
Error: {e}, {type(e)}", @@ -476,16 +440,17 @@ def register_temp_table_asset(datasource_name: str, task_logger: Logger) -> str: @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def socrata_table_checkpoint_exists(task_logger: Logger) -> str: context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") checkpoint_name = f"data_raw.temp_{socrata_metadata.socrata_table.table_name}" if check_if_checkpoint_exists(checkpoint_name=checkpoint_name, task_logger=task_logger): log_as_info(task_logger, f"GE checkpoint for {checkpoint_name} exists") - return "update_socrata_table.raw_data_validation_tg.run_socrata_checkpoint" + return f"{task_group_id_prefix}run_socrata_checkpoint" else: log_as_info( task_logger, f"GE checkpoint for {checkpoint_name} doesn't exist yet. Make it maybe?" ) - return "update_socrata_table.raw_data_validation_tg.validation_endpoint" + return f"{task_group_id_prefix}validation_endpoint" @task @@ -506,10 +471,8 @@ def run_socrata_checkpoint(task_logger: Logger) -> SocrataTableMetadata: @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def validation_endpoint(**kwargs) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull(task_ids="update_socrata_table.download_fresh_data") - return socrata_metadata +def validation_endpoint() -> bool: + return True @task_group @@ -552,16 +515,9 @@ def table_exists_in_data_raw(conn_id: str, task_logger: Logger) -> str: @task -def create_table_in_data_raw(conn_id: str, task_logger: Logger, **kwargs) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.raw_data_validation_tg.validation_endpoint" - ) +def create_table_in_data_raw(conn_id: str, task_logger: Logger) -> SocrataTableMetadata: context = get_current_context() - task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) - sm_xcom = context["ti"].xcom_pull(key="socrata_metadata_key") - log_as_info(task_logger, f"sm_xcom: {sm_xcom}, table_name: {sm_xcom.table_name}, source_data_last_updated: {sm_xcom.data_freshness_check['source_data_last_updated']}") - log_as_info(task_logger, f"socrata_metadata: {socrata_metadata}, table_name: {socrata_metadata.table_name}, source_data_last_updated: {socrata_metadata.data_freshness_check['source_data_last_updated']}") + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") try: table_name = socrata_metadata.table_name log_as_info(task_logger, f"Creating table data_raw.{table_name}") @@ -580,28 +536,24 @@ def create_table_in_data_raw(conn_id: str, task_logger: Logger, **kwargs) -> Soc @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def dbt_data_raw_model_exists(task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.raw_data_validation_tg.validation_endpoint" - ) +def dbt_data_raw_model_exists(task_logger: Logger) -> str: + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") dbt_data_raw_model_dir = Path(f"/opt/airflow/dbt/models/data_raw") log_as_info(task_logger, f"dbt data_raw model dir ('{dbt_data_raw_model_dir}')") log_as_info(task_logger, f"Dir exists? 
{dbt_data_raw_model_dir.is_dir()}") table_model_path = dbt_data_raw_model_dir.joinpath(f"{socrata_metadata.table_name}.sql") if table_model_path.is_file(): - return "update_socrata_table.persist_new_raw_data_tg.update_data_raw_table" + return f"{task_group_id_prefix}update_data_raw_table" else: - return "update_socrata_table.persist_new_raw_data_tg.make_dbt_data_raw_model" + return f"{task_group_id_prefix}make_dbt_data_raw_model" @task(retries=1) -def make_dbt_data_raw_model(conn_id: str, task_logger: Logger, **kwargs) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.raw_data_validation_tg.validation_endpoint" - ) - +def make_dbt_data_raw_model(conn_id: str, task_logger: Logger) -> SocrataTableMetadata: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") make_dbt_data_raw_model_file( table_name=socrata_metadata.table_name, engine=get_pg_engine(conn_id=conn_id) ) @@ -610,11 +562,9 @@ def make_dbt_data_raw_model(conn_id: str, task_logger: Logger, **kwargs) -> Socr @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def update_data_raw_table(task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.raw_data_validation_tg.validation_endpoint" - ) +def update_data_raw_table(task_logger: Logger) -> str: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") dbt_cmd = format_dbt_run_cmd( dataset_name=socrata_metadata.table_name, schema="data_raw", @@ -641,12 +591,10 @@ def register_data_raw_table_asset(datasource_name: str, task_logger: Logger) -> @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def update_result_of_check_in_metadata_table( - conn_id: str, task_logger: Logger, data_updated: bool, **kwargs + conn_id: str, task_logger: Logger, data_updated: bool ) -> SocrataTableMetadata: - ti = kwargs["ti"] - socrata_metadata = ti.xcom_pull( - task_ids="update_socrata_table.check_table_metadata.ingest_table_freshness_check_metadata" - ) + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") log_as_info( task_logger, f"Updating table_metadata record id #{socrata_metadata.freshness_check_id}." 
) @@ -663,9 +611,7 @@ def persist_new_raw_data_tg(conn_id: str, datasource_name: str, task_logger: Log log_as_info(task_logger, f"Entered persist_new_raw_data_tg task_group") table_exists_1 = table_exists_in_data_raw(conn_id=conn_id, task_logger=task_logger) create_data_raw_table_1 = create_table_in_data_raw(conn_id=conn_id, task_logger=task_logger) - dbt_data_raw_model_exists_1 = dbt_data_raw_model_exists( - task_logger=task_logger, temp_table=False - ) + dbt_data_raw_model_exists_1 = dbt_data_raw_model_exists(task_logger=task_logger) make_dbt_data_raw_model_1 = make_dbt_data_raw_model(conn_id=conn_id, task_logger=task_logger) update_data_raw_table_1 = update_data_raw_table(task_logger=task_logger) register_data_raw_asset_1 = register_data_raw_table_asset( @@ -859,7 +805,7 @@ def update_socrata_table( socrata_table=socrata_table, conn_id=conn_id, task_logger=task_logger ) fresh_source_data_available_1 = fresher_source_data_available( - socrata_metadata=metadata_1, conn_id=conn_id, task_logger=task_logger + conn_id=conn_id, task_logger=task_logger ) extract_data_1 = download_fresh_data(task_logger=task_logger) load_data_tg_1 = load_data_tg( From b87acf0911500694534ce8df4d56cddfdd0b4205 Mon Sep 17 00:00:00 2001 From: matttriano Date: Thu, 12 Dec 2024 14:27:33 -0600 Subject: [PATCH 3/8] Updates ids of some data sources that have been deprecated/replaced. --- airflow/dags/sources/tables.py | 4 ++-- airflow/dbt/models/data_raw/sources.yml | 2 ++ .../great_expectations/great_expectations.yml | 24 +++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/airflow/dags/sources/tables.py b/airflow/dags/sources/tables.py index 4165a30..1edc8af 100644 --- a/airflow/dags/sources/tables.py +++ b/airflow/dags/sources/tables.py @@ -13,7 +13,7 @@ ) CHICAGO_BIKE_PATHS = SocrataTable( - table_id="3w5d-sru8", + table_id="hvv9-38ut", table_name="chicago_bike_paths", schedule="0 22 10 */3 *", ) @@ -43,7 +43,7 @@ ) CHICAGO_CTA_TRAIN_LINES = SocrataTable( - table_id="53r7-y88m", + table_id="xbyr-jnvx", table_name="chicago_cta_train_lines", schedule="20 5 1 * *", ) diff --git a/airflow/dbt/models/data_raw/sources.yml b/airflow/dbt/models/data_raw/sources.yml index 0b1697a..aaba667 100644 --- a/airflow/dbt/models/data_raw/sources.yml +++ b/airflow/dbt/models/data_raw/sources.yml @@ -17,6 +17,7 @@ sources: - name: chicago_community_area_boundaries - name: chicago_crimes - name: chicago_cta_bus_stops + - name: chicago_cta_train_lines - name: chicago_cta_train_stations - name: chicago_divvy_stations - name: chicago_food_inspections @@ -60,6 +61,7 @@ sources: - name: temp_chicago_community_area_boundaries - name: temp_chicago_crimes - name: temp_chicago_cta_bus_stops + - name: temp_chicago_cta_train_lines - name: temp_chicago_cta_train_stations - name: temp_chicago_divvy_stations - name: temp_chicago_food_inspections diff --git a/airflow/great_expectations/great_expectations.yml b/airflow/great_expectations/great_expectations.yml index f6eece3..eb0984e 100644 --- a/airflow/great_expectations/great_expectations.yml +++ b/airflow/great_expectations/great_expectations.yml @@ -541,4 +541,28 @@ fluent_datasources: batch_metadata: {} table_name: chicago_cta_train_stations schema_name: data_raw + data_raw.temp_chicago_bike_paths: + type: table + order_by: [] + batch_metadata: {} + table_name: temp_chicago_bike_paths + schema_name: data_raw + data_raw.chicago_bike_paths: + type: table + order_by: [] + batch_metadata: {} + table_name: chicago_bike_paths + schema_name: data_raw + 
data_raw.temp_chicago_cta_train_lines: + type: table + order_by: [] + batch_metadata: {} + table_name: temp_chicago_cta_train_lines + schema_name: data_raw + data_raw.chicago_cta_train_lines: + type: table + order_by: [] + batch_metadata: {} + table_name: chicago_cta_train_lines + schema_name: data_raw connection_string: ${GX_DWH_DB_CONN} From bcd074879ffbbb6e838b26c39f3f95eba19164f5 Mon Sep 17 00:00:00 2001 From: matttriano Date: Thu, 12 Dec 2024 19:52:45 -0600 Subject: [PATCH 4/8] Refactors to consistent xcom-push/pull socrata_metadata accessing and updates returns. --- airflow/dags/run_a_dbt_run_command.py | 4 +- airflow/dags/tasks/socrata_tasks.py | 149 +++++++------------------- 2 files changed, 42 insertions(+), 111 deletions(-) diff --git a/airflow/dags/run_a_dbt_run_command.py b/airflow/dags/run_a_dbt_run_command.py index 4982731..4f65f3e 100644 --- a/airflow/dags/run_a_dbt_run_command.py +++ b/airflow/dags/run_a_dbt_run_command.py @@ -8,7 +8,7 @@ from airflow.decorators import dag, task from airflow.utils.trigger_rule import TriggerRule -from sources.tables import CHICAGO_DIVVY_STATIONS as SOCRATA_TABLE +from sources.tables import NYC_PARCEL_SALES as SOCRATA_TABLE from cc_utils.utils import log_as_info task_logger = logging.getLogger("airflow.task") @@ -28,7 +28,7 @@ def run_specific_dbt_model_for_a_data_set(table_name: str, task_logger: Logger) -> None: dbt_cmd = f"""cd /opt/airflow/dbt && \ dbt --warn-error run --full-refresh --select \ - re_dbt.clean.chicago_towed_vehicles*+""" + re_dbt.standardized.{table_name}_standardized+""" task_logger.info(f"dbt run command: {dbt_cmd}") try: subproc_output = subprocess.run( diff --git a/airflow/dags/tasks/socrata_tasks.py b/airflow/dags/tasks/socrata_tasks.py index b7ab029..b7676af 100644 --- a/airflow/dags/tasks/socrata_tasks.py +++ b/airflow/dags/tasks/socrata_tasks.py @@ -47,53 +47,6 @@ def get_local_file_path(socrata_metadata: SocrataTableMetadata) -> Path: return local_file_path -def ingest_into_table( - socrata_metadata: SocrataTableMetadata, - conn_id: str, - task_logger: Logger, - temp_table: bool = False, -) -> None: - local_file_path = get_local_file_path(socrata_metadata=socrata_metadata) - if temp_table: - table_name = f"temp_{socrata_metadata.table_name}" - if_exists = "replace" - else: - table_name = f"{socrata_metadata.table_name}" - if_exists = "fail" - log_as_info(task_logger, f"Ingesting data to database table 'data_raw.{table_name}'") - source_data_updated = socrata_metadata.data_freshness_check["source_data_last_updated"] - time_of_check = socrata_metadata.data_freshness_check["time_of_check"] - engine = get_pg_engine(conn_id=conn_id) - if socrata_metadata.is_geospatial: - import geopandas as gpd - - gdf = gpd.read_file(local_file_path) - gdf["source_data_updated"] = source_data_updated - gdf["ingestion_check_time"] = time_of_check - gdf.to_postgis( - name=table_name, - schema="data_raw", - con=engine, - if_exists=if_exists, - chunksize=100000, - ) - log_as_info(task_logger, "Successfully ingested data using gpd.to_postgis()") - else: - import pandas as pd - - df = pd.read_csv(local_file_path) - df["source_data_updated"] = source_data_updated - df["ingestion_check_time"] = time_of_check - df.to_sql( - name=table_name, - schema="data_raw", - con=engine, - if_exists=if_exists, - chunksize=100000, - ) - log_as_info(task_logger, "Successfully ingested data using pd.to_sql()") - - @task def get_socrata_table_metadata(socrata_table: SocrataTable, task_logger: Logger) -> bool: context = get_current_context() @@ -146,15 +99,12 
@@ def ingest_table_freshness_check_metadata( @task_group -def check_table_metadata( - socrata_table: SocrataTable, conn_id: str, task_logger: Logger -) -> SocrataTableMetadata: +def check_table_metadata(socrata_table: SocrataTable, conn_id: str, task_logger: Logger) -> None: metadata_1 = get_socrata_table_metadata(socrata_table=socrata_table, task_logger=task_logger) metadata_2 = extract_table_freshness_info(conn_id=conn_id, task_logger=task_logger) metadata_3 = ingest_table_freshness_check_metadata(conn_id=conn_id, task_logger=task_logger) chain(metadata_1, metadata_2, metadata_3) - return metadata_3 @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -181,21 +131,22 @@ def fresher_source_data_available(conn_id: str, task_logger: Logger) -> str: @task -def download_fresh_data(task_logger: Logger) -> SocrataTableMetadata: +def download_fresh_data(task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") output_file_path = get_local_file_path(socrata_metadata=socrata_metadata) log_as_info(task_logger, f"Started downloading data at {dt.datetime.utcnow()} UTC") urlretrieve(url=socrata_metadata.data_download_url, filename=output_file_path) log_as_info(task_logger, f"Finished downloading data at {dt.datetime.utcnow()} UTC") - return socrata_metadata + return True @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def file_ext_branch_router(socrata_metadata: SocrataTableMetadata) -> str: - dl_format = socrata_metadata.download_format +def file_ext_branch_router() -> str: context = get_current_context() task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") + dl_format = socrata_metadata.download_format if dl_format.lower() == "geojson": return f"{task_group_id_prefix}load_geojson_data.drop_temp_table" elif dl_format.lower() == "csv": @@ -205,7 +156,7 @@ def file_ext_branch_router(socrata_metadata: SocrataTableMetadata) -> str: @task -def drop_temp_table(route_str: str, conn_id: str, task_logger: Logger) -> SocrataTableMetadata: +def drop_temp_table(route_str: str, conn_id: str, task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") @@ -217,9 +168,9 @@ def drop_temp_table(route_str: str, conn_id: str, task_logger: Logger) -> Socrat query=f"DROP TABLE IF EXISTS {full_temp_table_name} CASCADE;", engine=engine, ) - return socrata_metadata + return True except Exception as e: - print(f"Failed to drop temp table {full_temp_table_name}. Error: {e}, {type(e)}") + raise Exception(f"Failed to drop temp table {full_temp_table_name}. 
Error: {e}, {type(e)}") @task @@ -256,16 +207,16 @@ def create_temp_data_raw_table(conn_id: str, task_logger: Logger) -> None: table_create_obj = a_table._create_table_setup() table_create_obj.create(bind=engine) log_as_info(task_logger, f"Successfully created table 'data_raw.{table_name}'") - return socrata_metadata + return True else: - raise Exception(f"File not found in expected location.") + raise FileNotFoundError(f"File not found in expected location.") @task -def ingest_csv_data( - socrata_metadata: SocrataTableMetadata, conn_id: str, task_logger: Logger -) -> SocrataTableMetadata: +def ingest_csv_data(conn_id: str, task_logger: Logger) -> bool: try: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") full_temp_table_name = f"data_raw.temp_{socrata_metadata.table_name}" file_path = get_local_file_path(socrata_metadata=socrata_metadata) log_as_info(task_logger, f"file_path: {file_path}, is_file: {file_path.is_file()}") @@ -307,7 +258,7 @@ def ingest_csv_data( log_as_info( task_logger, f"Successfully ingested csv data into {full_temp_table_name} via COPY." ) - return socrata_metadata + return True except Exception as e: log_as_info(task_logger, f"Failed to ingest flat file to temp table. Error: {e}, {type(e)}") @@ -316,19 +267,15 @@ def ingest_csv_data( def load_csv_data(route_str: str, conn_id: str, task_logger: Logger) -> None: drop_temp_csv_1 = drop_temp_table(route_str=route_str, conn_id=conn_id, task_logger=task_logger) create_temp_csv_1 = create_temp_data_raw_table(conn_id=conn_id, task_logger=task_logger) - ingest_temp_csv_1 = ingest_csv_data( - socrata_metadata=create_temp_csv_1, conn_id=conn_id, task_logger=task_logger - ) + ingest_temp_csv_1 = ingest_csv_data(conn_id=conn_id, task_logger=task_logger) chain(drop_temp_csv_1, create_temp_csv_1, ingest_temp_csv_1) @task -def get_geospatial_load_indices( - socrata_metadata: SocrataTableMetadata, - task_logger: Logger, - rows_per_batch: int = 500000, -): +def get_geospatial_load_indices(task_logger: Logger, rows_per_batch: int) -> list[dict[str, int]]: + context = get_current_context() + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") file_path = get_local_file_path(socrata_metadata=socrata_metadata) assert file_path.name.lower().endswith( ".geojson" @@ -384,15 +331,11 @@ def ingest_geojson_data( @task_group -def load_geojson_data(route_str: str, conn_id: str, task_logger: Logger) -> SocrataTableMetadata: +def load_geojson_data(route_str: str, conn_id: str, task_logger: Logger) -> None: drop_temp_geojson_1 = drop_temp_table( route_str=route_str, conn_id=conn_id, task_logger=task_logger ) - slice_indices_1 = get_geospatial_load_indices( - socrata_metadata=drop_temp_geojson_1, - task_logger=task_logger, - rows_per_batch=250000, - ) + slice_indices_1 = get_geospatial_load_indices(task_logger=task_logger, rows_per_batch=250000) ingest_temp_geojson_1 = ingest_geojson_data.partial( conn_id=conn_id, task_logger=task_logger ).expand_kwargs(slice_indices_1) @@ -401,14 +344,9 @@ def load_geojson_data(route_str: str, conn_id: str, task_logger: Logger) -> Socr @task_group -def load_data_tg( - socrata_metadata: SocrataTableMetadata, - conn_id: str, - task_logger: Logger, -) -> SocrataTableMetadata: +def load_data_tg(conn_id: str, task_logger: Logger) -> None: log_as_info(task_logger, f"Entered load_data_tg task_group") - file_ext_route_1 = file_ext_branch_router(socrata_metadata=socrata_metadata) - + file_ext_route_1 = file_ext_branch_router() geojson_route_1 = 
load_geojson_data( route_str=file_ext_route_1, conn_id=conn_id, task_logger=task_logger ) @@ -423,7 +361,7 @@ def load_data_tg( @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def register_temp_table_asset(datasource_name: str, task_logger: Logger) -> str: +def register_temp_table_asset(datasource_name: str, task_logger: Logger) -> bool: context = get_current_context() task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") @@ -454,7 +392,7 @@ def socrata_table_checkpoint_exists(task_logger: Logger) -> str: @task -def run_socrata_checkpoint(task_logger: Logger) -> SocrataTableMetadata: +def run_socrata_checkpoint(task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") checkpoint_name = f"data_raw.temp_{socrata_metadata.socrata_table.table_name}" @@ -467,7 +405,7 @@ def run_socrata_checkpoint(task_logger: Logger) -> SocrataTableMetadata: ) log_as_info(task_logger, f"validation success: {checkpoint_run_results.success}") log_as_info(task_logger, f"dir(checkpoint_run_results): {dir(checkpoint_run_results)}") - return socrata_metadata + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -476,10 +414,7 @@ def validation_endpoint() -> bool: @task_group -def raw_data_validation_tg( - datasource_name: str, - task_logger: Logger, -) -> SocrataTableMetadata: +def raw_data_validation_tg(datasource_name: str, task_logger: Logger) -> None: log_as_info(task_logger, f"Entered raw_data_validation_tg task_group") register_temp_table_1 = register_temp_table_asset( @@ -515,7 +450,7 @@ def table_exists_in_data_raw(conn_id: str, task_logger: Logger) -> str: @task -def create_table_in_data_raw(conn_id: str, task_logger: Logger) -> SocrataTableMetadata: +def create_table_in_data_raw(conn_id: str, task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") try: @@ -528,11 +463,11 @@ def create_table_in_data_raw(conn_id: str, task_logger: Logger) -> SocrataTableM f"CREATE TABLE data_raw.{table_name} (LIKE data_raw.temp_{table_name} INCLUDING ALL);" ) conn.commit() + return True except Exception as e: print( f"Failed to create data_raw table {table_name} from temp_{table_name}. 
Error: {e}, {type(e)}" ) - return socrata_metadata @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -551,14 +486,14 @@ def dbt_data_raw_model_exists(task_logger: Logger) -> str: @task(retries=1) -def make_dbt_data_raw_model(conn_id: str, task_logger: Logger) -> SocrataTableMetadata: +def make_dbt_data_raw_model(conn_id: str, task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") make_dbt_data_raw_model_file( table_name=socrata_metadata.table_name, engine=get_pg_engine(conn_id=conn_id) ) log_as_info(task_logger, f"Leaving make_dbt_data_raw_model") - return socrata_metadata + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -592,7 +527,7 @@ def register_data_raw_table_asset(datasource_name: str, task_logger: Logger) -> @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def update_result_of_check_in_metadata_table( conn_id: str, task_logger: Logger, data_updated: bool -) -> SocrataTableMetadata: +) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") log_as_info( @@ -603,7 +538,8 @@ def update_result_of_check_in_metadata_table( engine=get_pg_engine(conn_id=conn_id), update_payload={"data_pulled_this_check": data_updated}, ) - return socrata_metadata + context["ti"].xcom_push(key="socrata_metadata_key", value=socrata_metadata) + return True @task_group @@ -722,7 +658,7 @@ def dbt_clean_model_ready(task_logger: Logger) -> str: @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def dbt_make_clean_model(task_logger: Logger) -> SocrataTableMetadata: +def dbt_make_clean_model(task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") airflow_home = os.environ["AIRFLOW_HOME"] @@ -735,11 +671,11 @@ def dbt_make_clean_model(task_logger: Logger) -> SocrataTableMetadata: clean_file_lines = format_dbt_stub_for_clean_stage(table_name=socrata_metadata.table_name) log_as_info(task_logger, f"clean_file_lines: {clean_file_lines}") write_lines_to_file(file_lines=clean_file_lines, file_path=clean_file_path) - return socrata_metadata + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def run_dbt_models__standardized_onward(task_logger: Logger) -> SocrataTableMetadata: +def run_dbt_models__standardized_onward(task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") dbt_cmd = format_dbt_run_cmd( @@ -747,8 +683,7 @@ def run_dbt_models__standardized_onward(task_logger: Logger) -> SocrataTableMeta schema="standardized", run_downstream=True, ) - result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger) - return socrata_metadata + return execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger) @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -798,7 +733,7 @@ def update_socrata_table( conn_id: str, datasource_name: str, task_logger: Logger, -) -> SocrataTableMetadata: +) -> None: log_as_info(task_logger, f"Updating Socrata table {socrata_table.table_name}") metadata_1 = check_table_metadata( @@ -808,11 +743,7 @@ def update_socrata_table( conn_id=conn_id, task_logger=task_logger ) extract_data_1 = download_fresh_data(task_logger=task_logger) - load_data_tg_1 = load_data_tg( - socrata_metadata=extract_data_1, - conn_id=conn_id, - task_logger=task_logger, - ) + load_data_tg_1 = load_data_tg(conn_id=conn_id, 
task_logger=task_logger) raw_data_validation_tg_1 = raw_data_validation_tg( datasource_name=datasource_name, task_logger=task_logger ) From e92fc5df100e68ae063eeea974672a6ac08d6f5e Mon Sep 17 00:00:00 2001 From: matttriano Date: Fri, 13 Dec 2024 15:33:09 -0600 Subject: [PATCH 5/8] Addresses datetime deprecation warning. --- airflow/dags/cc_utils/census/api.py | 2 +- airflow/dags/cc_utils/census/ftp.py | 2 +- airflow/dags/cc_utils/socrata.py | 7 ++-- ...ate_census_api_dataset_metadata_catalog.py | 2 +- airflow/dags/tasks/socrata_tasks.py | 36 +++++++++---------- airflow/dags/tasks/tiger_tasks.py | 2 +- 6 files changed, 25 insertions(+), 26 deletions(-) diff --git a/airflow/dags/cc_utils/census/api.py b/airflow/dags/cc_utils/census/api.py index 1352039..5e77c11 100644 --- a/airflow/dags/cc_utils/census/api.py +++ b/airflow/dags/cc_utils/census/api.py @@ -179,7 +179,7 @@ def __init__(self, dataset_base_url: str): self.geographies_df = get_dataset_geography_metadata(geog_url=self.geographies_url) self.groups_df = get_dataset_groups_metadata(groups_url=self.groups_url) self.tags_df = get_dataset_tags_metadata(tags_url=self.tags_url) - self.time_of_check = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ") + self.time_of_check = dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ") def set_dataset_metadata_urls(self): if (self.metadata_catalog_df["dataset_base_url"] == self.base_url).sum() == 0: diff --git a/airflow/dags/cc_utils/census/ftp.py b/airflow/dags/cc_utils/census/ftp.py index 7f6a7c7..2f4328f 100644 --- a/airflow/dags/cc_utils/census/ftp.py +++ b/airflow/dags/cc_utils/census/ftp.py @@ -22,7 +22,7 @@ class CensusTableMetadata: def __init__(self, metadata_url: str = BASE_URL): self.metadata_url = re.sub("/$", "", metadata_url) self.page_metadata = self.get_page_metadata() - self.time_of_check = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ") + self.time_of_check = dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ") self.data_freshness_check = self.initialize_data_freshness_check() def parse_census_metadata_page(self, resp: requests.models.Response) -> pd.DataFrame: diff --git a/airflow/dags/cc_utils/socrata.py b/airflow/dags/cc_utils/socrata.py index 173e15a..91f8fab 100644 --- a/airflow/dags/cc_utils/socrata.py +++ b/airflow/dags/cc_utils/socrata.py @@ -4,14 +4,13 @@ from logging import Logger import re from pathlib import Path -from typing import Dict, Optional, Union +from typing import Any, Dict, Optional, Union import pandas as pd import requests from sqlalchemy.engine.base import Engine from sqlalchemy import select, insert, update -# for airflow container from cc_utils.db import ( execute_result_returning_query, get_reflected_db_table, @@ -47,7 +46,7 @@ def get_table_metadata(self) -> Dict: response_json = response.json() metadata = { "_id": self.table_id, - "time_of_collection": dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "time_of_collection": dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ"), } metadata.update(response_json["results"][0]) return metadata @@ -260,7 +259,7 @@ def get_prior_metadata_checks_from_db(self, engine: Engine) -> pd.DataFrame: ) return results_df - def initialize_data_freshness_check_record(self) -> None: + def initialize_data_freshness_check_record(self) -> dict[str, Any]: """There's probably a better name for this idea than 'table_check_metadata'. 
The goal is to see if fresh data is available, log the results of that freshness-check in the dwh, and then triger data refreshing if appropriate.""" diff --git a/airflow/dags/metadata/update_census_api_dataset_metadata_catalog.py b/airflow/dags/metadata/update_census_api_dataset_metadata_catalog.py index 6c64ed9..600dc05 100644 --- a/airflow/dags/metadata/update_census_api_dataset_metadata_catalog.py +++ b/airflow/dags/metadata/update_census_api_dataset_metadata_catalog.py @@ -20,7 +20,7 @@ @task def request_and_ingest_dataset_metadata_catalog(conn_id: str, task_logger: Logger) -> str: catalog_df = get_dataset_metadata_catalog(dataset_base_url="https://api.census.gov/data.json") - catalog_df["time_of_check"] = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ") + catalog_df["time_of_check"] = dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ") task_logger.info(f"Datasets in Census API Dataset Metadata Catalog: {len(catalog_df)}") engine = get_pg_engine(conn_id=conn_id) api_dataset_metadata_table = get_reflected_db_table( diff --git a/airflow/dags/tasks/socrata_tasks.py b/airflow/dags/tasks/socrata_tasks.py index b7676af..9cb15ad 100644 --- a/airflow/dags/tasks/socrata_tasks.py +++ b/airflow/dags/tasks/socrata_tasks.py @@ -14,24 +14,24 @@ from cc_utils.cleanup import standardize_column_names from cc_utils.db import ( - get_pg_engine, - get_data_table_names_in_schema, execute_structural_command, + get_data_table_names_in_schema, + get_pg_engine, ) from cc_utils.file_factory import ( - make_dbt_data_raw_model_file, - write_lines_to_file, format_dbt_stub_for_standardized_stage, format_dbt_stub_for_clean_stage, + make_dbt_data_raw_model_file, + write_lines_to_file, ) from cc_utils.socrata import SocrataTable, SocrataTableMetadata -from cc_utils.transform import format_dbt_run_cmd, execute_dbt_cmd +from cc_utils.transform import execute_dbt_cmd, format_dbt_run_cmd from cc_utils.utils import ( - get_local_data_raw_dir, get_lines_in_geojson_file, + get_local_data_raw_dir, get_task_group_id_prefix, - produce_slice_indices_for_gpd_read_file, log_as_info, + produce_slice_indices_for_gpd_read_file, ) from cc_utils.validation import ( run_checkpoint, @@ -135,9 +135,9 @@ def download_fresh_data(task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") output_file_path = get_local_file_path(socrata_metadata=socrata_metadata) - log_as_info(task_logger, f"Started downloading data at {dt.datetime.utcnow()} UTC") + log_as_info(task_logger, f"Started downloading data at {dt.datetime.now(dt.UTC)} UTC") urlretrieve(url=socrata_metadata.data_download_url, filename=output_file_path) - log_as_info(task_logger, f"Finished downloading data at {dt.datetime.utcnow()} UTC") + log_as_info(task_logger, f"Finished downloading data at {dt.datetime.now(dt.UTC)} UTC") return True @@ -168,13 +168,13 @@ def drop_temp_table(route_str: str, conn_id: str, task_logger: Logger) -> bool: query=f"DROP TABLE IF EXISTS {full_temp_table_name} CASCADE;", engine=engine, ) - return True except Exception as e: raise Exception(f"Failed to drop temp table {full_temp_table_name}. 
Error: {e}, {type(e)}") + return True @task -def create_temp_data_raw_table(conn_id: str, task_logger: Logger) -> None: +def create_temp_data_raw_table(conn_id: str, task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") table_name = f"temp_{socrata_metadata.table_name}" @@ -207,9 +207,9 @@ def create_temp_data_raw_table(conn_id: str, task_logger: Logger) -> None: table_create_obj = a_table._create_table_setup() table_create_obj.create(bind=engine) log_as_info(task_logger, f"Successfully created table 'data_raw.{table_name}'") - return True else: raise FileNotFoundError(f"File not found in expected location.") + return True @task @@ -258,9 +258,9 @@ def ingest_csv_data(conn_id: str, task_logger: Logger) -> bool: log_as_info( task_logger, f"Successfully ingested csv data into {full_temp_table_name} via COPY." ) - return True except Exception as e: log_as_info(task_logger, f"Failed to ingest flat file to temp table. Error: {e}, {type(e)}") + return True @task_group @@ -322,12 +322,12 @@ def ingest_geojson_data( task_logger, f"Successfully ingested records {start_index} to {end_index} using gpd.to_postgis()", ) - return True except Exception as e: task_logger.error( f"Failed to ingest geojson file to temp table. Error: {e}, {type(e)}", exc_info=True, ) + return True @task_group @@ -463,11 +463,11 @@ def create_table_in_data_raw(conn_id: str, task_logger: Logger) -> bool: f"CREATE TABLE data_raw.{table_name} (LIKE data_raw.temp_{table_name} INCLUDING ALL);" ) conn.commit() - return True except Exception as e: print( f"Failed to create data_raw table {table_name} from temp_{table_name}. Error: {e}, {type(e)}" ) + return True @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -511,7 +511,7 @@ def update_data_raw_table(task_logger: Logger) -> str: @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def register_data_raw_table_asset(datasource_name: str, task_logger: Logger) -> str: +def register_data_raw_table_asset(datasource_name: str, task_logger: Logger) -> bool: context = get_current_context() socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") datasource = get_datasource(datasource_name=datasource_name, task_logger=task_logger) @@ -687,9 +687,9 @@ def run_dbt_models__standardized_onward(task_logger: Logger) -> bool: @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def endpoint(task_logger: Logger) -> None: +def endpoint(task_logger: Logger) -> bool: log_as_info(task_logger, "Ending run") - return "end" + return True @task_group diff --git a/airflow/dags/tasks/tiger_tasks.py b/airflow/dags/tasks/tiger_tasks.py index eda2172..f4574bc 100644 --- a/airflow/dags/tasks/tiger_tasks.py +++ b/airflow/dags/tasks/tiger_tasks.py @@ -85,7 +85,7 @@ def record_source_freshness_check( { "dataset_name": [tiger_dataset.dataset_name], "source_data_last_modified": [entity_vintage["last_modified"].max()], - "time_of_check": [dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")], + "time_of_check": [dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ")], } ) log_as_info(task_logger, f"dataset name: {freshness_check_record['dataset_name']}") From 92478eb43f5f84393ebd218eb6527e1822aeb78b Mon Sep 17 00:00:00 2001 From: matttriano Date: Fri, 13 Dec 2024 23:08:57 -0600 Subject: [PATCH 6/8] Adding models datasets previously left in data_raw. 
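
Note on the dbt run pattern in this patch: both the reworked run_a_dbt_run_command DAG and the new
run_transform_data_tg_for_a_socrata_dataset DAG now make two targeted dbt invocations per dataset
(the _standardized model, then the _clean model) through execute_dbt_cmd, instead of one
"+"-expanded selector driven by a raw subprocess call. A minimal sketch of that pattern follows;
the helper function name is illustrative and not part of this patch.

    from logging import Logger

    from cc_utils.transform import execute_dbt_cmd


    def run_standardized_then_clean(table_name: str, task_logger: Logger) -> bool:
        # Run each stage as its own dbt invocation so the standardized and
        # clean models build (and can be retried) independently.
        result = True
        for stage in ("standardized", "clean"):
            dbt_cmd = (
                "cd /opt/airflow/dbt && "
                f"dbt --warn-error run --select re_dbt.{stage}.{table_name}_{stage}"
            )
            result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger)
        return result
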
--- airflow/dags/run_a_dbt_run_command.py | 42 ++----- ...transform_data_tg_for_a_socrata_dataset.py | 116 ++++++++++++++++++ .../clean/chicago_cta_train_lines_clean.sql | 18 +++ .../chicago_street_center_lines_clean.sql | 24 ++++ ...o_vacant_and_abandoned_buildings_clean.sql | 4 +- .../data_raw/chicago_cta_train_lines.sql | 59 +++++++++ .../chicago_crimes_standardized.sql | 7 +- .../chicago_cta_train_lines_standardized.sql | 28 +++++ .../chicago_police_stations_standardized.sql | 6 +- ...icago_street_center_lines_standardized.sql | 74 +++++++++++ ...t_and_abandoned_buildings_standardized.sql | 9 +- 11 files changed, 346 insertions(+), 41 deletions(-) create mode 100644 airflow/dags/run_transform_data_tg_for_a_socrata_dataset.py create mode 100644 airflow/dbt/models/clean/chicago_cta_train_lines_clean.sql create mode 100644 airflow/dbt/models/clean/chicago_street_center_lines_clean.sql create mode 100644 airflow/dbt/models/data_raw/chicago_cta_train_lines.sql create mode 100644 airflow/dbt/models/standardized/chicago_cta_train_lines_standardized.sql create mode 100644 airflow/dbt/models/standardized/chicago_street_center_lines_standardized.sql diff --git a/airflow/dags/run_a_dbt_run_command.py b/airflow/dags/run_a_dbt_run_command.py index 4f65f3e..fe032ac 100644 --- a/airflow/dags/run_a_dbt_run_command.py +++ b/airflow/dags/run_a_dbt_run_command.py @@ -8,44 +8,24 @@ from airflow.decorators import dag, task from airflow.utils.trigger_rule import TriggerRule -from sources.tables import NYC_PARCEL_SALES as SOCRATA_TABLE +from sources.tables import CHICAGO_POLICE_DISTRICT_BOUNDARIES as SOCRATA_TABLE from cc_utils.utils import log_as_info +from cc_utils.transform import execute_dbt_cmd task_logger = logging.getLogger("airflow.task") -# REFERENCE -# Running a model with all of its downstream models -# dbt_cmd = f"""cd /opt/airflow/dbt && \ -# dbt --warn-error run --select \ -# re_dbt.dwh.{table_name}_fact+""" -# Running all models in the `dwh` level -# dbt_cmd = f"""cd /opt/airflow/dbt && -# dbt --warn-error run --select -# re_dbt.dwh.*""" - -@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def run_specific_dbt_model_for_a_data_set(table_name: str, task_logger: Logger) -> None: +@task +def run_specific_dbt_model_for_a_data_set(table_name: str, task_logger: Logger) -> bool: + dbt_cmd = f"""cd /opt/airflow/dbt && \ + dbt --warn-error run --full-refresh --select \ + re_dbt.standardized.{table_name}_standardized""" + result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger) dbt_cmd = f"""cd /opt/airflow/dbt && \ dbt --warn-error run --full-refresh --select \ - re_dbt.standardized.{table_name}_standardized+""" - task_logger.info(f"dbt run command: {dbt_cmd}") - try: - subproc_output = subprocess.run( - dbt_cmd, shell=True, capture_output=True, text=True, check=False - ) - log_as_info(task_logger, f"subproc_output.stderr: {subproc_output.stderr}") - log_as_info(task_logger, f"subproc_output.stdout: {subproc_output.stdout}") - raise_exception = False - for el in subproc_output.stdout.split("\n"): - log_as_info(task_logger, f"{el}") - if re.search("(\\d* of \\d* ERROR)", el): - raise_exception = True - if raise_exception: - raise Exception("dbt model failed. Review the above outputs") - except subprocess.CalledProcessError as err: - log_as_info(task_logger, f"Error {err} while running dbt models. 
{type(err)}") - raise + re_dbt.clean.{table_name}_clean""" + result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger) + return result @dag( diff --git a/airflow/dags/run_transform_data_tg_for_a_socrata_dataset.py b/airflow/dags/run_transform_data_tg_for_a_socrata_dataset.py new file mode 100644 index 0000000..6f2ff8b --- /dev/null +++ b/airflow/dags/run_transform_data_tg_for_a_socrata_dataset.py @@ -0,0 +1,116 @@ +import datetime as dt +import logging +import os +from logging import Logger +from pathlib import Path + +from cc_utils.utils import get_task_group_id_prefix +from cc_utils.transform import execute_dbt_cmd +from sources.tables import CHICAGO_VACANT_AND_ABANDONED_BUILDINGS as SOCRATA_TABLE +from tasks.socrata_tasks import ( + dbt_make_clean_model, + dbt_standardized_model_ready, + endpoint, + get_socrata_table_metadata, + highlight_unfinished_dbt_standardized_stub, + log_as_info, + make_dbt_standardized_model, +) + +from airflow.decorators import dag, task, task_group +from airflow.models.baseoperator import chain +from airflow.operators.python import get_current_context +from airflow.utils.edgemodifier import Label +from airflow.utils.trigger_rule import TriggerRule + +task_logger = logging.getLogger("airflow.task") + + +@task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) +def dbt_clean_model_ready(task_logger: Logger) -> str: + context = get_current_context() + task_group_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + socrata_metadata = context["ti"].xcom_pull(key="socrata_metadata_key") + airflow_home = os.environ["AIRFLOW_HOME"] + file_path = Path(airflow_home).joinpath( + "dbt", + "models", + "clean", + f"{socrata_metadata.table_name}_clean.sql", + ) + if file_path.is_file(): + log_as_info(task_logger, f"headed to {task_group_id_prefix}run_dbt_std_model") + return f"{task_group_id_prefix}run_dbt_std_model" + else: + return f"{task_group_id_prefix}dbt_make_clean_model" + + +@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) +def run_dbt_std_model(table_name: str, task_logger: Logger) -> bool: + dbt_cmd = f"""cd /opt/airflow/dbt && \ + dbt --warn-error run --select \ + re_dbt.standardized.{table_name}_standardized""" + result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger) + return result + + +@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) +def run_dbt_clean_model(table_name: str, task_logger: Logger) -> bool: + dbt_cmd = f"""cd /opt/airflow/dbt && \ + dbt --warn-error run --select \ + re_dbt.clean.{table_name}_clean""" + result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger) + return result + + +@task_group +def transform_data_tg(socrata_table, conn_id: str, task_logger: Logger): + std_model_ready_1 = dbt_standardized_model_ready(task_logger=task_logger) + highlight_std_stub_1 = highlight_unfinished_dbt_standardized_stub(task_logger=task_logger) + make_std_model_1 = make_dbt_standardized_model(conn_id=conn_id, task_logger=task_logger) + clean_model_ready_1 = dbt_clean_model_ready(task_logger=task_logger) + make_clean_model_1 = dbt_make_clean_model(task_logger=task_logger) + # run_dbt_models_1 = run_dbt_models__standardized_onward(task_logger=task_logger) + run_dbt_std_model_1 = run_dbt_std_model( + table_name=socrata_table.table_name, task_logger=task_logger + ) + run_dbt_clean_model_1 = run_dbt_clean_model( + table_name=socrata_table.table_name, task_logger=task_logger + ) + endpoint_1 = endpoint(task_logger=task_logger) + + chain( + std_model_ready_1, + [ + Label("No dbt _standardized 
model found"), + Label("dbt _standardized model needs review"), + Label("dbt _standardized model looks good"), + ], + [make_std_model_1, highlight_std_stub_1, clean_model_ready_1], + ) + chain([make_std_model_1, highlight_std_stub_1], endpoint_1) + chain( + clean_model_ready_1, + [Label("dbt _clean model looks good!"), make_clean_model_1], + run_dbt_std_model_1, + run_dbt_clean_model_1, + endpoint_1, + ) + + +@dag( + schedule=SOCRATA_TABLE.schedule, + start_date=dt.datetime(2022, 11, 1), + catchup=False, + tags=["dbt", "utility"], +) +def run_socrata_transform_data_tg(): + metadata_1 = get_socrata_table_metadata(socrata_table=SOCRATA_TABLE, task_logger=task_logger) + transform_data_1 = transform_data_tg( + socrata_table=SOCRATA_TABLE, conn_id="dwh_db_conn", task_logger=task_logger + ) + + chain(metadata_1, transform_data_1) + + +run_socrata_transform_data_tg() diff --git a/airflow/dbt/models/clean/chicago_cta_train_lines_clean.sql b/airflow/dbt/models/clean/chicago_cta_train_lines_clean.sql new file mode 100644 index 0000000..569381e --- /dev/null +++ b/airflow/dbt/models/clean/chicago_cta_train_lines_clean.sql @@ -0,0 +1,18 @@ +{% set dataset_name = "chicago_cta_train_lines" %} +{% set ck_cols = ["description"] %} +{% set record_id = "description" %} +{% set base_cols = [ + "description", "legend", "type", "lines", "shape_len", "geometry", "source_data_updated", + "ingestion_check_time" +] %} +{% set updated_at_col = "source_data_updated" %} + +{% set query = generate_clean_stage_incremental_dedupe_query( + dataset_name=dataset_name, + record_id=record_id, + ck_cols=ck_cols, + base_cols=base_cols, + updated_at_col=updated_at_col +) %} + +{{ query }} diff --git a/airflow/dbt/models/clean/chicago_street_center_lines_clean.sql b/airflow/dbt/models/clean/chicago_street_center_lines_clean.sql new file mode 100644 index 0000000..e0f2e29 --- /dev/null +++ b/airflow/dbt/models/clean/chicago_street_center_lines_clean.sql @@ -0,0 +1,24 @@ +{% set dataset_name = "chicago_street_center_lines" %} +{% set ck_cols = ["objectid"] %} +{% set record_id = "objectid" %} +{% set base_cols = [ + "objectid", "pre_dir", "street_name", "street_type", "suffix_dir", "oneway_dir", "dir_travel", + "tiered", "left_to_addr", "left_from_add", "left_parity", "left_zip", "left_fips", + "left_censusbl", "right_from_add", "right_to_addr", "right_parity", "right_zip", "right_fips", + "right_censusbl", "to_zlev", "from_cross", "from_cross_st", "to_cross", "to_cross_st", + "logic_left_from", "logic_left_to", "logic_right_from", "logic_right_to", "ewns", "ewns_dir", + "ewns_coord", "class", "street_id", "from_node_id", "to_node_id", "trans_id", "flag_string", + "status", "status_date", "create_use", "create_time", "edit_type", "edit_date", "update_use", + "update_time", "length", "shape_len", "geometry", "source_data_updated", "ingestion_check_time" +] %} +{% set updated_at_col = "source_data_updated" %} + +{% set query = generate_clean_stage_incremental_dedupe_query( + dataset_name=dataset_name, + record_id=record_id, + ck_cols=ck_cols, + base_cols=base_cols, + updated_at_col=updated_at_col +) %} + +{{ query }} diff --git a/airflow/dbt/models/clean/chicago_vacant_and_abandoned_buildings_clean.sql b/airflow/dbt/models/clean/chicago_vacant_and_abandoned_buildings_clean.sql index 63c0454..0cc2fdb 100644 --- a/airflow/dbt/models/clean/chicago_vacant_and_abandoned_buildings_clean.sql +++ b/airflow/dbt/models/clean/chicago_vacant_and_abandoned_buildings_clean.sql @@ -1,8 +1,8 @@ {% set dataset_name = 
"chicago_vacant_and_abandoned_buildings" %} -{% set ck_cols = ["violation_number", "property_address", "entity_or_person_s_"] %} +{% set ck_cols = ["violation_number", "property_address", "entity_or_person"] %} {% set record_id = "vacant_bldg_violation_id" %} {% set base_cols = [ - "vacant_bldg_violation_id", "violation_number", "property_address", "entity_or_persons", + "vacant_bldg_violation_id", "violation_number", "property_address", "entity_or_person", "issued_date", "violation_type", "docket_number", "last_hearing_date", "issuing_department", "disposition_description", "total_fines", "interest_amount", "total_administrative_costs", "original_total_amount_due", "collection_costs_or_attorney_fees", "court_cost", "total_paid", diff --git a/airflow/dbt/models/data_raw/chicago_cta_train_lines.sql b/airflow/dbt/models/data_raw/chicago_cta_train_lines.sql new file mode 100644 index 0000000..570fe74 --- /dev/null +++ b/airflow/dbt/models/data_raw/chicago_cta_train_lines.sql @@ -0,0 +1,59 @@ +{{ config(materialized='table') }} +{% set source_cols = [ + "description", "legend", "type", "lines", "shape_len", "geometry" +] %} +{% set metadata_cols = ["source_data_updated", "ingestion_check_time"] %} + +-- selecting all records already in the full data_raw table +WITH records_in_data_raw_table AS ( + SELECT *, 1 AS retention_priority + FROM {{ source('data_raw', 'chicago_cta_train_lines') }} +), + +-- selecting all distinct records from the latest data pull (in the "temp" table) +current_pull_with_distinct_combos_numbered AS ( + SELECT *, + row_number() over(partition by + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + ) as rn + FROM {{ source('data_raw', 'temp_chicago_cta_train_lines') }} +), +distinct_records_in_current_pull AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }},{% endfor %} + 2 AS retention_priority + FROM current_pull_with_distinct_combos_numbered + WHERE rn = 1 +), + +-- stacking the existing data with all distinct records from the latest pull +data_raw_table_with_all_new_and_updated_records AS ( + SELECT * + FROM records_in_data_raw_table + UNION ALL + SELECT * + FROM distinct_records_in_current_pull +), + +-- selecting records that where source columns are distinct (keeping the earlier recovery +-- when there are duplicates to chose from) +data_raw_table_with_new_and_updated_records AS ( + SELECT *, + row_number() over(partition by + {% for sc in source_cols %}{{ sc }}{{ "," if not loop.last }}{% endfor %} + ORDER BY retention_priority + ) as rn + FROM data_raw_table_with_all_new_and_updated_records +), +distinct_records_for_data_raw_table AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + FROM data_raw_table_with_new_and_updated_records + WHERE rn = 1 +) + +SELECT * +FROM distinct_records_for_data_raw_table diff --git a/airflow/dbt/models/standardized/chicago_crimes_standardized.sql b/airflow/dbt/models/standardized/chicago_crimes_standardized.sql index 192b198..2f9ef6c 100644 --- a/airflow/dbt/models/standardized/chicago_crimes_standardized.sql +++ b/airflow/dbt/models/standardized/chicago_crimes_standardized.sql @@ -7,7 +7,7 @@ WITH records_with_basic_cleaning AS ( id::bigint AS id, upper(case_number::text) AS case_number, date::timestamptz - AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS date + AT TIME ZONE 'UTC' AT TIME ZONE 
'America/Chicago' AS date, updated_on::timestamptz AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS updated_on, upper(primary_type::text) AS primary_type, @@ -26,7 +26,10 @@ WITH records_with_basic_cleaning AS ( longitude::double precision AS longitude, x_coordinate::bigint AS x_coordinate, y_coordinate::bigint AS y_coordinate, - geometry::GEOMETRY(POINT, 4326) AS geometry, + CASE + WHEN ST_IsEmpty(geometry) THEN NULL + ELSE geometry::geometry(Point, 4326) + END AS geometry, source_data_updated::timestamptz AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS source_data_updated, ingestion_check_time::timestamptz diff --git a/airflow/dbt/models/standardized/chicago_cta_train_lines_standardized.sql b/airflow/dbt/models/standardized/chicago_cta_train_lines_standardized.sql new file mode 100644 index 0000000..4ba2809 --- /dev/null +++ b/airflow/dbt/models/standardized/chicago_cta_train_lines_standardized.sql @@ -0,0 +1,28 @@ +{{ config(materialized='view') }} +{% set ck_cols = ["description"] %} +{% set record_id = "description" %} + +WITH records_with_basic_cleaning AS ( + SELECT + upper(description::text) AS description, + upper(legend::text) AS legend, + upper(type::text) AS type, + upper(lines::text) AS lines, + upper(shape_len::text) AS shape_len, + geometry::GEOMETRY(MULTILINESTRING,4326) AS geometry, + source_data_updated::timestamptz + AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS source_data_updated, + ingestion_check_time::timestamptz + AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS ingestion_check_time + FROM {{ ref('chicago_cta_train_lines') }} + ORDER BY {% for ck in ck_cols %}{{ ck }}{{ "," if not loop.last }}{% endfor %} +) + + +SELECT + {% if ck_cols|length > 1 %} + {{ dbt_utils.generate_surrogate_key(ck_cols) }} AS {{ record_id }}, + {% endif %} + a.* +FROM records_with_basic_cleaning AS a +ORDER BY {% for ck in ck_cols %}{{ ck }},{% endfor %} source_data_updated diff --git a/airflow/dbt/models/standardized/chicago_police_stations_standardized.sql b/airflow/dbt/models/standardized/chicago_police_stations_standardized.sql index d919969..bda8dd3 100644 --- a/airflow/dbt/models/standardized/chicago_police_stations_standardized.sql +++ b/airflow/dbt/models/standardized/chicago_police_stations_standardized.sql @@ -8,11 +8,11 @@ WITH records_with_basic_cleaning AS ( CASE WHEN upper(district) = 'HEADQUARTERS' THEN 'HQ' ELSE lpad(upper(district::char(2)), 2, '0') - END AS district, - upper(address::text) AS address, + END AS district, + upper(address::text) AS address, upper(city::text) AS city, upper(zip::text) AS zip, - upper(state::text) AS state, + upper(state::text) AS state, upper(phone::text) AS phone, upper(tty::text) AS tty, upper(fax::text) AS fax, diff --git a/airflow/dbt/models/standardized/chicago_street_center_lines_standardized.sql b/airflow/dbt/models/standardized/chicago_street_center_lines_standardized.sql new file mode 100644 index 0000000..7d6538a --- /dev/null +++ b/airflow/dbt/models/standardized/chicago_street_center_lines_standardized.sql @@ -0,0 +1,74 @@ +{{ config(materialized='view') }} +{% set ck_cols = ["objectid"] %} +{% set record_id = "objectid" %} + +WITH records_with_basic_cleaning AS ( + SELECT + objectid::bigint AS objectid, + upper(pre_dir::text) AS pre_dir, + upper(street_nam::text) AS street_name, + upper(street_typ::text) AS street_type, + upper(suf_dir::text) AS suffix_dir, + upper(oneway_dir::text) AS oneway_dir, + upper(dir_travel::text) AS dir_travel, + upper(tiered::text) AS tiered, + l_t_add::int AS left_to_addr, + 
l_f_add::int AS left_from_add, + upper(l_parity::text) AS left_parity, + upper(l_zip::text) AS left_zip, + upper(l_fips::text) AS left_fips, + upper(l_censusbl::text) AS left_censusbl, + r_f_add::int AS right_from_add, + r_t_add::int AS right_to_addr, + upper(r_parity::text) AS right_parity, + upper(r_zip::text) AS right_zip, + upper(r_fips::text) AS right_fips, + upper(r_censusbl::text) AS right_censusbl, + upper(t_zlev::text) AS to_zlev, + upper(f_cross::text) AS from_cross, + upper(f_cross_st::text) AS from_cross_st, + upper(t_cross::text) AS to_cross, + upper(t_cross_st::text) AS to_cross_st, + upper(logiclf::text) AS logic_left_from, + upper(logiclt::text) AS logic_left_to, + upper(logicrf::text) AS logic_right_from, + upper(logicrt::text) AS logic_right_to, + ewns::int AS ewns, + upper(ewns_dir::text) AS ewns_dir, + ewns_coord::int AS ewns_coord, + upper(class::text) AS class, + upper(streetname::text) AS street_id, + upper(fnode_id::text) AS from_node_id, + upper(tnode_id::text) AS to_node_id, + trans_id::int AS trans_id, + upper(flag_strin::text) AS flag_string, + upper(status::text) AS status, + status_dat::date AS status_date, + upper(create_use::text) AS create_use, + create_tim::date AS create_time, + upper(edit_type::text) AS edit_type, + case + when edit_date = '0' then NULL + else edit_date::date + end AS edit_date, + upper(update_use::text) AS update_use, + update_tim::date AS update_time, + length::double precision AS length, + shape_len::double precision AS shape_len, + geometry::GEOMETRY(GEOMETRY,4326) AS geometry, + source_data_updated::timestamptz + AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS source_data_updated, + ingestion_check_time::timestamptz + AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS ingestion_check_time + FROM {{ ref('chicago_street_center_lines') }} + ORDER BY {% for ck in ck_cols %}{{ ck }}{{ "," if not loop.last }}{% endfor %} +) + + +SELECT + {% if ck_cols|length > 1 %} + {{ dbt_utils.generate_surrogate_key(ck_cols) }} AS {{ record_id }}, + {% endif %} + a.* +FROM records_with_basic_cleaning AS a +ORDER BY {% for ck in ck_cols %}{{ ck }},{% endfor %} source_data_updated diff --git a/airflow/dbt/models/standardized/chicago_vacant_and_abandoned_buildings_standardized.sql b/airflow/dbt/models/standardized/chicago_vacant_and_abandoned_buildings_standardized.sql index 3c8ae6f..b801225 100644 --- a/airflow/dbt/models/standardized/chicago_vacant_and_abandoned_buildings_standardized.sql +++ b/airflow/dbt/models/standardized/chicago_vacant_and_abandoned_buildings_standardized.sql @@ -1,12 +1,12 @@ {{ config(materialized='view') }} -{% set ck_cols = ["violation_number", "property_address", "entity_or_person_s_"] %} +{% set ck_cols = ["violation_number", "property_address", "entity_or_person"] %} {% set record_id = "vacant_bldg_violation_id" %} WITH records_with_basic_cleaning AS ( SELECT upper(violation_number::text) AS violation_number, upper(property_address::text) AS property_address, - upper(entity_or_person_s_::text) AS entity_or_persons, + upper(entity_or_person_s_::text) AS entity_or_person, issued_date::date AS issued_date, upper(violation_type::text) AS violation_type, upper(docket_number::text) AS docket_number, @@ -24,7 +24,10 @@ WITH records_with_basic_cleaning AS ( current_amount_due::numeric(8,2) AS current_amount_due, latitude::double precision AS latitude, longitude::double precision AS longitude, - geometry::GEOMETRY(POINT, 4326) AS geometry, + CASE + WHEN ST_IsEmpty(geometry) THEN NULL + ELSE geometry::geometry(Point, 4326) + 
END AS geometry, source_data_updated::timestamptz AT TIME ZONE 'UTC' AT TIME ZONE 'America/Chicago' AS source_data_updated, ingestion_check_time::timestamptz From e2e1cbb3f73f8fb4ed2f4b40e297e75e0b136fec Mon Sep 17 00:00:00 2001 From: matttriano Date: Sat, 14 Dec 2024 14:04:10 -0600 Subject: [PATCH 7/8] Refactors task context accessing in TIGER taskflows. --- .../update_united_states_counties_2024.py | 28 +++++ airflow/dags/sources/tiger_datasets.py | 18 ++- airflow/dags/tasks/tiger_tasks.py | 111 +++++++++--------- airflow/dbt/models/data_raw/sources.yml | 2 + .../data_raw/united_states_counties_2024.sql | 61 ++++++++++ .../great_expectations/great_expectations.yml | 12 ++ 6 files changed, 171 insertions(+), 61 deletions(-) create mode 100644 airflow/dags/geography/update_united_states_counties_2024.py create mode 100644 airflow/dbt/models/data_raw/united_states_counties_2024.sql diff --git a/airflow/dags/geography/update_united_states_counties_2024.py b/airflow/dags/geography/update_united_states_counties_2024.py new file mode 100644 index 0000000..a391681 --- /dev/null +++ b/airflow/dags/geography/update_united_states_counties_2024.py @@ -0,0 +1,28 @@ +import datetime as dt +import logging + +from airflow.decorators import dag + +from tasks.tiger_tasks import update_tiger_table + +from sources.tiger_datasets import UNITED_STATES_COUNTIES_2024 as TIGER_DATASET + +task_logger = logging.getLogger("airflow.task") + + +@dag( + schedule=TIGER_DATASET.schedule, + start_date=dt.datetime(2022, 11, 1), + catchup=False, + tags=["census", "geospatial", "TIGER"], +) +def update_united_states_counties_2024(): + update_tiger_table( + tiger_dataset=TIGER_DATASET, + datasource_name="fluent_dwh_source", + conn_id="dwh_db_conn", + task_logger=task_logger, + ) + + +update_united_states_counties_2024() diff --git a/airflow/dags/sources/tiger_datasets.py b/airflow/dags/sources/tiger_datasets.py index 02da7bb..8f25e2b 100644 --- a/airflow/dags/sources/tiger_datasets.py +++ b/airflow/dags/sources/tiger_datasets.py @@ -11,7 +11,7 @@ vintage_year=2022, entity_name="AREAWATER", geography=COOK_COUNTY_CENSUS_TRACTS, - schedule="30 3 2 * *", + schedule="30 3 2 */3 *", ) COOK_COUNTY_ROADS_2022 = TIGERDataset( @@ -19,7 +19,7 @@ vintage_year=2022, entity_name="ROADS", geography=COOK_COUNTY_CENSUS_TRACTS, - schedule="25 3 2 * *", + schedule="25 3 2 */3 *", ) ILLINOIS_CENSUS_TRACTS_2022 = TIGERDataset( @@ -27,7 +27,7 @@ vintage_year=2022, entity_name="TRACT", geography=ILLINOIS_CENSUS_TRACTS, - schedule="5 4 3 * *", + schedule="5 4 3 */3 *", ) UNITED_STATES_RAILS_2022 = TIGERDataset( @@ -35,7 +35,7 @@ vintage_year=2022, entity_name="RAILS", geography=ENTIRE_UNITED_STATES, - schedule="15 4 3 * *", + schedule="15 4 3 */3 *", ) UNITED_STATES_COUNTIES_2022 = TIGERDataset( @@ -43,5 +43,13 @@ vintage_year=2022, entity_name="COUNTY", geography=UNITED_STATES_COUNTIES, - schedule="25 4 3 * *", + schedule="25 4 3 */3 *", +) + +UNITED_STATES_COUNTIES_2024 = TIGERDataset( + base_dataset_name="united_states_counties", + vintage_year=2024, + entity_name="COUNTY", + geography=UNITED_STATES_COUNTIES, + schedule="35 4 3 */3 *", ) diff --git a/airflow/dags/tasks/tiger_tasks.py b/airflow/dags/tasks/tiger_tasks.py index f4574bc..56f6d79 100644 --- a/airflow/dags/tasks/tiger_tasks.py +++ b/airflow/dags/tasks/tiger_tasks.py @@ -5,19 +5,20 @@ from airflow.decorators import task, task_group from airflow.models.baseoperator import chain -from airflow.utils.edgemodifier import Label +from airflow.operators.python import get_current_context from 
airflow.providers.postgres.hooks.postgres import PostgresHook +from airflow.utils.edgemodifier import Label from airflow.utils.trigger_rule import TriggerRule import pandas as pd from sqlalchemy import insert, update -from cc_utils.cleanup import standardize_column_names from cc_utils.census.tiger import ( TIGERCatalog, TIGERGeographicEntityVintage, TIGERDataset, TIGERDatasetFreshnessCheck, ) +from cc_utils.cleanup import standardize_column_names from cc_utils.db import ( get_pg_engine, get_data_table_names_in_schema, @@ -30,7 +31,7 @@ make_dbt_data_raw_model_file, ) from cc_utils.transform import format_dbt_run_cmd, execute_dbt_cmd -from cc_utils.utils import log_as_info +from cc_utils.utils import get_task_group_id_prefix, log_as_info from cc_utils.validation import ( run_checkpoint, check_if_checkpoint_exists, @@ -54,7 +55,8 @@ def get_tiger_catalog(task_logger: Logger) -> TIGERCatalog: @task def get_entity_vintage_metadata( tiger_dataset: TIGERDataset, tiger_catalog: TIGERCatalog, task_logger: Logger -) -> TIGERGeographicEntityVintage: +) -> bool: + context = get_current_context() vintages = TIGERGeographicEntityVintage( entity_name=tiger_dataset.entity_name, year=tiger_dataset.vintage_year, @@ -70,16 +72,18 @@ def get_entity_vintage_metadata( ) entity_vintage = vintages.get_entity_file_metadata(geography=tiger_dataset.geography) log_as_info(task_logger, f"entities after filtering: {len(entity_vintage)}") - return vintages + context["ti"].xcom_push(key="entity_vintage_key", value=vintages) + return True @task def record_source_freshness_check( tiger_dataset: TIGERDataset, - vintages: TIGERGeographicEntityVintage, conn_id: str, task_logger: Logger, ) -> pd.DataFrame: + context = get_current_context() + vintages = context["ti"].xcom_pull(key="entity_vintage_key") entity_vintage = vintages.get_entity_file_metadata(geography=tiger_dataset.geography) freshness_check_record = pd.DataFrame( { @@ -145,25 +149,25 @@ def get_latest_local_freshness_check( @task -def organize_freshness_check_results(task_logger: Logger, **kwargs) -> TIGERDatasetFreshnessCheck: - ti = kwargs["ti"] +def organize_freshness_check_results(task_logger: Logger) -> bool: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) freshness_check = TIGERDatasetFreshnessCheck( - source_freshness=ti.xcom_pull( - task_ids="update_tiger_table.check_freshness.record_source_freshness_check" + source_freshness=context["ti"].xcom_pull( + task_ids=f"{tg_id_prefix}record_source_freshness_check" ), - local_freshness=ti.xcom_pull( - task_ids="update_tiger_table.check_freshness.get_latest_local_freshness_check" + local_freshness=context["ti"].xcom_pull( + task_ids=f"{tg_id_prefix}get_latest_local_freshness_check" ), ) log_as_info(task_logger, f"Source_freshness records: {len(freshness_check.source_freshness)}") log_as_info(task_logger, f"local_freshness records: {len(freshness_check.local_freshness)}") - return freshness_check + context["ti"].xcom_push(key="freshness_check_key", value=freshness_check) + return True @task_group -def check_freshness( - tiger_dataset: TIGERDataset, conn_id: str, task_logger: Logger -) -> TIGERDatasetFreshnessCheck: +def check_freshness(tiger_dataset: TIGERDataset, conn_id: str, task_logger: Logger) -> None: tiger_catalog = get_tiger_catalog(task_logger=task_logger) local_dataset_freshness = get_latest_local_freshness_check( tiger_dataset=tiger_dataset, conn_id=conn_id, task_logger=task_logger @@ -173,22 +177,19 @@ def check_freshness( ) source_dataset_freshness 
= record_source_freshness_check( tiger_dataset=tiger_dataset, - vintages=entity_vintages, conn_id=conn_id, task_logger=task_logger, ) freshness_check = organize_freshness_check_results(task_logger=task_logger) chain(local_dataset_freshness, freshness_check) - chain(source_dataset_freshness, freshness_check) - return freshness_check + chain(entity_vintages, source_dataset_freshness, freshness_check) @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def fresher_source_data_available(task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - freshness_check = ti.xcom_pull( - task_ids="update_tiger_table.check_freshness.organize_freshness_check_results" - ) +def fresher_source_data_available(task_logger: Logger) -> str: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) + freshness_check = context["ti"].xcom_pull(key="freshness_check_key") dataset_in_local_dwh = len(freshness_check.local_freshness) > 0 log_as_info(task_logger, f"Dataset in local dwh: {dataset_in_local_dwh}") @@ -200,26 +201,22 @@ def fresher_source_data_available(task_logger: Logger, **kwargs) -> str: log_as_info(task_logger, f"Source dataset last modified: {source_last_modified}") local_dataset_is_fresh = local_last_modified >= source_last_modified if local_dataset_is_fresh: - return "update_tiger_table.local_data_is_fresh" - return "update_tiger_table.request_and_ingest_fresh_data" + return f"{tg_id_prefix}local_data_is_fresh" + return f"{tg_id_prefix}request_and_ingest_fresh_data" @task -def local_data_is_fresh(task_logger: Logger): - return "hi" +def local_data_is_fresh(task_logger: Logger) -> bool: + return True @task def request_and_ingest_fresh_data( - tiger_dataset: TIGERDataset, conn_id: str, task_logger: Logger, **kwargs -): - ti = kwargs["ti"] - vintages = ti.xcom_pull( - task_ids="update_tiger_table.check_freshness.get_entity_vintage_metadata" - ) - freshness_check = ti.xcom_pull( - task_ids="update_tiger_table.check_freshness.organize_freshness_check_results" - ) + tiger_dataset: TIGERDataset, conn_id: str, task_logger: Logger +) -> bool: + context = get_current_context() + vintages = context["ti"].xcom_pull(key="entity_vintage_key") + freshness_check = context["ti"].xcom_pull(key="freshness_check_key") source_freshness = freshness_check.source_freshness log_as_info( task_logger, f"source_freshness: {source_freshness} (type: {type(source_freshness)})" @@ -240,15 +237,13 @@ def request_and_ingest_fresh_data( if_exists="replace", chunksize=100000, ) - return "ingested" + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def record_data_update(conn_id: str, task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - freshness_check = ti.xcom_pull( - task_ids="update_tiger_table.check_freshness.organize_freshness_check_results" - ) +def record_data_update(conn_id: str, task_logger: Logger) -> bool: + context = get_current_context() + freshness_check = context["ti"].xcom_pull(key="freshness_check_key") engine = get_pg_engine(conn_id=conn_id) dataset_id = freshness_check.source_freshness["id"].max() @@ -272,7 +267,7 @@ def record_data_update(conn_id: str, task_logger: Logger, **kwargs) -> str: query=f"""SELECT * FROM metadata.dataset_metadata WHERE id = {dataset_id}""", ) log_as_info(task_logger, f"General metadata record post-update: {post_update_record}") - return "success" + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -292,14 +287,16 @@ def register_temp_table_asset( 
@task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def table_checkpoint_exists(tiger_dataset: TIGERDataset, task_logger: Logger) -> str: checkpoint_name = f"data_raw.temp_{tiger_dataset.dataset_name}" + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) if check_if_checkpoint_exists(checkpoint_name=checkpoint_name, task_logger=task_logger): log_as_info(task_logger, f"GE checkpoint for {checkpoint_name} exists") - return "update_tiger_table.raw_data_validation_tg.run_temp_table_checkpoint" + return f"{tg_id_prefix}run_temp_table_checkpoint" else: log_as_info( task_logger, f"GE checkpoint for {checkpoint_name} doesn't exist yet. Make it maybe?" ) - return "update_tiger_table.raw_data_validation_tg.validation_endpoint" + return f"{tg_id_prefix}validation_endpoint" @task @@ -348,16 +345,18 @@ def raw_data_validation_tg( @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def table_exists_in_data_raw(tiger_dataset: TIGERDataset, conn_id: str, task_logger: Logger) -> str: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) tables_in_data_raw_schema = get_data_table_names_in_schema( engine=get_pg_engine(conn_id=conn_id), schema_name="data_raw" ) log_as_info(task_logger, f"tables_in_data_raw_schema: {tables_in_data_raw_schema}") if tiger_dataset.dataset_name not in tables_in_data_raw_schema: log_as_info(task_logger, f"Table {tiger_dataset.dataset_name} not in data_raw; creating.") - return "update_tiger_table.persist_new_raw_data_tg.create_table_in_data_raw" + return f"{tg_id_prefix}create_table_in_data_raw" else: log_as_info(task_logger, f"Table {tiger_dataset.dataset_name} in data_raw; skipping.") - return "update_tiger_table.persist_new_raw_data_tg.dbt_data_raw_model_exists" + return f"{tg_id_prefix}dbt_data_raw_model_exists" @task @@ -381,23 +380,25 @@ def create_table_in_data_raw(tiger_dataset: TIGERDataset, conn_id: str, task_log @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def dbt_data_raw_model_exists(tiger_dataset: TIGERDataset, task_logger: Logger) -> str: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) dbt_data_raw_model_dir = Path(f"/opt/airflow/dbt/models/data_raw") log_as_info(task_logger, f"dbt data_raw model dir ('{dbt_data_raw_model_dir}')") log_as_info(task_logger, f"Dir exists? 
{dbt_data_raw_model_dir.is_dir()}") table_model_path = dbt_data_raw_model_dir.joinpath(f"{tiger_dataset.dataset_name}.sql") if table_model_path.is_file(): - return "update_tiger_table.persist_new_raw_data_tg.update_data_raw_table" + return f"{tg_id_prefix}update_data_raw_table" else: - return "update_tiger_table.persist_new_raw_data_tg.make_dbt_data_raw_model" + return f"{tg_id_prefix}make_dbt_data_raw_model" @task(retries=1) -def make_dbt_data_raw_model(tiger_dataset: TIGERDataset, conn_id: str, task_logger: Logger) -> str: +def make_dbt_data_raw_model(tiger_dataset: TIGERDataset, conn_id: str, task_logger: Logger) -> bool: make_dbt_data_raw_model_file( table_name=tiger_dataset.dataset_name, engine=get_pg_engine(conn_id=conn_id) ) log_as_info(task_logger, f"Leaving make_dbt_data_raw_model") - return "dbt_file_made" + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) @@ -415,7 +416,7 @@ def update_data_raw_table(tiger_dataset: TIGERDataset, task_logger: Logger) -> s @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def register_data_raw_table_asset( datasource_name: str, tiger_dataset: TIGERDataset, task_logger: Logger -) -> str: +) -> bool: datasource = get_datasource(datasource_name=datasource_name, task_logger=task_logger) register_data_asset( schema_name="data_raw", @@ -466,9 +467,7 @@ def update_tiger_table( freshness_check_1 = check_freshness( tiger_dataset=tiger_dataset, conn_id=conn_id, task_logger=task_logger ) - update_available_1 = fresher_source_data_available( - freshness_check=freshness_check_1, task_logger=task_logger - ) + update_available_1 = fresher_source_data_available(task_logger=task_logger) local_data_is_fresh_1 = local_data_is_fresh(task_logger=task_logger) update_data_1 = request_and_ingest_fresh_data( tiger_dataset=tiger_dataset, conn_id=conn_id, task_logger=task_logger diff --git a/airflow/dbt/models/data_raw/sources.yml b/airflow/dbt/models/data_raw/sources.yml index aaba667..c8ed57b 100644 --- a/airflow/dbt/models/data_raw/sources.yml +++ b/airflow/dbt/models/data_raw/sources.yml @@ -95,6 +95,8 @@ sources: - name: temp_illinois_census_tracts_2022 - name: temp_nyc_parcel_sales - name: temp_united_states_counties_2022 + - name: temp_united_states_counties_2024 - name: temp_united_states_rails_2022 - name: united_states_counties_2022 + - name: united_states_counties_2024 - name: united_states_rails_2022 diff --git a/airflow/dbt/models/data_raw/united_states_counties_2024.sql b/airflow/dbt/models/data_raw/united_states_counties_2024.sql new file mode 100644 index 0000000..eb1629e --- /dev/null +++ b/airflow/dbt/models/data_raw/united_states_counties_2024.sql @@ -0,0 +1,61 @@ +{{ config(materialized='table') }} +{% set source_cols = [ + "statefp", "countyfp", "countyns", "geoid", "geoidfq", "name", "namelsad", "lsad", "classfp", + "mtfcc", "csafp", "cbsafp", "metdivfp", "funcstat", "aland", "awater", "intptlat", "intptlon", + "geometry", "vintage_year" +] %} +{% set metadata_cols = ["source_data_updated", "ingestion_check_time"] %} + +-- selecting all records already in the full data_raw table +WITH records_in_data_raw_table AS ( + SELECT *, 1 AS retention_priority + FROM {{ source('data_raw', 'united_states_counties_2024') }} +), + +-- selecting all distinct records from the latest data pull (in the "temp" table) +current_pull_with_distinct_combos_numbered AS ( + SELECT *, + row_number() over(partition by + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + 
) as rn + FROM {{ source('data_raw', 'temp_united_states_counties_2024') }} +), +distinct_records_in_current_pull AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }},{% endfor %} + 2 AS retention_priority + FROM current_pull_with_distinct_combos_numbered + WHERE rn = 1 +), + +-- stacking the existing data with all distinct records from the latest pull +data_raw_table_with_all_new_and_updated_records AS ( + SELECT * + FROM records_in_data_raw_table + UNION ALL + SELECT * + FROM distinct_records_in_current_pull +), + +-- selecting records that where source columns are distinct (keeping the earlier recovery +-- when there are duplicates to chose from) +data_raw_table_with_new_and_updated_records AS ( + SELECT *, + row_number() over(partition by + {% for sc in source_cols %}{{ sc }}{{ "," if not loop.last }}{% endfor %} + ORDER BY retention_priority + ) as rn + FROM data_raw_table_with_all_new_and_updated_records +), +distinct_records_for_data_raw_table AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + FROM data_raw_table_with_new_and_updated_records + WHERE rn = 1 +) + +SELECT * +FROM distinct_records_for_data_raw_table diff --git a/airflow/great_expectations/great_expectations.yml b/airflow/great_expectations/great_expectations.yml index eb0984e..a3484d4 100644 --- a/airflow/great_expectations/great_expectations.yml +++ b/airflow/great_expectations/great_expectations.yml @@ -565,4 +565,16 @@ fluent_datasources: batch_metadata: {} table_name: chicago_cta_train_lines schema_name: data_raw + data_raw.temp_united_states_counties_2024: + type: table + order_by: [] + batch_metadata: {} + table_name: temp_united_states_counties_2024 + schema_name: data_raw + data_raw.united_states_counties_2024: + type: table + order_by: [] + batch_metadata: {} + table_name: united_states_counties_2024 + schema_name: data_raw connection_string: ${GX_DWH_DB_CONN} From 218f787433fa2b6677790b7984ab38e2417dd4cb Mon Sep 17 00:00:00 2001 From: matttriano Date: Sat, 14 Dec 2024 14:41:59 -0600 Subject: [PATCH 8/8] Refactors task context accessing in census dataset taskflows. 
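
Tasks in census_tasks.py no longer accept **kwargs just to grab kwargs["ti"], and no longer
hard-code fully qualified task ids such as
"update_census_table.check_freshness.organize_freshness_check_results". They resolve the runtime
context with get_current_context(), derive the enclosing task-group prefix with
get_task_group_id_prefix(), and share state through keyed XComs, so the same tasks work regardless
of which DAG or task group wraps them. A minimal sketch of the pattern applied throughout this
patch (the task name and branch condition below are illustrative only):

    from logging import Logger

    from airflow.decorators import task
    from airflow.operators.python import get_current_context
    from airflow.utils.trigger_rule import TriggerRule

    from cc_utils.utils import get_task_group_id_prefix, log_as_info


    @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def example_branch_task(task_logger: Logger) -> str:
        # Pull the running TaskInstance from the live context instead of **kwargs.
        context = get_current_context()
        # e.g. "update_census_table.check_freshness." when this task runs
        # inside that task group.
        tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"])
        log_as_info(task_logger, f"task group prefix: {tg_id_prefix}")
        # Shared objects are pulled by key rather than by a hard-coded upstream task_id.
        freshness_check = context["ti"].xcom_pull(key="freshness_check_key")
        if freshness_check is None:
            # Illustrative branch target; real targets are built the same way.
            return f"{tg_id_prefix}update_local_metadata.get_freshness_check_results"
        return f"{tg_id_prefix}local_data_is_fresh"
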
--- ...date_cc_hh_income_in_last_12mo_by_tract.py | 28 ++++++ airflow/dags/sources/census_api_datasets.py | 10 +++ airflow/dags/tasks/census_tasks.py | 86 ++++++++++--------- .../cc_hh_income_in_last_12mo_by_tract.sql | 71 +++++++++++++++ airflow/dbt/models/data_raw/sources.yml | 2 + .../great_expectations/great_expectations.yml | 12 +++ 6 files changed, 167 insertions(+), 42 deletions(-) create mode 100644 airflow/dags/cook_county/update_cc_hh_income_in_last_12mo_by_tract.py create mode 100644 airflow/dbt/models/data_raw/cc_hh_income_in_last_12mo_by_tract.sql diff --git a/airflow/dags/cook_county/update_cc_hh_income_in_last_12mo_by_tract.py b/airflow/dags/cook_county/update_cc_hh_income_in_last_12mo_by_tract.py new file mode 100644 index 0000000..456dd7f --- /dev/null +++ b/airflow/dags/cook_county/update_cc_hh_income_in_last_12mo_by_tract.py @@ -0,0 +1,28 @@ +import datetime as dt +import logging + +from airflow.decorators import dag + +from tasks.census_tasks import update_census_table + +from sources.census_api_datasets import CC_HH_INCOME_IN_LAST_12MO_BY_TRACT as CENSUS_DATASET + +task_logger = logging.getLogger("airflow.task") + + +@dag( + schedule=CENSUS_DATASET.schedule, + start_date=dt.datetime(2022, 11, 1), + catchup=False, + tags=["cook_county", "census"], +) +def update_cc_hh_income_in_last_12mo_by_tract(): + update_census_table( + census_dataset=CENSUS_DATASET, + datasource_name="fluent_dwh_source", + conn_id="dwh_db_conn", + task_logger=task_logger, + ) + + +update_cc_hh_income_in_last_12mo_by_tract() diff --git a/airflow/dags/sources/census_api_datasets.py b/airflow/dags/sources/census_api_datasets.py index d563d13..c21e00a 100644 --- a/airflow/dags/sources/census_api_datasets.py +++ b/airflow/dags/sources/census_api_datasets.py @@ -52,6 +52,16 @@ schedule="40 5 20 3,9 *", ) +CC_HH_INCOME_IN_LAST_12MO_BY_TRACT = CensusVariableGroupDataset( + dataset_name="cc_hh_income_in_last_12mo_by_tract", + api_call_obj=CensusVariableGroupAPICall( + dataset_base_url="http://api.census.gov/data/2023/acs/acs5", + group_name="B19001", + geographies=COOK_COUNTY_CENSUS_TRACTS, + ), + schedule="30 5 5 5,11 *", +) + CC_HH_INTERNET_ACCESS_BY_AGE_BY_TRACT = CensusVariableGroupDataset( dataset_name="cc_hh_internet_access_by_age_by_tract", api_call_obj=CensusVariableGroupAPICall( diff --git a/airflow/dags/tasks/census_tasks.py b/airflow/dags/tasks/census_tasks.py index e109675..fbf2b71 100644 --- a/airflow/dags/tasks/census_tasks.py +++ b/airflow/dags/tasks/census_tasks.py @@ -5,6 +5,7 @@ from airflow.decorators import task, task_group from airflow.models.baseoperator import chain +from airflow.operators.python import get_current_context from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.trigger_rule import TriggerRule from airflow.utils.edgemodifier import Label @@ -30,7 +31,7 @@ make_dbt_data_raw_model_file, ) from cc_utils.transform import format_dbt_run_cmd, execute_dbt_cmd -from cc_utils.utils import log_as_info +from cc_utils.utils import get_task_group_id_prefix, log_as_info from cc_utils.validation import ( run_checkpoint, check_if_checkpoint_exists, @@ -155,19 +156,21 @@ def get_latest_local_freshness_check( @task -def organize_freshness_check_results(task_logger: Logger, **kwargs) -> CensusDatasetFreshnessCheck: - ti = kwargs["ti"] +def organize_freshness_check_results(task_logger: Logger) -> CensusDatasetFreshnessCheck: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) freshness_check = 
CensusDatasetFreshnessCheck( - dataset_source=ti.xcom_pull( - task_ids="update_census_table.check_freshness.get_source_dataset_metadata" + dataset_source=context["ti"].xcom_pull( + task_ids=f"{tg_id_prefix}get_source_dataset_metadata" ), - source_freshness=ti.xcom_pull( - task_ids="update_census_table.check_freshness.record_source_freshness_check" + source_freshness=context["ti"].xcom_pull( + task_ids=f"{tg_id_prefix}record_source_freshness_check" ), - local_freshness=ti.xcom_pull( - task_ids="update_census_table.check_freshness.get_latest_local_freshness_check" + local_freshness=context["ti"].xcom_pull( + task_ids=f"{tg_id_prefix}get_latest_local_freshness_check" ), ) + context["ti"].xcom_push(key="freshness_check_key", value=freshness_check) return freshness_check @@ -197,6 +200,8 @@ def check_freshness( def fresher_source_data_available( freshness_check: CensusDatasetFreshnessCheck, task_logger: Logger ) -> str: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) dataset_in_local_dwh = len(freshness_check.local_freshness) > 0 log_as_info(task_logger, f"Dataset in local dwh: {dataset_in_local_dwh}") @@ -208,23 +213,21 @@ def fresher_source_data_available( log_as_info(task_logger, f"Source dataset last modified: {source_last_modified}") local_dataset_is_fresh = local_last_modified >= source_last_modified if local_dataset_is_fresh: - return "update_census_table.local_data_is_fresh" - return "update_census_table.update_local_metadata.get_freshness_check_results" + return f"{tg_id_prefix}local_data_is_fresh" + return f"{tg_id_prefix}update_local_metadata.get_freshness_check_results" @task -def get_freshness_check_results(task_logger: Logger, **kwargs) -> CensusDatasetFreshnessCheck: - ti = kwargs["ti"] - freshness_check = ti.xcom_pull( - task_ids="update_census_table.check_freshness.organize_freshness_check_results" - ) +def get_freshness_check_results(task_logger: Logger) -> CensusDatasetFreshnessCheck: + context = get_current_context() + freshness_check = context["ti"].xcom_pull(key="freshness_check_key") return freshness_check @task def ingest_dataset_metadata( freshness_check: CensusDatasetFreshnessCheck, conn_id: str, task_logger: Logger -) -> str: +) -> bool: metadata_df = freshness_check.dataset_source.metadata_catalog_df.copy() metadata_df["time_of_check"] = freshness_check.source_freshness["time_of_check"].max() log_as_info(task_logger, f"Dataset metadata columns:") @@ -241,7 +244,7 @@ def ingest_dataset_metadata( .returning(metadata_table) ) ingested_df = execute_result_returning_orm_query(engine=engine, select_query=insert_statement) - return "success" + return True @task @@ -379,18 +382,16 @@ def update_local_metadata(conn_id: str, task_logger: Logger): @task -def local_data_is_fresh(task_logger: Logger): - return "hi" +def local_data_is_fresh(task_logger: Logger) -> bool: + return True @task def request_and_ingest_dataset( - census_dataset: CensusVariableGroupDataset, conn_id: str, task_logger: Logger, **kwargs + census_dataset: CensusVariableGroupDataset, conn_id: str, task_logger: Logger ) -> str: - ti = kwargs["ti"] - freshness_check = ti.xcom_pull( - task_ids="update_census_table.check_freshness.organize_freshness_check_results" - ) + context = get_current_context() + freshness_check = context["ti"].xcom_pull(key="freshness_check_key") engine = get_pg_engine(conn_id=conn_id) dataset_df = census_dataset.api_call_obj.make_api_call() @@ -423,11 +424,9 @@ def request_and_ingest_dataset( 
@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def record_data_update(conn_id: str, task_logger: Logger, **kwargs) -> str: - ti = kwargs["ti"] - freshness_check = ti.xcom_pull( - task_ids="update_census_table.check_freshness.organize_freshness_check_results" - ) +def record_data_update(conn_id: str, task_logger: Logger) -> bool: + context = get_current_context() + freshness_check = context["ti"].xcom_pull(key="freshness_check_key") engine = get_pg_engine(conn_id=conn_id) dataset_id = freshness_check.source_freshness["id"].max() @@ -451,14 +450,13 @@ def record_data_update(conn_id: str, task_logger: Logger, **kwargs) -> str: query=f"""SELECT * FROM metadata.dataset_metadata WHERE id = {dataset_id}""", ) log_as_info(task_logger, f"General metadata record post-update: {post_update_record}") - return "success" + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def register_temp_table_asset( census_dataset: CensusVariableGroupDataset, datasource_name: str, task_logger: Logger -) -> str: - task_logger.info +) -> bool: datasource = get_datasource(datasource_name=datasource_name, task_logger=task_logger) register_data_asset( schema_name="data_raw", @@ -471,15 +469,17 @@ def register_temp_table_asset( @task.branch(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) def table_checkpoint_exists(census_dataset: CensusVariableGroupDataset, task_logger: Logger) -> str: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) checkpoint_name = f"data_raw.temp_{census_dataset.dataset_name}" if check_if_checkpoint_exists(checkpoint_name=checkpoint_name, task_logger=task_logger): log_as_info(task_logger, f"GE checkpoint for {checkpoint_name} exists") - return "update_census_table.raw_data_validation_tg.run_temp_table_checkpoint" + return f"{tg_id_prefix}run_temp_table_checkpoint" else: log_as_info( task_logger, f"GE checkpoint for {checkpoint_name} doesn't exist yet. Make it maybe?" ) - return "update_census_table.raw_data_validation_tg.validation_endpoint" + return f"{tg_id_prefix}validation_endpoint" @task @@ -569,29 +569,31 @@ def create_table_in_data_raw( def dbt_data_raw_model_exists( census_dataset: CensusVariableGroupDataset, task_logger: Logger ) -> str: + context = get_current_context() + tg_id_prefix = get_task_group_id_prefix(task_instance=context["ti"]) dbt_data_raw_model_dir = Path(f"/opt/airflow/dbt/models/data_raw") log_as_info(task_logger, f"dbt data_raw model dir ('{dbt_data_raw_model_dir}')") log_as_info(task_logger, f"Dir exists? 
{dbt_data_raw_model_dir.is_dir()}") table_model_path = dbt_data_raw_model_dir.joinpath(f"{census_dataset.dataset_name}.sql") if table_model_path.is_file(): - return "update_census_table.persist_new_raw_data_tg.update_data_raw_table" + return f"{tg_id_prefix}update_data_raw_table" else: - return "update_census_table.persist_new_raw_data_tg.make_dbt_data_raw_model" + return f"{tg_id_prefix}make_dbt_data_raw_model" @task(retries=1) def make_dbt_data_raw_model( census_dataset: CensusVariableGroupDataset, conn_id: str, task_logger: Logger -) -> str: +) -> bool: make_dbt_data_raw_model_file( table_name=census_dataset.dataset_name, engine=get_pg_engine(conn_id=conn_id) ) - log_as_info(task_logger, f"Leaving make_dbt_data_raw_model") - return "dbt_file_made" + log_as_info(task_logger, f"dbt model file made, leaving make_dbt_data_raw_model") + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) -def update_data_raw_table(census_dataset: CensusVariableGroupDataset, task_logger: Logger) -> str: +def update_data_raw_table(census_dataset: CensusVariableGroupDataset, task_logger: Logger) -> bool: dbt_cmd = format_dbt_run_cmd( dataset_name=census_dataset.dataset_name, schema="data_raw", @@ -599,7 +601,7 @@ def update_data_raw_table(census_dataset: CensusVariableGroupDataset, task_logge ) result = execute_dbt_cmd(dbt_cmd=dbt_cmd, task_logger=task_logger) log_as_info(task_logger, f"dbt transform result: {result}") - return "data_raw_updated" + return True @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) diff --git a/airflow/dbt/models/data_raw/cc_hh_income_in_last_12mo_by_tract.sql b/airflow/dbt/models/data_raw/cc_hh_income_in_last_12mo_by_tract.sql new file mode 100644 index 0000000..2a2780b --- /dev/null +++ b/airflow/dbt/models/data_raw/cc_hh_income_in_last_12mo_by_tract.sql @@ -0,0 +1,71 @@ +{{ config(materialized='table') }} +{% set source_cols = [ + "b19001_001e", "b19001_001ea", "b19001_001m", "b19001_001ma", "b19001_002e", "b19001_002ea", + "b19001_002m", "b19001_002ma", "b19001_003e", "b19001_003ea", "b19001_003m", "b19001_003ma", + "b19001_004e", "b19001_004ea", "b19001_004m", "b19001_004ma", "b19001_005e", "b19001_005ea", + "b19001_005m", "b19001_005ma", "b19001_006e", "b19001_006ea", "b19001_006m", "b19001_006ma", + "b19001_007e", "b19001_007ea", "b19001_007m", "b19001_007ma", "b19001_008e", "b19001_008ea", + "b19001_008m", "b19001_008ma", "b19001_009e", "b19001_009ea", "b19001_009m", "b19001_009ma", + "b19001_010e", "b19001_010ea", "b19001_010m", "b19001_010ma", "b19001_011e", "b19001_011ea", + "b19001_011m", "b19001_011ma", "b19001_012e", "b19001_012ea", "b19001_012m", "b19001_012ma", + "b19001_013e", "b19001_013ea", "b19001_013m", "b19001_013ma", "b19001_014e", "b19001_014ea", + "b19001_014m", "b19001_014ma", "b19001_015e", "b19001_015ea", "b19001_015m", "b19001_015ma", + "b19001_016e", "b19001_016ea", "b19001_016m", "b19001_016ma", "b19001_017e", "b19001_017ea", + "b19001_017m", "b19001_017ma", "geo_id", "name", "state", "county", "tract", "dataset_base_url", + "dataset_id" +] %} +{% set metadata_cols = ["source_data_updated", "ingestion_check_time"] %} + +-- selecting all records already in the full data_raw table +WITH records_in_data_raw_table AS ( + SELECT *, 1 AS retention_priority + FROM {{ source('data_raw', 'cc_hh_income_in_last_12mo_by_tract') }} +), + +-- selecting all distinct records from the latest data pull (in the "temp" table) +current_pull_with_distinct_combos_numbered AS ( + SELECT *, + row_number() over(partition by + {% for sc in 
source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + ) as rn + FROM {{ source('data_raw', 'temp_cc_hh_income_in_last_12mo_by_tract') }} +), +distinct_records_in_current_pull AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }},{% endfor %} + 2 AS retention_priority + FROM current_pull_with_distinct_combos_numbered + WHERE rn = 1 +), + +-- stacking the existing data with all distinct records from the latest pull +data_raw_table_with_all_new_and_updated_records AS ( + SELECT * + FROM records_in_data_raw_table + UNION ALL + SELECT * + FROM distinct_records_in_current_pull +), + +-- selecting records that where source columns are distinct (keeping the earlier recovery +-- when there are duplicates to chose from) +data_raw_table_with_new_and_updated_records AS ( + SELECT *, + row_number() over(partition by + {% for sc in source_cols %}{{ sc }}{{ "," if not loop.last }}{% endfor %} + ORDER BY retention_priority + ) as rn + FROM data_raw_table_with_all_new_and_updated_records +), +distinct_records_for_data_raw_table AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + FROM data_raw_table_with_new_and_updated_records + WHERE rn = 1 +) + +SELECT * +FROM distinct_records_for_data_raw_table diff --git a/airflow/dbt/models/data_raw/sources.yml b/airflow/dbt/models/data_raw/sources.yml index c8ed57b..e0cafbc 100644 --- a/airflow/dbt/models/data_raw/sources.yml +++ b/airflow/dbt/models/data_raw/sources.yml @@ -7,6 +7,7 @@ sources: - raw tables: - name: cc_hh_earnings_in_last_12mo_by_tract + - name: cc_hh_income_in_last_12mo_by_tract - name: cc_hh_internet_access_by_age_by_tract - name: cc_housing_units_by_tract - name: cc_planning_db_housing_and_demos_by_bg @@ -51,6 +52,7 @@ sources: - name: illinois_census_tracts_2022 - name: nyc_parcel_sales - name: temp_cc_hh_earnings_in_last_12mo_by_tract + - name: temp_cc_hh_income_in_last_12mo_by_tract - name: temp_cc_hh_internet_access_by_age_by_tract - name: temp_cc_housing_units_by_tract - name: temp_cc_planning_db_housing_and_demos_by_bg diff --git a/airflow/great_expectations/great_expectations.yml b/airflow/great_expectations/great_expectations.yml index a3484d4..a056899 100644 --- a/airflow/great_expectations/great_expectations.yml +++ b/airflow/great_expectations/great_expectations.yml @@ -577,4 +577,16 @@ fluent_datasources: batch_metadata: {} table_name: united_states_counties_2024 schema_name: data_raw + data_raw.temp_cc_hh_income_in_last_12mo_by_tract: + type: table + order_by: [] + batch_metadata: {} + table_name: temp_cc_hh_income_in_last_12mo_by_tract + schema_name: data_raw + data_raw.cc_hh_income_in_last_12mo_by_tract: + type: table + order_by: [] + batch_metadata: {} + table_name: cc_hh_income_in_last_12mo_by_tract + schema_name: data_raw connection_string: ${GX_DWH_DB_CONN}