Skip to content

Commit

Permalink
Colocar upstream_tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Winzen committed Oct 17, 2024
1 parent c44f1f6 commit a9ef6a6
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions pipelines/datasets/br_tse_filiacao_partidaria/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@

with case(outdated, True):

collector = collector_starter(upstream_tasks=[outdated])
collector = collector_starter(upstream_tasks=[data_source_max_date])

collect_task = collect(collector=collector, upstream_tasks=[collector])

ready_data_path = processing(collector=collector, upstream_tasks=[collector])
ready_data_path = processing(collector=collector, upstream_tasks=[collect_task])

wait_upload_table = create_table_and_upload_to_gcs(
data_path=ready_data_path,
Expand All @@ -94,7 +94,7 @@
# materialize municipio_exportacao
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
current_flow_labels = get_current_flow_labels(upstream_tasks=[wait_upload_table])
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
Expand All @@ -109,6 +109,7 @@
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
upstream_tasks=[current_flow_labels]

)

Expand All @@ -127,17 +128,17 @@
)
# coverage updater

# with case(update_metadata, True):
# update_django_metadata(
# dataset_id=dataset_id,
# table_id=table_id,
# date_column_name={"date": "data_extracao"},
# date_format="%Y",
# prefect_mode=materialization_mode,
# coverage_type="all_free",
# bq_project="basedosdados",
# upstream_tasks=[wait_for_materialization],
# )
with case(update_metadata, True):
update_django_metadata(
dataset_id=dataset_id,
table_id=table_id,
date_column_name={"date": "data_extracao"},
date_format="%Y",
prefect_mode=materialization_mode,
coverage_type="all_free",
bq_project="basedosdados",
upstream_tasks=[wait_for_materialization],
)

br_tse_filiacao_partidaria_microdados.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_tse_filiacao_partidaria_microdados.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
Expand Down

0 comments on commit a9ef6a6

Please sign in to comment.