docs pipeline and remove year/month, create relative_month
tricktx committed Sep 20, 2024
1 parent a3f102b commit 3f8cf52
Showing 4 changed files with 99 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pipelines/datasets/br_cgu_cartao_pagamento/flows.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from copy import deepcopy
from copy import deepcopy, copy
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento
11 changes: 7 additions & 4 deletions pipelines/utils/crawler_cgu/flows.py
@@ -29,17 +29,20 @@

dataset_id = Parameter("dataset_id", default='br_cgu_cartao_pagamento', required=True)
table_id = Parameter("table_id", default ="microdados_governo_federal", required=True)
####
# relative_month = 1 means the data for the current month will be downloaded
####
relative_month = Parameter("relative_month", default=1, required=False)
materialization_mode = Parameter("materialization_mode", default="dev", required=False)
materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)
rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id)
year = Parameter("year", required=False)
month = Parameter("month", required=False)

data_source_max_date = get_current_date_and_download_file(
table_id,
dataset_id
dataset_id,
relative_month,
)

dados_desatualizados = check_if_data_is_outdated(
@@ -48,7 +51,7 @@
data_source_max_date=data_source_max_date,
date_format="%Y-%m",
upstream_tasks=[data_source_max_date]
)
)

with case(dados_desatualizados, True):

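A minimal sketch of how the new relative_month parameter can be passed when triggering this flow locally, assuming the Prefect 1.x flow object imported above (the run call is illustrative and not part of this commit):

from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento

# relative_month=1 asks for the month after the most recent date
# already registered in the basedosdados API metadata.
flow_cgu_cartao_pagamento.run(parameters={
    "dataset_id": "br_cgu_cartao_pagamento",
    "table_id": "microdados_governo_federal",
    "relative_month": 1,
})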
32 changes: 22 additions & 10 deletions pipelines/utils/crawler_cgu/tasks.py
@@ -2,7 +2,7 @@
"""
Tasks for br_cgu_cartao_pagamento
"""
import datetime
from datetime import datetime
from prefect import task
from dateutil.relativedelta import relativedelta
import pandas as pd
@@ -15,7 +15,17 @@
@task
def partition_data(table_id: str) -> str:
"""
Partition data from a given table
Partition data from a given table.
This function reads data from a specified table, partitions it based on
the columns 'ANO_EXTRATO' and 'MES_EXTRATO', and saves the partitioned
data to a specified output path.
Args:
table_id (str): The identifier of the table to be partitioned.
Returns:
str: The path where the partitioned data is saved.
"""

value_constants = constants.TABELA.value[table_id]
@@ -39,7 +49,9 @@ def partition_data(table_id: str) -> str:
return value_constants['OUTPUT_DATA']

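The body of partition_data is collapsed in this diff, but the docstring above describes partitioning on ANO_EXTRATO and MES_EXTRATO. A rough illustration of that layout using plain pandas (paths and sample values are made up; this is not the hidden implementation):

import os
import pandas as pd

df = pd.DataFrame({
    "ANO_EXTRATO": [2024, 2024],
    "MES_EXTRATO": [8, 9],
    "VALOR_TRANSACAO": [10.0, 20.0],
})

output = "/tmp/output"  # hypothetical output path
for (year, month), chunk in df.groupby(["ANO_EXTRATO", "MES_EXTRATO"]):
    # One directory per year/month partition, Hive-style.
    path = f"{output}/ano_extrato={year}/mes_extrato={month}"
    os.makedirs(path, exist_ok=True)
    chunk.to_csv(f"{path}/data.csv", index=False)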
@task
def get_current_date_and_download_file(table_id : str, dataset_id : str) -> datetime:
def get_current_date_and_download_file(table_id : str,
dataset_id : str,
relative_month : int = 1) -> datetime:
"""
Get the maximum date from a given table for a specific year and month.
@@ -51,17 +63,17 @@ def get_current_date_and_download_file(table_id : str, dataset_id : str) -> date
Returns:
datetime: The maximum date as a datetime object.
"""
last_date = last_date_in_metadata(
last_date_in_api, next_date_in_api = last_date_in_metadata(
dataset_id = dataset_id,
table_id = table_id
table_id = table_id,
relative_month = relative_month
)

next_date = last_date + relativedelta(months=1)

year = next_date.year
month = next_date.month

max_date = str(download_file(table_id, year, month))
max_date = str(download_file(table_id = table_id,
year = next_date_in_api.year,
month = next_date_in_api.month,
relative_month=relative_month))

date = datetime.strptime(max_date, '%Y-%m-%d')

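The heart of the change is the date arithmetic: instead of taking explicit year and month parameters, the task shifts the most recent date found in the metadata by relative_month months. That logic in isolation, using only dateutil (the sample date is made up):

from datetime import date
from dateutil.relativedelta import relativedelta

last_date_in_api = date(2024, 8, 1)  # hypothetical most recent date in the API
relative_month = 1

# The month to download next: relative_month months past the last registered date.
next_date_in_api = last_date_in_api + relativedelta(months=relative_month)

print(next_date_in_api.year, next_date_in_api.month)  # 2024 9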
84 changes: 69 additions & 15 deletions pipelines/utils/crawler_cgu/utils.py
@@ -3,6 +3,7 @@
General purpose functions for the br_cgu_cartao_pagamento project
"""
import datetime
from dateutil.relativedelta import relativedelta
import pandas as pd
import os
import basedosdados as bd
@@ -14,18 +15,29 @@
from pipelines.utils.metadata.utils import get_api_most_recent_date, get_url


def download_file(table_id : str, year : str, month : str) -> None:
def download_file(table_id : str, year : int, month : int, relative_month : int = 1) -> None:
"""
Download a file from a given URL and save it to a given path
"""

Downloads and unzips a file from a specified URL based on the given table ID, year, and month.
Parameters:
table_id (str): The identifier for the table to download data for.
year (int): The year for which data is to be downloaded.
month (int): The month for which data is to be downloaded.
relative_month (int): The relative month used for querying metadata.
Returns:
datetime.date: The next date in the API if the URL is found and the file is downloaded.
datetime.date: The last date in the API if the URL is not found.
"""
value_constants = constants.TABELA.value[table_id]
input = value_constants['INPUT_DATA']
if not os.path.exists(input):
os.makedirs(input)

log(f' ---------------------------- Year = {year} --------------------------------------')
log(f' ---------------------------- Month = {month} ------------------------------------')
log(f' --------------- INPUT_DATA = {value_constants["INPUT_DATA"]} ---------------')

if not value_constants['ONLY_ONE_FILE']:

url = f"{value_constants['URL']}{year}{str(month).zfill(2)}/"
@@ -34,16 +46,26 @@ def download_file(table_id : str, year : str, month : str) -> None:
if status:
log(f'------------------ URL = {url} ------------------')
download_and_unzip_file(url, value_constants['INPUT_DATA'])
return url.split("/")[-2]

last_date_in_api, next_date_in_api = last_date_in_metadata(
dataset_id="br_cgu_cartao_pagamento",
table_id=table_id,
relative_month=relative_month
)

return next_date_in_api

else:
log('URL not found. Running a query on BD')
log(f'------------------ URL = {url} ------------------')
last_date = last_date_in_metadata(

last_date_in_api, next_date_in_api = last_date_in_metadata(
dataset_id="br_cgu_cartao_pagamento",
table_id=table_id
table_id=table_id,
relative_month=relative_month
)
return last_date

return last_date_in_api

if value_constants['ONLY_ONE_FILE']:
url = value_constants['URL']
@@ -54,16 +76,30 @@

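For tables that are not published as a single file, download_file assembles the URL by appending YYYYMM to the base URL, zero-padding the month, as in the branch above. A small sketch of that construction with a hypothetical base URL (the real one lives in constants.TABELA):

base_url = "https://example.gov.br/arquivos/"  # hypothetical; the real base URL comes from constants
year, month = 2024, 9

url = f"{base_url}{year}{str(month).zfill(2)}/"
print(url)  # https://example.gov.br/arquivos/202409/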
def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACAO']) -> pd.DataFrame:
"""
Read a csv file from a given path
Reads a CSV file from a specified path and processes its columns.
Args:
table_id (str): The identifier for the table to be read.
url (str): The URL from which the CSV file is to be read.
column_replace (List, optional): A list of column names whose values need to be replaced. Default is ['VALOR_TRANSACAO'].
Returns:
pd.DataFrame: A DataFrame containing the processed data from the CSV file.
Notes:
- The function reads the CSV file from a directory specified in a constants file.
- It assumes the CSV file is encoded in 'latin1' and uses ';' as the separator.
- Column names are converted to uppercase, spaces are replaced with underscores, and accents are removed.
- For columns specified in `column_replace`, commas in their values are replaced with dots and the values are converted to float.
"""
value_constants = constants.TABELA.value[table_id]

# Read the file
os.listdir(value_constants['INPUT_DATA'])

get_file = [x for x in os.listdir(value_constants['INPUT_DATA']) if x.endswith('.csv')][0]
csv_file = [f for f in os.listdir(value_constants['INPUT_DATA']) if f.endswith('.csv')][0]
log(f"CSV files: {csv_file}")

df = pd.read_csv(get_file, sep=';', encoding='latin1')
df = pd.read_csv(f"{value_constants['INPUT_DATA']}/{csv_file}", sep=';', encoding='latin1')

df.columns = [unidecode.unidecode(x).upper().replace(" ", "_") for x in df.columns]

@@ -72,7 +108,23 @@ def read_csv(table_id : str, url : str, column_replace : List = ['VALOR_TRANSACA

return df

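The column handling documented above is easy to demonstrate on a toy frame. A sketch of the same normalization steps (accent stripping, uppercasing, underscores, comma-decimal conversion) on made-up data:

import pandas as pd
import unidecode

df = pd.DataFrame({"Valor Transação": ["1234,56", "7,89"]})

# Normalize headers: strip accents, uppercase, spaces -> underscores.
df.columns = [unidecode.unidecode(c).upper().replace(" ", "_") for c in df.columns]

# Convert comma-decimal strings to float, as done for column_replace columns.
df["VALOR_TRANSACAO"] = df["VALOR_TRANSACAO"].str.replace(",", ".").astype(float)

print(df.dtypes)  # VALOR_TRANSACAO: float64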
def last_date_in_metadata(dataset_id : str, table_id : str) -> datetime.date:
def last_date_in_metadata(dataset_id : str,
table_id : str,
relative_month : int) -> tuple:
"""
Retrieves the most recent date from the metadata of a specified dataset and table,
and calculates the next date based on a relative month offset.
Args:
dataset_id (str): The ID of the dataset to query.
table_id (str): The ID of the table within the dataset to query.
relative_month (int): The number of months to add to the most recent date to calculate the next date.
Returns:
tuple: A tuple containing:
- last_date_in_api (datetime.date): The most recent date found in the API.
- next_date_in_api (datetime.date): The date obtained by adding the relative month to the most recent date.
"""

backend = bd.Backend(graphql_url=get_url("prod"))
last_date_in_api = get_api_most_recent_date(
@@ -82,4 +134,6 @@ def last_date_in_metadata(dataset_id : str, table_id : str) -> datetime.date:
backend=backend,
)

return last_date_in_api
next_date_in_api = last_date_in_api + relativedelta(months=relative_month)

return last_date_in_api, next_date_in_api
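
Callers now unpack the tuple rather than computing the offset themselves. A hedged usage sketch (dataset and table IDs taken from the diff; actually running it needs basedosdados access):

from pipelines.utils.crawler_cgu.utils import last_date_in_metadata

last_date_in_api, next_date_in_api = last_date_in_metadata(
    dataset_id="br_cgu_cartao_pagamento",
    table_id="microdados_governo_federal",
    relative_month=1,
)
# next_date_in_api == last_date_in_api + relativedelta(months=1)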
