From 1ffc88b1a47d2b35c10dc3e1bc4e28b787b08a5b Mon Sep 17 00:00:00 2001
From: tricktx
Date: Thu, 12 Sep 2024 17:10:56 -0300
Subject: [PATCH 01/13] add pipeline cgu cartao pagamento

---
 pipelines/datasets/__init__.py                |   1 +
 .../datasets/br_cgu_cartao_pagamento/flows.py |  18 +++
 pipelines/utils/crawler_cgu/__init__.py       |   0
 pipelines/utils/crawler_cgu/constants.py      |  35 ++++++
 pipelines/utils/crawler_cgu/flows.py          | 117 ++++++++++++++++++
 pipelines/utils/crawler_cgu/schedules.py      |  90 ++++++++++++++
 pipelines/utils/crawler_cgu/tasks.py          |  45 +++++++
 pipelines/utils/crawler_cgu/utils.py          |  71 +++++++++++
 8 files changed, 377 insertions(+)
 create mode 100644 pipelines/datasets/br_cgu_cartao_pagamento/flows.py
 create mode 100644 pipelines/utils/crawler_cgu/__init__.py
 create mode 100644 pipelines/utils/crawler_cgu/constants.py
 create mode 100644 pipelines/utils/crawler_cgu/flows.py
 create mode 100644 pipelines/utils/crawler_cgu/schedules.py
 create mode 100644 pipelines/utils/crawler_cgu/tasks.py
 create mode 100644 pipelines/utils/crawler_cgu/utils.py

diff --git a/pipelines/datasets/__init__.py b/pipelines/datasets/__init__.py
index 4aeca1337..cf36c83d6 100644
--- a/pipelines/datasets/__init__.py
+++ b/pipelines/datasets/__init__.py
@@ -59,3 +59,4 @@
 from pipelines.datasets.br_ms_sih.flows import *
 from pipelines.datasets.br_ms_sinan.flows import *
 from pipelines.datasets.br_cgu_emendas_parlamentares.flows import *
+from pipelines.datasets.br_cgu_cartao_pagamento.flows import *
diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py
new file mode 100644
index 000000000..1e2fc1858
--- /dev/null
+++ b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py
@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+from copy import deepcopy
+
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+
+from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento
+from pipelines.constants import constants
+# from pipelines.datasets.br_ms_sinan.schedules import (
+#     everyday_sinan_microdados
+# )
+
+br_cgu_cartao_pagamento_governo_federal = deepcopy(flow_cgu_cartao_pagamento)
+br_cgu_cartao_pagamento_governo_federal.name = "br_cgu_cartao_pagamento.governo_federal"
+br_cgu_cartao_pagamento_governo_federal.code_owners = ["trick"]
+br_cgu_cartao_pagamento_governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+br_cgu_cartao_pagamento_governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
+#br_cgu_cartao_pagamento_governo_federal.schedule = everyday_sinan_microdados
\ No newline at end of file
diff --git a/pipelines/utils/crawler_cgu/__init__.py b/pipelines/utils/crawler_cgu/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py
new file mode 100644
index 000000000..b9c1d2c30
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/constants.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+"""
+Constant values for the datasets projects
+"""
+
+from enum import Enum
+
+
+class constants(Enum): # pylint: disable=c0103
+    """
+    Constant values for the br_cgu_cartao_pagamento project
+    """
+
+    TABELA = {
+        "microdados" : {
+            "INPUT" : "/tmp/input/microdados",
+            "OUTPUT" : "/tmp/output/microdados",
+            "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/",
+            "READ" : "_CPGF",
+            "UNICO" : False},
+
+        "compras_centralizadas" : {
+            "INPUT" : "/tmp/input/compras_centralizadas",
+            "OUTPUT" : "/tmp/output/compras_centralizadas",
+            "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/",
+            "READ" : "_CPGFComprasCentralizadas",
+            "UNICO" : False},
+
+        "defesa_civil" : {
+            "INPUT" : "/tmp/input/defesa_civil",
+            "OUTPUT" : "/tmp/output/defesa_civil",
+            "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/",
+            "READ" : "_CPDC",
+            "UNICO" : False}
+    }
\ No newline at end of file
diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py
new file mode 100644
index 000000000..cd52b34ca
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/flows.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+"""
+Flows for br_cgu_cartao_pagamento
+"""
+from datetime import timedelta
+from prefect.run_configs import KubernetesRun
+from prefect.storage import GCS
+from prefect import Parameter, case
+from pipelines.constants import constants
+from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from pipelines.utils.constants import constants as utils_constants
+from pipelines.utils.decorators import Flow
+from pipelines.utils.crawler_cgu.tasks import (
+    partition_data,
+    get_max_date
+)
+
+from pipelines.utils.decorators import Flow
+from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
+from pipelines.utils.metadata.tasks import update_django_metadata, check_if_data_is_outdated
+from pipelines.utils.tasks import (
+    create_table_and_upload_to_gcs,
+    get_current_flow_labels,
+    rename_current_flow_run_dataset_table,
+)
+
+with Flow(
+    name="CGU - Cartão de Pagamento"
+) as flow_cgu_cartao_pagamento:
+
+    dataset_id = Parameter("dataset_id", required=True)
+    table_id = Parameter("table_id", required=True)
+    year = Parameter("year", default=2024, required=False)
+    month = Parameter("month", default=9, required=False)
+    materialization_mode = Parameter("materialization_mode", default="dev", required=False)
+    materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
+    dbt_alias = Parameter("dbt_alias", default=True, required=False)
+    update_metadata = Parameter("update_metadata", default=False, required=False)
+    rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id)
+
+    data_source_max_date = get_max_date(table_id, year, month)
+
+# dados_desatualizados = check_if_data_is_outdated(
+#     dataset_id=dataset_id,
+#     table_id=table_id,
+#     data_source_max_date=data_source_max_date,
+#     date_format="%Y-%m-%d",
+#     upstream_tasks=[data_source_max_date]
+# )
+
+    # with case(dados_desatualizados, True):
+
+    filepath = partition_data(table_id=table_id,
+                              year=year,
+                              month=month,
+                              upstream_tasks=[data_source_max_date]
+                              )
+
+    wait_upload_table = create_table_and_upload_to_gcs(
+        data_path=filepath,
+        dataset_id=dataset_id,
+        table_id=table_id,
+        dump_mode="append",
+        wait=filepath,
+        upstream_tasks=[filepath],
+    )
+
+    with case(materialize_after_dump, True):
+        # Trigger DBT flow run
+        current_flow_labels = get_current_flow_labels()
+        materialization_flow = create_flow_run(
+            flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
+            project_name=constants.PREFECT_DEFAULT_PROJECT.value,
+            parameters={
+                "dataset_id": dataset_id,
+                "table_id": table_id,
+                "mode": materialization_mode,
+                "dbt_alias": dbt_alias,
+                "dbt_command": "run/test",
+                "disable_elementary": False,
+            },
+            labels=current_flow_labels,
+            run_name=f"Materialize {dataset_id}.{table_id}",
+            upstream_tasks=[wait_upload_table],
+        )
+
+        wait_for_materialization = wait_for_flow_run(
+            materialization_flow,
+            stream_states=True,
+            stream_logs=True,
+            raise_final_state=True,
+            upstream_tasks=[materialization_flow],
+        )
+        wait_for_materialization.max_retries = (
+            dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
+        )
+        wait_for_materialization.retry_delay = timedelta(
+            seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
+        )
+        with case(update_metadata, True):
+            update_django_metadata(
+                dataset_id=dataset_id,
+                table_id=table_id,
+                date_column_name={"year": "ano_extracao", "month": "mes_extracao"},
+                date_format="%Y-%m",
+                coverage_type="part_bdpro",
+                time_delta={"months": 6},
+                prefect_mode=materialization_mode,
+                bq_project="basedosdados",
+                upstream_tasks=[wait_for_materialization],
+            )
+
+
+flow_cgu_cartao_pagamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
+flow_cgu_cartao_pagamento.run_config = KubernetesRun(
+    image=constants.DOCKER_IMAGE.value
+)
diff --git a/pipelines/utils/crawler_cgu/schedules.py b/pipelines/utils/crawler_cgu/schedules.py
new file mode 100644
index 000000000..851c74ccd
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/schedules.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+"""
+Schedules for br_cgu_cartao_pagamento
+"""
+
+###############################################################################
+#
+# Aqui é onde devem ser definidos os schedules para os flows do projeto.
+# Cada schedule indica o intervalo de tempo entre as execuções.
+# Um schedule pode ser definido para um ou mais flows.
+# Mais informações sobre schedules podem ser encontradas na documentação do
+# Prefect: https://docs.prefect.io/core/concepts/schedules.html
+#
+# De modo a manter consistência na codebase, todo o código escrito passará
+# pelo pylint. Todos os warnings e erros devem ser corrigidos.
+#
+# Os schedules devem ser definidos de acordo com a sintaxe do Prefect, como,
+# por exemplo, o seguinte (para executar a cada 1 minuto):
+#
+# -----------------------------------------------------------------------------
+# from datetime import timedelta, datetime
+# from prefect.schedules import Schedule
+# from prefect.schedules.clocks import IntervalClock
+# from pipelines.constants import constants
+#
+# minute_schedule = Schedule(
+#     clocks=[
+#         IntervalClock(
+#             interval=timedelta(minutes=1),
+#             start_date=datetime(2021, 1, 1),
+#             labels=[
+#                 constants.DATASETS_AGENT_LABEL.value,
+#             ]
+#         ),
+#     ]
+# )
+# -----------------------------------------------------------------------------
+#
+# Vale notar que o parâmetro `labels` é obrigatório e deve ser uma lista com
+# apenas um elemento, correspondendo ao label do agente que será executado.
+# O label do agente é definido em `constants.py` e deve ter o formato
+# `DATASETS_AGENT_LABEL`.
+#
+# Outro exemplo, para executar todos os dias à meia noite, segue abaixo:
+#
+# -----------------------------------------------------------------------------
+# from prefect import task
+# from datetime import timedelta
+# import pendulum
+# from prefect.schedules import Schedule
+# from prefect.schedules.clocks import IntervalClock
+# from pipelines.constants import constants
+#
+# every_day_at_midnight = Schedule(
+#     clocks=[
+#         IntervalClock(
+#             interval=timedelta(days=1),
+#             start_date=pendulum.datetime(
+#                 2021, 1, 1, 0, 0, 0, tz="America/Sao_Paulo"),
+#             labels=[
+#                 constants.K8S_AGENT_LABEL.value,
+#             ]
+#         )
+#     ]
+# )
+# -----------------------------------------------------------------------------
+#
+# Abaixo segue um código para exemplificação, que pode ser removido.
+#
+###############################################################################
+
+
+from datetime import datetime, timedelta
+
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import IntervalClock
+
+from pipelines.constants import constants
+
+every_two_weeks = Schedule(
+    clocks=[
+        IntervalClock(
+            interval=timedelta(weeks=2),
+            start_date=datetime(2021, 1, 1),
+            labels=[
+                constants.DATASETS_AGENT_LABEL.value,
+            ]
+        ),
+    ]
+)
diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py
new file mode 100644
index 000000000..4657caa8a
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/tasks.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+"""
+Tasks for br_cgu_cartao_pagamento
+"""
+from prefect import task
+from datetime import datetime
+import pandas as pd
+from pipelines.utils.utils import log, to_partitions
+from pipelines.utils.crawler_cgu.utils import read_csv
+from pipelines.utils.crawler_cgu.constants import constants
+from pipelines.utils.crawler_cgu.utils import download_file
+
+@task
+def partition_data(table_id: str, year: str, month: str) -> str:
+    """
+    Partition data from a given table
+    """
+
+    flow_unico = constants.TABELA.value[table_id]
+
+    log("Partitioning data")
+    # Read the data
+    df = read_csv(table_id = table_id,
+                  year = year,
+                  month = month,
+                  url = flow_unico['URL'])
+    # Partition the data
+    to_partitions(
+        data = df,
+        partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'],
+        savepath = flow_unico['OUTPUT'],
+        file_type='csv')
+    log("Data partitioned")
+
+    return flow_unico['OUTPUT']
+
+@task
+def get_max_date(table_id, year, month):
+    max_date = download_file(table_id, year, month)
+
+    fix_date = max_date[:4] + '-' + max_date[4:6] + '-' + '01'
+
+    date = datetime.strptime(fix_date, '%Y-%m-%d')
+
+    return date
\ No newline at end of file
diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py
new file mode 100644
index 000000000..a9a008268
--- /dev/null
+++ b/pipelines/utils/crawler_cgu/utils.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+"""
+General purpose functions for the br_cgu_cartao_pagamento project
+"""
+
+import pandas as pd
+import os
+from pipelines.utils.crawler_cgu.constants import constants
+from urllib.request import urlopen
+from io import BytesIO
+import zipfile
+from typing import List
+import unidecode
+
+def download_and_unzip_file(url : str, path : str) -> None:
+    print("------------------ Baixando e descompactando arquivo ------------------")
+    try:
+        r = urlopen(url)
+        zip = zipfile.ZipFile(BytesIO(r.read()))
+        zip.extractall(path=path)
+    except Exception as e:
+        print(e)
+        print("Erro ao baixar e descompactar arquivo")
+
+def download_file(table_id : str, year : str, month : str) -> None:
+    """
+    Download a file from a given URL and save it to a given path
+    """
+    flow_unico = constants.TABELA.value[table_id]
+    if not flow_unico['UNICO']:
+
+        url = f"{flow_unico['URL']}{year}{str(month).zfill(2)}/"
+        print(f'------------------ {url} ------------------')
+
+        download_and_unzip_file(url, flow_unico['INPUT'])
+
+        return url.split("/")[-2]
+
+    url = flow_unico['URL']
+    download_and_unzip_file(url, flow_unico['INPUT'])
+    return None
+
+
+def read_csv(table_id : str, url : str, year : str, month : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame:
+    """
+    Read a csv file from a given path
+    """
+
+    flow_unico = constants.TABELA.value[table_id]
+    # Check if the file exists
+    input = flow_unico['INPUT']
+    print(f"Criando diretório: {input}")
+    if not os.path.exists(input):
+        os.makedirs(input)
+    print(f' --------------------- {year} ---------------------')
+    print(f' --------------------- {month} ---------------------')
+    print(f' --------------------- {flow_unico["READ"]} ---------------------')
+    # Read the file
+    file_with_year_month = f"{input}/{year}{month}{flow_unico['READ']}.csv"
+    print(file_with_year_month)
+
+    df = pd.read_csv(filepath_or_buffer=file_with_year_month, sep=';', encoding='latin1')
+
+    df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns]
+
+    for list_column_replace in column_replace:
+        df[list_column_replace] = df[list_column_replace].str.replace(",", ".").astype(float)
+
+    df.columns
+
+    return df
\ No newline at end of file
From 63c2d8043a682e85f009271ebb95141dde2ad810 Mon Sep 17 00:00:00 2001
From: tricktx
Date: Mon, 16 Sep 2024 16:35:49 -0300
Subject: [PATCH 02/13] try get max data in url and add check_if_data_is_outdated

