From 95585acbce1f794500a1f544e12e2bdef3fe354c Mon Sep 17 00:00:00 2001 From: matttriano Date: Sun, 15 Oct 2023 18:38:01 -0500 Subject: [PATCH] Fixes echo statement leaking superset pw to console and adds in an unrelated uncommitted census data pipeline. --- airflow/dags/sources/tables.py | 6 ++ .../gross_rent_by_cook_county_il_tract.sql | 77 +++++++++++++++++++ airflow/dbt/models/data_raw/sources.yml | 2 + .../great_expectations/great_expectations.yml | 24 +++++- requirements/superset/docker-init.sh | 2 +- 5 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 airflow/dbt/models/data_raw/gross_rent_by_cook_county_il_tract.sql diff --git a/airflow/dags/sources/tables.py b/airflow/dags/sources/tables.py index 308ceac..6f3ea55 100644 --- a/airflow/dags/sources/tables.py +++ b/airflow/dags/sources/tables.py @@ -174,6 +174,12 @@ schedule="0 4 3 3 *", ) +COOK_COUNTY_PARCEL_ADDRESSES = SocrataTable( + table_id="3723-97qp", + table_name="cook_county_parcel_addresses", + schedule="10 7 4 * *", +) + COOK_COUNTY_PARCEL_LOCATIONS = SocrataTable( table_id="c49d-89sn", table_name="cook_county_parcel_locations", diff --git a/airflow/dbt/models/data_raw/gross_rent_by_cook_county_il_tract.sql b/airflow/dbt/models/data_raw/gross_rent_by_cook_county_il_tract.sql new file mode 100644 index 0000000..5fd2d2f --- /dev/null +++ b/airflow/dbt/models/data_raw/gross_rent_by_cook_county_il_tract.sql @@ -0,0 +1,77 @@ +{{ config(materialized='table') }} +{% set source_cols = [ + "b25063_001e", "b25063_001ea", "b25063_001m", "b25063_001ma", "b25063_002e", "b25063_002ea", + "b25063_002m", "b25063_002ma", "b25063_003e", "b25063_003ea", "b25063_003m", "b25063_003ma", + "b25063_004e", "b25063_004ea", "b25063_004m", "b25063_004ma", "b25063_005e", "b25063_005ea", + "b25063_005m", "b25063_005ma", "b25063_006e", "b25063_006ea", "b25063_006m", "b25063_006ma", + "b25063_007e", "b25063_007ea", "b25063_007m", "b25063_007ma", "b25063_008e", "b25063_008ea", + "b25063_008m", "b25063_008ma", 
"b25063_009e", "b25063_009ea", "b25063_009m", "b25063_009ma", + "b25063_010e", "b25063_010ea", "b25063_010m", "b25063_010ma", "b25063_011e", "b25063_011ea", + "b25063_011m", "b25063_011ma", "b25063_012e", "b25063_012ea", "b25063_012m", "b25063_012ma", + "b25063_013e", "b25063_013ea", "b25063_013m", "b25063_013ma", "b25063_014e", "b25063_014ea", + "b25063_014m", "b25063_014ma", "b25063_015e", "b25063_015ea", "b25063_015m", "b25063_015ma", + "b25063_016e", "b25063_016ea", "b25063_016m", "b25063_016ma", "b25063_017e", "b25063_017ea", + "b25063_017m", "b25063_017ma", "b25063_018e", "b25063_018ea", "b25063_018m", "b25063_018ma", + "b25063_019e", "b25063_019ea", "b25063_019m", "b25063_019ma", "b25063_020e", "b25063_020ea", + "b25063_020m", "b25063_020ma", "b25063_021e", "b25063_021ea", "b25063_021m", "b25063_021ma", + "b25063_022e", "b25063_022ea", "b25063_022m", "b25063_022ma", "b25063_023e", "b25063_023ea", + "b25063_023m", "b25063_023ma", "b25063_024e", "b25063_024ea", "b25063_024m", "b25063_024ma", + "b25063_025e", "b25063_025ea", "b25063_025m", "b25063_025ma", "b25063_026e", "b25063_026ea", + "b25063_026m", "b25063_026ma", "b25063_027e", "b25063_027ea", "b25063_027m", "b25063_027ma", + "geo_id", "name", "state", "county", "tract", "dataset_base_url", "dataset_id" +] %} +{% set metadata_cols = ["source_data_updated", "ingestion_check_time"] %} + +-- selecting all records already in the full data_raw table +WITH records_in_data_raw_table AS ( + SELECT *, 1 AS retention_priority + FROM {{ source('data_raw', 'gross_rent_by_cook_county_il_tract') }} +), + +-- selecting all distinct records from the latest data pull (in the "temp" table) +current_pull_with_distinct_combos_numbered AS ( + SELECT *, + row_number() over(partition by + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + ) as rn + FROM {{ source('data_raw', 'temp_gross_rent_by_cook_county_il_tract') }} +), 
+distinct_records_in_current_pull AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }},{% endfor %} + 2 AS retention_priority + FROM current_pull_with_distinct_combos_numbered + WHERE rn = 1 +), + +-- stacking the existing data with all distinct records from the latest pull +data_raw_table_with_all_new_and_updated_records AS ( + SELECT * + FROM records_in_data_raw_table + UNION ALL + SELECT * + FROM distinct_records_in_current_pull +), + +-- selecting records where the source columns are distinct (keeping the earlier record +-- when there are duplicates to choose from) +data_raw_table_with_new_and_updated_records AS ( + SELECT *, + row_number() over(partition by + {% for sc in source_cols %}{{ sc }}{{ "," if not loop.last }}{% endfor %} + ORDER BY retention_priority + ) as rn + FROM data_raw_table_with_all_new_and_updated_records +), +distinct_records_for_data_raw_table AS ( + SELECT + {% for sc in source_cols %}{{ sc }},{% endfor %} + {% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %} + FROM data_raw_table_with_new_and_updated_records + WHERE rn = 1 +) + +SELECT * +FROM distinct_records_for_data_raw_table diff --git a/airflow/dbt/models/data_raw/sources.yml b/airflow/dbt/models/data_raw/sources.yml index 3c2e173..3e13cc2 100644 --- a/airflow/dbt/models/data_raw/sources.yml +++ b/airflow/dbt/models/data_raw/sources.yml @@ -44,6 +44,7 @@ sources: - name: cook_county_sao_case_initiation_data - name: cook_county_sao_case_intake_data - name: cook_county_sao_case_sentencing_data + - name: gross_rent_by_cook_county_il_tract - name: illinois_census_tracts_2022 - name: nyc_parcel_sales - name: temp_cc_hh_earnings_in_last_12mo_by_tract @@ -84,6 +85,7 @@ sources: - name: temp_cook_county_sao_case_initiation_data - name: temp_cook_county_sao_case_intake_data - name: temp_cook_county_sao_case_sentencing_data + - name: temp_gross_rent_by_cook_county_il_tract - name: temp_illinois_census_tracts_2022 -
name: temp_nyc_parcel_sales - name: temp_united_states_counties_2022 diff --git a/airflow/great_expectations/great_expectations.yml b/airflow/great_expectations/great_expectations.yml index 7077a63..6d84991 100644 --- a/airflow/great_expectations/great_expectations.yml +++ b/airflow/great_expectations/great_expectations.yml @@ -18,6 +18,8 @@ datasources: class_name: Datasource module_name: great_expectations.datasource execution_engine: + class_name: SqlAlchemyExecutionEngine + module_name: great_expectations.execution_engine credentials: host: dwh_db port: '5432' @@ -25,8 +27,6 @@ datasources: password: ${DWH_POSTGRES_PASSWORD} database: ${DWH_POSTGRES_DB} drivername: postgresql - class_name: SqlAlchemyExecutionEngine - module_name: great_expectations.execution_engine data_connectors: default_runtime_data_connector_name: name: default_runtime_data_connector_name @@ -113,9 +113,9 @@ anonymous_usage_statistics: data_context_id: b25cb2e2-4999-427c-93b4-7597f9e172a9 enabled: true include_rendered_content: + globally: false expectation_suite: false expectation_validation_result: false - globally: false notebooks: fluent_datasources: fluent_dwh_source: @@ -487,4 +487,22 @@ fluent_datasources: batch_metadata: {} table_name: temp_cook_county_sao_case_intake_data schema_name: data_raw + data_raw.cook_county_sao_case_initiation_data: + type: table + order_by: [] + batch_metadata: {} + table_name: cook_county_sao_case_initiation_data + schema_name: data_raw + data_raw.temp_gross_rent_by_cook_county_il_tract: + type: table + order_by: [] + batch_metadata: {} + table_name: temp_gross_rent_by_cook_county_il_tract + schema_name: data_raw + data_raw.gross_rent_by_cook_county_il_tract: + type: table + order_by: [] + batch_metadata: {} + table_name: gross_rent_by_cook_county_il_tract + schema_name: data_raw connection_string: ${GX_DWH_DB_CONN} diff --git a/requirements/superset/docker-init.sh b/requirements/superset/docker-init.sh index 7e5e6e5..08c32a3 100755 --- 
a/requirements/superset/docker-init.sh +++ b/requirements/superset/docker-init.sh @@ -20,7 +20,7 @@ superset db upgrade echo_step "1" "Complete" "Applying DB migrations" # Create an admin user -echo_step "2" "Starting" "Setting up admin user ( admin / $ADMIN_PASSWORD )" +echo_step "2" "Starting" "Setting up admin user ( $ADMIN_USERNAME )" superset fab create-admin \ --username $ADMIN_USERNAME \ --firstname $ADMIN_FIRST_NAME \