Skip to content

Commit

Permalink
Fixes echo statement leaking superset pw to console and adds in an un…
Browse files Browse the repository at this point in the history
…related uncommitted census data pipeline.
  • Loading branch information
MattTriano committed Oct 15, 2023
1 parent 02a8093 commit 95585ac
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 4 deletions.
6 changes: 6 additions & 0 deletions airflow/dags/sources/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@
schedule="0 4 3 3 *",
)

COOK_COUNTY_PARCEL_ADDRESSES = SocrataTable(
table_id="3723-97qp",
table_name="cook_county_parcel_addresses",
schedule="10 7 4 * *",
)

COOK_COUNTY_PARCEL_LOCATIONS = SocrataTable(
table_id="c49d-89sn",
table_name="cook_county_parcel_locations",
Expand Down
77 changes: 77 additions & 0 deletions airflow/dbt/models/data_raw/gross_rent_by_cook_county_il_tract.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{{ config(materialized='table') }}
{% set source_cols = [
"b25063_001e", "b25063_001ea", "b25063_001m", "b25063_001ma", "b25063_002e", "b25063_002ea",
"b25063_002m", "b25063_002ma", "b25063_003e", "b25063_003ea", "b25063_003m", "b25063_003ma",
"b25063_004e", "b25063_004ea", "b25063_004m", "b25063_004ma", "b25063_005e", "b25063_005ea",
"b25063_005m", "b25063_005ma", "b25063_006e", "b25063_006ea", "b25063_006m", "b25063_006ma",
"b25063_007e", "b25063_007ea", "b25063_007m", "b25063_007ma", "b25063_008e", "b25063_008ea",
"b25063_008m", "b25063_008ma", "b25063_009e", "b25063_009ea", "b25063_009m", "b25063_009ma",
"b25063_010e", "b25063_010ea", "b25063_010m", "b25063_010ma", "b25063_011e", "b25063_011ea",
"b25063_011m", "b25063_011ma", "b25063_012e", "b25063_012ea", "b25063_012m", "b25063_012ma",
"b25063_013e", "b25063_013ea", "b25063_013m", "b25063_013ma", "b25063_014e", "b25063_014ea",
"b25063_014m", "b25063_014ma", "b25063_015e", "b25063_015ea", "b25063_015m", "b25063_015ma",
"b25063_016e", "b25063_016ea", "b25063_016m", "b25063_016ma", "b25063_017e", "b25063_017ea",
"b25063_017m", "b25063_017ma", "b25063_018e", "b25063_018ea", "b25063_018m", "b25063_018ma",
"b25063_019e", "b25063_019ea", "b25063_019m", "b25063_019ma", "b25063_020e", "b25063_020ea",
"b25063_020m", "b25063_020ma", "b25063_021e", "b25063_021ea", "b25063_021m", "b25063_021ma",
"b25063_022e", "b25063_022ea", "b25063_022m", "b25063_022ma", "b25063_023e", "b25063_023ea",
"b25063_023m", "b25063_023ma", "b25063_024e", "b25063_024ea", "b25063_024m", "b25063_024ma",
"b25063_025e", "b25063_025ea", "b25063_025m", "b25063_025ma", "b25063_026e", "b25063_026ea",
"b25063_026m", "b25063_026ma", "b25063_027e", "b25063_027ea", "b25063_027m", "b25063_027ma",
"geo_id", "name", "state", "county", "tract", "dataset_base_url", "dataset_id"
] %}
{% set metadata_cols = ["source_data_updated", "ingestion_check_time"] %}

-- selecting all records already in the full data_raw table
WITH records_in_data_raw_table AS (
SELECT *, 1 AS retention_priority
FROM {{ source('data_raw', 'gross_rent_by_cook_county_il_tract') }}
),

-- selecting all distinct records from the latest data pull (in the "temp" table)
current_pull_with_distinct_combos_numbered AS (
SELECT *,
row_number() over(partition by
{% for sc in source_cols %}{{ sc }},{% endfor %}
{% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %}
) as rn
FROM {{ source('data_raw', 'temp_gross_rent_by_cook_county_il_tract') }}
),
distinct_records_in_current_pull AS (
SELECT
{% for sc in source_cols %}{{ sc }},{% endfor %}
{% for mc in metadata_cols %}{{ mc }},{% endfor %}
2 AS retention_priority
FROM current_pull_with_distinct_combos_numbered
WHERE rn = 1
),

-- stacking the existing data with all distinct records from the latest pull
data_raw_table_with_all_new_and_updated_records AS (
SELECT *
FROM records_in_data_raw_table
UNION ALL
SELECT *
FROM distinct_records_in_current_pull
),

-- selecting records that where source columns are distinct (keeping the earlier recovery
-- when there are duplicates to chose from)
data_raw_table_with_new_and_updated_records AS (
SELECT *,
row_number() over(partition by
{% for sc in source_cols %}{{ sc }}{{ "," if not loop.last }}{% endfor %}
ORDER BY retention_priority
) as rn
FROM data_raw_table_with_all_new_and_updated_records
),
distinct_records_for_data_raw_table AS (
SELECT
{% for sc in source_cols %}{{ sc }},{% endfor %}
{% for mc in metadata_cols %}{{ mc }}{{ "," if not loop.last }}{% endfor %}
FROM data_raw_table_with_new_and_updated_records
WHERE rn = 1
)

SELECT *
FROM distinct_records_for_data_raw_table
2 changes: 2 additions & 0 deletions airflow/dbt/models/data_raw/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ sources:
- name: cook_county_sao_case_initiation_data
- name: cook_county_sao_case_intake_data
- name: cook_county_sao_case_sentencing_data
- name: gross_rent_by_cook_county_il_tract
- name: illinois_census_tracts_2022
- name: nyc_parcel_sales
- name: temp_cc_hh_earnings_in_last_12mo_by_tract
Expand Down Expand Up @@ -84,6 +85,7 @@ sources:
- name: temp_cook_county_sao_case_initiation_data
- name: temp_cook_county_sao_case_intake_data
- name: temp_cook_county_sao_case_sentencing_data
- name: temp_gross_rent_by_cook_county_il_tract
- name: temp_illinois_census_tracts_2022
- name: temp_nyc_parcel_sales
- name: temp_united_states_counties_2022
Expand Down
24 changes: 21 additions & 3 deletions airflow/great_expectations/great_expectations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ datasources:
class_name: Datasource
module_name: great_expectations.datasource
execution_engine:
class_name: SqlAlchemyExecutionEngine
module_name: great_expectations.execution_engine
credentials:
host: dwh_db
port: '5432'
username: ${DWH_POSTGRES_USER}
password: ${DWH_POSTGRES_PASSWORD}
database: ${DWH_POSTGRES_DB}
drivername: postgresql
class_name: SqlAlchemyExecutionEngine
module_name: great_expectations.execution_engine
data_connectors:
default_runtime_data_connector_name:
name: default_runtime_data_connector_name
Expand Down Expand Up @@ -113,9 +113,9 @@ anonymous_usage_statistics:
data_context_id: b25cb2e2-4999-427c-93b4-7597f9e172a9
enabled: true
include_rendered_content:
globally: false
expectation_suite: false
expectation_validation_result: false
globally: false
notebooks:
fluent_datasources:
fluent_dwh_source:
Expand Down Expand Up @@ -487,4 +487,22 @@ fluent_datasources:
batch_metadata: {}
table_name: temp_cook_county_sao_case_intake_data
schema_name: data_raw
data_raw.cook_county_sao_case_initiation_data:
type: table
order_by: []
batch_metadata: {}
table_name: cook_county_sao_case_initiation_data
schema_name: data_raw
data_raw.temp_gross_rent_by_cook_county_il_tract:
type: table
order_by: []
batch_metadata: {}
table_name: temp_gross_rent_by_cook_county_il_tract
schema_name: data_raw
data_raw.gross_rent_by_cook_county_il_tract:
type: table
order_by: []
batch_metadata: {}
table_name: gross_rent_by_cook_county_il_tract
schema_name: data_raw
connection_string: ${GX_DWH_DB_CONN}
2 changes: 1 addition & 1 deletion requirements/superset/docker-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ superset db upgrade
echo_step "1" "Complete" "Applying DB migrations"

# Create an admin user
echo_step "2" "Starting" "Setting up admin user ( admin / $ADMIN_PASSWORD )"
echo_step "2" "Starting" "Setting up admin user ( admin / $ADMIN_USERNAME )"
superset fab create-admin \
--username $ADMIN_USERNAME \
--firstname $ADMIN_FIRST_NAME \
Expand Down

0 comments on commit 95585ac

Please sign in to comment.