---
 pipelines/utils/crawler_cgu/constants.py |  18 ++--
 pipelines/utils/crawler_cgu/flows.py     | 128 +++++++++++------------
 pipelines/utils/crawler_cgu/tasks.py     |  26 +++--
 pipelines/utils/crawler_cgu/utils.py     |  68 ++++++------
 pipelines/utils/utils.py                 |  34 +++++-
 5 files changed, 161 insertions(+), 113 deletions(-)

diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py
index b9c1d2c30..d283657cd 100644
--- a/pipelines/utils/crawler_cgu/constants.py
+++ b/pipelines/utils/crawler_cgu/constants.py
@@ -12,23 +12,23 @@ class constants(Enum): # pylint: disable=c0103
     """
 
     TABELA = {
-        "microdados" : {
-            "INPUT" : "/tmp/input/microdados",
-            "OUTPUT" : "/tmp/output/microdados",
+        "microdados_governo_federal" : {
+            "INPUT" : "/tmp/input/microdados_governo_federal",
+            "OUTPUT" : "/tmp/output/microdados_governo_federal",
             "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/",
             "READ" : "_CPGF",
             "UNICO" : False},
 
-        "compras_centralizadas" : {
-            "INPUT" : "/tmp/input/compras_centralizadas",
-            "OUTPUT" : "/tmp/output/compras_centralizadas",
+        "microdados_compras_centralizadas" : {
+            "INPUT" : "/tmp/input/microdados_compras_centralizadas",
+            "OUTPUT" : "/tmp/output/microdados_compras_centralizadas",
             "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/",
             "READ" : "_CPGFComprasCentralizadas",
             "UNICO" : False},
 
-        "defesa_civil" : {
-            "INPUT" : "/tmp/input/defesa_civil",
-            "OUTPUT" : "/tmp/output/defesa_civil",
+        "microdados_defesa_civil" : {
+            "INPUT" : "/tmp/input/microdados_defesa_civil",
+            "OUTPUT" : "/tmp/output/microdados_defesa_civil",
             "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/",
             "READ" : "_CPDC",
             "UNICO" : False}
diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py
index cd52b34ca..304516f4e 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -31,7 +31,7 @@ dataset_id = Parameter("dataset_id", required=True) table_id = Parameter("table_id", required=True) year = Parameter("year", default=2024, required=False) - month = Parameter("month", default=9, required=False) + month = Parameter("month", default=8, required=False) materialization_mode = Parameter("materialization_mode", default="dev", required=False) materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False) dbt_alias = Parameter("dbt_alias", default=True, required=False) @@ -40,75 +40,75 @@ data_source_max_date = get_max_date(table_id, year, month) -# dados_desatualizados = check_if_data_is_outdated( -# dataset_id=dataset_id, -# table_id=table_id, -# data_source_max_date=data_source_max_date, -# date_format="%Y-%m-%d", -# upstream_tasks=[data_source_max_date] -# ) - - # with case(dados_desatualizados, True): + dados_desatualizados = check_if_data_is_outdated( + dataset_id=dataset_id, + table_id=table_id, + data_source_max_date=data_source_max_date, + date_format="%Y-%m", + upstream_tasks=[data_source_max_date] +) - filepath = partition_data(table_id=table_id, - year=year, - month=month, - upstream_tasks=[data_source_max_date] - ) + with case(dados_desatualizados, True): - wait_upload_table = create_table_and_upload_to_gcs( - data_path=filepath, - dataset_id=dataset_id, - table_id=table_id, - dump_mode="append", - wait=filepath, - upstream_tasks=[filepath], - ) + filepath = partition_data(table_id=table_id, + year=year, + month=month, + upstream_tasks=[data_source_max_date] + ) - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - materialization_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id, - "mode": materialization_mode, - "dbt_alias": dbt_alias, - "dbt_command": "run/test", - "disable_elementary": False, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id}", - upstream_tasks=[wait_upload_table], + wait_upload_table = create_table_and_upload_to_gcs( + data_path=filepath, + dataset_id=dataset_id, + table_id=table_id, + dump_mode="append", + wait=filepath, + upstream_tasks=[filepath], ) - wait_for_materialization = wait_for_flow_run( - materialization_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - upstream_tasks=[materialization_flow], - ) - wait_for_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - with case(update_metadata, True): - update_django_metadata( - dataset_id=dataset_id, - table_id=table_id, - date_column_name={"year": "ano_extracao", "month": "mes_extracao"}, - date_format="%Y-%m", - coverage_type="part_bdpro", - time_delta={"months": 6}, - prefect_mode=materialization_mode, - bq_project="basedosdados", - upstream_tasks=[wait_for_materialization], + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + materialization_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + 
"dataset_id": dataset_id, + "table_id": table_id, + "mode": materialization_mode, + "dbt_alias": dbt_alias, + "dbt_command": "run/test", + "disable_elementary": False, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id}", + upstream_tasks=[wait_upload_table], + ) + + wait_for_materialization = wait_for_flow_run( + materialization_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + upstream_tasks=[materialization_flow], + ) + wait_for_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value ) + with case(update_metadata, True): + update_django_metadata( + dataset_id=dataset_id, + table_id=table_id, + date_column_name={"year": "ano_extracao", "month": "mes_extracao"}, + date_format="%Y-%m", + coverage_type="part_bdpro", + time_delta={"months": 6}, + prefect_mode=materialization_mode, + bq_project="basedosdados", + upstream_tasks=[wait_for_materialization], + ) flow_cgu_cartao_pagamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 4657caa8a..fa98be38b 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -9,6 +9,7 @@ from pipelines.utils.crawler_cgu.utils import read_csv from pipelines.utils.crawler_cgu.constants import constants from pipelines.utils.crawler_cgu.utils import download_file +import basedosdados as bd @task def partition_data(table_id: str, year: str, month: str) -> str: @@ -16,30 +17,39 @@ def partition_data(table_id: str, year: str, month: str) -> str: Partition data from a given table """ - flow_unico = constants.TABELA.value[table_id] + value_constants = constants.TABELA.value[table_id] - log("Partitioning data") + log("Read data") # Read the data df = read_csv(table_id = table_id, year = year, month = month, - url = flow_unico['URL']) + url = value_constants['URL']) + # Partition the data to_partitions( data = df, partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], - savepath = flow_unico['OUTPUT'], + savepath = value_constants['OUTPUT'], file_type='csv') log("Data partitioned") - return flow_unico['OUTPUT'] + return value_constants['OUTPUT'] @task def get_max_date(table_id, year, month): - max_date = download_file(table_id, year, month) + """ + Get the maximum date from a given table for a specific year and month. - fix_date = max_date[:4] + '-' + max_date[4:6] + '-' + '01' + Args: + table_id (str): The ID of the table. + year (int): The year. + month (int): The month. - date = datetime.strptime(fix_date, '%Y-%m-%d') + Returns: + datetime: The maximum date as a datetime object. 
+ """ + max_date = str(download_file(table_id, year, month)) + date = datetime.strptime(max_date, '%Y-%m-%d') return date \ No newline at end of file diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index a9a008268..d50ef8295 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -5,40 +5,49 @@ import pandas as pd import os +import basedosdados as bd +import requests from pipelines.utils.crawler_cgu.constants import constants -from urllib.request import urlopen -from io import BytesIO -import zipfile from typing import List import unidecode - -def download_and_unzip_file(url : str, path : str) -> None: - print("------------------ Baixando e descompactando arquivo ------------------") - try: - r = urlopen(url) - zip = zipfile.ZipFile(BytesIO(r.read())) - zip.extractall(path=path) - except Exception as e: - print(e) - print("Erro ao baixar e descompactar arquivo") +from pipelines.utils.utils import log, download_and_unzip_file def download_file(table_id : str, year : str, month : str) -> None: """ Download a file from a given URL and save it to a given path """ - flow_unico = constants.TABELA.value[table_id] - if not flow_unico['UNICO']: - url = f"{flow_unico['URL']}{year}{str(month).zfill(2)}/" - print(f'------------------ {url} ------------------') + value_constants = constants.TABELA.value[table_id] + log(f' --------------------- Year = {year} ---------------------') + log(f' --------------------- Month = {month} ---------------------') + log(f' --------------------- Table = {table_id} ---------------------') + if not value_constants['UNICO']: + + url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/" + + status = requests.get(url).status_code == 200 + if status: + log(f'------------------ {url} ------------------') + download_and_unzip_file(url, value_constants['INPUT']) + return url.split("/")[-2] + + else: + log('URL não encontrada. 
Fazendo uma query na BD') + log(f'------------------ {url} ------------------') + query_bd = bd.read_sql(f"select max(date(ano_extrato, mes_extrato, 1)) as valor_maximo from `basedosdados-dev.br_cgu_cartao_pagamento.microdados_governo_federal`", + billing_project_id="basedosdados-dev", + from_file=True) + + log(f'Query: {query_bd}') + log(f'Data máxima na BD: {query_bd["valor_maximo"][0]}') - download_and_unzip_file(url, flow_unico['INPUT']) + return query_bd["valor_maximo"][0] - return url.split("/")[-2] + elif value_constants['UNICO']: + url = value_constants['URL'] + download_and_unzip_file(url, value_constants['INPUT']) - url = flow_unico['URL'] - download_and_unzip_file(url, flow_unico['INPUT']) - return None + return None def read_csv(table_id : str, url : str, year : str, month : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame: @@ -46,20 +55,17 @@ def read_csv(table_id : str, url : str, year : str, month : str, column_replace Read a csv file from a given path """ - flow_unico = constants.TABELA.value[table_id] + value_constants = constants.TABELA.value[table_id] # Check if the file exists - input = flow_unico['INPUT'] - print(f"Criando diretório: {input}") + input = value_constants['INPUT'] + log(f"Criando diretório: {input}") if not os.path.exists(input): os.makedirs(input) - print(f' --------------------- {year} ---------------------') - print(f' --------------------- {month} ---------------------') - print(f' --------------------- {flow_unico["READ"]} ---------------------') # Read the file - file_with_year_month = f"{input}/{year}{month}{flow_unico['READ']}.csv" - print(file_with_year_month) + file_with_year_month = f"{input}/{year}{str(month).zfill(2)}{value_constants['READ']}.csv" + log(file_with_year_month) - df = pd.read_csv(filepath_or_buffer=file_with_year_month, sep=';', encoding='latin1') + df = pd.read_csv(file_with_year_month, sep=';', encoding='latin1') df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] diff --git a/pipelines/utils/utils.py b/pipelines/utils/utils.py index 6c5dc3673..fdefc2994 100644 --- a/pipelines/utils/utils.py +++ b/pipelines/utils/utils.py @@ -14,7 +14,9 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union from uuid import uuid4 - +from io import BytesIO +from urllib.request import urlopen +import zipfile import basedosdados as bd import croniter import hvac @@ -463,6 +465,36 @@ def to_partitions( raise BaseException("Data need to be a pandas DataFrame") + +############### +# +# Download data +# +############### + +def download_and_unzip_file(url : str, path : str) -> None: + """ + Downloads a file from the given URL and extracts it to the specified path. + + Parameters: + url (str): The URL of the file to be downloaded. + path (str): The path where the file will be extracted. 
+ + Returns: + None + """ + + log("------------------ Downloading and unzipping file ------------------") + try: + r = urlopen(url) + zip = zipfile.ZipFile(BytesIO(r.read())) + zip.extractall(path=path) + log("Complet download and unzip") + + except Exception as e: + log(e) + log("Error when downloading and unzipping file") + ############### # # Storage utils From 728deaeb6769db58da0888b5d2092e530f19f553 Mon Sep 17 00:00:00 2001 From: tricktx Date: Tue, 17 Sep 2024 17:01:45 -0300 Subject: [PATCH 03/13] create constants year and month in constants.py --- .../datasets/br_cgu_cartao_pagamento/flows.py | 36 ++++++--- .../br_cgu_cartao_pagamento/schedules.py | 75 +++++++++++++++++++ pipelines/utils/crawler_cgu/constants.py | 7 +- 3 files changed, 105 insertions(+), 13 deletions(-) create mode 100644 pipelines/datasets/br_cgu_cartao_pagamento/schedules.py diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py index 1e2fc1858..083956ad9 100644 --- a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py +++ b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py @@ -1,18 +1,32 @@ # -*- coding: utf-8 -*- from copy import deepcopy - from prefect.run_configs import KubernetesRun from prefect.storage import GCS - from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento from pipelines.constants import constants -# from pipelines.datasets.br_ms_sinan.schedules import ( -# everyday_sinan_microdados -# ) +from pipelines.datasets.br_cgu_cartao_pagamento.schedules import ( + every_day_microdados_compras_centralizadas, + every_day_microdados_defesa_civil, + every_day_microdados_governo_federal +) + +br_cgu_cartao_pagamento__governo_federal = deepcopy(flow_cgu_cartao_pagamento) +br_cgu_cartao_pagamento__governo_federal.name = "br_cgu_cartao_pagamento.governo_federal" +br_cgu_cartao_pagamento__governo_federal.code_owners = ["trick"] +br_cgu_cartao_pagamento__governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_cartao_pagamento__governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_cartao_pagamento__governo_federal.schedule = every_day_microdados_governo_federal + +br_cgu_cartao_pagamento__defesa_civil = deepcopy(flow_cgu_cartao_pagamento) +br_cgu_cartao_pagamento__defesa_civil.name = "br_cgu_cartao_pagamento.defesa_civil" +br_cgu_cartao_pagamento__defesa_civil.code_owners = ["trick"] +br_cgu_cartao_pagamento__defesa_civil.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_cartao_pagamento__defesa_civil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_cartao_pagamento__defesa_civil.schedule = every_day_microdados_defesa_civil -br_cgu_cartao_pagamento_governo_federal = deepcopy(flow_cgu_cartao_pagamento) -br_cgu_cartao_pagamento_governo_federal.name = "br_cgu_cartao_pagamento.governo_federal" -br_cgu_cartao_pagamento_governo_federal.code_owners = ["trick"] -br_cgu_cartao_pagamento_governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -br_cgu_cartao_pagamento_governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) -#br_cgu_cartao_pagamento_governo_federal.schedule = everyday_sinan_microdados \ No newline at end of file +br_cgu_cartao_pagamento__compras_centralizadas = deepcopy(flow_cgu_cartao_pagamento) +br_cgu_cartao_pagamento__compras_centralizadas.name = "br_cgu_cartao_pagamento.compras_centralizadas" +br_cgu_cartao_pagamento__compras_centralizadas.code_owners = ["trick"] +br_cgu_cartao_pagamento__compras_centralizadas.storage 
= GCS(constants.GCS_FLOWS_BUCKET.value) +br_cgu_cartao_pagamento__compras_centralizadas.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) +br_cgu_cartao_pagamento__compras_centralizadas.schedule = every_day_microdados_compras_centralizadas \ No newline at end of file diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py new file mode 100644 index 000000000..74967675c --- /dev/null +++ b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +from datetime import datetime +from prefect.schedules import Schedule +from prefect.schedules.clocks import CronClock, IntervalClock +from pipelines.constants import constants + + +every_day_microdados_governo_federal = Schedule( + clocks=[ + CronClock( + cron="0 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_cartao_pagamento", + "table_id": "microdados_governo_federal", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "historical_data": False, + "update_metadata": True, + "year" : constants.year.value, + "month" : constants.month.value, + }, + ), + ], +) + +every_day_microdados_defesa_civil = Schedule( + clocks=[ + CronClock( + cron="30 20 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_cartao_pagamento", + "table_id": "microdados_defesa_civil", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "historical_data": False, + "update_metadata": True, + "year" : constants.year.value, + "month" : constants.month.value, + }, + ), + ], +) + +every_day_microdados_compras_centralizadas = Schedule( + clocks=[ + CronClock( + cron="00 21 * * *", + start_date=datetime(2021, 3, 31, 17, 11), + labels=[ + constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "br_cgu_cartao_pagamento", + "table_id": "microdados_compras_centralizadas", + "materialization_mode": "prod", + "materialize_after_dump": True, + "dbt_alias": True, + "historical_data": False, + "update_metadata": True, + "year" : constants.year.value, + "month" : constants.month.value, + }, + ), + ], +) \ No newline at end of file diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index d283657cd..86301671b 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -4,7 +4,7 @@ """ from enum import Enum - +from datetime import datetime class constants(Enum): # pylint: disable=c0103 """ @@ -32,4 +32,7 @@ class constants(Enum): # pylint: disable=c0103 "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/", "READ" : "_CPDC", "UNICO" : False} - } \ No newline at end of file + } + + year = datetime.now().year + month = datetime.now().month \ No newline at end of file From 2610b4344fc9c0d8418e70ea2de897b1746ac281 Mon Sep 17 00:00:00 2001 From: tricktx Date: Tue, 17 Sep 2024 18:22:39 -0300 Subject: [PATCH 04/13] import year and mont of constants --- pipelines/datasets/br_cgu_cartao_pagamento/schedules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py index 74967675c..1fbed6050 100644 --- 
a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py +++ b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py @@ -3,7 +3,7 @@ from prefect.schedules import Schedule from prefect.schedules.clocks import CronClock, IntervalClock from pipelines.constants import constants - +from pipelines.utils.crawler_cgu.constants import constants every_day_microdados_governo_federal = Schedule( clocks=[ From fa005ea203edf51d95ed410c46c12d7883d9e915 Mon Sep 17 00:00:00 2001 From: tricktx Date: Tue, 17 Sep 2024 18:33:49 -0300 Subject: [PATCH 05/13] rename constants to constants_cgu --- .../datasets/br_cgu_cartao_pagamento/schedules.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py index 1fbed6050..95a7e57be 100644 --- a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py +++ b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py @@ -3,7 +3,7 @@ from prefect.schedules import Schedule from prefect.schedules.clocks import CronClock, IntervalClock from pipelines.constants import constants -from pipelines.utils.crawler_cgu.constants import constants +from pipelines.utils.crawler_cgu.constants import constants as constants_cgu every_day_microdados_governo_federal = Schedule( clocks=[ @@ -21,8 +21,8 @@ "dbt_alias": True, "historical_data": False, "update_metadata": True, - "year" : constants.year.value, - "month" : constants.month.value, + "year" : constants_cgu.year.value, + "month" : constants_cgu.month.value, }, ), ], @@ -44,8 +44,8 @@ "dbt_alias": True, "historical_data": False, "update_metadata": True, - "year" : constants.year.value, - "month" : constants.month.value, + "year" : constants_cgu.year.value, + "month" : constants_cgu.month.value, }, ), ], @@ -67,8 +67,8 @@ "dbt_alias": True, "historical_data": False, "update_metadata": True, - "year" : constants.year.value, - "month" : constants.month.value, + "year" : constants_cgu.year.value, + "month" : constants_cgu.month.value, }, ), ], From 58fce0892aeddb513b9a494e489238cc0cc9efb5 Mon Sep 17 00:00:00 2001 From: tricktx Date: Tue, 17 Sep 2024 22:32:15 -0300 Subject: [PATCH 06/13] fix get_max_date --- pipelines/utils/crawler_cgu/flows.py | 2 +- pipelines/utils/crawler_cgu/tasks.py | 11 +++++++++++ pipelines/utils/crawler_cgu/utils.py | 23 ++++++++--------------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 304516f4e..f2ee3499d 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -53,7 +53,7 @@ filepath = partition_data(table_id=table_id, year=year, month=month, - upstream_tasks=[data_source_max_date] + upstream_tasks=[dados_desatualizados] ) wait_upload_table = create_table_and_upload_to_gcs( diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index fa98be38b..e2b14e2ad 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -27,11 +27,13 @@ def partition_data(table_id: str, year: str, month: str) -> str: url = value_constants['URL']) # Partition the data + log("Partiting data") to_partitions( data = df, partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], savepath = value_constants['OUTPUT'], file_type='csv') + log("Data partitioned") return value_constants['OUTPUT'] @@ -51,5 +53,14 @@ def get_max_date(table_id, year, month): """ max_date = str(download_file(table_id, year, month)) 
+ + if len(max_date) == 10: + pass + + elif len(max_date) == 6: + max_date = max_date[0:4] + '-' + max_date[4:6] + '-01' + log(max_date) + date = datetime.strptime(max_date, '%Y-%m-%d') + return date \ No newline at end of file diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index d50ef8295..45b75ca60 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -18,9 +18,12 @@ def download_file(table_id : str, year : str, month : str) -> None: """ value_constants = constants.TABELA.value[table_id] + input = value_constants['INPUT'] + log(f"Criando diretório: {input}") + if not os.path.exists(input): + os.makedirs(input) log(f' --------------------- Year = {year} ---------------------') log(f' --------------------- Month = {month} ---------------------') - log(f' --------------------- Table = {table_id} ---------------------') if not value_constants['UNICO']: url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/" @@ -34,33 +37,25 @@ def download_file(table_id : str, year : str, month : str) -> None: else: log('URL não encontrada. Fazendo uma query na BD') log(f'------------------ {url} ------------------') - query_bd = bd.read_sql(f"select max(date(ano_extrato, mes_extrato, 1)) as valor_maximo from `basedosdados-dev.br_cgu_cartao_pagamento.microdados_governo_federal`", + query_bd = bd.read_sql(f"select max(date(ano_extrato, mes_extrato, 1)) as valor_maximo from `basedosdados-dev.br_cgu_cartao_pagamento.{table_id}`", billing_project_id="basedosdados-dev", from_file=True) - log(f'Query: {query_bd}') - log(f'Data máxima na BD: {query_bd["valor_maximo"][0]}') - return query_bd["valor_maximo"][0] - elif value_constants['UNICO']: + if value_constants['UNICO']: url = value_constants['URL'] download_and_unzip_file(url, value_constants['INPUT']) - return None + def read_csv(table_id : str, url : str, year : str, month : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame: """ Read a csv file from a given path """ - value_constants = constants.TABELA.value[table_id] - # Check if the file exists - input = value_constants['INPUT'] - log(f"Criando diretório: {input}") - if not os.path.exists(input): - os.makedirs(input) + # Read the file file_with_year_month = f"{input}/{year}{str(month).zfill(2)}{value_constants['READ']}.csv" log(file_with_year_month) @@ -72,6 +67,4 @@ def read_csv(table_id : str, url : str, year : str, month : str, column_replace for list_column_replace in column_replace: df[list_column_replace] = df[list_column_replace].str.replace(",", ".").astype(float) - df.columns - return df \ No newline at end of file From a12aad46e6633ede888dd175e071af7b47a76986 Mon Sep 17 00:00:00 2001 From: tricktx Date: Wed, 18 Sep 2024 10:26:34 -0300 Subject: [PATCH 07/13] add logs to reviews --- pipelines/utils/crawler_cgu/constants.py | 12 ++++++------ pipelines/utils/crawler_cgu/tasks.py | 4 ++-- pipelines/utils/crawler_cgu/utils.py | 11 +++++++---- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index 86301671b..1accded80 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -13,22 +13,22 @@ class constants(Enum): # pylint: disable=c0103 TABELA = { "microdados_governo_federal" : { - "INPUT" : "/tmp/input/microdados_governo_federal", - "OUTPUT" : "/tmp/output/microdados_governo_federal", + "INPUT_DATA" : "/tmp/input/microdados_governo_federal", + "OUTPUT_DATA" 
: "/tmp/output/microdados_governo_federal", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/", "READ" : "_CPGF", "UNICO" : False}, "microdados_compras_centralizadas" : { - "INPUT" : "/tmp/input/microdados_compras_centralizadas", - "OUTPUT" : "/tmp/output/microdados_compras_centralizadas", + "INPUT_DATA" : "/tmp/input/microdados_compras_centralizadas", + "OUTPUT_DATA" : "/tmp/output/microdados_compras_centralizadas", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/", "READ" : "_CPGFComprasCentralizadas", "UNICO" : False}, "microdados_defesa_civil" : { - "INPUT" : "/tmp/input/microdados_defesa_civil", - "OUTPUT" : "/tmp/output/microdados_defesa_civil", + "INPUT_DATA" : "/tmp/input/microdados_defesa_civil", + "OUTPUT_DATA" : "/tmp/output/microdados_defesa_civil", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/", "READ" : "_CPDC", "UNICO" : False} diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index e2b14e2ad..f3bf3efc8 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -31,12 +31,12 @@ def partition_data(table_id: str, year: str, month: str) -> str: to_partitions( data = df, partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], - savepath = value_constants['OUTPUT'], + savepath = value_constants['OUTPUT_DATA'], file_type='csv') log("Data partitioned") - return value_constants['OUTPUT'] + return value_constants['OUTPUT_DATA'] @task def get_max_date(table_id, year, month): diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index 45b75ca60..d3e26787e 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -18,12 +18,13 @@ def download_file(table_id : str, year : str, month : str) -> None: """ value_constants = constants.TABELA.value[table_id] - input = value_constants['INPUT'] + input = value_constants['INPUT_DATA'] log(f"Criando diretório: {input}") if not os.path.exists(input): os.makedirs(input) log(f' --------------------- Year = {year} ---------------------') log(f' --------------------- Month = {month} ---------------------') + log(f' --------------------- URL = {value_constants["INPUT_DATA"]} ---------------------') if not value_constants['UNICO']: url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/" @@ -31,7 +32,7 @@ def download_file(table_id : str, year : str, month : str) -> None: status = requests.get(url).status_code == 200 if status: log(f'------------------ {url} ------------------') - download_and_unzip_file(url, value_constants['INPUT']) + download_and_unzip_file(url, value_constants['INPUT_DATA']) return url.split("/")[-2] else: @@ -45,7 +46,7 @@ def download_file(table_id : str, year : str, month : str) -> None: if value_constants['UNICO']: url = value_constants['URL'] - download_and_unzip_file(url, value_constants['INPUT']) + download_and_unzip_file(url, value_constants['INPUT_DATA']) return None @@ -57,7 +58,9 @@ def read_csv(table_id : str, url : str, year : str, month : str, column_replace value_constants = constants.TABELA.value[table_id] # Read the file - file_with_year_month = f"{input}/{year}{str(month).zfill(2)}{value_constants['READ']}.csv" + log(os.listdir(value_constants['INPUT_DATA'])) + log(value_constants['INPUT_DATA']) + file_with_year_month = f"{value_constants['INPUT_DATA']}/{year}{str(month).zfill(2)}{value_constants['READ']}.csv" log(file_with_year_month) df = pd.read_csv(file_with_year_month, sep=';', encoding='latin1') From 
27f9b5022562d94f8af17b51a142e2d749623fa0 Mon Sep 17 00:00:00 2001 From: tricktx Date: Wed, 18 Sep 2024 11:43:23 -0300 Subject: [PATCH 08/13] remove logs useless --- pipelines/utils/crawler_cgu/flows.py | 2 +- pipelines/utils/crawler_cgu/schedules.py | 90 ------------------------ pipelines/utils/crawler_cgu/tasks.py | 8 +-- pipelines/utils/crawler_cgu/utils.py | 12 ++-- 4 files changed, 9 insertions(+), 103 deletions(-) delete mode 100644 pipelines/utils/crawler_cgu/schedules.py diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index f2ee3499d..4529a81dc 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -106,7 +106,7 @@ coverage_type="part_bdpro", time_delta={"months": 6}, prefect_mode=materialization_mode, - bq_project="basedosdados", + bq_project="basedosdados-dev", upstream_tasks=[wait_for_materialization], ) diff --git a/pipelines/utils/crawler_cgu/schedules.py b/pipelines/utils/crawler_cgu/schedules.py deleted file mode 100644 index 851c74ccd..000000000 --- a/pipelines/utils/crawler_cgu/schedules.py +++ /dev/null @@ -1,90 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Schedules for br_cgu_cartao_pagamento -""" - -############################################################################### -# -# Aqui é onde devem ser definidos os schedules para os flows do projeto. -# Cada schedule indica o intervalo de tempo entre as execuções. -# Um schedule pode ser definido para um ou mais flows. -# Mais informações sobre schedules podem ser encontradas na documentação do -# Prefect: https://docs.prefect.io/core/concepts/schedules.html -# -# De modo a manter consistência na codebase, todo o código escrito passará -# pelo pylint. Todos os warnings e erros devem ser corrigidos. -# -# Os schedules devem ser definidos de acordo com a sintaxe do Prefect, como, -# por exemplo, o seguinte (para executar a cada 1 minuto): -# -# ----------------------------------------------------------------------------- -# from datetime import timedelta, datetime -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# minute_schedule = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(minutes=1), -# start_date=datetime(2021, 1, 1), -# labels=[ -# constants.DATASETS_AGENT_LABEL.value, -# ] -# ), -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Vale notar que o parâmetro `labels` é obrigatório e deve ser uma lista com -# apenas um elemento, correspondendo ao label do agente que será executado. -# O label do agente é definido em `constants.py` e deve ter o formato -# `DATASETS_AGENT_LABEL`. -# -# Outro exemplo, para executar todos os dias à meia noite, segue abaixo: -# -# ----------------------------------------------------------------------------- -# from prefect import task -# from datetime import timedelta -# import pendulum -# from prefect.schedules import Schedule -# from prefect.schedules.clocks import IntervalClock -# from pipelines.constants import constants -# -# every_day_at_midnight = Schedule( -# clocks=[ -# IntervalClock( -# interval=timedelta(days=1), -# start_date=pendulum.datetime( -# 2021, 1, 1, 0, 0, 0, tz="America/Sao_Paulo"), -# labels=[ -# constants.K8S_AGENT_LABEL.value, -# ] -# ) -# ] -# ) -# ----------------------------------------------------------------------------- -# -# Abaixo segue um código para exemplificação, que pode ser removido. 
-# -############################################################################### - - -from datetime import datetime, timedelta - -from prefect.schedules import Schedule -from prefect.schedules.clocks import IntervalClock - -from pipelines.constants import constants - -every_two_weeks = Schedule( - clocks=[ - IntervalClock( - interval=timedelta(weeks=2), - start_date=datetime(2021, 1, 1), - labels=[ - constants.DATASETS_AGENT_LABEL.value, - ] - ), - ] -) diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index f3bf3efc8..8cd8955d8 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -19,7 +19,7 @@ def partition_data(table_id: str, year: str, month: str) -> str: value_constants = constants.TABELA.value[table_id] - log("Read data") + log("---------------------------- Read data ----------------------------") # Read the data df = read_csv(table_id = table_id, year = year, @@ -27,14 +27,15 @@ def partition_data(table_id: str, year: str, month: str) -> str: url = value_constants['URL']) # Partition the data - log("Partiting data") + log(" ---------------------------- Partiting data -----------------------") + to_partitions( data = df, partition_columns=['ANO_EXTRATO', 'MES_EXTRATO'], savepath = value_constants['OUTPUT_DATA'], file_type='csv') - log("Data partitioned") + log("---------------------------- Data partitioned ----------------------") return value_constants['OUTPUT_DATA'] @@ -59,7 +60,6 @@ def get_max_date(table_id, year, month): elif len(max_date) == 6: max_date = max_date[0:4] + '-' + max_date[4:6] + '-01' - log(max_date) date = datetime.strptime(max_date, '%Y-%m-%d') diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index d3e26787e..056179b58 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -19,11 +19,10 @@ def download_file(table_id : str, year : str, month : str) -> None: value_constants = constants.TABELA.value[table_id] input = value_constants['INPUT_DATA'] - log(f"Criando diretório: {input}") if not os.path.exists(input): os.makedirs(input) - log(f' --------------------- Year = {year} ---------------------') - log(f' --------------------- Month = {month} ---------------------') + log(f' ---------------------------- Year = {year} --------------------------------------') + log(f' ---------------------------- Month = {month} ------------------------------------') log(f' --------------------- URL = {value_constants["INPUT_DATA"]} ---------------------') if not value_constants['UNICO']: @@ -31,13 +30,13 @@ def download_file(table_id : str, year : str, month : str) -> None: status = requests.get(url).status_code == 200 if status: - log(f'------------------ {url} ------------------') + log(f'------------------ URL = {url} ------------------') download_and_unzip_file(url, value_constants['INPUT_DATA']) return url.split("/")[-2] else: log('URL não encontrada. 
Fazendo uma query na BD') - log(f'------------------ {url} ------------------') + log(f'------------------ URL = {url} ------------------') query_bd = bd.read_sql(f"select max(date(ano_extrato, mes_extrato, 1)) as valor_maximo from `basedosdados-dev.br_cgu_cartao_pagamento.{table_id}`", billing_project_id="basedosdados-dev", from_file=True) @@ -58,10 +57,7 @@ def read_csv(table_id : str, url : str, year : str, month : str, column_replace value_constants = constants.TABELA.value[table_id] # Read the file - log(os.listdir(value_constants['INPUT_DATA'])) - log(value_constants['INPUT_DATA']) file_with_year_month = f"{value_constants['INPUT_DATA']}/{year}{str(month).zfill(2)}{value_constants['READ']}.csv" - log(file_with_year_month) df = pd.read_csv(file_with_year_month, sep=';', encoding='latin1') From d9a2007d516b2948f265b4fccb49bbddcc382315 Mon Sep 17 00:00:00 2001 From: tricktx Date: Wed, 18 Sep 2024 15:22:09 -0300 Subject: [PATCH 09/13] basedosdados-dev to basedosdados --- pipelines/utils/crawler_cgu/flows.py | 2 +- pipelines/utils/crawler_cgu/utils.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 4529a81dc..f2ee3499d 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -106,7 +106,7 @@ coverage_type="part_bdpro", time_delta={"months": 6}, prefect_mode=materialization_mode, - bq_project="basedosdados-dev", + bq_project="basedosdados", upstream_tasks=[wait_for_materialization], ) diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index 056179b58..dd1570294 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -37,8 +37,8 @@ def download_file(table_id : str, year : str, month : str) -> None: else: log('URL não encontrada. 
Fazendo uma query na BD') log(f'------------------ URL = {url} ------------------') - query_bd = bd.read_sql(f"select max(date(ano_extrato, mes_extrato, 1)) as valor_maximo from `basedosdados-dev.br_cgu_cartao_pagamento.{table_id}`", - billing_project_id="basedosdados-dev", + query_bd = bd.read_sql(f"select max(date(ano_extrato, mes_extrato, 1)) as valor_maximo from `basedosdados.br_cgu_cartao_pagamento.{table_id}`", + billing_project_id="basedosdados", from_file=True) return query_bd["valor_maximo"][0] From 92725293474d798c1a17dce5756f16d43a43a4b7 Mon Sep 17 00:00:00 2001 From: tricktx Date: Wed, 18 Sep 2024 15:44:36 -0300 Subject: [PATCH 10/13] fix date column name extracao to extrato --- pipelines/utils/crawler_cgu/flows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index f2ee3499d..c41dd7110 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -101,7 +101,7 @@ update_django_metadata( dataset_id=dataset_id, table_id=table_id, - date_column_name={"year": "ano_extracao", "month": "mes_extracao"}, + date_column_name={"year": "ano_extrato", "month": "mes_extrato"}, date_format="%Y-%m", coverage_type="part_bdpro", time_delta={"months": 6}, From a3f102b38065ed2926e7c36febbcdd96c2a8bb31 Mon Sep 17 00:00:00 2001 From: tricktx Date: Thu, 19 Sep 2024 18:37:39 -0300 Subject: [PATCH 11/13] refatoring pipeline cgu, remove year and month in default flow --- .../br_cgu_cartao_pagamento/schedules.py | 6 --- pipelines/utils/crawler_cgu/constants.py | 6 +-- pipelines/utils/crawler_cgu/flows.py | 28 ++++++------- pipelines/utils/crawler_cgu/tasks.py | 28 +++++++------ pipelines/utils/crawler_cgu/utils.py | 40 +++++++++++++------ pipelines/utils/dump_to_gcs/flows.py | 2 +- 6 files changed, 61 insertions(+), 49 deletions(-) diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py index 95a7e57be..c9bd50a4c 100644 --- a/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py +++ b/pipelines/datasets/br_cgu_cartao_pagamento/schedules.py @@ -21,8 +21,6 @@ "dbt_alias": True, "historical_data": False, "update_metadata": True, - "year" : constants_cgu.year.value, - "month" : constants_cgu.month.value, }, ), ], @@ -44,8 +42,6 @@ "dbt_alias": True, "historical_data": False, "update_metadata": True, - "year" : constants_cgu.year.value, - "month" : constants_cgu.month.value, }, ), ], @@ -67,8 +63,6 @@ "dbt_alias": True, "historical_data": False, "update_metadata": True, - "year" : constants_cgu.year.value, - "month" : constants_cgu.month.value, }, ), ], diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index 1accded80..b15e3dc07 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -17,21 +17,21 @@ class constants(Enum): # pylint: disable=c0103 "OUTPUT_DATA" : "/tmp/output/microdados_governo_federal", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/", "READ" : "_CPGF", - "UNICO" : False}, + "ONLY_ONE_FILE" : False}, "microdados_compras_centralizadas" : { "INPUT_DATA" : "/tmp/input/microdados_compras_centralizadas", "OUTPUT_DATA" : "/tmp/output/microdados_compras_centralizadas", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/", "READ" : "_CPGFComprasCentralizadas", - "UNICO" : False}, + "ONLY_ONE_FILE" : False}, "microdados_defesa_civil" : { "INPUT_DATA" : 
"/tmp/input/microdados_defesa_civil", "OUTPUT_DATA" : "/tmp/output/microdados_defesa_civil", "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/", "READ" : "_CPDC", - "UNICO" : False} + "ONLY_ONE_FILE" : False} } year = datetime.now().year diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index c41dd7110..c8b6d6a4e 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -12,9 +12,8 @@ from pipelines.utils.decorators import Flow from pipelines.utils.crawler_cgu.tasks import ( partition_data, - get_max_date + get_current_date_and_download_file, ) - from pipelines.utils.decorators import Flow from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants from pipelines.utils.metadata.tasks import update_django_metadata, check_if_data_is_outdated @@ -28,17 +27,20 @@ name="CGU - Cartão de Pagamento" ) as flow_cgu_cartao_pagamento: - dataset_id = Parameter("dataset_id", required=True) - table_id = Parameter("table_id", required=True) - year = Parameter("year", default=2024, required=False) - month = Parameter("month", default=8, required=False) + dataset_id = Parameter("dataset_id", default='br_cgu_cartao_pagamento', required=True) + table_id = Parameter("table_id", default ="microdados_governo_federal", required=True) materialization_mode = Parameter("materialization_mode", default="dev", required=False) materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False) dbt_alias = Parameter("dbt_alias", default=True, required=False) update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id) + year = Parameter("year", required=False) + month = Parameter("month", required=False) - data_source_max_date = get_max_date(table_id, year, month) + data_source_max_date = get_current_date_and_download_file( + table_id, + dataset_id + ) dados_desatualizados = check_if_data_is_outdated( dataset_id=dataset_id, @@ -50,11 +52,10 @@ with case(dados_desatualizados, True): - filepath = partition_data(table_id=table_id, - year=year, - month=month, - upstream_tasks=[dados_desatualizados] - ) + filepath = partition_data( + table_id=table_id, + upstream_tasks=[dados_desatualizados] + ) wait_upload_table = create_table_and_upload_to_gcs( data_path=filepath, @@ -66,7 +67,7 @@ ) with case(materialize_after_dump, True): - # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() materialization_flow = create_flow_run( flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, @@ -110,7 +111,6 @@ upstream_tasks=[wait_for_materialization], ) - flow_cgu_cartao_pagamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value) flow_cgu_cartao_pagamento.run_config = KubernetesRun( image=constants.DOCKER_IMAGE.value diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 8cd8955d8..812c1d008 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -2,17 +2,18 @@ """ Tasks for br_cgu_cartao_pagamento """ +import datetime from prefect import task -from datetime import datetime +from dateutil.relativedelta import relativedelta import pandas as pd from pipelines.utils.utils import log, to_partitions -from pipelines.utils.crawler_cgu.utils import read_csv +from pipelines.utils.crawler_cgu.utils import read_csv, last_date_in_metadata from 
pipelines.utils.crawler_cgu.constants import constants from pipelines.utils.crawler_cgu.utils import download_file -import basedosdados as bd +from typing import Tuple @task -def partition_data(table_id: str, year: str, month: str) -> str: +def partition_data(table_id: str) -> str: """ Partition data from a given table """ @@ -22,8 +23,6 @@ def partition_data(table_id: str, year: str, month: str) -> str: log("---------------------------- Read data ----------------------------") # Read the data df = read_csv(table_id = table_id, - year = year, - month = month, url = value_constants['URL']) # Partition the data @@ -40,7 +39,7 @@ def partition_data(table_id: str, year: str, month: str) -> str: return value_constants['OUTPUT_DATA'] @task -def get_max_date(table_id, year, month): +def get_current_date_and_download_file(table_id : str, dataset_id : str) -> datetime: """ Get the maximum date from a given table for a specific year and month. @@ -52,15 +51,18 @@ def get_max_date(table_id, year, month): Returns: datetime: The maximum date as a datetime object. """ + last_date = last_date_in_metadata( + dataset_id = dataset_id, + table_id = table_id + ) - max_date = str(download_file(table_id, year, month)) + next_date = last_date + relativedelta(months=1) - if len(max_date) == 10: - pass + year = next_date.year + month = next_date.month - elif len(max_date) == 6: - max_date = max_date[0:4] + '-' + max_date[4:6] + '-01' + max_date = str(download_file(table_id, year, month)) date = datetime.strptime(max_date, '%Y-%m-%d') - return date \ No newline at end of file + return date diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index dd1570294..cf6f15cd0 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -2,7 +2,7 @@ """ General purpose functions for the br_cgu_cartao_pagamento project """ - +import datetime import pandas as pd import os import basedosdados as bd @@ -11,6 +11,8 @@ from typing import List import unidecode from pipelines.utils.utils import log, download_and_unzip_file +from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url + def download_file(table_id : str, year : str, month : str) -> None: """ @@ -24,7 +26,7 @@ def download_file(table_id : str, year : str, month : str) -> None: log(f' ---------------------------- Year = {year} --------------------------------------') log(f' ---------------------------- Month = {month} ------------------------------------') log(f' --------------------- URL = {value_constants["INPUT_DATA"]} ---------------------') - if not value_constants['UNICO']: + if not value_constants['ONLY_ONE_FILE']: url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/" @@ -37,33 +39,47 @@ def download_file(table_id : str, year : str, month : str) -> None: else: log('URL não encontrada. 
Fazendo uma query na BD') log(f'------------------ URL = {url} ------------------') - query_bd = bd.read_sql(f"select max(date(ano_extrato, mes_extrato, 1)) as valor_maximo from `basedosdados.br_cgu_cartao_pagamento.{table_id}`", - billing_project_id="basedosdados", - from_file=True) + last_date = last_date_in_metadata( + dataset_id="br_cgu_cartao_pagamento", + table_id=table_id + ) + return last_date - return query_bd["valor_maximo"][0] - - if value_constants['UNICO']: + if value_constants['ONLY_ONE_FILE']: url = value_constants['URL'] download_and_unzip_file(url, value_constants['INPUT_DATA']) return None -def read_csv(table_id : str, url : str, year : str, month : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame: +def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame: """ Read a csv file from a given path """ value_constants = constants.TABELA.value[table_id] # Read the file - file_with_year_month = f"{value_constants['INPUT_DATA']}/{year}{str(month).zfill(2)}{value_constants['READ']}.csv" + os.listdir(value_constants['INPUT_DATA']) + + get_file = [x for x in os.listdir(value_constants['INPUT_DATA']) if x.endswith('.csv')][0] - df = pd.read_csv(file_with_year_month, sep=';', encoding='latin1') + df = pd.read_csv(get_file, sep=';', encoding='latin1') df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] for list_column_replace in column_replace: df[list_column_replace] = df[list_column_replace].str.replace(",", ".").astype(float) - return df \ No newline at end of file + return df + +def last_date_in_metadata(dataset_id : str, table_id : str) -> datetime.date: + + backend = bd.Backend(graphql_url=get_url("prod")) + last_date_in_api = get_api_most_recent_date( + dataset_id=dataset_id, + table_id=table_id, + date_format="%Y-%m", + backend=backend, + ) + + return last_date_in_api \ No newline at end of file diff --git a/pipelines/utils/dump_to_gcs/flows.py b/pipelines/utils/dump_to_gcs/flows.py index 5080a6ad2..af28e62dc 100644 --- a/pipelines/utils/dump_to_gcs/flows.py +++ b/pipelines/utils/dump_to_gcs/flows.py @@ -14,7 +14,7 @@ download_data_to_gcs, get_project_id, ) -from pipelines.datasets.cross_update.tasks import get_all_eligible_in_selected_year +#from pipelines.datasets.cross_update.tasks import get_all_eligible_in_selected_year from pipelines.utils.tasks import rename_current_flow_run_dataset_table with Flow( From 3f8cf526528909c971b9730ae8cee58eff2404ad Mon Sep 17 00:00:00 2001 From: tricktx Date: Fri, 20 Sep 2024 12:54:30 -0300 Subject: [PATCH 12/13] docs pipeline and remove year/month, create relative_month --- .../datasets/br_cgu_cartao_pagamento/flows.py | 2 +- pipelines/utils/crawler_cgu/flows.py | 11 ++- pipelines/utils/crawler_cgu/tasks.py | 32 ++++--- pipelines/utils/crawler_cgu/utils.py | 84 +++++++++++++++---- 4 files changed, 99 insertions(+), 30 deletions(-) diff --git a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py index 083956ad9..4aa94be4d 100644 --- a/pipelines/datasets/br_cgu_cartao_pagamento/flows.py +++ b/pipelines/datasets/br_cgu_cartao_pagamento/flows.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -from copy import deepcopy +from copy import deepcopy, copy from prefect.run_configs import KubernetesRun from prefect.storage import GCS from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento diff --git a/pipelines/utils/crawler_cgu/flows.py b/pipelines/utils/crawler_cgu/flows.py index 
c8b6d6a4e..f3bcb2a4b 100644 --- a/pipelines/utils/crawler_cgu/flows.py +++ b/pipelines/utils/crawler_cgu/flows.py @@ -29,17 +29,20 @@ dataset_id = Parameter("dataset_id", default='br_cgu_cartao_pagamento', required=True) table_id = Parameter("table_id", default ="microdados_governo_federal", required=True) + #### + # Relative_month = 1 means that the data will be downloaded for the current month + #### + relative_month = Parameter("relative_month", default=1, required=False) materialization_mode = Parameter("materialization_mode", default="dev", required=False) materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False) dbt_alias = Parameter("dbt_alias", default=True, required=False) update_metadata = Parameter("update_metadata", default=False, required=False) rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id) - year = Parameter("year", required=False) - month = Parameter("month", required=False) data_source_max_date = get_current_date_and_download_file( table_id, - dataset_id + dataset_id, + relative_month, ) dados_desatualizados = check_if_data_is_outdated( @@ -48,7 +51,7 @@ data_source_max_date=data_source_max_date, date_format="%Y-%m", upstream_tasks=[data_source_max_date] -) + ) with case(dados_desatualizados, True): diff --git a/pipelines/utils/crawler_cgu/tasks.py b/pipelines/utils/crawler_cgu/tasks.py index 812c1d008..2bc2f2b2b 100644 --- a/pipelines/utils/crawler_cgu/tasks.py +++ b/pipelines/utils/crawler_cgu/tasks.py @@ -2,7 +2,7 @@ """ Tasks for br_cgu_cartao_pagamento """ -import datetime +from datetime import datetime from prefect import task from dateutil.relativedelta import relativedelta import pandas as pd @@ -15,7 +15,17 @@ @task def partition_data(table_id: str) -> str: """ - Partition data from a given table + Partition data from a given table. + + This function reads data from a specified table, partitions it based on + the columns 'ANO_EXTRATO' and 'MES_EXTRATO', and saves the partitioned + data to a specified output path. + + Args: + table_id (str): The identifier of the table to be partitioned. + + Returns: + str: The path where the partitioned data is saved. """ value_constants = constants.TABELA.value[table_id] @@ -39,7 +49,9 @@ def partition_data(table_id: str) -> str: return value_constants['OUTPUT_DATA'] @task -def get_current_date_and_download_file(table_id : str, dataset_id : str) -> datetime: +def get_current_date_and_download_file(table_id : str, + dataset_id : str, + relative_month : int = 1) -> datetime: """ Get the maximum date from a given table for a specific year and month. @@ -51,17 +63,17 @@ def get_current_date_and_download_file(table_id : str, dataset_id : str) -> date Returns: datetime: The maximum date as a datetime object. 
""" - last_date = last_date_in_metadata( + last_date_in_api, next_date_in_api = last_date_in_metadata( dataset_id = dataset_id, - table_id = table_id + table_id = table_id, + relative_month = relative_month ) - next_date = last_date + relativedelta(months=1) - - year = next_date.year - month = next_date.month - max_date = str(download_file(table_id, year, month)) + max_date = str(download_file(table_id = table_id, + year = next_date_in_api.year, + month = next_date_in_api.month, + relative_month=relative_month)) date = datetime.strptime(max_date, '%Y-%m-%d') diff --git a/pipelines/utils/crawler_cgu/utils.py b/pipelines/utils/crawler_cgu/utils.py index cf6f15cd0..68c1d2e5b 100644 --- a/pipelines/utils/crawler_cgu/utils.py +++ b/pipelines/utils/crawler_cgu/utils.py @@ -3,6 +3,7 @@ General purpose functions for the br_cgu_cartao_pagamento project """ import datetime +from dateutil.relativedelta import relativedelta import pandas as pd import os import basedosdados as bd @@ -14,18 +15,29 @@ from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url -def download_file(table_id : str, year : str, month : str) -> None: +def download_file(table_id : str, year : int, month : int, relative_month = int) -> None: """ - Download a file from a given URL and save it to a given path - """ - + Downloads and unzips a file from a specified URL based on the given table ID, year, and month. + + Parameters: + table_id (str): The identifier for the table to download data for. + year (int): The year for which data is to be downloaded. + month (int): The month for which data is to be downloaded. + relative_month (int): The relative month used for querying metadata. + + Returns: + None: If the file is successfully downloaded and unzipped. + str: The next date in the API if the URL is found. + str: The last date in the API if the URL is not found. + """ value_constants = constants.TABELA.value[table_id] input = value_constants['INPUT_DATA'] if not os.path.exists(input): os.makedirs(input) + log(f' ---------------------------- Year = {year} --------------------------------------') log(f' ---------------------------- Month = {month} ------------------------------------') - log(f' --------------------- URL = {value_constants["INPUT_DATA"]} ---------------------') + if not value_constants['ONLY_ONE_FILE']: url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/" @@ -34,16 +46,26 @@ def download_file(table_id : str, year : str, month : str) -> None: if status: log(f'------------------ URL = {url} ------------------') download_and_unzip_file(url, value_constants['INPUT_DATA']) - return url.split("/")[-2] + + last_date_in_api, next_date_in_api = last_date_in_metadata( + dataset_id="br_cgu_cartao_pagamento", + table_id=table_id, + relative_month=relative_month + ) + + return next_date_in_api else: log('URL não encontrada. 
Fazendo uma query na BD') log(f'------------------ URL = {url} ------------------') - last_date = last_date_in_metadata( + + last_date_in_api, next_date_in_api = last_date_in_metadata( dataset_id="br_cgu_cartao_pagamento", - table_id=table_id + table_id=table_id, + relative_month=relative_month ) - return last_date + + return last_date_in_api if value_constants['ONLY_ONE_FILE']: url = value_constants['URL'] @@ -54,16 +76,30 @@ def download_file(table_id : str, year : str, month : str) -> None: def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame: """ - Read a csv file from a given path + Reads a CSV file from a specified path and processes its columns. + + Args: + table_id (str): The identifier for the table to be read. + url (str): The URL from which the CSV file is to be read. + column_replace (List, optional): A list of column names whose values need to be replaced. Default is ['VALOR_TRANSACAO']. + + Returns: + pd.DataFrame: A DataFrame containing the processed data from the CSV file. + + Notes: + - The function reads the CSV file from a directory specified in a constants file. + - It assumes the CSV file is encoded in 'latin1' and uses ';' as the separator. + - Column names are converted to uppercase, spaces are replaced with underscores, and accents are removed. + - For columns specified in `column_replace`, commas in their values are replaced with dots and the values are converted to float. """ value_constants = constants.TABELA.value[table_id] - # Read the file os.listdir(value_constants['INPUT_DATA']) - get_file = [x for x in os.listdir(value_constants['INPUT_DATA']) if x.endswith('.csv')][0] + csv_file = [f for f in os.listdir(value_constants['INPUT_DATA']) if f.endswith('.csv')][0] + log(f"CSV files: {csv_file}") - df = pd.read_csv(get_file, sep=';', encoding='latin1') + df = pd.read_csv(f"{value_constants['INPUT_DATA']}/{csv_file}", sep=';', encoding='latin1') df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns] @@ -72,7 +108,23 @@ def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACA return df -def last_date_in_metadata(dataset_id : str, table_id : str) -> datetime.date: +def last_date_in_metadata(dataset_id : str, + table_id : str, + relative_month) -> datetime.date: + """ + Retrieves the most recent date from the metadata of a specified dataset and table, + and calculates the next date based on a relative month offset. + + Args: + dataset_id (str): The ID of the dataset to query. + table_id (str): The ID of the table within the dataset to query. + relative_month (int): The number of months to add to the most recent date to calculate the next date. + + Returns: + tuple: A tuple containing: + - last_date_in_api (datetime.date): The most recent date found in the API. + - next_date_in_api (datetime.date): The date obtained by adding the relative month to the most recent date. 
+ """ backend = bd.Backend(graphql_url=get_url("prod")) last_date_in_api = get_api_most_recent_date( @@ -82,4 +134,6 @@ def last_date_in_metadata(dataset_id : str, table_id : str) -> datetime.date: backend=backend, ) - return last_date_in_api \ No newline at end of file + next_date_in_api = last_date_in_api + relativedelta(months=relative_month) + + return last_date_in_api, next_date_in_api \ No newline at end of file From 184417bb0a9dd5c718c5bcb4d5dd1403fe817f62 Mon Sep 17 00:00:00 2001 From: tricktx Date: Tue, 15 Oct 2024 11:32:09 -0300 Subject: [PATCH 13/13] remove year and month in constants --- pipelines/utils/crawler_cgu/constants.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pipelines/utils/crawler_cgu/constants.py b/pipelines/utils/crawler_cgu/constants.py index b15e3dc07..419ca410f 100644 --- a/pipelines/utils/crawler_cgu/constants.py +++ b/pipelines/utils/crawler_cgu/constants.py @@ -32,7 +32,4 @@ class constants(Enum): # pylint: disable=c0103 "URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/", "READ" : "_CPDC", "ONLY_ONE_FILE" : False} - } - - year = datetime.now().year - month = datetime.now().month \ No newline at end of file + } \ No newline at end of file