Skip to content

Commit

Permalink
add: br_ibge_pam models + code
Browse files Browse the repository at this point in the history
  • Loading branch information
jeantozzi committed Apr 10, 2024
1 parent 8681112 commit c090fda
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 0 deletions.
3 changes: 3 additions & 0 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ models:
br_ibge_ipca15:
+materialized: table
+schema: br_ibge_ipca15
br_ibge_pam:
+materialized: table
+schema: br_ibge_pam
br_ibge_pevs:
+materialized: table
+schema: br_ibge_pevs
Expand Down
12 changes: 12 additions & 0 deletions models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{ config(alias="lavoura_permanente", schema="br_ibge_pam") }}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(area_destinada_colheita as int64) area_destinada_colheita,
safe_cast(area_colhida as int64) area_colhida,
round(safe_cast(quantidade_produzida as float64), 4) quantidade_produzida,
round(safe_cast(rendimento_medio_producao as float64), 4) rendimento_medio_producao,
round(safe_cast(valor_producao as float64), 4) valor_producao
from `basedosdados-staging.br_ibge_pam_staging.lavoura_permanente` as t
12 changes: 12 additions & 0 deletions models/br_ibge_pam/br_ibge_pam__lavoura_temporaria.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{ config(alias="lavoura_temporaria", schema="br_ibge_pam") }}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(area_plantada as int64) area_plantada,
safe_cast(area_colhida as int64) area_colhida,
round(safe_cast(quantidade_produzida as float64), 4) quantidade_produzida,
round(safe_cast(rendimento_medio_producao as float64), 4) rendimento_medio_producao,
round(safe_cast(valor_producao as float64), 4) valor_producao
from `basedosdados-staging.br_ibge_pam_staging.lavoura_temporaria` as t
51 changes: 51 additions & 0 deletions models/br_ibge_pam/code/api_to_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio
import aiohttp
import json
import glob
from tqdm.asyncio import tqdm
from aiohttp import ClientTimeout, TCPConnector
from tqdm import tqdm

API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]"
AGREGADO = "1613" # É a tabela no SIDRA
PERIODOS = range(1974, 2022 + 1)
VARIAVEIS = ["2313", "1002313", "216", "1000216", "214", "112","215",
"1000215"] # As variáveis da tabela
NIVEL_GEOGRAFICO = "N6" # N6 = Municipal
LOCALIDADES = "all"
CLASSIFICACAO = "82" # Código pré-definido por agregado
CATEGORIAS = ["2717", "2718", "45981", "2719", "2720", "2721", "40472",
"2722", "2723", "31619", "31620", "40473", "2724", "2725",
"2726", "2727", "2728", "2729", "2730", "2731", "2732",
"2733", "2734", "2735", "2736", "2737", "2738", "2739",
"2740", "90001", "2741", "2742", "2743", "2744", "2745",
"2746", "2747", "2748"] # Produtos
ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")]
ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS]

async def fetch(session, url):
async with session.get(url) as response:
return await response.json()

async def main(years, variables, categories):
for year in years:
print(f'Consultando dados do ano: {year}')
async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session:
tasks = []
for variable in variables: # Foi preciso iterar por cada variável porque senão retorna HTTP 500
for category in categories:
url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category)
task = fetch(session, url)
tasks.append(asyncio.ensure_future(task))
responses = []
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
try:
response = await future
responses.append(response)
except asyncio.TimeoutError:
print(f"Request timed out for {url}")
with open(f'../json/{year}.json', 'a') as f:
json.dump(responses, f)

if __name__ == "__main__":
asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS))
208 changes: 208 additions & 0 deletions models/br_ibge_pam/code/json_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import json
import numpy as np
import os
import re
from collections import OrderedDict
import pandas as pd
import pyarrow as pa
from pathlib import Path

def parse_file(file_path):
dict_list = []

with open(file_path) as f:
json_list = json.load(f)

for j in json_list:
temp_od = OrderedDict()
temp_variavel = j[0]["variavel"]
for r in j[0]["resultados"]:
temp_produto = list(r["classificacoes"][0]["categoria"].values())[0]
for s in r["series"]:
temp_ano = list(s["serie"].keys())[0]
temp_valor = list(s["serie"].values())[0]
temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip()
temp_id_municipio = s["localidade"]["id"]

temp_od["ano"] = temp_ano
temp_od["sigla_uf"] = temp_sigla_uf
temp_od["id_municipio"] = temp_id_municipio
temp_od["produto"] = temp_produto
temp_od[temp_variavel] = temp_valor

