Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: br_ibge_pam models + code #539

Merged
merged 3 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ models:
br_ibge_ipca15:
+materialized: table
+schema: br_ibge_ipca15
br_ibge_pam:
+materialized: table
+schema: br_ibge_pam
br_ibge_pevs:
+materialized: table
+schema: br_ibge_pevs
Expand Down
23 changes: 23 additions & 0 deletions models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{
config(
alias="lavoura_permanente",
schema="br_ibge_pam",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf", "id_municipio"],
)
}}
select
jeantozzi marked this conversation as resolved.
Show resolved Hide resolved
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(area_destinada_colheita as int64) area_destinada_colheita,
safe_cast(area_colhida as int64) area_colhida,
round(safe_cast(quantidade_produzida as float64), 4) quantidade_produzida,
round(safe_cast(rendimento_medio_producao as float64), 4) rendimento_medio_producao,
round(safe_cast(valor_producao as float64), 4) valor_producao
from `basedosdados-staging.br_ibge_pam_staging.lavoura_permanente` as t
23 changes: 23 additions & 0 deletions models/br_ibge_pam/br_ibge_pam__lavoura_temporaria.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{{
config(
alias="lavoura_temporaria",
schema="br_ibge_pam",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf", "id_municipio"],
)
}}
select
jeantozzi marked this conversation as resolved.
Show resolved Hide resolved
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(area_plantada as int64) area_plantada,
safe_cast(area_colhida as int64) area_colhida,
round(safe_cast(quantidade_produzida as float64), 4) quantidade_produzida,
round(safe_cast(rendimento_medio_producao as float64), 4) rendimento_medio_producao,
round(safe_cast(valor_producao as float64), 4) valor_producao
from `basedosdados-staging.br_ibge_pam_staging.lavoura_temporaria` as t
73 changes: 73 additions & 0 deletions models/br_ibge_pam/code/api_to_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import aiohttp
import json
import glob
from tqdm.asyncio import tqdm
from aiohttp import ClientTimeout, TCPConnector
from tqdm import tqdm
from typing import Any, Dict, List

API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]"
AGREGADO = "1613" # É a tabela no SIDRA
PERIODOS = range(1974, 2022 + 1)
VARIAVEIS = ["2313", "1002313", "216", "1000216", "214", "112","215",
"1000215"] # As variáveis da tabela
NIVEL_GEOGRAFICO = "N6" # N6 = Municipal
LOCALIDADES = "all"
CLASSIFICACAO = "82" # Código pré-definido por agregado
CATEGORIAS = ["2717", "2718", "45981", "2719", "2720", "2721", "40472",
"2722", "2723", "31619", "31620", "40473", "2724", "2725",
"2726", "2727", "2728", "2729", "2730", "2731", "2732",
"2733", "2734", "2735", "2736", "2737", "2738", "2739",
"2740", "90001", "2741", "2742", "2743", "2744", "2745",
"2746", "2747", "2748"] # Produtos
ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")]
ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS]

async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
"""
Faz uma requisição GET à API e retorna a resposta em formato JSON.

Parâmetros:
- session (aiohttp.ClientSession): A sessão do cliente aiohttp.
- url (str): A URL da API para a qual a requisição será feita.

Retorna:
- Dict[str, Any]: A resposta da API em formato JSON.
"""
async with session.get(url) as response:
return await response.json()

async def main(years: List[int], variables: List[str], categories: List[str]) -> None:
"""
Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON.

Parâmetros:
- years (List[int]): Lista de anos para os quais os dados serão consultados.
- variables (List[str]): Lista de variáveis da tabela a serem consultadas.
- categories (List[str]): Lista de categorias a serem consultadas.

Retorna:
- None
"""
for year in years:
print(f'Consultando dados do ano: {year}')
async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session:
tasks = []
for variable in variables:
for category in categories:
url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category)
task = fetch(session, url)
tasks.append(asyncio.ensure_future(task))
responses = []
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
try:
response = await future
responses.append(response)
except asyncio.TimeoutError:
print(f"Request timed out for {url}")
with open(f'../json/{year}.json', 'a') as f:
json.dump(responses, f)

