Skip to content

Commit

Permalink
Merge branch 'main' into staging/br_tse_filiacao_partidaria
Browse files Browse the repository at this point in the history
  • Loading branch information
Winzen committed Oct 17, 2024
2 parents 81cc834 + 01531a4 commit 66b3117
Show file tree
Hide file tree
Showing 10 changed files with 569 additions and 62 deletions.
120 changes: 60 additions & 60 deletions pipelines/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,61 @@
# -*- coding: utf-8 -*-
"""
Prefect flows for basedosdados project
"""
###############################################################################
# Automatically managed, please do not touch
###############################################################################

from pipelines.datasets.botdosdados.flows import *
from pipelines.datasets.br_anatel_banda_larga_fixa.flows import *
from pipelines.datasets.br_anatel_telefonia_movel.flows import *
from pipelines.datasets.br_anp_precos_combustiveis.flows import *
from pipelines.datasets.br_ans_beneficiario.flows import *
from pipelines.datasets.br_b3_cotacoes.flows import *
from pipelines.datasets.br_bcb_agencia.flows import *
from pipelines.datasets.br_bcb_estban.flows import *
from pipelines.datasets.br_bcb_taxa_cambio.flows import *
from pipelines.datasets.br_bcb_taxa_selic.flows import *
from pipelines.datasets.br_bd_indicadores.flows import *
from pipelines.datasets.br_bd_metadados.flows import *
from pipelines.datasets.br_camara_dados_abertos.flows import *
from pipelines.datasets.br_cgu_beneficios_cidadao.flows import *
from pipelines.datasets.br_cgu_pessoal_executivo_federal.flows import *
from pipelines.datasets.br_cgu_servidores_executivo_federal.flows import *
from pipelines.datasets.br_cvm_administradores_carteira.flows import *
from pipelines.datasets.br_cvm_fi.flows import *
from pipelines.datasets.br_cvm_oferta_publica_distribuicao.flows import *
from pipelines.datasets.br_denatran_frota.flows import *
from pipelines.datasets.br_fgv_igp.flows import *
from pipelines.datasets.br_ibge_inpc.flows import *
from pipelines.datasets.br_ibge_ipca15.flows import *
from pipelines.datasets.br_ibge_ipca.flows import *
from pipelines.datasets.br_ibge_pnadc.flows import *
from pipelines.datasets.br_inmet_bdmep.flows import *
from pipelines.datasets.br_bd_siga_o_dinheiro.flows import *
from pipelines.datasets.br_me_caged.flows import *
from pipelines.datasets.br_me_cnpj.flows import *
from pipelines.datasets.br_me_comex_stat.flows import *
from pipelines.datasets.br_mercadolivre_ofertas.flows import *
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.flows import *
from pipelines.datasets.br_mp_pep_cargos_funcoes.flows import *
from pipelines.datasets.br_ms_cnes.flows import *
from pipelines.datasets.br_ms_sia.flows import *
from pipelines.datasets.br_ons_avaliacao_operacao.flows import *
from pipelines.datasets.br_ons_estimativa_custos.flows import *
from pipelines.datasets.br_poder360_pesquisas.flows import *
from pipelines.datasets.br_rf_cafir.flows import *
from pipelines.datasets.br_rf_cno.flows import *
from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import *
from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import *
from pipelines.datasets.br_stf_corte_aberta.flows import *
from pipelines.datasets.br_tse_eleicoes.flows import *
from pipelines.datasets.cross_update.flows import *
from pipelines.datasets.delete_flows.flows import *
from pipelines.datasets.fundacao_lemann.flows import *
from pipelines.datasets.mundo_transfermarkt_competicoes.flows import *
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.flows import *
from pipelines.datasets.br_cnj_improbidade_administrativa.flows import *
from pipelines.datasets.br_ms_sih.flows import *
from pipelines.datasets.br_ms_sinan.flows import *
# -*- coding: utf-8 -*-
"""
Prefect flows for basedosdados project
"""
###############################################################################
# Automatically managed, please do not touch
###############################################################################

