Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: br_ibge_ppm models and files #573

Merged
merged 5 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ models:
br_b3_cotacoes:
+materialized: table
+schema: br_b3_cotacoes
+post-hook:
- REVOKE `roles/bigquery.dataViewer` ON TABLE {{ this }} FROM "specialGroup:allUsers"
- GRANT `roles/bigquery.dataViewer` ON TABLE {{ this }} TO "group:[email protected]"
br_bcb_agencia:
+materialized: table
+schema: br_bcb_agencia
Expand Down Expand Up @@ -180,6 +177,9 @@ models:
br_ibge_pnadc:
+materialized: table
+schema: br_ibge_pnadc
br_ibge_ppm:
+materialized: table
+schema: br_ibge_ppm
br_inep_censo_educacao_superior:
+materialized: table
+schema: br_inep_censo_educacao_superior
Expand Down
20 changes: 20 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__efetivo_rebanhos.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{
config(
alias="efetivo_rebanhos",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(tipo_rebanho as string) tipo_rebanho,
safe_cast(quantidade as int64) quantidade
from `basedosdados-staging.br_ibge_ppm_staging.efetivo_rebanhos` as t
where quantidade is not null
21 changes: 21 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__producao_aquicultura.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{{
config(
alias="producao_aquicultura",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 2013, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(quantidade as int64) quantidade,
safe_cast(valor as int64) valor,
from `basedosdados-staging.br_ibge_ppm_staging.producao_aquicultura` as t
where quantidade is not null
22 changes: 22 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__producao_origem_animal.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{{
config(
alias="producao_origem_animal",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(produto as string) produto,
safe_cast(unidade as string) unidade,
safe_cast(quantidade as int64) quantidade,
safe_cast(valor as int64) valor,
from `basedosdados-staging.br_ibge_ppm_staging.producao_origem_animal` as t
where quantidade is not null
20 changes: 20 additions & 0 deletions models/br_ibge_ppm/br_ibge_ppm__producao_pecuaria.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{{
config(
alias="producao_pecuaria",
schema="br_ibge_ppm",
partition_by={
"field": "ano",
"data_type": "int64",
"range": {"start": 1974, "end": 2022, "interval": 1},
},
cluster_by=["sigla_uf"],
)
}}
select
safe_cast(ano as int64) ano,
safe_cast(sigla_uf as string) sigla_uf,
safe_cast(id_municipio as string) id_municipio,
safe_cast(ovinos_tosquiados as int64) ovinos_tosquiados,
safe_cast(vacas_ordenhadas as int64) vacas_ordenhadas,
from `basedosdados-staging.br_ibge_ppm_staging.producao_pecuaria` as t
where ovinos_tosquiados is not null or vacas_ordenhadas is not null
68 changes: 68 additions & 0 deletions models/br_ibge_ppm/efetivo_rebanhos/code/api_to_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
import aiohttp
import json
import glob
from tqdm.asyncio import tqdm
from aiohttp import ClientTimeout, TCPConnector
from tqdm import tqdm
from typing import Any, Dict, List

API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]"
AGREGADO = "3939" # É a tabela no SIDRA
PERIODOS = range(1974, 2022 + 1)
VARIAVEIS = ["105"] # As variáveis da tabela
NIVEL_GEOGRAFICO = "N6" # N6 = Municipal
LOCALIDADES = "all"
CLASSIFICACAO = "79" # Código pré-definido por agregado
CATEGORIAS = ["2670", "2675", "2672", "32794", "32795", "2681", "2677"
"32796", "32793", "2680"] # Produtos
ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")]
ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS]

async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
"""
Faz uma requisição GET à API e retorna a resposta em formato JSON.

Parâmetros:
- session (aiohttp.ClientSession): A sessão do cliente aiohttp.
- url (str): A URL da API para a qual a requisição será feita.

Retorna:
- Dict[str, Any]: A resposta da API em formato JSON.
"""
async with session.get(url) as response:
return await response.json()

async def main(years: List[int], variables: List[str], categories: List[str]) -> None:
"""
Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON.

Parâmetros:
- years (List[int]): Lista de anos para os quais os dados serão consultados.
- variables (List[str]): Lista de variáveis da tabela a serem consultadas.
- categories (List[str]): Lista de categorias a serem consultadas.

Retorna:
- None
"""
for year in years:
print(f'Consultando dados do ano: {year}')
async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session:
tasks = []
for variable in variables:
for category in categories:
url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category)
task = fetch(session, url)
tasks.append(asyncio.ensure_future(task))
responses = []
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
try:
response = await future
responses.append(response)
except asyncio.TimeoutError:
print(f"Request timed out for {url}")
with open(f'../json/{year}.json', 'a') as f:
json.dump(responses, f)

if __name__ == "__main__":
asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS))
109 changes: 109 additions & 0 deletions models/br_ibge_ppm/efetivo_rebanhos/code/json_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import json
import numpy as np
import os
import re
from collections import OrderedDict
import pandas as pd
import pyarrow as pa
from pathlib import Path

def parse_file(file_path):
dict_list = []

with open(file_path) as f:
json_list = json.load(f)

for j in json_list:
temp_od = OrderedDict()
temp_variavel = j[0]["variavel"]
for r in j[0]["resultados"]:
temp_caracteristica = list(r["classificacoes"][0]["categoria"].values())[0]
for s in r["series"]:
temp_ano = list(s["serie"].keys())[0]
temp_valor = list(s["serie"].values())[0]
temp_sigla_uf = s["localidade"]["nome"].split("-")[-1].strip()
temp_id_municipio = s["localidade"]["id"]

temp_od["ano"] = temp_ano
temp_od["sigla_uf"] = temp_sigla_uf
temp_od["id_municipio"] = temp_id_municipio
temp_od["tipo_rebanho"] = temp_caracteristica
temp_od[temp_variavel] = temp_valor

dict_list.append(dict(temp_od))
temp_od.clear()
return dict_list

def join_dicts_by_keys(dict_list, keys):
result = {}
for d in dict_list:
key_tuple = tuple(d[k] for k in keys)
if key_tuple not in result:
result[key_tuple] = d
else:
result[key_tuple].update(d)
return list(result.values())

def create_raw_df(dict_list):
return pd.DataFrame(dict_list, dtype=str)

def rename_columns(dataframe):
renamed_dataframe = dataframe.rename(columns={
"Tipo de rebanho": "tipo_rebanho",
"Efetivo dos rebanhos": "quantidade"
})
return renamed_dataframe

def treat_columns(dataframe):
dataframe = dataframe[["ano", "sigla_uf", "id_municipio", "tipo_rebanho",
"quantidade"]]
COLUNAS_PARA_TRATAR = ["quantidade"]

for coluna in COLUNAS_PARA_TRATAR:
dataframe[coluna] = dataframe[coluna].apply(lambda x: np.nan if x in ("-", "..", "...", "X") else x)
dataframe[coluna] = dataframe[coluna].astype("Int64")
return dataframe

def get_existing_years(directory):
root_dir = Path(directory)
year_dirs = root_dir.glob('ano=*')
years = set()
for year_dir in year_dirs:
match = re.match(r'ano=(\d+)', year_dir.name)
if match:
year = int(match.group(1))
years.add(year)

return sorted(list(years))

if __name__ == '__main__':
ANOS_TRANSFORMADOS = get_existing_years("../parquet")
ARQUIVOS_JSON = list(Path('../json/').glob('*.json'))
JSON_FALTANTES = [arquivo for arquivo in ARQUIVOS_JSON if int(arquivo.stem) not in ANOS_TRANSFORMADOS]

for path in JSON_FALTANTES:
print(f"Criando dict_list com base no arquivo: {path}...")
dict_list = parse_file(path)
print("Concatendo as colunas do dict_list...")
dict_list = join_dicts_by_keys(dict_list, ["ano", "sigla_uf", "id_municipio", "tipo_rebanho"])
print("Criando DataFrame a partir do dict_list...")
df = create_raw_df(dict_list)
print("Renomeando as colunas do DataFrame...")
df = rename_columns(df)
print("Tratando as colunas do DataFrame...")
df = treat_columns(df)
print("Transformações finalizadas!")
temp_ano = df["ano"].max()
print("Deletando a coluna ano para possibilitar o particionamento...")
df.drop(columns=["ano"], inplace=True)
print("Transformações finalizadas!")
temp_export_file_path = f"../parquet/ano={temp_ano}/data.parquet"
print(f"Exportando o DataFrame particionado em {temp_export_file_path}...")
os.makedirs(os.path.dirname(temp_export_file_path), exist_ok=True)
temp_schema = pa.schema([("sigla_uf", pa.string()),
("id_municipio", pa.string()),
("tipo_rebanho", pa.string()),
("quantidade", pa.int64())])
df.to_parquet(temp_export_file_path, schema=temp_schema, index=False)
del(df) # Liberando memória
print()
70 changes: 70 additions & 0 deletions models/br_ibge_ppm/producao_aquicultura/code/api_to_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import asyncio
import aiohttp
import json
import glob
from tqdm.asyncio import tqdm
from aiohttp import ClientTimeout, TCPConnector
from typing import Any, Dict, List

API_URL_BASE = "https://servicodados.ibge.gov.br/api/v3/agregados/{}/periodos/{}/variaveis/{}?localidades={}[{}]&classificacao={}[{}]"
AGREGADO = "3940" # É a tabela no SIDRA
PERIODOS = range(2013, 2022 + 1)
VARIAVEIS = ["4146", "215"] # As variáveis da tabela
NIVEL_GEOGRAFICO = "N6" # N6 = Municipal
LOCALIDADES = "all"
CLASSIFICACAO = "654" # Código pré-definido por agregado
CATEGORIAS = ["32861", "32865", "32866", "32867", "32868", "32869", "32870",
"32871", "32872", "32873", "32874", "32875", "32876", "32877",
"32878", "32879", "32880", "32881", "32886", "32887", "32888",
"32889", "32890", "32891"] # Produtos
ANOS_BAIXADOS = [int(glob.os.path.basename(f).split(".")[0]) for f in glob.glob(f"../json/*.json")]
ANOS_RESTANTES = [int(ANO) for ANO in PERIODOS if ANO not in ANOS_BAIXADOS]

async def fetch(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
"""
Faz uma requisição GET à API e retorna a resposta em formato JSON, incluindo a URL usada.

Parâmetros:
- session (aiohttp.ClientSession): A sessão do cliente aiohttp.
- url (str): A URL da API para a qual a requisição será feita.

Retorna:
- Dict[str, Any]: A resposta da API em formato JSON, incluindo a URL usada.
"""
async with session.get(url) as response:
data = await response.json()
return {'url': url, 'data': data}

async def main(years: List[int], variables: List[str], categories: List[str]) -> None:
"""
Faz requisições para a API para cada ano, variável e categoria, salvando as respostas em arquivos JSON.

Parâmetros:
- years (List[int]): Lista de anos para os quais os dados serão consultados.
- variables (List[str]): Lista de variáveis da tabela a serem consultadas.
- categories (List[str]): Lista de categorias a serem consultadas.

Retorna:
- None
"""
for year in years:
print(f'Consultando dados do ano: {year}')
async with aiohttp.ClientSession(connector=TCPConnector(limit=100, force_close=True), timeout=ClientTimeout(total=1200)) as session:
tasks = []
for variable in variables:
for category in categories:
url = API_URL_BASE.format(AGREGADO, year, variable, NIVEL_GEOGRAFICO, LOCALIDADES, CLASSIFICACAO, category)
task = fetch(session, url)
tasks.append(asyncio.ensure_future(task))
responses = []
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
try:
response = await future
responses.append(response)
except asyncio.TimeoutError:
print(f"Request timed out for {url}")
with open(f'../json/{year}.json', 'a') as f:
json.dump(responses, f)

if __name__ == "__main__":
asyncio.run(main(ANOS_RESTANTES, VARIAVEIS, CATEGORIAS))
Loading
Loading