if __name__ == "__main__":
asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS))
208 changes: 208 additions & 0 deletions models/br_ibge_pam/code/json_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import json
import numpy as np
import os
import re
from collections import OrderedDict
import pandas as pd
import pyarrow as pa
from pathlib import Path

def parse_file(file_path):
dict_list = []

with open(file_path) as f:
json_list = json.load(f)

for j in json_list:
temp_od = OrderedDict()
temp_variavel = j[0]["variavel"]
for r in j[0]["resultados"]:
temp_produto = list(r["classificacoes"][0]["categoria"].values())[0]
for s in r["series"]:
temp_ano = list(s["serie"].keys())[0]
temp_valor = list(s["serie"].values())[0]
temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip()
temp_id_municipio = s["localidade"]["id"]

temp_od["ano"] = temp_ano
temp_od["sigla_uf"] = temp_sigla_uf
temp_od["id_municipio"] = temp_id_municipio
temp_od["produto"] = temp_produto
temp_od[temp_variavel] = temp_valor

dict_list.append(dict(temp_od))
temp_od.clear()
return dict_list

def join_dicts_by_keys(dict_list, keys):
result = {}
for d in dict_list:
key_tuple = tuple(d[k] for k in keys)
if key_tuple not in result:
result[key_tuple] = d
else:
result[key_tuple].update(d)
return list(result.values())

def create_raw_df(dict_list):
return pd.DataFrame(dict_list, dtype=str)

def drop_columns(dataframe):
dropped_df = dataframe.drop(columns=[
"Área destinada à colheita - percentual do total geral",
"Área colhida - percentual do total geral",
"Valor da produção - percentual do total geral",
])

return dropped_df

def rename_columns(dataframe):
renamed_dataframe = dataframe.rename(columns={
"Área destinada à colheita": "area_destinada_colheita",
"Área colhida": "area_colhida",
"Quantidade produzida": "quantidade_produzida",
"Rendimento médio da produção": "rendimento_medio_producao",
"Valor da produção": "valor_producao"
})
return renamed_dataframe

def treat_columns(dataframe):
dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto",
"area_destinada_colheita", "area_colhida",
"quantidade_produzida", "rendimento_medio_producao",
"valor_producao"]]
COLUNAS_PARA_TRATAR = ["ano", "area_destinada_colheita", "area_colhida",
"quantidade_produzida", "rendimento_medio_producao",
"valor_producao"]

for coluna in COLUNAS_PARA_TRATAR:
dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x)
dataframe[coluna] = dataframe[coluna].astype("Int64")
return dataframe

def products_weight_ratio_fix(row):
"""
2 - A partir do ano de 2001 as quantidades produzidas dos produtos abacate, banana,
caqui, figo, goiaba, laranja, limão, maçã, mamão, manga, maracujá, marmelo, pera,
pêssego e tangerina passam a ser expressas em toneladas. Nos anos anteriores eram
expressas em mil frutos, com exceção da banana, que era expressa em mil cachos. O
rendimento médio passa a ser expresso em Kg/ha. Nos anos anteriores era expresso
em frutos/ha, com exceção da banana, que era expressa em cachos/ha.
3 - Veja em o documento
https://sidra.ibge.gov.br/content/documentos/pam/AlteracoesUnidadesMedidaFrutas.pdf
com as alterações de unidades de medida das frutíferas ocorridas em 2001 e a tabela
de conversão fruto x quilograma.
"""

DICIONARIO_DE_PROPORCOES = {
"Abacate": 0.38,
"Banana (cacho)": 10.20,
"Caqui": 0.18,
"Figo": 0.09,
"Goiaba": 0.16,
"Larajna": 0.16,
"Limão": 0.10,
"Maçã": 0.15,
"Mamão": 0.80,
"Manga": 0.31,
"Maracujá": 0.15,
"Marmelo": 0.19,
"Pera": 0.17,
"Pêra": 0.17, # Para garantir, pois nos dados parece que só há Pera, sem acento
"Pêssego": 0.13,
"Tangerina": 0.15,
"Melancia": 6.08,
"Melão": 1.39
}

