Skip to content

Commit

Permalink
Merge pull request #838 from basedosdados/staging/fix-date-column-nam…
Browse files Browse the repository at this point in the history
…e-cno

[fix] add upstream with unmapped in flow
  • Loading branch information
tricktx authored Aug 16, 2024
2 parents 963f6c2 + ee59295 commit a130efd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
12 changes: 5 additions & 7 deletions pipelines/datasets/br_rf_cno/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@
table_id=table_ids,
dump_mode=unmapped("append"),
source_format=unmapped('parquet'),
wait=unmapped(files),
upstream_tasks=[files]
upstream_tasks=[unmapped(files)] # https://github.com/PrefectHQ/prefect/issues/2752
)

dbt_parameters = create_parameters_list(
Expand All @@ -108,7 +107,7 @@
download_csv_file = True,
dbt_command = 'run',
disable_elementary = True,
upstream_tasks=[wait_upload_table]
upstream_tasks=[unmapped(wait_upload_table)]
)

with case(materialize_after_dump, True):
Expand All @@ -120,15 +119,15 @@
parameters=dbt_parameters,
labels=unmapped(current_flow_labels),
run_name=f"Materialize {dataset_id}.{table_ids}",
upstream_tasks=[dbt_parameters]
upstream_tasks=[unmapped(dbt_parameters)]
)

wait_for_materialization = wait_for_flow_run.map(
materialization_flow,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
upstream_tasks=[materialization_flow]
upstream_tasks=[unmapped(materialization_flow)]
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
Expand All @@ -144,10 +143,9 @@
date_column_name=unmapped({"date": "data_extracao"}),
date_format=unmapped("%Y-%m-%d"),
coverage_type=unmapped("all_bdpro"),
#time_delta=unmapped({"months": 6}),
prefect_mode=unmapped(materialization_mode),
bq_project=unmapped("basedosdados"),
upstream_tasks=[wait_for_materialization],
upstream_tasks=[unmapped(wait_for_materialization)],
)


Expand Down
8 changes: 6 additions & 2 deletions pipelines/datasets/br_rf_cno/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"""
Tasks for br_rf_cno
"""

from datetime import datetime, timedelta
from pipelines.constants import constants
import os
from bs4 import BeautifulSoup
from datetime import datetime
Expand Down Expand Up @@ -106,7 +107,10 @@ def wrangling(input_dir: str, output_dir: str, partition_date: str) -> None:
log('----- Wrangling completed')


@task
@task(
max_retries=5,
retry_delay=timedelta(seconds=constants.TASK_RETRY_DELAY.value),
)
def crawl_cno(root: str, url: str) -> None:
"""
Downloads and unpacks a ZIP file from the given URL.
Expand Down

0 comments on commit a130efd

Please sign in to comment.