Skip to content

Commit

Permalink
feat: registering more flows
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurfg committed Sep 15, 2023
1 parent e000f3a commit 41abe31
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 298 deletions.
152 changes: 25 additions & 127 deletions pipelines/datasets/br_anatel_banda_larga_fixa/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from pipelines.constants import constants
from pipelines.utils.tasks import update_django_metadata
from pipelines.utils.metadata.tasks import update_django_metadata
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
Expand Down Expand Up @@ -110,44 +110,18 @@
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

# ! tabela bd pro
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id[0] + "_atualizado",
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id[0]}" + "_atualizado",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date_atualizado() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id[0] + "_atualizado",
table_id[0],
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=False,
is_bd_pro=True,
is_free=True,
time_delta=2,
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date,
Expand Down Expand Up @@ -192,43 +166,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

# ! tabela bd pro
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id[1] + "_atualizado",
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id[1]}" + "_atualizado",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date_atualizado() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id[1] + "_atualizado",
table_id[1],
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=False,
is_bd_pro=True,
is_free=True,
time_delta=2,
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date,
Expand Down Expand Up @@ -275,43 +224,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

# ! tabela bd pro
with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id[2] + "_atualizado",
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id[2]}" + "_atualizado",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date_atualizado() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id[2] + "_atualizado",
table_id[2],
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=False,
is_bd_pro=True,
is_free=True,
time_delta=2,
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date,
Expand Down Expand Up @@ -357,44 +281,18 @@
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

# ! tabela bd pro

with case(materialize_after_dump, True):
# Trigger DBT flow run
current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id[3] + "_atualizado",
"mode": materialization_mode,
"dbt_alias": dbt_alias,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id[3]}" + "_atualizado",
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)

with case(update_metadata, True):
date = get_today_date_atualizado() # task que retorna a data atual
update_django_metadata(
dataset_id,
table_id[3] + "_atualizado",
table_id[3],
metadata_type="DateTimeRange",
bq_last_update=False,
bq_table_last_year_month=False,
is_bd_pro=True,
is_free=True,
time_delta=2,
time_unit="months",
api_mode="prod",
date_format="yy-mm",
_last_date=date,
Expand Down
Loading

0 comments on commit 41abe31

Please sign in to comment.