From ee31be2799430bc24ce57923649e4ebdbbb8208b Mon Sep 17 00:00:00 2001 From: Laura Amaral Date: Wed, 7 Aug 2024 10:06:02 -0300 Subject: [PATCH] fix code-owner and add wait_upload_table in all pipelines --- .../datasets/br_ans_beneficiario/flows.py | 3 ++- pipelines/datasets/br_b3_cotacoes/flows.py | 1 + pipelines/datasets/br_bcb_agencia/flows.py | 1 + pipelines/datasets/br_bcb_estban/flows.py | 2 ++ pipelines/datasets/br_bd_indicadores/flows.py | 1 + .../br_cgu_beneficios_cidadao/flows.py | 9 ++++++--- .../br_cnj_improbidade_administrativa/flows.py | 2 +- pipelines/datasets/br_cvm_fi/flows.py | 18 ++++++++++++------ pipelines/datasets/br_denatran_frota/flows.py | 2 ++ pipelines/datasets/br_inmet_bdmep/flows.py | 3 ++- pipelines/datasets/br_me_cnpj/flows.py | 8 ++++---- pipelines/datasets/br_me_comex_stat/flows.py | 4 ++++ .../br_mg_belohorizonte_smfa_iptu/flows.py | 1 + .../br_ons_avaliacao_operacao/flows.py | 6 ++++++ .../datasets/br_ons_estimativa_custos/flows.py | 5 +++++ pipelines/datasets/br_rf_cafir/flows.py | 1 + .../mundo_transfermarkt_competicoes/flows.py | 4 ++-- .../flows.py | 2 +- .../crawler_camara_dados_abertos/flows.py | 1 + pipelines/utils/crawler_datasus/flows.py | 7 +++++-- pipelines/utils/metadata/flows.py | 2 +- pipelines/utils/to_download/flows.py | 2 +- .../utils/transfer_files_to_prod/flows.py | 2 +- 23 files changed, 63 insertions(+), 24 deletions(-) diff --git a/pipelines/datasets/br_ans_beneficiario/flows.py b/pipelines/datasets/br_ans_beneficiario/flows.py index 04ec91cee..057524589 100644 --- a/pipelines/datasets/br_ans_beneficiario/flows.py +++ b/pipelines/datasets/br_ans_beneficiario/flows.py @@ -38,7 +38,7 @@ with Flow( name="br_ans_beneficiario.informacao_consolidada", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as datasets_br_ans_beneficiario_flow: dataset_id = Parameter("dataset_id", default="br_ans_beneficiario", required=False) @@ -114,6 +114,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_b3_cotacoes/flows.py b/pipelines/datasets/br_b3_cotacoes/flows.py index 0a42b550f..0df37d9fe 100644 --- a/pipelines/datasets/br_b3_cotacoes/flows.py +++ b/pipelines/datasets/br_b3_cotacoes/flows.py @@ -71,6 +71,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_bcb_agencia/flows.py b/pipelines/datasets/br_bcb_agencia/flows.py index db937a1e2..db5a6c254 100644 --- a/pipelines/datasets/br_bcb_agencia/flows.py +++ b/pipelines/datasets/br_bcb_agencia/flows.py @@ -108,6 +108,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_bcb_estban/flows.py b/pipelines/datasets/br_bcb_estban/flows.py index 1fe3289c6..6bd492419 100644 --- a/pipelines/datasets/br_bcb_estban/flows.py +++ b/pipelines/datasets/br_bcb_estban/flows.py @@ -117,6 +117,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -230,6 +231,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_bd_indicadores/flows.py b/pipelines/datasets/br_bd_indicadores/flows.py index 4a00115a6..3e6db7a9b 100755 --- a/pipelines/datasets/br_bd_indicadores/flows.py +++ b/pipelines/datasets/br_bd_indicadores/flows.py @@ -113,6 +113,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py b/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py index 914e78e81..ea44d3ef8 100644 --- a/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py +++ b/pipelines/datasets/br_cgu_beneficios_cidadao/flows.py @@ -41,7 +41,7 @@ with Flow( name="br_cgu_beneficios_cidadao.novo_bolsa_familia", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as datasets_br_cgu_bolsa_familia_flow: dataset_id = Parameter( @@ -125,6 +125,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -170,7 +171,7 @@ with Flow( name="br_cgu_beneficios_cidadao.garantia_safra", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as datasets_br_cgu_garantia_safra_flow: dataset_id = Parameter( @@ -256,6 +257,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -297,7 +299,7 @@ with Flow( name="br_cgu_beneficios_cidadao.bpc", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as datasets_br_cgu_bpc_flow: dataset_id = Parameter( @@ -381,6 +383,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_cnj_improbidade_administrativa/flows.py b/pipelines/datasets/br_cnj_improbidade_administrativa/flows.py index fc4b1c30e..2750374ac 100644 --- a/pipelines/datasets/br_cnj_improbidade_administrativa/flows.py +++ b/pipelines/datasets/br_cnj_improbidade_administrativa/flows.py @@ -84,7 +84,7 @@ }, labels=current_flow_labels, run_name=r"Materialize {dataset_id}.{table_id}", - upstream_tasks=[current_flow_labels], + upstream_tasks=[wait_upload_table], ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_cvm_fi/flows.py b/pipelines/datasets/br_cvm_fi/flows.py index 4a241ea6a..d181d6439 100644 --- a/pipelines/datasets/br_cvm_fi/flows.py +++ b/pipelines/datasets/br_cvm_fi/flows.py @@ -51,7 +51,7 @@ with Flow( name="br_cvm_fi_documentos_informe_diario", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_cvm_fi_documentos_informe_diario: # Parameters @@ -111,6 +111,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -148,7 +149,7 @@ with Flow( name="br_cvm_fi_documentos_carteiras_fundos_investimento", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_cvm_fi_documentos_carteiras_fundos_investimento: # Parameters @@ -211,6 +212,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -250,7 +252,7 @@ with Flow( name="br_cvm_fi_documentos_extratos_informacoes", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_cvm_fi_documentos_extratos_informacoes: # Parameters @@ -317,6 +319,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -355,7 +358,7 @@ with Flow( name="br_cvm_fi_documentos_perfil_mensal", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_cvm_fi_documentos_perfil_mensal: # Parameters @@ -414,6 +417,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -450,7 +454,7 @@ with Flow( name="br_cvm_fi_documentos_informacao_cadastral", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_cvm_fi_documentos_informacao_cadastral: # Parameters @@ -508,6 +512,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -546,7 +551,7 @@ with Flow( name="br_cvm_fi_documentos_balancete", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_cvm_fi_documentos_balancete: # Parameters @@ -606,6 +611,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_denatran_frota/flows.py b/pipelines/datasets/br_denatran_frota/flows.py index cc60ae8fe..a3974b874 100644 --- a/pipelines/datasets/br_denatran_frota/flows.py +++ b/pipelines/datasets/br_denatran_frota/flows.py @@ -109,6 +109,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], ) wait_for_materialization = wait_for_flow_run( @@ -225,6 +226,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_inmet_bdmep/flows.py b/pipelines/datasets/br_inmet_bdmep/flows.py index 81061d771..b61ff49a6 100644 --- a/pipelines/datasets/br_inmet_bdmep/flows.py +++ b/pipelines/datasets/br_inmet_bdmep/flows.py @@ -28,7 +28,7 @@ # from pipelines.datasets.br_ibge_pnadc.schedules import every_quarter # pylint: disable=C0103 -with Flow(name="br_inmet_bdmep", code_owners=["arthurfg"]) as br_inmet: +with Flow(name="br_inmet_bdmep", code_owners=["equipe_pipelines"]) as br_inmet: # Parameters dataset_id = Parameter("dataset_id", default="br_inmet_bdmep", required=False) table_id = Parameter("table_id", default="microdados", required=False) @@ -78,6 +78,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_me_cnpj/flows.py b/pipelines/datasets/br_me_cnpj/flows.py index a77dd6692..e8f1a4a72 100644 --- a/pipelines/datasets/br_me_cnpj/flows.py +++ b/pipelines/datasets/br_me_cnpj/flows.py @@ -36,7 +36,7 @@ with Flow( name="br_me_cnpj.empresas", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_me_cnpj_empresas: dataset_id = Parameter("dataset_id", default="br_me_cnpj", required=False) @@ -129,7 +129,7 @@ with Flow( name="br_me_cnpj.socios", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_me_cnpj_socios: dataset_id = Parameter("dataset_id", default="br_me_cnpj", required=False) @@ -222,7 +222,7 @@ with Flow( name="br_me_cnpj.estabelecimentos", code_owners=[ - "arthurfg", + "equipe_pipelines", ], executor=LocalDaskExecutor() ) as br_me_cnpj_estabelecimentos: @@ -361,7 +361,7 @@ with Flow( name="br_me_cnpj.simples", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as br_me_cnpj_simples: dataset_id = Parameter("dataset_id", default="br_me_cnpj", required=True) diff --git a/pipelines/datasets/br_me_comex_stat/flows.py b/pipelines/datasets/br_me_comex_stat/flows.py index 30b5e1da5..c831542df 100644 --- a/pipelines/datasets/br_me_comex_stat/flows.py +++ b/pipelines/datasets/br_me_comex_stat/flows.py @@ -113,6 +113,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -222,6 +223,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -332,6 +334,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -438,6 +441,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_mg_belohorizonte_smfa_iptu/flows.py b/pipelines/datasets/br_mg_belohorizonte_smfa_iptu/flows.py index 87be4dfdb..004fff7e8 100644 --- a/pipelines/datasets/br_mg_belohorizonte_smfa_iptu/flows.py +++ b/pipelines/datasets/br_mg_belohorizonte_smfa_iptu/flows.py @@ -86,6 +86,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py index 953994748..202a0f4d9 100644 --- a/pipelines/datasets/br_ons_avaliacao_operacao/flows.py +++ b/pipelines/datasets/br_ons_avaliacao_operacao/flows.py @@ -100,6 +100,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -198,6 +199,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -299,6 +301,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -401,6 +404,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -504,6 +508,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -605,6 +610,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_ons_estimativa_custos/flows.py b/pipelines/datasets/br_ons_estimativa_custos/flows.py index 949fd549d..11c57fdea 100644 --- a/pipelines/datasets/br_ons_estimativa_custos/flows.py +++ b/pipelines/datasets/br_ons_estimativa_custos/flows.py @@ -101,6 +101,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -204,6 +205,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -308,6 +310,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -412,6 +415,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -516,6 +520,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/br_rf_cafir/flows.py b/pipelines/datasets/br_rf_cafir/flows.py index 04c1d1992..e409c0b3a 100644 --- a/pipelines/datasets/br_rf_cafir/flows.py +++ b/pipelines/datasets/br_rf_cafir/flows.py @@ -93,6 +93,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py index 57c12fe4c..40256993b 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes/flows.py @@ -37,7 +37,7 @@ with Flow( name="mundo_transfermarkt_competicoes.brasileirao_serie_a", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as transfermarkt_brasileirao_flow: dataset_id = Parameter( @@ -129,7 +129,7 @@ with Flow( name="mundo_transfermarkt_competicoes.copa_brasil", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as transfermarkt_copa_flow: dataset_id = Parameter( diff --git a/pipelines/datasets/mundo_transfermarkt_competicoes_internacionais/flows.py b/pipelines/datasets/mundo_transfermarkt_competicoes_internacionais/flows.py index a7eccb139..5c7c52c84 100644 --- a/pipelines/datasets/mundo_transfermarkt_competicoes_internacionais/flows.py +++ b/pipelines/datasets/mundo_transfermarkt_competicoes_internacionais/flows.py @@ -40,7 +40,7 @@ with Flow( name="mundo_transfermarkt_competicoes_internacionais.champions_league", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as transfermarkt_flow: dataset_id = Parameter( diff --git a/pipelines/utils/crawler_camara_dados_abertos/flows.py b/pipelines/utils/crawler_camara_dados_abertos/flows.py index b555237bf..29bb1a8c3 100755 --- a/pipelines/utils/crawler_camara_dados_abertos/flows.py +++ b/pipelines/utils/crawler_camara_dados_abertos/flows.py @@ -85,6 +85,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( materialization_flow, diff --git a/pipelines/utils/crawler_datasus/flows.py b/pipelines/utils/crawler_datasus/flows.py index 16a3ad284..b9c081638 100644 --- a/pipelines/utils/crawler_datasus/flows.py +++ b/pipelines/utils/crawler_datasus/flows.py @@ -117,6 +117,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], ) wait_for_materialization = wait_for_flow_run( @@ -223,6 +224,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -255,7 +257,7 @@ -with Flow(name="DATASUS-SIH", code_owners=["arthurfg"]) as flow_sihsus: +with Flow(name="DATASUS-SIH", code_owners=["equipe_pipelines"]) as flow_sihsus: # Parameters dataset_id = Parameter("dataset_id", default="br_ms_sih", required=False) table_id = Parameter("table_id", default = 'aihs_reduzidas', required=False) @@ -331,6 +333,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks = [wait_upload_table] ) wait_for_materialization = wait_for_flow_run( @@ -444,7 +447,7 @@ }, labels=current_flow_labels, run_name=f"Materialize {dataset_id}.{table_id}", - upstream_tasks=[wait_upload_table] + upstream_tasks=[wait_upload_table], ) wait_for_materialization = wait_for_flow_run( diff --git a/pipelines/utils/metadata/flows.py b/pipelines/utils/metadata/flows.py index 04a3489dc..7c5bf9f02 100644 --- a/pipelines/utils/metadata/flows.py +++ b/pipelines/utils/metadata/flows.py @@ -53,7 +53,7 @@ with Flow( name="create_update_quality_checks", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as quality_checks_updater: diff --git a/pipelines/utils/to_download/flows.py b/pipelines/utils/to_download/flows.py index 8f0f852ad..b89a5dbdd 100644 --- a/pipelines/utils/to_download/flows.py +++ b/pipelines/utils/to_download/flows.py @@ -14,7 +14,7 @@ with Flow( name="test_to_download_task", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as utils_to_download_flow: url = Parameter( diff --git a/pipelines/utils/transfer_files_to_prod/flows.py b/pipelines/utils/transfer_files_to_prod/flows.py index b9609d200..436d5a4f4 100644 --- a/pipelines/utils/transfer_files_to_prod/flows.py +++ b/pipelines/utils/transfer_files_to_prod/flows.py @@ -27,7 +27,7 @@ with Flow( name="BD Utils: Transfere arquivos do bucket basedosdados-dev para basedosdados", code_owners=[ - "arthurfg", + "equipe_pipelines", ], ) as transfer_files_to_prod_flow: dataset_id = Parameter("dataset_id", default="br_cgu_beneficios_cidadao", required=False)