dict_list.append(dict(temp_od))
temp_od.clear()
return dict_list

def join_dicts_by_keys(dict_list, keys):
result = {}
for d in dict_list:
key_tuple = tuple(d[k] for k in keys)
if key_tuple not in result:
result[key_tuple] = d
else:
result[key_tuple].update(d)
return list(result.values())

def create_raw_df(dict_list):
return pd.DataFrame(dict_list, dtype=str)

def drop_columns(dataframe):
dropped_df = dataframe.drop(columns=[
"Área destinada à colheita - percentual do total geral",
"Área colhida - percentual do total geral",
"Valor da produção - percentual do total geral",
])

return dropped_df

def rename_columns(dataframe):
renamed_dataframe = dataframe.rename(columns={
"Área destinada à colheita": "area_destinada_colheita",
"Área colhida": "area_colhida",
"Quantidade produzida": "quantidade_produzida",
"Rendimento médio da produção": "rendimento_medio_producao",
"Valor da produção": "valor_producao"
})
return renamed_dataframe

def treat_columns(dataframe):
dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto",
"area_destinada_colheita", "area_colhida",
"quantidade_produzida", "rendimento_medio_producao",
"valor_producao"]]
COLUNAS_PARA_TRATAR = ["ano", "area_destinada_colheita", "area_colhida",
"quantidade_produzida", "rendimento_medio_producao",
"valor_producao"]

for coluna in COLUNAS_PARA_TRATAR:
dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x)
dataframe[coluna] = dataframe[coluna].astype("Int64")
return dataframe

def products_weight_ratio_fix(row):
"""
2 - A partir do ano de 2001 as quantidades produzidas dos produtos abacate, banana,
caqui, figo, goiaba, laranja, limão, maçã, mamão, manga, maracujá, marmelo, pera,
pêssego e tangerina passam a ser expressas em toneladas. Nos anos anteriores eram
expressas em mil frutos, com exceção da banana, que era expressa em mil cachos. O
rendimento médio passa a ser expresso em Kg/ha. Nos anos anteriores era expresso
em frutos/ha, com exceção da banana, que era expressa em cachos/ha.
3 - Veja em o documento
https://sidra.ibge.gov.br/content/documentos/pam/AlteracoesUnidadesMedidaFrutas.pdf
com as alterações de unidades de medida das frutíferas ocorridas em 2001 e a tabela
de conversão fruto x quilograma.
"""

DICIONARIO_DE_PROPORCOES = {
"Abacate": 0.38,
"Banana (cacho)": 10.20,
"Caqui": 0.18,
"Figo": 0.09,
"Goiaba": 0.16,
"Larajna": 0.16,
"Limão": 0.10,
"Maçã": 0.15,
"Mamão": 0.80,
"Manga": 0.31,
"Maracujá": 0.15,
"Marmelo": 0.19,
"Pera": 0.17,
"Pêra": 0.17, # Para garantir, pois nos dados parece que só há Pera, sem acento
"Pêssego": 0.13,
"Tangerina": 0.15,
"Melancia": 6.08,
"Melão": 1.39
}

if row["ano"] >= 2001:
return row

if (pd.isna(row["quantidade_produzida"]) or pd.isna(row["area_colhida"])
or row["quantidade_produzida"] == 0 or row["area_colhida"] == 0):
return row

if row["produto"] not in DICIONARIO_DE_PROPORCOES.keys():
return row

quantidade_produzida = row["quantidade_produzida"] * DICIONARIO_DE_PROPORCOES[row["produto"]]
rendimento_medio_producao = quantidade_produzida / row["area_colhida"] * 1000 # kg / ha

row["quantidade_produzida"] = quantidade_produzida
row["rendimento_medio_producao"] = rendimento_medio_producao

return row

def currency_fix(row):
"""
Valor da produção (Mil Cruzeiros [1974 a 1985, 1990 a 1992], Mil Cruzados [1986 a 1988],
Mil Cruzados Novos [1989], Mil Cruzeiros Reais [1993], Mil Reais [1994 a 2022])
Verificado em http://www.igf.com.br/calculadoras/conversor/conversor.htm
"""

if 1974 <= row["ano"] <= 1985:
return row["valor_producao"] / (1000**4 * 2.75)
elif 1986 <= row["ano"] <= 1988:
return row["valor_producao"] / (1000**3 * 2.75)
elif row["ano"] == 1989:
return row["valor_producao"] / (1000**2 * 2.75)
elif 1990 <= row["ano"] <= 1992:
return row["valor_producao"] / (1000**2 * 2.75)
elif row["ano"] == 1993:
return row["valor_producao"] / (1000 * 2.75)
else:
return row["valor_producao"]