if row["ano"] >= 2001:
return row

if (pd.isna(row["quantidade_produzida"]) or pd.isna(row["area_colhida"])
or row["quantidade_produzida"] == 0 or row["area_colhida"] == 0):
return row

if row["produto"] not in DICIONARIO_DE_PROPORCOES.keys():
return row

quantidade_produzida = row["quantidade_produzida"] * DICIONARIO_DE_PROPORCOES[row["produto"]]
rendimento_medio_producao = quantidade_produzida / row["area_colhida"] * 1000 # kg / ha

row["quantidade_produzida"] = quantidade_produzida
row["rendimento_medio_producao"] = rendimento_medio_producao

return row

def currency_fix(row):
"""
Valor da produção (Mil Cruzeiros [1974 a 1985, 1990 a 1992], Mil Cruzados [1986 a 1988],
Mil Cruzados Novos [1989], Mil Cruzeiros Reais [1993], Mil Reais [1994 a 2022])
Verificado em http://www.igf.com.br/calculadoras/conversor/conversor.htm
"""

if 1974 <= row["ano"] <= 1985:
return row["valor_producao"] / (1000**4 * 2.75)
elif 1986 <= row["ano"] <= 1988:
return row["valor_producao"] / (1000**3 * 2.75)
elif row["ano"] == 1989:
return row["valor_producao"] / (1000**2 * 2.75)
elif 1990 <= row["ano"] <= 1992:
return row["valor_producao"] / (1000**2 * 2.75)
elif row["ano"] == 1993:
return row["valor_producao"] / (1000 * 2.75)
else:
return row["valor_producao"]

def get_existing_years(directory):
root_dir = Path(directory)
year_dirs = root_dir.glob('ano=*')
years = set()
for year_dir in year_dirs:
match = re.match(r'ano=(\d+)', year_dir.name)
if match:
year = int(match.group(1))
years.add(year)

return sorted(list(years))

if __name__ == '__main__':
ANOS_TRANSFORMADOS = get_existing_years("../parquet")
ARQUIVOS_JSON = list(Path('../json/').glob('*.json'))
JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS]

for path in JSON_FALTANTES:
print(f"Criando dict_list com base no arquivo: {path}...")
dict_list = parse_file(path)
print("Concatendo as colunas do dict_list...")
dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"])
print("Criando DataFrame a partir do dict_list...")
df = create_raw_df(dict_list)
print("Removendo as colunas do DataFrame...")
df = drop_columns(df)
print("Renomeando as colunas do DataFrame...")
df = rename_columns(df)
print("Tratando as colunas do DataFrame...")
df = treat_columns(df)
print("Tratando a questão dos pesos dos produtos... Impacto em: quantidade_producao e rendimento_medio_producao")
df = df.apply(products_weight_ratio_fix, axis=1)
print("Aplicando a correção nominal retroativa da moeda... Impacto: valor_producao")
df["valor_producao"] = df.apply(currency_fix, axis=1)
print("Transformações finalizadas!")
temp_ano = df["ano"].max()
print("Deletando a coluna ano para possibilitar o particionamento...")
df.drop(columns=["ano"], inplace=True)
print("Transformações finalizadas!")
temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet"
print(f"Exportando o DataFrame particionado em {temp_export_file_path}...")
os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True)
temp_schema = pa.schema([("sigla_uf", pa.string()),
("id_municipio", pa.string()),
("produto", pa.string()),
("area_destinada_colheita", pa.int64()),
("area_colhida", pa.int64()),
("quantidade_produzida", pa.float64()),
("rendimento_medio_producao", pa.float64()),
("valor_producao", pa.float64())])
df.to_parquet(temp_export_file_path, schema=temp_schema, index=False)
del(df) # Liberando memória
print()
Loading
Loading