from pipelines.datasets.botdosdados.flows import *
from pipelines.datasets.br_anatel_banda_larga_fixa.flows import *
from pipelines.datasets.br_anatel_telefonia_movel.flows import *
from pipelines.datasets.br_anp_precos_combustiveis.flows import *
from pipelines.datasets.br_ans_beneficiario.flows import *
from pipelines.datasets.br_b3_cotacoes.flows import *
from pipelines.datasets.br_bcb_agencia.flows import *
from pipelines.datasets.br_bcb_estban.flows import *
from pipelines.datasets.br_bcb_taxa_cambio.flows import *
from pipelines.datasets.br_bcb_taxa_selic.flows import *
from pipelines.datasets.br_bd_indicadores.flows import *
from pipelines.datasets.br_bd_metadados.flows import *
from pipelines.datasets.br_camara_dados_abertos.flows import *
from pipelines.datasets.br_cgu_beneficios_cidadao.flows import *
from pipelines.datasets.br_cgu_pessoal_executivo_federal.flows import *
from pipelines.datasets.br_cgu_servidores_executivo_federal.flows import *
from pipelines.datasets.br_cvm_administradores_carteira.flows import *
from pipelines.datasets.br_cvm_fi.flows import *
from pipelines.datasets.br_cvm_oferta_publica_distribuicao.flows import *
from pipelines.datasets.br_denatran_frota.flows import *
from pipelines.datasets.br_fgv_igp.flows import *
from pipelines.datasets.br_ibge_inpc.flows import *
from pipelines.datasets.br_ibge_ipca15.flows import *
from pipelines.datasets.br_ibge_ipca.flows import *
from pipelines.datasets.br_ibge_pnadc.flows import *
from pipelines.datasets.br_inmet_bdmep.flows import *
from pipelines.datasets.br_bd_siga_o_dinheiro.flows import *
from pipelines.datasets.br_me_caged.flows import *
from pipelines.datasets.br_me_cnpj.flows import *
from pipelines.datasets.br_me_comex_stat.flows import *
from pipelines.datasets.br_mercadolivre_ofertas.flows import *
from pipelines.datasets.br_mg_belohorizonte_smfa_iptu.flows import *
from pipelines.datasets.br_mp_pep_cargos_funcoes.flows import *
from pipelines.datasets.br_ms_cnes.flows import *
from pipelines.datasets.br_ms_sia.flows import *
from pipelines.datasets.br_ons_avaliacao_operacao.flows import *
from pipelines.datasets.br_ons_estimativa_custos.flows import *
from pipelines.datasets.br_poder360_pesquisas.flows import *
from pipelines.datasets.br_rf_cafir.flows import *
from pipelines.datasets.br_rf_cno.flows import *
from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import *
from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import *
from pipelines.datasets.br_stf_corte_aberta.flows import *
from pipelines.datasets.br_tse_eleicoes.flows import *
from pipelines.datasets.cross_update.flows import *
from pipelines.datasets.delete_flows.flows import *
from pipelines.datasets.fundacao_lemann.flows import *
from pipelines.datasets.mundo_transfermarkt_competicoes.flows import *
from pipelines.datasets.mundo_transfermarkt_competicoes_internacionais.flows import *
from pipelines.datasets.br_cnj_improbidade_administrativa.flows import *
from pipelines.datasets.br_ms_sih.flows import *
from pipelines.datasets.br_ms_sinan.flows import *
from pipelines.datasets.br_cgu_emendas_parlamentares.flows import *
32 changes: 32 additions & 0 deletions pipelines/datasets/br_cgu_cartao_pagamento/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from copy import deepcopy, copy
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento
from pipelines.constants import constants
from pipelines.datasets.br_cgu_cartao_pagamento.schedules import (
every_day_microdados_compras_centralizadas,
every_day_microdados_defesa_civil,
every_day_microdados_governo_federal
)

br_cgu_cartao_pagamento__governo_federal = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__governo_federal.name = "br_cgu_cartao_pagamento.governo_federal"
br_cgu_cartao_pagamento__governo_federal.code_owners = ["trick"]
br_cgu_cartao_pagamento__governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__governo_federal.schedule = every_day_microdados_governo_federal

br_cgu_cartao_pagamento__defesa_civil = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__defesa_civil.name = "br_cgu_cartao_pagamento.defesa_civil"
br_cgu_cartao_pagamento__defesa_civil.code_owners = ["trick"]
br_cgu_cartao_pagamento__defesa_civil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__defesa_civil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__defesa_civil.schedule = every_day_microdados_defesa_civil

br_cgu_cartao_pagamento__compras_centralizadas = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__compras_centralizadas.name = "br_cgu_cartao_pagamento.compras_centralizadas"
br_cgu_cartao_pagamento__compras_centralizadas.code_owners = ["trick"]
br_cgu_cartao_pagamento__compras_centralizadas.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__compras_centralizadas.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__compras_centralizadas.schedule = every_day_microdados_compras_centralizadas
69 changes: 69 additions & 0 deletions pipelines/datasets/br_cgu_cartao_pagamento/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock, IntervalClock
from pipelines.constants import constants
from pipelines.utils.crawler_cgu.constants import constants as constants_cgu

