Merge pull request #865 from basedosdados/staging/br_cgu_cartao_pagamento

[dados] br_cgu_cartao_pagamento
tricktx authored Oct 17, 2024
2 parents 3dfad50 + b92127b commit 01531a4
Showing 10 changed files with 510 additions and 2 deletions.
1 change: 1 addition & 0 deletions pipelines/datasets/__init__.py
@@ -59,3 +59,4 @@
from pipelines.datasets.br_ms_sih.flows import *
from pipelines.datasets.br_ms_sinan.flows import *
from pipelines.datasets.br_cgu_emendas_parlamentares.flows import *
from pipelines.datasets.br_cgu_cartao_pagamento.flows import *
32 changes: 32 additions & 0 deletions pipelines/datasets/br_cgu_cartao_pagamento/flows.py
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from copy import deepcopy
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento
from pipelines.constants import constants
from pipelines.datasets.br_cgu_cartao_pagamento.schedules import (
    every_day_microdados_compras_centralizadas,
    every_day_microdados_defesa_civil,
    every_day_microdados_governo_federal,
)

br_cgu_cartao_pagamento__governo_federal = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__governo_federal.name = "br_cgu_cartao_pagamento.governo_federal"
br_cgu_cartao_pagamento__governo_federal.code_owners = ["trick"]
br_cgu_cartao_pagamento__governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__governo_federal.schedule = every_day_microdados_governo_federal

br_cgu_cartao_pagamento__defesa_civil = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__defesa_civil.name = "br_cgu_cartao_pagamento.defesa_civil"
br_cgu_cartao_pagamento__defesa_civil.code_owners = ["trick"]
br_cgu_cartao_pagamento__defesa_civil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__defesa_civil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__defesa_civil.schedule = every_day_microdados_defesa_civil

br_cgu_cartao_pagamento__compras_centralizadas = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__compras_centralizadas.name = "br_cgu_cartao_pagamento.compras_centralizadas"
br_cgu_cartao_pagamento__compras_centralizadas.code_owners = ["trick"]
br_cgu_cartao_pagamento__compras_centralizadas.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__compras_centralizadas.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__compras_centralizadas.schedule = every_day_microdados_compras_centralizadas
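Each copy above shares one flow definition but gets its own name, storage, run config, and schedule via deepcopy, so configuring one does not mutate the others. For a quick local smoke test outside Kubernetes, a minimal sketch (Prefect 1.x Flow.run ignores storage and run_config; the parameter values here are illustrative, not part of this commit):

# Hypothetical local run; assumes the repo's dependencies are installed.
from pipelines.datasets.br_cgu_cartao_pagamento.flows import (
    br_cgu_cartao_pagamento__governo_federal,
)

state = br_cgu_cartao_pagamento__governo_federal.run(
    parameters={
        "table_id": "microdados_governo_federal",
        "materialization_mode": "dev",
        "materialize_after_dump": False,  # skip the dbt sub-flow locally
        "update_metadata": False,
    },
    run_on_schedule=False,  # execute once instead of waiting on the cron clock
)
print(state.is_successful())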
69 changes: 69 additions & 0 deletions pipelines/datasets/br_cgu_cartao_pagamento/schedules.py
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from pipelines.constants import constants

every_day_microdados_governo_federal = Schedule(
    clocks=[
        CronClock(
            cron="0 20 * * *",
            start_date=datetime(2021, 3, 31, 17, 11),
            labels=[
                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
            ],
            parameter_defaults={
                "dataset_id": "br_cgu_cartao_pagamento",
                "table_id": "microdados_governo_federal",
                "materialization_mode": "prod",
                "materialize_after_dump": True,
                "dbt_alias": True,
                "historical_data": False,
                "update_metadata": True,
            },
        ),
    ],
)

every_day_microdados_defesa_civil = Schedule(
    clocks=[
        CronClock(
            cron="30 20 * * *",
            start_date=datetime(2021, 3, 31, 17, 11),
            labels=[
                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
            ],
            parameter_defaults={
                "dataset_id": "br_cgu_cartao_pagamento",
                "table_id": "microdados_defesa_civil",
                "materialization_mode": "prod",
                "materialize_after_dump": True,
                "dbt_alias": True,
                "historical_data": False,
                "update_metadata": True,
            },
        ),
    ],
)

every_day_microdados_compras_centralizadas = Schedule(
    clocks=[
        CronClock(
            cron="00 21 * * *",
            start_date=datetime(2021, 3, 31, 17, 11),
            labels=[
                constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
            ],
            parameter_defaults={
                "dataset_id": "br_cgu_cartao_pagamento",
                "table_id": "microdados_compras_centralizadas",
                "materialization_mode": "prod",
                "materialize_after_dump": True,
                "dbt_alias": True,
                "historical_data": False,
                "update_metadata": True,
            },
        ),
    ],
)
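The three clocks are intentionally staggered (20:00, 20:30, and 21:00) so the daily runs do not compete for the same agent. To sanity-check when a schedule will fire next, a small sketch using Prefect 1.x's Schedule.next (times follow the naive start_date, which Prefect treats as UTC, an assumption worth verifying against your agent's timezone configuration):

from pipelines.datasets.br_cgu_cartao_pagamento.schedules import (
    every_day_microdados_governo_federal,
)

# Print the next three fire times for the governo_federal clock.
for run_time in every_day_microdados_governo_federal.next(3):
    print(run_time.isoformat())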
Empty file.
35 changes: 35 additions & 0 deletions pipelines/utils/crawler_cgu/constants.py
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
"""
Constant values for the datasets projects
"""

from enum import Enum

class constants(Enum):  # pylint: disable=c0103
    """
    Constant values for the br_cgu_cartao_pagamento project
    """

    TABELA = {
        "microdados_governo_federal": {
            "INPUT_DATA": "/tmp/input/microdados_governo_federal",
            "OUTPUT_DATA": "/tmp/output/microdados_governo_federal",
            "URL": "https://portaldatransparencia.gov.br/download-de-dados/cpgf/",
            "READ": "_CPGF",
            "ONLY_ONE_FILE": False,
        },
        "microdados_compras_centralizadas": {
            "INPUT_DATA": "/tmp/input/microdados_compras_centralizadas",
            "OUTPUT_DATA": "/tmp/output/microdados_compras_centralizadas",
            "URL": "https://portaldatransparencia.gov.br/download-de-dados/cpcc/",
            "READ": "_CPGFComprasCentralizadas",
            "ONLY_ONE_FILE": False,
        },
        "microdados_defesa_civil": {
            "INPUT_DATA": "/tmp/input/microdados_defesa_civil",
            "OUTPUT_DATA": "/tmp/output/microdados_defesa_civil",
            "URL": "https://portaldatransparencia.gov.br/download-de-dados/cpdc/",
            "READ": "_CPDC",
            "ONLY_ONE_FILE": False,
        },
    }
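Downstream tasks key into TABELA by table_id, so adding a new table means adding one entry here plus a schedule. A minimal lookup, mirroring how tasks.py reads the mapping (the reading of READ as a filename marker is inferred from the naming, not stated in this diff):

from pipelines.utils.crawler_cgu.constants import constants

cfg = constants.TABELA.value["microdados_defesa_civil"]
print(cfg["URL"])            # download page for the CPDC files
print(cfg["READ"])           # "_CPDC" - presumably a filename marker for the CSVs
print(cfg["ONLY_ONE_FILE"])  # False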
120 changes: 120 additions & 0 deletions pipelines/utils/crawler_cgu/flows.py
@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""
Flows for br_cgu_cartao_pagamento
"""
from datetime import timedelta
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect import Parameter, case
from pipelines.constants import constants
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.crawler_cgu.tasks import (
    partition_data,
    get_current_date_and_download_file,
)
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata, check_if_data_is_outdated
from pipelines.utils.tasks import (
    create_table_and_upload_to_gcs,
    get_current_flow_labels,
    rename_current_flow_run_dataset_table,
)

with Flow(
    name="CGU - Cartão de Pagamento"
) as flow_cgu_cartao_pagamento:

    dataset_id = Parameter("dataset_id", default="br_cgu_cartao_pagamento", required=True)
    table_id = Parameter("table_id", default="microdados_governo_federal", required=True)
    # relative_month = 1 means that the data will be downloaded for the current month
    relative_month = Parameter("relative_month", default=1, required=False)
    materialization_mode = Parameter("materialization_mode", default="dev", required=False)
    materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
    dbt_alias = Parameter("dbt_alias", default=True, required=False)
    update_metadata = Parameter("update_metadata", default=False, required=False)

    rename_flow_run = rename_current_flow_run_dataset_table(
        prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id
    )

    data_source_max_date = get_current_date_and_download_file(
        table_id,
        dataset_id,
        relative_month,
    )

    dados_desatualizados = check_if_data_is_outdated(
        dataset_id=dataset_id,
        table_id=table_id,
        data_source_max_date=data_source_max_date,
        date_format="%Y-%m",
        upstream_tasks=[data_source_max_date],
    )

    with case(dados_desatualizados, True):

        filepath = partition_data(
            table_id=table_id,
            upstream_tasks=[dados_desatualizados],
        )

        wait_upload_table = create_table_and_upload_to_gcs(
            data_path=filepath,
            dataset_id=dataset_id,
            table_id=table_id,
            dump_mode="append",
            wait=filepath,
            upstream_tasks=[filepath],
        )

        with case(materialize_after_dump, True):

            current_flow_labels = get_current_flow_labels()
            materialization_flow = create_flow_run(
                flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
                project_name=constants.PREFECT_DEFAULT_PROJECT.value,
                parameters={
                    "dataset_id": dataset_id,
                    "table_id": table_id,
                    "mode": materialization_mode,
                    "dbt_alias": dbt_alias,
                    "dbt_command": "run/test",
                    "disable_elementary": False,
                },
                labels=current_flow_labels,
                run_name=f"Materialize {dataset_id}.{table_id}",
                upstream_tasks=[wait_upload_table],
            )

            wait_for_materialization = wait_for_flow_run(
                materialization_flow,
                stream_states=True,
                stream_logs=True,
                raise_final_state=True,
                upstream_tasks=[materialization_flow],
            )
            wait_for_materialization.max_retries = (
                dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
            )
            wait_for_materialization.retry_delay = timedelta(
                seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
            )

            with case(update_metadata, True):
                update_django_metadata(
                    dataset_id=dataset_id,
                    table_id=table_id,
                    date_column_name={"year": "ano_extrato", "month": "mes_extrato"},
                    date_format="%Y-%m",
                    coverage_type="part_bdpro",
                    time_delta={"months": 6},
                    prefect_mode=materialization_mode,
                    bq_project="basedosdados",
                    upstream_tasks=[wait_for_materialization],
                )

flow_cgu_cartao_pagamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_cgu_cartao_pagamento.run_config = KubernetesRun(
    image=constants.DOCKER_IMAGE.value
)
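The nested `with case(...)` blocks give the flow its skip logic: tasks built inside `case(x, True)` run only when x resolves to True at runtime, so an up-to-date table short-circuits the upload, materialization, and metadata steps. A standalone sketch of the same Prefect 1.x pattern (the demo names are illustrative):

from prefect import Flow, Parameter, case, task

@task
def do_work():
    print("runs only when enabled resolves to True")

with Flow("case-demo") as demo:
    enabled = Parameter("enabled", default=True)
    with case(enabled, True):
        do_work()

# do_work is skipped entirely on this run:
demo.run(parameters={"enabled": False})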
80 changes: 80 additions & 0 deletions pipelines/utils/crawler_cgu/tasks.py
@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
"""
Tasks for br_cgu_cartao_pagamento
"""
from datetime import datetime

from prefect import task

from pipelines.utils.crawler_cgu.constants import constants
from pipelines.utils.crawler_cgu.utils import (
    download_file,
    last_date_in_metadata,
    read_csv,
)
from pipelines.utils.utils import log, to_partitions

@task
def partition_data(table_id: str) -> str:
    """
    Partition data from a given table.

    Reads the data for the specified table, partitions it on the columns
    'ANO_EXTRATO' and 'MES_EXTRATO', and saves the partitioned data to the
    table's output path.

    Args:
        table_id (str): The identifier of the table to be partitioned.

    Returns:
        str: The path where the partitioned data is saved.
    """
    value_constants = constants.TABELA.value[table_id]

    log("---------------------------- Read data ----------------------------")
    df = read_csv(table_id=table_id, url=value_constants["URL"])

    log("---------------------------- Partitioning data ---------------------")
    to_partitions(
        data=df,
        partition_columns=["ANO_EXTRATO", "MES_EXTRATO"],
        savepath=value_constants["OUTPUT_DATA"],
        file_type="csv",
    )
    log("---------------------------- Data partitioned ----------------------")

    return value_constants["OUTPUT_DATA"]

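# A note on layout (inferred, not shown in this diff): assuming to_partitions
# writes Hive-style key=value folders named after partition_columns, the
# returned OUTPUT_DATA root would contain paths such as
#   /tmp/output/microdados_governo_federal/ANO_EXTRATO=2024/MES_EXTRATO=10/<file>.csv
# which is presumably the shape create_table_and_upload_to_gcs expects.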
@task
def get_current_date_and_download_file(
    table_id: str,
    dataset_id: str,
    relative_month: int = 1,
) -> datetime:
    """
    Download the file for the next expected month and return the maximum date
    available at the source.

    Args:
        table_id (str): The ID of the table.
        dataset_id (str): The ID of the dataset.
        relative_month (int): Month offset relative to the last date registered
            in the metadata API; 1 means the current month.

    Returns:
        datetime: The maximum date as a datetime object.
    """
    last_date_in_api, next_date_in_api = last_date_in_metadata(
        dataset_id=dataset_id,
        table_id=table_id,
        relative_month=relative_month,
    )

    max_date = str(
        download_file(
            table_id=table_id,
            year=next_date_in_api.year,
            month=next_date_in_api.month,
            relative_month=relative_month,
        )
    )

    return datetime.strptime(max_date, "%Y-%m-%d")
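The helpers imported from pipelines.utils.crawler_cgu.utils are not shown in this view. From their call sites above, their contracts would look roughly like the stubs below; these signatures are inferred and hypothetical, not copied from the real module:

from datetime import date
from typing import Tuple

import pandas as pd

# Inferred stubs only; the real implementations live in
# pipelines/utils/crawler_cgu/utils.py and are not part of this excerpt.

def last_date_in_metadata(
    dataset_id: str, table_id: str, relative_month: int
) -> Tuple[date, date]:
    """Return (last date registered in the metadata API, next expected date)."""
    ...

def download_file(table_id: str, year: int, month: int, relative_month: int):
    """Download the month's files from the portal and return the source's
    maximum date; str() of the result formats as YYYY-MM-DD."""
    ...

def read_csv(table_id: str, url: str) -> pd.DataFrame:
    """Read the downloaded CSV(s) for table_id into a single DataFrame."""
    ...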