def get_existing_years(directory):
root_dir = Path(directory)
year_dirs = root_dir.glob('ano=*')
years = set()
for year_dir in year_dirs:
match = re.match(r'ano=(\d+)', year_dir.name)
if match:
year = int(match.group(1))
years.add(year)

return sorted(list(years))

if __name__ == '__main__':
ANOS_TRANSFORMADOS = get_existing_years("../parquet")
ARQUIVOS_JSON = list(Path('../json/').glob('*.json'))
JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS]

for path in JSON_FALTANTES:
print(f"Criando dict_list com base no arquivo: {path}...")
dict_list = parse_file(path)
print("Concatendo as colunas do dict_list...")
dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"])
print("Criando DataFrame a partir do dict_list...")
df = create_raw_df(dict_list)
print("Removendo as colunas do DataFrame...")
df = drop_columns(df)
print("Renomeando as colunas do DataFrame...")
df = rename_columns(df)
print("Tratando as colunas do DataFrame...")
df = treat_columns(df)
print("Tratando a questão dos pesos dos produtos... Impacto em: quantidade_producao e rendimento_medio_producao")
df = df.apply(products_weight_ratio_fix, axis=1)
print("Aplicando a correção nominal retroativa da moeda... Impacto: valor_producao")
df["valor_producao"] = df.apply(currency_fix, axis=1)
print("Transformações finalizadas!")
temp_ano = df["ano"].max()
print("Deletando a coluna ano para possibilitar o particionamento...")
df.drop(columns=["ano"], inplace=True)
print("Transformações finalizadas!")
temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet"
print(f"Exportando o DataFrame particionado em {temp_export_file_path}...")
os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True)
temp_schema = pa.schema([("sigla_uf", pa.string()),
("id_municipio", pa.string()),
("produto", pa.string()),
("area_destinada_colheita", pa.int64()),
("area_colhida", pa.int64()),
("quantidade_produzida", pa.float64()),
("rendimento_medio_producao", pa.float64()),
("valor_producao", pa.float64())])
df.to_parquet(temp_export_file_path, schema=temp_schema, index=False)
del(df) # Liberando memória
print()
89 changes: 89 additions & 0 deletions models/br_ibge_pam/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
---
version: 2
models:
- name: br_ibge_pam__lavoura_temporaria
description: Área plantada, área colhida, quantidade produzida, rendimento médio
e valor da produção das lavouras temporárias
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ano
- sigla_uf
- id_municipio
- produto
- not_null_proportion_multiple_columns:
at_least: 0.05
columns:
- name: ano
description: Ano
tests:
- relationships:
to: ref('br_bd_diretorios_data_tempo__ano')
field: ano.ano
- name: sigla_uf
description: Sigla da Unidade da Federação
tests:
- relationships:
to: ref('br_bd_diretorios_brasil__uf')
field: sigla
- name: id_municipio
description: ID Município IBGE 7 dígitos
tests:
- relationships:
to: ref('br_bd_diretorios_brasil__municipio')
field: id_municipio
- name: produto
description: Produto
- name: area_plantada
description: Área destinada à colheita
- name: area_colhida
description: Área colhida
- name: quantidade_produzida
description: Quantidade da produção
- name: rendimento_medio_producao
description: Rendimento médio da produção
- name: valor_producao
description: Valor da produção
- name: br_ibge_pam__lavoura_permanente
description: Área destinada à colheita, área colhida, quantidade produzida, rendimento
médio e valor da produção das lavouras permanentes
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ano
- sigla_uf
- id_municipio
- produto
- not_null_proportion_multiple_columns:
at_least: 0.05
columns:
- name: ano
description: Ano
tests:
- relationships:
to: ref('br_bd_diretorios_data_tempo__ano')
field: ano.ano
- name: sigla_uf
description: Sigla da Unidade da Federação
tests:
- relationships:
to: ref('br_bd_diretorios_brasil__uf')
field: sigla
- name: id_municipio
description: ID Município IBGE 7 dígitos
tests:
- relationships:
to: ref('br_bd_diretorios_brasil__municipio')
field: id_municipio
- name: produto
description: Produto
- name: area_destinada_colheita
description: Área destinada à colheita
- name: area_colhida
description: Área colhida
- name: quantidade_produzida
description: Quantidade da produção
- name: rendimento_medio_producao
description: Rendimento médio da produção
- name: valor_producao
description: Valor da produção

0 comments on commit c090fda

Please sign in to comment.