every_day_microdados_governo_federal = Schedule(
clocks=[
CronClock(
cron="0 20 * * *",
start_date=datetime(2021, 3, 31, 17, 11),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_cgu_cartao_pagamento",
"table_id": "microdados_governo_federal",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"historical_data": False,
"update_metadata": True,
},
),
],
)

every_day_microdados_defesa_civil = Schedule(
clocks=[
CronClock(
cron="30 20 * * *",
start_date=datetime(2021, 3, 31, 17, 11),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_cgu_cartao_pagamento",
"table_id": "microdados_defesa_civil",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"historical_data": False,
"update_metadata": True,
},
),
],
)

every_day_microdados_compras_centralizadas = Schedule(
clocks=[
CronClock(
cron="00 21 * * *",
start_date=datetime(2021, 3, 31, 17, 11),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_cgu_cartao_pagamento",
"table_id": "microdados_compras_centralizadas",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"historical_data": False,
"update_metadata": True,
},
),
],
)
Empty file.
35 changes: 35 additions & 0 deletions pipelines/utils/crawler_cgu/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
"""
Constant values for the datasets projects
"""

from enum import Enum
from datetime import datetime

class constants(Enum): # pylint: disable=c0103
"""
Constant values for the br_cgu_cartao_pagamento project
"""

TABELA = {
"microdados_governo_federal" : {
"INPUT_DATA" : "/tmp/input/microdados_governo_federal",
"OUTPUT_DATA" : "/tmp/output/microdados_governo_federal",
"URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/",
"READ" : "_CPGF",
"ONLY_ONE_FILE" : False},

"microdados_compras_centralizadas" : {
"INPUT_DATA" : "/tmp/input/microdados_compras_centralizadas",
"OUTPUT_DATA" : "/tmp/output/microdados_compras_centralizadas",
"URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/",
"READ" : "_CPGFComprasCentralizadas",
"ONLY_ONE_FILE" : False},

"microdados_defesa_civil" : {
"INPUT_DATA" : "/tmp/input/microdados_defesa_civil",
"OUTPUT_DATA" : "/tmp/output/microdados_defesa_civil",
"URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/",
"READ" : "_CPDC",
"ONLY_ONE_FILE" : False}
}
120 changes: 120 additions & 0 deletions pipelines/utils/crawler_cgu/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""
Flows for br_cgu_cartao_pagamento
"""
from datetime import timedelta
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect import Parameter, case
from pipelines.constants import constants
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.crawler_cgu.tasks import (
partition_data,
get_current_date_and_download_file,
)
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata, check_if_data_is_outdated
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
rename_current_flow_run_dataset_table,
)

with Flow(
name="CGU - Cartão de Pagamento"
) as flow_cgu_cartao_pagamento:

dataset_id = Parameter("dataset_id", default='br_cgu_cartao_pagamento', required=True)
table_id = Parameter("table_id", default ="microdados_governo_federal", required=True)
####
# Relative_month = 1 means that the data will be downloaded for the current month
####
relative_month = Parameter("relative_month", default=1, required=False)
materialization_mode = Parameter("materialization_mode", default="dev", required=False)
materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)
rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id)

data_source_max_date = get_current_date_and_download_file(
table_id,
dataset_id,
relative_month,
)

dados_desatualizados = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=data_source_max_date,
date_format="%Y-%m",
upstream_tasks=[data_source_max_date]
)

with case(dados_desatualizados, True):

filepath = partition_data(
table_id=table_id,
upstream_tasks=[dados_desatualizados]
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=filepath,
upstream_tasks=[filepath],
)

with case(materialize_after_dump, True):

current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
"dbt_command": "run/test",
"disable_elementary": False,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
upstream_tasks=[wait_upload_table],
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
upstream_tasks=[materialization_flow],
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
update_django_metadata(
dataset_id=dataset_id,
table_id=table_id,
date_column_name={"year": "ano_extrato", "month": "mes_extrato"},
date_format="%Y-%m",
coverage_type="part_bdpro",
time_delta={"months": 6},
prefect_mode=materialization_mode,
bq_project="basedosdados",
upstream_tasks=[wait_for_materialization],
)

flow_cgu_cartao_pagamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_cgu_cartao_pagamento.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
)
Loading

0 comments on commit 66b3117

Please sign in to comment.