diff --git a/models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql b/models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql index 357a1b49..7a5fedfa 100644 --- a/models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql +++ b/models/br_ibge_pam/br_ibge_pam__lavoura_permanente.sql @@ -1,4 +1,5 @@ {{ + config( alias="lavoura_permanente", schema="br_ibge_pam", diff --git a/models/br_ibge_pam/code/api_to_json.py b/models/br_ibge_pam/lavoura_permanente/code/api_to_json.py similarity index 98% rename from models/br_ibge_pam/code/api_to_json.py rename to models/br_ibge_pam/lavoura_permanente/code/api_to_json.py index d34ae782..7a0ab35c 100644 --- a/models/br_ibge_pam/code/api_to_json.py +++ b/models/br_ibge_pam/lavoura_permanente/code/api_to_json.py @@ -5,7 +5,6 @@ from tqdm.asyncio import tqdm from aiohttp import ClientTimeout, TCPConnector from tqdm import tqdm -from typing import Any, Dict, List API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" AGREGADO = "1613" # É a tabela no SIDRA diff --git a/models/br_ibge_pam/code/json_to_parquet.py b/models/br_ibge_pam/lavoura_permanente/code/json_to_parquet.py similarity index 100% rename from models/br_ibge_pam/code/json_to_parquet.py rename to models/br_ibge_pam/lavoura_permanente/code/json_to_parquet.py diff --git a/models/br_ibge_pam/lavoura_temporaria/code/api_to_json.py b/models/br_ibge_pam/lavoura_temporaria/code/api_to_json.py new file mode 100644 index 00000000..41b69679 --- /dev/null +++ b/models/br_ibge_pam/lavoura_temporaria/code/api_to_json.py @@ -0,0 +1,71 @@ +import asyncio +import aiohttp +import json +import glob +from tqdm.asyncio import tqdm +from aiohttp import ClientTimeout, TCPConnector +from tqdm import tqdm + +API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]" +AGREGADO = "1612" # É a tabela no SIDRA +PERIODOS = range(1974, 2022 + 1) +VARIAVEIS = ["109", "1000109", "216", "1000216", "214", "112", "215", + "1000215"] # As variáveis da tabela +NIVEL_GEOGRAFICO = "N6" # N6 = Municipal +LOCALIDADES = "all" +CLASSIFICACAO = "81" # Código pré-definido por agregado +CATEGORIAS = ["2688", "40471", "2689", "2690", "2691", "2692", "2693", + "2694", "2695", "2696", "40470", "2697", "2698", "2699", + "2700", "2701", "2702", "2703", "109179", "2704", "2705", + "2706", "2707", "2708", "2709", "2710", "2711", "2712", + "2713", "2714", "2715", "2716", "109180"] # Produtos +ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")] +ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS] + +async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]: + """ + Faz uma requisição GET à API e retorna a resposta em formato JSON. + + Parâmetros: + - session (aiohttp.ClientSession): A sessão do cliente aiohttp. + - url (str): A URL da API para a qual a requisição será feita. + + Retorna: + - Dict[str, Any]: A resposta da API em formato JSON. + """ + async with session.get(url) as response: + return await response.json() + +async def main(years: List[int], variables: List[str], categories: List[str]) -> None: + """ + Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON. + + Parâmetros: + - years (List[int]): Lista de anos para os quais os dados serão consultados. + - variables (List[str]): Lista de variáveis da tabela a serem consultadas. + - categories (List[str]): Lista de categorias a serem consultadas. + + Retorna: + - None + """ + for year in years: + print(f'Consultando dados do ano: {year}') + async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session: + tasks = [] + for variable in variables: + for category in categories: + url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category) + task = fetch(session, url) + tasks.append(asyncio.ensure_future(task)) + responses = [] + for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)): + try: + response = await future + responses.append(response) + except asyncio.TimeoutError: + print(f"Request timed out for {url}") + with open(f'../json/{year}.json', 'a') as f: + json.dump(responses, f) + +if __name__ == "__main__": + asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS)) diff --git a/models/br_ibge_pam/lavoura_temporaria/code/json_to_parquet.py b/models/br_ibge_pam/lavoura_temporaria/code/json_to_parquet.py new file mode 100644 index 00000000..3c99ae99 --- /dev/null +++ b/models/br_ibge_pam/lavoura_temporaria/code/json_to_parquet.py @@ -0,0 +1,208 @@ +import json +import numpy as np +import os +import re +from collections import OrderedDict +import pandas as pd +import pyarrow as pa +from pathlib import Path + +def parse_file(file_path): + dict_list = [] + + with open(file_path) as f: + json_list = json.load(f) + + for j in json_list: + temp_od = OrderedDict() + temp_variavel = j[0]["variavel"] + for r in j[0]["resultados"]: + temp_produto = list(r["classificacoes"][0]["categoria"].values())[0] + for s in r["series"]: + temp_ano = list(s["serie"].keys())[0] + temp_valor = list(s["serie"].values())[0] + temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip() + temp_id_municipio = s["localidade"]["id"] + + temp_od["ano"] = temp_ano + temp_od["sigla_uf"] = temp_sigla_uf + temp_od["id_municipio"] = temp_id_municipio + temp_od["produto"] = temp_produto + temp_od[temp_variavel] = temp_valor + + dict_list.append(dict(temp_od)) + temp_od.clear() + return dict_list + +def join_dicts_by_keys(dict_list, keys): + result = {} + for d in dict_list: + key_tuple = tuple(d[k] for k in keys) + if key_tuple not in result: + result[key_tuple] = d + else: + result[key_tuple].update(d) + return list(result.values()) + +def create_raw_df(dict_list): + return pd.DataFrame(dict_list, dtype=str) + +def drop_columns(dataframe): + dropped_df = dataframe.drop(columns=[ + "Área plantada - percentual do total geral", + "Área colhida - percentual do total geral", + "Valor da produção - percentual do total geral", + ]) + + return dropped_df + +def rename_columns(dataframe): + renamed_dataframe = dataframe.rename(columns={ + "Área plantada": "area_plantada", + "Área colhida": "area_colhida", + "Quantidade produzida": "quantidade_produzida", + "Rendimento médio da produção": "rendimento_medio_producao", + "Valor da produção": "valor_producao" + }) + return renamed_dataframe + +def treat_columns(dataframe): + dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "produto", + "area_plantada", "area_colhida", + "quantidade_produzida", "rendimento_medio_producao", + "valor_producao"]] + COLUNAS_PARA_TRATAR = ["ano", "area_plantada", "area_colhida", + "quantidade_produzida", "rendimento_medio_producao", + "valor_producao"] + + for coluna in COLUNAS_PARA_TRATAR: + dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x) + dataframe[coluna] = dataframe[coluna].astype("Int64") + return dataframe + +def products_weight_ratio_fix(row): + """ + 2 - A partir do ano de 2001 as quantidades produzidas dos produtos abacate, banana, + caqui, figo, goiaba, laranja, limão, maçã, mamão, manga, maracujá, marmelo, pera, + pêssego e tangerina passam a ser expressas em toneladas. Nos anos anteriores eram + expressas em mil frutos, com exceção da banana, que era expressa em mil cachos. O + rendimento médio passa a ser expresso em Kg/ha. Nos anos anteriores era expresso + em frutos/ha, com exceção da banana, que era expressa em cachos/ha. + 3 - Veja em o documento + https://sidra.ibge.gov.br/content/documentos/pam/AlteracoesUnidadesMedidaFrutas.pdf + com as alterações de unidades de medida das frutíferas ocorridas em 2001 e a tabela + de conversão fruto x quilograma. + """ + + DICIONARIO_DE_PROPORCOES = { + "Abacate": 0.38, + "Banana (cacho)": 10.20, + "Caqui": 0.18, + "Figo": 0.09, + "Goiaba": 0.16, + "Larajna": 0.16, + "Limão": 0.10, + "Maçã": 0.15, + "Mamão": 0.80, + "Manga": 0.31, + "Maracujá": 0.15, + "Marmelo": 0.19, + "Pera": 0.17, + "Pêra": 0.17, # Para garantir, pois nos dados parece que só há Pera, sem acento + "Pêssego": 0.13, + "Tangerina": 0.15, + "Melancia": 6.08, + "Melão": 1.39 + } + + if row["ano"] >= 2001: + return row + + if (pd.isna(row["quantidade_produzida"]) or pd.isna(row["area_colhida"]) + or row["quantidade_produzida"] == 0 or row["area_colhida"] == 0): + return row + + if row["produto"] not in DICIONARIO_DE_PROPORCOES.keys(): + return row + + quantidade_produzida = row["quantidade_produzida"] * DICIONARIO_DE_PROPORCOES[row["produto"]] + + rendimento_medio_producao = quantidade_produzida / row["area_colhida"] * 1000 # kg / ha + + row["quantidade_produzida"] = quantidade_produzida + row["rendimento_medio_producao"] = rendimento_medio_producao + + return row + +def currency_fix(row): + """ + Valor da produção (Mil Cruzeiros [1974 a 1985, 1990 a 1992], Mil Cruzados [1986 a 1988], + Mil Cruzados Novos [1989], Mil Cruzeiros Reais [1993], Mil Reais [1994 a 2022]) + Verificado em http://www.igf.com.br/calculadoras/conversor/conversor.htm + """ + + if 1974 <= row["ano"] <= 1985: + return row["valor_producao"] / (1000**4 * 2.75) + elif 1986 <= row["ano"] <= 1988: + return row["valor_producao"] / (1000**3 * 2.75) + elif row["ano"] == 1989: + return row["valor_producao"] / (1000**2 * 2.75) + elif 1990 <= row["ano"] <= 1992: + return row["valor_producao"] / (1000**2 * 2.75) + elif row["ano"] == 1993: + return row["valor_producao"] / (1000 * 2.75) + else: + return row["valor_producao"] + +def get_existing_years(directory): + root_dir = Path(directory) + year_dirs = root_dir.glob('ano=*') + years = set() + for year_dir in year_dirs: + match = re.match(r'ano=(\d+)', year_dir.name) + if match: + year = int(match.group(1)) + years.add(year) + + return sorted(list(years)) + +if __name__ == '__main__': + ANOS_TRANSFORMADOS = get_existing_years("../parquet") + ARQUIVOS_JSON = list(Path('../json/').glob('*.json')) + JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS] + + for path in JSON_FALTANTES: + print(f"Criando dict_list com base no arquivo: {path}...") + dict_list = parse_file(path) + print("Concatendo as colunas do dict_list...") + dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "produto"]) + print("Criando DataFrame a partir do dict_list...") + df = create_raw_df(dict_list) + print("Removendo as colunas do DataFrame...") + df = drop_columns(df) + print("Renomeando as colunas do DataFrame...") + df = rename_columns(df) + print("Tratando as colunas do DataFrame...") + df = treat_columns(df) + print("Tratando a questão dos pesos dos produtos... Impacto em: quantidade_producao e rendimento_medio_producao") + df = df.apply(products_weight_ratio_fix, axis=1) + print("Aplicando a correção nominal retroativa da moeda... Impacto: valor_producao") + df["valor_producao"] = df.apply(currency_fix, axis=1) + temp_ano = df["ano"].max() + print("Deletando a coluna ano para possibilitar o particionamento...") + df.drop(columns=["ano"], inplace=True) + print("Transformações finalizadas!") + temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet" + print(f"Exportando o DataFrame particionado em {temp_export_file_path}...") + os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True) + temp_schema = pa.schema([("sigla_uf", pa.string()), + ("id_municipio", pa.string()), + ("produto", pa.string()), + ("area_plantada", pa.int64()), + ("area_colhida", pa.int64()), + ("quantidade_produzida", pa.float64()), + ("rendimento_medio_producao", pa.float64()), + ("valor_producao", pa.float64())]) + df.to_parquet(temp_export_file_path, schema=temp_schema, index=False) + del(df) # Liberando memória + print()